diff --git a/orm_lib/src/DbClientImpl.cc b/orm_lib/src/DbClientImpl.cc index 5e154b21c9..730c7c75fc 100644 --- a/orm_lib/src/DbClientImpl.cc +++ b/orm_lib/src/DbClientImpl.cc @@ -81,10 +81,7 @@ void DbClientImpl::init() for (size_t i = 0; i < numberOfConnections_; ++i) { auto loop = loops_.getNextLoop(); - loop->runInLoop([this, loop]() { - std::lock_guard lock(connectionsMutex_); - connections_.insert(newConnection(loop)); - }); + loop->runInLoop([this, loop]() { newConnection(loop); }); } } else if (type_ == ClientType::Sqlite3) @@ -92,10 +89,9 @@ void DbClientImpl::init() sharedMutexPtr_ = std::make_shared(); assert(sharedMutexPtr_); - std::lock_guard lock(connectionsMutex_); for (size_t i = 0; i < numberOfConnections_; ++i) { - connections_.insert(newConnection(nullptr)); + newConnection(nullptr); } } } @@ -405,12 +401,9 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop) else if (type_ == ClientType::Sqlite3) { #if USE_SQLITE3 - auto sqlite3ConnPtr = - std::make_shared(loop, - connectionInfo_, - sharedMutexPtr_); - sqlite3ConnPtr->init(); - connPtr = sqlite3ConnPtr; + connPtr = std::make_shared(loop, + connectionInfo_, + sharedMutexPtr_); #else return nullptr; #endif @@ -440,8 +433,8 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop) auto thisPtr = weakPtr.lock(); if (!thisPtr) return; - std::lock_guard guard(thisPtr->connectionsMutex_); - thisPtr->connections_.insert(thisPtr->newConnection(loop)); + + thisPtr->newConnection(loop); }); }); connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) { @@ -467,6 +460,16 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop) return; thisPtr->handleNewTask(connPtr); }); + + { + std::lock_guard guard(connectionsMutex_); + connections_.insert(connPtr); + } + + // Init database connection only after all callbacks are set and connPtr + // is added to connections_. + connPtr->init(); + // std::cout<<"newConn end"<queueInLoop([this]() { for (size_t i = 0; i < numberOfConnections_; ++i) - connectionHolders_.push_back(newConnection()); + newConnection(); }); } else @@ -439,26 +439,17 @@ DbConnectionPtr DbClientLockFree::newConnection() if (!thisPtr) return; - for (auto iter = thisPtr->connections_.begin(); - iter != thisPtr->connections_.end(); - iter++) - { - if (closeConnPtr == *iter) - { - thisPtr->connections_.erase(iter); - break; - } - } - for (auto iter = thisPtr->connectionHolders_.begin(); - iter != thisPtr->connectionHolders_.end(); - iter++) - { - if (closeConnPtr == *iter) - { - thisPtr->connectionHolders_.erase(iter); - break; - } - } + auto iter = std::find(thisPtr->connections_.begin(), + thisPtr->connections_.end(), + closeConnPtr); + if (iter != thisPtr->connections_.end()) + thisPtr->connections_.erase(iter); + + iter = std::find(thisPtr->connectionHolders_.begin(), + thisPtr->connectionHolders_.end(), + closeConnPtr); + if (iter != thisPtr->connectionHolders_.end()) + thisPtr->connectionHolders_.erase(iter); thisPtr->transSet_.erase(closeConnPtr); // Reconnect after 1 second @@ -466,7 +457,7 @@ DbConnectionPtr DbClientLockFree::newConnection() auto thisPtr = weakPtr.lock(); if (!thisPtr) return; - thisPtr->connectionHolders_.push_back(thisPtr->newConnection()); + thisPtr->newConnection(); }); }); connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) { @@ -487,6 +478,13 @@ DbConnectionPtr DbClientLockFree::newConnection() return; thisPtr->handleNewTask(connPtr); }); + + connectionHolders_.push_back(connPtr); + + // Init database connection only after all callbacks are set and connPtr + // is added to connectionHolders_. + connPtr->init(); + // std::cout<<"newConn end"<queueInLoop([this]() { MYSQL *ret; status_ = ConnectStatus::Connecting; @@ -120,10 +124,13 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop, auto fd = mysql_get_socket(mysqlPtr_.get()); if (fd < 0) { - LOG_FATAL << "Socket fd < 0, Usually this is because the number of " - "files opened by the program exceeds the system " - "limit. Please use the ulimit command to check."; - exit(1); + LOG_ERROR << "Connection with MySQL could not be established"; + if (closeCallback_) + { + auto thisPtr = shared_from_this(); + closeCallback_(thisPtr); + } + return; } channelPtr_ = std::make_unique(loop_, fd); channelPtr_->setEventCallback([this]() { handleEvent(); }); diff --git a/orm_lib/src/mysql_impl/MysqlConnection.h b/orm_lib/src/mysql_impl/MysqlConnection.h index 7f678513e4..4ec0c618a8 100644 --- a/orm_lib/src/mysql_impl/MysqlConnection.h +++ b/orm_lib/src/mysql_impl/MysqlConnection.h @@ -38,6 +38,8 @@ class MysqlConnection : public DbConnection, public: MysqlConnection(trantor::EventLoop *loop, const std::string &connInfo); + void init() override; + ~MysqlConnection() { } diff --git a/orm_lib/src/postgresql_impl/PgBatchConnection.cc b/orm_lib/src/postgresql_impl/PgBatchConnection.cc index e29f9677df..631ef6907c 100644 --- a/orm_lib/src/postgresql_impl/PgBatchConnection.cc +++ b/orm_lib/src/postgresql_impl/PgBatchConnection.cc @@ -86,14 +86,22 @@ PgConnection::PgConnection(trantor::EventLoop *loop, std::shared_ptr(PQconnectStart(connInfo.c_str()), [](PGconn *conn) { PQfinish(conn); })), channel_(loop, PQsocket(connectionPtr_.get())) +{ +} + +void PgConnection::init() { PQsetnonblocking(connectionPtr_.get(), 1); if (channel_.fd() < 0) { - LOG_FATAL << "Socket fd < 0, Usually this is because the number of " - "files opened by the program exceeds the system " - "limit. Please use the ulimit command to check."; - exit(1); + LOG_ERROR << "Connection with Postgres could not be established"; + + if (closeCallback_) + { + auto thisPtr = shared_from_this(); + closeCallback_(thisPtr); + } + return; } channel_.setReadCallback([this]() { if (status_ == ConnectStatus::Bad) diff --git a/orm_lib/src/postgresql_impl/PgConnection.cc b/orm_lib/src/postgresql_impl/PgConnection.cc index 40c0b252a1..59d080cc71 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.cc +++ b/orm_lib/src/postgresql_impl/PgConnection.cc @@ -64,14 +64,21 @@ PgConnection::PgConnection(trantor::EventLoop *loop, std::shared_ptr(PQconnectStart(connInfo.c_str()), [](PGconn *conn) { PQfinish(conn); })), channel_(loop, PQsocket(connectionPtr_.get())) +{ +} + +void PgConnection::init() { PQsetnonblocking(connectionPtr_.get(), 1); if (channel_.fd() < 0) { - LOG_FATAL << "Socket fd < 0, Usually this is because the number of " - "files opened by the program exceeds the system " - "limit. Please use the ulimit command to check."; - exit(1); + LOG_ERROR << "Connection with Postgres could not be established"; + if (closeCallback_) + { + auto thisPtr = shared_from_this(); + closeCallback_(thisPtr); + } + return; } channel_.setReadCallback([this]() { if (status_ == ConnectStatus::Bad) diff --git a/orm_lib/src/postgresql_impl/PgConnection.h b/orm_lib/src/postgresql_impl/PgConnection.h index 3c70e6a526..b962ec297e 100644 --- a/orm_lib/src/postgresql_impl/PgConnection.h +++ b/orm_lib/src/postgresql_impl/PgConnection.h @@ -45,6 +45,8 @@ class PgConnection : public DbConnection, const std::string &connInfo, bool autoBatch); + void init() override; + void execSql(std::string_view &&sql, size_t paraNum, std::vector &¶meters, diff --git a/orm_lib/src/postgresql_impl/PgListener.cc b/orm_lib/src/postgresql_impl/PgListener.cc index 4c1bde96e7..8a91088ddc 100644 --- a/orm_lib/src/postgresql_impl/PgListener.cc +++ b/orm_lib/src/postgresql_impl/PgListener.cc @@ -302,6 +302,7 @@ PgConnectionPtr PgListener::newConnection( thisPtr->onMessage(channel, message); } }); + connPtr->init(); return connPtr; } diff --git a/orm_lib/src/sqlite3_impl/Sqlite3Connection.h b/orm_lib/src/sqlite3_impl/Sqlite3Connection.h index b9c284303d..8f3ed834af 100644 --- a/orm_lib/src/sqlite3_impl/Sqlite3Connection.h +++ b/orm_lib/src/sqlite3_impl/Sqlite3Connection.h @@ -44,6 +44,8 @@ class Sqlite3Connection : public DbConnection, const std::string &connInfo, const std::shared_ptr &sharedMutex); + void init() override; + void execSql(std::string_view &&sql, size_t paraNum, std::vector &¶meters, @@ -60,7 +62,6 @@ class Sqlite3Connection : public DbConnection, } void disconnect() override; - void init(); private: static std::once_flag once_;