From ed97eb6a7b8b906908dc977aeb61558bc4c002b5 Mon Sep 17 00:00:00 2001 From: Josiah VanderZee Date: Tue, 13 Aug 2024 08:14:15 -0500 Subject: [PATCH] Apply RAII to `UnixSocket` This deletes the `UnixSocket` copy constructors and adds move constructors and a destructor. The `NetAccept` and `NetAcceptEvent` classes now store shared pointers to their server instances because of this change. --- include/iocore/eventsystem/UnixSocket.h | 8 +++ src/iocore/dns/DNS.cc | 6 +-- src/iocore/eventsystem/UnixSocket.cc | 22 ++++++++ src/iocore/net/Connection.cc | 8 ++- src/iocore/net/NetAcceptEventIO.cc | 2 +- src/iocore/net/P_Connection.h | 16 ++---- src/iocore/net/P_NetAccept.h | 22 ++++---- src/iocore/net/QUICNetProcessor.cc | 8 +-- src/iocore/net/QUICPacketHandler.cc | 2 +- src/iocore/net/Server.h | 2 + src/iocore/net/UnixNetAccept.cc | 67 +++++++++++++------------ src/iocore/net/UnixNetProcessor.cc | 8 +-- src/iocore/net/UnixNetVConnection.cc | 7 +-- 13 files changed, 102 insertions(+), 76 deletions(-) diff --git a/include/iocore/eventsystem/UnixSocket.h b/include/iocore/eventsystem/UnixSocket.h index 08cbdb7bd13..39d1bcb7fa4 100644 --- a/include/iocore/eventsystem/UnixSocket.h +++ b/include/iocore/eventsystem/UnixSocket.h @@ -65,6 +65,14 @@ class UnixSocket */ UnixSocket(int domain, int ctype, int protocol); + ~UnixSocket(); + + UnixSocket(UnixSocket const &other) = delete; + UnixSocket &operator=(UnixSocket const &other) = delete; + + UnixSocket(UnixSocket &&other); + UnixSocket &operator=(UnixSocket &&other); + int get_fd() const; bool is_ok() const; diff --git a/src/iocore/dns/DNS.cc b/src/iocore/dns/DNS.cc index 1d565c6fa70..2b044729f45 100644 --- a/src/iocore/dns/DNS.cc +++ b/src/iocore/dns/DNS.cc @@ -679,7 +679,7 @@ DNSHandler::retry_named(int ndx, ink_hrtime t, bool reopen) open_cons(&m_res->nsaddr_list[ndx].sa, true, ndx); } bool over_tcp = dns_conn_mode == DNS_CONN_MODE::TCP_ONLY; - UnixSocket con_sock = over_tcp ? tcpcon[ndx].sock : udpcon[ndx].sock; + UnixSocket &con_sock = over_tcp ? tcpcon[ndx].sock : udpcon[ndx].sock; unsigned char buffer[MAX_DNS_REQUEST_LEN]; Dbg(dbg_ctl_dns, "trying to resolve '%s' from DNS connection, ndx %d", try_server_names[try_servers], ndx); int r = _ink_res_mkquery(m_res, try_server_names[try_servers], T_A, buffer, over_tcp); @@ -703,7 +703,7 @@ DNSHandler::try_primary_named(bool reopen) if ((t - last_primary_retry) > DNS_PRIMARY_RETRY_PERIOD) { unsigned char buffer[MAX_DNS_REQUEST_LEN]; bool over_tcp = dns_conn_mode == DNS_CONN_MODE::TCP_ONLY; - UnixSocket con_sock = over_tcp ? tcpcon[0].sock : udpcon[0].sock; + UnixSocket &con_sock = over_tcp ? tcpcon[0].sock : udpcon[0].sock; last_primary_retry = t; Dbg(dbg_ctl_dns, "trying to resolve '%s' from primary DNS connection", try_server_names[try_servers]); int r = _ink_res_mkquery(m_res, try_server_names[try_servers], T_A, buffer, over_tcp); @@ -1183,7 +1183,7 @@ write_dns_event(DNSHandler *h, DNSEntry *e, bool over_tcp) h->release_query_id(e->id[dns_retries - e->retries]); } e->id[dns_retries - e->retries] = i; - UnixSocket con_sock = over_tcp ? h->tcpcon[h->name_server].sock : h->udpcon[h->name_server].sock; + UnixSocket &con_sock = over_tcp ? h->tcpcon[h->name_server].sock : h->udpcon[h->name_server].sock; Dbg(dbg_ctl_dns, "send query (qtype=%d) for %s to fd %d", e->qtype, e->qname, con_sock.get_fd()); int s = con_sock.send(buffer, r, 0); diff --git a/src/iocore/eventsystem/UnixSocket.cc b/src/iocore/eventsystem/UnixSocket.cc index 17425ea219e..1be13d244ef 100644 --- a/src/iocore/eventsystem/UnixSocket.cc +++ b/src/iocore/eventsystem/UnixSocket.cc @@ -23,6 +23,7 @@ #include "iocore/eventsystem/UnixSocket.h" +#include "tscore/Diags.h" #include "tscore/ink_apidefs.h" #include "tscore/ink_config.h" #include "tscore/TextBuffer.h" @@ -50,6 +51,27 @@ static int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int fl #endif static unsigned int read_uint_from_fd(int fd); +UnixSocket::~UnixSocket() +{ + if (this->is_ok()) { + Warning("Dropped UnixSocket without closing socket.\n"); + } +} + +UnixSocket::UnixSocket(UnixSocket &&other) +{ + this->fd = other.fd; + other.fd = NO_SOCK; +} + +UnixSocket & +UnixSocket::operator=(UnixSocket &&other) +{ + this->fd = other.fd; + other.fd = NO_SOCK; + return *this; +} + int UnixSocket::set_nonblocking() { diff --git a/src/iocore/net/Connection.cc b/src/iocore/net/Connection.cc index 26ab6d996d3..40f8dc7eb03 100644 --- a/src/iocore/net/Connection.cc +++ b/src/iocore/net/Connection.cc @@ -72,9 +72,7 @@ Connection::move(Connection &orig) { this->is_connected = orig.is_connected; this->is_bound = orig.is_bound; - this->sock = orig.sock; - // The target has taken ownership of the file descriptor - orig.sock = UnixSocket{NO_FD}; - this->addr = orig.addr; - this->sock_type = orig.sock_type; + this->sock = std::move(orig.sock); + this->addr = orig.addr; + this->sock_type = orig.sock_type; } diff --git a/src/iocore/net/NetAcceptEventIO.cc b/src/iocore/net/NetAcceptEventIO.cc index 876c663ebf6..700abea5c98 100644 --- a/src/iocore/net/NetAcceptEventIO.cc +++ b/src/iocore/net/NetAcceptEventIO.cc @@ -28,7 +28,7 @@ int NetAcceptEventIO::start(EventLoop l, NetAccept *na, int events) { _na = na; - return start_common(l, _na->server.sock.get_fd(), events); + return start_common(l, _na->server->sock.get_fd(), events); } void NetAcceptEventIO::process_event(int /* flags ATS_UNUSED */) diff --git a/src/iocore/net/P_Connection.h b/src/iocore/net/P_Connection.h index 947b5750f12..65ad101863a 100644 --- a/src/iocore/net/P_Connection.h +++ b/src/iocore/net/P_Connection.h @@ -109,7 +109,8 @@ struct Connection { virtual ~Connection(); Connection(); - Connection(Connection const &that) = delete; + Connection(Connection const &that) = delete; + Connection &operator=(Connection const &that) = delete; /// Default options. static NetVCOptions const DEFAULT_OPTIONS; @@ -119,15 +120,6 @@ struct Connection { */ void move(Connection &); -protected: - /** Assignment operator. - * - * @param that Source object. - * @return @a this - * - * This is protected because it is not safe in the general case, but is valid for - * certain subclasses. Those provide a public assignment that depends on this method. - */ - Connection &operator=(Connection const &that) = default; - void _cleanup(); +private: + void _cleanup(); }; diff --git a/src/iocore/net/P_NetAccept.h b/src/iocore/net/P_NetAccept.h index 5a38940822d..6e49b028b56 100644 --- a/src/iocore/net/P_NetAccept.h +++ b/src/iocore/net/P_NetAccept.h @@ -42,9 +42,11 @@ #include "iocore/net/NetAcceptEventIO.h" #include "Server.h" -#include #include "tscore/ink_platform.h" +#include +#include + struct NetAccept; struct HttpProxyPort; class Event; @@ -62,7 +64,7 @@ class UnixNetVConnection; // TODO fix race between cancel accept and call back struct NetAcceptAction : public Action, public RefCountObjInHeap { - Server *server; + std::shared_ptr server; void cancel(Continuation *cont = nullptr) override @@ -89,14 +91,14 @@ struct NetAcceptAction : public Action, public RefCountObjInHeap { // Handles accepting connections. // struct NetAccept : public Continuation { - ink_hrtime period = 0; - Server server; - AcceptFunctionPtr accept_fn = nullptr; - int ifd = NO_FD; - int id = -1; - Ptr action_; - SSLNextProtocolAccept *snpa = nullptr; - NetAcceptEventIO ep; + ink_hrtime period = 0; + std::shared_ptr server; + AcceptFunctionPtr accept_fn = nullptr; + int ifd = NO_FD; + int id = -1; + Ptr action_; + SSLNextProtocolAccept *snpa = nullptr; + NetAcceptEventIO ep; HttpProxyPort *proxyPort = nullptr; AcceptOptions opt; diff --git a/src/iocore/net/QUICNetProcessor.cc b/src/iocore/net/QUICNetProcessor.cc index d2e273e6131..b1ffd07c9c5 100644 --- a/src/iocore/net/QUICNetProcessor.cc +++ b/src/iocore/net/QUICNetProcessor.cc @@ -250,13 +250,13 @@ QUICNetProcessor::main_accept(Continuation *cont, SOCKET fd, AcceptOptions const ink_assert(0 < opt.local_port && opt.local_port < 65536); accept_ip.network_order_port() = htons(opt.local_port); - na->accept_fn = net_accept; - na->server.sock = UnixSocket{fd}; - ats_ip_copy(&na->server.accept_addr, &accept_ip); + na->accept_fn = net_accept; + na->server->sock = UnixSocket{fd}; + ats_ip_copy(&na->server->accept_addr, &accept_ip); na->action_ = new NetAcceptAction(); *na->action_ = cont; - na->action_->server = &na->server; + na->action_->server = na->server; na->init_accept(); return na->action_.get(); diff --git a/src/iocore/net/QUICPacketHandler.cc b/src/iocore/net/QUICPacketHandler.cc index fada4eed5b6..b892e1d09e7 100644 --- a/src/iocore/net/QUICPacketHandler.cc +++ b/src/iocore/net/QUICPacketHandler.cc @@ -160,7 +160,7 @@ QUICPacketHandlerIn::acceptEvent(int event, void *data) } else if (event == EVENT_IMMEDIATE) { this->setThreadAffinity(this_ethread()); SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); - udpNet.UDPBind((Continuation *)this, &this->server.accept_addr.sa, -1, 1048576, 1048576); + udpNet.UDPBind((Continuation *)this, &this->server->accept_addr.sa, -1, 1048576, 1048576); return EVENT_CONT; } diff --git a/src/iocore/net/Server.h b/src/iocore/net/Server.h index fe91443ab5f..e8c7dcc5f98 100644 --- a/src/iocore/net/Server.h +++ b/src/iocore/net/Server.h @@ -29,6 +29,8 @@ #include "tscore/ink_inet.h" #include "tscore/ink_memory.h" +#include + struct Server { /// Client side (inbound) local IP address. IpEndpoint accept_addr; diff --git a/src/iocore/net/UnixNetAccept.cc b/src/iocore/net/UnixNetAccept.cc index b8c91d3c1c0..b0dfd0e95ca 100644 --- a/src/iocore/net/UnixNetAccept.cc +++ b/src/iocore/net/UnixNetAccept.cc @@ -28,6 +28,8 @@ #include "P_Net.h" #include "tscore/ink_inet.h" +#include + using NetAcceptHandler = int (NetAccept::*)(int, void *); namespace @@ -98,11 +100,11 @@ net_accept(NetAccept *na, void *ep, bool blockable) // do-while for accepting all the connections // added by YTS Team, yamsat do { - if ((res = na->server.accept(&con)) < 0) { + if ((res = na->server->accept(&con)) < 0) { if (res == -EAGAIN || res == -ECONNABORTED || res == -EPIPE) { goto Ldone; } - if (na->server.sock.is_ok() && !na->action_->cancelled) { + if (na->server->sock.is_ok() && !na->action_->cancelled) { if (!blockable) { na->action_->continuation->handleEvent(EVENT_ERROR, (void *)static_cast(res)); } else { @@ -142,7 +144,7 @@ net_accept(NetAccept *na, void *ep, bool blockable) } #ifdef USE_EDGE_TRIGGER // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket. - if (na->server.http_accept_filter) { + if (na->server->http_accept_filter) { vc->read.triggered = 1; } #endif @@ -216,10 +218,10 @@ NetAccept::init_accept_loop() for (i = 0; i < n; i++) { NetAccept *a = (i < n - 1) ? clone() : this; - snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ACCEPT %d:%d]", i, ats_ip_port_host_order(&server.accept_addr)); + snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ACCEPT %d:%d]", i, ats_ip_port_host_order(&server->accept_addr)); eventProcessor.spawn_thread(a, thr_name, stacksize); Dbg(dbg_ctl_iocore_net_accept_start, "Created accept thread #%d for port %d", i + 1, - ats_ip_port_host_order(&server.accept_addr)); + ats_ip_port_host_order(&server->accept_addr)); } } @@ -309,7 +311,7 @@ NetAccept::stop_accept() if (!action_->cancelled) { action_->cancel(); } - server.close(); + server->close(); } int @@ -330,16 +332,16 @@ NetAccept::do_listen_impl(bool non_blocking) { int res = 0; - if (server.sock.is_ok()) { - if ((res = server.setup_fd_for_listen(non_blocking, opt))) { - Warning("unable to listen on main accept port %d: errno = %d, %s", server.accept_addr.host_order_port(), errno, + if (server->sock.is_ok()) { + if ((res = server->setup_fd_for_listen(non_blocking, opt))) { + Warning("unable to listen on main accept port %d: errno = %d, %s", server->accept_addr.host_order_port(), errno, strerror(errno)); goto Lretry; } } else { Lretry: - if ((res = server.listen(non_blocking, opt))) { - Warning("unable to listen on port %d: %d %d, %s", server.accept_addr.host_order_port(), res, errno, strerror(errno)); + if ((res = server->listen(non_blocking, opt))) { + Warning("unable to listen on port %d: %d %d, %s", server->accept_addr.host_order_port(), res, errno, strerror(errno)); } } @@ -360,7 +362,7 @@ NetAccept::do_blocking_accept(EThread *t) // do-while for accepting all the connections // added by YTS Team, yamsat do { - if ((res = server.accept(&con)) < 0) { + if ((res = server->accept(&con)) < 0) { int seriousness = accept_error_seriousness(res); switch (seriousness) { case 0: @@ -435,7 +437,7 @@ NetAccept::do_blocking_accept(EThread *t) vc->accept_object = this; #ifdef USE_EDGE_TRIGGER // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket. - if (server.http_accept_filter) { + if (server->http_accept_filter) { vc->read.triggered = 1; } #endif @@ -478,10 +480,10 @@ NetAccept::acceptEvent(int event, void *ep) if ((res = accept_fn(this, e, false)) < 0) { Metrics::Gauge::decrement(net_rsb.accepts_currently_open); /* INKqa11179 */ - Warning("Accept on port %d failed with error no %d", ats_ip_port_host_order(&server.addr), res); + Warning("Accept on port %d failed with error no %d", ats_ip_port_host_order(&server->addr), res); Warning("Traffic Server may be unable to accept more network" "connections on %d", - ats_ip_port_host_order(&server.addr)); + ats_ip_port_host_order(&server->addr)); e->cancel(); delete this; return EVENT_DONE; @@ -508,15 +510,13 @@ NetAccept::acceptFastEvent(int event, void *ep) int additional_accepts = NetHandler::get_additional_accepts(); do { - socklen_t sz = sizeof(con.addr); - UnixSocket sock{-1}; - if (int res{server.sock.accept4(&con.addr.sa, &sz, SOCK_NONBLOCK | SOCK_CLOEXEC)}; res >= 0) { - sock = UnixSocket{res}; + socklen_t sz = sizeof(con.addr); + if (int res{server->sock.accept4(&con.addr.sa, &sz, SOCK_NONBLOCK | SOCK_CLOEXEC)}; res >= 0) { + con.sock = UnixSocket{res}; } - con.sock = sock; std::shared_ptr conn_track_group; - if (likely(sock.is_ok())) { + if (likely(con.sock.is_ok())) { // check for throttle if (check_net_throttle(ACCEPT)) { // close the connection as we are in throttle state @@ -528,13 +528,13 @@ NetAccept::acceptFastEvent(int event, void *ep) con.close(); continue; } - Dbg(dbg_ctl_iocore_net, "accepted a new socket: %d", sock.get_fd()); + Dbg(dbg_ctl_iocore_net, "accepted a new socket: %d", con.sock.get_fd()); Metrics::Counter::increment(net_rsb.tcp_accept); if (opt.send_bufsize > 0) { - if (unlikely(sock.set_sndbuf_size(opt.send_bufsize))) { + if (unlikely(con.sock.set_sndbuf_size(opt.send_bufsize))) { bufsz = ROUNDUP(opt.send_bufsize, 1024); while (bufsz > 0) { - if (!sock.set_sndbuf_size(bufsz)) { + if (!con.sock.set_sndbuf_size(bufsz)) { break; } bufsz -= 1024; @@ -542,10 +542,10 @@ NetAccept::acceptFastEvent(int event, void *ep) } } if (opt.recv_bufsize > 0) { - if (unlikely(sock.set_rcvbuf_size(opt.recv_bufsize))) { + if (unlikely(con.sock.set_rcvbuf_size(opt.recv_bufsize))) { bufsz = ROUNDUP(opt.recv_bufsize, 1024); while (bufsz > 0) { - if (!sock.set_rcvbuf_size(bufsz)) { + if (!con.sock.set_rcvbuf_size(bufsz)) { break; } bufsz -= 1024; @@ -553,7 +553,7 @@ NetAccept::acceptFastEvent(int event, void *ep) } } } else { - res = sock.get_fd(); + res = con.sock.get_fd(); } // check return value from accept() if (res < 0) { @@ -601,7 +601,7 @@ NetAccept::acceptFastEvent(int event, void *ep) #ifdef USE_EDGE_TRIGGER // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket. - if (server.http_accept_filter) { + if (server->http_accept_filter) { vc->read.triggered = 1; } #endif @@ -626,7 +626,7 @@ NetAccept::acceptFastEvent(int event, void *ep) return EVENT_CONT; Lerror: - server.close(); + server->close(); e->cancel(); Metrics::Gauge::decrement(net_rsb.accepts_currently_open); delete this; @@ -655,7 +655,9 @@ NetAccept::acceptLoopEvent(int event, Event *e) // // -NetAccept::NetAccept(const NetProcessor::AcceptOptions &_opt) : Continuation(nullptr), opt(_opt) {} +NetAccept::NetAccept(const NetProcessor::AcceptOptions &_opt) : Continuation(nullptr), server{std::make_shared()}, opt(_opt) +{ +} // // Stop listening. When the next poll takes place, an error will result. @@ -665,15 +667,14 @@ void NetAccept::cancel() { action_->cancel(); - server.close(); + server->close(); } NetAccept * NetAccept::clone() const { NetAccept *na; - na = new NetAccept(opt); - *na = *this; + na = new NetAccept{*this}; return na; } diff --git a/src/iocore/net/UnixNetProcessor.cc b/src/iocore/net/UnixNetProcessor.cc index c81cc1aa4c1..addbcd27578 100644 --- a/src/iocore/net/UnixNetProcessor.cc +++ b/src/iocore/net/UnixNetProcessor.cc @@ -110,9 +110,9 @@ UnixNetProcessor::accept_internal(Continuation *cont, int fd, AcceptOptions cons ink_assert(0 < opt.local_port && opt.local_port < 65536); accept_ip.network_order_port() = htons(opt.local_port); - na->accept_fn = net_accept; // All callers used this. - na->server.sock = UnixSocket{fd}; - ats_ip_copy(&na->server.accept_addr, &accept_ip); + na->accept_fn = net_accept; // All callers used this. + na->server->sock = UnixSocket{fd}; + ats_ip_copy(&na->server->accept_addr, &accept_ip); if (opt.f_inbound_transparent) { Dbg(dbg_ctl_http_tproxy, "Marked accept server %p on port %d as inbound transparent", na, opt.local_port); @@ -128,7 +128,7 @@ UnixNetProcessor::accept_internal(Continuation *cont, int fd, AcceptOptions cons na->action_ = new NetAcceptAction(); *na->action_ = cont; - na->action_->server = &na->server; + na->action_->server = na->server; if (opt.frequent_accept) { // true if (accept_threads > 0 && listen_per_thread == 0) { diff --git a/src/iocore/net/UnixNetVConnection.cc b/src/iocore/net/UnixNetVConnection.cc index 46b568999bc..17707c61369 100644 --- a/src/iocore/net/UnixNetVConnection.cc +++ b/src/iocore/net/UnixNetVConnection.cc @@ -1174,6 +1174,7 @@ UnixNetVConnection::connectUp(EThread *t, int fd) ink_assert(get_NetHandler(t)->mutex->thread_holding == this_ethread()); int res; UnixSocket sock{fd}; + bool already_connected{sock.is_ok()}; thread = t; if (check_net_throttle(CONNECT)) { @@ -1198,7 +1199,7 @@ UnixNetVConnection::connectUp(EThread *t, int fd) // If this is getting called from the TS API, then we are wiring up a file descriptor // provided by the caller. In that case, we know that the socket is already connected. - if (!sock.is_ok()) { + if (!already_connected) { // Due to multi-threads system, the fd returned from con.open() may exceed the limitation of check_net_throttle(). res = con.open(options); if (res != 0) { @@ -1212,7 +1213,7 @@ UnixNetVConnection::connectUp(EThread *t, int fd) // is only used when setting up the socket. safe_getsockopt(fd, SOL_SOCKET, SO_TYPE, reinterpret_cast(&con.sock_type), &len); sock.set_nonblocking(); - con.sock = sock; + con.sock = std::move(sock); con.is_connected = true; con.is_bound = true; } @@ -1223,7 +1224,7 @@ UnixNetVConnection::connectUp(EThread *t, int fd) goto fail; } - if (!sock.is_ok()) { + if (!already_connected) { res = con.connect(nullptr, options); if (res != 0) { // fast stopIO