Skip to content

Commit

Permalink
Wait for all opWithPayload completion (#3142)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Nov 13, 2024
1 parent 129e55f commit 4c9de02
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ private synchronized void closeImpl() {
doApplicationClose();
} else {
_closeRequested = true;
scheduleCloseTimer(); // we don't wait forever for outstanding invocations to
// complete
// we don't wait forever for outstanding invocations to complete
scheduleCloseTimer();
}
}
// else nothing else to do, already closing or closed.
Expand Down
31 changes: 17 additions & 14 deletions java/test/src/main/java/test/Ice/ami/AllTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import test.Ice.ami.Test.TestIntfPrx;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -738,11 +739,13 @@ public static void allTests(test.TestHelper helper, boolean collocated) {
testController.holdAdapter();
InvocationFuture<Void> r1;
InvocationFuture<Void> r2;
var results = new ArrayList<InvocationFuture<Void>>();
try {
r1 = Util.getInvocationFuture(p.opAsync());
byte[] seq = new byte[10024];
while (true) {
r2 = Util.getInvocationFuture(p.opWithPayloadAsync(seq));
results.add(r2);
if (!r2.sentSynchronously()) {
break;
}
Expand Down Expand Up @@ -773,6 +776,8 @@ public static void allTests(test.TestHelper helper, boolean collocated) {

test(r1.getOperation().equals("op"));
test(r2.getOperation().equals("opWithPayload"));

CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
}

{
Expand Down Expand Up @@ -850,12 +855,14 @@ public static void allTests(test.TestHelper helper, boolean collocated) {
InvocationFuture<String> r2 = null;

testController.holdAdapter();
var results = new ArrayList<InvocationFuture<Void>>();
try {
InvocationFuture<Void> r = null;
byte[] seq = new byte[10024];
for (int i = 0; i < 200; ++i) // 2MB
{
r = Util.getInvocationFuture(p.opWithPayloadAsync(seq));
results.add(r);
}

test(!r.isSent());
Expand All @@ -877,6 +884,9 @@ public static void allTests(test.TestHelper helper, boolean collocated) {
} finally {
testController.resumeAdapter();
}

CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();

p.ice_ping();
// test(!r1.isSent() && r1.isDone());
test(!r1.isSent());
Expand Down Expand Up @@ -912,11 +922,8 @@ public static void allTests(test.TestHelper helper, boolean collocated) {
out.print("testing connection close... ");
out.flush();
{
//
// Local case: begin a request, close the connection gracefully, and make sure it
// waits
// for the request to complete.
//
// waits for the request to complete.
com.zeroc.Ice.Connection con = p.ice_getConnection();
Callback cb = new Callback();
con.setCloseCallback(c -> cb.called());
Expand All @@ -935,19 +942,16 @@ public static void allTests(test.TestHelper helper, boolean collocated) {
//
byte[] seq = new byte[1024 * 10];

//
// Send multiple opWithPayload, followed by a close and followed by multiple
// opWithPayload.
// The goal is to make sure that none of the opWithPayload fail even if the server
// closes
// the connection gracefully in between.
//
// closes the connection gracefully in between.
int maxQueue = 2;
boolean done = false;
while (!done && maxQueue < 50) {
done = true;
p.ice_ping();
java.util.List<InvocationFuture<Void>> results = new java.util.ArrayList<>();
var results = new java.util.ArrayList<>();
for (int i = 0; i < maxQueue; ++i) {
results.add(Util.getInvocationFuture(p.opWithPayloadAsync(seq)));
}
Expand All @@ -966,12 +970,11 @@ public static void allTests(test.TestHelper helper, boolean collocated) {
maxQueue *= 2;
done = false;
}
for (InvocationFuture<Void> q : results) {
q.join();
}
CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();

// Wait until the connection is closed.
p.ice_getCachedConnection().close();
}
// Wait until the connection is closed.
p.ice_getCachedConnection().close();
}
out.println("ok");

Expand Down
3 changes: 1 addition & 2 deletions java/test/src/main/java/test/Ice/ami/TestI.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ public void sleep(int ms, com.zeroc.Ice.Current current) {
public synchronized CompletionStage<Void> startDispatchAsync(com.zeroc.Ice.Current current) {
if (_shutdown) {
// Ignore, this can occur with the forceful connection close test, shutdown can be
// dispatch
// before start dispatch.
// dispatch before start dispatch.
CompletableFuture<Void> v = new CompletableFuture<>();
v.complete(null);
return v;
Expand Down

0 comments on commit 4c9de02

Please sign in to comment.