diff --git a/Module_Logic/Test_Logic/BaseCommand.cpp b/Module_Logic/Test_Logic/BaseCommand.cpp index 32abae4..dd481f0 100644 --- a/Module_Logic/Test_Logic/BaseCommand.cpp +++ b/Module_Logic/Test_Logic/BaseCommand.cpp @@ -101,6 +101,8 @@ void CBaseCommand::logic_connect_udp() io_info.server_ip = "127.0.0.1"; io_info.server_port = 10005; io_info.server_id = 1002; + io_info.client_ip = "127.0.0.1"; + io_info.client_port = 10094; io_info.packet_parse_id = 1; session_service_->connect_io_server(io_info, io_type); diff --git a/PSS_ASIO/Message/SessionInterface.cpp b/PSS_ASIO/Message/SessionInterface.cpp index 7ad3d37..172a9b3 100644 --- a/PSS_ASIO/Message/SessionInterface.cpp +++ b/PSS_ASIO/Message/SessionInterface.cpp @@ -109,7 +109,7 @@ void CSessionInterface::check_session_io_timeout(uint32 connect_timeout, vector< } -bool CSessionInterface::is_need_check_session(std::chrono::steady_clock::time_point& check_connect_time, uint32& connect_timeout) +bool CSessionInterface::is_need_check_session(const std::chrono::steady_clock::time_point& check_connect_time, const uint32& connect_timeout) { std::chrono::duration> elapsed = check_connect_time - check_connect_time_; if (elapsed.count() >= connect_timeout) diff --git a/PSS_ASIO/Message/SessionInterface.h b/PSS_ASIO/Message/SessionInterface.h index fec54ba..6082bb5 100644 --- a/PSS_ASIO/Message/SessionInterface.h +++ b/PSS_ASIO/Message/SessionInterface.h @@ -41,7 +41,7 @@ class CSessionInterface void check_session_io_timeout(uint32 connect_timeout, vector& session_list); - bool is_need_check_session(std::chrono::steady_clock::time_point& check_connect_time, uint32& connect_timeout); + bool is_need_check_session(const std::chrono::steady_clock::time_point& check_connect_time, const uint32& connect_timeout); void start_check(); diff --git a/PSS_ASIO/TTySession/TtyServer.cpp b/PSS_ASIO/TTySession/TtyServer.cpp index 494d67a..ceace5a 100644 --- a/PSS_ASIO/TTySession/TtyServer.cpp +++ b/PSS_ASIO/TTySession/TtyServer.cpp @@ -85,6 +85,13 @@ void CTTyServer::do_receive() self->tty_name_, self->tty_port_, ec.message()); + + App_WorkThreadLogic::instance()->add_frame_events(LOGIC_LISTEN_SERVER_ERROR, + self->server_id_, + self->tty_name_, + self->tty_port_, + EM_CONNECT_IO_TYPE::CONNECT_IO_TTY); + return; } }); @@ -149,6 +156,9 @@ void CTTyServer::do_write(uint32 connect_id) send_buffer->data_.append(session_send_buffer_.read(), session_send_buffer_.get_write_size()); send_buffer->buffer_length_ = session_send_buffer_.get_write_size(); + //记录放入缓存的大小 + send_buffer_size_ += send_buffer->buffer_length_; + clear_write_buffer(); //异步发送 @@ -156,22 +166,39 @@ void CTTyServer::do_write(uint32 connect_id) serial_port_param_->async_write_some(asio::buffer(send_buffer->data_.c_str(), send_buffer->buffer_length_), [self, send_buffer, connect_id](std::error_code ec, std::size_t length) { - if (ec) - { - //回调消息处理失败信息 - PSS_LOGGER_DEBUG("[CTTyServer::do_write]({0})write error({1}).", connect_id, ec.message()); - - self->send_write_fail_to_logic(send_buffer->data_, length); - } - else - { - self->add_send_finish_size(connect_id, length); - } + self->do_write_finish(ec, connect_id, send_buffer, length); }); clear_write_buffer(); } +void CTTyServer::do_write_finish(std::error_code& ec, uint32 connect_id, std::shared_ptr send_buffer, std::size_t length) +{ + if (ec) + { + //回调消息处理失败信息 + PSS_LOGGER_DEBUG("[CTTyServer::do_write]({0})write error({1}).", connect_id, ec.message()); + + send_write_fail_to_logic(send_buffer->data_, length); + + if (true == is_active_close_) + { + close_immediaterly(); + } + } + else + { + add_send_finish_size(connect_id, length); + + if (true == is_active_close_ + && send_buffer_size_ == send_data_size_) + { + //关闭客户端 + close_immediaterly(); + } + } +} + void CTTyServer::do_write_immediately(uint32 connect_id, const char* data, size_t length) { PSS_UNUSED_ARG(connect_id); @@ -216,19 +243,35 @@ EM_CONNECT_IO_TYPE CTTyServer::get_io_type() } void CTTyServer::close(uint32 connect_id) +{ + //如果缓冲中不存在等待发送的数据,则直接关闭 + if (send_buffer_size_ == send_data_size_) + { + close_immediaterly(); + } + else + { + //如果缓冲区有数据,直接发送 + do_write(connect_id_); + + is_active_close_ = true; + } +} + +void CTTyServer::close_immediaterly() { auto self(shared_from_this()); auto io_type = io_type_; _ClientIPInfo remote_ip = remote_ip_; - io_context_->dispatch([self, connect_id, io_type, remote_ip]() + io_context_->dispatch([self, io_type, remote_ip]() { - PSS_UNUSED_ARG(connect_id); - self->packet_parse_interface_->packet_disconnect_ptr_(connect_id, io_type, App_IoBridge::instance()); + PSS_UNUSED_ARG(self->connect_id_); + self->packet_parse_interface_->packet_disconnect_ptr_(self->connect_id_, io_type, App_IoBridge::instance()); //删除映射关系 - App_WorkThreadLogic::instance()->delete_thread_session(connect_id, self); + App_WorkThreadLogic::instance()->delete_thread_session(self->connect_id_, self); self->io_list_manager_->del_accept_net_io_event(self->tty_name_, self->tty_port_, EM_CONNECT_IO_TYPE::CONNECT_IO_TTY); }); diff --git a/PSS_ASIO/TTySession/TtyServer.h b/PSS_ASIO/TTySession/TtyServer.h index bc1b120..05053c7 100644 --- a/PSS_ASIO/TTySession/TtyServer.h +++ b/PSS_ASIO/TTySession/TtyServer.h @@ -58,12 +58,16 @@ class CTTyServer : public std::enable_shared_from_this, public ISess void send_write_fail_to_logic(const std::string write_fail_buffer, std::size_t buffer_length); private: + void do_write_finish(std::error_code& ec, uint32 connect_id, std::shared_ptr send_buffer, std::size_t length); + void do_receive(); void clear_write_buffer(); bool add_serial_port(asio::io_context* io_context, const std::string& tty_name, uint16 tty_port, uint8 char_size); + void close_immediaterly(); + asio::io_context* io_context_ = nullptr; std::string tty_name_; uint16 tty_port_ = 0; @@ -77,6 +81,9 @@ class CTTyServer : public std::enable_shared_from_this, public ISess size_t recv_data_size_ = 0; size_t send_data_size_ = 0; + size_t send_buffer_size_ = 0; + + bool is_active_close_ = false; std::chrono::steady_clock::time_point recv_data_time_ = std::chrono::steady_clock::now(); CSessionBuffer session_recv_buffer_; diff --git a/PSS_ASIO/TcpSession/TcpServer.cpp b/PSS_ASIO/TcpSession/TcpServer.cpp index b4e606f..1a655c3 100644 --- a/PSS_ASIO/TcpSession/TcpServer.cpp +++ b/PSS_ASIO/TcpSession/TcpServer.cpp @@ -62,12 +62,10 @@ void CTcpServer::do_accept() void CTcpServer::send_accept_listen_fail(std::error_code ec) { //发送监听失败消息 - auto server_ip = server_ip_; - auto server_port = server_port_; App_WorkThreadLogic::instance()->add_frame_events(LOGIC_LISTEN_SERVER_ERROR, 0, - server_ip, - server_port, + server_ip_, + server_port_, EM_CONNECT_IO_TYPE::CONNECT_IO_TCP); //监听失败,查看错误信息 diff --git a/PSS_ASIO/TcpSession/TcpSession.cpp b/PSS_ASIO/TcpSession/TcpSession.cpp index bdc2e07..56a631e 100644 --- a/PSS_ASIO/TcpSession/TcpSession.cpp +++ b/PSS_ASIO/TcpSession/TcpSession.cpp @@ -148,7 +148,7 @@ void CTcpSession::do_write(uint32 connect_id) send_buffer->data_ = session_send_buffer_; send_buffer->buffer_length_ = session_send_buffer_.size(); - //记录存储的地址 + //记录放入缓存的大小 send_buffer_size_+= session_send_buffer_.size(); clear_write_buffer(); diff --git a/PSS_ASIO/UdpSession/UdpServer.cpp b/PSS_ASIO/UdpSession/UdpServer.cpp index 87357c2..d66b072 100644 --- a/PSS_ASIO/UdpSession/UdpServer.cpp +++ b/PSS_ASIO/UdpSession/UdpServer.cpp @@ -150,6 +150,18 @@ void CUdpServer::do_receive_from(std::error_code ec, std::size_t length) session_recv_buffer_.move(length); +#ifdef GCOV_TEST + //测试代码 + auto remote_ip_info = get_remote_ip(connect_id); + PSS_LOGGER_WARN("[CUdpServer::do_receive_from]client udp {}:{} is connect server", + remote_ip_info.m_strClientIP, + remote_ip_info.m_u2Port); + + set_io_bridge_connect_id(connect_id, 3); + regedit_bridge_session_id(connect_id); + set_io_bridge_connect_id(connect_id, 0); +#endif + //持续接收数据 do_receive(); }