Spring Kafka - access offsetsForTimes to start consuming from specific offset























I have a fairly straightforward Kafka consumer:



MessageListener<String, T> messageListener = record -> {
    doStuff(record.value());
};

startConsumer(messageListener);

protected void startConsumer(MessageListener<String, T> messageListener) {
    ConcurrentMessageListenerContainer<String, T> container = new ConcurrentMessageListenerContainer<>(
            consumerFactory(this.brokerAddress, this.groupId),
            containerProperties(this.topic, messageListener));

    container.start();
}


I can consume messages without any issue.
Now I need to seek to a specific offset based on the result of a call to offsetsForTimes on the Kafka Consumer.



I understand that I can seek to a certain position using the ConsumerSeekAware interface:



@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {
    assignments.forEach((t, o) -> callback.seek(t.topic(), t.partition(), ?????));
}


The problem is that I do not have access to the Kafka Consumer inside the callback, so I have no way to call offsetsForTimes.



Is there any other way to achieve this?

















      apache-kafka spring-kafka
















      asked Nov 7 at 15:23









      Luciano

























          1 Answer



          accepted










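          Use a ConsumerAwareRebalanceListener (introduced in Spring Kafka 2.0) to do the initial seeks. Its callbacks receive the Kafka Consumer, so you can call offsetsForTimes there.

          The current version of Spring Kafka is 2.2.0.

          Related answer: How to test a ConsumerAwareRebalanceListener?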
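
          As a rough illustration of the approach in this answer, here is a minimal sketch of a rebalance listener that looks up offsets by timestamp and seeks to them. The class name SeekToTimestampListener and the startTimestampMs field are made up for this example, and skipping partitions with no matching offset is an assumption, not something the answer prescribes.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

// Hypothetical listener: on assignment, seeks each partition to the earliest
// offset whose record timestamp is at or after startTimestampMs.
class SeekToTimestampListener implements ConsumerAwareRebalanceListener {

    // Assumed input: epoch milliseconds to start consuming from.
    private final long startTimestampMs;

    SeekToTimestampListener(long startTimestampMs) {
        this.startTimestampMs = startTimestampMs;
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {
        // Ask for the same timestamp on every newly assigned partition.
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition partition : partitions) {
            query.put(partition, startTimestampMs);
        }

        Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);

        found.forEach((partition, offsetAndTimestamp) -> {
            // A null value means no record at or after the timestamp exists;
            // in this sketch the partition's position is then left unchanged.
            if (offsetAndTimestamp != null) {
                consumer.seek(partition, offsetAndTimestamp.offset());
            }
        });
    }
}
```

          Assuming the ContainerProperties object built in the question's containerProperties(...) helper, the listener could be registered with containerProperties.setConsumerRebalanceListener(new SeekToTimestampListener(timestamp)) before the container is started. Note that this runs on every rebalance, not only on the first assignment, so you may want a flag if the seek should happen only once.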



























          • I added a link to another answer.
            – Gary Russell
            Nov 7 at 16:36





















          answered Nov 7 at 16:33









          Gary Russell






