Skip to content

Commit

Permalink
Critical section when managing global subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
boazsegev committed Nov 14, 2023
1 parent 60fa6db commit 8e6f4f2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 32 deletions.
49 changes: 33 additions & 16 deletions fio-stl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32575,6 +32575,7 @@ static struct FIO___PUBSUB_POSTOFFICE {
uint8_t remote;
} filter;
uint8_t secret_is_random;
FIO___LOCK_TYPE lock;
fio___pubsub_engines_s engines;
FIO_LIST_NODE history_active;
FIO_LIST_NODE history_waiting;
Expand All @@ -32601,6 +32602,7 @@ static struct FIO___PUBSUB_POSTOFFICE {
.local = (FIO___PUBSUB_SIBLINGS),
.remote = FIO___PUBSUB_REMOTE,
},
.lock = FIO___LOCK_INIT,
.protocol =
{
.ipc =
Expand Down Expand Up @@ -32798,6 +32800,7 @@ FIO_SFUNC void fio___pubsub_at_exit(void *ignr_) {
fio___pubsub_message_map_destroy(&FIO___PUBSUB_POSTOFFICE.remote_messages);
fio___pubsub_message_map_destroy(&FIO___PUBSUB_POSTOFFICE.history_messages);
fio___pubsub_engines_destroy(&FIO___PUBSUB_POSTOFFICE.engines);
FIO___LOCK_DESTROY(FIO___PUBSUB_POSTOFFICE.lock);
fio_queue_perform_all(fio_srv_queue());
}

Expand Down Expand Up @@ -32986,15 +32989,21 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
if (fio_srv_is_master())
goto error_not_on_master;
is_global:
fio_srv_defer(fio___pubsub_subscribe_task, (void *)s, NULL);
fio___postoffice_msmap_set(
&FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only),
fio_risky_hash(args.channel.buf,
args.channel.len,
args.filter | ((size_t)args.is_pattern << 20)),
FIO_STR_INFO2(args.channel.buf, args.channel.len),
s,
NULL);
if (1) { /* so C++ can jump even though there's a new var here */
fio_srv_defer(fio___pubsub_subscribe_task, (void *)s, NULL);
uint64_t hashed_value =
fio_risky_hash(args.channel.buf,
args.channel.len,
args.filter | ((size_t)args.is_pattern << 20));
FIO___LOCK_LOCK(FIO___PUBSUB_POSTOFFICE.lock);
fio___postoffice_msmap_set(
&FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only),
hashed_value,
FIO_STR_INFO2(args.channel.buf, args.channel.len),
s,
NULL);
FIO___LOCK_UNLOCK(FIO___PUBSUB_POSTOFFICE.lock);
}
return;

error_not_on_master:
Expand Down Expand Up @@ -33050,13 +33059,21 @@ int fio_unsubscribe FIO_NOOP(fio_subscribe_args_s args) {
return 0;

is_global:
return fio___postoffice_msmap_remove(
&FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only),
fio_risky_hash(args.channel.buf,
args.channel.len,
args.filter | ((size_t)args.is_pattern << 20)),
FIO_STR_INFO3(args.channel.buf, args.channel.len, (size_t)-1),
NULL);
if (1) {
int r;
uint64_t hashed_value =
fio_risky_hash(args.channel.buf,
args.channel.len,
args.filter | ((size_t)args.is_pattern << 20));
FIO___LOCK_LOCK(FIO___PUBSUB_POSTOFFICE.lock);
r = fio___postoffice_msmap_remove(
&FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only),
hashed_value,
FIO_STR_INFO3(args.channel.buf, args.channel.len, (size_t)-1),
NULL);
FIO___LOCK_UNLOCK(FIO___PUBSUB_POSTOFFICE.lock);
return r;
}
}

/* *****************************************************************************
Expand Down
49 changes: 33 additions & 16 deletions fio-stl/420 pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ static struct FIO___PUBSUB_POSTOFFICE {
uint8_t remote;
} filter;
uint8_t secret_is_random;
FIO___LOCK_TYPE lock;
fio___pubsub_engines_s engines;
FIO_LIST_NODE history_active;
FIO_LIST_NODE history_waiting;
Expand All @@ -798,6 +799,7 @@ static struct FIO___PUBSUB_POSTOFFICE {
.local = (FIO___PUBSUB_SIBLINGS),
.remote = FIO___PUBSUB_REMOTE,
},
.lock = FIO___LOCK_INIT,
.protocol =
{
.ipc =
Expand Down Expand Up @@ -995,6 +997,7 @@ FIO_SFUNC void fio___pubsub_at_exit(void *ignr_) {
fio___pubsub_message_map_destroy(&FIO___PUBSUB_POSTOFFICE.remote_messages);
fio___pubsub_message_map_destroy(&FIO___PUBSUB_POSTOFFICE.history_messages);
fio___pubsub_engines_destroy(&FIO___PUBSUB_POSTOFFICE.engines);
FIO___LOCK_DESTROY(FIO___PUBSUB_POSTOFFICE.lock);
fio_queue_perform_all(fio_srv_queue());
}

Expand Down Expand Up @@ -1183,15 +1186,21 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) {
if (fio_srv_is_master())
goto error_not_on_master;
is_global:
fio_srv_defer(fio___pubsub_subscribe_task, (void *)s, NULL);
fio___postoffice_msmap_set(
&FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only),
fio_risky_hash(args.channel.buf,
args.channel.len,
args.filter | ((size_t)args.is_pattern << 20)),
FIO_STR_INFO2(args.channel.buf, args.channel.len),
s,
NULL);
if (1) { /* so C++ can jump even though there's a new var here */
fio_srv_defer(fio___pubsub_subscribe_task, (void *)s, NULL);
uint64_t hashed_value =
fio_risky_hash(args.channel.buf,
args.channel.len,
args.filter | ((size_t)args.is_pattern << 20));
FIO___LOCK_LOCK(FIO___PUBSUB_POSTOFFICE.lock);
fio___postoffice_msmap_set(
&FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only),
hashed_value,
FIO_STR_INFO2(args.channel.buf, args.channel.len),
s,
NULL);
FIO___LOCK_UNLOCK(FIO___PUBSUB_POSTOFFICE.lock);
}
return;

error_not_on_master:
Expand Down Expand Up @@ -1247,13 +1256,21 @@ int fio_unsubscribe FIO_NOOP(fio_subscribe_args_s args) {
return 0;

is_global:
return fio___postoffice_msmap_remove(
&FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only),
fio_risky_hash(args.channel.buf,
args.channel.len,
args.filter | ((size_t)args.is_pattern << 20)),
FIO_STR_INFO3(args.channel.buf, args.channel.len, (size_t)-1),
NULL);
if (1) {
int r;
uint64_t hashed_value =
fio_risky_hash(args.channel.buf,
args.channel.len,
args.filter | ((size_t)args.is_pattern << 20));
FIO___LOCK_LOCK(FIO___PUBSUB_POSTOFFICE.lock);
r = fio___postoffice_msmap_remove(
&FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only),
hashed_value,
FIO_STR_INFO3(args.channel.buf, args.channel.len, (size_t)-1),
NULL);
FIO___LOCK_UNLOCK(FIO___PUBSUB_POSTOFFICE.lock);
return r;
}
}

/* *****************************************************************************
Expand Down

0 comments on commit 8e6f4f2

Please sign in to comment.