From 688f3a5f20893e4a2bd59075d65f8a2d3d164dfb Mon Sep 17 00:00:00 2001 From: Bernard Normier Date: Wed, 13 Nov 2024 15:18:42 -0500 Subject: [PATCH] Update Ice/ami to make sure connection close doesn't throw --- cpp/test/Ice/ami/AllTests.cpp | 19 +++++++++++++------ cpp/test/Ice/ami/TestI.cpp | 15 ++++++++++++++- csharp/test/Ice/ami/AllTests.cs | 15 +++++++-------- csharp/test/Ice/ami/TestI.cs | 15 ++++++++++++++- .../src/main/java/test/Ice/ami/AllTests.java | 14 +++++++++----- .../src/main/java/test/Ice/ami/TestI.java | 14 +++++++++++--- python/test/Ice/ami/TestI.py | 1 + swift/test/Ice/ami/AllTests.swift | 2 ++ swift/test/Ice/ami/TestI.swift | 8 +++++++- 9 files changed, 78 insertions(+), 25 deletions(-) diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp index 2d251fd6b6f..874559667cc 100644 --- a/cpp/test/Ice/ami/AllTests.cpp +++ b/cpp/test/Ice/ami/AllTests.cpp @@ -1118,7 +1118,7 @@ allTests(TestHelper* helper, bool collocated) { done = true; p->ice_ping(); - vector> results; + vector> futures; for (int i = 0; i < maxQueue; ++i) { auto s = make_shared>(); @@ -1126,10 +1126,17 @@ allTests(TestHelper* helper, bool collocated) seq, [s]() { s->set_value(); }, [s](exception_ptr ex) { s->set_exception(ex); }); - results.push_back(s->get_future()); + futures.push_back(s->get_future()); } atomic_flag sent = ATOMIC_FLAG_INIT; - p->closeConnectionAsync(nullptr, nullptr, [&sent](bool) { sent.test_and_set(); }); + + auto closePromise = make_shared>(); + p->closeConnectionAsync( + [closePromise] { closePromise->set_value(); }, + [closePromise](exception_ptr ex) { closePromise->set_exception(ex); }, + [&sent](bool) { sent.test_and_set(); }); + futures.push_back(closePromise->get_future()); + if (!sent.test_and_set()) { for (int i = 0; i < maxQueue; i++) @@ -1141,7 +1148,7 @@ allTests(TestHelper* helper, bool collocated) [s]() { s->set_value(); }, [s](exception_ptr ex) { s->set_exception(ex); }, [&sent2](bool) { sent2.test_and_set(); }); - results.push_back(s->get_future()); + futures.push_back(s->get_future()); if (sent2.test_and_set()) { done = false; @@ -1156,11 +1163,11 @@ allTests(TestHelper* helper, bool collocated) done = false; } - for (vector>::iterator r = results.begin(); r != results.end(); ++r) + for (auto& f : futures) { try { - r->get(); + f.get(); } catch (const Ice::LocalException& ex) { diff --git a/cpp/test/Ice/ami/TestI.cpp b/cpp/test/Ice/ami/TestI.cpp index cc09dd41bfc..330edc2789a 100644 --- a/cpp/test/Ice/ami/TestI.cpp +++ b/cpp/test/Ice/ami/TestI.cpp @@ -97,7 +97,20 @@ TestIntfI::waitForBatch(int32_t count, const Ice::Current&) void TestIntfI::closeConnection(const Ice::Current& current) { - current.con->close(nullptr, nullptr); + current.con->close( + nullptr, + [](exception_ptr ex) + { + try + { + rethrow_exception(ex); + } + catch (const std::exception& e) + { + cerr << "Connection::close failed with: " << e.what() << endl; + test(false); + } + }); } void diff --git a/csharp/test/Ice/ami/AllTests.cs b/csharp/test/Ice/ami/AllTests.cs index dc31d91fba4..72f842db059 100644 --- a/csharp/test/Ice/ami/AllTests.cs +++ b/csharp/test/Ice/ami/AllTests.cs @@ -724,15 +724,16 @@ public static async Task allTestsAsync(global::Test.TestHelper helper, bool coll { done = true; p.ice_ping(); - List results = new List(); + var tasks = new List(); for (int i = 0; i < maxQueue; ++i) { - results.Add(p.opWithPayloadAsync(seq)); + tasks.Add(p.opWithPayloadAsync(seq)); } bool sentSynchronously = true; - _ = p.closeConnectionAsync( + var closeTask = p.closeConnectionAsync( progress: new Progress(value => sentSynchronously = value)); + tasks.Add(closeTask); if (!sentSynchronously) { @@ -741,7 +742,7 @@ public static async Task allTestsAsync(global::Test.TestHelper helper, bool coll Task t = p.opWithPayloadAsync( seq, progress: new Progress(value => sentSynchronously = value)); - results.Add(t); + tasks.Add(t); if (sentSynchronously) { done = false; @@ -756,10 +757,8 @@ public static async Task allTestsAsync(global::Test.TestHelper helper, bool coll done = false; } - foreach (Task q in results) - { - await q; - } + await Task.WhenAll(tasks); + // Wait until the connection is closed. await p.ice_getCachedConnection().closeAsync(); } diff --git a/csharp/test/Ice/ami/TestI.cs b/csharp/test/Ice/ami/TestI.cs index d4edf7300bc..b7c7e94c8d9 100644 --- a/csharp/test/Ice/ami/TestI.cs +++ b/csharp/test/Ice/ami/TestI.cs @@ -73,7 +73,20 @@ public override bool public override void closeConnection(Ice.Current current) { // We can't wait for the connection to close - it would self-deadlock. So we just initiate the closure. - _ = current.con.closeAsync(); + _ = closeAsync(); + + async Task closeAsync() + { + try + { + await current.con.closeAsync(); + } + catch (System.Exception ex) + { + Console.WriteLine($"Connection close failed: {ex}"); + test(false); + } + } } public override void abortConnection(Ice.Current current) diff --git a/java/test/src/main/java/test/Ice/ami/AllTests.java b/java/test/src/main/java/test/Ice/ami/AllTests.java index 25cf1d3b1d5..8baee79a237 100644 --- a/java/test/src/main/java/test/Ice/ami/AllTests.java +++ b/java/test/src/main/java/test/Ice/ami/AllTests.java @@ -951,15 +951,19 @@ public static void allTests(test.TestHelper helper, boolean collocated) { while (!done && maxQueue < 50) { done = true; p.ice_ping(); - var results = new java.util.ArrayList<>(); + var futures = new java.util.ArrayList(); for (int i = 0; i < maxQueue; ++i) { - results.add(Util.getInvocationFuture(p.opWithPayloadAsync(seq))); + futures.add(Util.getInvocationFuture(p.opWithPayloadAsync(seq))); } - if (!Util.getInvocationFuture(p.closeConnectionAsync()).isSent()) { + + var closeFuture = Util.getInvocationFuture(p.closeConnectionAsync()); + futures.add(closeFuture); + + if (!closeFuture.isSent()) { for (int i = 0; i < maxQueue; i++) { InvocationFuture r = Util.getInvocationFuture(p.opWithPayloadAsync(seq)); - results.add(r); + futures.add(r); if (r.isSent()) { done = false; maxQueue *= 2; @@ -970,7 +974,7 @@ public static void allTests(test.TestHelper helper, boolean collocated) { maxQueue *= 2; done = false; } - CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join(); + CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join(); // Wait until the connection is closed. p.ice_getCachedConnection().close(); diff --git a/java/test/src/main/java/test/Ice/ami/TestI.java b/java/test/src/main/java/test/Ice/ami/TestI.java index 6e6be5ec86c..540dab8b0ff 100644 --- a/java/test/src/main/java/test/Ice/ami/TestI.java +++ b/java/test/src/main/java/test/Ice/ami/TestI.java @@ -128,9 +128,17 @@ public synchronized boolean waitForBatch(int count, com.zeroc.Ice.Current curren @Override public void closeConnection(com.zeroc.Ice.Current current) { // We can't wait for the connection to be closed - this would cause a self dead-lock. - // So instead we just initiate the closure by running `close` in a separate thread. - var closureThread = new Thread(() -> current.con.close()); - closureThread.start(); + // So instead we just initiate the closure in the background. + CompletableFuture.runAsync( + () -> { + try { + current.con.close(); + } catch (Exception e) { + // make sure the closure is graceful + System.err.println("********** Connection.close failed: " + e); + test(false); + } + }); } @Override diff --git a/python/test/Ice/ami/TestI.py b/python/test/Ice/ami/TestI.py index 8d7002c0f80..512158ef252 100644 --- a/python/test/Ice/ami/TestI.py +++ b/python/test/Ice/ami/TestI.py @@ -45,6 +45,7 @@ def waitForBatch(self, count, current): return result def closeConnection(self, current): + # TODO: make sure close is graceful once API is fixed current.con.close(False) def abortConnection(self, current): diff --git a/swift/test/Ice/ami/AllTests.swift b/swift/test/Ice/ami/AllTests.swift index 1d34864b66c..0be084265a2 100644 --- a/swift/test/Ice/ami/AllTests.swift +++ b/swift/test/Ice/ami/AllTests.swift @@ -312,6 +312,8 @@ func allTests(_ helper: TestHelper, collocated: Bool = false) async throws { maxQueue *= 2 } + + try await p.ice_getCachedConnection()!.close() } } diff --git a/swift/test/Ice/ami/TestI.swift b/swift/test/Ice/ami/TestI.swift index 1762fa318cd..875d56e6ec8 100644 --- a/swift/test/Ice/ami/TestI.swift +++ b/swift/test/Ice/ami/TestI.swift @@ -150,7 +150,13 @@ class TestI: TestIntf { } func closeConnection(current: Current) async throws { - Task { try await current.con!.close() } + Task { + do { + try await current.con!.close() + } catch { + fatalError("Connection.close failed: \(error)") + } + } } func abortConnection(current: Current) async throws {