From 91bf4875c3d6b7163a98c613e207f1a49887ea44 Mon Sep 17 00:00:00 2001 From: liuchao412 Date: Tue, 5 Sep 2023 15:04:01 +0800 Subject: [PATCH] update code --- Common/ThreadQueue.h | 2 - Common/VersionConfig.h | 4 +- Common/consoleoutput.hpp | 2 +- Common/define.h | 6 +-- PSS_ASIO/Common/IoContextPool.cpp | 2 +- PSS_ASIO/Common/NetSvrManager.cpp | 1 - PSS_ASIO/Common/serverconfig.cpp | 2 +- PSS_ASIO/Common/serverconfigtype.h | 2 +- PSS_ASIO/Message/Iobridge.h | 50 ++++++++++----------- PSS_ASIO/Message/LoadModule.h | 6 +-- PSS_ASIO/Message/SessionInterface.cpp | 4 +- PSS_ASIO/Message/SessionService.cpp | 2 +- PSS_ASIO/TcpSession/TcpClientSession.cpp | 11 +++-- PSS_ASIO/TcpSession/TcpSSLClientSession.cpp | 7 ++- PSS_ASIO/TcpSession/TcpSSLSession.cpp | 4 ++ PSS_ASIO/TcpSession/TcpServer.cpp | 32 +++++++++---- PSS_ASIO/TcpSession/TcpServer.h | 7 +-- PSS_ASIO/TcpSession/TcpSession.cpp | 5 ++- PSS_ASIO/UdpSession/KcpServer.cpp | 4 ++ PSS_ASIO/UdpSession/UdpClientSession.cpp | 32 +++++++++---- PSS_ASIO/UdpSession/UdpServer.cpp | 39 +++++++++++++--- PSS_ASIO/UdpSession/UdpServer.h | 4 +- 22 files changed, 153 insertions(+), 75 deletions(-) diff --git a/Common/ThreadQueue.h b/Common/ThreadQueue.h index 1ed9cbc..51586f8 100644 --- a/Common/ThreadQueue.h +++ b/Common/ThreadQueue.h @@ -32,7 +32,6 @@ class CMessageQueue while (_queue.empty()) { _condition.wait(lock); - } //注意这一段必须放在if语句中,因为lock的生命域仅仅在if大括号内 msg = std::move(_queue.front()); @@ -46,7 +45,6 @@ class CMessageQueue if (_queue.empty()) return false; - msg = std::move(_queue.front()); _queue.pop(); return true; diff --git a/Common/VersionConfig.h b/Common/VersionConfig.h index 7eff790..61e535c 100644 --- a/Common/VersionConfig.h +++ b/Common/VersionConfig.h @@ -1,3 +1,3 @@ //VersionConfig.h.in -#define V_BUILD_TIME "2023-09-04_10:34:07" -#define V_GIT_INFO "master_v3.0.0-186-g1bed1b52" +#define V_BUILD_TIME "2023-08-01_17:54:37" +#define V_GIT_INFO "master_v3.0.0-168-gda97086a" diff --git a/Common/consoleoutput.hpp b/Common/consoleoutput.hpp index 55396cf..6f22d36 100644 --- a/Common/consoleoutput.hpp +++ b/Common/consoleoutput.hpp @@ -1,4 +1,4 @@ -#ifndef PSS_CONSOLE_OUTPUT_H +#ifndef PSS_CONSOLE_OUTPUT_H #define PSS_CONSOLE_OUTPUT_H //屏幕输出管控 diff --git a/Common/define.h b/Common/define.h index fab4e36..c392676 100644 --- a/Common/define.h +++ b/Common/define.h @@ -89,7 +89,7 @@ enum class EM_SESSION_STATE class CConfigNetIO { public: - std::string ip_; + std::string ip_ = ""; io_port_type port_ = 0; std::string protocol_type_ = "TCP"; unsigned int packet_parse_id_ = 0; @@ -269,7 +269,7 @@ inline vector string_split(const string& srcStr, const string& deli vector vec; string strtmp = srcStr; nPos = strtmp.find(delim.c_str()); - while(-1 != nPos) + while(string::npos != nPos) { string temp = strtmp.substr(0, nPos); vec.push_back(temp); @@ -298,7 +298,7 @@ inline void bind_thread_to_cpu(std::thread* logic_thread) int rc =pthread_setaffinity_np(logic_thread->native_handle(), sizeof(cpu_set_t), &cpuset); if (rc != 0) { - PSS_LOGGER_ERROR("[bind_thread_to_cpu]Error calling pthread_setaffinity_np:{}",rc); + PSS_LOGGER_ERROR("[bind_thread_to_cpu]Error calling pthread_setaffinity_np:{}",rc); } #else auto mask = SetThreadAffinityMask(logic_thread->native_handle(), (cpuidx++) % cpunum); diff --git a/PSS_ASIO/Common/IoContextPool.cpp b/PSS_ASIO/Common/IoContextPool.cpp index 98ec542..c67e848 100644 --- a/PSS_ASIO/Common/IoContextPool.cpp +++ b/PSS_ASIO/Common/IoContextPool.cpp @@ -32,7 +32,7 @@ void CIoContextPool::run() threads.push_back(thread); //CPU󶨹ϵ - if (App_ServerConfig::instance()->get_config_workthread().logic_thread_bind_cpu != 0) + if (App_ServerConfig::instance()->get_config_workthread().logic_thread_bind_cpu_ != 0) { bind_thread_to_cpu(thread.get()); } diff --git a/PSS_ASIO/Common/NetSvrManager.cpp b/PSS_ASIO/Common/NetSvrManager.cpp index c7cbdf8..60e9fbd 100644 --- a/PSS_ASIO/Common/NetSvrManager.cpp +++ b/PSS_ASIO/Common/NetSvrManager.cpp @@ -7,7 +7,6 @@ CNetSvrManager::CNetSvrManager() CNetSvrManager::~CNetSvrManager() { - this->close_all_service(); } void CNetSvrManager::start_default_service() diff --git a/PSS_ASIO/Common/serverconfig.cpp b/PSS_ASIO/Common/serverconfig.cpp index 44745f2..2b08e2f 100644 --- a/PSS_ASIO/Common/serverconfig.cpp +++ b/PSS_ASIO/Common/serverconfig.cpp @@ -15,7 +15,7 @@ bool CServerConfig::read_server_config_file(const std::string& file_name) config_work_thread_.client_connect_timeout_ = config_work_thread["client connect timeout"]; config_work_thread_.linux_daemonize_ = config_work_thread["linux daemonize"]; config_work_thread_.io_send_time_check_ = config_work_thread["IO send data check"]; - config_work_thread_.logic_thread_bind_cpu = config_work_thread["logic thread bind cpu"]; + config_work_thread_.logic_thread_bind_cpu_ = config_work_thread["logic thread bind cpu"]; for (auto packet_parse : json_config["packet parse library"]) { diff --git a/PSS_ASIO/Common/serverconfigtype.h b/PSS_ASIO/Common/serverconfigtype.h index 8a4f2dd..6939406 100644 --- a/PSS_ASIO/Common/serverconfigtype.h +++ b/PSS_ASIO/Common/serverconfigtype.h @@ -17,7 +17,7 @@ class CConfigWorkThread int s2s_timeout_seconds_ = 60; int client_connect_timeout_ = 0; int io_send_time_check_ = 0; - int logic_thread_bind_cpu = 0; //0为不绑定CPU,1为绑定 + int logic_thread_bind_cpu_ = 0; //0为不绑定CPU,1为绑定 }; class CConfigPacketParseInfo diff --git a/PSS_ASIO/Message/Iobridge.h b/PSS_ASIO/Message/Iobridge.h index bdc6eff..adcaec1 100644 --- a/PSS_ASIO/Message/Iobridge.h +++ b/PSS_ASIO/Message/Iobridge.h @@ -1,25 +1,25 @@ -#pragma once - -//ṩIOŽӷע -//add by freeeyes - -#include "IotoIo.h" -#include "ModuleLogic.h" - -class CIoBridge : public IIoBridge -{ -public: - virtual ~CIoBridge() = default; - - bool add_session_io_mapping(const _ClientIPInfo& from_io, EM_CONNECT_IO_TYPE from_io_type, const _ClientIPInfo& to_io, EM_CONNECT_IO_TYPE to_io_type, ENUM_IO_BRIDGE_TYPE bridge_type = ENUM_IO_BRIDGE_TYPE::IO_BRIDGE_BATH) final; - bool delete_session_io_mapping(const _ClientIPInfo& from_io, EM_CONNECT_IO_TYPE from_io_type) final; - - bool regedit_bridge_session_id(const _ClientIPInfo& from_io, EM_CONNECT_IO_TYPE io_type, uint32 session_id); - void unregedit_bridge_session_id(const _ClientIPInfo& from_io, EM_CONNECT_IO_TYPE io_type); - uint32 get_to_session_id(uint32 session_id, const _ClientIPInfo& from_io); - -private: - CIotoIo iotoio_; -}; - -using App_IoBridge = PSS_singleton; +#pragma once + +//提供IO桥接服务注册 +//add by freeeyes + +#include "IotoIo.h" +#include "ModuleLogic.h" + +class CIoBridge : public IIoBridge +{ +public: + virtual ~CIoBridge() = default; + + bool add_session_io_mapping(const _ClientIPInfo& from_io, EM_CONNECT_IO_TYPE from_io_type, const _ClientIPInfo& to_io, EM_CONNECT_IO_TYPE to_io_type, ENUM_IO_BRIDGE_TYPE bridge_type = ENUM_IO_BRIDGE_TYPE::IO_BRIDGE_BATH) final; + bool delete_session_io_mapping(const _ClientIPInfo& from_io, EM_CONNECT_IO_TYPE from_io_type) final; + + bool regedit_bridge_session_id(const _ClientIPInfo& from_io, EM_CONNECT_IO_TYPE io_type, uint32 session_id); + void unregedit_bridge_session_id(const _ClientIPInfo& from_io, EM_CONNECT_IO_TYPE io_type); + uint32 get_to_session_id(uint32 session_id, const _ClientIPInfo& from_io); + +private: + CIotoIo iotoio_; +}; + +using App_IoBridge = PSS_singleton; diff --git a/PSS_ASIO/Message/LoadModule.h b/PSS_ASIO/Message/LoadModule.h index 8dd3384..7ecbfe7 100644 --- a/PSS_ASIO/Message/LoadModule.h +++ b/PSS_ASIO/Message/LoadModule.h @@ -75,10 +75,10 @@ class CLoadModule void delete_module_name_list(const string& module_name); using hashmapModuleList = unordered_map>; - hashmapModuleList module_list_; - vector module_name_list_; //当前插件名称列表 + hashmapModuleList module_list_; + vector module_name_list_; //当前插件名称列表 command_to_module_function command_to_module_function_; plugin_name_to_module_run plugin_name_to_module_run_; - ISessionService* session_service_; + ISessionService* session_service_ = nullptr; }; diff --git a/PSS_ASIO/Message/SessionInterface.cpp b/PSS_ASIO/Message/SessionInterface.cpp index 321b3d5..2e1e194 100644 --- a/PSS_ASIO/Message/SessionInterface.cpp +++ b/PSS_ASIO/Message/SessionInterface.cpp @@ -73,7 +73,7 @@ void CSessionInterface::check_session_io_timeout(uint32 connect_timeout, vector< std::chrono::duration> elapsed = check_connect_time_ - session_io.second.session_->get_recv_time(); if (elapsed.count() >= connect_timeout) { - PSS_LOGGER_INFO("[CSessionInterface::check_session_io_timeout]elapsed={0}.", elapsed.count()); + PSS_LOGGER_INFO("[CSessionInterface::check_session_io_timeout]connectid={},elapsed={}.",session_io.first, elapsed.count()); CSessionIO_Cancel session_cancel; session_cancel.session_id_ = session_io.first; @@ -88,7 +88,7 @@ void CSessionInterface::check_session_io_timeout(uint32 connect_timeout, vector< std::chrono::duration> elapsed = check_connect_time_ - session_io.second.session_->get_recv_time(session_io.first); if (elapsed.count() >= connect_timeout) { - PSS_LOGGER_INFO("[CSessionInterface::check_session_io_timeout]elapsed={0}.", elapsed.count()); + PSS_LOGGER_INFO("[CSessionInterface::check_session_io_timeout]connectid={},elapsed={}.",session_io.first, elapsed.count()); CSessionIO_Cancel session_cancel; session_cancel.session_id_ = session_io.first; diff --git a/PSS_ASIO/Message/SessionService.cpp b/PSS_ASIO/Message/SessionService.cpp index c7d140b..f355e64 100644 --- a/PSS_ASIO/Message/SessionService.cpp +++ b/PSS_ASIO/Message/SessionService.cpp @@ -199,7 +199,7 @@ void CSessionService::regedit_bridge_session_id(uint32 connect_id) { if(connect_id == 0) { - PSS_LOGGER_INFO("[CSessionService::regedit_bridge_session_id]server id must over 0, regedit_bridge_session_id fail."); + PSS_LOGGER_WARN("[CSessionService::regedit_bridge_session_id]server id must over 0, regedit_bridge_session_id fail."); return; } else diff --git a/PSS_ASIO/TcpSession/TcpClientSession.cpp b/PSS_ASIO/TcpSession/TcpClientSession.cpp index 4d05440..d6bbdf8 100644 --- a/PSS_ASIO/TcpSession/TcpClientSession.cpp +++ b/PSS_ASIO/TcpSession/TcpClientSession.cpp @@ -47,8 +47,7 @@ bool CTcpClientSession::start(const CConnect_IO_Info& io_info) //异步链接 tcp::resolver::results_type::iterator endpoint_iter; - socket_.async_connect(end_point, std::bind(&CTcpClientSession::handle_connect, - this, std::placeholders::_1, endpoint_iter)); + socket_.async_connect(end_point, std::bind(&CTcpClientSession::handle_connect, this, std::placeholders::_1, endpoint_iter)); return true; } @@ -337,8 +336,12 @@ void CTcpClientSession::handle_connect(const asio::error_code& ec, tcp::resolver local_ip_.m_strClientIP = socket_.local_endpoint().address().to_string(); local_ip_.m_u2Port = socket_.local_endpoint().port(); - PSS_LOGGER_DEBUG("[CTcpClientSession::start]remote({0}:{1})", remote_ip_.m_strClientIP, remote_ip_.m_u2Port); - PSS_LOGGER_DEBUG("[CTcpClientSession::start]local({0}:{1})", local_ip_.m_strClientIP, local_ip_.m_u2Port); + PSS_LOGGER_INFO("[CTcpClientSession::handle_connect]connect_id:{} remote[{}:{}] local[{}:{}]", + connect_id_, + remote_ip_.m_strClientIP, + remote_ip_.m_u2Port, + local_ip_.m_strClientIP, + local_ip_.m_u2Port); packet_parse_interface_->packet_connect_ptr_(connect_id_, remote_ip_, local_ip_, io_type_, App_IoBridge::instance()); diff --git a/PSS_ASIO/TcpSession/TcpSSLClientSession.cpp b/PSS_ASIO/TcpSession/TcpSSLClientSession.cpp index 188b71b..0a1054b 100644 --- a/PSS_ASIO/TcpSession/TcpSSLClientSession.cpp +++ b/PSS_ASIO/TcpSession/TcpSSLClientSession.cpp @@ -30,8 +30,7 @@ bool CTcpSSLClientSession::start(const CConnect_IO_Info& io_info) } ssl_socket_.set_verify_mode(asio::ssl::verify_peer); - ssl_socket_.set_verify_callback( - std::bind(&CTcpSSLClientSession::verify_certificate, this, _1, _2)); + ssl_socket_.set_verify_callback(std::bind(&CTcpSSLClientSession::verify_certificate, this, _1, _2)); //建立连接(异步) tcp::resolver resolver(*io_context_); @@ -67,6 +66,10 @@ bool CTcpSSLClientSession::start(const CConnect_IO_Info& io_info) void CTcpSSLClientSession::close(uint32 connect_id) { + if(!socket_.is_open()) + { + return; + } auto self(shared_from_this()); auto recv_data_size = recv_data_size_; diff --git a/PSS_ASIO/TcpSession/TcpSSLSession.cpp b/PSS_ASIO/TcpSession/TcpSSLSession.cpp index cff1a4e..6553e96 100644 --- a/PSS_ASIO/TcpSession/TcpSSLSession.cpp +++ b/PSS_ASIO/TcpSession/TcpSSLSession.cpp @@ -50,6 +50,10 @@ _ClientIPInfo CTcpSSLSession::get_remote_ip(uint32 connect_id) void CTcpSSLSession::close(uint32 connect_id) { + if(!socket_.is_open()) + { + return; + } auto self(shared_from_this()); auto recv_data_size = recv_data_size_; diff --git a/PSS_ASIO/TcpSession/TcpServer.cpp b/PSS_ASIO/TcpSession/TcpServer.cpp index 0430646..6191c4f 100644 --- a/PSS_ASIO/TcpSession/TcpServer.cpp +++ b/PSS_ASIO/TcpSession/TcpServer.cpp @@ -5,6 +5,7 @@ CTcpServer::CTcpServer(const CreateIoContextCallbackFunc callback, const std::st { try { + accept_run_state_ = true; callback_ = callback; asio::io_context* iocontext = callback_(); acceptor_ = std::make_shared(*iocontext, tcp::endpoint(asio::ip::address_v4::from_string(server_ip), port)); @@ -18,20 +19,22 @@ CTcpServer::CTcpServer(const CreateIoContextCallbackFunc callback, const std::st } catch (std::system_error const& ex) { - PSS_LOGGER_INFO("[CTcpServer::do_accept]({0}:{1}) accept error {2}.", server_ip, port, ex.what()); + PSS_LOGGER_WARN("[CTcpServer::do_accept]({0}:{1}) accept error {2}.", server_ip, port, ex.what()); } } -void CTcpServer::close() const +void CTcpServer::close() { - if (nullptr != acceptor_) - { - acceptor_->close(); - } + PSS_LOGGER_INFO("[CTcpServer::close]stop tcp server[{0}:{1}]", server_ip_, server_port_); + accept_run_state_ = false; } void CTcpServer::do_accept() { + if (!accept_run_state_) + { + return; + } acceptor_->async_accept( [this](std::error_code ec, tcp::socket socket) { @@ -53,10 +56,23 @@ void CTcpServer::do_accept() PSS_LOGGER_WARN("[CTcpServer::do_accept]close tcp server[{}:{}], error={}",server_ip_,server_port_, ex.what()); } }); + + if (!accept_run_state_) + { + if (nullptr != acceptor_) + { + acceptor_->close(); + } + return; + } } -void CTcpServer::send_accept_listen_fail(std::error_code ec) const +void CTcpServer::send_accept_listen_fail(std::error_code ec) { + if (!accept_run_state_) + { + return; + } //发送监听失败消息 App_WorkThreadLogic::instance()->add_frame_events(LOGIC_LISTEN_SERVER_ERROR, 0, @@ -65,7 +81,7 @@ void CTcpServer::send_accept_listen_fail(std::error_code ec) const EM_CONNECT_IO_TYPE::CONNECT_IO_TCP); //监听失败,查看错误信息 - PSS_LOGGER_INFO("[CTcpServer::do_accept]({0}{1})accept error:{2}", + PSS_LOGGER_INFO("[CTcpServer::do_accept]({0}:{1})accept error:{2}", acceptor_->local_endpoint().address().to_string(), acceptor_->local_endpoint().port(), ec.message()); diff --git a/PSS_ASIO/TcpSession/TcpServer.h b/PSS_ASIO/TcpSession/TcpServer.h index ee61dce..ff3c130 100644 --- a/PSS_ASIO/TcpSession/TcpServer.h +++ b/PSS_ASIO/TcpSession/TcpServer.h @@ -8,18 +8,19 @@ class CTcpServer public: CTcpServer(const CreateIoContextCallbackFunc callback, const std::string& server_ip, io_port_type port, uint32 packet_parse_id, uint32 max_recv_size); - void close() const; + void close(); private: void do_accept(); - void send_accept_listen_fail(std::error_code ec) const; + void send_accept_listen_fail(std::error_code ec); std::shared_ptr acceptor_; uint32 packet_parse_id_ = 0; uint32 max_recv_size_ = 0; CreateIoContextCallbackFunc callback_; - + bool accept_run_state_; + string server_ip_; io_port_type server_port_; }; diff --git a/PSS_ASIO/TcpSession/TcpSession.cpp b/PSS_ASIO/TcpSession/TcpSession.cpp index 5967c0a..c327eb5 100644 --- a/PSS_ASIO/TcpSession/TcpSession.cpp +++ b/PSS_ASIO/TcpSession/TcpSession.cpp @@ -70,6 +70,10 @@ _ClientIPInfo CTcpSession::get_remote_ip(uint32 connect_id) void CTcpSession::close(uint32 connect_id) { + if(!socket_.is_open()) + { + return; + } auto self(shared_from_this()); auto recv_data_size = recv_data_size_; @@ -99,7 +103,6 @@ void CTcpSession::close(uint32 connect_id) App_WorkThreadLogic::instance()->delete_thread_session(connect_id, self); }); - } void CTcpSession::do_read() diff --git a/PSS_ASIO/UdpSession/KcpServer.cpp b/PSS_ASIO/UdpSession/KcpServer.cpp index 76377ef..def6fb3 100644 --- a/PSS_ASIO/UdpSession/KcpServer.cpp +++ b/PSS_ASIO/UdpSession/KcpServer.cpp @@ -166,6 +166,10 @@ void CKcpServer::do_receive_from(std::error_code ec, std::size_t length) void CKcpServer::close(uint32 connect_id) { + if(!socket_.is_open()) + { + return; + } auto self(shared_from_this()); io_context_->dispatch([self, connect_id]() diff --git a/PSS_ASIO/UdpSession/UdpClientSession.cpp b/PSS_ASIO/UdpSession/UdpClientSession.cpp index c550fd4..0ad0842 100644 --- a/PSS_ASIO/UdpSession/UdpClientSession.cpp +++ b/PSS_ASIO/UdpSession/UdpClientSession.cpp @@ -1,7 +1,6 @@ #include "UdpClientSession.h" -CUdpClientSession::CUdpClientSession(asio::io_context* io_context) - : socket_(*io_context), io_context_(io_context) +CUdpClientSession::CUdpClientSession(asio::io_context* io_context) : socket_(*io_context), io_context_(io_context) { } @@ -62,8 +61,12 @@ void CUdpClientSession::start(const CConnect_IO_Info& io_type) local_ip.m_strClientIP = socket_.local_endpoint().address().to_string(); local_ip.m_u2Port = socket_.local_endpoint().port(); - PSS_LOGGER_DEBUG("[CUdpClientSession::start]remote({0}:{1})", remote_ip.m_strClientIP, remote_ip.m_u2Port); - PSS_LOGGER_DEBUG("[CUdpClientSession::start]local({0}:{1})", local_ip.m_strClientIP, local_ip.m_u2Port); + PSS_LOGGER_INFO("[CUdpClientSession::start]connect_id:{} remote[{}:{}] local[{}:{}]", + connect_id_, + remote_ip.m_strClientIP, + remote_ip.m_u2Port, + local_ip.m_strClientIP, + local_ip.m_u2Port); packet_parse_interface_->packet_connect_ptr_(connect_id_, remote_ip, local_ip, io_type_, App_IoBridge::instance()); @@ -89,6 +92,10 @@ void CUdpClientSession::start(const CConnect_IO_Info& io_type) void CUdpClientSession::close(uint32 connect_id) { + if(!socket_.is_open()) + { + return; + } auto self(shared_from_this()); auto io_type = io_type_; @@ -146,9 +153,7 @@ void CUdpClientSession::do_write(uint32 connect_id) void CUdpClientSession::set_write_buffer(uint32 connect_id, const char* data, size_t length) { - std::memcpy(session_send_buffer_.get_curr_write_ptr(), - data, - length); + std::memcpy(session_send_buffer_.get_curr_write_ptr(),data,length); session_send_buffer_.set_write_data(length); } @@ -204,6 +209,11 @@ bool CUdpClientSession::is_need_send_format() void CUdpClientSession::send_io_data(uint32 connect_id, std::shared_ptr send_buffer) { + if(!socket_.is_open()) + { + PSS_LOGGER_WARN("[CUdpClientSession::send_io_data]connect_id={0}, socket is closed.", connect_id); + return; + } clear_write_buffer(send_buffer->buffer_length_); //异步发送 @@ -217,7 +227,7 @@ void CUdpClientSession::send_io_data(uint32 connect_id, std::shared_ptrclose_server(); + } + return; + } auto self(shared_from_this()); socket_.async_receive_from( asio::buffer(session_recv_buffer_.get_curr_write_ptr(), session_recv_buffer_.get_buffer_size()), recv_endpoint_, @@ -69,10 +78,20 @@ void CUdpServer::do_receive() { self->do_receive_from(ec, length); }); + + if (!udp_run_state_) + { + this->close_server(); + return; + } } void CUdpServer::do_receive_from(std::error_code ec, std::size_t length) { + if(!udp_run_state_) + { + return; + } try { //查询当前的connect_id @@ -143,14 +162,27 @@ void CUdpServer::do_receive_from(std::error_code ec, std::size_t length) void CUdpServer::close(uint32 connect_id) { + PSS_LOGGER_DEBUG("[CUdpServer::close]start connect_id={0}",connect_id); + if(!udp_run_state_) + { + return; + } auto self(shared_from_this()); io_context_->dispatch([self, connect_id]() { self->close_udp_endpoint_by_id(connect_id); }); + PSS_LOGGER_DEBUG("[CUdpServer::close]end connect_id={0}",connect_id); } void CUdpServer::close_all() +{ + PSS_LOGGER_DEBUG("[CUdpServer::close_all]start size1={} size2={}",udp_id_2_endpoint_list_.size(),udp_endpoint_2_id_list_.size()); + udp_run_state_ = false; + PSS_LOGGER_DEBUG("[CUdpServer::close_all]end"); +} + +void CUdpServer::close_server() { //释放所有kcp资源 for (const auto& session_info : udp_id_2_endpoint_list_) @@ -236,7 +268,7 @@ void CUdpServer::do_write_immediately(uint32 connect_id, const char* data, size_ if (session_info == nullptr) { - PSS_LOGGER_DEBUG("[CUdpServer::do_write]({}) is nullptr.", connect_id); + PSS_LOGGER_DEBUG("[CUdpServer::do_write_immediately]({}) is nullptr.", connect_id); return; } @@ -281,8 +313,6 @@ uint32 CUdpServer::add_udp_endpoint(const udp::endpoint& recv_endpoint, size_t l } else { - std::lock_guard lock(udp_mutex_); - //生成一个新的ID auto connect_id = App_ConnectCounter::instance()->CreateCounter(); @@ -339,7 +369,6 @@ shared_ptr CUdpServer::find_udp_endpoint_by_id(uint32 connect void CUdpServer::close_udp_endpoint_by_id(uint32 connect_id) { - std::lock_guard lock(udp_mutex_); auto self(shared_from_this()); _ClientIPInfo remote_ip; @@ -355,7 +384,7 @@ void CUdpServer::close_udp_endpoint_by_id(uint32 connect_id) //清理链接关系 auto session_endpoint = f->second->send_endpoint; - udp_id_2_endpoint_list_.erase(f); + udp_id_2_endpoint_list_.erase(connect_id); udp_endpoint_2_id_list_.erase(session_endpoint); } diff --git a/PSS_ASIO/UdpSession/UdpServer.h b/PSS_ASIO/UdpSession/UdpServer.h index bf1f088..54aabe5 100644 --- a/PSS_ASIO/UdpSession/UdpServer.h +++ b/PSS_ASIO/UdpSession/UdpServer.h @@ -72,6 +72,8 @@ class CUdpServer : public std::enable_shared_from_this, public ISess void set_io_bridge_connect_id(uint32 from_io_connect_id, uint32 to_io_connect_id) final; private: + void close_server(); + void do_receive(); void do_receive_from(std::error_code ec, std::size_t length); @@ -105,7 +107,7 @@ class CUdpServer : public std::enable_shared_from_this, public ISess using hashmapcid_recv_data_time = unordered_map; hashmapcid_recv_data_time cid_recv_data_time_; - std::mutex udp_mutex_; + bool udp_run_state_; string server_ip_; io_port_type server_port_;