Skip to content

Commit

Permalink
Implemented database reconnection loop (drogonframework#2003)
Browse files Browse the repository at this point in the history
  • Loading branch information
Demilivor authored Apr 24, 2024
1 parent 519398c commit e79d517
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 49 deletions.
31 changes: 17 additions & 14 deletions orm_lib/src/DbClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,17 @@ void DbClientImpl::init()
for (size_t i = 0; i < numberOfConnections_; ++i)
{
auto loop = loops_.getNextLoop();
loop->runInLoop([this, loop]() {
std::lock_guard<std::mutex> lock(connectionsMutex_);
connections_.insert(newConnection(loop));
});
loop->runInLoop([this, loop]() { newConnection(loop); });
}
}
else if (type_ == ClientType::Sqlite3)
{
sharedMutexPtr_ = std::make_shared<SharedMutex>();
assert(sharedMutexPtr_);

std::lock_guard<std::mutex> lock(connectionsMutex_);
for (size_t i = 0; i < numberOfConnections_; ++i)
{
connections_.insert(newConnection(nullptr));
newConnection(nullptr);
}
}
}
Expand Down Expand Up @@ -405,12 +401,9 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
else if (type_ == ClientType::Sqlite3)
{
#if USE_SQLITE3
auto sqlite3ConnPtr =
std::make_shared<Sqlite3Connection>(loop,
connectionInfo_,
sharedMutexPtr_);
sqlite3ConnPtr->init();
connPtr = sqlite3ConnPtr;
connPtr = std::make_shared<Sqlite3Connection>(loop,
connectionInfo_,
sharedMutexPtr_);
#else
return nullptr;
#endif
Expand Down Expand Up @@ -440,8 +433,8 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
std::lock_guard<std::mutex> guard(thisPtr->connectionsMutex_);
thisPtr->connections_.insert(thisPtr->newConnection(loop));

thisPtr->newConnection(loop);
});
});
connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) {
Expand All @@ -467,6 +460,16 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
return;
thisPtr->handleNewTask(connPtr);
});

{
std::lock_guard<std::mutex> guard(connectionsMutex_);
connections_.insert(connPtr);
}

// Init database connection only after all callbacks are set and connPtr
// is added to connections_.
connPtr->init();

// std::cout<<"newConn end"<<connPtr<<std::endl;
return connPtr;
}
Expand Down
42 changes: 20 additions & 22 deletions orm_lib/src/DbClientLockFree.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ DbClientLockFree::DbClientLockFree(const std::string &connInfo,
{
loop_->queueInLoop([this]() {
for (size_t i = 0; i < numberOfConnections_; ++i)
connectionHolders_.push_back(newConnection());
newConnection();
});
}
else
Expand Down Expand Up @@ -439,34 +439,25 @@ DbConnectionPtr DbClientLockFree::newConnection()
if (!thisPtr)
return;

for (auto iter = thisPtr->connections_.begin();
iter != thisPtr->connections_.end();
iter++)
{
if (closeConnPtr == *iter)
{
thisPtr->connections_.erase(iter);
break;
}
}
for (auto iter = thisPtr->connectionHolders_.begin();
iter != thisPtr->connectionHolders_.end();
iter++)
{
if (closeConnPtr == *iter)
{
thisPtr->connectionHolders_.erase(iter);
break;
}
}
auto iter = std::find(thisPtr->connections_.begin(),
thisPtr->connections_.end(),
closeConnPtr);
if (iter != thisPtr->connections_.end())
thisPtr->connections_.erase(iter);

iter = std::find(thisPtr->connectionHolders_.begin(),
thisPtr->connectionHolders_.end(),
closeConnPtr);
if (iter != thisPtr->connectionHolders_.end())
thisPtr->connectionHolders_.erase(iter);

thisPtr->transSet_.erase(closeConnPtr);
// Reconnect after 1 second
thisPtr->loop_->runAfter(1, [weakPtr, closeConnPtr] {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->connectionHolders_.push_back(thisPtr->newConnection());
thisPtr->newConnection();
});
});
connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) {
Expand All @@ -487,6 +478,13 @@ DbConnectionPtr DbClientLockFree::newConnection()
return;
thisPtr->handleNewTask(connPtr);
});

connectionHolders_.push_back(connPtr);

// Init database connection only after all callbacks are set and connPtr
// is added to connectionHolders_.
connPtr->init();

// std::cout<<"newConn end"<<connPtr<<std::endl;
return connPtr;
}
Expand Down
2 changes: 2 additions & 0 deletions orm_lib/src/DbConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class DbConnection : public trantor::NonCopyable
{
}

virtual void init(){};

void setOkCallback(const DbConnectionCallback &cb)
{
okCallback_ = cb;
Expand Down
15 changes: 11 additions & 4 deletions orm_lib/src/mysql_impl/MysqlConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop,
characterSet_ = value;
}
}
}

void MysqlConnection::init()
{
loop_->queueInLoop([this]() {
MYSQL *ret;
status_ = ConnectStatus::Connecting;
Expand All @@ -120,10 +124,13 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop,
auto fd = mysql_get_socket(mysqlPtr_.get());
if (fd < 0)
{
LOG_FATAL << "Socket fd < 0, Usually this is because the number of "
"files opened by the program exceeds the system "
"limit. Please use the ulimit command to check.";
exit(1);
LOG_ERROR << "Connection with MySQL could not be established";
if (closeCallback_)
{
auto thisPtr = shared_from_this();
closeCallback_(thisPtr);
}
return;
}
channelPtr_ = std::make_unique<trantor::Channel>(loop_, fd);
channelPtr_->setEventCallback([this]() { handleEvent(); });
Expand Down
2 changes: 2 additions & 0 deletions orm_lib/src/mysql_impl/MysqlConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class MysqlConnection : public DbConnection,
public:
MysqlConnection(trantor::EventLoop *loop, const std::string &connInfo);

void init() override;

~MysqlConnection()
{
}
Expand Down
16 changes: 12 additions & 4 deletions orm_lib/src/postgresql_impl/PgBatchConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,22 @@ PgConnection::PgConnection(trantor::EventLoop *loop,
std::shared_ptr<PGconn>(PQconnectStart(connInfo.c_str()),
[](PGconn *conn) { PQfinish(conn); })),
channel_(loop, PQsocket(connectionPtr_.get()))
{
}

void PgConnection::init()
{
PQsetnonblocking(connectionPtr_.get(), 1);
if (channel_.fd() < 0)
{
LOG_FATAL << "Socket fd < 0, Usually this is because the number of "
"files opened by the program exceeds the system "
"limit. Please use the ulimit command to check.";
exit(1);
LOG_ERROR << "Connection with Postgres could not be established";

if (closeCallback_)
{
auto thisPtr = shared_from_this();
closeCallback_(thisPtr);
}
return;
}
channel_.setReadCallback([this]() {
if (status_ == ConnectStatus::Bad)
Expand Down
15 changes: 11 additions & 4 deletions orm_lib/src/postgresql_impl/PgConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,21 @@ PgConnection::PgConnection(trantor::EventLoop *loop,
std::shared_ptr<PGconn>(PQconnectStart(connInfo.c_str()),
[](PGconn *conn) { PQfinish(conn); })),
channel_(loop, PQsocket(connectionPtr_.get()))
{
}

void PgConnection::init()
{
PQsetnonblocking(connectionPtr_.get(), 1);
if (channel_.fd() < 0)
{
LOG_FATAL << "Socket fd < 0, Usually this is because the number of "
"files opened by the program exceeds the system "
"limit. Please use the ulimit command to check.";
exit(1);
LOG_ERROR << "Connection with Postgres could not be established";
if (closeCallback_)
{
auto thisPtr = shared_from_this();
closeCallback_(thisPtr);
}
return;
}
channel_.setReadCallback([this]() {
if (status_ == ConnectStatus::Bad)
Expand Down
2 changes: 2 additions & 0 deletions orm_lib/src/postgresql_impl/PgConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class PgConnection : public DbConnection,
const std::string &connInfo,
bool autoBatch);

void init() override;

void execSql(std::string_view &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
Expand Down
1 change: 1 addition & 0 deletions orm_lib/src/postgresql_impl/PgListener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ PgConnectionPtr PgListener::newConnection(
thisPtr->onMessage(channel, message);
}
});
connPtr->init();
return connPtr;
}

Expand Down
3 changes: 2 additions & 1 deletion orm_lib/src/sqlite3_impl/Sqlite3Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class Sqlite3Connection : public DbConnection,
const std::string &connInfo,
const std::shared_ptr<SharedMutex> &sharedMutex);

void init() override;

void execSql(std::string_view &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
Expand All @@ -60,7 +62,6 @@ class Sqlite3Connection : public DbConnection,
}

void disconnect() override;
void init();

private:
static std::once_flag once_;
Expand Down

0 comments on commit e79d517

Please sign in to comment.