From 8d9c9477611a2eb355a4b1cc73117870f0fbd220 Mon Sep 17 00:00:00 2001 From: Nisarg Jhaveri Date: Wed, 27 Mar 2024 22:13:31 +0530 Subject: [PATCH] Add TCP read timeout, reliably stop forwarding by interrupting the threads (#90) - Add read timeout for the TCP connection to help detect wifi disconnection or other failures - When USB/TCP error is detected, interrupt the other thread to stop waiting on the read/write operation The interruption is required, especially for the usb read/right operations as it currently does not support poll/select. Fixes #26. --- .../package/aawg/src/proxyHandler.cpp | 63 +++++++++++++++++-- .../package/aawg/src/proxyHandler.h | 4 ++ 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/aa_wireless_dongle/package/aawg/src/proxyHandler.cpp b/aa_wireless_dongle/package/aawg/src/proxyHandler.cpp index 9465c5a..e7d494e 100644 --- a/aa_wireless_dongle/package/aawg/src/proxyHandler.cpp +++ b/aa_wireless_dongle/package/aawg/src/proxyHandler.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -15,6 +16,10 @@ #include "bluetoothHandler.h" #include "proxyHandler.h" +void empty_signal_handler(int signal) { + // Empty. We don't want to do anything but interrupt the thread. +} + ssize_t AAWProxy::readFully(int fd, unsigned char *buffer, size_t nbyte) { size_t remaining_bytes = nbyte; while (remaining_bytes > 0) { @@ -89,7 +94,9 @@ void AAWProxy::forward(ProxyDirection direction, std::atomic& should_exit) } while (!should_exit) { + // Read ssize_t len = read_message ? readMessage(read_fd, buffer, buffer_len) : read(read_fd, buffer, buffer_len); + if (len <= 0) { // Start logging read/write details if there is an error. m_log_communication = true; @@ -97,6 +104,7 @@ void AAWProxy::forward(ProxyDirection direction, std::atomic& should_exit) if (m_log_communication) { Logger::instance()->info("%d bytes read from %s\n", len, read_name.c_str()); } + if (len < 0) { Logger::instance()->info("Read from %s failed: %s\n", read_name.c_str(), strerror(errno)); break; @@ -104,8 +112,13 @@ void AAWProxy::forward(ProxyDirection direction, std::atomic& should_exit) else if (len == 0) { break; } + else if (should_exit) { + break; + } + // Write ssize_t wlen = write(write_fd, buffer, len); + if (wlen <= 0) { // Start logging read/write details if there is an error. m_log_communication = true; @@ -113,13 +126,30 @@ void AAWProxy::forward(ProxyDirection direction, std::atomic& should_exit) if (m_log_communication) { Logger::instance()->info("%d bytes written to %s\n", wlen, write_name.c_str()); } + if (wlen < 0) { Logger::instance()->info("Write to %s failed: %s\n", write_name.c_str(), strerror(errno)); break; } + else if (should_exit) { + break; + } } + stopForwarding(should_exit); +} + +void AAWProxy::stopForwarding(std::atomic& should_exit) { + Logger::instance()->info("Interrupting threads to stop forwarding\n"); should_exit = true; + + if (m_usb_tcp_thread) { + pthread_kill(m_usb_tcp_thread->native_handle(), SIGUSR1); + } + + if (m_tcp_usb_thread) { + pthread_kill(m_tcp_usb_thread->native_handle(), SIGUSR1); + } } void AAWProxy::handleClient(int server_sock) { @@ -150,13 +180,38 @@ void AAWProxy::handleClient(int server_sock) { return; } + // Set timeout on the TCP socket + struct timeval tv = { + .tv_sec = 10, + .tv_usec = 0, + }; + + if (setsockopt(m_tcp_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) { + Logger::instance()->info("setsockopt failed: %s\n", strerror(errno)); + return; + } + + // Setup signal handler + struct sigaction sa; + sa.sa_handler = empty_signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + if (sigaction(SIGUSR1, &sa, NULL)) { + Logger::instance()->info("Adding signal handler failed: %s\n", strerror(errno)); + } + Logger::instance()->info("Forwarding data between TCP and USB\n"); std::atomic should_exit = false; - std::thread usb_tcp(&AAWProxy::forward, this, ProxyDirection::USB_to_TCP, std::ref(should_exit)); - std::thread tcp_usb(&AAWProxy::forward, this, ProxyDirection::TCP_to_USB, std::ref(should_exit)); + m_usb_tcp_thread = std::thread(&AAWProxy::forward, this, ProxyDirection::USB_to_TCP, std::ref(should_exit)); + m_tcp_usb_thread = std::thread(&AAWProxy::forward, this, ProxyDirection::TCP_to_USB, std::ref(should_exit)); + + m_usb_tcp_thread->join(); + m_usb_tcp_thread = std::nullopt; + + m_tcp_usb_thread->join(); + m_tcp_usb_thread = std::nullopt; - usb_tcp.join(); - tcp_usb.join(); + signal(SIGUSR1, SIG_DFL); close(m_usb_fd); m_usb_fd = -1; diff --git a/aa_wireless_dongle/package/aawg/src/proxyHandler.h b/aa_wireless_dongle/package/aawg/src/proxyHandler.h index 339f1b9..adba396 100644 --- a/aa_wireless_dongle/package/aawg/src/proxyHandler.h +++ b/aa_wireless_dongle/package/aawg/src/proxyHandler.h @@ -16,6 +16,7 @@ class AAWProxy { void handleClient(int server_fd); void forward(ProxyDirection direction, std::atomic& should_exit); + void stopForwarding(std::atomic& should_exit); ssize_t readFully(int fd, unsigned char *buf, size_t nbyte); ssize_t readMessage(int fd, unsigned char *buf, size_t nbyte); @@ -23,5 +24,8 @@ class AAWProxy { int m_usb_fd = -1; int m_tcp_fd = -1; + std::optional m_usb_tcp_thread = std::nullopt; + std::optional m_tcp_usb_thread = std::nullopt; + std::atomic m_log_communication = false; };