diff --git a/cdk/mysqlx/result.cc b/cdk/mysqlx/result.cc index 474e0c0e7..fc6abbdc0 100644 --- a/cdk/mysqlx/result.cc +++ b/cdk/mysqlx/result.cc @@ -273,7 +273,13 @@ void Stmt_op::discard_result() // Finish current activity to see if we have any pending rows. - wait(); + try { + wait(); + } catch (...) { + m_op = nullptr; + m_state = ERROR; + } + assert(!m_op || ERROR == m_state); switch (m_state) @@ -615,8 +621,10 @@ void Cursor::close() { if (m_reply && this == m_reply->m_current_cursor) { - if (m_rows_op) - m_rows_op->wait(); + try { + if (m_rows_op) + m_rows_op->wait(); + } catch (...) {} m_rows_op = nullptr; /* diff --git a/devapi/tests/session-t.cc b/devapi/tests/session-t.cc index 758a0b8f5..cad59523b 100644 --- a/devapi/tests/session-t.cc +++ b/devapi/tests/session-t.cc @@ -35,6 +35,7 @@ #include #include #include +#include // std::setfill using std::cout; using std::endl; @@ -4329,4 +4330,54 @@ TEST_F(Sess, MACRO_VERSION) EXPECT_EQ(version_orig.str(), version_generated.str()); #endif -} \ No newline at end of file +} + +TEST_F(Sess, connection_drop) +{ + SKIP_IF_NO_XPLUGIN; + + try { + get_sess().dropSchema("connection_drop"); + } catch (...) {} + + EXPECT_TRUE(get_sess().createSchema("connection_drop").existsInDatabase()); + + std::string sql; + sql = R"sql( + CREATE TABLE connection_drop.t( + id INT PRIMARY KEY AUTO_INCREMENT, + hash CHAR(64) + ))sql"; + get_sess().sql(sql).execute(); + // This closes connection after 1 second of inactivity + get_sess().sql("SET SESSION mysqlx_write_timeout = 1").execute(); + get_sess().sql("SET SESSION cte_max_recursion_depth = 2000000").execute(); + // Generate enough data to spend some time in executing the query + sql = R"sql( + INSERT INTO connection_drop.t(hash) + WITH RECURSIVE cte (n) AS + ( + SELECT 1 + UNION ALL + SELECT n + 1 FROM cte WHERE n < 2000000 + ) + SELECT SHA2(n, 256) FROM cte + )sql"; + get_sess().sql(sql).execute(); + + std::thread t([&] { + // Force 100% packet loss for XPLUGIN_PORT + system("tc qdisc add dev lo root handle 1: prio priomap 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0"); + system("tc qdisc add dev lo parent 1:2 handle 20: netem loss 100%"); + system(std::string("tc filter add dev lo parent 1:0 protocol ip u32 match ip sport " + std::to_string(get_port()) + " 0xffff flowid 1:2").c_str()); + std::this_thread::sleep_for(std::chrono::seconds(5)); + system("tc qdisc del dev lo root"); + // Crash should be after this + }); + + get_sess().sql("SELECT * FROM connection_drop.t").execute(); + t.join(); + try { + get_sess().dropSchema("connection_drop"); + } catch (...) {} +}