Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Ice/ami to make sure connection close doesn't throw #3149

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions cpp/test/Ice/ami/AllTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1118,18 +1118,25 @@ allTests(TestHelper* helper, bool collocated)
{
done = true;
p->ice_ping();
vector<future<void>> results;
vector<future<void>> futures;
for (int i = 0; i < maxQueue; ++i)
{
auto s = make_shared<promise<void>>();
p->opWithPayloadAsync(
seq,
[s]() { s->set_value(); },
[s](exception_ptr ex) { s->set_exception(ex); });
results.push_back(s->get_future());
futures.push_back(s->get_future());
}
atomic_flag sent = ATOMIC_FLAG_INIT;
p->closeConnectionAsync(nullptr, nullptr, [&sent](bool) { sent.test_and_set(); });

auto closePromise = make_shared<promise<void>>();
p->closeConnectionAsync(
[closePromise] { closePromise->set_value(); },
[closePromise](exception_ptr ex) { closePromise->set_exception(ex); },
[&sent](bool) { sent.test_and_set(); });
futures.push_back(closePromise->get_future());

if (!sent.test_and_set())
{
for (int i = 0; i < maxQueue; i++)
Expand All @@ -1141,7 +1148,7 @@ allTests(TestHelper* helper, bool collocated)
[s]() { s->set_value(); },
[s](exception_ptr ex) { s->set_exception(ex); },
[&sent2](bool) { sent2.test_and_set(); });
results.push_back(s->get_future());
futures.push_back(s->get_future());
if (sent2.test_and_set())
{
done = false;
Expand All @@ -1156,11 +1163,11 @@ allTests(TestHelper* helper, bool collocated)
done = false;
}

for (vector<future<void>>::iterator r = results.begin(); r != results.end(); ++r)
for (auto& f : futures)
{
try
{
r->get();
f.get();
}
catch (const Ice::LocalException& ex)
{
Expand Down
15 changes: 14 additions & 1 deletion cpp/test/Ice/ami/TestI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,20 @@ TestIntfI::waitForBatch(int32_t count, const Ice::Current&)
void
TestIntfI::closeConnection(const Ice::Current& current)
{
current.con->close(nullptr, nullptr);
current.con->close(
nullptr,
[](exception_ptr ex)
{
try
{
rethrow_exception(ex);
}
catch (const std::exception& e)
{
cerr << "Connection::close failed with: " << e.what() << endl;
test(false);
}
});
}

void
Expand Down
15 changes: 7 additions & 8 deletions csharp/test/Ice/ami/AllTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -724,15 +724,16 @@ public static async Task allTestsAsync(global::Test.TestHelper helper, bool coll
{
done = true;
p.ice_ping();
List<Task> results = new List<Task>();
var tasks = new List<Task>();
for (int i = 0; i < maxQueue; ++i)
{
results.Add(p.opWithPayloadAsync(seq));
tasks.Add(p.opWithPayloadAsync(seq));
}

bool sentSynchronously = true;
_ = p.closeConnectionAsync(
var closeTask = p.closeConnectionAsync(
progress: new Progress<bool>(value => sentSynchronously = value));
tasks.Add(closeTask);

if (!sentSynchronously)
{
Expand All @@ -741,7 +742,7 @@ public static async Task allTestsAsync(global::Test.TestHelper helper, bool coll
Task t = p.opWithPayloadAsync(
seq,
progress: new Progress<bool>(value => sentSynchronously = value));
results.Add(t);
tasks.Add(t);
if (sentSynchronously)
{
done = false;
Expand All @@ -756,10 +757,8 @@ public static async Task allTestsAsync(global::Test.TestHelper helper, bool coll
done = false;
}

foreach (Task q in results)
{
await q;
}
await Task.WhenAll(tasks);

// Wait until the connection is closed.
await p.ice_getCachedConnection().closeAsync();
}
Expand Down
15 changes: 14 additions & 1 deletion csharp/test/Ice/ami/TestI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,20 @@ public override bool
public override void closeConnection(Ice.Current current)
{
// We can't wait for the connection to close - it would self-deadlock. So we just initiate the closure.
_ = current.con.closeAsync();
_ = closeAsync();

async Task closeAsync()
{
try
{
await current.con.closeAsync();
}
catch (System.Exception ex)
{
Console.WriteLine($"Connection close failed: {ex}");
test(false);
}
}
}

public override void abortConnection(Ice.Current current)
Expand Down
14 changes: 9 additions & 5 deletions java/test/src/main/java/test/Ice/ami/AllTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -951,15 +951,19 @@ public static void allTests(test.TestHelper helper, boolean collocated) {
while (!done && maxQueue < 50) {
done = true;
p.ice_ping();
var results = new java.util.ArrayList<>();
var futures = new java.util.ArrayList<CompletableFuture>();
for (int i = 0; i < maxQueue; ++i) {
results.add(Util.getInvocationFuture(p.opWithPayloadAsync(seq)));
futures.add(Util.getInvocationFuture(p.opWithPayloadAsync(seq)));
}
if (!Util.getInvocationFuture(p.closeConnectionAsync()).isSent()) {

var closeFuture = Util.getInvocationFuture(p.closeConnectionAsync());
futures.add(closeFuture);

if (!closeFuture.isSent()) {
for (int i = 0; i < maxQueue; i++) {
InvocationFuture<Void> r =
Util.getInvocationFuture(p.opWithPayloadAsync(seq));
results.add(r);
futures.add(r);
if (r.isSent()) {
done = false;
maxQueue *= 2;
Expand All @@ -970,7 +974,7 @@ public static void allTests(test.TestHelper helper, boolean collocated) {
maxQueue *= 2;
done = false;
}
CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();

// Wait until the connection is closed.
p.ice_getCachedConnection().close();
Expand Down
14 changes: 11 additions & 3 deletions java/test/src/main/java/test/Ice/ami/TestI.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,17 @@ public synchronized boolean waitForBatch(int count, com.zeroc.Ice.Current curren
@Override
public void closeConnection(com.zeroc.Ice.Current current) {
// We can't wait for the connection to be closed - this would cause a self dead-lock.
// So instead we just initiate the closure by running `close` in a separate thread.
var closureThread = new Thread(() -> current.con.close());
closureThread.start();
// So instead we just initiate the closure in the background.
CompletableFuture.runAsync(
() -> {
try {
current.con.close();
} catch (Exception e) {
// make sure the closure is graceful
System.err.println("********** Connection.close failed: " + e);
test(false);
}
});
}

@Override
Expand Down
1 change: 1 addition & 0 deletions python/test/Ice/ami/TestI.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def waitForBatch(self, count, current):
return result

def closeConnection(self, current):
# TODO: make sure close is graceful once API is fixed
current.con.close(False)

def abortConnection(self, current):
Expand Down
2 changes: 2 additions & 0 deletions swift/test/Ice/ami/AllTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ func allTests(_ helper: TestHelper, collocated: Bool = false) async throws {

maxQueue *= 2
}

try await p.ice_getCachedConnection()!.close()
}
}

Expand Down
8 changes: 7 additions & 1 deletion swift/test/Ice/ami/TestI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,13 @@ class TestI: TestIntf {
}

func closeConnection(current: Current) async throws {
Task { try await current.con!.close() }
Task {
do {
try await current.con!.close()
} catch {
fatalError("Connection.close failed: \(error)")
}
}
}

func abortConnection(current: Current) async throws {
Expand Down