java.lang.IllegalStateException: more items arrived than were requested #38
FYI, I tried "breaking the chain" using a flatMap and I have no trouble there.

Long start = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(1);
Observable
    .interval(0, 1, TimeUnit.MILLISECONDS)
    .onBackpressureLatest()
    .flatMap(i -> db
        .select("SELECT v1.v || v2.v FROM (VALUES (?)) AS v1(v), (VALUES ('A'), ('B')) AS v2(v)")
        .parameter(i)
        .getAs(String.class)
    )
    .doOnNext(t -> System.out.println("result " + t))
    .concatMap(i -> Observable.empty()
        .delay(100, TimeUnit.MILLISECONDS)
        .startWith(i)
    )
    .subscribe(
        i -> {
            System.out.println("item " + i + " received at " + (System.currentTimeMillis() - start) + "ms");
        },
        t -> {
            System.out.println("Something went wrong ");
            t.printStackTrace();
            latch.countDown();
        },
        latch::countDown
    );
try {
    latch.await();
} catch (InterruptedException e) {
    throw new RuntimeException(e);
}

Here's the output:

result 0A
item 0A received at 497ms
result 0B
item 0B received at 603ms
result 1A
item 1A received at 938ms
result 1B
item 1B received at 1043ms
result 2A
item 2A received at 1505ms
result 2B
item 2B received at 1606ms
result 3A
item 3A received at 1816ms
result 3B
item 3B received at 1921ms
result 4A
item 4A received at 2371ms
result 4B
item 4B received at 2476ms
result 5A
item 5A received at 2686ms
result 5B
item 5B received at 2793ms
result 6A
item 6A received at 3264ms
result 6B
item 6B received at 3369ms
result 7A
item 7A received at 3579ms
result 7B
item 7B received at 3684ms
result 8A
item 8A received at 4140ms
result 8B
item 8B received at 4245ms
result 9A
...

I guess something must be up with the operator?
Sorry about the delay @Crystark. Yes, it does look from your report that there is a backpressure problem related to uses of the parameterOperator. I'll have a look at it soon, see what I can do.
By the way this effect has most likely been there for ages but RxJava operators have become less forgiving in recent releases. Putting an
I think the culprit might be use of
Yep, looks like the culprit is I'd like to make a breaking change to the API to fix this one so that instead of relying on Operators we rely on Transformers. I'll ponder it a bit more.
I've fixed
Excellent :) Thanks a lot!
Wouldn't #37 be tied to this too?
Can you try rxjava-jdbc 0.6.7 please with this issue?
I can confirm this now works fine in 0.6.7.
Thanks @Crystark, I'll close this one now.
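For anyone landing on this thread from the exception message: "more items arrived than were requested" describes a backpressure contract violation, where a source pushes more items downstream than the consumer asked for via request(n). The following is only a minimal sketch of that contract, written against the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than RxJava 1.x, and it is not the rxjava-jdbc or RxJava code. The names RequestAccounting, StrictSubscriber and PushySource are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Illustrative only: shows how a consumer can detect over-emission. */
class RequestAccounting {

    /** Hypothetical subscriber that tracks how many items it has asked for. */
    static final class StrictSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new ArrayList<>();
        Throwable error;
        private long outstanding; // requested but not yet delivered
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
        }

        /** Asks upstream for n more items and records the demand. */
        void request(long n) {
            outstanding += n;
            subscription.request(n);
        }

        @Override public void onNext(T item) {
            if (outstanding == 0) {
                // Upstream ignored the demand: record the failure and stop.
                error = new IllegalStateException("more items arrived than were requested");
                subscription.cancel();
                return;
            }
            outstanding--;
            received.add(item);
        }

        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { }
    }

    /** Hypothetical misbehaving source: emits everything on the first request. */
    static final class PushySource implements Flow.Publisher<Integer> {
        private final int count;

        PushySource(int count) { this.count = count; }

        @Override public void subscribe(Flow.Subscriber<? super Integer> sub) {
            boolean[] cancelled = {false};
            sub.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) {
                    // Deliberate bug: n is ignored.
                    for (int i = 0; i < count && !cancelled[0]; i++) {
                        sub.onNext(i);
                    }
                }
                @Override public void cancel() { cancelled[0] = true; }
            });
        }
    }

    /** Requests a single item from a source that tries to emit three. */
    static StrictSubscriber<Integer> demo() {
        StrictSubscriber<Integer> s = new StrictSubscriber<>();
        new PushySource(3).subscribe(s);
        s.request(1);
        return s;
    }
}
```

Calling RequestAccounting.demo() leaves exactly one item in received and an IllegalStateException in error, because the second item arrives while no demand is outstanding. A source that respects the requested count would never trip the check.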
Hi,
I'm trying to use rxjava-jdbc to query N elements from my db on a regular basis and then query an external service for each of those elements. That external service is limited to a certain rate, so I have tested what is explained in "Constraining a stream of events in Rx to a maximum rate" to limit the pace of my observable.
My first test was without rxjava-jdbc and works fine:
This prints out:
So as you can see, everything works fine and BackPressureLast activates.
Now I tried to replace the flatMap by a query to the database, which should behave the same:
Well, here's what I get:
After that there's no more output.
Am I doing something wrong here?
Thanks
Regards,
Crystark
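To make the "keep only the latest" behaviour mentioned in the post concrete: when the downstream is slower than the source, onBackpressureLatest in RxJava 1.x holds on to the most recent value and drops the older ones. The sketch below imitates that idea with plain JDK classes in a single thread. It is not RxJava's implementation, and LatestOnly, LatestSlot and simulate are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative only: a one-element buffer where newer values overwrite older ones. */
class LatestOnly {

    /** Hypothetical single-slot holder. */
    static final class LatestSlot<T> {
        private final AtomicReference<T> slot = new AtomicReference<>();

        /** Stores a value, replacing whatever was waiting. */
        void offer(T value) { slot.set(value); }

        /** Takes the waiting value, or returns null if there is none. */
        T poll() { return slot.getAndSet(null); }
    }

    /**
     * A fast producer emits ticks 0..ticks-1; a slow consumer only looks
     * at the slot after every consumeEvery-th tick, and once more at the end.
     */
    static List<Long> simulate(int ticks, int consumeEvery) {
        LatestSlot<Long> slot = new LatestSlot<>();
        List<Long> seen = new ArrayList<>();
        for (long t = 0; t < ticks; t++) {
            slot.offer(t);
            if ((t + 1) % consumeEvery == 0) {
                Long v = slot.poll();
                if (v != null) {
                    seen.add(v);
                }
            }
        }
        Long last = slot.poll(); // drain whatever is still waiting
        if (last != null) {
            seen.add(last);
        }
        return seen;
    }
}
```

With ten ticks and a consumer that reads after every fourth tick, LatestOnly.simulate(10, 4) returns [3, 7, 9]: every tick in between was overwritten before the consumer looked.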