From 5d3d2cde579c0ba47606567b548e9c72df7a4574 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Mon, 4 Mar 2024 10:38:02 -0800 Subject: [PATCH] Comments --- .../request_response_client.h | 14 +- .../request_response_client.c | 162 +++++++++++++----- 2 files changed, 133 insertions(+), 43 deletions(-) diff --git a/include/aws/mqtt/request-response/request_response_client.h b/include/aws/mqtt/request-response/request_response_client.h index 5e8fa221..5fe38149 100644 --- a/include/aws/mqtt/request-response/request_response_client.h +++ b/include/aws/mqtt/request-response/request_response_client.h @@ -19,7 +19,7 @@ struct aws_mqtt_request_response_client_options { size_t max_subscriptions; uint32_t operation_timeout_seconds; - // Do not bind the initialized callback; it exists mostly for tests and should not be exposed + /* Do not bind the initialized callback; it exists mostly for tests and should not be exposed */ aws_mqtt_request_response_client_initialized_callback_fn *initialized_callback; aws_mqtt_request_response_client_terminated_callback_fn *terminated_callback; @@ -28,19 +28,31 @@ struct aws_mqtt_request_response_client_options { AWS_EXTERN_C_BEGIN +/* + * Create a new request-response client that uses an MQTT311 client. + */ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client( struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options); +/* + * Create a new request-response client that uses an MQTT5 client. + */ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client( struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options); +/* + * Add a reference to a request-response client + */ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire( struct aws_mqtt_request_response_client *client); +/* + * Remove a reference to a request-response client + */ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release( struct aws_mqtt_request_response_client *client); diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 1d313a2b..7b43f452 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -1,7 +1,7 @@ /** -* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -* SPDX-License-Identifier: Apache-2.0. -*/ + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ #include @@ -13,16 +13,44 @@ #include #include +/* Tracks the current state of the request-response client */ enum aws_request_response_client_state { - // cross-thread initialization has not completed and all protocol adapter callbacks are ignored + + /* cross-thread initialization has not completed and all protocol adapter callbacks are ignored */ AWS_RRCS_UNINITIALIZED, + /* Normal operating state for the client. */ AWS_RRCS_ACTIVE, - // asynchronously shutting down, no more servicing will be done and all protocol adapter callbacks are ignored + /* asynchronously shutting down, no more servicing will be done and all protocol adapter callbacks are ignored */ AWS_RRCS_SHUTTING_DOWN, }; +/* + * Request-Response Client Notes + * + * Ref-counting/Shutdown + * + * The request-response client uses a double ref-count pattern. + * + * External references represent user references. When the external reference reaches zero, the client's asynchronous + * shutdown process is started. + * + * Internal references block final destruction. Asynchronous shutdown will not complete until all internal references + * are dropped. In addition to one long-lived internal reference (the protocol client adapter's back reference to + * the request-response client), all event loop tasks that target the request-response client hold an internal + * reference between task submission and task completion. This ensures that the task always has a valid reference + * to the client, even if we're trying to shut down at the same time. + * + * + * Initialization + * + * Initialization is complicated by the fact that the subscription manager needs to be initialized from the + * event loop thread that the client/protocol adapter/protocol client are all seated on. To do this safely, + * we add an uninitialized state that ignores all callbacks and we schedule a task on initial construction to do + * the event-loop-only initialization. Once that initialization completes on the event loop thread, we move + * the client into an active state where it will process operations and protocol adapter callbacks. + */ struct aws_mqtt_request_response_client { struct aws_allocator *allocator; @@ -47,12 +75,14 @@ struct aws_mqtt_request_response_client { static void s_aws_rr_client_on_zero_internal_ref_count(void *context) { struct aws_mqtt_request_response_client *client = context; + /* Both ref counts are zero, but it's still safest to schedule final destruction, not invoke it directly */ aws_event_loop_schedule_task_now(client->loop, &client->internal_shutdown_task); } static void s_aws_rr_client_on_zero_external_ref_count(void *context) { struct aws_mqtt_request_response_client *client = context; + /* Start the asynchronous shutdown process */ aws_event_loop_schedule_task_now(client->loop, &client->external_shutdown_task); } @@ -67,7 +97,10 @@ static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request } } -static void s_mqtt_request_response_client_internal_shutdown_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { +static void s_mqtt_request_response_client_internal_shutdown_task_fn( + struct aws_task *task, + void *arg, + enum aws_task_status task_status) { (void)task; (void)task_status; @@ -76,13 +109,16 @@ static void s_mqtt_request_response_client_internal_shutdown_task_fn(struct aws_ /* * All internal and external refs are gone; it is safe to clean up synchronously. * - * The subscription manager is cleaned up and the protocol adapter has been shut down. All that's left is to - * free memory. + * The subscription manager is cleaned up and the protocol adapter has been shut down. No tasks targeting the + * client are active (other than this one). All that's left is to free memory. */ s_mqtt_request_response_client_final_destroy(client); } -static void s_mqtt_request_response_client_external_shutdown_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { +static void s_mqtt_request_response_client_external_shutdown_task_fn( + struct aws_task *task, + void *arg, + enum aws_task_status task_status) { (void)task; AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED); @@ -101,21 +137,25 @@ static void s_mqtt_request_response_client_external_shutdown_task_fn(struct aws_ aws_ref_count_release(&client->internal_ref_count); } -static void s_aws_rr_client_subscription_status_event_callback(const struct aws_rr_subscription_status_event *event, void *userdata) { +static void s_aws_rr_client_subscription_status_event_callback( + const struct aws_rr_subscription_status_event *event, + void *userdata) { (void)event; (void)userdata; /* - * We must be on the event loop, but it's safer overall to process this event as a top-level event loop task. The subscription - * manager assumes that we won't call APIs on it while iterating subscription records and listeners. + * We must be on the event loop, but it's safer overall to process this event as a top-level event loop task. The + * subscription manager assumes that we won't call APIs on it while iterating subscription records and listeners. * * These tasks hold an internal reference while they exist. */ - // NYI + /* NYI */ } -static void s_aws_rr_client_protocol_adapter_subscription_event_callback(const struct aws_protocol_adapter_subscription_event *event, void *user_data) { +static void s_aws_rr_client_protocol_adapter_subscription_event_callback( + const struct aws_protocol_adapter_subscription_event *event, + void *user_data) { struct aws_mqtt_request_response_client *rr_client = user_data; AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); @@ -140,17 +180,19 @@ static void s_aws_rr_client_protocol_adapter_incoming_publish_callback( return; } - // NYI + /* NYI */ } static void s_aws_rr_client_protocol_adapter_terminate_callback(void *user_data) { struct aws_mqtt_request_response_client *rr_client = user_data; - // release the internal ref count "held" by the protocol adapter's existence + /* release the internal ref count "held" by the protocol adapter's existence */ aws_ref_count_release(&rr_client->internal_ref_count); } -static void s_aws_rr_client_protocol_adapter_connection_event_callback(const struct aws_protocol_adapter_connection_event *event, void *user_data) { +static void s_aws_rr_client_protocol_adapter_connection_event_callback( + const struct aws_protocol_adapter_connection_event *event, + void *user_data) { struct aws_mqtt_request_response_client *rr_client = user_data; AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); @@ -162,38 +204,56 @@ static void s_aws_rr_client_protocol_adapter_connection_event_callback(const str aws_rr_subscription_manager_on_protocol_adapter_connection_event(&rr_client->subscription_manager, event); } -static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new(struct aws_allocator *allocator, const struct aws_mqtt_request_response_client_options *options, struct aws_event_loop *loop) { +static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new( + struct aws_allocator *allocator, + const struct aws_mqtt_request_response_client_options *options, + struct aws_event_loop *loop) { struct aws_rr_subscription_manager_options sm_options = { .max_subscriptions = options->max_subscriptions, .operation_timeout_seconds = options->operation_timeout_seconds, }; - // we can't initialize the subscription manager until we're running on the event loop, so make sure that - // initialize can't fail by checking its options for validity now. + /* + * We can't initialize the subscription manager until we're running on the event loop, so make sure that + * initialize can't fail by checking its options for validity now. + */ if (!aws_rr_subscription_manager_are_options_valid(&sm_options)) { return NULL; } - struct aws_mqtt_request_response_client *rr_client = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_request_response_client)); + struct aws_mqtt_request_response_client *rr_client = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_request_response_client)); rr_client->allocator = allocator; rr_client->config = *options; rr_client->loop = loop; rr_client->state = AWS_RRCS_UNINITIALIZED; - aws_task_init(&rr_client->external_shutdown_task, s_mqtt_request_response_client_external_shutdown_task_fn, rr_client, "mqtt_rr_client_external_shutdown"); - aws_task_init(&rr_client->internal_shutdown_task, s_mqtt_request_response_client_internal_shutdown_task_fn, rr_client, "mqtt_rr_client_internal_shutdown"); - - // 1 external ref to the caller + aws_task_init( + &rr_client->external_shutdown_task, + s_mqtt_request_response_client_external_shutdown_task_fn, + rr_client, + "mqtt_rr_client_external_shutdown"); + aws_task_init( + &rr_client->internal_shutdown_task, + s_mqtt_request_response_client_internal_shutdown_task_fn, + rr_client, + "mqtt_rr_client_internal_shutdown"); + + /* The initial external ref belongs to the caller */ aws_ref_count_init(&rr_client->external_ref_count, rr_client, s_aws_rr_client_on_zero_external_ref_count); - // 1 internal ref count belongs to ourselves (the external ref count shutdown task) + /* The initial internal ref belongs to ourselves (the external ref count shutdown task) */ aws_ref_count_init(&rr_client->internal_ref_count, rr_client, s_aws_rr_client_on_zero_internal_ref_count); return rr_client; } -static void s_aws_rr_client_init_subscription_manager(struct aws_mqtt_request_response_client *rr_client, struct aws_allocator *allocator) { +static void s_aws_rr_client_init_subscription_manager( + struct aws_mqtt_request_response_client *rr_client, + struct aws_allocator *allocator) { + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); + struct aws_rr_subscription_manager_options subscription_manager_options = { .operation_timeout_seconds = rr_client->config.operation_timeout_seconds, .max_subscriptions = rr_client->config.max_subscriptions, @@ -201,10 +261,14 @@ static void s_aws_rr_client_init_subscription_manager(struct aws_mqtt_request_re .userdata = rr_client, }; - aws_rr_subscription_manager_init(&rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options); + aws_rr_subscription_manager_init( + &rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options); } -static void s_mqtt_request_response_client_initialize_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) { +static void s_mqtt_request_response_client_initialize_task_fn( + struct aws_task *task, + void *arg, + enum aws_task_status task_status) { (void)task; AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED); @@ -221,23 +285,31 @@ static void s_mqtt_request_response_client_initialize_task_fn(struct aws_task *t (*client->config.initialized_callback)(client->config.user_data); } - // give up the internal ref we held while the task was pending + /* give up the internal ref we held while the task was pending */ aws_ref_count_release(&client->internal_ref_count); } -static void s_setup_cross_thread_initialization(struct aws_mqtt_request_response_client * rr_client) { - // now that it exists, 1 internal ref belongs to protocol adapter termination +static void s_setup_cross_thread_initialization(struct aws_mqtt_request_response_client *rr_client) { + /* now that it exists, 1 internal ref belongs to protocol adapter termination */ aws_ref_count_acquire(&rr_client->internal_ref_count); - // 1 internal ref belongs to the initialize task until it runs + /* 1 internal ref belongs to the initialize task until it runs */ aws_ref_count_acquire(&rr_client->internal_ref_count); - aws_task_init(&rr_client->initialize_task, s_mqtt_request_response_client_initialize_task_fn, rr_client, "mqtt_rr_client_initialize"); + aws_task_init( + &rr_client->initialize_task, + s_mqtt_request_response_client_initialize_task_fn, + rr_client, + "mqtt_rr_client_initialize"); aws_event_loop_schedule_task_now(rr_client->loop, &rr_client->initialize_task); } -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options) { +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client( + struct aws_allocator *allocator, + struct aws_mqtt_client_connection *client, + const struct aws_mqtt_request_response_client_options *options) { - struct aws_mqtt_request_response_client *rr_client = s_aws_mqtt_request_response_client_new(allocator, options, aws_mqtt_client_connection_get_event_loop(client)); + struct aws_mqtt_request_response_client *rr_client = + s_aws_mqtt_request_response_client_new(allocator, options, aws_mqtt_client_connection_get_event_loop(client)); struct aws_mqtt_protocol_adapter_options adapter_options = { .subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback, @@ -258,15 +330,19 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_fr error: - // even on construction failures we still need to walk through the async shutdown process + /* even on construction failures we still need to walk through the async shutdown process */ aws_mqtt_request_response_client_release(rr_client); return NULL; } -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options) { +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client( + struct aws_allocator *allocator, + struct aws_mqtt5_client *client, + const struct aws_mqtt_request_response_client_options *options) { - struct aws_mqtt_request_response_client * rr_client = s_aws_mqtt_request_response_client_new(allocator, options, client->loop); + struct aws_mqtt_request_response_client *rr_client = + s_aws_mqtt_request_response_client_new(allocator, options, client->loop); struct aws_mqtt_protocol_adapter_options adapter_options = { .subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback, @@ -287,13 +363,14 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_fr error: - // even on construction failures we still need to walk through the async shutdown process + /* even on construction failures we still need to walk through the async shutdown process */ aws_mqtt_request_response_client_release(rr_client); return NULL; } -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(struct aws_mqtt_request_response_client *client) { +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire( + struct aws_mqtt_request_response_client *client) { if (client != NULL) { aws_ref_count_acquire(&client->external_ref_count); } @@ -301,7 +378,8 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquir return client; } -struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(struct aws_mqtt_request_response_client *client) { +struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release( + struct aws_mqtt_request_response_client *client) { if (client != NULL) { aws_ref_count_release(&client->external_ref_count); }