Skip to content

Commit

Permalink
Add TCP read timeout, reliably stop forwarding by interrupting the th…
Browse files Browse the repository at this point in the history
…reads (#90)

- Add read timeout for the TCP connection to help detect wifi disconnection or other failures
- When USB/TCP error is detected, interrupt the other thread to stop waiting on the read/write operation

The interruption is required, especially for the usb read/right operations as it currently does not support poll/select.

Fixes #26.
  • Loading branch information
nisargjhaveri authored Mar 27, 2024
1 parent 8b76527 commit 8d9c947
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
63 changes: 59 additions & 4 deletions aa_wireless_dongle/package/aawg/src/proxyHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <string.h>
#include <netinet/in.h>
Expand All @@ -15,6 +16,10 @@
#include "bluetoothHandler.h"
#include "proxyHandler.h"

void empty_signal_handler(int signal) {
// Empty. We don't want to do anything but interrupt the thread.
}

ssize_t AAWProxy::readFully(int fd, unsigned char *buffer, size_t nbyte) {
size_t remaining_bytes = nbyte;
while (remaining_bytes > 0) {
Expand Down Expand Up @@ -89,37 +94,62 @@ void AAWProxy::forward(ProxyDirection direction, std::atomic<bool>& should_exit)
}

while (!should_exit) {
// Read
ssize_t len = read_message ? readMessage(read_fd, buffer, buffer_len) : read(read_fd, buffer, buffer_len);

if (len <= 0) {
// Start logging read/write details if there is an error.
m_log_communication = true;
}
if (m_log_communication) {
Logger::instance()->info("%d bytes read from %s\n", len, read_name.c_str());
}

if (len < 0) {
Logger::instance()->info("Read from %s failed: %s\n", read_name.c_str(), strerror(errno));
break;
}
else if (len == 0) {
break;
}
else if (should_exit) {
break;
}

// Write
ssize_t wlen = write(write_fd, buffer, len);

if (wlen <= 0) {
// Start logging read/write details if there is an error.
m_log_communication = true;
}
if (m_log_communication) {
Logger::instance()->info("%d bytes written to %s\n", wlen, write_name.c_str());
}

if (wlen < 0) {
Logger::instance()->info("Write to %s failed: %s\n", write_name.c_str(), strerror(errno));
break;
}
else if (should_exit) {
break;
}
}

stopForwarding(should_exit);
}

void AAWProxy::stopForwarding(std::atomic<bool>& should_exit) {
Logger::instance()->info("Interrupting threads to stop forwarding\n");
should_exit = true;

if (m_usb_tcp_thread) {
pthread_kill(m_usb_tcp_thread->native_handle(), SIGUSR1);
}

if (m_tcp_usb_thread) {
pthread_kill(m_tcp_usb_thread->native_handle(), SIGUSR1);
}
}

void AAWProxy::handleClient(int server_sock) {
Expand Down Expand Up @@ -150,13 +180,38 @@ void AAWProxy::handleClient(int server_sock) {
return;
}

// Set timeout on the TCP socket
struct timeval tv = {
.tv_sec = 10,
.tv_usec = 0,
};

if (setsockopt(m_tcp_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
Logger::instance()->info("setsockopt failed: %s\n", strerror(errno));
return;
}

// Setup signal handler
struct sigaction sa;
sa.sa_handler = empty_signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
if (sigaction(SIGUSR1, &sa, NULL)) {
Logger::instance()->info("Adding signal handler failed: %s\n", strerror(errno));
}

Logger::instance()->info("Forwarding data between TCP and USB\n");
std::atomic<bool> should_exit = false;
std::thread usb_tcp(&AAWProxy::forward, this, ProxyDirection::USB_to_TCP, std::ref(should_exit));
std::thread tcp_usb(&AAWProxy::forward, this, ProxyDirection::TCP_to_USB, std::ref(should_exit));
m_usb_tcp_thread = std::thread(&AAWProxy::forward, this, ProxyDirection::USB_to_TCP, std::ref(should_exit));
m_tcp_usb_thread = std::thread(&AAWProxy::forward, this, ProxyDirection::TCP_to_USB, std::ref(should_exit));

m_usb_tcp_thread->join();
m_usb_tcp_thread = std::nullopt;

m_tcp_usb_thread->join();
m_tcp_usb_thread = std::nullopt;

usb_tcp.join();
tcp_usb.join();
signal(SIGUSR1, SIG_DFL);

close(m_usb_fd);
m_usb_fd = -1;
Expand Down
4 changes: 4 additions & 0 deletions aa_wireless_dongle/package/aawg/src/proxyHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ class AAWProxy {

void handleClient(int server_fd);
void forward(ProxyDirection direction, std::atomic<bool>& should_exit);
void stopForwarding(std::atomic<bool>& should_exit);

ssize_t readFully(int fd, unsigned char *buf, size_t nbyte);
ssize_t readMessage(int fd, unsigned char *buf, size_t nbyte);

int m_usb_fd = -1;
int m_tcp_fd = -1;

std::optional<std::thread> m_usb_tcp_thread = std::nullopt;
std::optional<std::thread> m_tcp_usb_thread = std::nullopt;

std::atomic<bool> m_log_communication = false;
};

1 comment on commit 8d9c947

@Ioniq3
Copy link
Contributor

@Ioniq3 Ioniq3 commented on 8d9c947 Mar 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nisargjhaveri , Great works, now it works more resilient than ever, I proved with dongle_mode_connection merge dand no problem at all.

Please sign in to comment.