Conversation
* Fixes header length issue for streams when the callback is less than 4 bytes.
* Adds connection and data context IDs to more log messages.
I made a number of comments, though you're free to accept or ignore them. :)
include/transport/priority_queue.h

@@ -42,6 +42,10 @@ namespace qtransport {
    };

  public:
    ~priority_queue() {
        std::lock_guard<std::mutex> _(_mutex);
Why lock the mutex if there is nothing to do? Is this to ensure there are no threads inside doing work? If that's a risk, there might still be a risk of a thread being inside the code but not fully out of a function, or entering the code and not yet having reached the lock.
Destruction starts from another thread, which caused a race condition with a pop/push that was happening at the same time.
I assume this object is held with a shared pointer. Whatever threads are calling into it should have a valid pointer (i.e., the object's destructor has not been called). Do you know which threads? While this might appear to fix it, it concerns me that we're not first terminating those threads before attempting to destroy the priority queue object.
I've removed it for now. This change was a fix from back when shutdown was broken with the picoquic transport. The segfault was caused by the priority queue destructor, but it is a valid question why the destruction order was not maintained. I've created issue #112 to track this.
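For reference, a minimal sketch of the shutdown ordering in question (names here are hypothetical, not the code in this PR): worker threads are signaled and joined before the queue is destroyed, so no thread can be inside a pop/push when the destructor runs.

    _stop = true;             // hypothetical atomic flag checked by the worker loops
    for (auto& t : _threads) {
        if (t.joinable())
            t.join();         // wait until no thread can touch the queue
    }
    _queue.reset();           // safe now: no concurrent pop/push is possible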
            break;
        }
        default: {
            // IPv6
ss_family can have other values. It would be safer to explicitly check for IPv4 and IPv6, then default to zeroing out the address or throwing an exception. I assume we're controlling what gets passed in, but an exception would certainly highlight a problem if there were a mistake upstream.
Updated, but I created issue #110 to fix/add IPv6 support. Right now IPv6 doesn't work in UDP, though it does in Picoquic. Not a big deal to add; I just want to do that in a different PR.
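A sketch of the explicit family check being suggested (structure assumed from the snippet above, not the actual patch; requires <stdexcept>):

    switch (addr.ss_family) {
        case AF_INET: {
            // IPv4 handling
            break;
        }
        case AF_INET6: {
            // IPv6 handling (to be completed under issue #110)
            break;
        }
        default:
            throw std::runtime_error("Unsupported ss_family value");
    }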
std::lock_guard<std::mutex> _(_socket_write_mutex);

int numSent = sendto(fd,
sendto is thread-safe, so the lock above may not be required. I did not see anything in particular that needs protection here.
I'm also wondering about sending a structure as-is. There's no serialization logic for this and the other calls I see below?
It's to sync both fd_reader and fd_writer threads on writes and on updating the shared Connection context. Not all the methods pass in the connection context right now, but they do pass in the addr. The main thing that needs to be in sync is the connection context.
I added _connections_mutex afterwards. That lock actually should be enough now, as both fd_reader and fd_writer use it, but fd_reader doesn't need to lock unless it's doing a send-related update. We can consolidate them, but it might be a little less efficient. For now, I've renamed it to send_recv_sync_mutex to be clearer.
I'm missing the point, I guess. I see no reason to have the mutex locked while calling an OS-level sendto() call.
It has nothing to do with sendto(); socket calls are atomic anyway, but that's not the issue here.
I believe your comment is focused on sendto() because you are saying that sendto() does not need to be locked. While that is true, it's not why the lock is there. The lock is there so that we can update the shared contexts/references passed in. In this specific case, 3 of the methods do not have a connection context and therefore don't need the lock. That's true for now, but it won't stay true as the protocol evolves: the shared connection context will likely need to be passed to these 3 methods as well, and locking will be required. The other 2 methods (see https://github.com/Quicr/transport/blob/tievens-0108/src/transport_udp.h#L170) do require locking, as you can see them updating the connection context.
The protocol design is that the send methods will be updating shared context information, so they will need locking. But, as mentioned above, I added connection-context locking, which can be used instead of this lock. I will consolidate this on Tuesday, but locking will still be required because of the updates the protocol requires.
What I don't like about moving locking out of the send_* methods is that they become non-thread-safe. It would require the caller to lock or be super careful. I'll update the method comments to mention that.
In that case, what's to protect? I don't see any reason for the lock.
More generally, I would avoid locking while calling into the OS to send data. Whatever time that takes, the lock is held with no benefit. I get that sometimes that's unavoidable.
As for locking in every function, while you're right that it can be a problem, it can be documented. You don't want the cost of locking and unlocking all the time.
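To illustrate the scoping being debated, a hypothetical send helper (names and fields assumed; requires <sys/socket.h> and <mutex>) that holds the lock only around the shared connection-context update, not around the syscall:

    ssize_t send_packet(int fd, const uint8_t* data, size_t len,
                        const sockaddr* addr, socklen_t addr_len,
                        ConnectionContext& conn, std::mutex& conn_mutex)
    {
        // sendto() itself is thread-safe; no lock needed for the syscall
        const ssize_t num_sent = sendto(fd, data, len, 0, addr, addr_len);

        if (num_sent > 0) {
            // Lock only while touching shared connection state
            std::lock_guard<std::mutex> lock(conn_mutex);
            conn.bytes_sent += static_cast<size_t>(num_sent);
        }
        return num_sent;
    }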
std::lock_guard<std::mutex> _(_socket_write_mutex);

int numSent = sendto(fd,
Same comment here. I don't think we need the above lock.
Same response as above.
std::lock_guard<std::mutex> _(_socket_write_mutex);

int numSent = sendto(fd,
Same comment here. I don't think we need the above lock.
Same response as above.
src/transport_udp.cpp

    logger->Log("Done transport reader thread");
}

size_t snd_rcv_max = 64000; // TODO: Add config for value
I saw a couple of places where 64000 was explicitly referenced. I'd suggest making this constexpr size_t Max_Packet_Size = 65535 or something until we sort out the dynamic configuration.
Good suggestion, updated.
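A minimal sketch of that constant (exact name and usage sites are assumptions):

    // Replaces the scattered 64000 literals until dynamic config lands
    constexpr size_t Max_Packet_Size = 65535;

    size_t snd_rcv_max = Max_Packet_Size;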
src/transport_udp.cpp

    return dq.pop().value().data;
}

const auto& [conn_it, _] = conn_contexts.emplace(last_conn_id,
Is a lock needed to protect this map?
Not on this one, because it's creating the client. Although, we could add a lock for the crazy use case where the app calls start and close at the same time from two different threads.
While not needed, consolidating the send/recv lock into the connections lock will result in this being protected by a lock as well.
}

// TODO: Add config for this value
size_t snd_rcv_max = 2000000;
Above, I saw a value about half this. Another good candidate to make a global constant for the moment.
Max packet size is pretty locked in, but it would make sense to update all the others that should be config variables in another PR. I would like to do that very soon after this.
}

err =
  setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &rcv_timeout, sizeof(rcv_timeout));
Is this needed on a UDP socket?
It's used today so that we don't block on reading/waiting for data.
This should put the socket in non-blocking mode:
auto flags = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
Each of those calls can return an error that needs to be checked.
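With those checks added, the sketch becomes (the error handling shown is an assumption about what fits this transport):

    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) {
        // handle error: log errno and fail socket setup
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        // handle error: log errno and fail socket setup
    }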
Right now we specifically do not want it in non-blocking mode; dedicated reader/writer threads support this. The timeout is there to support stop, as well as to avoid blocking too long.
Now that we have the tick service, which runs with very low impact on CPU and interrupts, non-blocking would work well. We can change it to non-blocking with [e]poll later, in another PR. I created #113 for this.
uint16_t idle_timeout { 120 }; /// Idle timeout in seconds. Must not be zero

} __attribute__((__packed__, aligned(1)));
Ah, this explains what I was asking about before. This does ensure the size is precisely the size of the object. Why are these not serialized using the message buffer stuff used elsewhere in libquicr? Perhaps because this is one level down from that, but you know this will not be portable. It would be trivial to create a miniature message serializer for these, since they're all trivial.
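For illustration, a miniature serializer of the kind described might look like this (the helper is hypothetical; the field name is taken from the snippet above; requires <cstdint> and <vector>):

    // Serialize fields explicitly in network byte order instead of
    // sending the packed struct as-is.
    inline void push_uint16(std::vector<uint8_t>& buf, uint16_t v) {
        buf.push_back(static_cast<uint8_t>(v >> 8));
        buf.push_back(static_cast<uint8_t>(v & 0xff));
    }

    std::vector<uint8_t> buf;
    push_uint16(buf, idle_timeout);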
We can update it if it's really a problem, but:
- It's portable with clang and gcc versions that support C++20, which we require.
- #pragma pack(1) is portable with clang, gcc, and VS, so we could use that as well, but that's header-wide.
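For what it's worth, the header-wide concern can be scoped with push/pop, which clang, gcc, and VS all support (a sketch, not the code in this PR):

    #pragma pack(push, 1)
    struct example_header {
        uint8_t  type;
        uint16_t idle_timeout;  // seconds
    };
    #pragma pack(pop)  // restores the previous packing for the rest of the header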
@paulej, thanks so much for the review. I updated all the comments and code based on feedback. The one change not updated was the packing, since it is portable to clang and gcc, which is what we require right now.
UDPTransport::~UDPTransport() {
    // TODO: Close all streams and connections
Do we still not do that?
    if (thread.joinable())
        thread.join();
}

_tick_service.reset();
Shouldn't have to call reset; that should be handled fine by the destructor.
Unless the objects that depend on it are declared before it in the class, in which case it would probably be better to reorder the member declarations so you can avoid this manual call.
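A sketch of that reordering (member names are assumptions): C++ destroys members in reverse declaration order, so declaring the tick service before the threads that use it means it outlives them, making the manual reset() unnecessary.

    class UDPTransport {
        // ...
        std::shared_ptr<TickService> _tick_service;  // declared first, destroyed last
        std::vector<std::thread> _threads;           // destroyed before the tick service
    };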
src/transport_udp.cpp

// Do not delete the default datagram context id ZERO
if (data_ctx_id > 0) {
    auto conn_it = conn_contexts.find(conn_id);
    if (conn_it != conn_contexts.end()) {
        logger->info << "Delete data context id: " << data_ctx_id << " in conn_id: " << conn_id << std::flush;
        conn_it->second->data_contexts.erase(data_ctx_id);
    }
}
The whole function body is the if statement; it could return early instead and reduce indentation.
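A sketch of the early-return form (function name, signature, and types are assumptions based on the snippet above):

    void delete_data_context(uint64_t conn_id, uint64_t data_ctx_id)
    {
        // Do not delete the default datagram context id ZERO
        if (data_ctx_id == 0)
            return;

        auto conn_it = conn_contexts.find(conn_id);
        if (conn_it == conn_contexts.end())
            return;

        logger->info << "Delete data context id: " << data_ctx_id
                     << " in conn_id: " << conn_id << std::flush;
        conn_it->second->data_contexts.erase(data_ctx_id);
    }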
const auto current_tick = _tick_service->get_ticks(std::chrono::milliseconds(1));

if (current_tick >= conn.next_report_tick) {
    // New report ID
This comment seems out of place
// Shape flow by only processing data if wait for tick value is less than or equal to current tick
if (conn->wait_for_tick > current_tick) {
    /* Noisy - Enable when debugging shaping
Second time seeing this; maybe we should have a "noisy_print" config knob?
if (data_ctx.tx_data->empty()) { // No data, go to next connection
    // Send keepalive if needed
    if (conn->last_tx_msg_tick && current_tick - conn->last_tx_msg_tick > conn->ka_interval_ms) {
I see this calculation a few times; maybe it should be calculated once and stored in a constant above all these checks?
if (!sent_data) {
    all_empty_count++;

    if (all_empty_count > 5) {
        all_empty_count = 1;
        to.tv_usec = 1000;
        select(0, NULL, NULL, NULL, &to);
    }
}
Not sure I fully understand what this is doing
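One reading of it (an assumption, not confirmed in this thread): select() with null fd sets is a portable sub-millisecond sleep, so after five consecutive passes with nothing to send, the writer backs off for 1 ms instead of spinning. An equivalent, more explicit form (requires <chrono> and <thread>):

    // Back off briefly when the writer loop keeps finding nothing to send,
    // so it does not spin at 100% CPU.
    if (all_empty_count > 5) {
        all_empty_count = 1;
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }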
Significant refactor to add the following new features:
- will clean up the connection.
- based on testing. We can use the feedback report to adjust the shaping/pacing.