Skip to content

Commit

Permalink
Optimize TCP disconnect logic code.
Browse files Browse the repository at this point in the history
  • Loading branch information
freeeyes committed Mar 18, 2024
1 parent 37cbdaa commit 0376fda
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 78 deletions.
8 changes: 3 additions & 5 deletions PSS_ASIO/Message/ModuleLogic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ void CWorkThreadLogic::add_thread_session(uint32 connect_id, shared_ptr<ISession
#endif
}

void CWorkThreadLogic::delete_thread_session(uint32 connect_id, shared_ptr<ISession> session)
void CWorkThreadLogic::delete_thread_session(uint32 connect_id, shared_ptr<ISession> session, const _ClientIPInfo& local_info, EM_CONNECT_IO_TYPE io_type)
{
std::lock_guard <std::recursive_mutex> lock(plugin_timer_mutex_);

Expand All @@ -363,12 +363,10 @@ void CWorkThreadLogic::delete_thread_session(uint32 connect_id, shared_ptr<ISess
module_logic->delete_session_interface(connect_id);

//看看是否需要取消桥接逻辑
App_IoBridge::instance()->unregedit_bridge_session_info(session->get_remote_ip(connect_id),
session->get_io_type(),
App_IoBridge::instance()->unregedit_bridge_session_info(local_info,
io_type,
connect_id);

auto io_type = session->get_io_type();

//向插件告知链接断开消息
App_tms::instance()->AddMessage(curr_thread_index, [connect_id, server_id, io_type, module_logic]() {
CMessage_Source source;
Expand Down
2 changes: 1 addition & 1 deletion PSS_ASIO/Message/ModuleLogic.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class CWorkThreadLogic

void add_thread_session(uint32 connect_id, shared_ptr<ISession> session, const _ClientIPInfo& local_info, const _ClientIPInfo& romote_info);

void delete_thread_session(uint32 connect_id, shared_ptr<ISession> session);
void delete_thread_session(uint32 connect_id, shared_ptr<ISession> session, const _ClientIPInfo& local_info, EM_CONNECT_IO_TYPE io_type);

void close_session_event(uint32 connect_id, shared_ptr<ISession> session);

Expand Down
5 changes: 4 additions & 1 deletion PSS_ASIO/TTySession/TtyServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,10 @@ void CTTyServer::close_immediaterly()
self->packet_parse_interface_->packet_disconnect_ptr_(self->connect_id_, io_type, App_IoBridge::instance());

//删除映射关系
App_WorkThreadLogic::instance()->delete_thread_session(self->connect_id_, self);
App_WorkThreadLogic::instance()->delete_thread_session(self->connect_id_,
self,
remote_ip,
io_type);

self->io_list_manager_->del_accept_net_io_event(self->tty_name_, self->tty_port_, EM_CONNECT_IO_TYPE::CONNECT_IO_TTY);
});
Expand Down
5 changes: 4 additions & 1 deletion PSS_ASIO/TcpSession/TcpClientSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ void CTcpClientSession::close(uint32 connect_id)
self->packet_parse_interface_->packet_disconnect_ptr_(connect_id, io_type, App_IoBridge::instance());

//发送链接断开消息
App_WorkThreadLogic::instance()->delete_thread_session(connect_id, self);
App_WorkThreadLogic::instance()->delete_thread_session(connect_id,
self,
remote_ip,
io_type);
self->socket_.close();
});

Expand Down
5 changes: 4 additions & 1 deletion PSS_ASIO/TcpSession/TcpSSLClientSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ void CTcpSSLClientSession::close(uint32 connect_id)
self->packet_parse_interface_->packet_disconnect_ptr_(connect_id, io_type, App_IoBridge::instance());

//发送链接断开消息
App_WorkThreadLogic::instance()->delete_thread_session(connect_id, self);
App_WorkThreadLogic::instance()->delete_thread_session(connect_id,
self,
remote_ip,
io_type);

self->ssl_socket_.lowest_layer().close();
});
Expand Down
5 changes: 4 additions & 1 deletion PSS_ASIO/TcpSession/TcpSSLSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ void CTcpSSLSession::close(uint32 connect_id)
//断开连接
self->packet_parse_interface_->packet_disconnect_ptr_(connect_id, io_type, App_IoBridge::instance());

App_WorkThreadLogic::instance()->delete_thread_session(connect_id, self);
App_WorkThreadLogic::instance()->delete_thread_session(connect_id,
self,
remote_ip,
io_type);
self->ssl_socket_.lowest_layer().close();
});
}
Expand Down
5 changes: 4 additions & 1 deletion PSS_ASIO/TcpSession/TcpSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ void CTcpSession::close_immediaterly()
//断开连接
self->packet_parse_interface_->packet_disconnect_ptr_(self->connect_id_, io_type, App_IoBridge::instance());

App_WorkThreadLogic::instance()->delete_thread_session(self->connect_id_, self);
App_WorkThreadLogic::instance()->delete_thread_session(self->connect_id_,
self,
remote_ip,
io_type);
self->socket_.close();
});
}
Expand Down
5 changes: 4 additions & 1 deletion PSS_ASIO/UdpSession/KcpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,10 @@ void CKcpServer::close_udp_endpoint_by_id(uint32 connect_id)
}

//删除映射关系
App_WorkThreadLogic::instance()->delete_thread_session(connect_id, self);
App_WorkThreadLogic::instance()->delete_thread_session(connect_id,
self,
get_remote_ip(connect_id),
io_type_);
}

void CKcpServer::add_send_finish_size(uint32 connect_id, size_t length)
Expand Down
8 changes: 6 additions & 2 deletions PSS_ASIO/UdpSession/UdpClientSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ void CUdpClientSession::close(uint32 connect_id)
auto self(shared_from_this());

auto io_type = io_type_;
auto remote_info = get_remote_ip(connect_id);

io_context_->dispatch([self, connect_id, io_type]()
io_context_->dispatch([self, connect_id, remote_info, io_type]()
{
self->packet_parse_interface_->packet_disconnect_ptr_(connect_id, io_type, App_IoBridge::instance());

Expand All @@ -96,7 +97,10 @@ void CUdpClientSession::close(uint32 connect_id)
self->recv_data_size_,
self->send_data_size_);

App_WorkThreadLogic::instance()->delete_thread_session(connect_id, self);
App_WorkThreadLogic::instance()->delete_thread_session(connect_id,
self,
remote_info,
io_type);
self->socket_.close();
});
}
Expand Down
107 changes: 43 additions & 64 deletions PSS_ASIO/UdpSession/UdpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,40 +195,32 @@ void CUdpServer::close()

void CUdpServer::close_server()
{
std::vector<uint32> veccid;
std::lock_guard <std::recursive_mutex> lock(udp_session_mutex_);
if (!socket_.is_open())
{
std::lock_guard <std::recursive_mutex> lock(udp_session_mutex_);
if (!socket_.is_open())
{
return;
}
//释放所有udp资源
for (const auto& session_info : udp_id_2_endpoint_list_)
{
auto connect_id = session_info.first;
//这里发送数据通知()
packet_parse_interface_->packet_disconnect_ptr_(connect_id, io_type_, App_IoBridge::instance());

veccid.push_back(connect_id);
}
return;
}

for (auto cid : veccid)
//释放所有udp资源
for (const auto& session_info : udp_id_2_endpoint_list_)
{
App_WorkThreadLogic::instance()->delete_thread_session(cid, shared_from_this());
auto connect_id = session_info.first;
//这里发送数据通知()
packet_parse_interface_->packet_disconnect_ptr_(connect_id, io_type_, App_IoBridge::instance());

//发送处理IO断开事件
_ClientIPInfo remote_ip_info;
remote_ip_info.m_strClientIP = session_info.second->send_endpoint.address().to_string();
remote_ip_info.m_u2Port = session_info.second->send_endpoint.port();

App_WorkThreadLogic::instance()->delete_thread_session(connect_id,
shared_from_this(),
remote_ip_info,
io_type_);
}

{
std::lock_guard <std::recursive_mutex> lock(udp_session_mutex_);
if (!socket_.is_open())
{
return;
}

udp_id_2_endpoint_list_.clear();
udp_endpoint_2_id_list_.clear();
socket_.close();
}
udp_id_2_endpoint_list_.clear();
udp_endpoint_2_id_list_.clear();
socket_.close();

PSS_LOGGER_DEBUG("[CUdpServer::close_server]close [{0}:{1}]", server_ip_, server_port_);
}
Expand Down Expand Up @@ -400,50 +392,37 @@ shared_ptr<CUdp_Session_Info> CUdpServer::find_udp_endpoint_by_id(uint32 connect

void CUdpServer::close_udp_endpoint_by_id(uint32 connect_id)
{
std::lock_guard <std::recursive_mutex> lock(udp_session_mutex_);
auto self(shared_from_this());

if (!socket_.is_open())
{
std::lock_guard <std::recursive_mutex> lock(udp_session_mutex_);
if (!socket_.is_open())
{
return;
}
return;
}

_ClientIPInfo remote_ip;
_ClientIPInfo remote_ip;

auto f = udp_id_2_endpoint_list_.find(connect_id);
if (f != udp_id_2_endpoint_list_.end())
{
//调用packet parse 断开消息
packet_parse_interface_->packet_disconnect_ptr_(connect_id, io_type_, App_IoBridge::instance());
auto f = udp_id_2_endpoint_list_.find(connect_id);
if (f != udp_id_2_endpoint_list_.end())
{
//调用packet parse 断开消息
packet_parse_interface_->packet_disconnect_ptr_(connect_id, io_type_, App_IoBridge::instance());

remote_ip.m_strClientIP = f->second->send_endpoint.address().to_string();
remote_ip.m_u2Port = f->second->send_endpoint.port();
}
remote_ip.m_strClientIP = f->second->send_endpoint.address().to_string();
remote_ip.m_u2Port = f->second->send_endpoint.port();

auto iter=cid_recv_data_time_.find(connect_id);
if(iter != cid_recv_data_time_.end())
{
cid_recv_data_time_.erase(iter);
}
}
App_WorkThreadLogic::instance()->delete_thread_session(connect_id, self, remote_ip, io_type_);

App_WorkThreadLogic::instance()->delete_thread_session(connect_id, self);
//清理链接关系
auto session_endpoint = f->second->send_endpoint;
udp_id_2_endpoint_list_.erase(connect_id);
udp_endpoint_2_id_list_.erase(session_endpoint);
}

auto iter=cid_recv_data_time_.find(connect_id);
if(iter != cid_recv_data_time_.end())
{
std::lock_guard <std::recursive_mutex> lock(udp_session_mutex_);
if (!socket_.is_open())
{
return;
}

auto f = udp_id_2_endpoint_list_.find(connect_id);
if (f != udp_id_2_endpoint_list_.end())
{
//清理链接关系
auto session_endpoint = f->second->send_endpoint;
udp_id_2_endpoint_list_.erase(connect_id);
udp_endpoint_2_id_list_.erase(session_endpoint);
}
cid_recv_data_time_.erase(iter);
}
}

Expand Down

0 comments on commit 0376fda

Please sign in to comment.