diff --git a/fio-stl.h b/fio-stl.h index 00f3e0e..5a20a73 100644 --- a/fio-stl.h +++ b/fio-stl.h @@ -32575,6 +32575,7 @@ static struct FIO___PUBSUB_POSTOFFICE { uint8_t remote; } filter; uint8_t secret_is_random; + FIO___LOCK_TYPE lock; fio___pubsub_engines_s engines; FIO_LIST_NODE history_active; FIO_LIST_NODE history_waiting; @@ -32601,6 +32602,7 @@ static struct FIO___PUBSUB_POSTOFFICE { .local = (FIO___PUBSUB_SIBLINGS), .remote = FIO___PUBSUB_REMOTE, }, + .lock = FIO___LOCK_INIT, .protocol = { .ipc = @@ -32798,6 +32800,7 @@ FIO_SFUNC void fio___pubsub_at_exit(void *ignr_) { fio___pubsub_message_map_destroy(&FIO___PUBSUB_POSTOFFICE.remote_messages); fio___pubsub_message_map_destroy(&FIO___PUBSUB_POSTOFFICE.history_messages); fio___pubsub_engines_destroy(&FIO___PUBSUB_POSTOFFICE.engines); + FIO___LOCK_DESTROY(FIO___PUBSUB_POSTOFFICE.lock); fio_queue_perform_all(fio_srv_queue()); } @@ -32986,15 +32989,21 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) { if (fio_srv_is_master()) goto error_not_on_master; is_global: - fio_srv_defer(fio___pubsub_subscribe_task, (void *)s, NULL); - fio___postoffice_msmap_set( - &FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only), - fio_risky_hash(args.channel.buf, - args.channel.len, - args.filter | ((size_t)args.is_pattern << 20)), - FIO_STR_INFO2(args.channel.buf, args.channel.len), - s, - NULL); + if (1) { /* so C++ can jump even though there's a new var here */ + fio_srv_defer(fio___pubsub_subscribe_task, (void *)s, NULL); + uint64_t hashed_value = + fio_risky_hash(args.channel.buf, + args.channel.len, + args.filter | ((size_t)args.is_pattern << 20)); + FIO___LOCK_LOCK(FIO___PUBSUB_POSTOFFICE.lock); + fio___postoffice_msmap_set( + &FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only), + hashed_value, + FIO_STR_INFO2(args.channel.buf, args.channel.len), + s, + NULL); + FIO___LOCK_UNLOCK(FIO___PUBSUB_POSTOFFICE.lock); + } return; error_not_on_master: @@ -33050,13 +33059,21 @@ int fio_unsubscribe FIO_NOOP(fio_subscribe_args_s args) { return 0; is_global: - return fio___postoffice_msmap_remove( - &FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only), - fio_risky_hash(args.channel.buf, - args.channel.len, - args.filter | ((size_t)args.is_pattern << 20)), - FIO_STR_INFO3(args.channel.buf, args.channel.len, (size_t)-1), - NULL); + if (1) { + int r; + uint64_t hashed_value = + fio_risky_hash(args.channel.buf, + args.channel.len, + args.filter | ((size_t)args.is_pattern << 20)); + FIO___LOCK_LOCK(FIO___PUBSUB_POSTOFFICE.lock); + r = fio___postoffice_msmap_remove( + &FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only), + hashed_value, + FIO_STR_INFO3(args.channel.buf, args.channel.len, (size_t)-1), + NULL); + FIO___LOCK_UNLOCK(FIO___PUBSUB_POSTOFFICE.lock); + return r; + } } /* ***************************************************************************** diff --git a/fio-stl/420 pubsub.h b/fio-stl/420 pubsub.h index 94905dc..04be725 100644 --- a/fio-stl/420 pubsub.h +++ b/fio-stl/420 pubsub.h @@ -772,6 +772,7 @@ static struct FIO___PUBSUB_POSTOFFICE { uint8_t remote; } filter; uint8_t secret_is_random; + FIO___LOCK_TYPE lock; fio___pubsub_engines_s engines; FIO_LIST_NODE history_active; FIO_LIST_NODE history_waiting; @@ -798,6 +799,7 @@ static struct FIO___PUBSUB_POSTOFFICE { .local = (FIO___PUBSUB_SIBLINGS), .remote = FIO___PUBSUB_REMOTE, }, + .lock = FIO___LOCK_INIT, .protocol = { .ipc = @@ -995,6 +997,7 @@ FIO_SFUNC void fio___pubsub_at_exit(void *ignr_) { fio___pubsub_message_map_destroy(&FIO___PUBSUB_POSTOFFICE.remote_messages); fio___pubsub_message_map_destroy(&FIO___PUBSUB_POSTOFFICE.history_messages); fio___pubsub_engines_destroy(&FIO___PUBSUB_POSTOFFICE.engines); + FIO___LOCK_DESTROY(FIO___PUBSUB_POSTOFFICE.lock); fio_queue_perform_all(fio_srv_queue()); } @@ -1183,15 +1186,21 @@ SFUNC void fio_subscribe FIO_NOOP(fio_subscribe_args_s args) { if (fio_srv_is_master()) goto error_not_on_master; is_global: - fio_srv_defer(fio___pubsub_subscribe_task, (void *)s, NULL); - fio___postoffice_msmap_set( - &FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only), - fio_risky_hash(args.channel.buf, - args.channel.len, - args.filter | ((size_t)args.is_pattern << 20)), - FIO_STR_INFO2(args.channel.buf, args.channel.len), - s, - NULL); + if (1) { /* so C++ can jump even though there's a new var here */ + fio_srv_defer(fio___pubsub_subscribe_task, (void *)s, NULL); + uint64_t hashed_value = + fio_risky_hash(args.channel.buf, + args.channel.len, + args.filter | ((size_t)args.is_pattern << 20)); + FIO___LOCK_LOCK(FIO___PUBSUB_POSTOFFICE.lock); + fio___postoffice_msmap_set( + &FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only), + hashed_value, + FIO_STR_INFO2(args.channel.buf, args.channel.len), + s, + NULL); + FIO___LOCK_UNLOCK(FIO___PUBSUB_POSTOFFICE.lock); + } return; error_not_on_master: @@ -1247,13 +1256,21 @@ int fio_unsubscribe FIO_NOOP(fio_subscribe_args_s args) { return 0; is_global: - return fio___postoffice_msmap_remove( - &FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only), - fio_risky_hash(args.channel.buf, - args.channel.len, - args.filter | ((size_t)args.is_pattern << 20)), - FIO_STR_INFO3(args.channel.buf, args.channel.len, (size_t)-1), - NULL); + if (1) { + int r; + uint64_t hashed_value = + fio_risky_hash(args.channel.buf, + args.channel.len, + args.filter | ((size_t)args.is_pattern << 20)); + FIO___LOCK_LOCK(FIO___PUBSUB_POSTOFFICE.lock); + r = fio___postoffice_msmap_remove( + &FIO___PUBSUB_POSTOFFICE.master_subscriptions + (!args.master_only), + hashed_value, + FIO_STR_INFO3(args.channel.buf, args.channel.len, (size_t)-1), + NULL); + FIO___LOCK_UNLOCK(FIO___PUBSUB_POSTOFFICE.lock); + return r; + } } /* *****************************************************************************