Random fail on KafkaStreams stateful application
Hi, here is a problem I have been stumbling over for a few days and can't solve by myself.
I am using the Scala Streams API v2.0.0.
I have two incoming streams, branched over two handlers for segregation, and both handlers declare a Transformer using a common StateStore.
As a quick overview, it looks like this:
def buildStream(builder: StreamsBuilder, config: Config) = {
  val store = Stores.keyValueStoreBuilder[String, AggregatedState](Stores.persistentKeyValueStore(config.storeName), ...)
  builder.addStateStore(store)
  val handlers = List(handler1, handler2)
  builder
    .stream(config.topic)
    .branch(handlers.map(_.accepts).toList: _*) // Dispatch each event to the first handler that accepts it
    .zip(handlers.toList) // (KStream[K, V], Handler)
    .map { case (stream, h) => h.handle(stream) } // Process each branch with its handler
    .reduce((s1, s2) => s1.merge(s2)) // Merge the branches back, as they emit the same type
    .to(config.output)
  builder
}
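To make the branch/zip/map/reduce pipeline concrete, here is a minimal sketch that uses plain Scala lists instead of KStream. The names BranchSketch, Event, Handler, branch and run are hypothetical stand-ins, not part of the Kafka Streams API; the only behaviour assumed from KStream.branch is that a record goes to the first matching predicate and is dropped if none matches.

```scala
object BranchSketch {
  // Hypothetical stand-ins for the question's Event and Handler types.
  final case class Event(kind: String, payload: String)

  final case class Handler(name: String, accepts: Event => Boolean) {
    // Placeholder for the real processing: tag each payload with the handler name.
    def handle(events: List[Event]): List[String] =
      events.map(e => s"$name:${e.payload}")
  }

  // Mimics branch(): each event goes to the FIRST handler whose predicate matches;
  // events that match no predicate are dropped.
  def branch(events: List[Event], handlers: List[Handler]): List[List[Event]] = {
    val routed = events.flatMap { e =>
      handlers.indexWhere(_.accepts(e)) match {
        case -1 => None
        case i  => Some(i -> e)
      }
    }
    handlers.indices.toList.map(i => routed.filter(_._1 == i).map(_._2))
  }

  def run(events: List[Event], handlers: List[Handler]): List[String] =
    branch(events, handlers)
      .zip(handlers)                            // (events, handler) pairs, in that order
      .map { case (evts, h) => h.handle(evts) } // each branch is processed by its own handler
      .reduce(_ ++ _)                           // stands in for KStream.merge
}
```

Note that zip puts the branch first and the handler second, so the function passed to map has to destructure the pair in that order.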
Each of my handlers looks the same: take an event, apply some operations, then pass it through transform() to derive a state and emit an aggregate:
class Handler1(config: Config) {
  def accepts(key: String, value: Event): Boolean = ??? // Implementation not needed
  def handle(stream: KStream[String, Event]) = {
    stream
      // ... join / map / filter steps elided ...
      .transform(new Transformer1(config.storeName))
  }
}
class Handler2(config: Config) {
  def accepts(key: String, value: Event): Boolean = ??? // Implementation not needed
  def handle(stream: KStream[String, Event]) = {
    stream
      // ... join / map / filter steps elided ...
      .transform(new Transformer2(config.storeName))
  }
}
The transformers use the same StateStore with the following logic: for a new event, check whether its aggregate exists. If it does, update it, store it and emit the new aggregate; otherwise build the aggregate, store it and emit it.
class Transformer1(storeName: String) extends Transformer[String, Event, (String, AggregatedState)] {
  private var store: KeyValueStore[String, AggregatedState] = _
  override def init(context: ProcessorContext): Unit = {
    store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, AggregatedState]]
  }
  override def transform(key: String, value: Event): (String, AggregatedState) = {
    val existing: Option[AggregatedState] = Option(store.get(key))
    val agg = existing.map(_.updateWith(value)).getOrElse(new AggregatedState(value))
    store.put(key, agg)
    if (agg.isTerminal) {
      store.delete(key)
    }
    if (isDuplicate(existing, agg)) {
      null // Duplicate: returning null forwards nothing downstream
    } else {
      (key, agg) // Emit the new aggregate
    }
  }
  override def close(): Unit = ()
}
class Transformer2(storeName: String) extends Transformer[String, Event, (String, AggregatedState)] {
  private var store: KeyValueStore[String, AggregatedState] = _
  override def init(context: ProcessorContext): Unit = {
    store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, AggregatedState]]
  }
  override def transform(key: String, value: Event): (String, AggregatedState) = {
    val existing: Option[AggregatedState] = Option(store.get(key))
    val agg = existing.map(_.updateWith(value)).getOrElse(new AggregatedState(value))
    store.put(key, agg)
    if (agg.isTerminal) {
      store.delete(key)
    }
    if (isDuplicate(existing, agg)) {
      null // Duplicate: returning null forwards nothing downstream
    } else {
      (key, agg) // Emit the new aggregate
    }
  }
  override def close(): Unit = ()
}
Transformer2 has the same structure; only the business logic changes (how a new event is merged into an aggregated state).
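The store logic can be exercised without Kafka. The following is a minimal sketch that replaces the KeyValueStore with a mutable.Map; the shapes of Event and AggregatedState and the body of isDuplicate are assumptions made for illustration only, since the question does not show them.

```scala
import scala.collection.mutable

object AggregateSketch {
  // Hypothetical minimal shapes; the real Event and AggregatedState are not shown.
  final case class Event(id: Int, last: Boolean)
  final case class AggregatedState(ids: Set[Int], isTerminal: Boolean) {
    def updateWith(e: Event): AggregatedState =
      AggregatedState(ids + e.id, isTerminal || e.last)
  }

  // Assumption: an event is a duplicate when it leaves the stored aggregate unchanged.
  def isDuplicate(existing: Option[AggregatedState], agg: AggregatedState): Boolean =
    existing.contains(agg)

  // Same steps as transform(), with a mutable.Map standing in for the state store.
  // None plays the role of returning null (nothing is emitted).
  def step(store: mutable.Map[String, AggregatedState], key: String, value: Event): Option[(String, AggregatedState)] = {
    val existing = store.get(key)
    val agg = existing.map(_.updateWith(value))
      .getOrElse(AggregatedState(Set(value.id), value.last))
    store.update(key, agg)
    if (agg.isTerminal) store.remove(key) // terminal aggregates are not kept
    if (isDuplicate(existing, agg)) None else Some(key -> agg)
  }
}
```

Feeding the same event twice emits an aggregate only the first time, and a terminal event emits the final aggregate and clears the key from the map.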
The problem is that on stream startup I get either a normal startup or a boot exception:
15:07:23,420 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [job-tracker-prod-5ba8c2f7-d7fd-48b5-af4a-ac78feef71d3-StreamThread-1] Failed to commit stream task 1_0 due to the following error:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000003
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:198)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:406)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:380)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:161)
at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
at com.mycompany.streamprocess.Transformer1.transform(Transformer1.scala:49) // Line with store.put(key, agg)
I already searched and found answers saying "The transformer uses a Factory Pattern", which is what is used here (as .transform takes the transformer and creates a TransformerSupplier under the hood).
As the error is pseudo-random (I could reproduce it a few times), I guess it could be a race condition on startup, but I found nothing conclusive.
Is it because I use the same state store in different transformers?
scala apache-kafka-streams stateful kafka-streams-scala
asked Nov 22 '18 at 15:41
aaaaaaaaaaaaaa
1 Answer
I assume you are hitting https://issues.apache.org/jira/browse/KAFKA-7250
It's fixed in versions 2.0.1 and 2.1.0.
If you cannot upgrade, you need to pass in the TransformerSupplier explicitly, because the Scala API constructs the supplier incorrectly in 2.0.0:
.transform(() => new Transformer1(config.storeName))
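As a sketch of why the supplier matters, assume (as the ticket title suggests) that the 2.0.0 Scala wrapper hands the same transformer instance to every task. The example below uses no Kafka classes; FakeTransformer, Supplier, sharedSupplier, freshSupplier and contextsSeen are hypothetical names that only model "get() is called once per task, then init() is called with that task's context".

```scala
object SupplierSketch {
  // Hypothetical stand-in for a Transformer: it remembers the context passed to init().
  final class FakeTransformer {
    var context: String = ""
    def init(ctx: String): Unit = context = ctx
  }

  // Stand-in for a TransformerSupplier: get() is called once per task.
  trait Supplier { def get(): FakeTransformer }

  // The problematic shape: every get() returns the same instance.
  def sharedSupplier(t: FakeTransformer): Supplier =
    new Supplier { def get(): FakeTransformer = t }

  // The workaround shape: every get() builds a fresh instance.
  def freshSupplier(make: () => FakeTransformer): Supplier =
    new Supplier { def get(): FakeTransformer = make() }

  // Initialise one transformer per task, then report which context each one holds.
  def contextsSeen(supplier: Supplier, tasks: List[String]): List[String] = {
    val transformers = tasks.map { task =>
      val t = supplier.get()
      t.init(task)
      t
    }
    transformers.map(_.context)
  }
}
```

With the shared supplier, the second init() overwrites the state set by the first, so both tasks end up using the context (and store handle) of whichever task was initialised last. That would fit an error that only shows up on some startups, depending on how tasks are assigned and initialised.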
Indeed, that's the problem (and solution). Upgrading to 2.1.0 solved my problem. Thank you!
– aaaaaaa
Nov 23 '18 at 14:27
answered Nov 23 '18 at 5:20
Matthias J. Sax