Random fail on KafkaStreams stateful application



























Hi, here is a problem I have been stuck on for a few days and can't solve on my own.



I am using the Kafka Streams Scala API, v2.0.0.



I have two incoming streams, branched over two handlers for segregation, with both handlers declaring a Transformer that uses a common StateStore.



As a quick overview, it looks like this:



def buildStream(builder: StreamsBuilder, config: Config) = {
  val store = Stores.keyValueStoreBuilder[String, AggregatedState](Stores.persistentKeyValueStore(config.storeName), ...)
  builder.addStateStore(store)

  val handlers = List(handler1, handler2)

  builder
    .stream(config.topic)
    .branch(handlers.map(_.accepts): _*)          // Dispatch each event to the first handler that accepts it
    .zip(handlers)                                // (KStream[K, V], Handler)
    .map { case (stream, h) => h.handle(stream) } // Process the events with the matching handler
    .reduce((s1, s2) => s1.merge(s2))             // Merge back into one stream; all handlers emit the same type
    .to(config.output)

  builder
}


Each of my handlers looks the same: take an event, do some operations, pass it through the transform() method to derive a state, and emit an aggregate:



class Handler1(config: Config) {
  def accepts(key: String, value: Event): Boolean = ??? // Implementation not needed

  def handle(stream: KStream[String, Event]) = {
    stream
      .(join/map/filter)
      .transform(new Transformer1(config.storeName))
  }
}


class Handler2(config: Config) {
  def accepts(key: String, value: Event): Boolean = ??? // Implementation not needed

  def handle(stream: KStream[String, Event]) = {
    stream
      .(join/map/filter)
      .transform(new Transformer2(config.storeName))
  }
}


The transformers share the same StateStore and apply the following logic: for a new event, check whether its aggregate already exists; if it does, update it, store it, and emit the new aggregate; otherwise, build the aggregate, store it, and emit it.



class Transformer1(storeName: String) extends Transformer[String, Event, (String, AggregatedState)] {
  private var store: KeyValueStore[String, AggregatedState] = _

  override def init(context: ProcessorContext): Unit = {
    store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, AggregatedState]]
  }

  override def transform(key: String, value: Event): (String, AggregatedState) = {
    val existing: Option[AggregatedState] = Option(store.get(key))
    val agg = existing.map(_.updateWith(value)).getOrElse(new AggregatedState(value))

    store.put(key, agg)
    if (agg.isTerminal) {
      store.delete(key)
    }
    if (isDuplicate(existing, agg)) {
      null // Tombstone, we have a duplicate
    } else {
      (key, agg) // Emit the new aggregate
    }
  }

  override def close(): Unit = ()
}


class Transformer2(storeName: String) extends Transformer[String, Event, (String, AggregatedState)] {
  private var store: KeyValueStore[String, AggregatedState] = _

  override def init(context: ProcessorContext): Unit = {
    store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, AggregatedState]]
  }

  override def transform(key: String, value: Event): (String, AggregatedState) = {
    val existing: Option[AggregatedState] = Option(store.get(key))
    val agg = existing.map(_.updateWith(value)).getOrElse(new AggregatedState(value))

    store.put(key, agg)
    if (agg.isTerminal) {
      store.delete(key)
    }
    if (isDuplicate(existing, agg)) {
      null // Tombstone, we have a duplicate
    } else {
      (key, agg) // Emit the new aggregate
    }
  }

  override def close(): Unit = ()
}


Transformer2 is the same; only the business logic changes (how a new event is merged with the aggregated state).



The problem is that on stream startup I get either a normal startup or a boot exception:



15:07:23,420 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks  - stream-thread [job-tracker-prod-5ba8c2f7-d7fd-48b5-af4a-ac78feef71d3-StreamThread-1] Failed to commit stream task 1_0 due to the following error:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000003
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:198)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:406)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:380)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:161)
at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
at com.mycompany.streamprocess.Transformer1.transform(Transformer1.scala:49) // Line with store.put(key, agg)


I already searched and found results mentioning "the transformer uses a factory pattern", which is what is used here (.transform takes the transformer and creates a TransformerSupplier under the hood).
As the error is pseudo-random (I could reproduce it sometimes), I guess it could be a race condition on startup, but I found nothing conclusive.
Is it because I use the same state store in different transformers?





































Tags: scala, apache-kafka-streams, stateful, kafka-streams-scala






asked Nov 22 '18 at 15:41 by aaaaaaaaaaaaaa

1 Answer
I assume you are hitting https://issues.apache.org/jira/browse/KAFKA-7250

It's fixed in versions 2.0.1 and 2.1.0.

If you cannot upgrade, you need to pass in the TransformerSupplier explicitly, because the Scala API constructs the supplier incorrectly in 2.0.0:

.transform(() => new Transformer1(config.storeName))
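For context, a minimal sketch of the supplier-based call inside a handler (reusing the handler and config names from the question; treat it as illustrative rather than the exact 2.0.1+ API). The `() => ...` lambda is converted to a TransformerSupplier, so Kafka Streams invokes it once per stream task and each task gets its own Transformer1 instance, instead of all tasks sharing a single one:

```scala
class Handler1(config: Config) {
  def handle(stream: KStream[String, Event]): KStream[String, AggregatedState] =
    stream.transform(
      // Supplier form: called once per task, so each task gets its own
      // Transformer1 with its own ProcessorContext and store handle.
      () => new Transformer1(config.storeName)
    )
}
```

Depending on the exact API version, you may also need to pass the state-store name as a trailing vararg (e.g. `stream.transform(supplier, config.storeName)`) so the store is connected to the processor.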





Comment (1 upvote): Indeed, that's the problem (and solution). Upgrading to 2.1.0 solved my problem. Thank you! – aaaaaaa, Nov 23 '18 at 14:27











answered Nov 23 '18 at 5:20 by Matthias J. Sax







