Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't call std::terminate on connection drop #36

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions cdk/mysqlx/result.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,13 @@ void Stmt_op::discard_result()

// Finish current activity to see if we have any pending rows.

wait();
try {
wait();
} catch (...) {
m_op = nullptr;
m_state = ERROR;
}

assert(!m_op || ERROR == m_state);

switch (m_state)
Expand Down Expand Up @@ -615,8 +621,10 @@ void Cursor::close()
{
if (m_reply && this == m_reply->m_current_cursor)
{
if (m_rows_op)
m_rows_op->wait();
try {
if (m_rows_op)
m_rows_op->wait();
} catch (...) {}
m_rows_op = nullptr;

/*
Expand Down
53 changes: 52 additions & 1 deletion devapi/tests/session-t.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <thread>
#include <map>
#include <sstream>
#include <iomanip> // std::setfill

using std::cout;
using std::endl;
Expand Down Expand Up @@ -4329,4 +4330,54 @@ TEST_F(Sess, MACRO_VERSION)
EXPECT_EQ(version_orig.str(), version_generated.str());

#endif
}
}

TEST_F(Sess, connection_drop)
{
SKIP_IF_NO_XPLUGIN;

try {
get_sess().dropSchema("connection_drop");
} catch (...) {}

EXPECT_TRUE(get_sess().createSchema("connection_drop").existsInDatabase());

std::string sql;
sql = R"sql(
CREATE TABLE connection_drop.t(
id INT PRIMARY KEY AUTO_INCREMENT,
hash CHAR(64)
))sql";
get_sess().sql(sql).execute();
// This closes connection after 1 second of inactivity
get_sess().sql("SET SESSION mysqlx_write_timeout = 1").execute();
get_sess().sql("SET SESSION cte_max_recursion_depth = 2000000").execute();
// Generate enough data to spend some time in executing the query
sql = R"sql(
INSERT INTO connection_drop.t(hash)
WITH RECURSIVE cte (n) AS
(
SELECT 1
UNION ALL
SELECT n + 1 FROM cte WHERE n < 2000000
)
SELECT SHA2(n, 256) FROM cte
)sql";
get_sess().sql(sql).execute();

std::thread t([&] {
// Force 100% packet loss for XPLUGIN_PORT
system("tc qdisc add dev lo root handle 1: prio priomap 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0");
system("tc qdisc add dev lo parent 1:2 handle 20: netem loss 100%");
system(std::string("tc filter add dev lo parent 1:0 protocol ip u32 match ip sport " + std::to_string(get_port()) + " 0xffff flowid 1:2").c_str());
std::this_thread::sleep_for(std::chrono::seconds(5));
system("tc qdisc del dev lo root");
// Crash should be after this
});

get_sess().sql("SELECT * FROM connection_drop.t").execute();
t.join();
try {
get_sess().dropSchema("connection_drop");
} catch (...) {}
}