java.lang.IllegalStateException: more items arrived than were requested #38

Closed
Crystark opened this issue Aug 27, 2015 · 11 comments

@Crystark

Hi,

I'm trying to use rxjava-jdbc to query N elements from my database at regular intervals and then call an external service for each of those elements.

That external service is rate-limited, so to limit the pace of my observable I tested the approach described in "Constraining a stream of events in Rx to a maximum rate".

My first test was without rxjava-jdbc and works fine:

    Long start = System.currentTimeMillis();
    CountDownLatch latch = new CountDownLatch(1);
    Observable
        .interval(0, 1, TimeUnit.MILLISECONDS)
        .onBackpressureLatest()
        .doOnNext(t -> System.out.println("start " + t))
        .flatMap(i -> Observable.just(i + "A", i + "B"))
        .doOnNext(t -> System.out.println("result " + t))
        .concatMap(i -> Observable.empty()
            .delay(100, TimeUnit.MILLISECONDS)
            .startWith(i)
        )
        .subscribe(
            i -> {
                System.out.println("item " + i + " received at " + (System.currentTimeMillis() - start) + "ms");
            },
            t -> {
                System.out.println("Something went wrong ");
                t.printStackTrace();
                latch.countDown();
            },
            latch::countDown
        );

    try {
        latch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

This prints out:

start 0
result 0A
item 0A received at 70ms
result 0B
start 1
start 2
start 3
...
start 106
item 0B received at 175ms
start 107
start 108
result 1A
start 109
start 110
...
start 128
item 1A received at 280ms
result 1B
start 215
item 1B received at 385ms
result 2A
item 2A received at 490ms
result 2B
start 421
item 2B received at 591ms
result 3A
item 3A received at 696ms
result 3B
start 631
...

So as you can see, everything works fine and onBackpressureLatest kicks in.

Next I replaced the flatMap with a query to the database, which should behave the same:

    Long start = System.currentTimeMillis();
    CountDownLatch latch = new CountDownLatch(1);
    Observable
        .interval(0, 1, TimeUnit.MILLISECONDS)
        .onBackpressureLatest()
        .doOnNext(t -> System.out.println("start " + t))
        .lift(db
            .select("SELECT v1.v || v2.v FROM (VALUES (?)) AS v1(v), (VALUES ('A'), ('B')) AS v2(v)")
            .parameterOperator()
            .getAs(String.class)
        )
        .doOnNext(t -> System.out.println("result " + t))
        .concatMap(i -> Observable.empty()
            .delay(100, TimeUnit.MILLISECONDS)
            .startWith(i)
        )
        .subscribe(
            i -> {
                System.out.println("item " + i + " received at " + (System.currentTimeMillis() - start) + "ms");
            },
            t -> {
                System.out.println("Something went wrong ");
                t.printStackTrace();
                latch.countDown();
            },
            latch::countDown
        );

    try {
        latch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

Well, here's what I get:

start 0
result 0A
item 0A received at 463ms
result 0B
start 1
start 2
start 3
start 4
Something went wrong
java.lang.IllegalStateException: more items arrived than were requested
    at rx.internal.producers.ProducerArbiter.produced(ProducerArbiter.java:98)
    at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:208)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
    at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
    at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:206)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
    at com.github.davidmoten.rx.subjects.PublishSubjectSingleSubscriber.onNext(PublishSubjectSingleSubscriber.java:58)
    at rx.observers.Subscribers$1.onNext(Subscribers.java:67)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
    at rx.internal.operators.OperatorOnBackpressureLatest$LatestEmitter.emit(OperatorOnBackpressureLatest.java:163)
    at rx.internal.operators.OperatorOnBackpressureLatest$LatestEmitter.onNext(OperatorOnBackpressureLatest.java:129)
    at rx.internal.operators.OperatorOnBackpressureLatest$LatestSubscriber.onNext(OperatorOnBackpressureLatest.java:209)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
    at rx.Scheduler$Worker$1.call(Scheduler.java:120)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.github.davidmoten.rx.jdbc.Parameter.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:104)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:99)
    ... 19 more

After that there's no more output.

Am I doing something wrong here?
Thanks

Regards,
Crystark

@Crystark
Author

FYI, I tried "breaking the chain" using a flatMap and I have no trouble there.

    Long start = System.currentTimeMillis();
    CountDownLatch latch = new CountDownLatch(1);
    Observable
        .interval(0, 1, TimeUnit.MILLISECONDS)
        .onBackpressureLatest()
        .flatMap(i -> db
            .select("SELECT v1.v || v2.v FROM (VALUES (?)) AS v1(v), (VALUES ('A'), ('B')) AS v2(v)")
            .parameter(i)
            .getAs(String.class)
        )
        .doOnNext(t -> System.out.println("result " + t))
        .concatMap(i -> Observable.empty()
            .delay(100, TimeUnit.MILLISECONDS)
            .startWith(i)
        )
        .subscribe(
            i -> {
                System.out.println("item " + i + " received at " + (System.currentTimeMillis() - start) + "ms");
            },
            t -> {
                System.out.println("Something went wrong ");
                t.printStackTrace();
                latch.countDown();
            },
            latch::countDown
        );

    try {
        latch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }

Here's the output:

result 0A
item 0A received at 497ms
result 0B
item 0B received at 603ms
result 1A
item 1A received at 938ms
result 1B
item 1B received at 1043ms
result 2A
item 2A received at 1505ms
result 2B
item 2B received at 1606ms
result 3A
item 3A received at 1816ms
result 3B
item 3B received at 1921ms
result 4A
item 4A received at 2371ms
result 4B
item 4B received at 2476ms
result 5A
item 5A received at 2686ms
result 5B
item 5B received at 2793ms
result 6A
item 6A received at 3264ms
result 6B
item 6B received at 3369ms
result 7A
item 7A received at 3579ms
result 7B
item 7B received at 3684ms
result 8A
item 8A received at 4140ms
result 8B
item 8B received at 4245ms
result 9A
...

I guess something must be up with the operator?

@davidmoten
Owner

Sorry about the delay, @Crystark. Yes, your report does suggest a backpressure problem related to the use of parameterOperator. I'll have a look at it soon and see what I can do.

@davidmoten
Owner

By the way, this effect has most likely been there for ages, but RxJava operators have become less forgiving in recent releases. Putting an .onBackpressureBuffer after that .lift would probably work too.
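
To illustrate the buffering idea in isolation, here is a minimal plain-Java sketch with no RxJava dependency. `BufferingRelay` and `BufferingRelayDemo` are hypothetical names invented for this example and are not part of RxJava or rxjava-jdbc. The sketch only shows the principle: a stage that accepts items at whatever pace upstream pushes them, but releases them downstream only as they are requested.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch: holds items from a producer that ignores demand and releases them only on request. */
final class BufferingRelay<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private final List<T> delivered = new ArrayList<>();
    private long outstanding; // items requested by downstream but not yet delivered

    /** Called by the upstream producer at whatever pace it likes. */
    synchronized void onNext(T item) {
        buffer.add(item);
        drain();
    }

    /** Called by the downstream consumer when it is ready for n more items. */
    synchronized void request(long n) {
        outstanding += n;
        drain();
    }

    /** Moves buffered items downstream while there is outstanding demand. */
    private void drain() {
        while (outstanding > 0 && !buffer.isEmpty()) {
            delivered.add(buffer.poll());
            outstanding--;
        }
    }

    synchronized List<T> delivered() {
        return new ArrayList<>(delivered);
    }

    synchronized int buffered() {
        return buffer.size();
    }
}

final class BufferingRelayDemo {
    public static void main(String[] args) {
        BufferingRelay<String> relay = new BufferingRelay<>();
        relay.request(1);    // downstream is ready for one item
        relay.onNext("0A");  // delivered immediately
        relay.onNext("0B");  // no demand left, so it is buffered
        relay.onNext("1A");  // buffered as well
        System.out.println(relay.delivered() + " buffered=" + relay.buffered());
        relay.request(1);    // releases exactly one buffered item
        System.out.println(relay.delivered() + " buffered=" + relay.buffered());
    }
}
```

The buffer in this sketch is unbounded, so if the producer stays faster than the consumer it keeps growing. That is the usual trade-off of buffering as a workaround rather than a fix.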

@davidmoten
Owner

I think the culprit might be the use of OperatorFromTransformer, which inserts a subject in the chain and breaks backpressure. I'll have a look soonish.
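
As a rough illustration of why a stage that ignores demand ends in this exception, the plain-Java sketch below mimics the kind of request bookkeeping that the stack trace shows failing in ProducerArbiter.produced. `RequestLedger` and `LedgerDemo` are hypothetical names made up for this example. This is a simplified stand-in under that assumption, not RxJava's actual implementation.

```java
/** Hypothetical, simplified stand-in for the request bookkeeping of a backpressure-aware operator. */
final class RequestLedger {
    private long requested; // items downstream has asked for and not yet received

    /** Downstream asks for n more items. */
    synchronized void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        long sum = requested + n;
        requested = sum < 0 ? Long.MAX_VALUE : sum; // cap at "unbounded" on overflow
    }

    /** Upstream reports that n items were delivered. */
    synchronized void produced(long n) {
        if (requested == Long.MAX_VALUE) {
            return; // unbounded demand, nothing to track
        }
        if (n > requested) {
            throw new IllegalStateException("more items arrived than were requested");
        }
        requested -= n;
    }
}

final class LedgerDemo {
    /** Pushes `emitted` items against a downstream that asked for `asked`; returns "ok" or the error message. */
    static String push(long asked, int emitted) {
        RequestLedger ledger = new RequestLedger();
        ledger.request(asked);
        try {
            for (int i = 0; i < emitted; i++) {
                ledger.produced(1);
            }
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(push(2, 2)); // producer honours demand
        System.out.println(push(2, 3)); // producer ignores demand, like a relay with no backpressure support
    }
}
```

In the failing pipeline, concatMap asks for items one at a time, while a subject in the middle of the chain simply forwards whatever it receives, which corresponds to the second call in `main`.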

@davidmoten
Owner

Yep, looks like the culprit is OperatorFromTransformer from rxjava-extras, which predates backpressure and doesn't support it.

I'd like to make a breaking change to the API to fix this one so that instead of relying on Operators we rely on Transformers. I'll ponder it a bit more.

@davidmoten
Owner

I've fixed OperatorFromTransformer in rxjava-extras to support backpressure. I'll release new versions of rxjava-extras and rxjava-jdbc in 12 hours or so. No API changes.

@Crystark
Author

Excellent :) Thanks a lot!

@Crystark
Author

Wouldn't #37 be tied to this too?

@davidmoten
Owner

Can you please try rxjava-jdbc 0.6.7 with this issue?

@davidmoten added the bug label Sep 25, 2015
@Crystark
Author

I can confirm this now works fine in 0.6.7.

@davidmoten
Owner

Thanks @Crystark, I'll close this one now.
