Apache Kafka - recurring rebalancing with more consumers





Context



We are using Kafka to process large messages, very occasionally up to 10 MB but mostly in the 500 KB range. Processing a single message can take up to around 30 seconds, and sometimes up to a minute.



The Issue



With a smaller number of consumers (up to around 50) there is no recurring re-balancing by the broker and processing works fine. Any re-balancing at this scale is also rather fast, mostly under a minute according to the broker logs.



Once the consumers are scaled up to 100 or 200, they re-balance constantly, at intervals of up to around 5 minutes. This results in 5 minutes of consuming, followed by 5 minutes of re-balancing, and then the cycle repeats. The consumer services don't fail; they just re-balance for no apparent reason. This reduces throughput when scaling consumers up.



When scaled to 200 consumers, processing runs at an average rate of 2 messages per minute per consumer, whereas a single consumer that is not re-balancing processes around 6 messages per minute.



I don't suspect the network between the data centers to be the issue, as other consumers performing a different kind of processing on these messages have no trouble handling hundreds to thousands of messages per minute.



Has anyone else experienced this pattern and found a simple solution, e.g. changing a particular configuration parameter?



Additional Information



The Kafka brokers are version 2.0, and there are 10 of them spread across different data centers. The replication factor is 3 and the topic has 500 partitions. An excerpt of the broker configuration tuned for processing large messages:




  • compression.type=lz4

  • message.max.bytes=10000000 # 10 MB

  • replica.fetch.max.bytes=10000000 # 10 MB

  • group.max.session.timeout.ms=1320000 # 22 min

  • offset.retention.minutes=10080 # 7 days


On the consumer side we use the Java client with a re-balance listener that clears any buffered messages from the revoked partitions. This buffer holds 10 messages. The consumer clients run client version 2.1; updating the Java client from 2.0 to 2.1 seems to significantly reduce broker log entries of the following kind at these larger consumer counts (before the update we got them for almost every client on every re-balance):



INFO [GroupCoordinator 2]: Member svc002 in group my_group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)


The consumers are in a different data center than the brokers. Offsets are committed asynchronously. Polling is performed in a dedicated thread that fills the buffer with a poll timeout of 15 seconds; once the buffer is full the thread sleeps for a few seconds and only polls again when the buffer has free space (a sketch of this pattern follows the configuration below). An excerpt of the consumer configuration for the large-message use case:




  • max.partition.fetch.bytes.config=200000000 # 200 MB

  • max.poll.records.config=2

  • session.timeout.ms.config=1200000 # 20 min
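
For illustration, here is a minimal sketch of the buffering and re-balance-listener pattern described above. It is not our actual service code: class and variable names are made up, the processing thread that drains the buffer is omitted, and the broker address is a placeholder.

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class BufferedPollingLoop {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200_000_000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1_200_000);

        // Buffer of at most 10 records, as described above.
        BlockingQueue<ConsumerRecord<byte[], byte[]>> buffer = new LinkedBlockingQueue<>(10);

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("mytopic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Drop buffered records belonging to partitions we are about to lose.
                buffer.removeIf(r -> partitions.contains(new TopicPartition(r.topic(), r.partition())));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing to do; consumption resumes from the committed offsets.
            }
        });

        while (true) {
            if (buffer.remainingCapacity() == 0) {
                // Buffer full: back off for a few seconds without calling poll().
                TimeUnit.SECONDS.sleep(3);
                continue;
            }
            // Poll with the 15-second timeout used by the polling thread.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(15));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                buffer.put(record); // handed over to the (omitted) processing thread
            }
            // Offsets are committed asynchronously.
            consumer.commitAsync();
        }
    }
}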


Log File



The following is an extract from the log of the broker that coordinates this particular group, covering a 30-minute time frame. Names are reduced to my_group and mytopic; a few entries from an unrelated topic are also included.



19:47:36,786] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:47:36,810] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3213 (kafka.coordinator.group.GroupCoordinator)
19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,902] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3214 (kafka.coordinator.group.GroupCoordinator)
19:50:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:57:22,161] INFO [ProducerStateManager partition=unrelated_topic-329] Writing producer snapshot at offset 88002 (kafka.log.ProducerStateManager)
19:57:22,162] INFO [Log partition=unrelated_topic-329, dir=/kafkalog] Rolled new log segment at offset 88002 in 11 ms. (kafka.log.Log)
19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:59:14,061] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3215 (kafka.coordinator.group.GroupCoordinator)
20:00:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:02:57,821] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,360] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,391] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3216 (kafka.coordinator.group.GroupCoordinator)
20:10:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:10:46,863] INFO [ReplicaFetcher replicaId=2, leaderId=8, fetcherId=0] Node 8 was unable to process the fetch request with (sessionId=928976035, epoch=216971): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
20:11:19,236] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:13:54,851] INFO [ProducerStateManager partition=mytopic-321] Writing producer snapshot at offset 123640 (kafka.log.ProducerStateManager)
20:13:54,851] INFO [Log partition=mytopic-321, dir=/kafkalog] Rolled new log segment at offset 123640 in 14 ms. (kafka.log.Log)
20:14:30,686] INFO [ProducerStateManager partition=mytopic-251] Writing producer snapshot at offset 133509 (kafka.log.ProducerStateManager)
20:14:30,686] INFO [Log partition=mytopic-251, dir=/kafkalog] Rolled new log segment at offset 133509 in 1 ms. (kafka.log.Log)
20:16:01,892] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3217 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:16:01,938] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3217 (kafka.coordinator.group.GroupCoordinator)


Many thanks for any help with this issue.

java apache-kafka kafka-consumer-api

1 Answer

          After some further engineering and fine-tuning, we managed to get the issue under control.



First, it turned out that some services still occasionally exceeded the processing limit, which caused them to fail, albeit very rarely. The resulting departure triggered a re-balance, and when the service rejoined after around 6-7 minutes that triggered another one. We reduced this further by optimizing the throughput of our services.
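
The limit is not named explicitly above; assuming it refers to the consumer's max.poll.interval.ms, which defaults to 5 minutes and makes the client leave the group when exceeded between poll() calls, a hypothetical way to relax it in the Java client would look like this (class name and broker address are made up):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RelaxedPollIntervalExample { // hypothetical name

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Allow more time between poll() calls before the client proactively
        // leaves the group (default is 300000 ms = 5 minutes). The value here
        // is only an example.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);
        // Session timeout as in the question; must stay below the broker's
        // group.max.session.timeout.ms.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1_200_000);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}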



A second factor was the underlying Docker network that we use to scale the services. By default the Docker Swarm heartbeat interval is very short (5 seconds), so heavy processing and network load on a consumer node can drop it out of the swarm for a brief moment. Docker reacts by moving the affected services to other nodes (causing a re-balance), and another re-balance follows when the node comes back online. Since the services take 5-7 minutes to start up, each such event leads to several rounds of re-balancing.



A third factor was errors in the consuming services that caused one of them to crash occasionally, say 1% per hour. Each crash again causes two re-balances: one when the member leaves and one when it rejoins.



Together, these factors produced the observed behaviour. The latest Kafka version also seems to log more information about why a member leaves the consumer group. It would be nice if Kafka kept serving data to the consumers that are still stable; I may file a feature request for this. Nevertheless, we now have the setup running stably with decent performance.





