Optimal orchestration of list of network calls and processing tasks in Java












0















I have the following workflow:
There are n records that need to be retrieved over the network, and then an expensive computation needs to be run on each of them. In code, this would look like:



List<Integer> ids = Arrays.asList(1, 2, /* ... */ n);
ids.forEach(id -> {
    Record r = RetrieveRecord(id); // Blocking I/O
    ProcessRecord(r);              // CPU-intensive
});


I would like to make the blocking part asynchronous so that the total time is minimized with a single thread, essentially by ensuring that record i+1 is being retrieved while record i is being processed. The execution would then look like:



Retrieve(1).start()
Retrieve(1).onEnd(() -> { start Retrieve(2), Process(1) })
Retrieve(2).onEnd(() -> { start Retrieve(3), Process(2) })
....


Now I can come up with a naive way to implement this with a List<> and CompletableFuture, but this would require me to handle the first record differently.

Is there a more elegant way of solving this with something like reactive streams?
A solution that would maybe let me easily configure how many records Process() can trail behind Retrieve()?



































  • RetrieveRecord is a blocking operation. It always blocks some thread. If you move it to a single thread, then that thread would execute all the retrievals sequentially and the time would be maximized, not minimized.

    – Alexei Kaigorodov
    Nov 23 '18 at 14:55











  • I already have a non-blocking implementation of Retrieve() which returns a CompletableFuture. What I am trying to achieve here is a (strictly) bounded prefetch of the next k records (k=1 in this case). I don't want all 100 RetrieveRecords to be triggered at the start, nor do I want to wait until Process(i) is complete before starting Retrieve(i+1). Basically, Retrieve(i+k) should be happening over the wire while Process(i) is running.

    – Erric
    Nov 23 '18 at 15:08
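The bounded prefetch described in the comment above can be sketched with plain CompletableFuture and a sliding window of pending retrievals. This is only a minimal illustration, not code from the thread: the class name BoundedPrefetch, the method run, and the stand-in retrieve/process functions are all hypothetical, and it assumes a non-blocking Retrieve that returns a CompletableFuture. The window is refilled before each record is processed, so up to k retrievals are in flight while the calling thread does the CPU work.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class BoundedPrefetch {

    /**
     * Processes ids in order on the calling thread while keeping at most
     * k retrievals in flight ahead of the record currently being processed.
     */
    static <I, R, O> List<O> run(List<I> ids, int k,
                                 Function<I, CompletableFuture<R>> retrieve,
                                 Function<R, O> process) {
        if (k < 1) {
            throw new IllegalArgumentException("k must be >= 1");
        }
        Iterator<I> it = ids.iterator();
        Deque<CompletableFuture<R>> window = new ArrayDeque<>();
        List<O> results = new ArrayList<>();

        // Prime the window with the first k retrievals.
        while (window.size() < k && it.hasNext()) {
            window.add(retrieve.apply(it.next()));
        }
        while (!window.isEmpty()) {
            R record = window.poll().join();           // wait for the oldest retrieval
            if (it.hasNext()) {
                window.add(retrieve.apply(it.next())); // refill before processing
            }
            results.add(process.apply(record));        // CPU work overlaps the in-flight I/O
        }
        return results;
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for Retrieve() and ProcessRecord().
        Function<Integer, CompletableFuture<Integer>> retrieve =
                id -> CompletableFuture.supplyAsync(() -> id * 10);
        Function<Integer, Integer> process = r -> r + 1;

        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5), 1, retrieve, process));
        // prints [11, 21, 31, 41, 51]
    }
}
```

With k = 1 the call order is Retrieve(1), Retrieve(2), Process(1), Retrieve(3), Process(2), and so on, and the first record goes through the same loop as the rest. The join() call does block the processing thread while it waits for a record, which seems acceptable here because that thread has nothing else to do until the record arrives.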




















java asynchronous stream














edited Nov 23 '18 at 15:07







Erric

















asked Nov 23 '18 at 14:15









Erric




























3 Answers


















1














So you have N tasks and want to run them in parallel, but no more than K tasks simultaneously. The most natural way is to have a task generator and a permission counter that initially holds K permissions. The task generator creates K tasks and then waits for more permissions. Each permission is owned by some task and is returned when the task ends. The standard permission counter in Java is the class java.util.concurrent.Semaphore:



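List<Integer> ids = Arrays.asList(1, 2, /* ... */ n);
Semaphore sem = new Semaphore(K);
for (Integer id : ids) {
    sem.acquire(); // blocks until one of the K permits is free; throws InterruptedException
    CompletableFuture<Data> fut = Retrieve(id);
    fut.whenComplete((r, ex) -> sem.release()); // return the permit even if the retrieval fails
    fut.thenAcceptAsync(this::ProcessRecord, someExecutor);
}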


Since the task generator occupies only one thread, there is little point in making it asynchronous. If, however, you don't want to dedicate a thread to the task generator and want an asynchronous solution, then the main question is which class can play the role of an asynchronous permission counter. You have 3 options:




  • use the implicit asynchronous permission counter that is part of reactive streams, found in RxJava, Project Reactor, etc.

  • use the explicit asynchronous semaphore org.df4j.core.boundconnector.permitstream.Semafor, included in my asynchronous library df4j

  • make it yourself
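As a rough idea of what the "make it yourself" option could look like, here is a minimal sketch of an asynchronous permit counter built on CompletableFuture. The class AsyncSemaphore and its methods are hypothetical names chosen for this illustration; this is not the df4j Semafor and not part of the JDK. acquire() never blocks: it returns a future that completes once a permit is available, so the task generator can chain the next retrieval onto it.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** A tiny non-blocking permit counter: acquire() returns a future instead of blocking. */
public class AsyncSemaphore {
    private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
    private int permits;

    public AsyncSemaphore(int permits) {
        this.permits = permits;
    }

    /** Returns a future that completes once a permit has been granted to the caller. */
    public CompletableFuture<Void> acquire() {
        synchronized (this) {
            if (permits > 0) {
                permits--;
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> waiter = new CompletableFuture<>();
            waiters.add(waiter);
            return waiter;
        }
    }

    /** Returns a permit, handing it directly to the oldest waiter if there is one. */
    public void release() {
        CompletableFuture<Void> next;
        synchronized (this) {
            next = waiters.poll();
            if (next == null) {
                permits++;
                return;
            }
        }
        next.complete(null); // completed outside the lock so callbacks never run while holding it
    }

    public synchronized int availablePermits() {
        return permits;
    }

    public static void main(String[] args) {
        AsyncSemaphore sem = new AsyncSemaphore(1);
        sem.acquire().thenRun(() -> System.out.println("first permit granted"));
        CompletableFuture<Void> second = sem.acquire(); // no permit left, so this stays pending
        System.out.println("second pending: " + !second.isDone());
        sem.release();                                  // hands the permit to the waiter
        System.out.println("second granted: " + second.isDone());
    }
}
```

A task generator would then call sem.acquire().thenRun(...) to start the next Retrieve, and release the permit when that retrieval completes. Note that a waiter's callbacks run on whichever thread calls release(), so long-running work is better handed off with the thenRunAsync or thenAcceptAsync variants.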






























  • Great! Could you give an example/link to the "implicit asynchronous permission counter" in RxJava? I tried a lot to find how to do this with RxJava since my project already has a dependency on this

    – Erric
    Nov 23 '18 at 17:24











  • Learn how to use backpressure in RxJava. Backpressure limits the number of items in a message queue, and you need to limit the number of asynchronous tasks. Map tasks to messages somehow.

    – Alexei Kaigorodov
    Nov 23 '18 at 17:35











  • But backpressure in RxJava has either "discard", "buffer" or "latest" strategy, which does not seem to fit my needs

    – Erric
    Nov 23 '18 at 17:38











  • I am not a specialist in RxJava, but I think strategy.BUFFER is OK for you, as it does not drop items. The task generator is implemented as an Observable, pushing tasks into a reactive stream and waiting until the stream is able to accept the next task. The trickiest part is the Observer on the other end of the stream. It should take the next task and asynchronously wait until it is complete, then start ProcessRecord() and take the next task.

    – Alexei Kaigorodov
    Nov 23 '18 at 17:50











  • I mostly get stuck with how to provide "k" (like in your snippet) when implementing with RxJava. All my attempts ended up with things working like k = 0 or k = N

    – Erric
    Nov 23 '18 at 17:57





















0














Solution using df4j, with explicit asynchronous semaphore:



import org.df4j.core.boundconnector.permitstream.Semafor;
import org.df4j.core.tasknode.Action;
import org.df4j.core.tasknode.messagestream.Actor;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

public class AsyncSemaDemo extends Actor {
    List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
    Semafor sema = new Semafor(this, 2); // 2 initial permits
    Iterator<Integer> iter = ids.iterator();
    int tick = 100; // millis
    CountDownLatch done = new CountDownLatch(ids.size());
    long start = System.currentTimeMillis();

    private void printClock(String s) {
        long ticks = (System.currentTimeMillis() - start)/tick;
        System.out.println(Long.toString(ticks) + " " + s);
    }

    CompletableFuture<Integer> Retrieve(Integer e) {
        return CompletableFuture.supplyAsync(() -> {
            printClock("Req " + e + " started");
            try {
                Thread.sleep(tick); // Network
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            printClock(" Req " + e + " done");
            return e;
        }, executor);
    }

    void ProcessRecord(Integer s) {
        printClock(" Proc " + s + " started");
        try {
            Thread.sleep(tick*2); // Compute
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        printClock(" Proc " + s + " done");
    }

    @Action
    public void act() {
        if (iter.hasNext()) {
            CompletableFuture<Integer> fut = Retrieve(iter.next());
            fut.thenRun(sema::release); // return the permit once the request is done
            fut.thenAcceptAsync(this::ProcessRecord, executor)
                .thenRun(done::countDown);
        } else {
            super.stop();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncSemaDemo asyncSemaDemo = new AsyncSemaDemo();
        asyncSemaDemo.start(ForkJoinPool.commonPool());
        asyncSemaDemo.done.await(); // wait until every record has been processed
    }
}


Its log should be:



0 Req 1 started
0 Req 2 started
1 Req 1 done
1 Proc 1 started
1 Req 3 started
1 Req 2 done
1 Proc 2 started
1 Req 4 started
2 Req 3 done
2 Proc 3 started
2 Req 5 started
2 Req 4 done
2 Proc 4 started
3 Proc 1 done
3 Req 5 done
3 Proc 5 started
3 Proc 2 done
4 Proc 3 done
4 Proc 4 done
5 Proc 5 done


Note how this solution is close to my previous answer with standard java.util.concurrent.Semaphore.






























  • This spawns k separate instances of Process() as well. I was looking to have only 1 Process() running at any given point in time, while k requests for future records are in the air. I was able to achieve this with the flatMap and concatMap methods: github.com/ReactiveX/RxJava#parallel-processing

    – Erric
    Nov 24 '18 at 6:23













  • Why do you think it spawns new Processes? It does not. Process is not even mentioned in the program.

    – Alexei Kaigorodov
    Nov 24 '18 at 6:43











  • I meant that Proc 1 and Proc 2 are started together here. I understand that in most scenarios you would want this. But I did not want my workflow to be compute-intensive, hence I am restricting Proc 1 and Proc 2 from running in parallel. Plus, it seems nice to use RxJava and not end up managing semaphores yourself.

    – Erric
    Nov 24 '18 at 6:46













  • For N records, total time in my approach would be Treq/K * N + K*Tproc whereas yours would be Treq/K * N + Tproc (faster, but at the expense of more CPU tasks in parallel)

    – Erric
    Nov 24 '18 at 6:58











  • If you want to run ProcessRecord() sequentially, just submit the calls to a dedicated single-threaded or serial executor.

    – Alexei Kaigorodov
    Nov 24 '18 at 7:30
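The suggestion in the last comment can be sketched by combining the Semaphore from the first answer with a single-threaded executor for the processing step. This is an assumption-laden illustration, not code from either poster: the class SerialProcessingDemo, the ioPool that simulates the network, and the counters used to observe concurrency are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SerialProcessingDemo {

    /** Returns the highest number of processing steps that ever ran at the same time. */
    static int run(List<Integer> ids, int k) {
        Semaphore sem = new Semaphore(k);                                // at most k retrievals in flight
        ExecutorService ioPool = Executors.newCachedThreadPool();        // hypothetical: simulates the network
        ExecutorService processor = Executors.newSingleThreadExecutor(); // runs processing one at a time
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        List<CompletableFuture<Void>> all = new ArrayList<>();
        try {
            for (Integer id : ids) {
                sem.acquireUninterruptibly();                  // wait for a free permit
                CompletableFuture<Integer> fut =
                        CompletableFuture.supplyAsync(() -> id, ioPool); // stand-in for Retrieve(id)
                fut.whenComplete((r, ex) -> sem.release());    // permit returns when the retrieval ends
                all.add(fut.thenAcceptAsync(r -> {
                    int now = running.incrementAndGet();
                    maxRunning.accumulateAndGet(now, Math::max);
                    // ... CPU-intensive work on r would go here ...
                    running.decrementAndGet();
                }, processor));
            }
            CompletableFuture.allOf(all.toArray(new CompletableFuture[0])).join();
            return maxRunning.get();
        } finally {
            ioPool.shutdown();
            processor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("max concurrent processing: " + run(Arrays.asList(1, 2, 3, 4, 5), 2));
        // prints: max concurrent processing: 1
    }
}
```

Two caveats with this sketch. Because the permit is released as soon as a retrieval completes, retrieved records can queue up in the single-threaded executor when processing is slower than the network; releasing the permit at the end of the processing step instead would bound that backlog as well. Also, with k > 1 records are processed in the order their retrievals complete, which is not necessarily the order of the ids.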



















0














Here's what I finally came up with that seems to get the job done:



Flowable.just(1, 2, 3, 4, 5, 6) // Completes in 1 + 6 * 3 = 19 secs
    .concatMapEager(v ->
        Flowable.just(v)
            .subscribeOn(Schedulers.io())
            .map(e -> {
                System.out.println(getElapsed("Req " + e + " started"));
                Thread.sleep(1000); // Network: 1 sec
                System.out.println(getElapsed("Req " + e + " done"));
                return e;
            }), requestsOnWire, 1) // requestsOnWire = K = 2
    .blockingSubscribe(new DisposableSubscriber<Integer>() {
        @Override
        protected void onStart() {
            request(1); // ask for a single record up front
        }
        @Override
        public void onNext(Integer s) {
            request(1); // ask for the next record before processing this one
            System.out.println("Proc " + s + " started");
            try {
                Thread.sleep(3000); // Compute: 3 secs
                System.out.println("Proc " + s + " done");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void onError(Throwable t) {

        }
        @Override
        public void onComplete() {

        }
    });


Below is the execution order. Note that at any given point in time, there is 1 record being processed, at most 2 requests on the wire, and at most 2 unprocessed records in memory (Process trails behind by K=2 records):



 0 secs: Req 1 started
: Req 2 started
1 secs: Req 2 done
: Req 1 done
: Proc 1 started
: Req 3 started
: Req 4 started
2 secs: Req 3 done
: Req 4 done
4 secs: Proc 1 done
: Proc 2 started
: Req 5 started
5 secs: Req 5 done
7 secs: Proc 2 done
: Proc 3 started
: Req 6 started
8 secs: Req 6 done
10 secs: Proc 3 done
: Proc 4 started
13 secs: Proc 4 done
: Proc 5 started
16 secs: Proc 5 done
: Proc 6 started
19 secs: Proc 6 done


Hope there are no anti-patterns/pitfalls here.
































  • Note that increasing the concurrency K is not beneficial in this example, but in scenarios where the network takes longer than processing, increasing K speeds up the process.

    – Erric
    Nov 24 '18 at 6:44

























answered Nov 23 '18 at 16:27









Alexei Kaigorodov





























0














Solution using df4j, with explicit asynchronous semaphore:



    import org.df4j.core.boundconnector.permitstream.Semafor;
    import org.df4j.core.tasknode.Action;
    import org.df4j.core.tasknode.messagestream.Actor;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ForkJoinPool;

    public class AsyncSemaDemo extends Actor {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
        // Asynchronous semaphore with 2 permits: bounds the number of requests in flight
        Semafor sema = new Semafor(this, 2);
        Iterator<Integer> iter = ids.iterator();
        int tick = 100; // millis
        CountDownLatch done = new CountDownLatch(ids.size());
        long start = System.currentTimeMillis();

        // Prints the message prefixed with the number of ticks elapsed since start
        private void printClock(String s) {
            long ticks = (System.currentTimeMillis() - start) / tick;
            System.out.println(ticks + " " + s);
        }

        CompletableFuture<Integer> Retrieve(Integer e) {
            return CompletableFuture.supplyAsync(() -> {
                printClock("Req " + e + " started");
                try {
                    Thread.sleep(tick); // Network
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag
                }
                printClock(" Req " + e + " done");
                return e;
            }, executor);
        }

        void ProcessRecord(Integer s) {
            printClock(" Proc " + s + " started");
            try {
                Thread.sleep(tick * 2); // Compute
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // keep the interrupt flag
            }
            printClock(" Proc " + s + " done");
        }

        @Action
        public void act() {
            if (iter.hasNext()) {
                CompletableFuture<Integer> fut = Retrieve(iter.next());
                // Return the permit as soon as the retrieval finishes
                fut.thenRun(sema::release);
                fut.thenAcceptAsync(this::ProcessRecord, executor)
                   .thenRun(done::countDown);
            } else {
                super.stop();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            AsyncSemaDemo asyncSemaDemo = new AsyncSemaDemo();
            asyncSemaDemo.start(ForkJoinPool.commonPool());
            asyncSemaDemo.done.await();
        }
    }


Its log should be:



0 Req 1 started
0 Req 2 started
1 Req 1 done
1 Proc 1 started
1 Req 3 started
1 Req 2 done
1 Proc 2 started
1 Req 4 started
2 Req 3 done
2 Proc 3 started
2 Req 5 started
2 Req 4 done
2 Proc 4 started
3 Proc 1 done
3 Req 5 done
3 Proc 5 started
3 Proc 2 done
4 Proc 3 done
4 Proc 4 done
5 Proc 5 done


Note how this solution is close to my previous answer with standard java.util.concurrent.Semaphore.
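For readers without df4j, the same bounded-prefetch idea can be sketched with the standard java.util.concurrent.Semaphore. This is only a minimal sketch and not a reproduction of the earlier answer: the class BoundedPrefetch, the methods retrieve and run, and the in-flight counters are hypothetical names added for illustration, and the sleep stands in for the network call. Unlike the df4j version, the submitting thread blocks while it waits for a permit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPrefetch {
    // Counters used only to observe how many retrievals overlap (illustrative).
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical stand-in for a non-blocking Retrieve(); the sleep simulates network latency.
    static CompletableFuture<Integer> retrieve(int id, ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
            return id;
        }, io);
    }

    // Retrieves every id with at most k retrievals in flight; returns the records handled.
    static List<Integer> run(List<Integer> ids, int k) {
        ExecutorService io = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(k);
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int id : ids) {
            permits.acquireUninterruptibly();               // wait until fewer than k are in flight
            CompletableFuture<Integer> fut = retrieve(id, io);
            fut.whenComplete((r, t) -> permits.release());  // return the permit when retrieval ends
            pending.add(fut.thenAccept(processed::add));    // "processing" placeholder
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        io.shutdown();
        return processed;
    }
}
```

Calling BoundedPrefetch.run(Arrays.asList(1, 2, 3, 4, 5), 2) handles all five records while never having more than two retrievals running at once. The order in which records are handled is not guaranteed, because retrievals may complete out of order.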






























  • This spawns k separate instances of Process() as well. I was looking to have only one Process() running at any given time, while k requests for future records are in flight. I was able to achieve this with the flatMap and concatMap methods: github.com/ReactiveX/RxJava#parallel-processing

    – Erric
    Nov 24 '18 at 6:23













  • Why do you think it spawns new Processes? It does not. Process is not even mentioned in the program.

    – Alexei Kaigorodov
    Nov 24 '18 at 6:43











  • I meant that Proc 1 and Proc 2 are started together here. I understand that in most scenarios you would want this, but I did not want my workflow to be compute intensive, hence preventing Proc 1 and Proc 2 from running in parallel. Plus it seems nice to use RxJava and not end up managing semaphores yourself.

    – Erric
    Nov 24 '18 at 6:46













  • For N records, total time in my approach would be Treq/K * N + K*Tproc whereas yours would be Treq/K * N + Tproc (faster, but at the expense of more CPU tasks in parallel)

    – Erric
    Nov 24 '18 at 6:58











  • If you want to run the ProcessRecord() calls sequentially, just submit them to a dedicated single-threaded or serial executor.

    – Alexei Kaigorodov
    Nov 24 '18 at 7:30
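The single-threaded executor suggestion can be sketched as follows. This is an illustrative sketch under assumptions, not code from the thread: the class SerialProcessing, the method run, and the overlap counters are hypothetical, and the retrieval is replaced by a trivial supplier. The point it shows is that passing Executors.newSingleThreadExecutor() to thenAcceptAsync keeps the processing calls from overlapping, even when several retrievals complete at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class SerialProcessing {
    // Counters used only to observe how many processing calls overlap (illustrative).
    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger maxRunning = new AtomicInteger();

    // Hypothetical stand-in for ProcessRecord(); the sleep simulates CPU work.
    static void processRecord(int record) {
        int now = running.incrementAndGet();
        maxRunning.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        running.decrementAndGet();
    }

    // Retrieves n records on a pool of 4 threads, processes them one at a time,
    // and returns the highest number of processing calls seen running together.
    static int run(int n) {
        ExecutorService io = Executors.newFixedThreadPool(4);
        ExecutorService cpu = Executors.newSingleThreadExecutor(); // serializes processing
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            final int id = i;
            pending.add(CompletableFuture.supplyAsync(() -> id, io)
                    .thenAcceptAsync(SerialProcessing::processRecord, cpu));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        io.shutdown();
        cpu.shutdown();
        return maxRunning.get();
    }
}
```

SerialProcessing.run(8) returns 1, since the single worker thread never runs two processRecord calls at the same time.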





















































answered Nov 24 '18 at 4:54









Alexei Kaigorodov




























Here's what I finally came up with that seems to get the job done:



    Flowable.just(1, 2, 3, 4, 5, 6) // Completes in 1 + 6 * 3 = 19 secs
        .concatMapEager(v ->
            Flowable.just(v)
                .subscribeOn(Schedulers.io())
                .map(e -> {
                    // getElapsed(...) is a helper that is not shown here
                    System.out.println(getElapsed("Req " + e + " started"));
                    Thread.sleep(1000); // Network: 1 sec
                    System.out.println(getElapsed("Req " + e + " done"));
                    return e;
                }), requestsOnWire, 1) // requestsOnWire = K = 2
        .blockingSubscribe(new DisposableSubscriber<Integer>() {
            @Override
            protected void onStart() {
                request(1); // ask for the first record only
            }
            @Override
            public void onNext(Integer s) {
                request(1); // ask for the next record before processing this one
                System.out.println("Proc " + s + " started");
                try {
                    Thread.sleep(3000); // Compute: 3 secs
                    System.out.println("Proc " + s + " done");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void onError(Throwable t) {
                t.printStackTrace(); // do not swallow failures silently
            }
            @Override
            public void onComplete() {

            }
        });


Below is the execution order. Note that at any given time there is one record being processed, at most 2 requests on the wire, and at most 2 unprocessed records in memory (Process trails behind by K=2 records):



 0 secs: Req 1 started
: Req 2 started
1 secs: Req 2 done
: Req 1 done
: Proc 1 started
: Req 3 started
: Req 4 started
2 secs: Req 3 done
: Req 4 done
4 secs: Proc 1 done
: Proc 2 started
: Req 5 started
5 secs: Req 5 done
7 secs: Proc 2 done
: Proc 3 started
: Req 6 started
8 secs: Req 6 done
10 secs: Proc 3 done
: Proc 4 started
13 secs: Proc 4 done
: Proc 5 started
16 secs: Proc 5 done
: Proc 6 started
19 secs: Proc 6 done


Hope there are no anti-patterns/pitfalls here.
































  • Note that increasing concurrency K is not beneficial in this example, but in scenarios where network takes longer than processing, increasing K speeds up the process

    – Erric
    Nov 24 '18 at 6:44
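The effect of K can be checked with a small back-of-the-envelope model. This is a simplified sketch, not a measurement: the class PipelineModel and the method makespan are hypothetical, and the model assumes that request i starts as soon as request i-K finishes (it ignores the prefetch limit), while processing is strictly sequential and in order.

```java
class PipelineModel {
    // Estimated completion time for n records with up to k concurrent requests,
    // request time tReq and processing time tProc (same time unit for both).
    static long makespan(int n, int k, long tReq, long tProc) {
        long[] reqDone = new long[n];
        long procDone = 0;
        for (int i = 0; i < n; i++) {
            // Request i starts at time 0 if a slot is free, else when request i-k ends.
            long reqStart = i < k ? 0 : reqDone[i - k];
            reqDone[i] = reqStart + tReq;
            // Processing i starts once record i has arrived and record i-1 is processed.
            procDone = Math.max(reqDone[i], procDone) + tProc;
        }
        return procDone;
    }

    public static void main(String[] args) {
        // Network 1, compute 3 (the example above): K makes no difference.
        System.out.println(makespan(6, 1, 1, 3)); // 19
        System.out.println(makespan(6, 2, 1, 3)); // 19
        // Network 3, compute 1: a larger K shortens the run.
        System.out.println(makespan(6, 1, 3, 1)); // 19
        System.out.println(makespan(6, 2, 3, 1)); // 11
        System.out.println(makespan(6, 3, 3, 1)); // 9
    }
}
```

With processing as the bottleneck, the total stays at 1 + 6 * 3 = 19 regardless of K. Once the network is the slower stage, each extra concurrent request hides more of the retrieval time.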





















































edited Nov 24 '18 at 7:13

























answered Nov 23 '18 at 21:01









Erric






































