Fixes surprising possible ordering of nng_pipe_notify events. #961
Conversation
On nng_pipe_notify events, it was possible for events to fire in an unexpected order: pre-connect, post-*remove*, and finally post-*connect*. This can cause errors in the wild if a resource is acquired in pre-connect and released in post-remove, as the resource cannot be used in the post-connect event if the race is exercised. Now, events will fire strictly in the order of pre-connect, post-connect, and post-remove. If the pipe is closed in pre-connect, post-connect and post-remove will not be called.
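For concreteness, here is a minimal sketch (hypothetical callback and resource, not code from this PR) of the pattern that breaks under the old ordering: a resource acquired in pre-connect and freed in post-remove is already gone by the time post-connect runs.

```c
#include <stdlib.h>
#include <nng/nng.h>

/* Hypothetical per-pipe resource; stands in for e.g. pynng's pipe bookkeeping. */
typedef struct { int pipe_id; } resource;

void notify_cb(nng_pipe p, nng_pipe_ev ev, void *arg) {
    resource **slot = arg;
    switch (ev) {
    case NNG_PIPE_EV_ADD_PRE:  /* acquire in pre-connect */
        *slot = calloc(1, sizeof **slot);
        break;
    case NNG_PIPE_EV_ADD_POST: /* use in post-connect: under the old ordering,
                                  post-remove may already have freed *slot */
        (*slot)->pipe_id = nng_pipe_id(p);
        break;
    case NNG_PIPE_EV_REM_POST: /* release in post-remove */
        free(*slot);
        *slot = NULL;
        break;
    default:
        break;
    }
}
```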
There is a semantic difference: now the callback is called with the […]. This fixes codypiersall/pynng#40. For a little background: in pynng, we register callback functions for each pipe event. Inside our callback, we call any callbacks specified by the user. To keep track of the Python representation of pipes, we keep some state internally when the callbacks are called. Whenever the callbacks are not called in the expected order, we run into problems. This PR doesn't fix any "bugs", exactly, but it does make it easier to reason about invariants. The pipe callbacks can now strictly only be called in the order pre-connect, post-connect, post-remove. A different implementation that would accomplish the same thing would be to add a […].
Is a mutex strictly necessary here? I'm not familiar with nng code, but browsing around some, it seems you're mixing an ordering/state-transition problem with an exclusivity problem. If you only want one callback running at once, in a set order, then you could use a condition variable locking on a new mutex for callback activity that guards the last […].
I see your point, @bb010g. This patch is using the mutex for more than what mutexes are traditionally used for: protecting data structures from being used in inconsistent states. Technically, this increases the critical section the mutex is protecting; I didn't create the mutex for this patch, I just hold it longer. I like the idea of using condition variables. When I have more time (such a precious commodity!) I'll create an issue with a fuller discussion, and point to this PR as a potential solution. It probably won't be for a couple weeks though; I've been fighting with a lawn mower, and many battles are yet to come.
I need to look at this in more detail. I have some specific concerns about holding the lock across the callbacks -- I am worried that activity done in a callback might cause a deadlock (such as if the callback rejects the connection). The other thing is that if ordering is being caused by being run on different threads, we can actually create a separate thread just for running these events, providing serialization in that way. There may be concerns with that too -- as I would need to synchronize back against the socket when the preconnect and postconnect routines return. Let me think about this.
I may have been wrong about this being an issue in nng. I expected the following C code to reproduce this without using the Python bindings, but I couldn't: it does not seem to demonstrate the race as I thought it would.

issuex.c:
```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>

#define CHECK(x) (assert((x) == 0))
#define ADDR "tcp://127.0.0.1:14224"
#define NUM_ITER 1000

typedef struct cb_arg {
    int sock_num;
    int events_idx;
    int events[3];
} cb_arg;

/* Record each pipe event in the order it is observed. */
void cb(nng_pipe pipe, nng_pipe_ev event, void *data) {
    struct cb_arg *info = (struct cb_arg *) data;
    info->events[info->events_idx++] = event;
}

int main(int argc, char *argv[]) {
    nng_socket listener, dialer;
    int num_iter = NUM_ITER;
    char *addr = ADDR;
    if (argc >= 3) {
        num_iter = strtol(argv[2], NULL, 0);
    }
    if (argc >= 2) {
        addr = argv[1];
    }
    CHECK(nng_pair0_open(&listener));
    CHECK(nng_listen(listener, addr, NULL, 0));
    struct cb_arg *cb_args = malloc(num_iter * sizeof *cb_args);
    for (int i = 0; i < num_iter; i++) {
        struct cb_arg *cb_arg = &cb_args[i];
        /* zero the record: events_idx must start at 0 */
        memset(cb_arg, 0, sizeof *cb_arg);
        cb_arg->sock_num = i;
        CHECK(nng_pair0_open(&dialer));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_ADD_PRE, cb, cb_arg));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_ADD_POST, cb, cb_arg));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_REM_POST, cb, cb_arg));
        CHECK(nng_dial(dialer, addr, NULL, 0));
        CHECK(nng_close(dialer));
    }
    for (int i = 0; i < num_iter; i++) {
        int *events = cb_args[i].events;
        fprintf(stdout, "%d %d %d\n", events[0], events[1], events[2]);
    }
    free(cb_args);
}
```
Assuming that code is called issuex.c, it can be built like so (the exact flags depend on how nng was installed):
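```sh
# Assumed build line; library flags may vary by platform.
cc issuex.c -o issuex -lnng -lpthread
```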
And run like so, to output the unique orderings of pipe events (one way, assuming a Unix shell, is to pipe through sort and uniq):
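```sh
# Collapses the one-line-per-iteration output into unique orderings with counts.
./issuex | sort | uniq -c
```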
So this "issue" should be treated as a non-issue until I can actually reproduce it in C without using pynng. |
Ugh, I realized that in my "reproducer" above, I was potentially watching pipe events from the wrong socket, if the short-lived sockets cause a race in the longer-lived ones. I'll update the code and repost when I'm able to.
Okay, it took forever for me to get back to this, but I have finally made a plain C reproducer that demonstrates the bad pipe event ordering; it is below. My confusion before was that, as I suspected in my previous comment, I was watching pipe events from the wrong socket.
I also added some sleeps, depending on the event type, in the callback to try to observe the out-of-order events. The C program below just prints out the pipe id and event for every callback invocation, for postprocessing.

issuex.c: reproduces pipe event ordering

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>

#define CHECK(x) (assert((x) == 0))
#define ADDR "tcp://127.0.0.1:14224"
#define NUM_ITER 1000

struct seen_events {
    int pipe_id;
    int event;
};

struct cb_data {
    int iter;
    pthread_mutex_t mut;
    struct seen_events *events;
};

/* This callback just adds the current pipe id and event to the passed-in events. */
void cb(nng_pipe pipe, nng_pipe_ev event, void *data) {
    // Add delays to simulate doing real work.
    // These are the delays that demonstrated the race on my computer.
    if (event == NNG_PIPE_EV_ADD_PRE) {
        usleep(40);
    } else if (event == NNG_PIPE_EV_ADD_POST) {
        usleep(50);
    } else if (event == NNG_PIPE_EV_REM_POST) {
        usleep(20);
    }
    struct cb_data *d = (struct cb_data *) data;
    pthread_mutex_lock(&d->mut);
    d->events[d->iter].pipe_id = nng_pipe_id(pipe);
    d->events[d->iter].event = event;
    d->iter += 1; /* record, then advance: slot 0 is used and the last write stays in bounds */
    pthread_mutex_unlock(&d->mut);
}

void init_cb_data(struct cb_data *data, int num_iter) {
    data->iter = 0;
    pthread_mutex_init(&data->mut, NULL);
    /* calloc, so unused slots print as pipe id 0; postprocessing drops those */
    data->events = calloc(num_iter * 3, sizeof (*data->events));
}

int main(int argc, char *argv[]) {
    nng_socket listener, dialer;
    struct cb_data listener_cb_data, dialer_cb_data;
    init_cb_data(&listener_cb_data, NUM_ITER);
    init_cb_data(&dialer_cb_data, NUM_ITER);
    CHECK(nng_pair0_open(&listener));
    CHECK(nng_listen(listener, ADDR, NULL, 0));
    for (int j = 0; j < NUM_ITER; j++) {
        CHECK(nng_pair0_open(&dialer));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_ADD_PRE, cb, &dialer_cb_data));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_ADD_POST, cb, &dialer_cb_data));
        CHECK(nng_pipe_notify(dialer, NNG_PIPE_EV_REM_POST, cb, &dialer_cb_data));
        CHECK(nng_dial(dialer, ADDR, NULL, NNG_FLAG_NONBLOCK));
        usleep(100);
        CHECK(nng_close(dialer));
    }
    for (int j = 0; j < NUM_ITER * 3; j++) {
        struct seen_events *e = &dialer_cb_data.events[j];
        fprintf(stdout, "%d %d\n", e->pipe_id, e->event);
    }
    usleep(100000);
}
```

The Python script below postprocesses that output, printing each pipe event order and the number of times it occurred.

parse_output.py:

```python
from collections import defaultdict, Counter
import fileinput

# Append pipe events, keyed by pipe id, in the order they are viewed.
d = defaultdict(list)
for line in fileinput.input():
    pipe_id, event = line.split()
    d[pipe_id].append(event)
# Drop pipe id 0: those rows come from unused slots in the C array.
d.pop('0', None)
vals = [tuple(x) for x in d.values()]
c = Counter(vals)
for event_order, n_occurred in c.items():
    orders = ','.join(event_order)
    print('{:7s} {:>3d}'.format(orders, n_occurred))
```

Build the C program with something like this (flags may vary by platform):
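```sh
# Assumed build line; -lpthread is needed here for the mutex in the callback.
cc issuex.c -o issuex -lnng -lpthread
```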
And run it like this, piping the output through the postprocessing script:
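```sh
./issuex | python3 parse_output.py
```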
On my machine, the postprocessed output indicates that 304 times, the pipe events fired (well, finished running) in the order pre_connect, post_remove, post_connect, and 1 time in the order post_remove, pre_connect. I intentionally tried to make a callback that would exacerbate the issues I saw happening in pynng at codypiersall/pynng#40.
I think I want to take a somewhat different approach with this. So basically we would have a lock (and possibly a condvar, though I'm not sure that's strictly necessary yet) and a couple of ints on the pipe that track what notifications have run and whether a notifier is running. This would avoid the global locks (hopefully minimizing contention as well). I'd like to also take a close look at the preattach hook and other synchronization with the rest of the socket.
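As a rough illustration of that idea (hypothetical names and structure, not nng's actual internals): each pipe would carry its own mutex, condition variable, and a small state record, and each event would wait for its predecessor to finish before its callback runs.

```c
/* Sketch only: per-pipe event serialization without a global lock. */
#include <pthread.h>
#include <stdbool.h>

typedef enum { EV_NONE, EV_ADD_PRE, EV_ADD_POST, EV_REM_POST } pipe_event;

typedef struct fake_pipe {
    pthread_mutex_t mtx;
    pthread_cond_t  cv;
    pipe_event      last_ev;   /* most recent event that finished */
    bool            ev_active; /* a notifier callback is currently running */
} fake_pipe;

/* Block until this event may legally run, then mark a notifier active. */
static void ev_begin(fake_pipe *p, pipe_event ev) {
    pthread_mutex_lock(&p->mtx);
    /* Wait until no callback is running and the predecessor event is done. */
    while (p->ev_active || p->last_ev != ev - 1) {
        pthread_cond_wait(&p->cv, &p->mtx);
    }
    p->ev_active = true;
    pthread_mutex_unlock(&p->mtx);
}

/* Record completion and wake any events waiting on this one. */
static void ev_end(fake_pipe *p, pipe_event ev) {
    pthread_mutex_lock(&p->mtx);
    p->ev_active = false;
    p->last_ev = ev;
    pthread_cond_broadcast(&p->cv);
    pthread_mutex_unlock(&p->mtx);
}
```

The user callback would then run between ev_begin and ev_end, outside any socket-wide lock, which sidesteps the concern above about deadlocking while holding a lock across callbacks.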