Skip to content

Commit

Permalink
Fix logic of periodic events
Browse files Browse the repository at this point in the history
  • Loading branch information
svpcom committed Sep 22, 2024
1 parent 31b2faf commit 549fcda
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 28 deletions.
8 changes: 4 additions & 4 deletions src/rx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ void Aggregator::apply_fec(int ring_idx)
void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, unique_ptr<BaseAggregator> &agg, int log_interval, int rcv_buf_size)
{
int nfds = argc - optind;
uint64_t log_send_ts = 0;
uint64_t log_send_ts = get_time_ms();
struct pollfd fds[MAX_RX_INTERFACES];
unique_ptr<Receiver> rx[MAX_RX_INTERFACES];

Expand Down Expand Up @@ -909,7 +909,7 @@ void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, un
if (cur_ts >= log_send_ts)
{
agg->dump_stats(stdout);
log_send_ts = cur_ts + log_interval;
log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval);
}

if (rc == 0) continue; // timeout expired
Expand All @@ -934,7 +934,7 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s
struct sockaddr_in sockaddr;
uint8_t buf[MAX_FORWARDER_PACKET_SIZE];

uint64_t log_send_ts = 0;
uint64_t log_send_ts = get_time_ms();
struct pollfd fds[1];
int fd = open_udp_socket_for_rx(srv_port, rcv_buf_size);

Expand All @@ -956,7 +956,7 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s
if (cur_ts >= log_send_ts)
{
agg.dump_stats(stdout);
log_send_ts = cur_ts + log_interval;
log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval);
}

if (rc == 0) continue; // timeout expired
Expand Down
53 changes: 34 additions & 19 deletions src/tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,9 @@ void data_source(unique_ptr<Transmitter> &t, vector<int> &rx_fd, int control_fd,
fds[nfds].fd = control_fd;
fds[nfds].events = POLLIN;

uint64_t session_key_announce_ts = 0;
uint64_t session_key_announce_ts = get_time_ms();
uint32_t rxq_overflow = 0;
uint64_t log_send_ts = 0;
uint64_t log_send_ts = get_time_ms();
uint64_t fec_close_ts = fec_timeout > 0 ? get_time_ms() + fec_timeout : 0;
uint32_t count_p_fec_timeouts = 0; // empty packets sent to close fec block due to timeout
uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow)
Expand Down Expand Up @@ -735,7 +735,7 @@ void data_source(unique_ptr<Transmitter> &t, vector<int> &rx_fd, int control_fd,
count_p_dropped = 0;
count_p_truncated = 0;

log_send_ts = cur_ts + log_interval;
log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval);
}

// Check control socket first
Expand Down Expand Up @@ -917,26 +917,30 @@ void data_source(unique_ptr<Transmitter> &t, vector<int> &rx_fd, int control_fd,

// rc > 0: events detected
// start from last fd index and reset it to zero
int i = start_fd_idx;
for(start_fd_idx = 0; rc > 0; i++)
int _tmp = start_fd_idx;
start_fd_idx = 0;

for(int i = _tmp; rc > 0; i = (i + 1) % nfds)
{
if (fds[i % nfds].revents & (POLLERR | POLLNVAL))
assert(i < nfds);

if (fds[i].revents & (POLLERR | POLLNVAL))
{
throw runtime_error(string_format("socket error: %s", strerror(errno)));
}

if (fds[i % nfds].revents & POLLIN)
if (fds[i].revents & POLLIN)
{
uint8_t buf[MAX_PAYLOAD_SIZE + 1];
uint8_t cmsgbuf[CMSG_SPACE(sizeof(uint32_t))];
rc -= 1;

t->select_output(mirror ? -1 : (i % nfds));
t->select_output(mirror ? -1 : (i));

for(;;)
{
ssize_t rsize;
int fd = fds[i % nfds].fd;
int fd = fds[i].fd;
struct iovec iov = { .iov_base = (void*)buf,
.iov_len = sizeof(buf) };

Expand Down Expand Up @@ -980,15 +984,20 @@ void data_source(unique_ptr<Transmitter> &t, vector<int> &rx_fd, int control_fd,
{
// Announce session key
t->send_session_key();

// Session packet interval is not in fixed grid because
// we yield session packets only if there are data packets
session_key_announce_ts = cur_ts + SESSION_KEY_ANNOUNCE_MSEC;
}

t->send_packet(buf, rsize, 0);

if (cur_ts >= log_send_ts) // log timeout expired
{
// Save current index and go to outer loop
// We need to transmit all packets from the queue before tx card switch
start_fd_idx = i % nfds;
start_fd_idx = i;
rc = 0;
break;
}
}
Expand Down Expand Up @@ -1138,7 +1147,7 @@ void packet_injector(RawSocketInjector &t, vector<int> &rx_fd, int log_interval)
}

uint32_t rxq_overflow = 0;
uint64_t log_send_ts = 0;
uint64_t log_send_ts = get_time_ms();

uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow)
uint32_t count_b_incoming = 0; // incoming udp bytes (received only)
Expand Down Expand Up @@ -1178,7 +1187,7 @@ void packet_injector(RawSocketInjector &t, vector<int> &rx_fd, int log_interval)
count_p_dropped = 0;
count_p_bad = 0;

log_send_ts = cur_ts + log_interval;
log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval);
}

if (rc == 0) // poll timeout
Expand All @@ -1188,15 +1197,19 @@ void packet_injector(RawSocketInjector &t, vector<int> &rx_fd, int log_interval)

// rc > 0: events detected
// start from last fd index and reset it to zero
int i = start_fd_idx;
for(start_fd_idx = 0; rc > 0; i++)
int _tmp = start_fd_idx;
start_fd_idx = 0;

for(int i = _tmp; rc > 0; i = (i + 1) % nfds)
{
if (fds[i % nfds].revents & (POLLERR | POLLNVAL))
assert(i < nfds);

if (fds[i].revents & (POLLERR | POLLNVAL))
{
throw runtime_error(string_format("socket error: %s", strerror(errno)));
}

if (fds[i % nfds].revents & POLLIN)
if (fds[i].revents & POLLIN)
{
uint8_t buf[MAX_DISTRIBUTION_PACKET_SIZE - sizeof(uint32_t) + 1];
uint8_t cmsgbuf[CMSG_SPACE(sizeof(uint32_t))];
Expand All @@ -1206,7 +1219,7 @@ void packet_injector(RawSocketInjector &t, vector<int> &rx_fd, int log_interval)
{
ssize_t rsize;
uint32_t _fwmark;
int fd = fds[i % nfds].fd;
int fd = fds[i].fd;

struct iovec iov[2] = {
// fwmark
Expand Down Expand Up @@ -1258,12 +1271,14 @@ void packet_injector(RawSocketInjector &t, vector<int> &rx_fd, int log_interval)

cur_ts = get_time_ms();

t.inject_packet(i % nfds, buf, rsize, ntohl(_fwmark));
t.inject_packet(i, buf, rsize, ntohl(_fwmark));

if (cur_ts >= log_send_ts) // log timeout expired
{
// Save current index and go to outer loop
// We need to transmit all packets from the queue before tx card switch
start_fd_idx = i % nfds;
start_fd_idx = i;
rc = 0;
break;
}
}
Expand Down
4 changes: 4 additions & 0 deletions wfb_ng/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import atexit
import time

from twisted.python import runtime
runtime.seconds = time.monotonic
runtime.Platform.seconds = staticmethod(time.monotonic)

from twisted.internet import utils, reactor
from logging import currentframe
from twisted.python import log
Expand Down
1 change: 1 addition & 0 deletions wfb_ng/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def get_allocator(node):
if which nmcli > /dev/null && ! nmcli device show {{ wlan }} | grep -q '(unmanaged)'
then
nmcli device set {{ wlan }} managed no
sleep 1
fi
ip link set {{ wlan }} down
Expand Down
7 changes: 7 additions & 0 deletions wfb_ng/tests/test_twisted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from twisted.trial import unittest
from twisted.internet import reactor


class ClockTestCase(unittest.TestCase):
def test_reactor_has_monitonic_clock(self):
self.assertLess(reactor.seconds(), 1000000000)
29 changes: 24 additions & 5 deletions wfb_ng/tuntap.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from pyroute2 import IPRoute
from contextlib import closing

from .conf import settings
from .proxy import ProxyProtocol


Expand Down Expand Up @@ -129,7 +130,7 @@ def write(self, data):

class TUNTAPProtocol(Protocol, ProxyProtocol):
noisy = False
keepalive_interval = 0.9
keepalive_interval = 0.5 * settings.common.log_interval / 1000.0

def __init__(self, mtu, agg_timeout=None):
self.all_peers = []
Expand All @@ -140,6 +141,8 @@ def __init__(self, mtu, agg_timeout=None):
# Sent keepalive packets
self.lc = task.LoopingCall(self.send_keepalive)
self.lc.start(self.keepalive_interval, now=False)
self.pkt_in_sem = 0
self.pkt_out_sem = 0

def _send_to_all_peers(self, data):
for peer in self.all_peers:
Expand All @@ -151,7 +154,9 @@ def _cleanup(self):

# call from peer only!
def write(self, msg):
# Remove keepalive messages
self.pkt_in_sem = 2

# Ignore incoming keepalive messages
if self.transport is None or not msg:
return

Expand All @@ -173,11 +178,25 @@ def write(self, msg):
i += pkt_size

def send_keepalive(self):
# Send keepalive message via all antennas.
# Send keepalive messages:
# 1. via all antennas if no RX from peer during 2 keepalive intervals
# 2. via current antenna if no TX to peer during one keepalive interval

# This allow to use multiple directed antennas on the both ends
# and/or use different frequency channels on different cards.
self._send_to_all_peers(b'')

if self.pkt_in_sem == 0:
self._send_to_all_peers(b'')

elif self.pkt_out_sem == 0:
self._send_to_peer(b'')

if self.pkt_in_sem > 0:
self.pkt_in_sem -= 1

if self.pkt_out_sem > 0:
self.pkt_out_sem -= 1

def dataReceived(self, data):
self.lc.reset() # reset keepalive timer
self.pkt_out_sem = 1
return self.messageReceived(struct.pack('!H', len(data)) + data)

0 comments on commit 549fcda

Please sign in to comment.