Spring Kafka - access offsetsForTimes to start consuming from specific offset
I have a fairly straightforward Kafka consumer:
    MessageListener<String, T> messageListener = record -> {
        doStuff(record.value());
    };
    startConsumer(messageListener);

    protected void startConsumer(MessageListener<String, T> messageListener) {
        ConcurrentMessageListenerContainer<String, T> container = new ConcurrentMessageListenerContainer<>(
                consumerFactory(this.brokerAddress, this.groupId),
                containerProperties(this.topic, messageListener));
        container.start();
    }
I can consume messages without any issue.
Now I have a requirement to start consuming from a specific offset, based on the result of a call to offsetsForTimes on the Kafka Consumer.
I understand that I can seek to a certain position using the ConsumerSeekAware interface:
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seek(t.topic(), t.partition(), ?????));
    }
The problem is that I do not have access to the Kafka Consumer inside the callback, so I have no way to call offsetsForTimes.
Is there any other way to achieve this?
apache-kafka spring-kafka
asked Nov 7 at 15:23
Luciano
1 Answer
accepted
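Use a ConsumerAwareRebalanceListener (introduced in 2.0) to do the initial seeks. Its onPartitionsAssigned method receives the Consumer, so you can call offsetsForTimes there and then seek on the same consumer.
The current version is 2.2.0.
Related answer: How to test a ConsumerAwareRebalanceListener?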
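A minimal sketch of such a listener might look like the following. The class name SeekToTimestampListener and the startTimestampMs field are hypothetical and are not part of Spring Kafka. The sketch assumes the timestamp (epoch milliseconds) is supplied by the application. offsetsForTimes can map a partition to null when no record at or after the timestamp exists; in that case the sketch leaves the partition's position unchanged.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

// Hypothetical listener: seeks each newly assigned partition to the earliest
// offset whose record timestamp is at or after startTimestampMs.
class SeekToTimestampListener implements ConsumerAwareRebalanceListener {

    // Epoch milliseconds; assumed to be provided by the application.
    private final long startTimestampMs;

    SeekToTimestampListener(long startTimestampMs) {
        this.startTimestampMs = startTimestampMs;
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Ask for the same timestamp on every assigned partition.
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : partitions) {
            query.put(tp, startTimestampMs);
        }

        Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);

        // A null value means no record at or after the timestamp was found;
        // skip the seek and keep the partition's current position.
        found.forEach((tp, offsetAndTimestamp) -> {
            if (offsetAndTimestamp != null) {
                consumer.seek(tp, offsetAndTimestamp.offset());
            }
        });
    }
}
```

The listener would be registered on the container properties, for example with containerProperties.setConsumerRebalanceListener(new SeekToTimestampListener(timestamp)), before the container is started. Note that this sketch seeks on every assignment, including later rebalances; if the seek should happen only once, the application would need its own guard, which is left out here.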
I added a link to another answer.
– Gary Russell
Nov 7 at 16:36
answered Nov 7 at 16:33
Gary Russell