From b9779fd8a4ece23783fff59f96b725979b7d340d Mon Sep 17 00:00:00 2001 From: Masakazu Kitajo Date: Mon, 4 Nov 2024 14:30:07 -0700 Subject: [PATCH] Refactor the read and write functions in Unix and SSL NetVC (#11794) --- include/iocore/net/NetEvent.h | 6 +- src/iocore/net/NetHandler.cc | 4 +- src/iocore/net/P_QUICNetVConnection.h | 2 +- src/iocore/net/P_SSLNetVConnection.h | 4 +- src/iocore/net/P_UnixNetVConnection.h | 14 +- src/iocore/net/QUICNetVConnection.cc | 6 +- src/iocore/net/SSLNetVConnection.cc | 16 +- src/iocore/net/UnixNetVConnection.cc | 998 +++++++++++++------------- 8 files changed, 508 insertions(+), 542 deletions(-) diff --git a/include/iocore/net/NetEvent.h b/include/iocore/net/NetEvent.h index 08b2ff8c8a1..35c7dabd222 100644 --- a/include/iocore/net/NetEvent.h +++ b/include/iocore/net/NetEvent.h @@ -55,9 +55,9 @@ class NetEvent public: NetEvent() = default; virtual ~NetEvent() {} - virtual void net_read_io(NetHandler *nh, EThread *lthread) = 0; - virtual void net_write_io(NetHandler *nh, EThread *lthread) = 0; - virtual void free_thread(EThread *t) = 0; + virtual void net_read_io(NetHandler *nh) = 0; + virtual void net_write_io(NetHandler *nh) = 0; + virtual void free_thread(EThread *t) = 0; // since we want this class to be independent from VConnection, Continutaion. There should be // a pure virtual function which connect sub class and NetHandler. diff --git a/src/iocore/net/NetHandler.cc b/src/iocore/net/NetHandler.cc index 208728636ab..0f7d519d2b2 100644 --- a/src/iocore/net/NetHandler.cc +++ b/src/iocore/net/NetHandler.cc @@ -283,7 +283,7 @@ NetHandler::process_ready_list() if (ne->closed) { free_netevent(ne); } else if (ne->read.enabled && ne->read.triggered) { - ne->net_read_io(this, this->thread); + ne->net_read_io(this); } else if (!ne->read.enabled) { read_ready_list.remove(ne); } @@ -293,7 +293,7 @@ NetHandler::process_ready_list() if (ne->closed) { free_netevent(ne); } else if (ne->write.enabled && ne->write.triggered) { - ne->net_write_io(this, this->thread); + ne->net_write_io(this); } else if (!ne->write.enabled) { write_ready_list.remove(ne); } diff --git a/src/iocore/net/P_QUICNetVConnection.h b/src/iocore/net/P_QUICNetVConnection.h index 9797c249846..353d3aed222 100644 --- a/src/iocore/net/P_QUICNetVConnection.h +++ b/src/iocore/net/P_QUICNetVConnection.h @@ -108,7 +108,7 @@ class QUICNetVConnection : public UnixNetVConnection, bool getSSLHandShakeComplete() const override; // NetEvent - virtual void net_read_io(NetHandler *nh, EThread *lthread) override; + virtual void net_read_io(NetHandler *nh) override; // NetVConnection int populate_protocol(std::string_view *results, int n) const override; diff --git a/src/iocore/net/P_SSLNetVConnection.h b/src/iocore/net/P_SSLNetVConnection.h index 9b437a6d207..2babaeb0d69 100644 --- a/src/iocore/net/P_SSLNetVConnection.h +++ b/src/iocore/net/P_SSLNetVConnection.h @@ -140,7 +140,7 @@ class SSLNetVConnection : public UnixNetVConnection, int sslServerHandShakeEvent(int &err); int sslClientHandShakeEvent(int &err); - void net_read_io(NetHandler *nh, EThread *lthread) override; + void net_read_io(NetHandler *nh) override; int64_t load_buffer_and_write(int64_t towrite, MIOBufferAccessor &buf, int64_t &total_written, int &needs) override; void do_io_close(int lerrno = -1) override; @@ -376,7 +376,7 @@ class SSLNetVConnection : public UnixNetVConnection, UnixNetVConnection *_migrateFromSSL(); void _propagateHandShakeBuffer(UnixNetVConnection *target, EThread *t); - int _ssl_read_from_net(EThread *lthread, int64_t &ret); + 
int _ssl_read_from_net(int64_t &ret); ssl_error_t _ssl_read_buffer(void *buf, int64_t nbytes, int64_t &nread); ssl_error_t _ssl_write_buffer(const void *buf, int64_t nbytes, int64_t &nwritten); ssl_error_t _ssl_connect(); diff --git a/src/iocore/net/P_UnixNetVConnection.h b/src/iocore/net/P_UnixNetVConnection.h index a6ce3ab89ed..5ff97aee16b 100644 --- a/src/iocore/net/P_UnixNetVConnection.h +++ b/src/iocore/net/P_UnixNetVConnection.h @@ -150,8 +150,8 @@ class UnixNetVConnection : public NetVConnection, public NetEvent } // NetEvent - virtual void net_read_io(NetHandler *nh, EThread *lthread) override; - virtual void net_write_io(NetHandler *nh, EThread *lthread) override; + virtual void net_read_io(NetHandler *nh) override; + virtual void net_write_io(NetHandler *nh) override; virtual void free_thread(EThread *t) override; virtual int close() override @@ -195,7 +195,7 @@ class UnixNetVConnection : public NetVConnection, public NetEvent int readSignalAndUpdate(int event); void readReschedule(NetHandler *nh); void writeReschedule(NetHandler *nh); - void netActivity(EThread *lthread); + void netActivity(); /** * If the current object's thread does not match the t argument, create a new * NetVC in the thread t context based on the socket and ssl information in the @@ -234,8 +234,6 @@ class UnixNetVConnection : public NetVConnection, public NetEvent int set_tcp_congestion_control(int side) override; void apply_options() override; - friend void write_to_net_io(NetHandler *, UnixNetVConnection *, EThread *); - // set_context() should be called before calling this member function. void mark_as_tunnel_endpoint() override; @@ -376,9 +374,3 @@ UnixNetVConnection::get_action() const { return &action_; } - -// declarations for local use (within the net module) - -void write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread); -void write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread); -void net_activity(UnixNetVConnection *vc, EThread *thread); diff --git a/src/iocore/net/QUICNetVConnection.cc b/src/iocore/net/QUICNetVConnection.cc index 2fd225df244..5154c349502 100644 --- a/src/iocore/net/QUICNetVConnection.cc +++ b/src/iocore/net/QUICNetVConnection.cc @@ -401,7 +401,7 @@ QUICNetVConnection::handle_received_packet(UDPPacket *packet) { size_t buf_len{0}; uint8_t *buf = packet->get_entire_chain_buffer(&buf_len); - net_activity(this, this_ethread()); + this->netActivity(); quiche_recv_info recv_info = { &packet->from.sa, static_cast(packet->from.isIp4() ? 
sizeof(packet->from.sin) : sizeof(packet->from.sin6)), @@ -522,7 +522,7 @@ QUICNetVConnection::is_handshake_completed() const } void -QUICNetVConnection::net_read_io(NetHandler * /* nh ATS_UNUSED */, EThread * /* lthread ATS_UNUSED */) +QUICNetVConnection::net_read_io(NetHandler * /* nh ATS_UNUSED */) { SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread()); this->handleEvent(QUIC_EVENT_PACKET_READ_READY, nullptr); @@ -714,7 +714,7 @@ QUICNetVConnection::_handle_write_ready() segment_size = max_udp_payload_size; } this->_packet_handler->send_packet(this->_udp_con, this->con.addr, udp_payload, segment_size, &send_at_hint); - net_activity(this, this_ethread()); + this->netActivity(); } } diff --git a/src/iocore/net/SSLNetVConnection.cc b/src/iocore/net/SSLNetVConnection.cc index d49c79ec287..88805cf32e6 100644 --- a/src/iocore/net/SSLNetVConnection.cc +++ b/src/iocore/net/SSLNetVConnection.cc @@ -178,7 +178,7 @@ debug_certificate_name(const char *msg, X509_NAME *name) } int -SSLNetVConnection::_ssl_read_from_net(EThread *lthread, int64_t &ret) +SSLNetVConnection::_ssl_read_from_net(int64_t &ret) { NetState *s = &this->read; MIOBufferAccessor &buf = s->vio.buffer; @@ -221,7 +221,7 @@ SSLNetVConnection::_ssl_read_from_net(EThread *lthread, int64_t &ret) bytes_read += nread; if (nread > 0) { buf.writer()->fill(nread); // Tell the buffer, we've used the bytes - this->netActivity(lthread); + this->netActivity(); } break; case SSL_ERROR_WANT_WRITE: @@ -275,7 +275,7 @@ SSLNetVConnection::_ssl_read_from_net(EThread *lthread, int64_t &ret) Dbg(dbg_ctl_ssl, "bytes_read=%" PRId64, bytes_read); s->vio.ndone += bytes_read; - this->netActivity(lthread); + this->netActivity(); ret = bytes_read; @@ -454,7 +454,7 @@ SSLNetVConnection::update_rbio(bool move_to_socket) // changed by YTS Team, yamsat void -SSLNetVConnection::net_read_io(NetHandler *nh, EThread *lthread) +SSLNetVConnection::net_read_io(NetHandler *nh) { int ret; int64_t r = 0; @@ -462,11 +462,11 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread *lthread) NetState *s = &this->read; if (HttpProxyPort::TRANSPORT_BLIND_TUNNEL == this->attributes) { - this->super::net_read_io(nh, lthread); + this->super::net_read_io(nh); return; } - MUTEX_TRY_LOCK(lock, s->vio.mutex, lthread); + MUTEX_TRY_LOCK(lock, s->vio.mutex, nh->thread); if (!lock.is_locked()) { readReschedule(nh); return; @@ -475,7 +475,7 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread *lthread) // The closed flag should be stable once we get the s->vio.mutex in that case // (the global session pool mutex). if (this->closed) { - this->super::net_read_io(nh, lthread); + this->super::net_read_io(nh); return; } // If the key renegotiation failed it's over, just signal the error and finish. 
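The hunks above narrow the NetEvent interface: net_read_io() and net_write_io() lose their EThread* argument, and code that still needs the event thread takes it from the NetHandler instead (nh->thread, as in the MUTEX_TRY_LOCK just above). A minimal, self-contained sketch of that pattern follows; Thread, Handler and Event are illustrative stand-ins, not ATS types.

#include <iostream>

// Stand-ins for EThread and NetHandler: the handler owns the thread it runs on.
struct Thread {
  int id;
};

struct Handler {
  Thread *thread; // plays the role of NetHandler::thread in the patch above
};

struct Event {
  // Before: read_io(Handler *, Thread *). After: only the handler is passed
  // and the thread is recovered from it when actually needed.
  void
  read_io(Handler *h)
  {
    Thread *t = h->thread; // was an explicit EThread *lthread parameter
    std::cout << "read on thread " << t->id << "\n";
  }
};

int
main()
{
  Thread  t{1};
  Handler h{&t};
  Event   e;
  e.read_io(&h); // callers no longer thread the EThread through every call
}
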
@@ -614,7 +614,7 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread *lthread) // this comment if you know int ssl_read_errno = 0; do { - ret = this->_ssl_read_from_net(lthread, r); + ret = this->_ssl_read_from_net(r); if (ret == SSL_READ_READY || ret == SSL_READ_ERROR_NONE) { bytes += r; } diff --git a/src/iocore/net/UnixNetVConnection.cc b/src/iocore/net/UnixNetVConnection.cc index 46b568999bc..6bd0331d544 100644 --- a/src/iocore/net/UnixNetVConnection.cc +++ b/src/iocore/net/UnixNetVConnection.cc @@ -75,18 +75,6 @@ write_reschedule(NetHandler *nh, UnixNetVConnection *vc) } } -void -net_activity(UnixNetVConnection *vc, EThread *thread) -{ - Dbg(dbg_ctl_socket, "net_activity updating inactivity %" PRId64 ", NetVC=%p", vc->inactivity_timeout_in, vc); - (void)thread; - if (vc->inactivity_timeout_in) { - vc->next_inactivity_timeout_at = ink_get_hrtime() + vc->inactivity_timeout_in; - } else { - vc->next_inactivity_timeout_at = 0; - } -} - // // Signal an event // @@ -188,394 +176,12 @@ read_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno) vc->lerrno = lerrno; return read_signal_done(VC_EVENT_ERROR, nh, vc); } - -static inline int -write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno) -{ - vc->lerrno = lerrno; - return write_signal_done(VC_EVENT_ERROR, nh, vc); -} - -// Read the data for a UnixNetVConnection. -// Rescheduling the UnixNetVConnection by moving the VC -// onto or off of the ready_list. -// Had to wrap this function with net_read_io for SSL. -static void -read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread) -{ - NetState *s = &vc->read; - int64_t r = 0; - - MUTEX_TRY_LOCK(lock, s->vio.mutex, thread); - - if (!lock.is_locked()) { - read_reschedule(nh, vc); - return; - } - - // It is possible that the closed flag got set from HttpSessionManager in the - // global session pool case. If so, the closed flag should be stable once we get the - // s->vio.mutex (the global session pool mutex). - if (vc->closed) { - vc->nh->free_netevent(vc); - return; - } - // if it is not enabled. 
- if (!s->enabled || s->vio.op != VIO::READ || s->vio.is_disabled()) { - read_disable(nh, vc); - return; - } - - MIOBufferAccessor &buf = s->vio.buffer; - ink_assert(buf.writer()); - - // if there is nothing to do, disable connection - int64_t ntodo = s->vio.ntodo(); - if (ntodo <= 0) { - read_disable(nh, vc); - return; - } - int64_t toread = buf.writer()->write_avail(); - if (toread > ntodo) { - toread = ntodo; - } - - // read data - int64_t rattempted = 0, total_read = 0; - unsigned niov = 0; - IOVec tiovec[NET_MAX_IOV]; - if (toread) { - IOBufferBlock *b = buf.writer()->first_write_block(); - do { - niov = 0; - rattempted = 0; - while (b && niov < NET_MAX_IOV) { - int64_t a = b->write_avail(); - if (a > 0) { - tiovec[niov].iov_base = b->_end; - int64_t togo = toread - total_read - rattempted; - if (a > togo) { - a = togo; - } - tiovec[niov].iov_len = a; - rattempted += a; - niov++; - if (a >= togo) { - break; - } - } - b = b->next.get(); - } - - ink_assert(niov > 0); - ink_assert(niov <= countof(tiovec)); - struct msghdr msg; - - ink_zero(msg); - msg.msg_name = const_cast(vc->get_remote_addr()); - msg.msg_namelen = ats_ip_size(vc->get_remote_addr()); - msg.msg_iov = &tiovec[0]; - msg.msg_iovlen = niov; - r = vc->con.sock.recvmsg(&msg, 0); - - Metrics::Counter::increment(net_rsb.calls_to_read); - - total_read += rattempted; - } while (rattempted && r == rattempted && total_read < toread); - - // if we have already moved some bytes successfully, summarize in r - if (total_read != rattempted) { - if (r <= 0) { - r = total_read - rattempted; - } else { - r = total_read - rattempted + r; - } - } - // check for errors - if (r <= 0) { - if (r == -EAGAIN || r == -ENOTCONN) { - Metrics::Counter::increment(net_rsb.calls_to_read_nodata); - vc->read.triggered = 0; - nh->read_ready_list.remove(vc); - return; - } - - if (!r || r == -ECONNRESET) { - vc->read.triggered = 0; - nh->read_ready_list.remove(vc); - read_signal_done(VC_EVENT_EOS, nh, vc); - return; - } - vc->read.triggered = 0; - read_signal_error(nh, vc, static_cast(-r)); - return; - } - Metrics::Counter::increment(net_rsb.read_bytes, r); - Metrics::Counter::increment(net_rsb.read_bytes_count); - - // Add data to buffer and signal continuation. - buf.writer()->fill(r); -#ifdef DEBUG - if (buf.writer()->write_avail() <= 0) { - Dbg(dbg_ctl_iocore_net, "read_from_net, read buffer full"); - } -#endif - s->vio.ndone += r; - net_activity(vc, thread); - } else { - r = 0; - } - - // Signal read ready, check if user is not done - if (r) { - // If there are no more bytes to read, signal read complete - ink_assert(ntodo >= 0); - if (s->vio.ntodo() <= 0) { - read_signal_done(VC_EVENT_READ_COMPLETE, nh, vc); - Dbg(dbg_ctl_iocore_net, "read_from_net, read finished - signal done"); - return; - } else { - if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT) { - return; - } - - // change of lock... don't look at shared variables! - if (lock.get_mutex() != s->vio.mutex.get()) { - read_reschedule(nh, vc); - return; - } - } - } - - // If here are is no more room, or nothing to do, disable the connection - if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) { - read_disable(nh, vc); - return; - } - - read_reschedule(nh, vc); -} - -// -// Write the data for a UnixNetVConnection. -// Rescheduling the UnixNetVConnection when necessary. 
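The read path removed above (and the member function it becomes later in this patch) follows a recurring rule: once the continuation has been signalled, the VIO's mutex may have been swapped by the handler, so the code compares the held lock against s->vio.mutex and reschedules instead of touching shared state. A rough standalone sketch of that check, using a shared_ptr as a stand-in for the ProxyMutex and an invented signal_user() callback:

#include <iostream>
#include <memory>

// A VIO owns a mutex (shared_ptr here); signalling the user may re-target the
// VIO, after which shared variables must not be read on this code path.
struct Vio {
  std::shared_ptr<int> mutex = std::make_shared<int>(0);
};

// Hypothetical user callback that re-targets the VIO (e.g. issues do_io_read again).
void
signal_user(Vio &vio)
{
  vio.mutex = std::make_shared<int>(1); // lock changed under us
}

int
main()
{
  Vio  vio;
  auto held = vio.mutex; // what MUTEX_TRY_LOCK captured before signalling
  signal_user(vio);
  if (held != vio.mutex) {
    // change of lock... don't look at shared variables! (reschedule instead)
    std::cout << "lock changed, reschedule\n";
    return 0;
  }
  std::cout << "same lock, continue processing\n";
}
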
-// -void -write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread) -{ - Metrics::Counter::increment(net_rsb.calls_to_writetonet); - write_to_net_io(nh, vc, thread); -} - -void -write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread) -{ - NetState *s = &vc->write; - Continuation *c = vc->write.vio.cont; - - MUTEX_TRY_LOCK(lock, s->vio.mutex, thread); - - if (!lock.is_locked() || lock.get_mutex() != s->vio.mutex.get()) { - write_reschedule(nh, vc); - return; - } - - if (vc->has_error()) { - vc->lerrno = vc->error; - write_signal_and_update(VC_EVENT_ERROR, vc); - return; - } - - // This function will always return true unless - // vc is an SSLNetVConnection. - if (!vc->getSSLHandShakeComplete()) { - if (vc->trackFirstHandshake()) { - // Eat the first write-ready. Until the TLS handshake is complete, - // we should still be under the connect timeout and shouldn't bother - // the state machine until the TLS handshake is complete - vc->write.triggered = 0; - nh->write_ready_list.remove(vc); - } - - int err, ret; - - if (vc->get_context() == NET_VCONNECTION_OUT) { - ret = vc->sslStartHandShake(SSL_EVENT_CLIENT, err); - } else { - ret = vc->sslStartHandShake(SSL_EVENT_SERVER, err); - } - - if (ret == EVENT_ERROR) { - vc->write.triggered = 0; - write_signal_error(nh, vc, err); - } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT) { - vc->read.triggered = 0; - nh->read_ready_list.remove(vc); - read_reschedule(nh, vc); - } else if (ret == SSL_HANDSHAKE_WANT_CONNECT || ret == SSL_HANDSHAKE_WANT_WRITE) { - vc->write.triggered = 0; - nh->write_ready_list.remove(vc); - write_reschedule(nh, vc); - } else if (ret == EVENT_DONE) { - vc->write.triggered = 1; - if (vc->write.enabled) { - nh->write_ready_list.in_or_enqueue(vc); - } - // If this was driven by a zero length read, signal complete when - // the handshake is complete. Otherwise set up for continuing read - // operations. - if (s->vio.ntodo() <= 0) { - vc->readSignalDone(VC_EVENT_WRITE_COMPLETE, nh); - } - } else { - write_reschedule(nh, vc); - } - - return; - } - - // If it is not enabled,add to WaitList. - if (!s->enabled || s->vio.op != VIO::WRITE) { - write_disable(nh, vc); - return; - } - - // If there is nothing to do, disable - int64_t ntodo = s->vio.ntodo(); - if (ntodo <= 0) { - write_disable(nh, vc); - return; - } - - MIOBufferAccessor &buf = s->vio.buffer; - ink_assert(buf.writer()); - - // Calculate the amount to write. 
- int64_t towrite = buf.reader()->read_avail(); - if (towrite > ntodo) { - towrite = ntodo; - } - - int signalled = 0; - - // signal write ready to allow user to fill the buffer - if (towrite != ntodo && !buf.writer()->high_water()) { - if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) { - return; - } else if (c != s->vio.cont) { /* The write vio was updated in the handler */ - write_reschedule(nh, vc); - return; - } - - ntodo = s->vio.ntodo(); - if (ntodo <= 0) { - write_disable(nh, vc); - return; - } - - signalled = 1; - - // Recalculate amount to write - towrite = buf.reader()->read_avail(); - if (towrite > ntodo) { - towrite = ntodo; - } - } - - // if there is nothing to do, disable - ink_assert(towrite >= 0); - if (towrite <= 0) { - write_disable(nh, vc); - return; - } - - int needs = 0; - int64_t total_written = 0; - int64_t r = vc->load_buffer_and_write(towrite, buf, total_written, needs); - - if (total_written > 0) { - Metrics::Counter::increment(net_rsb.write_bytes, total_written); - Metrics::Counter::increment(net_rsb.write_bytes_count); - s->vio.ndone += total_written; - net_activity(vc, thread); - } - - // A write of 0 makes no sense since we tried to write more than 0. - ink_assert(r != 0); - // Either we wrote something or got an error. - // check for errors - if (r < 0) { // if the socket was not ready, add to WaitList - if (r == -EAGAIN || r == -ENOTCONN || -r == EINPROGRESS) { - Metrics::Counter::increment(net_rsb.calls_to_write_nodata); - if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) { - vc->write.triggered = 0; - nh->write_ready_list.remove(vc); - write_reschedule(nh, vc); - } - - if ((needs & EVENTIO_READ) == EVENTIO_READ) { - vc->read.triggered = 0; - nh->read_ready_list.remove(vc); - read_reschedule(nh, vc); - } - - return; - } - - vc->write.triggered = 0; - write_signal_error(nh, vc, static_cast(-r)); - return; - } else { // Wrote data. Finished without error - int wbe_event = vc->write_buffer_empty_event; // save so we can clear if needed. - - // If the empty write buffer trap is set, clear it. - if (!(buf.reader()->is_read_avail_more_than(0))) { - vc->write_buffer_empty_event = 0; - } - - // If there are no more bytes to write, signal write complete, - ink_assert(ntodo >= 0); - if (s->vio.ntodo() <= 0) { - write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, vc); - return; - } - - int e = 0; - if (!signalled || (s->vio.ntodo() > 0 && !buf.writer()->high_water())) { - e = VC_EVENT_WRITE_READY; - } else if (wbe_event != vc->write_buffer_empty_event) { - // @a signalled means we won't send an event, and the event values differing means we - // had a write buffer trap and cleared it, so we need to send it now. - e = wbe_event; - } - - if (e) { - if (write_signal_and_update(e, vc) != EVENT_CONT) { - return; - } - - // change of lock... don't look at shared variables! - if (lock.get_mutex() != s->vio.mutex.get()) { - write_reschedule(nh, vc); - return; - } - } - - if ((needs & EVENTIO_READ) == EVENTIO_READ) { - read_reschedule(nh, vc); - } - - if (!(buf.reader()->is_read_avail_more_than(0))) { - write_disable(nh, vc); - return; - } - - if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) { - write_reschedule(nh, vc); - } - - return; - } + +static inline int +write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno) +{ + vc->lerrno = lerrno; + return write_signal_done(VC_EVENT_ERROR, nh, vc); } bool @@ -739,141 +345,504 @@ UnixNetVConnection::do_io_shutdown(ShutdownHowTo_t howto) } // -// Function used to reenable the VC for reading or -// writing. 
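Both the read and write paths in this file rely on the same return convention: the socket wrappers (con.sock.recvmsg(), load_buffer_and_write()) hand back the byte count on success and a negative errno on failure, which is why the code branches on values such as -EAGAIN and -ENOTCONN and reports errors with -r. A small self-contained sketch of that convention; read_wrapper() is an invented name, not an ATS function:

#include <cerrno>
#include <cstdio>
#include <unistd.h>

// Returns the byte count on success and -errno on failure, so the caller can
// branch on the value directly instead of consulting errno separately.
static ssize_t
read_wrapper(int fd, void *buf, size_t len)
{
  ssize_t r = ::read(fd, buf, len);
  return r >= 0 ? r : -errno;
}

int
main()
{
  char    buf[16];
  ssize_t r = read_wrapper(-1, buf, sizeof(buf)); // invalid fd on purpose
  if (r == -EAGAIN || r == -ENOTCONN) {
    std::puts("not ready: drop from the ready list and wait");
  } else if (r <= 0) {
    std::printf("error or EOS: signal the continuation with errno %d\n", static_cast<int>(-r));
  } else {
    std::printf("read %zd bytes\n", r);
  }
}
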
+// Function used to reenable the VC for reading or +// writing. +// +void +UnixNetVConnection::reenable(VIO *vio) +{ + if (STATE_FROM_VIO(vio)->enabled) { + return; + } + set_enabled(vio); + if (!thread) { + return; + } + EThread *t = vio->mutex->thread_holding; + ink_assert(t == this_ethread()); + ink_release_assert(!closed); + if (nh->mutex->thread_holding == t) { + if (vio == &read.vio) { + ep.modify(EVENTIO_READ); + ep.refresh(EVENTIO_READ); + if (read.triggered) { + nh->read_ready_list.in_or_enqueue(this); + } else { + nh->read_ready_list.remove(this); + } + } else { + ep.modify(EVENTIO_WRITE); + ep.refresh(EVENTIO_WRITE); + if (write.triggered) { + nh->write_ready_list.in_or_enqueue(this); + } else { + nh->write_ready_list.remove(this); + } + } + } else { + MUTEX_TRY_LOCK(lock, nh->mutex, t); + if (!lock.is_locked()) { + if (vio == &read.vio) { + int isin = ink_atomic_swap(&read.in_enabled_list, 1); + if (!isin) { + nh->read_enable_list.push(this); + } + } else { + int isin = ink_atomic_swap(&write.in_enabled_list, 1); + if (!isin) { + nh->write_enable_list.push(this); + } + } + if (likely(nh->thread)) { + nh->thread->tail_cb->signalActivity(); + } else if (nh->trigger_event) { + nh->trigger_event->ethread->tail_cb->signalActivity(); + } + } else { + if (vio == &read.vio) { + ep.modify(EVENTIO_READ); + ep.refresh(EVENTIO_READ); + if (read.triggered) { + nh->read_ready_list.in_or_enqueue(this); + } else { + nh->read_ready_list.remove(this); + } + } else { + ep.modify(EVENTIO_WRITE); + ep.refresh(EVENTIO_WRITE); + if (write.triggered) { + nh->write_ready_list.in_or_enqueue(this); + } else { + nh->write_ready_list.remove(this); + } + } + } + } +} + +void +UnixNetVConnection::reenable_re(VIO *vio) +{ + if (!thread) { + return; + } + EThread *t = vio->mutex->thread_holding; + ink_assert(t == this_ethread()); + if (nh->mutex->thread_holding == t) { + set_enabled(vio); + if (vio == &read.vio) { + ep.modify(EVENTIO_READ); + ep.refresh(EVENTIO_READ); + if (read.triggered) { + net_read_io(nh); + } else { + nh->read_ready_list.remove(this); + } + } else { + ep.modify(EVENTIO_WRITE); + ep.refresh(EVENTIO_WRITE); + if (write.triggered) { + this->net_write_io(nh); + } else { + nh->write_ready_list.remove(this); + } + } + } else { + reenable(vio); + } +} + +UnixNetVConnection::UnixNetVConnection() +{ + SET_HANDLER(&UnixNetVConnection::startEvent); +} + +// Private methods + +void +UnixNetVConnection::set_enabled(VIO *vio) +{ + ink_assert(vio->mutex->thread_holding == this_ethread() && thread); + ink_release_assert(!closed); + STATE_FROM_VIO(vio)->enabled = 1; + if (!next_inactivity_timeout_at && inactivity_timeout_in) { + next_inactivity_timeout_at = ink_get_hrtime() + inactivity_timeout_in; + } +} + +// Read the data for a UnixNetVConnection. +// Rescheduling the UnixNetVConnection by moving the VC +// onto or off of the ready_list. +void +UnixNetVConnection::net_read_io(NetHandler *nh) +{ + NetState *s = &this->read; + int64_t r = 0; + + MUTEX_TRY_LOCK(lock, s->vio.mutex, thread); + + if (!lock.is_locked()) { + read_reschedule(nh, this); + return; + } + + // It is possible that the closed flag got set from HttpSessionManager in the + // global session pool case. If so, the closed flag should be stable once we get the + // s->vio.mutex (the global session pool mutex). + if (this->closed) { + this->nh->free_netevent(this); + return; + } + // if it is not enabled. 
+ if (!s->enabled || s->vio.op != VIO::READ || s->vio.is_disabled()) { + read_disable(nh, this); + return; + } + + MIOBufferAccessor &buf = s->vio.buffer; + ink_assert(buf.writer()); + + // if there is nothing to do, disable connection + int64_t ntodo = s->vio.ntodo(); + if (ntodo <= 0) { + read_disable(nh, this); + return; + } + int64_t toread = buf.writer()->write_avail(); + if (toread > ntodo) { + toread = ntodo; + } + + // read data + int64_t rattempted = 0, total_read = 0; + unsigned niov = 0; + IOVec tiovec[NET_MAX_IOV]; + if (toread) { + IOBufferBlock *b = buf.writer()->first_write_block(); + do { + niov = 0; + rattempted = 0; + while (b && niov < NET_MAX_IOV) { + int64_t a = b->write_avail(); + if (a > 0) { + tiovec[niov].iov_base = b->_end; + int64_t togo = toread - total_read - rattempted; + if (a > togo) { + a = togo; + } + tiovec[niov].iov_len = a; + rattempted += a; + niov++; + if (a >= togo) { + break; + } + } + b = b->next.get(); + } + + ink_assert(niov > 0); + ink_assert(niov <= countof(tiovec)); + struct msghdr msg; + + ink_zero(msg); + msg.msg_name = const_cast(this->get_remote_addr()); + msg.msg_namelen = ats_ip_size(this->get_remote_addr()); + msg.msg_iov = &tiovec[0]; + msg.msg_iovlen = niov; + r = this->con.sock.recvmsg(&msg, 0); + + Metrics::Counter::increment(net_rsb.calls_to_read); + + total_read += rattempted; + } while (rattempted && r == rattempted && total_read < toread); + + // if we have already moved some bytes successfully, summarize in r + if (total_read != rattempted) { + if (r <= 0) { + r = total_read - rattempted; + } else { + r = total_read - rattempted + r; + } + } + // check for errors + if (r <= 0) { + if (r == -EAGAIN || r == -ENOTCONN) { + Metrics::Counter::increment(net_rsb.calls_to_read_nodata); + this->read.triggered = 0; + nh->read_ready_list.remove(this); + return; + } + + if (!r || r == -ECONNRESET) { + this->read.triggered = 0; + nh->read_ready_list.remove(this); + read_signal_done(VC_EVENT_EOS, nh, this); + return; + } + this->read.triggered = 0; + read_signal_error(nh, this, static_cast(-r)); + return; + } + Metrics::Counter::increment(net_rsb.read_bytes, r); + Metrics::Counter::increment(net_rsb.read_bytes_count); + + // Add data to buffer and signal continuation. + buf.writer()->fill(r); +#ifdef DEBUG + if (buf.writer()->write_avail() <= 0) { + Dbg(dbg_ctl_iocore_net, "read_from_net, read buffer full"); + } +#endif + s->vio.ndone += r; + this->netActivity(); + } else { + r = 0; + } + + // Signal read ready, check if user is not done + if (r) { + // If there are no more bytes to read, signal read complete + ink_assert(ntodo >= 0); + if (s->vio.ntodo() <= 0) { + read_signal_done(VC_EVENT_READ_COMPLETE, nh, this); + Dbg(dbg_ctl_iocore_net, "read_from_net, read finished - signal done"); + return; + } else { + if (read_signal_and_update(VC_EVENT_READ_READY, this) != EVENT_CONT) { + return; + } + + // change of lock... don't look at shared variables! + if (lock.get_mutex() != s->vio.mutex.get()) { + read_reschedule(nh, this); + return; + } + } + } + + // If here are is no more room, or nothing to do, disable the connection + if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) { + read_disable(nh, this); + return; + } + + read_reschedule(nh, this); +} + +// +// Write the data for a UnixNetVConnection. +// Rescheduling the UnixNetVConnection when necessary. 
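The read loop in the new UnixNetVConnection::net_read_io() above gathers up to NET_MAX_IOV free regions of the MIOBuffer into an iovec array and fills them with a single recvmsg() call. A self-contained sketch of that scatter-gather step on a socketpair; the buffer sizes and names are illustrative only:

#include <cstdio>
#include <cstring>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

int
main()
{
  int sv[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) {
    return 1;
  }
  const char msg[] = "scatter-gather";
  (void)write(sv[1], msg, sizeof(msg) - 1);

  // Two separate destination regions, like two MIOBuffer write blocks.
  char         a[7], b[7];
  struct iovec iov[2] = {
    {a, sizeof(a)},
    {b, sizeof(b)},
  };
  struct msghdr m;
  std::memset(&m, 0, sizeof(m));
  m.msg_iov    = iov;
  m.msg_iovlen = 2;

  // One recvmsg() call fills both regions in order.
  ssize_t r = recvmsg(sv[0], &m, 0);
  std::printf("read %zd bytes into %zu blocks\n", r, static_cast<size_t>(m.msg_iovlen));
  close(sv[0]);
  close(sv[1]);
}
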
// void -UnixNetVConnection::reenable(VIO *vio) +UnixNetVConnection::net_write_io(NetHandler *nh) { - if (STATE_FROM_VIO(vio)->enabled) { + Metrics::Counter::increment(net_rsb.calls_to_writetonet); + NetState *s = &this->write; + Continuation *c = this->write.vio.cont; + + MUTEX_TRY_LOCK(lock, s->vio.mutex, thread); + + if (!lock.is_locked() || lock.get_mutex() != s->vio.mutex.get()) { + write_reschedule(nh, this); return; } - set_enabled(vio); - if (!thread) { + + if (this->has_error()) { + this->lerrno = this->error; + write_signal_and_update(VC_EVENT_ERROR, this); return; } - EThread *t = vio->mutex->thread_holding; - ink_assert(t == this_ethread()); - ink_release_assert(!closed); - if (nh->mutex->thread_holding == t) { - if (vio == &read.vio) { - ep.modify(EVENTIO_READ); - ep.refresh(EVENTIO_READ); - if (read.triggered) { - nh->read_ready_list.in_or_enqueue(this); - } else { - nh->read_ready_list.remove(this); - } + + // This function will always return true unless + // this vc is an SSLNetVConnection. + if (!this->getSSLHandShakeComplete()) { + if (this->trackFirstHandshake()) { + // Eat the first write-ready. Until the TLS handshake is complete, + // we should still be under the connect timeout and shouldn't bother + // the state machine until the TLS handshake is complete + this->write.triggered = 0; + nh->write_ready_list.remove(this); + } + + int err, ret; + + if (this->get_context() == NET_VCONNECTION_OUT) { + ret = this->sslStartHandShake(SSL_EVENT_CLIENT, err); } else { - ep.modify(EVENTIO_WRITE); - ep.refresh(EVENTIO_WRITE); - if (write.triggered) { - nh->write_ready_list.in_or_enqueue(this); - } else { - nh->write_ready_list.remove(this); - } + ret = this->sslStartHandShake(SSL_EVENT_SERVER, err); } - } else { - MUTEX_TRY_LOCK(lock, nh->mutex, t); - if (!lock.is_locked()) { - if (vio == &read.vio) { - int isin = ink_atomic_swap(&read.in_enabled_list, 1); - if (!isin) { - nh->read_enable_list.push(this); - } - } else { - int isin = ink_atomic_swap(&write.in_enabled_list, 1); - if (!isin) { - nh->write_enable_list.push(this); - } + + if (ret == EVENT_ERROR) { + this->write.triggered = 0; + write_signal_error(nh, this, err); + } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT) { + this->read.triggered = 0; + nh->read_ready_list.remove(this); + read_reschedule(nh, this); + } else if (ret == SSL_HANDSHAKE_WANT_CONNECT || ret == SSL_HANDSHAKE_WANT_WRITE) { + this->write.triggered = 0; + nh->write_ready_list.remove(this); + write_reschedule(nh, this); + } else if (ret == EVENT_DONE) { + this->write.triggered = 1; + if (this->write.enabled) { + nh->write_ready_list.in_or_enqueue(this); } - if (likely(nh->thread)) { - nh->thread->tail_cb->signalActivity(); - } else if (nh->trigger_event) { - nh->trigger_event->ethread->tail_cb->signalActivity(); + // If this was driven by a zero length read, signal complete when + // the handshake is complete. Otherwise set up for continuing read + // operations. 
+ if (s->vio.ntodo() <= 0) { + this->readSignalDone(VC_EVENT_WRITE_COMPLETE, nh); } } else { - if (vio == &read.vio) { - ep.modify(EVENTIO_READ); - ep.refresh(EVENTIO_READ); - if (read.triggered) { - nh->read_ready_list.in_or_enqueue(this); - } else { - nh->read_ready_list.remove(this); - } - } else { - ep.modify(EVENTIO_WRITE); - ep.refresh(EVENTIO_WRITE); - if (write.triggered) { - nh->write_ready_list.in_or_enqueue(this); - } else { - nh->write_ready_list.remove(this); - } - } + write_reschedule(nh, this); } + + return; } -} -void -UnixNetVConnection::reenable_re(VIO *vio) -{ - if (!thread) { + // If it is not enabled,add to WaitList. + if (!s->enabled || s->vio.op != VIO::WRITE) { + write_disable(nh, this); return; } - EThread *t = vio->mutex->thread_holding; - ink_assert(t == this_ethread()); - if (nh->mutex->thread_holding == t) { - set_enabled(vio); - if (vio == &read.vio) { - ep.modify(EVENTIO_READ); - ep.refresh(EVENTIO_READ); - if (read.triggered) { - net_read_io(nh, t); - } else { - nh->read_ready_list.remove(this); - } - } else { - ep.modify(EVENTIO_WRITE); - ep.refresh(EVENTIO_WRITE); - if (write.triggered) { - write_to_net(nh, this, t); - } else { - nh->write_ready_list.remove(this); - } + + // If there is nothing to do, disable + int64_t ntodo = s->vio.ntodo(); + if (ntodo <= 0) { + write_disable(nh, this); + return; + } + + MIOBufferAccessor &buf = s->vio.buffer; + ink_assert(buf.writer()); + + // Calculate the amount to write. + int64_t towrite = buf.reader()->read_avail(); + if (towrite > ntodo) { + towrite = ntodo; + } + + int signalled = 0; + + // signal write ready to allow user to fill the buffer + if (towrite != ntodo && !buf.writer()->high_water()) { + if (write_signal_and_update(VC_EVENT_WRITE_READY, this) != EVENT_CONT) { + return; + } else if (c != s->vio.cont) { /* The write vio was updated in the handler */ + write_reschedule(nh, this); + return; + } + + ntodo = s->vio.ntodo(); + if (ntodo <= 0) { + write_disable(nh, this); + return; + } + + signalled = 1; + + // Recalculate amount to write + towrite = buf.reader()->read_avail(); + if (towrite > ntodo) { + towrite = ntodo; } - } else { - reenable(vio); } -} -UnixNetVConnection::UnixNetVConnection() -{ - SET_HANDLER(&UnixNetVConnection::startEvent); -} + // if there is nothing to do, disable + ink_assert(towrite >= 0); + if (towrite <= 0) { + write_disable(nh, this); + return; + } -// Private methods + int needs = 0; + int64_t total_written = 0; + int64_t r = this->load_buffer_and_write(towrite, buf, total_written, needs); -void -UnixNetVConnection::set_enabled(VIO *vio) -{ - ink_assert(vio->mutex->thread_holding == this_ethread() && thread); - ink_release_assert(!closed); - STATE_FROM_VIO(vio)->enabled = 1; - if (!next_inactivity_timeout_at && inactivity_timeout_in) { - next_inactivity_timeout_at = ink_get_hrtime() + inactivity_timeout_in; + if (total_written > 0) { + Metrics::Counter::increment(net_rsb.write_bytes, total_written); + Metrics::Counter::increment(net_rsb.write_bytes_count); + s->vio.ndone += total_written; + this->netActivity(); } -} -void -UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread) -{ - read_from_net(nh, this, lthread); -} + // A write of 0 makes no sense since we tried to write more than 0. + ink_assert(r != 0); + // Either we wrote something or got an error. 
+ // check for errors + if (r < 0) { // if the socket was not ready, add to WaitList + if (r == -EAGAIN || r == -ENOTCONN || -r == EINPROGRESS) { + Metrics::Counter::increment(net_rsb.calls_to_write_nodata); + if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) { + this->write.triggered = 0; + nh->write_ready_list.remove(this); + write_reschedule(nh, this); + } -void -UnixNetVConnection::net_write_io(NetHandler *nh, EThread *lthread) -{ - write_to_net(nh, this, lthread); + if ((needs & EVENTIO_READ) == EVENTIO_READ) { + this->read.triggered = 0; + nh->read_ready_list.remove(this); + read_reschedule(nh, this); + } + + return; + } + + this->write.triggered = 0; + write_signal_error(nh, this, static_cast(-r)); + return; + } else { // Wrote data. Finished without error + int wbe_event = this->write_buffer_empty_event; // save so we can clear if needed. + + // If the empty write buffer trap is set, clear it. + if (!(buf.reader()->is_read_avail_more_than(0))) { + this->write_buffer_empty_event = 0; + } + + // If there are no more bytes to write, signal write complete, + ink_assert(ntodo >= 0); + if (s->vio.ntodo() <= 0) { + write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, this); + return; + } + + int e = 0; + if (!signalled || (s->vio.ntodo() > 0 && !buf.writer()->high_water())) { + e = VC_EVENT_WRITE_READY; + } else if (wbe_event != this->write_buffer_empty_event) { + // @a signalled means we won't send an event, and the event values differing means we + // had a write buffer trap and cleared it, so we need to send it now. + e = wbe_event; + } + + if (e) { + if (write_signal_and_update(e, this) != EVENT_CONT) { + return; + } + + // change of lock... don't look at shared variables! + if (lock.get_mutex() != s->vio.mutex.get()) { + write_reschedule(nh, this); + return; + } + } + + if ((needs & EVENTIO_READ) == EVENTIO_READ) { + read_reschedule(nh, this); + } + + if (!(buf.reader()->is_read_avail_more_than(0))) { + write_disable(nh, this); + return; + } + + if ((needs & EVENTIO_WRITE) == EVENTIO_WRITE) { + write_reschedule(nh, this); + } + + return; + } } // This code was pulled out of write_to_net so @@ -1005,9 +974,14 @@ UnixNetVConnection::writeReschedule(NetHandler *nh) } void -UnixNetVConnection::netActivity(EThread *lthread) +UnixNetVConnection::netActivity() { - net_activity(this, lthread); + Dbg(dbg_ctl_socket, "net_activity updating inactivity %" PRId64 ", NetVC=%p", this->inactivity_timeout_in, this); + if (this->inactivity_timeout_in) { + this->next_inactivity_timeout_at = ink_get_hrtime() + this->inactivity_timeout_in; + } else { + this->next_inactivity_timeout_at = 0; + } } int
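
Finally, the old net_activity() free function is folded into UnixNetVConnection::netActivity() above: any read or write activity pushes the absolute inactivity deadline forward, or clears it when no inactivity timeout is configured. A minimal sketch of that bookkeeping, with std::chrono as a stand-in for ink_hrtime and Conn as an invented type:

#include <chrono>
#include <cstdio>

struct Conn {
  std::chrono::nanoseconds              inactivity_timeout_in{};
  std::chrono::steady_clock::time_point next_inactivity_timeout_at{};

  void
  netActivity()
  {
    if (inactivity_timeout_in.count() != 0) {
      // refresh the deadline relative to "now", as the member function above does
      next_inactivity_timeout_at = std::chrono::steady_clock::now() + inactivity_timeout_in;
    } else {
      next_inactivity_timeout_at = {}; // "no deadline", like the 0 in the patch
    }
  }
};

int
main()
{
  Conn c;
  c.inactivity_timeout_in = std::chrono::seconds(30);
  c.netActivity();
  std::puts(c.next_inactivity_timeout_at.time_since_epoch().count() ? "deadline armed" : "no deadline");
}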