Optimal orchestration of list of network calls and processing tasks in Java
I have the following workflow:
There are n records that need to be retrieved over the network, and subsequently n expensive computations, one on each record. Put in code, this would look like:
List<Integer> ids = IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList()); // 1, 2, ..., n
ids.forEach(id -> {
    Record r = RetrieveRecord(id); // Blocking IO
    ProcessRecord(r); // CPU intensive
});
I would like to make the blocking part asynchronous so that the total time is minimized with a single thread: essentially, record i+1 should be retrieved while record i is being processed. The execution would then look like:
Retrieve(1).start()
Retrieve(1).onEnd(() -> { start Retrieve(2), Process(1) })
Retrieve(2).onEnd(() -> { start Retrieve(3), Process(2) })
....
Now, I can come up with a naive way to implement this with a List<> and CompletableFuture, but this would require me to handle the first record differently.
Is there a more elegant way of solving this with something like reactive streams? Perhaps a solution that would let me easily configure how many records Process() can trail behind Retrieve()?
java asynchronous stream
RetrieveRecord is a blocking operation. It always blocks some thread. If you move it to a single thread, then this thread would execute all the retrievals sequentially and the time would be maximized, not minimized.
– Alexei Kaigorodov
Nov 23 '18 at 14:55
I already have a non-blocking implementation of Retrieve() which returns a CompletableFuture. What I am trying to achieve here is a (strictly) bounded prefetch of the next k records (k=1 in this case). I don't want all 100 RetrieveRecords to be triggered at the start, nor do I want to wait until Process(i) is complete before starting Retrieve(i+1). Basically, Retrieve(i+k) should be happening over the wire while Process(i) is running.
– Erric
Nov 23 '18 at 15:08
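The bounded prefetch described in this comment can be sketched with plain CompletableFuture and a sliding window, without treating the first record specially. This is only a minimal sketch under assumptions: the class name BoundedPrefetch and the retrieve/process parameters are hypothetical stand-ins for the asker's Retrieve() and Process(), and processing runs on the calling thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper: processes records in id order on the calling thread
// while up to k retrievals run ahead of the record being processed.
final class BoundedPrefetch {

    static <I, R> void run(List<I> ids, int k,
                           Function<I, CompletableFuture<R>> retrieve,
                           Consumer<R> process) {
        if (k < 1) {
            throw new IllegalArgumentException("k must be at least 1");
        }
        Iterator<I> remaining = ids.iterator();
        Deque<CompletableFuture<R>> window = new ArrayDeque<>();
        fill(window, remaining, k, retrieve);          // start the first k retrievals
        while (!window.isEmpty()) {
            R record = window.remove().join();         // wait for the oldest retrieval
            fill(window, remaining, k, retrieve);      // top up the window before processing
            process.accept(record);                    // overlaps with the k retrievals in flight
        }
    }

    // Starts retrievals until the window holds k futures or the ids run out.
    private static <I, R> void fill(Deque<CompletableFuture<R>> window, Iterator<I> remaining, int k,
                                    Function<I, CompletableFuture<R>> retrieve) {
        while (window.size() < k && remaining.hasNext()) {
            window.add(retrieve.apply(remaining.next()));
        }
    }
}
```

With k=1 this gives the order from the question: Retrieve(i+1) is started right after Retrieve(i) completes and before Process(i) runs. The calling thread blocks in join() whenever the next record has not arrived yet, which seems acceptable here because that thread has nothing else to process at that point.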
asked Nov 23 '18 at 14:15 by Erric; edited Nov 23 '18 at 15:07
3 Answers
So you have N tasks and want to run them in parallel, but no more than K tasks simultaneously. The most natural way is to have a task generator and a permission counter with K permissions initially. The task generator creates K tasks and waits for more permissions. Each permission is owned by some task and is returned when the task ends. The standard permission counter in Java is the class java.util.concurrent.Semaphore:
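List<Integer> ids = IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList()); // 1, 2, ..., n
Semaphore sem = new Semaphore(K); // at most K retrievals in flight
ids.forEach(id -> {
    // acquire() throws the checked InterruptedException, which this lambda cannot propagate
    sem.acquireUninterruptibly();
    CompletableFuture<Data> fut = Retrieve(id);
    // return the permit when the retrieval ends, whether it succeeded or failed
    fut.whenComplete((data, error) -> sem.release());
    fut.thenAcceptAsync(this::ProcessRecord, someExecutor);
});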
Since the task generator occupies only one thread, there is little sense in making it asynchronous. If, however, you don't want to use a dedicated thread for the task generator and want to implement an asynchronous solution, then the main question is what class can play the role of an asynchronous permission counter. You have 3 options:
- use the implicit asynchronous permission counter which is a part of reactive streams, found in RxJava, Project Reactor, etc.
- use the explicit asynchronous semaphore org.df4j.core.boundconnector.permitstream.Semafor included in my asynchronous library df4j
- make it yourself
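For the "make it yourself" option, a homemade asynchronous permission counter could look roughly like the sketch below. It is not the df4j Semafor and not a JDK class: AsyncSemaphore and its methods are hypothetical names, and the only assumption is that acquire() hands back a CompletableFuture that completes once a permit is available, so no thread blocks while waiting.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical asynchronous permission counter: acquire() never blocks a thread.
final class AsyncSemaphore {
    private final Deque<CompletableFuture<Void>> waiters = new ArrayDeque<>();
    private int permits;

    AsyncSemaphore(int permits) {
        this.permits = permits;
    }

    // Returns a future that is already complete if a permit is free,
    // otherwise one that completes when another task releases its permit.
    CompletableFuture<Void> acquire() {
        synchronized (this) {
            if (permits > 0) {
                permits--;
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> waiter = new CompletableFuture<>();
            waiters.add(waiter);
            return waiter;
        }
    }

    // Hands the permit directly to the oldest waiter, or puts it back if nobody waits.
    void release() {
        CompletableFuture<Void> next;
        synchronized (this) {
            next = waiters.poll();
            if (next == null) {
                permits++;
                return;
            }
        }
        next.complete(null); // completed outside the lock so callbacks do not run while holding it
    }

    synchronized int availablePermits() {
        return permits;
    }
}
```

A task generator could then chain each retrieval onto a permit, for example sem.acquire().thenCompose(v -> Retrieve(id)), and call sem.release() from a whenComplete callback when the task ends.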
Great! Could you give an example/link to the "implicit asynchronous permission counter" in RxJava? I tried a lot to find how to do this with RxJava since my project already has a dependency on this
– Erric
Nov 23 '18 at 17:24
Learn how to use backpressure in RxJava. Backpressure limits the number of items in a message queue, and you need to limit the number of asynchronous tasks. Map tasks to messages somehow.
– Alexei Kaigorodov
Nov 23 '18 at 17:35
But backpressure in RxJava has either "discard", "buffer" or "latest" strategy, which does not seem to fit my needs
– Erric
Nov 23 '18 at 17:38
I am not a specialist in RxJava, but I think strategy.BUFFER is ok for you, as it does not drop items. The task generator is implemented as an Observable, pushing tasks into a reactive stream and waiting until the stream is able to accept the next task. The trickiest part is the Observer on the other end of the stream. It should take the next task and asynchronously wait until it is complete. Then it should start ProcessRecord() and take the next task.
– Alexei Kaigorodov
Nov 23 '18 at 17:50
I mostly get stuck with how to provide "k" (like in your snippet) when implementing with RxJava. All my attempts ended up with things working like k = 0 or k = N
– Erric
Nov 23 '18 at 17:57
Solution using df4j, with explicit asynchronous semaphore:
import org.df4j.core.boundconnector.permitstream.Semafor;
import org.df4j.core.tasknode.Action;
import org.df4j.core.tasknode.messagestream.Actor;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
public class AsyncSemaDemo extends Actor {
    List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
    Semafor sema = new Semafor(this, 2); // 2 permits: at most 2 retrievals in flight
    Iterator<Integer> iter = ids.iterator();
    int tick = 100; // millis
    CountDownLatch done = new CountDownLatch(ids.size());
    long start = System.currentTimeMillis();

    private void printClock(String s) {
        long ticks = (System.currentTimeMillis() - start)/tick;
        System.out.println(Long.toString(ticks) + " " + s);
    }

    CompletableFuture<Integer> Retrieve(Integer e) {
        return CompletableFuture.supplyAsync(() -> {
            printClock("Req " + e + " started");
            try {
                Thread.sleep(tick); // Network
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // keep the interrupt flag instead of swallowing it
            }
            printClock(" Req " + e + " done");
            return e;
        }, executor);
    }

    void ProcessRecord(Integer s) {
        printClock(" Proc " + s + " started");
        try {
            Thread.sleep(tick*2); // Compute
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // keep the interrupt flag instead of swallowing it
        }
        printClock(" Proc " + s + " done");
    }

    @Action
    public void act() {
        if (iter.hasNext()) {
            CompletableFuture<Integer> fut = Retrieve(iter.next());
            fut.thenRun(sema::release); // return the permit when the retrieval ends
            fut.thenAcceptAsync(this::ProcessRecord, executor)
                .thenRun(done::countDown);
        } else {
            super.stop();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncSemaDemo asyncSemaDemo = new AsyncSemaDemo();
        asyncSemaDemo.start(ForkJoinPool.commonPool());
        asyncSemaDemo.done.await();
    }
}
Its log should be:
0 Req 1 started
0 Req 2 started
1 Req 1 done
1 Proc 1 started
1 Req 3 started
1 Req 2 done
1 Proc 2 started
1 Req 4 started
2 Req 3 done
2 Proc 3 started
2 Req 5 started
2 Req 4 done
2 Proc 4 started
3 Proc 1 done
3 Req 5 done
3 Proc 5 started
3 Proc 2 done
4 Proc 3 done
4 Proc 4 done
5 Proc 5 done
Note how this solution is close to my previous answer with standard java.util.concurrent.Semaphore.
This spawns k separate instances of Process() as well. I was looking at having only 1 Process() happening at any given point of time, while k requests for future records are in the air. I was able to achieve this with the flatMap and concatMap methods: github.com/ReactiveX/RxJava#parallel-processing
– Erric
Nov 24 '18 at 6:23
why do you think it spawns new Processes? It does not. Process is not even mentioned in the program.
– Alexei Kaigorodov
Nov 24 '18 at 6:43
I meant Proc 1 and Proc 2 are started together here. I understand that in most scenarios you would want this. But I did not want my workflow to be compute intensive, so I wanted to keep Proc 1 and Proc 2 from running in parallel. Plus it seems nice to use RxJava and not end up managing semaphores yourself
– Erric
Nov 24 '18 at 6:46
For N records, total time in my approach would be Treq/K * N + K*Tproc, whereas yours would be Treq/K * N + Tproc (faster, but at the expense of more CPU tasks in parallel)
– Erric
Nov 24 '18 at 6:58
If you want to run ProcessRecord() sequentially, just submit them to a dedicated single-threaded or serial executor.
– Alexei Kaigorodov
Nov 24 '18 at 7:30
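The suggestion in this comment can be sketched by combining the Semaphore approach with a single-threaded executor. This is a sketch under assumptions, not the commenter's code: SerialProcessing and its parameters are hypothetical names, and, unlike the snippet in the first answer, the permit is released only after processing ends, so k counts the record being processed as well (k=2 corresponds to a prefetch of one record).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper: at most k records are retrieved but not yet fully processed,
// and all processing runs one record at a time on a dedicated thread.
final class SerialProcessing {

    // Returns the highest number of process() calls observed running at the same time.
    static <I, R> int run(List<I> ids, int k,
                          Function<I, CompletableFuture<R>> retrieve,
                          Consumer<R> process) {
        ExecutorService serial = Executors.newSingleThreadExecutor();
        Semaphore permits = new Semaphore(k);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (I id : ids) {
                permits.acquireUninterruptibly();          // the generator waits here for a free permit
                pending.add(retrieve.apply(id)
                    .thenAcceptAsync(record -> {
                        maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                        try {
                            process.accept(record);
                        } finally {
                            running.decrementAndGet();
                        }
                    }, serial)                             // serial executor: one record at a time
                    .whenComplete((ignored, error) -> permits.release())); // also runs on failure
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            serial.shutdown();
        }
        return maxRunning.get();
    }
}
```

Releasing the permit after processing keeps the number of unprocessed records in memory bounded by k. Releasing it when the retrieval ends, as in the first answer, would bound only the requests on the wire and let retrieved records queue up behind a slow ProcessRecord().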
Here's what I finally came up with that seems to get the job done:
Flowable.just(1, 2, 3, 4, 5, 6) // Completes in 1 + 6 * 3 = 19 secs
    .concatMapEager(v ->
        Flowable.just(v)
            .subscribeOn(Schedulers.io())
            .map(e -> {
                // getElapsed() is a helper (not shown) that prefixes the elapsed time
                System.out.println(getElapsed("Req " + e + " started"));
                Thread.sleep(1000); // Network: 1 sec
                System.out.println(getElapsed("Req " + e + " done"));
                return e;
            }), requestsOnWire, 1) // requestsOnWire = K = 2
    .blockingSubscribe(new DisposableSubscriber<Integer>() {
        @Override
        protected void onStart() {
            request(1); // ask for one record at a time instead of the default unbounded request
        }
        @Override
        public void onNext(Integer s) {
            request(1);
            System.out.println("Proc " + s + " started");
            try {
                Thread.sleep(3000); // Compute: 3 secs
                System.out.println("Proc " + s + " done");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void onError(Throwable t) {
        }
        @Override
        public void onComplete() {
        }
    });
Below is the execution order. Note that at any given point in time, there is 1 record being processed, at most 2 requests on the wire, and at most 2 unprocessed records in memory (Process trails behind by K=2 records):
0 secs: Req 1 started
: Req 2 started
1 secs: Req 2 done
: Req 1 done
: Proc 1 started
: Req 3 started
: Req 4 started
2 secs: Req 3 done
: Req 4 done
4 secs: Proc 1 done
: Proc 2 started
: Req 5 started
5 secs: Req 5 done
7 secs: Proc 2 done
: Proc 3 started
: Req 6 started
8 secs: Req 6 done
10 secs: Proc 3 done
: Proc 4 started
13 secs: Proc 4 done
: Proc 5 started
16 secs: Proc 5 done
: Proc 6 started
19 secs: Proc 6 done
Hope there are no anti-patterns/pitfalls here.
Note that increasing concurrency K is not beneficial in this example, but in scenarios where network takes longer than processing, increasing K speeds up the process
– Erric
Nov 24 '18 at 6:44
The first answer (java.util.concurrent.Semaphore) was answered Nov 23 '18 at 16:27 by Alexei Kaigorodov.
Great! Could you give an example/link to the "implicit asynchronous permission counter" in RxJava? I tried a lot to find how to do this with RxJava since my project already has a dependency on this
– Erric
Nov 23 '18 at 17:24
Learn how to use backpressure in RxJava. Backpressure limits the number of items in a message queue, and you need to limit the number of asynchronous tasks. Map tasks to messages somehow.
– Alexei Kaigorodov
Nov 23 '18 at 17:35
But backpressure in RxJava has either "discard", "buffer" or "latest" strategy, which does not seem to fit my needs
– Erric
Nov 23 '18 at 17:38
I am not a specialist in RxJava, but I think strategy.BUFFER is ok for you, as it does not drops items. Task generator is implemented as an Observable, pushing tasks into a reactive stream and waiting when the stream is able to accept next task. The most tricky part is an Observer on the other end of the stream. It should take the next task and asynchronously wait when it is complete. Then start ProcessRecord() and take the next task.
– Alexei Kaigorodov
Nov 23 '18 at 17:50
I mostly get stuck with how to provide "k" (like in your snippet) when implementing with RxJava. All my attempts ended up with things working like k = 0 or k = N
– Erric
Nov 23 '18 at 17:57
|
show 2 more comments
Great! Could you give an example/link to the "implicit asynchronous permission counter" in RxJava? I tried a lot to find how to do this with RxJava since my project already has a dependency on this
– Erric
Nov 23 '18 at 17:24
Learn how to use backpressure in RxJava. Backpressure limits the number of items in a message queue, and you need to limit the number of asynchronous tasks. Map tasks to messages somehow.
– Alexei Kaigorodov
Nov 23 '18 at 17:35
But backpressure in RxJava has either "discard", "buffer" or "latest" strategy, which does not seem to fit my needs
– Erric
Nov 23 '18 at 17:38
I am not a specialist in RxJava, but I think strategy.BUFFER is ok for you, as it does not drops items. Task generator is implemented as an Observable, pushing tasks into a reactive stream and waiting when the stream is able to accept next task. The most tricky part is an Observer on the other end of the stream. It should take the next task and asynchronously wait when it is complete. Then start ProcessRecord() and take the next task.
– Alexei Kaigorodov
Nov 23 '18 at 17:50
I mostly get stuck with how to provide "k" (like in your snippet) when implementing with RxJava. All my attempts ended up with things working like k = 0 or k = N
– Erric
Nov 23 '18 at 17:57
Great! Could you give an example/link to the "implicit asynchronous permission counter" in RxJava? I tried a lot to find how to do this with RxJava since my project already has a dependency on this
– Erric
Nov 23 '18 at 17:24
Great! Could you give an example/link to the "implicit asynchronous permission counter" in RxJava? I tried a lot to find how to do this with RxJava since my project already has a dependency on this
– Erric
Nov 23 '18 at 17:24
Learn how to use backpressure in RxJava. Backpressure limits the number of items in a message queue, and you need to limit the number of asynchronous tasks. Map tasks to messages somehow.
– Alexei Kaigorodov
Nov 23 '18 at 17:35
Learn how to use backpressure in RxJava. Backpressure limits the number of items in a message queue, and you need to limit the number of asynchronous tasks. Map tasks to messages somehow.
– Alexei Kaigorodov
Nov 23 '18 at 17:35
But backpressure in RxJava has either "discard", "buffer" or "latest" strategy, which does not seem to fit my needs
– Erric
Nov 23 '18 at 17:38
But backpressure in RxJava has either "discard", "buffer" or "latest" strategy, which does not seem to fit my needs
– Erric
Nov 23 '18 at 17:38
I am not a specialist in RxJava, but I think strategy.BUFFER is ok for you, as it does not drops items. Task generator is implemented as an Observable, pushing tasks into a reactive stream and waiting when the stream is able to accept next task. The most tricky part is an Observer on the other end of the stream. It should take the next task and asynchronously wait when it is complete. Then start ProcessRecord() and take the next task.
– Alexei Kaigorodov
Nov 23 '18 at 17:50
I am not a specialist in RxJava, but I think strategy.BUFFER is ok for you, as it does not drops items. Task generator is implemented as an Observable, pushing tasks into a reactive stream and waiting when the stream is able to accept next task. The most tricky part is an Observer on the other end of the stream. It should take the next task and asynchronously wait when it is complete. Then start ProcessRecord() and take the next task.
– Alexei Kaigorodov
Nov 23 '18 at 17:50
I mostly get stuck with how to provide "k" (like in your snippet) when implementing with RxJava. All my attempts ended up with things working like k = 0 or k = N
– Erric
Nov 23 '18 at 17:57
I mostly get stuck with how to provide "k" (like in your snippet) when implementing with RxJava. All my attempts ended up with things working like k = 0 or k = N
– Erric
Nov 23 '18 at 17:57
|
show 2 more comments
Solution using df4j, with explicit asynchronous semaphore:
import org.df4j.core.boundconnector.permitstream.Semafor;
import org.df4j.core.tasknode.Action;
import org.df4j.core.tasknode.messagestream.Actor;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

public class AsyncSemaDemo extends Actor {
    List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
    // Asynchronous semaphore with 2 permits: at most 2 requests in flight
    Semafor sema = new Semafor(this, 2);
    Iterator<Integer> iter = ids.iterator();
    int tick = 100; // millis
    CountDownLatch done = new CountDownLatch(ids.size());
    long start = System.currentTimeMillis();

    private void printClock(String s) {
        long ticks = (System.currentTimeMillis() - start) / tick;
        System.out.println(Long.toString(ticks) + " " + s);
    }

    // Note: executor is not declared in this snippet; it is assumed
    // to be provided by the Actor base class.
    CompletableFuture<Integer> Retrieve(Integer e) {
        return CompletableFuture.supplyAsync(() -> {
            printClock("Req " + e + " started");
            try {
                Thread.sleep(tick); // Network
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // restore interrupt status
            }
            printClock(" Req " + e + " done");
            return e;
        }, executor);
    }

    void ProcessRecord(Integer s) {
        printClock(" Proc " + s + " started");
        try {
            Thread.sleep(tick * 2); // Compute
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore interrupt status
        }
        printClock(" Proc " + s + " done");
    }

    @Action
    public void act() {
        if (iter.hasNext()) {
            CompletableFuture<Integer> fut = Retrieve(iter.next());
            // Return the permit as soon as the request completes
            fut.thenRun(sema::release);
            fut.thenAcceptAsync(this::ProcessRecord, executor)
               .thenRun(done::countDown);
        } else {
            super.stop();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncSemaDemo asyncSemaDemo = new AsyncSemaDemo();
        asyncSemaDemo.start(ForkJoinPool.commonPool());
        asyncSemaDemo.done.await();
    }
}
Its log should be:
0 Req 1 started
0 Req 2 started
1 Req 1 done
1 Proc 1 started
1 Req 3 started
1 Req 2 done
1 Proc 2 started
1 Req 4 started
2 Req 3 done
2 Proc 3 started
2 Req 5 started
2 Req 4 done
2 Proc 4 started
3 Proc 1 done
3 Req 5 done
3 Proc 5 started
3 Proc 2 done
4 Proc 3 done
4 Proc 4 done
5 Proc 5 done
Note how this solution is close to my previous answer with standard java.util.concurrent.Semaphore.
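For comparison, here is a minimal sketch of the same idea with the standard java.util.concurrent.Semaphore. It is not the previous answer referred to above, only an illustration under stated assumptions: the class name SemaphorePrefetch, the retrieve() stand-in (which just sleeps on a pool thread), the pool sizes and the sleep durations are all made up for the example. Unlike the df4j Semafor, acquiring a permit here blocks the submitting thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphorePrefetch {
    // Hypothetical stand-in for a non-blocking Retrieve(): sleeps on an I/O pool thread.
    static CompletableFuture<Integer> retrieve(int id, ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> { sleep(20); return id; }, io);
    }

    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs all ids with at most k requests in flight; returns the largest in-flight count seen. */
    static int run(List<Integer> ids, int k, List<Integer> processed) {
        ExecutorService io = Executors.newCachedThreadPool();
        ExecutorService cpu = Executors.newFixedThreadPool(2);
        Semaphore permits = new Semaphore(k);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        List<CompletableFuture<Void>> all = new ArrayList<>();
        for (int id : ids) {
            permits.acquireUninterruptibly();            // blocks while k requests are pending
            maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            all.add(retrieve(id, io)
                .whenComplete((r, t) -> {                // request finished: free the slot
                    inFlight.decrementAndGet();
                    permits.release();
                })
                .thenAcceptAsync(r -> {                  // simulated compute step
                    sleep(40);
                    processed.add(r);
                }, cpu));
        }
        all.forEach(CompletableFuture::join);
        io.shutdown();
        cpu.shutdown();
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        List<Integer> processed = new CopyOnWriteArrayList<>();
        int max = run(Arrays.asList(1, 2, 3, 4, 5), 2, processed);
        System.out.println("max in flight = " + max + ", processed = " + processed);
    }
}
```

The permit is released when the request completes, not when processing completes, so the semaphore bounds only the number of requests on the wire. Processing steps may still overlap on the cpu pool.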
This spawns k separate instances of Process() as well. I was looking at having only 1 Process() happening at any given point in time, while k requests for future records are in the air. I was able to achieve this with the flatMap and concatMap methods: github.com/ReactiveX/RxJava#parallel-processing
– Erric
Nov 24 '18 at 6:23
Why do you think it spawns new Processes? It does not. Process is not even mentioned in the program.
– Alexei Kaigorodov
Nov 24 '18 at 6:43
I meant that Proc 1 and Proc 2 are started together here. I understand that in most scenarios you would want this. But I did not want my workflow to be compute intensive, hence I wanted to prevent Proc 1 and Proc 2 from running in parallel. Plus it seems nice to use RxJava and not end up managing semaphores yourself.
– Erric
Nov 24 '18 at 6:46
For N records, total time in my approach would be Treq/K * N + K*Tproc, whereas yours would be Treq/K * N + Tproc (faster, but at the expense of more CPU tasks in parallel).
– Erric
Nov 24 '18 at 6:58
If you want to run ProcessRecord() sequentially, just submit them to a dedicated single-threaded or serial executor.
– Alexei Kaigorodov
Nov 24 '18 at 7:30
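A minimal sketch of that suggestion, using only java.util.concurrent. The class name SerialProcessing, the simulated retrieval and compute sleeps, and the overlap counter are assumptions added for illustration. All retrievals are started at once here, so the snippet shows only the serial-processing part, not the bound on in-flight requests.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class SerialProcessing {
    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the largest number of processing steps that ever ran at the same time. */
    static int run(List<Integer> ids) {
        ExecutorService io = Executors.newCachedThreadPool();
        // A single worker thread: processing steps queue up and run one at a time
        ExecutorService serial = Executors.newSingleThreadExecutor();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        List<CompletableFuture<Void>> all = ids.stream()
            .map(id -> CompletableFuture
                .supplyAsync(() -> { sleep(10); return id; }, io)  // simulated retrieval
                .thenAcceptAsync(r -> {
                    maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
                    sleep(20);                                     // simulated compute
                    active.decrementAndGet();
                }, serial))
            .collect(Collectors.toList());
        all.forEach(CompletableFuture::join);
        io.shutdown();
        serial.shutdown();
        return maxActive.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent processing steps = " + run(Arrays.asList(1, 2, 3, 4)));
    }
}
```

Because every processing step is handed to the same single-threaded executor, retrievals can finish in any order and in parallel while the CPU-heavy part never overlaps with itself.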
answered Nov 24 '18 at 4:54
Alexei Kaigorodov
Here's what I finally came up with that seems to get the job done:
Flowable.just(1, 2, 3, 4, 5, 6) // Completes in 1 + 6 * 3 = 19 secs
    .concatMapEager(v ->
        Flowable.just(v)
            .subscribeOn(Schedulers.io())
            .map(e -> {
                System.out.println(getElapsed("Req " + e + " started"));
                Thread.sleep(1000); // Network: 1 sec
                System.out.println(getElapsed("Req " + e + " done"));
                return e;
            }), requestsOnWire, 1) // requestsOnWire = K = 2
    .blockingSubscribe(new DisposableSubscriber<Integer>() {
        @Override
        protected void onStart() {
            request(1);
        }

        @Override
        public void onNext(Integer s) {
            request(1); // ask for the next record before processing this one
            System.out.println("Proc " + s + " started");
            try {
                Thread.sleep(3000); // Compute: 3 secs
                System.out.println("Proc " + s + " done");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace(); // do not swallow failures silently
        }

        @Override
        public void onComplete() {
        }
    });
Below is the execution order. Note that at any given point in time there is 1 record being processed, at most 2 requests on the wire, and at most 2 unprocessed records in memory (Process trails behind by K=2 records):
0 secs: Req 1 started
: Req 2 started
1 secs: Req 2 done
: Req 1 done
: Proc 1 started
: Req 3 started
: Req 4 started
2 secs: Req 3 done
: Req 4 done
4 secs: Proc 1 done
: Proc 2 started
: Req 5 started
5 secs: Req 5 done
7 secs: Proc 2 done
: Proc 3 started
: Req 6 started
8 secs: Req 6 done
10 secs: Proc 3 done
: Proc 4 started
13 secs: Proc 4 done
: Proc 5 started
16 secs: Proc 5 done
: Proc 6 started
19 secs: Proc 6 done
Hope there are no anti-patterns/pitfalls here.
Note that increasing the concurrency K is not beneficial in this example, but in scenarios where the network call takes longer than processing, increasing K speeds up the whole run.
– Erric
Nov 24 '18 at 6:44
edited Nov 24 '18 at 7:13
answered Nov 23 '18 at 21:01
Erric
RetrieveRecord is a blocking operation. It always blocks some thread. If you move it to a single thread, then this thread would execute all the retrievals sequentially and the time would be maximized, not minimized.
– Alexei Kaigorodov
Nov 23 '18 at 14:55
I already have a non-blocking implementation of Retrieve() which returns a CompletableFuture. What I am trying to achieve here is a (strictly) bounded prefetch of the next k records (k=1 in this case). Basically, I don't want all 100 RetrieveRecords to be triggered at the start, nor do I want to wait until Process(i) is complete before starting Retrieve(i+1). Retrieve(i+k) should be happening over the wire while Process(i) is running.
– Erric
Nov 23 '18 at 15:08
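One way to sketch such a bounded prefetch with plain CompletableFuture is a sliding window of pending futures. The class name SlidingWindowPrefetch and the run() signature below are hypothetical, and retrieve is assumed to be a non-blocking function returning a CompletableFuture, as described in the comment above. Processing happens on the calling thread, one record at a time and in order.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

public class SlidingWindowPrefetch {
    /**
     * Processes records in order on the calling thread while keeping up to k
     * retrievals running ahead of the record currently being processed.
     */
    static <I, R> void run(List<I> ids, int k,
                           Function<I, CompletableFuture<R>> retrieve,
                           Consumer<R> process) {
        if (k < 1) {
            throw new IllegalArgumentException("k must be at least 1");
        }
        Iterator<I> it = ids.iterator();
        Deque<CompletableFuture<R>> window = new ArrayDeque<>();
        // Prime the window with the first k retrievals
        while (window.size() < k && it.hasNext()) {
            window.add(retrieve.apply(it.next()));
        }
        while (!window.isEmpty()) {
            R record = window.poll().join();            // wait for the oldest request
            if (it.hasNext()) {
                window.add(retrieve.apply(it.next()));  // refill before processing
            }
            process.accept(record);                     // overlaps with pending requests
        }
    }

    public static void main(String[] args) {
        run(Arrays.asList(1, 2, 3, 4, 5), 1,
            id -> CompletableFuture.supplyAsync(() -> {
                System.out.println("Req " + id);
                return id;
            }),
            rec -> System.out.println("Proc " + rec));
    }
}
```

With k=1 this follows the schedule from the question: Retrieve(1) starts first, and each later Retrieve(i+1) is started right before Process(i) runs. The priming loop and the refill step are the same call, so the first record needs no special handling.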