Connecting Multiple Source Streams to The Same Set of Branches





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty{ height:90px;width:728px;box-sizing:border-box;
}







0















This is a generalization of this question.



Suppose I have multiple source streams for which the same set of predicates apply. I would like to set up branch streams such that records which satisfy the predicate, regardless of which source stream, is processed by the same branch stream. As the diagram below shows each branch stream is like a generic processor which transforms incoming records.



enter image description here



The following code block does not work as it should be since it creates a distinct set of branch streams for each source stream.



StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source1 = builder.stream("x");
KStream<String, String> source2 = builder.stream("y");

Predicate<String, String> branchPredicates = new Predicate[forkCount];
for (int i = 0; i < forkCount; ++i) {
int idx = i;
branchPredicates[i] = ((key, value) ->
key.hashCode() % forkCount == idx);
}

Kstream<String, String> forkStreams = Arrays.asList(source1, source2)
.map(srcStream -> srcStream.branch(branchPredicates)
.flatMap(x -> Arrays.stream())
.collect(Collectors.toList());


sorry, I'm mostly a scala developer :)



In the above example, forkStreams.length == branchPredicates.length x 2 and in general, proportional to the number of source streams. Is there a trick in Kafka stream that allows me to keep a one-to-one relationship between the predicates and fork streams?



UPDATE 11/27/2018
There are some progress in that I can:




  • Read from all source topics using one source stream

  • Connect the source stream to multiple branches

  • Distribute messages evenly amount the branches.


However, as the following code block demonstrates ALL fork streams exist in the same thread. What I would like to achieve is to place each fork stream into a different thread to allow better CPU utilization



StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(Arrays.asList("a", "b", "c")
// Create workers
// Need to have predicates for the branches
int totalPerdicates = Integer
.parseInt(props.getProperty(WORKER_PROCESSOR_COUNT));
Predicate<String, String> predicates = new Predicate[totalPerdicates];
IntStream
.range(0, totalPerdicates)
.forEach(i -> {
int idx = i;
predicates[i] = (key, value) ->
key.hashCode() % totalPerdicates == idx;
});

forkStreams = Arrays.asList(sourceStreams.branch(predicates));

// Hack- Dump the number of messages processed every 10 seconds
forkStreams
.forEach(fork -> {
KStream<Windowed<String>, Long> tbl =
fork.transformValues(new SourceTopicValueTransformerSupplier())
.selectKey((key, value) -> "foobar")
.groupByKey()
.windowedBy(TimeWindows.of(2000L))
.count()
.toStream();

tbl
.foreach((key, count) -> {
String fromTo = String.format("%d-%d",
key.window().start(),
key.window().end());
System.out.printf("(Thread %d, Index %d) %s - %s: %dn",
Thread.currentThread().getId(),
forkStreams.indexOf(fork),
fromTo, key.key(), count);
});


Here's a snippet of the output



<snip>
(Thread 13, Index 1) 1542132126000-1542132128000 - foobar: 2870
(Thread 13, Index 1) 1542132024000-1542132026000 - foobar: 2955
(Thread 13, Index 1) 1542132106000-1542132108000 - foobar: 1914
(Thread 13, Index 1) 1542132054000-1542132056000 - foobar: 546
<snip>
(Thread 13, Index 2) 1542132070000-1542132072000 - foobar: 524
(Thread 13, Index 2) 1542132012000-1542132014000 - foobar: 2491
(Thread 13, Index 2) 1542132042000-1542132044000 - foobar: 261
(Thread 13, Index 2) 1542132022000-1542132024000 - foobar: 2823
<snip>
(Thread 13, Index 3) 1542132088000-1542132090000 - foobar: 2170
(Thread 13, Index 3) 1542132010000-1542132012000 - foobar: 2962
(Thread 13, Index 3) 1542132008000-1542132010000 - foobar: 2847
(Thread 13, Index 3) 1542132022000-1542132024000 - foobar: 2797
<snip>
(Thread 13, Index 4) 1542132046000-1542132048000 - foobar: 2846
(Thread 13, Index 4) 1542132096000-1542132098000 - foobar: 3216
(Thread 13, Index 4) 1542132108000-1542132110000 - foobar: 2696
(Thread 13, Index 4) 1542132010000-1542132012000 - foobar: 2881
<snip>


Any suggestions as to how to place each fork stream in a different thread is appreciated.










share|improve this question




















  • 1





    Why don't you read both topics at once: StreamsBuilder.stream("x", "y");

    – Matthias J. Sax
    Nov 24 '18 at 5:24











  • There's too much volume for one thread to read from all the topics. I'm trying to avoid pegging the CPU on the source stream.

    – nads
    Nov 24 '18 at 17:08






  • 1





    Topics are scaled via partitions -- also Kafka Streams scales per-partition. Thus, if both input topics have 10 partitions, you can run up to 10 threads, and each thread would process 2 partitions. Would this work for you?

    – Matthias J. Sax
    Nov 24 '18 at 21:49











  • I am aware of the scale-by-partition concept. It's the records within a single partition for which I need more than 1 CPU to process. What I want to see is that the source stream just keeps on reading records and handing them off to the generic processors (branches).

    – nads
    Nov 24 '18 at 23:03













  • For this case, you would need to leave the topology as is, and add to() statements to both parallel running (not connected) parts of the topology. It is ok to write into the same topic multiple times -- of course, there won't be any ordering guarantees for this case. Kafka Streams processed partitions of two different topics only with multiple threads, if they are not connected in the topology graph.

    – Matthias J. Sax
    Nov 25 '18 at 4:18


















0















This is a generalization of this question.



Suppose I have multiple source streams for which the same set of predicates apply. I would like to set up branch streams such that records which satisfy the predicate, regardless of which source stream, is processed by the same branch stream. As the diagram below shows each branch stream is like a generic processor which transforms incoming records.



enter image description here



The following code block does not work as it should be since it creates a distinct set of branch streams for each source stream.



StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source1 = builder.stream("x");
KStream<String, String> source2 = builder.stream("y");

Predicate<String, String> branchPredicates = new Predicate[forkCount];
for (int i = 0; i < forkCount; ++i) {
int idx = i;
branchPredicates[i] = ((key, value) ->
key.hashCode() % forkCount == idx);
}

Kstream<String, String> forkStreams = Arrays.asList(source1, source2)
.map(srcStream -> srcStream.branch(branchPredicates)
.flatMap(x -> Arrays.stream())
.collect(Collectors.toList());


sorry, I'm mostly a scala developer :)



In the above example, forkStreams.length == branchPredicates.length x 2 and in general, proportional to the number of source streams. Is there a trick in Kafka stream that allows me to keep a one-to-one relationship between the predicates and fork streams?



UPDATE 11/27/2018
There are some progress in that I can:




  • Read from all source topics using one source stream

  • Connect the source stream to multiple branches

  • Distribute messages evenly amount the branches.


However, as the following code block demonstrates ALL fork streams exist in the same thread. What I would like to achieve is to place each fork stream into a different thread to allow better CPU utilization



StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(Arrays.asList("a", "b", "c")
// Create workers
// Need to have predicates for the branches
int totalPerdicates = Integer
.parseInt(props.getProperty(WORKER_PROCESSOR_COUNT));
Predicate<String, String> predicates = new Predicate[totalPerdicates];
IntStream
.range(0, totalPerdicates)
.forEach(i -> {
int idx = i;
predicates[i] = (key, value) ->
key.hashCode() % totalPerdicates == idx;
});

forkStreams = Arrays.asList(sourceStreams.branch(predicates));

// Hack- Dump the number of messages processed every 10 seconds
forkStreams
.forEach(fork -> {
KStream<Windowed<String>, Long> tbl =
fork.transformValues(new SourceTopicValueTransformerSupplier())
.selectKey((key, value) -> "foobar")
.groupByKey()
.windowedBy(TimeWindows.of(2000L))
.count()
.toStream();

tbl
.foreach((key, count) -> {
String fromTo = String.format("%d-%d",
key.window().start(),
key.window().end());
System.out.printf("(Thread %d, Index %d) %s - %s: %dn",
Thread.currentThread().getId(),
forkStreams.indexOf(fork),
fromTo, key.key(), count);
});


Here's a snippet of the output



<snip>
(Thread 13, Index 1) 1542132126000-1542132128000 - foobar: 2870
(Thread 13, Index 1) 1542132024000-1542132026000 - foobar: 2955
(Thread 13, Index 1) 1542132106000-1542132108000 - foobar: 1914
(Thread 13, Index 1) 1542132054000-1542132056000 - foobar: 546
<snip>
(Thread 13, Index 2) 1542132070000-1542132072000 - foobar: 524
(Thread 13, Index 2) 1542132012000-1542132014000 - foobar: 2491
(Thread 13, Index 2) 1542132042000-1542132044000 - foobar: 261
(Thread 13, Index 2) 1542132022000-1542132024000 - foobar: 2823
<snip>
(Thread 13, Index 3) 1542132088000-1542132090000 - foobar: 2170
(Thread 13, Index 3) 1542132010000-1542132012000 - foobar: 2962
(Thread 13, Index 3) 1542132008000-1542132010000 - foobar: 2847
(Thread 13, Index 3) 1542132022000-1542132024000 - foobar: 2797
<snip>
(Thread 13, Index 4) 1542132046000-1542132048000 - foobar: 2846
(Thread 13, Index 4) 1542132096000-1542132098000 - foobar: 3216
(Thread 13, Index 4) 1542132108000-1542132110000 - foobar: 2696
(Thread 13, Index 4) 1542132010000-1542132012000 - foobar: 2881
<snip>


Any suggestions as to how to place each fork stream in a different thread is appreciated.










share|improve this question




















  • 1





    Why don't you read both topics at once: StreamsBuilder.stream("x", "y");

    – Matthias J. Sax
    Nov 24 '18 at 5:24











  • There's too much volume for one thread to read from all the topics. I'm trying to avoid pegging the CPU on the source stream.

    – nads
    Nov 24 '18 at 17:08






  • 1





    Topics are scaled via partitions -- also Kafka Streams scales per-partition. Thus, if both input topics have 10 partitions, you can run up to 10 threads, and each thread would process 2 partitions. Would this work for you?

    – Matthias J. Sax
    Nov 24 '18 at 21:49











  • I am aware of the scale-by-partition concept. It's the records within a single partition for which I need more than 1 CPU to process. What I want to see is that the source stream just keeps on reading records and handing them off to the generic processors (branches).

    – nads
    Nov 24 '18 at 23:03













  • For this case, you would need to leave the topology as is, and add to() statements to both parallel running (not connected) parts of the topology. It is ok to write into the same topic multiple times -- of course, there won't be any ordering guarantees for this case. Kafka Streams processed partitions of two different topics only with multiple threads, if they are not connected in the topology graph.

    – Matthias J. Sax
    Nov 25 '18 at 4:18














0












0








0


1






This is a generalization of this question.



Suppose I have multiple source streams for which the same set of predicates apply. I would like to set up branch streams such that records which satisfy the predicate, regardless of which source stream, is processed by the same branch stream. As the diagram below shows each branch stream is like a generic processor which transforms incoming records.



enter image description here



The following code block does not work as it should be since it creates a distinct set of branch streams for each source stream.



StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source1 = builder.stream("x");
KStream<String, String> source2 = builder.stream("y");

Predicate<String, String> branchPredicates = new Predicate[forkCount];
for (int i = 0; i < forkCount; ++i) {
int idx = i;
branchPredicates[i] = ((key, value) ->
key.hashCode() % forkCount == idx);
}

Kstream<String, String> forkStreams = Arrays.asList(source1, source2)
.map(srcStream -> srcStream.branch(branchPredicates)
.flatMap(x -> Arrays.stream())
.collect(Collectors.toList());


sorry, I'm mostly a scala developer :)



In the above example, forkStreams.length == branchPredicates.length x 2 and in general, proportional to the number of source streams. Is there a trick in Kafka stream that allows me to keep a one-to-one relationship between the predicates and fork streams?



UPDATE 11/27/2018
There are some progress in that I can:




  • Read from all source topics using one source stream

  • Connect the source stream to multiple branches

  • Distribute messages evenly amount the branches.


However, as the following code block demonstrates ALL fork streams exist in the same thread. What I would like to achieve is to place each fork stream into a different thread to allow better CPU utilization



StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(Arrays.asList("a", "b", "c")
// Create workers
// Need to have predicates for the branches
int totalPerdicates = Integer
.parseInt(props.getProperty(WORKER_PROCESSOR_COUNT));
Predicate<String, String> predicates = new Predicate[totalPerdicates];
IntStream
.range(0, totalPerdicates)
.forEach(i -> {
int idx = i;
predicates[i] = (key, value) ->
key.hashCode() % totalPerdicates == idx;
});

forkStreams = Arrays.asList(sourceStreams.branch(predicates));

// Hack- Dump the number of messages processed every 10 seconds
forkStreams
.forEach(fork -> {
KStream<Windowed<String>, Long> tbl =
fork.transformValues(new SourceTopicValueTransformerSupplier())
.selectKey((key, value) -> "foobar")
.groupByKey()
.windowedBy(TimeWindows.of(2000L))
.count()
.toStream();

tbl
.foreach((key, count) -> {
String fromTo = String.format("%d-%d",
key.window().start(),
key.window().end());
System.out.printf("(Thread %d, Index %d) %s - %s: %dn",
Thread.currentThread().getId(),
forkStreams.indexOf(fork),
fromTo, key.key(), count);
});


Here's a snippet of the output



<snip>
(Thread 13, Index 1) 1542132126000-1542132128000 - foobar: 2870
(Thread 13, Index 1) 1542132024000-1542132026000 - foobar: 2955
(Thread 13, Index 1) 1542132106000-1542132108000 - foobar: 1914
(Thread 13, Index 1) 1542132054000-1542132056000 - foobar: 546
<snip>
(Thread 13, Index 2) 1542132070000-1542132072000 - foobar: 524
(Thread 13, Index 2) 1542132012000-1542132014000 - foobar: 2491
(Thread 13, Index 2) 1542132042000-1542132044000 - foobar: 261
(Thread 13, Index 2) 1542132022000-1542132024000 - foobar: 2823
<snip>
(Thread 13, Index 3) 1542132088000-1542132090000 - foobar: 2170
(Thread 13, Index 3) 1542132010000-1542132012000 - foobar: 2962
(Thread 13, Index 3) 1542132008000-1542132010000 - foobar: 2847
(Thread 13, Index 3) 1542132022000-1542132024000 - foobar: 2797
<snip>
(Thread 13, Index 4) 1542132046000-1542132048000 - foobar: 2846
(Thread 13, Index 4) 1542132096000-1542132098000 - foobar: 3216
(Thread 13, Index 4) 1542132108000-1542132110000 - foobar: 2696
(Thread 13, Index 4) 1542132010000-1542132012000 - foobar: 2881
<snip>


Any suggestions as to how to place each fork stream in a different thread is appreciated.










share|improve this question
















This is a generalization of this question.



Suppose I have multiple source streams for which the same set of predicates apply. I would like to set up branch streams such that records which satisfy the predicate, regardless of which source stream, is processed by the same branch stream. As the diagram below shows each branch stream is like a generic processor which transforms incoming records.



enter image description here



The following code block does not work as it should be since it creates a distinct set of branch streams for each source stream.



StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source1 = builder.stream("x");
KStream<String, String> source2 = builder.stream("y");

Predicate<String, String> branchPredicates = new Predicate[forkCount];
for (int i = 0; i < forkCount; ++i) {
int idx = i;
branchPredicates[i] = ((key, value) ->
key.hashCode() % forkCount == idx);
}

Kstream<String, String> forkStreams = Arrays.asList(source1, source2)
.map(srcStream -> srcStream.branch(branchPredicates)
.flatMap(x -> Arrays.stream())
.collect(Collectors.toList());


sorry, I'm mostly a scala developer :)



In the above example, forkStreams.length == branchPredicates.length x 2 and in general, proportional to the number of source streams. Is there a trick in Kafka stream that allows me to keep a one-to-one relationship between the predicates and fork streams?



UPDATE 11/27/2018
There are some progress in that I can:




  • Read from all source topics using one source stream

  • Connect the source stream to multiple branches

  • Distribute messages evenly amount the branches.


However, as the following code block demonstrates ALL fork streams exist in the same thread. What I would like to achieve is to place each fork stream into a different thread to allow better CPU utilization



StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(Arrays.asList("a", "b", "c")
// Create workers
// Need to have predicates for the branches
int totalPerdicates = Integer
.parseInt(props.getProperty(WORKER_PROCESSOR_COUNT));
Predicate<String, String> predicates = new Predicate[totalPerdicates];
IntStream
.range(0, totalPerdicates)
.forEach(i -> {
int idx = i;
predicates[i] = (key, value) ->
key.hashCode() % totalPerdicates == idx;
});

forkStreams = Arrays.asList(sourceStreams.branch(predicates));

// Hack- Dump the number of messages processed every 10 seconds
forkStreams
.forEach(fork -> {
KStream<Windowed<String>, Long> tbl =
fork.transformValues(new SourceTopicValueTransformerSupplier())
.selectKey((key, value) -> "foobar")
.groupByKey()
.windowedBy(TimeWindows.of(2000L))
.count()
.toStream();

tbl
.foreach((key, count) -> {
String fromTo = String.format("%d-%d",
key.window().start(),
key.window().end());
System.out.printf("(Thread %d, Index %d) %s - %s: %dn",
Thread.currentThread().getId(),
forkStreams.indexOf(fork),
fromTo, key.key(), count);
});


Here's a snippet of the output



<snip>
(Thread 13, Index 1) 1542132126000-1542132128000 - foobar: 2870
(Thread 13, Index 1) 1542132024000-1542132026000 - foobar: 2955
(Thread 13, Index 1) 1542132106000-1542132108000 - foobar: 1914
(Thread 13, Index 1) 1542132054000-1542132056000 - foobar: 546
<snip>
(Thread 13, Index 2) 1542132070000-1542132072000 - foobar: 524
(Thread 13, Index 2) 1542132012000-1542132014000 - foobar: 2491
(Thread 13, Index 2) 1542132042000-1542132044000 - foobar: 261
(Thread 13, Index 2) 1542132022000-1542132024000 - foobar: 2823
<snip>
(Thread 13, Index 3) 1542132088000-1542132090000 - foobar: 2170
(Thread 13, Index 3) 1542132010000-1542132012000 - foobar: 2962
(Thread 13, Index 3) 1542132008000-1542132010000 - foobar: 2847
(Thread 13, Index 3) 1542132022000-1542132024000 - foobar: 2797
<snip>
(Thread 13, Index 4) 1542132046000-1542132048000 - foobar: 2846
(Thread 13, Index 4) 1542132096000-1542132098000 - foobar: 3216
(Thread 13, Index 4) 1542132108000-1542132110000 - foobar: 2696
(Thread 13, Index 4) 1542132010000-1542132012000 - foobar: 2881
<snip>


Any suggestions as to how to place each fork stream in a different thread is appreciated.







java java-8 apache-kafka apache-kafka-streams






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Nov 27 '18 at 21:44







nads

















asked Nov 24 '18 at 3:48









nadsnads

458




458








  • 1





    Why don't you read both topics at once: StreamsBuilder.stream("x", "y");

    – Matthias J. Sax
    Nov 24 '18 at 5:24











  • There's too much volume for one thread to read from all the topics. I'm trying to avoid pegging the CPU on the source stream.

    – nads
    Nov 24 '18 at 17:08






  • 1





    Topics are scaled via partitions -- also Kafka Streams scales per-partition. Thus, if both input topics have 10 partitions, you can run up to 10 threads, and each thread would process 2 partitions. Would this work for you?

    – Matthias J. Sax
    Nov 24 '18 at 21:49











  • I am aware of the scale-by-partition concept. It's the records within a single partition for which I need more than 1 CPU to process. What I want to see is that the source stream just keeps on reading records and handing them off to the generic processors (branches).

    – nads
    Nov 24 '18 at 23:03













  • For this case, you would need to leave the topology as is, and add to() statements to both parallel running (not connected) parts of the topology. It is ok to write into the same topic multiple times -- of course, there won't be any ordering guarantees for this case. Kafka Streams processed partitions of two different topics only with multiple threads, if they are not connected in the topology graph.

    – Matthias J. Sax
    Nov 25 '18 at 4:18














  • 1





    Why don't you read both topics at once: StreamsBuilder.stream("x", "y");

    – Matthias J. Sax
    Nov 24 '18 at 5:24











  • There's too much volume for one thread to read from all the topics. I'm trying to avoid pegging the CPU on the source stream.

    – nads
    Nov 24 '18 at 17:08






  • 1





    Topics are scaled via partitions -- also Kafka Streams scales per-partition. Thus, if both input topics have 10 partitions, you can run up to 10 threads, and each thread would process 2 partitions. Would this work for you?

    – Matthias J. Sax
    Nov 24 '18 at 21:49











  • I am aware of the scale-by-partition concept. It's the records within a single partition for which I need more than 1 CPU to process. What I want to see is that the source stream just keeps on reading records and handing them off to the generic processors (branches).

    – nads
    Nov 24 '18 at 23:03













  • For this case, you would need to leave the topology as is, and add to() statements to both parallel running (not connected) parts of the topology. It is ok to write into the same topic multiple times -- of course, there won't be any ordering guarantees for this case. Kafka Streams processed partitions of two different topics only with multiple threads, if they are not connected in the topology graph.

    – Matthias J. Sax
    Nov 25 '18 at 4:18








1




1





Why don't you read both topics at once: StreamsBuilder.stream("x", "y");

– Matthias J. Sax
Nov 24 '18 at 5:24





Why don't you read both topics at once: StreamsBuilder.stream("x", "y");

– Matthias J. Sax
Nov 24 '18 at 5:24













There's too much volume for one thread to read from all the topics. I'm trying to avoid pegging the CPU on the source stream.

– nads
Nov 24 '18 at 17:08





There's too much volume for one thread to read from all the topics. I'm trying to avoid pegging the CPU on the source stream.

– nads
Nov 24 '18 at 17:08




1




1





Topics are scaled via partitions -- also Kafka Streams scales per-partition. Thus, if both input topics have 10 partitions, you can run up to 10 threads, and each thread would process 2 partitions. Would this work for you?

– Matthias J. Sax
Nov 24 '18 at 21:49





Topics are scaled via partitions -- also Kafka Streams scales per-partition. Thus, if both input topics have 10 partitions, you can run up to 10 threads, and each thread would process 2 partitions. Would this work for you?

– Matthias J. Sax
Nov 24 '18 at 21:49













I am aware of the scale-by-partition concept. It's the records within a single partition for which I need more than 1 CPU to process. What I want to see is that the source stream just keeps on reading records and handing them off to the generic processors (branches).

– nads
Nov 24 '18 at 23:03







I am aware of the scale-by-partition concept. It's the records within a single partition for which I need more than 1 CPU to process. What I want to see is that the source stream just keeps on reading records and handing them off to the generic processors (branches).

– nads
Nov 24 '18 at 23:03















For this case, you would need to leave the topology as is, and add to() statements to both parallel running (not connected) parts of the topology. It is ok to write into the same topic multiple times -- of course, there won't be any ordering guarantees for this case. Kafka Streams processed partitions of two different topics only with multiple threads, if they are not connected in the topology graph.

– Matthias J. Sax
Nov 25 '18 at 4:18





For this case, you would need to leave the topology as is, and add to() statements to both parallel running (not connected) parts of the topology. It is ok to write into the same topic multiple times -- of course, there won't be any ordering guarantees for this case. Kafka Streams processed partitions of two different topics only with multiple threads, if they are not connected in the topology graph.

– Matthias J. Sax
Nov 25 '18 at 4:18












1 Answer
1






active

oldest

votes


















0














The update on 11/27/2018 has answered the question. That being said, the solution does not work for me as I wanted for each fork stream to run as a separate thread. Calling stream.branch() creates multiple sub-streams within the same thread space. Thus all records within a partition are processed in the same thread space.



To achieve sub-partition processing, I ended up using kafka client API in conjunction with java threads and concurrent queues.






share|improve this answer
























    Your Answer






    StackExchange.ifUsing("editor", function () {
    StackExchange.using("externalEditor", function () {
    StackExchange.using("snippets", function () {
    StackExchange.snippets.init();
    });
    });
    }, "code-snippets");

    StackExchange.ready(function() {
    var channelOptions = {
    tags: "".split(" "),
    id: "1"
    };
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function() {
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled) {
    StackExchange.using("snippets", function() {
    createEditor();
    });
    }
    else {
    createEditor();
    }
    });

    function createEditor() {
    StackExchange.prepareEditor({
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader: {
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    },
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    });


    }
    });














    draft saved

    draft discarded


















    StackExchange.ready(
    function () {
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53455004%2fconnecting-multiple-source-streams-to-the-same-set-of-branches%23new-answer', 'question_page');
    }
    );

    Post as a guest















    Required, but never shown

























    1 Answer
    1






    active

    oldest

    votes








    1 Answer
    1






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    0














    The update on 11/27/2018 has answered the question. That being said, the solution does not work for me as I wanted for each fork stream to run as a separate thread. Calling stream.branch() creates multiple sub-streams within the same thread space. Thus all records within a partition are processed in the same thread space.



    To achieve sub-partition processing, I ended up using kafka client API in conjunction with java threads and concurrent queues.






    share|improve this answer




























      0














      The update on 11/27/2018 has answered the question. That being said, the solution does not work for me as I wanted for each fork stream to run as a separate thread. Calling stream.branch() creates multiple sub-streams within the same thread space. Thus all records within a partition are processed in the same thread space.



      To achieve sub-partition processing, I ended up using kafka client API in conjunction with java threads and concurrent queues.






      share|improve this answer


























        0












        0








        0







        The update on 11/27/2018 has answered the question. That being said, the solution does not work for me as I wanted for each fork stream to run as a separate thread. Calling stream.branch() creates multiple sub-streams within the same thread space. Thus all records within a partition are processed in the same thread space.



        To achieve sub-partition processing, I ended up using kafka client API in conjunction with java threads and concurrent queues.






        share|improve this answer













        The update on 11/27/2018 has answered the question. That being said, the solution does not work for me as I wanted for each fork stream to run as a separate thread. Calling stream.branch() creates multiple sub-streams within the same thread space. Thus all records within a partition are processed in the same thread space.



        To achieve sub-partition processing, I ended up using kafka client API in conjunction with java threads and concurrent queues.







        share|improve this answer












        share|improve this answer



        share|improve this answer










        answered Nov 30 '18 at 6:55









        nadsnads

        458




        458
































            draft saved

            draft discarded




















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid



            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.


            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53455004%2fconnecting-multiple-source-streams-to-the-same-set-of-branches%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            這個網誌中的熱門文章

            Xamarin.form Move up view when keyboard appear

            Post-Redirect-Get with Spring WebFlux and Thymeleaf

            Anylogic : not able to use stopDelay()