Apache Kafka - recurring rebalancing with more consumers
Context
We are using Kafka to process large messages: mostly around 500 KB, very occasionally up to 10 MB. Processing a single message usually takes up to around 30 seconds, sometimes up to a minute.
The Issue
With a smaller number of consumers (up to around 50), the broker does not rebalance the group repeatedly and processing works fine. Any rebalance at this scale is also rather fast, mostly under a minute according to the broker logs.
Once the consumers are scaled to 100 or 200, the group rebalances constantly, at intervals of up to around 5 minutes. This results in roughly 5 minutes of consuming followed by 5 minutes of rebalancing, over and over. The consumer services don't fail; they just rebalance for no apparent reason. As a result, throughput drops when we scale the consumers up.
When scaled to 200 consumers, processing runs at an average of 2 messages per minute per consumer, whereas a single consumer that is not rebalancing processes around 6 messages per minute.
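A quick check of these figures, as a sketch with hypothetical names (`ThroughputEstimate`, `effectiveRate`, `impliedUsefulFraction`): a consumer that processes 6 messages per minute for 5 minutes and then sits idle through a 5-minute rebalance would average 3 messages per minute. The observed 2 messages per minute implies it is doing useful work only about a third of the time, so the cycle seems to cost somewhat more than the rebalance windows alone. The 5/5 split is an approximation of "intervals up to around 5 minutes".

```java
final class ThroughputEstimate {
    /** Average rate if a consumer only works during the stable part of each cycle. */
    static double effectiveRate(double peakPerMinute, double stableMinutes, double rebalanceMinutes) {
        return peakPerMinute * stableMinutes / (stableMinutes + rebalanceMinutes);
    }

    /** Fraction of time a consumer must be productive to reach the observed rate. */
    static double impliedUsefulFraction(double observedPerMinute, double peakPerMinute) {
        return observedPerMinute / peakPerMinute;
    }

    public static void main(String[] args) {
        // Figures from the question: ~6 msg/min when stable, ~2 msg/min at 200 consumers,
        // roughly 5 minutes consuming followed by 5 minutes rebalancing.
        System.out.println("expected with 5/5 cycle: " + effectiveRate(6, 5, 5));
        System.out.println("implied useful fraction: " + impliedUsefulFraction(2, 6));
    }
}
```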
I don't suspect the network between the data centers to be the issue, as some of our other consumers perform a different kind of processing on these messages and have no trouble handling hundreds to thousands of messages per minute.
Has anyone else experienced this pattern and found a simple solution, e.g. changing a particular configuration parameter?
Additional Information
The Kafka brokers run version 2.0; there are 10 of them across different data centers. The replication factor is 3, and the topic has 500 partitions. An excerpt of the broker configuration we tuned for processing large messages:
- compression.type=lz4
- message.max.bytes=10000000 # 10 MB
- replica.fetch.max.bytes=10000000 # 10 MB
- group.max.session.timeout.ms=1320000 # 22 min
- offset.retention.minutes=10080 # 7 days
On the consumer side we use the Java client with a rebalance listener that discards any buffered messages from revoked partitions; the buffer holds 10 messages. The consumers run client version 2.1. Upgrading the Java client from 2.0 to 2.1 seems to have significantly reduced broker log entries of the following kind at these larger consumer counts (before the upgrade, we got them for almost every client on every rebalance):
INFO [GroupCoordinator 2]: Member svc002 in group my_group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
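The rebalance listener and 10-message buffer described above can be sketched without the Kafka client as a small thread-safe buffer. This is a minimal sketch, not the author's code: `RevocableBuffer`, `Msg` and `dropRevoked` are hypothetical names, and partitions are plain integers instead of `TopicPartition`. In a real consumer, `dropRevoked` would be called from `ConsumerRebalanceListener.onPartitionsRevoked`, which the Java client invokes on the polling thread during `poll()`.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

final class RevocableBuffer {
    /** Hypothetical stand-in for a consumed record (a real consumer would use ConsumerRecord). */
    static final class Msg {
        final int partition;
        final long offset;

        Msg(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final int capacity;
    private final Deque<Msg> queue = new ArrayDeque<>();

    RevocableBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Called by the polling thread; returns false when the buffer is full. */
    synchronized boolean offer(Msg m) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.addLast(m);
        return true;
    }

    /** Lets the polling thread decide whether to poll again or sleep. */
    synchronized int freeSpace() {
        return capacity - queue.size();
    }

    /** Called by a processing thread; returns null when the buffer is empty. */
    synchronized Msg nextMessage() {
        return queue.pollFirst();
    }

    synchronized int size() {
        return queue.size();
    }

    /**
     * What onPartitionsRevoked would do: drop buffered records of revoked partitions so
     * they are not processed after another consumer has taken those partitions over.
     * Returns the number of records dropped.
     */
    synchronized int dropRevoked(Collection<Integer> revokedPartitions) {
        int before = queue.size();
        queue.removeIf(m -> revokedPartitions.contains(m.partition));
        return before - queue.size();
    }

    public static void main(String[] args) {
        RevocableBuffer buffer = new RevocableBuffer(10);
        for (int i = 0; i < 4; i++) {
            buffer.offer(new Msg(i % 2, i)); // partitions 0, 1, 0, 1
        }
        int dropped = buffer.dropRevoked(List.of(1));
        System.out.println("dropped=" + dropped + " remaining=" + buffer.size());
    }
}
```

Assuming offsets are committed only after a message has been processed, the dropped records remain uncommitted, so the partition's new owner re-reads them from the last committed offset.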
The consumers are in a different data center than the brokers. Offsets are committed asynchronously. Polling runs in a separate thread that fills the buffer, with a poll timeout of 15 seconds; once the buffer is full, the thread sleeps for a few seconds and polls again only when the buffer has free space. An excerpt of the consumer configuration for large messages:
- max.partition.fetch.bytes=200000000 # 200 MB
- max.poll.records=2
- session.timeout.ms=1200000 # 20 min
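The broker and consumer excerpts can be cross-checked in code. This is a minimal sketch using only `java.util.Properties` with the client's property-name strings; `ConsumerSettings`, `consumerProps` and `sessionTimeoutAccepted` are hypothetical names. It assumes `group.min.session.timeout.ms` is left at its default of 6000 ms, and it adds `max.poll.interval.ms` at its client default of 300000 ms. That setting does not appear in the question's excerpt; it is written out here only to make its 5-minute limit visible next to the other settings.

```java
import java.util.Properties;

final class ConsumerSettings {
    // group.max.session.timeout.ms from the broker excerpt.
    static final long BROKER_MAX_SESSION_TIMEOUT_MS = 1_320_000L;
    // Assumption: group.min.session.timeout.ms left at its default of 6000 ms.
    static final long BROKER_MIN_SESSION_TIMEOUT_MS = 6_000L;

    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("max.partition.fetch.bytes", "200000000"); // 200 MB
        p.setProperty("max.poll.records", "2");
        p.setProperty("session.timeout.ms", "1200000"); // 20 min
        // Not in the question's excerpt: the client default (5 min), written out explicitly.
        // If the gap between poll() calls exceeds it, the member leaves the group.
        p.setProperty("max.poll.interval.ms", "300000");
        return p;
    }

    /** The coordinator rejects a join whose session timeout lies outside the broker bounds. */
    static boolean sessionTimeoutAccepted(Properties p) {
        long timeout = Long.parseLong(p.getProperty("session.timeout.ms"));
        return timeout >= BROKER_MIN_SESSION_TIMEOUT_MS && timeout <= BROKER_MAX_SESSION_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        Properties p = consumerProps();
        System.out.println(p);
        System.out.println("session timeout accepted: " + sessionTimeoutAccepted(p));
    }
}
```

The resulting `Properties` object could be passed to the `KafkaConsumer` constructor together with the usual bootstrap servers, group id and deserializers, which are omitted here.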
Log File
The following is an extract from the log of the broker that coordinates this group, covering a 30-minute window. Names have been shortened to my_group and mytopic. A few entries from an unrelated topic are also included.
19:47:36,786] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:47:36,810] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3213 (kafka.coordinator.group.GroupCoordinator)
19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,902] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3214 (kafka.coordinator.group.GroupCoordinator)
19:50:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:57:22,161] INFO [ProducerStateManager partition=unrelated_topic-329] Writing producer snapshot at offset 88002 (kafka.log.ProducerStateManager)
19:57:22,162] INFO [Log partition=unrelated_topic-329, dir=/kafkalog] Rolled new log segment at offset 88002 in 11 ms. (kafka.log.Log)
19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:59:14,061] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3215 (kafka.coordinator.group.GroupCoordinator)
20:00:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:02:57,821] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,360] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,391] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3216 (kafka.coordinator.group.GroupCoordinator)
20:10:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:10:46,863] INFO [ReplicaFetcher replicaId=2, leaderId=8, fetcherId=0] Node 8 was unable to process the fetch request with (sessionId=928976035, epoch=216971): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
20:11:19,236] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:13:54,851] INFO [ProducerStateManager partition=mytopic-321] Writing producer snapshot at offset 123640 (kafka.log.ProducerStateManager)
20:13:54,851] INFO [Log partition=mytopic-321, dir=/kafkalog] Rolled new log segment at offset 123640 in 14 ms. (kafka.log.Log)
20:14:30,686] INFO [ProducerStateManager partition=mytopic-251] Writing producer snapshot at offset 133509 (kafka.log.ProducerStateManager)
20:14:30,686] INFO [Log partition=mytopic-251, dir=/kafkalog] Rolled new log segment at offset 133509 in 1 ms. (kafka.log.Log)
20:16:01,892] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3217 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:16:01,938] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3217 (kafka.coordinator.group.GroupCoordinator)
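To quantify the pattern in this excerpt, the time from each "Preparing to rebalance" entry to the following "Stabilized group" entry can be measured. This is a minimal sketch that parses the truncated timestamps exactly as shown above; `RebalanceDurations` and its methods are hypothetical names, and it assumes the excerpt does not cross midnight.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RebalanceDurations {
    // Truncated timestamp prefix as it appears in the excerpt, e.g. "19:47:51,788]".
    private static final Pattern TS = Pattern.compile("^(\\d{2}:\\d{2}:\\d{2}),(\\d{3})\\]");

    // GroupCoordinator lines copied from the excerpt (other entries omitted).
    static final String[] EXCERPT = {
        "19:47:36,786] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
        "19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
        "19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
        "19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
        "19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
        "20:02:57,821] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
        "20:06:51,360] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
        "20:11:19,236] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
        "20:16:01,892] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3217 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
    };

    static LocalTime timestamp(String line) {
        Matcher m = TS.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("No timestamp in: " + line);
        }
        // LocalTime.parse expects '.' rather than ',' before the milliseconds.
        return LocalTime.parse(m.group(1) + "." + m.group(2));
    }

    /** Milliseconds from each "Preparing to rebalance" to the following "Stabilized group". */
    static long[] rebalanceMillis(String[] lines, String group) {
        List<Long> out = new ArrayList<>();
        LocalTime preparing = null;
        for (String line : lines) {
            if (line.contains("Preparing to rebalance group " + group + " ")) {
                preparing = timestamp(line);
            } else if (preparing != null && line.contains("Stabilized group " + group + " ")) {
                // Assumes the window does not cross midnight.
                out.add(Duration.between(preparing, timestamp(line)).toMillis());
                preparing = null;
            }
        }
        return out.stream().mapToLong(Long::longValue).toArray();
    }

    public static void main(String[] args) {
        for (long ms : rebalanceMillis(EXCERPT, "my_group")) {
            System.out.printf("rebalance took %.1f s%n", ms / 1000.0);
        }
    }
}
```

For the four rebalances in the excerpt this gives roughly 55 s, 285 s, 234 s and 283 s: apart from the first one, each rebalance takes about 4 to 5 minutes, which matches the cycle described in the question.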
Many thanks for any help with this issue.
java apache-kafka kafka-consumer-api
asked Nov 24 '18 at 20:44
calloc_org
14519
1 Answer
After some further engineering and fine-tuning, we managed to get the issue under control.
First, it seems that some services still exceeded the processing-time limit, which caused them to fail on rare occasions. Each such departure triggered a rebalance, and the service rejoining about 6-7 minutes later triggered another. We reduced this further by optimizing the throughput of our services.
A second factor was the underlying Docker network that we use to scale the services. Docker Swarm's default heartbeat interval is very short (5 seconds), so heavy work and network load on a consumer node can drop it from the swarm for a very brief interval. Docker responds to this interruption by moving these services to other nodes (a rebalance), followed by another rebalance when the node comes back online. Because the services have long startup times of 5-7 minutes, each such event leads to several rebalances.
A third factor was bugs in the consuming services that occasionally caused one of them to crash, roughly 1% per hour. Again, each crash causes two rebalances: one when the consumer leaves and one when it rejoins.
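The arithmetic behind this factor can be sketched as follows, assuming "1% per hour" means each consumer has a 1% chance of crashing in a given hour (the answer does not say exactly) and that every crash costs two rebalances, as stated above. `RebalanceRate` and `expectedRebalancesPerHour` are hypothetical names.

```java
final class RebalanceRate {
    /**
     * Expected rebalances per hour when each consumer crashes with probability
     * crashProbabilityPerHour in a given hour, and each crash costs two rebalances
     * (one on leaving, one on rejoining).
     */
    static double expectedRebalancesPerHour(int consumers, double crashProbabilityPerHour) {
        return 2.0 * consumers * crashProbabilityPerHour;
    }

    public static void main(String[] args) {
        for (int consumers : new int[] {50, 100, 200}) {
            System.out.printf("%d consumers -> %.1f rebalances/hour%n",
                    consumers, expectedRebalancesPerHour(consumers, 0.01));
        }
    }
}
```

The expected count grows linearly with the number of consumers, which is consistent with the problem only appearing after scaling from around 50 to 100-200 consumers. With rebalances in the broker log lasting roughly 4 to 5 minutes, four per hour would already keep the group rebalancing for a substantial part of every hour.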
Together, these issues produced the behavior we observed. Newer Kafka versions also seem to log more information about why a member leaves the consumer group. It would be nice if Kafka kept serving data to the consumers that remain stable during a rebalance; I may file a feature request for this. In any case, the system now runs stably with decent performance.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53462206%2fapache-kafka-recurring-reblancing-with-more-consumers%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
After some further engineering and fine-tuning, we managed to get the issue under control.
First, it seems that some services still processed above the limit and this caused them to fail very rarely. The following leaving caused a re-balance, followed by the joining after around 6-7 minutes, which caused a re-balance too. We reduced this further by optimizing our services in terms of throughput.
A second factor was the underlying Docker Swarm network that we use to scale the services. Its default heartbeat interval is very short (5 seconds), so heavy work and network load on a consumer node can drop that node from the swarm for a brief moment. Docker responds by moving its services to other nodes, which triggers a re-balance, and the group re-balances again when the node comes back online. Because our services take 5-7 minutes to start, each of these events causes several re-balances.
A third factor was errors in the consuming services that occasionally made one of them crash, roughly 1% per hour. Each crash again causes two re-balances: one when the consumer leaves and one when it rejoins.
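A rough calculation shows why a crash rate that is barely noticeable at 50 consumers becomes disruptive at 200. This back-of-the-envelope sketch assumes that "1% per hour" means each consumer independently has about a 1% chance of crashing per hour. It also assumes that every crash costs exactly two re-balances (leave and rejoin), as described above. The class and method names are hypothetical.

```java
/** Hypothetical back-of-the-envelope model: each crash costs one leave and one rejoin re-balance. */
final class RebalanceRate {

    /** Expected re-balances per hour for a group of consumers that each crash with probability p per hour. */
    static double expectedRebalancesPerHour(int consumers, double crashProbabilityPerHour) {
        if (consumers < 0 || crashProbabilityPerHour < 0.0 || crashProbabilityPerHour > 1.0) {
            throw new IllegalArgumentException("invalid inputs");
        }
        final int rebalancesPerCrash = 2; // one when the consumer leaves, one when its replacement joins
        return consumers * crashProbabilityPerHour * rebalancesPerCrash;
    }

    public static void main(String[] args) {
        for (int n : new int[] {50, 100, 200}) {
            System.out.printf("%d consumers -> %.1f re-balances/hour%n", n, expectedRebalancesPerHour(n, 0.01));
        }
    }
}
```

Under these assumptions, the expected rate grows linearly with group size: about 1 re-balance per hour at 50 consumers and about 4 at 200, before counting the first two factors. With the eager re-balancing protocol used by Kafka 2.0, every member of the group stops consuming during each of these re-balances.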
Combined, these issues produced the behavior we observed. Newer Kafka versions also seem to log more information about why a member is leaving the consumer group. It would be nice if Kafka kept serving data to the consumers that remain stable during a re-balance; I may file a feature request for this. Nevertheless, it now runs stably with decent performance.
answered Jan 22 at 14:29 by calloc_org