diff --git a/cpp/test/DataStorm/reliability/Reader.cpp b/cpp/test/DataStorm/reliability/Reader.cpp index 0b0856ef001..5c85686c5c0 100644 --- a/cpp/test/DataStorm/reliability/Reader.cpp +++ b/cpp/test/DataStorm/reliability/Reader.cpp @@ -57,7 +57,7 @@ void ::Reader::run(int argc, char* argv[]) auto connection = node.getSessionConnection(sample.getSession()); while (!connection) { - this_thread::sleep_for(chrono::milliseconds(200)); + this_thread::sleep_for(chrono::milliseconds(10)); connection = node.getSessionConnection(sample.getSession()); } connection->close().get(); @@ -68,6 +68,71 @@ void ::Reader::run(int argc, char* argv[]) writer.update(0); writer.waitForNoReaders(); } + + { + Topic topic(node, "int2"); + auto reader = makeSingleKeyReader(topic, "element", "", config); + string session; + + // Read 100 samples from the "element" key and close the connection. + for (int i = 0; i < 100; ++i) + { + auto sample = reader.getNextUnread(); + if (sample.getValue() != i) + { + cerr << "unexpected sample: " << sample.getValue() << " expected:" << i << endl; + test(false); + } + session = sample.getSession(); + } + + auto connection = node.getSessionConnection(session); + while (!connection) + { + this_thread::sleep_for(chrono::milliseconds(10)); + connection = node.getSessionConnection(session); + } + connection->close().get(); + + // Send a sample to the writer on "reader_barrier" to let it know that the connection was closed. + // The writer will read it after the session is reestablished. + auto writerB = makeSingleKeyWriter(topic, "reader_barrier"); + writerB.waitForReaders(); + writerB.update(0); + + // Wait for the writer to acknowledge the sample send on "reader_barrier" and close the connection again. + auto readerB = makeSingleKeyReader(topic, "writer_barrier"); + [[maybe_unused]] auto _ = readerB.getNextUnread(); + + // Session was reestablish close again + connection = node.getSessionConnection(session); + while (!connection) + { + this_thread::sleep_for(chrono::milliseconds(10)); + connection = node.getSessionConnection(session); + } + connection->close().get(); + + // Let the writer know the connection was closed again, and that it can proceed with the second batch of + // samples. + writerB.update(0); + + for (int i = 0; i < 100; ++i) + { + auto sample = reader.getNextUnread(); + if (sample.getValue() != i + 100) + { + cerr << "unexpected sample: " << sample.getValue() << " expected:" << (i + 100) << endl; + test(false); + } + session = sample.getSession(); + } + + // Let the writer know we have processed all samples. + writerB.waitForReaders(); + writerB.update(0); + writerB.waitForNoReaders(); + } } DEFINE_TEST(::Reader) diff --git a/cpp/test/DataStorm/reliability/Writer.cpp b/cpp/test/DataStorm/reliability/Writer.cpp index e0517fd641b..704db8a2498 100644 --- a/cpp/test/DataStorm/reliability/Writer.cpp +++ b/cpp/test/DataStorm/reliability/Writer.cpp @@ -54,6 +54,43 @@ void ::Writer::run(int argc, char* argv[]) [[maybe_unused]] auto _ = makeSingleKeyReader(topic, "barrier").getNextUnread(); } cout << "ok" << endl; + + // Publish a batch of samples to a topic's key, follow by two consecutive session recovery events without writer + // activity on the given key. + // Then send a second batch of samples to the same topic's key and ensure the reader continue reading from when it + // left off. + cout << "testing reader multiple connection closure without writer activity... " << flush; + { + Topic topic(node, "int2"); + auto writer = makeSingleKeyWriter(topic, "element", "", config); + writer.waitForReaders(); + for (int i = 0; i < 100; ++i) + { + writer.update(i); + } + + auto readerB = makeSingleKeyReader(topic, "reader_barrier"); + + // A control sample send by the reader to let the writer know the connection was closed. The writer process this + // sample after the first session reestablishment. + auto sample = readerB.getNextUnread(); + + // Send a control sample to let the reader know session was reestablished. + auto writerB = makeSingleKeyWriter(topic, "writer_barrier"); + writerB.update(0); + + // Wait for a second control sample from the reader indicating the second session closure. The writer process + // this sample after the second session reestablishment. + sample = readerB.getNextUnread(); + + // Session has been reestablish twice without activity in "element" key. Send the second batch of samples. + for (int i = 0; i < 100; ++i) + { + writer.update(i + 100); + } + sample = readerB.getNextUnread(); + } + cout << "ok" << endl; } DEFINE_TEST(::Writer)