This repository has been archived by the owner on Sep 3, 2024. It is now read-only.

Refactor UDP transport #111

Merged
merged 12 commits into main from tievens-0108 on Jan 16, 2024

Conversation

TimEvens
Contributor

Significant refactor to add the following new features:

  • Connection establishment - helps with port scanning/DoS mitigation
  • Connection disconnect - Client disconnect is now signaled so the connection closes properly
  • Connection idle timeout - If the client or server disappears (e.g., crash), the idle timeout
    will clean up the connection.
  • Shaping/pacing transmission to a configurable rate per microsecond (applied at the millisecond
    level); a sketch of the pacing math follows this list.
  • Added report feedback with logging and an initial calculation of bandwidth and loss. This will be tuned
    and adjusted based on testing. We can use the feedback report to adjust the shaping/pacing.
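For illustration, a minimal sketch of the pacing conversion described above; the names and structure are hypothetical, not the PR's actual code:

#include <cstdint>

// Rate is configured per microsecond but enforced at millisecond ticks, so a
// sender converts the per-microsecond budget into a per-millisecond budget.
uint64_t pacing_wait_ticks(uint64_t bytes_queued, uint64_t bytes_per_us)
{
    const uint64_t bytes_per_ms = bytes_per_us * 1000;

    // Number of whole millisecond ticks this burst "costs"; the sender would
    // add this to the current tick to get the next time it may transmit.
    return (bytes_queued + bytes_per_ms - 1) / bytes_per_ms;
}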

Contributor

@paulej left a comment

I made a number of comments, though you're free to accept or ignore them. :)

@@ -42,6 +42,10 @@ namespace qtransport {
};

public:
~priority_queue() {
std::lock_guard<std::mutex> _(_mutex);
Contributor

Why lock the mutex if there is nothing to do? Is this to ensure there are no threads inside doing work? If that's a risk, there might still be a thread inside the code that isn't fully out of a function, or one entering the code that hasn't yet reached the lock.

Contributor Author

Destruction starts from another thread, which caused a race condition with a pop/push that was happening at the same time.

Contributor

I assume this object is held with a shared pointer. Whatever threads are calling into it should have a valid pointer (i.e., the object's destructor has not been called). Do you know what threads? While this might appear to fix it, it concerns me that we're not first terminating those threads before attempting to destroy the priority queue object.

Contributor Author

I've removed it for now. This change was a fix from a while back for a shutdown issue with the picoquic transport. The segfault was caused by the priority queue destructor, but it's a valid question why the destruction order was not maintained. I've created issue #112 to track this.
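For context, a hedged sketch of the shutdown ordering being discussed: stop and join the worker thread before the object owning the queue is destroyed. All names here are illustrative, not the transport's actual members:

#include <atomic>
#include <thread>

struct stand_in_queue {};   // placeholder for the real priority_queue

class worker_owner {
  public:
    ~worker_owner()
    {
        _stop = true;            // ask the worker loop to exit
        if (_thread.joinable())
            _thread.join();      // no thread is inside the queue past this point

        // Members are destroyed in reverse declaration order after this body,
        // so the queue is torn down only once the worker has been joined.
    }

  private:
    std::atomic<bool> _stop { false };
    std::thread _thread;         // worker assumed to push/pop on the queue
    stand_in_queue _queue;       // stand-in for the priority_queue member
};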

break;
}
default: {
// IPv6
Contributor

ss_family can have other values. It would be safer to explicitly check for IPv4 and IPv6, and have the default case zero out the address or throw an exception. I assume we're controlling what gets passed in, but an exception would certainly highlight a problem if there were a mistake upstream.
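A minimal sketch of the explicit family check being suggested; whether to throw or zero the address in the default case is a placeholder choice here:

#include <stdexcept>
#include <sys/socket.h>

void check_family(const sockaddr_storage& addr)
{
    switch (addr.ss_family) {
        case AF_INET:
            // IPv4 handling
            break;
        case AF_INET6:
            // IPv6 handling
            break;
        default:
            throw std::runtime_error("Unsupported address family");
    }
}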

Contributor Author

Updated, but I created issue #110 to fix/add IPv6 support. Right now it doesn't work in UDP, but it does in Picoquic. Not a big deal to add, I just want to do that in a different PR.


std::lock_guard<std::mutex> _(_socket_write_mutex);

int numSent = sendto(fd,
Contributor

sendto is thread-safe, so the lock may not be required above. I did not see anything in particular that needs protection here.

I'm also wondering about sending a structure as-is. There's no serialization logic for this and the other calls I see below?

Contributor Author
@TimEvens Jan 14, 2024

It's to sync both fd_reader and fd_writer threads on writes and updating the shared Connection context. Not all the methods pass in the connection context right now, but they do pass in the addr. The main thing that needs to be in sync is the connection context.

I added _connections_mutex afterwards. That lock actually should be enough now, as both fd_reader and fd_writer use it, but fd_reader doesn't need to lock unless it's doing a send-related update. We can consolidate them, but it might be a little less efficient. For now, I've renamed send_recv_sync_mutex to be clearer.

Contributor

I'm missing the point, I guess. I see no reason to have the mutex locked while calling an OS-level sendto() call.

Contributor Author
@TimEvens Jan 15, 2024

It has nothing to do with sendto(). Socket calls are only atomic per call, but that's not an issue here.

I believe your comment is focused on sendto() because you are saying that sendto() does not need to be locked. While that is true, it's not why the lock is there. The lock is there so that we can update the shared contexts/references that are passed in. In this specific case, 3 of the methods do not have a connection context and therefore don't need the lock. That's true for now, but it won't be the case for long, as they'll have to be updated as the protocol evolves. The shared connection context will likely need to be passed to these 3 methods as well, and locking will be required. The other 2 methods (see https://github.com/Quicr/transport/blob/tievens-0108/src/transport_udp.h#L170) do require locking, as you can see them updating the connection context.

The protocol design is that the send methods will be updating shared context information; therefore they will need locking. But, as mentioned above, I added connection context locking, which can be used instead of this lock. I will consolidate this on Tuesday, but locking will still be required and used because of the updates the protocol requires.

Contributor Author

What I don't like about moving locking out of the send_* methods is that they are no longer thread-safe. It will require the caller to lock or be super careful. I'll update the method comments to mention that.

Contributor

In that case, what's to protect? I don't see any reason for the lock.

More generally, I would avoid locking while calling into the OS to send data. Whatever time that takes, the lock is held with no benefit. I get it that sometimes that's unavoidable.

On locking in every function: while you're right that leaving it out can be a problem, it can be documented. You don't want the cost of locking and unlocking all the time.
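To make the point concrete, a hypothetical sketch of the pattern being discussed: hold the lock only while updating the shared connection context, and release it before the sendto() call. ConnectionContext and its fields are illustrative, not the PR's types:

#include <cstddef>
#include <cstdint>
#include <mutex>
#include <sys/socket.h>
#include <sys/types.h>

struct ConnectionContext {
    uint64_t tx_bytes { 0 };
    uint64_t last_tx_msg_tick { 0 };
};

ssize_t send_with_context(int fd, const void* data, size_t len,
                          const sockaddr* addr, socklen_t addr_len,
                          ConnectionContext& conn, std::mutex& conn_mutex,
                          uint64_t current_tick)
{
    {   // Scope the lock to the shared-state update only.
        std::lock_guard<std::mutex> _(conn_mutex);
        conn.tx_bytes += len;
        conn.last_tx_msg_tick = current_tick;
    }

    // sendto() itself is thread-safe; no lock is held during the OS call.
    return sendto(fd, data, len, 0, addr, addr_len);
}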


std::lock_guard<std::mutex> _(_socket_write_mutex);

int numSent = sendto(fd,
Contributor

Same comment here. I don't think we need the above lock.

Contributor Author

Same response as above.


std::lock_guard<std::mutex> _(_socket_write_mutex);

int numSent = sendto(fd,
Contributor

Same comment here. I don't think we need the above lock.

Contributor Author

Same response as above.


logger->Log("Done transport reader thread");
}
size_t snd_rcv_max = 64000; // TODO: Add config for value
Contributor

I saw a couple of places where 64000 was explicitly referenced. I'd suggest making this constexpr size_t Max_Packet_Size = 65535 or something until we sort out the dynamic configuration.
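A small sketch of that suggestion (name and value taken from the comment above; the actual constant may differ):

#include <array>
#include <cstddef>
#include <cstdint>

constexpr std::size_t Max_Packet_Size = 65535;   // until the dynamic configuration exists

// Hypothetical usage in place of the 64000 literal:
std::array<uint8_t, Max_Packet_Size> rx_buffer {};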

Contributor Author

Good suggestion, updated.


return dq.pop().value().data;
}
const auto& [conn_it, _] = conn_contexts.emplace(last_conn_id,
Contributor

Is a lock needed to protect this map?

Contributor Author

Not on this one because it's creating the client. Although, we can add a lock for the crazy use-case where the app calls start and close at the same time from two different threads.

Contributor Author

While not needed here, consolidating the send/recv lock into the connections lock will result in this being protected by a lock as well.

}

// TODO: Add config for this value
size_t snd_rcv_max = 2000000;
Contributor

Above, I saw a value about half this. Another good candidate to make a global constant for the moment.

Contributor Author

Max packet size is pretty locked in, but it would make sense to update all the others that should be config variables in another PR. I would like to do that very soon after this.

}

err =
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &rcv_timeout, sizeof(rcv_timeout));
Contributor

Is this needed on a UDP socket?

Contributor Author

It's used today so that we don't block on reading/waiting for data.
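For reference, a sketch of that receive-timeout setup; the 100 ms value is illustrative, not the transport's configured timeout:

#include <sys/socket.h>
#include <sys/time.h>

int set_recv_timeout(int fd)
{
    timeval rcv_timeout {};
    rcv_timeout.tv_sec  = 0;
    rcv_timeout.tv_usec = 100'000;   // 100 ms

    // With this set, a blocking recvfrom() returns (EAGAIN/EWOULDBLOCK) once
    // the timeout expires, so the reader thread can notice a stop request.
    return setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO,
                      &rcv_timeout, sizeof(rcv_timeout));
}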

Contributor
@paulej Jan 13, 2024

This should put the socket in non-blocking mode:

auto flags = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);

Each of those calls can return an error that needs to be checked.
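An error-checked version of the snippet above, as a sketch only:

#include <fcntl.h>

bool set_non_blocking(int fd)
{
    const int flags = fcntl(fd, F_GETFL);
    if (flags == -1)
        return false;   // caller would log errno

    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}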

Contributor Author
@TimEvens Jan 14, 2024

Right now we specifically do not want it in non-blocking mode; dedicated reader/writer threads support this. The timeout is there to support stop, as well as to avoid blocking for too long.

Now that we have the tick service, which runs with very low impact on CPU and interrupts, non-blocking would be well supported. We can change it to non-blocking with [e]poll later, in another PR. I created #113 for this.


uint16_t idle_timeout { 120 }; /// Idle timeout in seconds. Must not be zero

} __attribute__((__packed__, aligned(1)));
Contributor

Ah, this explains what I was asking about before. This does ensure the size is precisely the size of the object. Why are these not serialized using the message buffer stuff used elsewhere in libquicr? Perhaps because this is one level down from that, but you know this will not be portable. It would be trivial to create a miniature message serializer for these, since they're all trivial.
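For illustration, a miniature serializer of the kind being suggested. The header layout here (a type byte plus the 16-bit idle timeout) is hypothetical, loosely based on the idle_timeout field visible in the quoted struct:

#include <cstdint>
#include <vector>

struct ConnectHeader {
    uint8_t  type { 1 };
    uint16_t idle_timeout { 120 };   // seconds, must not be zero
};

inline std::vector<uint8_t> serialize(const ConnectHeader& hdr)
{
    std::vector<uint8_t> out;
    out.push_back(hdr.type);
    out.push_back(static_cast<uint8_t>(hdr.idle_timeout >> 8));     // network byte order
    out.push_back(static_cast<uint8_t>(hdr.idle_timeout & 0xFF));
    return out;
}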

Contributor Author

We can update it if it's really a problem, but:

  1. It's portable with clang and gcc in the versions that support C++20, which we require.
  2. #pragma pack(1) is portable with clang, gcc, and VS, so we could use that as well, but that's header-wide.

@TimEvens
Contributor Author

@paulej , thanks so much for the review. I updated all the comments and code based on feedback. The one change not updated was the packing since it is portable to clang and gcc, which is what we require right now.

include/transport/priority_queue.h (outdated, resolved)
{
// TODO: Close all streams and connections
UDPTransport::~UDPTransport() {
// TODO: Close all streams and connections
Contributor

Do we still not do that?

if (thread.joinable())
thread.join();
}
_tick_service.reset();
Contributor

You shouldn't have to call reset; that should be handled automatically.

Contributor

Unless the objects that depend on it are declared before it in the class, in which case it would probably be better to reorder the member declarations so you can avoid this manual call.
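A sketch of the reordering being described; TickService and FdWriter are placeholders, not the transport's actual types:

#include <memory>

struct TickService {};
struct FdWriter { std::shared_ptr<TickService> tick_service; };

class Transport {
    // Members are destroyed in reverse declaration order, so declaring the
    // tick service before anything that uses it means _writer is destroyed
    // first and the explicit _tick_service.reset() is no longer needed.
    std::shared_ptr<TickService> _tick_service { std::make_shared<TickService>() };
    FdWriter _writer { _tick_service };
};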

Comment on lines 101 to 109

// Do not delete the default datagram context id ZERO
if (data_ctx_id > 0) {
auto conn_it = conn_contexts.find(conn_id);
if (conn_it != conn_contexts.end()) {
logger->info << "Delete data context id: " << data_ctx_id << " in conn_id: " << conn_id << std::flush;
conn_it->second->data_contexts.erase(data_ctx_id);
}
}
Contributor

The whole function body is the if statement; it could return early instead and reduce indentation (a sketch follows).
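The early-return shape being described, reusing the identifiers from the quoted hunk (a sketch, not a drop-in patch):

// Do not delete the default datagram context id ZERO
if (data_ctx_id == 0)
    return;

auto conn_it = conn_contexts.find(conn_id);
if (conn_it == conn_contexts.end())
    return;

logger->info << "Delete data context id: " << data_ctx_id << " in conn_id: " << conn_id << std::flush;
conn_it->second->data_contexts.erase(data_ctx_id);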

const auto current_tick = _tick_service->get_ticks(std::chrono::milliseconds(1));

if (current_tick >= conn.next_report_tick) {
// New report ID
Contributor

This comment seems out of place


// Shape flow by only processing data if wait for tick value is less than or equal to current tick
if (conn->wait_for_tick > current_tick) {
/* Noisy - Enable when debugging shaping
Contributor

This is the second time I'm seeing this; maybe we should have a "noisy_print" config knob?


if (data_ctx.tx_data->empty()) { // No data, go to next connection
// Send keepalive if needed
if (conn->last_tx_msg_tick && current_tick - conn->last_tx_msg_tick > conn->ka_interval_ms) {
Contributor

I see this calculation a few times; maybe it should be computed once and stored in a constant above all these checks? (A sketch follows.)
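A sketch of hoisting that repeated expression into one named value before the per-context checks, reusing the identifiers from the quoted snippet:

const auto ms_since_last_tx =
    conn->last_tx_msg_tick ? current_tick - conn->last_tx_msg_tick : 0;

if (ms_since_last_tx > conn->ka_interval_ms) {
    // send keepalive
}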

Comment on lines +501 to +509
if (!sent_data) {
all_empty_count++;

if (all_empty_count > 5) {
all_empty_count = 1;
to.tv_usec = 1000;
select(0, NULL, NULL, NULL, &to);
}
}
Contributor

I'm not sure I fully understand what this is doing.

@TimEvens TimEvens merged commit f6b36e9 into main Jan 16, 2024
2 checks passed
@TimEvens TimEvens deleted the tievens-0108 branch March 8, 2024 16:24