Skip to content

Commit

Permalink
Update Ice/ami to make sure connection close doesn't throw (#3149)
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardnormier authored Nov 13, 2024
1 parent fa22827 commit aaaf89d
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 25 deletions.
19 changes: 13 additions & 6 deletions cpp/test/Ice/ami/AllTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1118,18 +1118,25 @@ allTests(TestHelper* helper, bool collocated)
{
done = true;
p->ice_ping();
vector<future<void>> results;
vector<future<void>> futures;
for (int i = 0; i < maxQueue; ++i)
{
auto s = make_shared<promise<void>>();
p->opWithPayloadAsync(
seq,
[s]() { s->set_value(); },
[s](exception_ptr ex) { s->set_exception(ex); });
results.push_back(s->get_future());
futures.push_back(s->get_future());
}
atomic_flag sent = ATOMIC_FLAG_INIT;
p->closeConnectionAsync(nullptr, nullptr, [&sent](bool) { sent.test_and_set(); });

auto closePromise = make_shared<promise<void>>();
p->closeConnectionAsync(
[closePromise] { closePromise->set_value(); },
[closePromise](exception_ptr ex) { closePromise->set_exception(ex); },
[&sent](bool) { sent.test_and_set(); });
futures.push_back(closePromise->get_future());

if (!sent.test_and_set())
{
for (int i = 0; i < maxQueue; i++)
Expand All @@ -1141,7 +1148,7 @@ allTests(TestHelper* helper, bool collocated)
[s]() { s->set_value(); },
[s](exception_ptr ex) { s->set_exception(ex); },
[&sent2](bool) { sent2.test_and_set(); });
results.push_back(s->get_future());
futures.push_back(s->get_future());
if (sent2.test_and_set())
{
done = false;
Expand All @@ -1156,11 +1163,11 @@ allTests(TestHelper* helper, bool collocated)
done = false;
}

for (vector<future<void>>::iterator r = results.begin(); r != results.end(); ++r)
for (auto& f : futures)
{
try
{
r->get();
f.get();
}
catch (const Ice::LocalException& ex)
{
Expand Down
15 changes: 14 additions & 1 deletion cpp/test/Ice/ami/TestI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,20 @@ TestIntfI::waitForBatch(int32_t count, const Ice::Current&)
void
TestIntfI::closeConnection(const Ice::Current& current)
{
current.con->close(nullptr, nullptr);
current.con->close(
nullptr,
[](exception_ptr ex)
{
try
{
rethrow_exception(ex);
}
catch (const std::exception& e)
{
cerr << "Connection::close failed with: " << e.what() << endl;
test(false);
}
});
}

void
Expand Down
15 changes: 7 additions & 8 deletions csharp/test/Ice/ami/AllTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -724,15 +724,16 @@ public static async Task allTestsAsync(global::Test.TestHelper helper, bool coll
{
done = true;
p.ice_ping();
List<Task> results = new List<Task>();
var tasks = new List<Task>();
for (int i = 0; i < maxQueue; ++i)
{
results.Add(p.opWithPayloadAsync(seq));
tasks.Add(p.opWithPayloadAsync(seq));
}

bool sentSynchronously = true;
_ = p.closeConnectionAsync(
var closeTask = p.closeConnectionAsync(
progress: new Progress<bool>(value => sentSynchronously = value));
tasks.Add(closeTask);

if (!sentSynchronously)
{
Expand All @@ -741,7 +742,7 @@ public static async Task allTestsAsync(global::Test.TestHelper helper, bool coll
Task t = p.opWithPayloadAsync(
seq,
progress: new Progress<bool>(value => sentSynchronously = value));
results.Add(t);
tasks.Add(t);
if (sentSynchronously)
{
done = false;
Expand All @@ -756,10 +757,8 @@ public static async Task allTestsAsync(global::Test.TestHelper helper, bool coll
done = false;
}

foreach (Task q in results)
{
await q;
}
await Task.WhenAll(tasks);

// Wait until the connection is closed.
await p.ice_getCachedConnection().closeAsync();
}
Expand Down
15 changes: 14 additions & 1 deletion csharp/test/Ice/ami/TestI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,20 @@ public override bool
public override void closeConnection(Ice.Current current)
{
// We can't wait for the connection to close - it would self-deadlock. So we just initiate the closure.
_ = current.con.closeAsync();
_ = closeAsync();

async Task closeAsync()
{
try
{
await current.con.closeAsync();
}
catch (System.Exception ex)
{
Console.WriteLine($"Connection close failed: {ex}");
test(false);
}
}
}

public override void abortConnection(Ice.Current current)
Expand Down
14 changes: 9 additions & 5 deletions java/test/src/main/java/test/Ice/ami/AllTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -951,15 +951,19 @@ public static void allTests(test.TestHelper helper, boolean collocated) {
while (!done && maxQueue < 50) {
done = true;
p.ice_ping();
var results = new java.util.ArrayList<>();
var futures = new java.util.ArrayList<CompletableFuture>();
for (int i = 0; i < maxQueue; ++i) {
results.add(Util.getInvocationFuture(p.opWithPayloadAsync(seq)));
futures.add(Util.getInvocationFuture(p.opWithPayloadAsync(seq)));
}
if (!Util.getInvocationFuture(p.closeConnectionAsync()).isSent()) {

var closeFuture = Util.getInvocationFuture(p.closeConnectionAsync());
futures.add(closeFuture);

if (!closeFuture.isSent()) {
for (int i = 0; i < maxQueue; i++) {
InvocationFuture<Void> r =
Util.getInvocationFuture(p.opWithPayloadAsync(seq));
results.add(r);
futures.add(r);
if (r.isSent()) {
done = false;
maxQueue *= 2;
Expand All @@ -970,7 +974,7 @@ public static void allTests(test.TestHelper helper, boolean collocated) {
maxQueue *= 2;
done = false;
}
CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();

// Wait until the connection is closed.
p.ice_getCachedConnection().close();
Expand Down
14 changes: 11 additions & 3 deletions java/test/src/main/java/test/Ice/ami/TestI.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,17 @@ public synchronized boolean waitForBatch(int count, com.zeroc.Ice.Current curren
@Override
public void closeConnection(com.zeroc.Ice.Current current) {
// We can't wait for the connection to be closed - this would cause a self dead-lock.
// So instead we just initiate the closure by running `close` in a separate thread.
var closureThread = new Thread(() -> current.con.close());
closureThread.start();
// So instead we just initiate the closure in the background.
CompletableFuture.runAsync(
() -> {
try {
current.con.close();
} catch (Exception e) {
// make sure the closure is graceful
System.err.println("********** Connection.close failed: " + e);
test(false);
}
});
}

@Override
Expand Down
1 change: 1 addition & 0 deletions python/test/Ice/ami/TestI.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def waitForBatch(self, count, current):
return result

def closeConnection(self, current):
# TODO: make sure close is graceful once API is fixed
current.con.close(False)

def abortConnection(self, current):
Expand Down
2 changes: 2 additions & 0 deletions swift/test/Ice/ami/AllTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ func allTests(_ helper: TestHelper, collocated: Bool = false) async throws {

maxQueue *= 2
}

try await p.ice_getCachedConnection()!.close()
}
}

Expand Down
8 changes: 7 additions & 1 deletion swift/test/Ice/ami/TestI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,13 @@ class TestI: TestIntf {
}

func closeConnection(current: Current) async throws {
Task { try await current.con!.close() }
Task {
do {
try await current.con!.close()
} catch {
fatalError("Connection.close failed: \(error)")
}
}
}

func abortConnection(current: Current) async throws {
Expand Down

0 comments on commit aaaf89d

Please sign in to comment.