Skip to content

Commit

Permalink
remove timeout in async test
Browse files Browse the repository at this point in the history
  • Loading branch information
Congyuwang committed Jul 30, 2023
1 parent b928a41 commit 85e987f
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 159 deletions.
23 changes: 10 additions & 13 deletions tests/test_transfer_data_large_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_large_async(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_large_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_large_block(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_large_no_flush.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_large_no_flush(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_large_nonblock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_large_nonblock(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_mid_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_mid_async(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_mid_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_mid_block(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_mid_no_flush.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_mid_no_flush(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_mid_nonblock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_mid_nonblock(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_small_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_small_async(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_small_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_small_block(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_small_no_flush.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_small_no_flush(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
23 changes: 10 additions & 13 deletions tests/test_transfer_data_small_nonblock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ int test_transfer_data_small_nonblock(int argc, char **argv) {
store.connect_to_addr(addr);

// Wait for the connection to close
while (true) {
if (store_cb->has_closed.load()) {
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait_for(u_lock, std::chrono::milliseconds(WAIT_MILLIS));
}
{
std::unique_lock<std::mutex> u_lock(store_cb->mutex);
store_cb->cond.wait(u_lock,
[store_cb]() { return store_cb->has_closed.load(); });
}
auto avg_size = *store_cb->add_data / *store_cb->count;
std::cout << "received " << *store_cb->count << " messages ,"
<< "total size = " << *store_cb->add_data << " bytes, "
<< "average size = " << avg_size << " bytes" << std::endl;
assert(*store_cb->add_data == TOTAL_SIZE);
return 0;
}
6 changes: 3 additions & 3 deletions tests/transfer_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ class SendAsyncCB : public DoNothingConnCallback {
std::string data = transfer_private::make_test_message(msg_size);
size_t msg_count = total_size / msg_size;

conn->start(std::move(rcv));
conn->start(std::move(rcv), waker);
std::thread([sender, msg_count, data, sem]() {
int progress = 0;
size_t offset = 0;
std::string_view data_view(data);
while (progress < msg_count) {
auto sent = sender->send_async(data_view.substr(offset));
if (sent < 0) {
sem->wait(WAIT_MILLIS);
if (sent == PENDING) {
sem->wait();
} else {
offset += sent;
}
Expand Down

0 comments on commit 85e987f

Please sign in to comment.