SQS read - Upstream failed. java.util.concurrent.CompletionException #9

Open
kartzcoder opened this issue Mar 13, 2019 · 1 comment

Comments

@kartzcoder

When I try to run your code, I get the runtime exception below. I have my AWS SQS queue configured and have replaced the queue URLs in the source code. Am I missing anything else?

SEVERE: [sending to publish queue] Upstream failed.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://default/user/$a#923390501]] after [2000 ms]. Message of type [java.lang.String]. A typical reason for AskTimeoutException is that the recipient actor didn't send a reply.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at scala.concurrent.java8.FuturesConvertersImpl$CF.apply(FutureConvertersImpl.scala:21)
    at scala.concurrent.java8.FuturesConvertersImpl$CF.apply(FutureConvertersImpl.scala:18)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at scala.concurrent.BatchingExecutor$Batch.processBatch$1(BatchingExecutor.scala:63)
    at scala.concurrent.BatchingExecutor$Batch.$anonfun$run$1(BatchingExecutor.scala:78)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:55)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:106)
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
    at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://default/user/$a#923390501]] after [2000 ms]. Message of type [java.lang.String]. A typical reason for AskTimeoutException is that the recipient actor didn't send a reply.
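
For anyone reading the trace: the outer CompletionException is only a wrapper, and the AskTimeoutException in "Caused by" is the actual failure. The sketch below reproduces that shape with the JDK alone, so it can be run without Akka or SQS. It is not the project's code. The class name AskTimeoutSketch, the method outcome, the 200 ms timeout, and the use of java.util.concurrent.TimeoutException as a stand-in for AskTimeoutException are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AskTimeoutSketch {

    // Simulates an "ask": a reply future that a scheduler fails after a
    // timeout unless the (hypothetical) recipient completes it first.
    static String outcome(boolean recipientReplies) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> reply = new CompletableFuture<>();

            // Timeout task: fails the reply future if nobody answered in time.
            scheduler.schedule(
                    () -> reply.completeExceptionally(new TimeoutException("no reply within 200 ms")),
                    200, TimeUnit.MILLISECONDS);

            // A recipient that answers completes the future before the timeout fires.
            if (recipientReplies) {
                reply.complete("ack");
            }

            // Dependent stage, analogous to the stage that failed in the trace.
            CompletableFuture<String> dependent = reply.thenApply(r -> "published:" + r);

            try {
                return dependent.join();
            } catch (CompletionException e) {
                // The dependent stage sees the timeout wrapped in a CompletionException.
                return e.getClass().getSimpleName() + " <- " + e.getCause().getClass().getSimpleName();
            }
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(false)); // recipient never replies
        System.out.println(outcome(true));  // recipient replies in time
    }
}
```

When the recipient never replies, the dependent stage fails with the wrapper and the timeout as its cause, which matches the structure of the trace above. In the Akka case, that points at the actor being asked: it either did not receive the String message or did not send a reply back to the sender within 2000 ms.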

@kartzcoder
Author

I changed the code as shown below. With this change the read side runs to completion, but nothing is published back to the queue.

    CompletionStage streamCompletion = SqsSource.create(sourceQueueUrl, settings, sqsClient)
            .log("read from SQS", log)
            .mapAsync(8, (Message msg) -> {
                return enrichAndPublish(msg)
                        // upon completion ignore the result and pass on the original message
                        .thenApply(result -> msg);
            })
            .map(msg -> MessageAction.delete(msg))
            .runWith(
                    SqsAckSink.create(sourceQueueUrl, ackSettings, sqsClient),
                    materializer
            );
    // terminate the actor system when the stream completes (see withCloseOnEmptyReceive)
    streamCompletion.thenAccept(done -> system.terminate());
}

CompletionStage<MessageFromSqs> enrichAndPublish(Message sqsMsg) {
    return Source.<Message>single(sqsMsg)
            .map(message -> transform(message))
            .runWith(Sink.head(), materializer);
}

I did not need to publish back to the queue, so I removed that logic altogether. However, I still don't understand why publishing back to the queue fails.
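
One detail of the `thenApply(result -> msg)` step in the snippet above is worth spelling out: it discards the enrichment result and passes on the original message, but only when the enrichment stage succeeds. If that stage fails, the failure propagates and the delete step is never reached for that message. The JDK-only sketch below illustrates this under stated assumptions. PassThroughSketch, enrich, process, and outcome are hypothetical names, and plain strings stand in for SQS messages and delete actions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class PassThroughSketch {

    // Hypothetical stand-in for enrichAndPublish: completes asynchronously
    // with some result, or fails when `fail` is true.
    static CompletionStage<Integer> enrich(String msg, boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("publish failed");
            }
            return msg.length();
        });
    }

    // Ignore the enrichment result and pass on the original message.
    static CompletionStage<String> process(String msg, boolean fail) {
        return enrich(msg, fail).thenApply(result -> msg);
    }

    // Downstream step: "delete" the message on success, keep it on failure.
    static String outcome(String msg, boolean fail) {
        return process(msg, fail)
                .thenApply(original -> "delete " + original)
                .exceptionally(err -> {
                    // Unwrap the CompletionException wrapper if there is one.
                    Throwable cause = err.getCause() != null ? err.getCause() : err;
                    return "keep: " + cause.getMessage();
                })
                .toCompletableFuture()
                .join();
    }

    public static void main(String[] args) {
        System.out.println(outcome("m1", false)); // enrichment succeeds
        System.out.println(outcome("m1", true));  // enrichment fails
    }
}
```

In the stream from the snippet there is no `exceptionally` step, so a failed stage inside `mapAsync` fails the stream by default. That is consistent with the "Upstream failed" log line in the original report.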
