Skip to content

Commit

Permalink
Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Mar 4, 2024
1 parent c7d7cbe commit 5d3d2cd
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 43 deletions.
14 changes: 13 additions & 1 deletion include/aws/mqtt/request-response/request_response_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct aws_mqtt_request_response_client_options {
size_t max_subscriptions;
uint32_t operation_timeout_seconds;

// Do not bind the initialized callback; it exists mostly for tests and should not be exposed
/* Do not bind the initialized callback; it exists mostly for tests and should not be exposed */
aws_mqtt_request_response_client_initialized_callback_fn *initialized_callback;

aws_mqtt_request_response_client_terminated_callback_fn *terminated_callback;
Expand All @@ -28,19 +28,31 @@ struct aws_mqtt_request_response_client_options {

AWS_EXTERN_C_BEGIN

/*
* Create a new request-response client that uses an MQTT311 client.
*/
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(
struct aws_allocator *allocator,
struct aws_mqtt_client_connection *client,
const struct aws_mqtt_request_response_client_options *options);

/*
* Create a new request-response client that uses an MQTT5 client.
*/
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(
struct aws_allocator *allocator,
struct aws_mqtt5_client *client,
const struct aws_mqtt_request_response_client_options *options);

/*
* Add a reference to a request-response client
*/
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(
struct aws_mqtt_request_response_client *client);

/*
* Remove a reference to a request-response client
*/
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(
struct aws_mqtt_request_response_client *client);

Expand Down
162 changes: 120 additions & 42 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/request-response/request_response_client.h>

Expand All @@ -13,16 +13,44 @@
#include <aws/mqtt/private/request-response/subscription_manager.h>
#include <aws/mqtt/private/v5/mqtt5_client_impl.h>

/* Tracks the current state of the request-response client */
enum aws_request_response_client_state {
// cross-thread initialization has not completed and all protocol adapter callbacks are ignored

/* cross-thread initialization has not completed and all protocol adapter callbacks are ignored */
AWS_RRCS_UNINITIALIZED,

/* Normal operating state for the client. */
AWS_RRCS_ACTIVE,

// asynchronously shutting down, no more servicing will be done and all protocol adapter callbacks are ignored
/* asynchronously shutting down, no more servicing will be done and all protocol adapter callbacks are ignored */
AWS_RRCS_SHUTTING_DOWN,
};

/*
* Request-Response Client Notes
*
* Ref-counting/Shutdown
*
* The request-response client uses a double ref-count pattern.
*
* External references represent user references. When the external reference reaches zero, the client's asynchronous
* shutdown process is started.
*
* Internal references block final destruction. Asynchronous shutdown will not complete until all internal references
* are dropped. In addition to one long-lived internal reference (the protocol client adapter's back reference to
* the request-response client), all event loop tasks that target the request-response client hold an internal
* reference between task submission and task completion. This ensures that the task always has a valid reference
* to the client, even if we're trying to shut down at the same time.
*
*
* Initialization
*
* Initialization is complicated by the fact that the subscription manager needs to be initialized from the
* event loop thread that the client/protocol adapter/protocol client are all seated on. To do this safely,
* we add an uninitialized state that ignores all callbacks and we schedule a task on initial construction to do
* the event-loop-only initialization. Once that initialization completes on the event loop thread, we move
* the client into an active state where it will process operations and protocol adapter callbacks.
*/
struct aws_mqtt_request_response_client {
struct aws_allocator *allocator;

Expand All @@ -47,12 +75,14 @@ struct aws_mqtt_request_response_client {
static void s_aws_rr_client_on_zero_internal_ref_count(void *context) {
struct aws_mqtt_request_response_client *client = context;

/* Both ref counts are zero, but it's still safest to schedule final destruction, not invoke it directly */
aws_event_loop_schedule_task_now(client->loop, &client->internal_shutdown_task);
}

static void s_aws_rr_client_on_zero_external_ref_count(void *context) {
struct aws_mqtt_request_response_client *client = context;

/* Start the asynchronous shutdown process */
aws_event_loop_schedule_task_now(client->loop, &client->external_shutdown_task);
}

Expand All @@ -67,7 +97,10 @@ static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request
}
}

static void s_mqtt_request_response_client_internal_shutdown_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) {
static void s_mqtt_request_response_client_internal_shutdown_task_fn(
struct aws_task *task,
void *arg,
enum aws_task_status task_status) {
(void)task;
(void)task_status;

Expand All @@ -76,13 +109,16 @@ static void s_mqtt_request_response_client_internal_shutdown_task_fn(struct aws_
/*
* All internal and external refs are gone; it is safe to clean up synchronously.
*
* The subscription manager is cleaned up and the protocol adapter has been shut down. All that's left is to
* free memory.
* The subscription manager is cleaned up and the protocol adapter has been shut down. No tasks targeting the
* client are active (other than this one). All that's left is to free memory.
*/
s_mqtt_request_response_client_final_destroy(client);
}

static void s_mqtt_request_response_client_external_shutdown_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) {
static void s_mqtt_request_response_client_external_shutdown_task_fn(
struct aws_task *task,
void *arg,
enum aws_task_status task_status) {
(void)task;

AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED);
Expand All @@ -101,21 +137,25 @@ static void s_mqtt_request_response_client_external_shutdown_task_fn(struct aws_
aws_ref_count_release(&client->internal_ref_count);
}

static void s_aws_rr_client_subscription_status_event_callback(const struct aws_rr_subscription_status_event *event, void *userdata) {
static void s_aws_rr_client_subscription_status_event_callback(
const struct aws_rr_subscription_status_event *event,
void *userdata) {
(void)event;
(void)userdata;

/*
* We must be on the event loop, but it's safer overall to process this event as a top-level event loop task. The subscription
* manager assumes that we won't call APIs on it while iterating subscription records and listeners.
* We must be on the event loop, but it's safer overall to process this event as a top-level event loop task. The
* subscription manager assumes that we won't call APIs on it while iterating subscription records and listeners.
*
* These tasks hold an internal reference while they exist.
*/

// NYI
/* NYI */
}

static void s_aws_rr_client_protocol_adapter_subscription_event_callback(const struct aws_protocol_adapter_subscription_event *event, void *user_data) {
static void s_aws_rr_client_protocol_adapter_subscription_event_callback(
const struct aws_protocol_adapter_subscription_event *event,
void *user_data) {
struct aws_mqtt_request_response_client *rr_client = user_data;

AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop));
Expand All @@ -140,17 +180,19 @@ static void s_aws_rr_client_protocol_adapter_incoming_publish_callback(
return;
}

// NYI
/* NYI */
}

static void s_aws_rr_client_protocol_adapter_terminate_callback(void *user_data) {
struct aws_mqtt_request_response_client *rr_client = user_data;

// release the internal ref count "held" by the protocol adapter's existence
/* release the internal ref count "held" by the protocol adapter's existence */
aws_ref_count_release(&rr_client->internal_ref_count);
}

static void s_aws_rr_client_protocol_adapter_connection_event_callback(const struct aws_protocol_adapter_connection_event *event, void *user_data) {
static void s_aws_rr_client_protocol_adapter_connection_event_callback(
const struct aws_protocol_adapter_connection_event *event,
void *user_data) {
struct aws_mqtt_request_response_client *rr_client = user_data;

AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop));
Expand All @@ -162,49 +204,71 @@ static void s_aws_rr_client_protocol_adapter_connection_event_callback(const str
aws_rr_subscription_manager_on_protocol_adapter_connection_event(&rr_client->subscription_manager, event);
}

static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new(struct aws_allocator *allocator, const struct aws_mqtt_request_response_client_options *options, struct aws_event_loop *loop) {
static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new(
struct aws_allocator *allocator,
const struct aws_mqtt_request_response_client_options *options,
struct aws_event_loop *loop) {
struct aws_rr_subscription_manager_options sm_options = {
.max_subscriptions = options->max_subscriptions,
.operation_timeout_seconds = options->operation_timeout_seconds,
};

// we can't initialize the subscription manager until we're running on the event loop, so make sure that
// initialize can't fail by checking its options for validity now.
/*
* We can't initialize the subscription manager until we're running on the event loop, so make sure that
* initialize can't fail by checking its options for validity now.
*/
if (!aws_rr_subscription_manager_are_options_valid(&sm_options)) {
return NULL;
}

struct aws_mqtt_request_response_client *rr_client = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_request_response_client));
struct aws_mqtt_request_response_client *rr_client =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_request_response_client));

rr_client->allocator = allocator;
rr_client->config = *options;
rr_client->loop = loop;
rr_client->state = AWS_RRCS_UNINITIALIZED;

aws_task_init(&rr_client->external_shutdown_task, s_mqtt_request_response_client_external_shutdown_task_fn, rr_client, "mqtt_rr_client_external_shutdown");
aws_task_init(&rr_client->internal_shutdown_task, s_mqtt_request_response_client_internal_shutdown_task_fn, rr_client, "mqtt_rr_client_internal_shutdown");

// 1 external ref to the caller
aws_task_init(
&rr_client->external_shutdown_task,
s_mqtt_request_response_client_external_shutdown_task_fn,
rr_client,
"mqtt_rr_client_external_shutdown");
aws_task_init(
&rr_client->internal_shutdown_task,
s_mqtt_request_response_client_internal_shutdown_task_fn,
rr_client,
"mqtt_rr_client_internal_shutdown");

/* The initial external ref belongs to the caller */
aws_ref_count_init(&rr_client->external_ref_count, rr_client, s_aws_rr_client_on_zero_external_ref_count);

// 1 internal ref count belongs to ourselves (the external ref count shutdown task)
/* The initial internal ref belongs to ourselves (the external ref count shutdown task) */
aws_ref_count_init(&rr_client->internal_ref_count, rr_client, s_aws_rr_client_on_zero_internal_ref_count);

return rr_client;
}

static void s_aws_rr_client_init_subscription_manager(struct aws_mqtt_request_response_client *rr_client, struct aws_allocator *allocator) {
static void s_aws_rr_client_init_subscription_manager(
struct aws_mqtt_request_response_client *rr_client,
struct aws_allocator *allocator) {
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop));

struct aws_rr_subscription_manager_options subscription_manager_options = {
.operation_timeout_seconds = rr_client->config.operation_timeout_seconds,
.max_subscriptions = rr_client->config.max_subscriptions,
.subscription_status_callback = s_aws_rr_client_subscription_status_event_callback,
.userdata = rr_client,
};

aws_rr_subscription_manager_init(&rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options);
aws_rr_subscription_manager_init(
&rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options);
}

static void s_mqtt_request_response_client_initialize_task_fn(struct aws_task *task, void *arg, enum aws_task_status task_status) {
static void s_mqtt_request_response_client_initialize_task_fn(
struct aws_task *task,
void *arg,
enum aws_task_status task_status) {
(void)task;

AWS_FATAL_ASSERT(task_status != AWS_TASK_STATUS_CANCELED);
Expand All @@ -221,23 +285,31 @@ static void s_mqtt_request_response_client_initialize_task_fn(struct aws_task *t
(*client->config.initialized_callback)(client->config.user_data);
}

// give up the internal ref we held while the task was pending
/* give up the internal ref we held while the task was pending */
aws_ref_count_release(&client->internal_ref_count);
}

static void s_setup_cross_thread_initialization(struct aws_mqtt_request_response_client * rr_client) {
// now that it exists, 1 internal ref belongs to protocol adapter termination
static void s_setup_cross_thread_initialization(struct aws_mqtt_request_response_client *rr_client) {
/* now that it exists, 1 internal ref belongs to protocol adapter termination */
aws_ref_count_acquire(&rr_client->internal_ref_count);

// 1 internal ref belongs to the initialize task until it runs
/* 1 internal ref belongs to the initialize task until it runs */
aws_ref_count_acquire(&rr_client->internal_ref_count);
aws_task_init(&rr_client->initialize_task, s_mqtt_request_response_client_initialize_task_fn, rr_client, "mqtt_rr_client_initialize");
aws_task_init(
&rr_client->initialize_task,
s_mqtt_request_response_client_initialize_task_fn,
rr_client,
"mqtt_rr_client_initialize");
aws_event_loop_schedule_task_now(rr_client->loop, &rr_client->initialize_task);
}

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(struct aws_allocator *allocator, struct aws_mqtt_client_connection *client, const struct aws_mqtt_request_response_client_options *options) {
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt311_client(
struct aws_allocator *allocator,
struct aws_mqtt_client_connection *client,
const struct aws_mqtt_request_response_client_options *options) {

struct aws_mqtt_request_response_client *rr_client = s_aws_mqtt_request_response_client_new(allocator, options, aws_mqtt_client_connection_get_event_loop(client));
struct aws_mqtt_request_response_client *rr_client =
s_aws_mqtt_request_response_client_new(allocator, options, aws_mqtt_client_connection_get_event_loop(client));

struct aws_mqtt_protocol_adapter_options adapter_options = {
.subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback,
Expand All @@ -258,15 +330,19 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_fr

error:

// even on construction failures we still need to walk through the async shutdown process
/* even on construction failures we still need to walk through the async shutdown process */
aws_mqtt_request_response_client_release(rr_client);

return NULL;
}

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(struct aws_allocator *allocator, struct aws_mqtt5_client *client, const struct aws_mqtt_request_response_client_options *options) {
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_from_mqtt5_client(
struct aws_allocator *allocator,
struct aws_mqtt5_client *client,
const struct aws_mqtt_request_response_client_options *options) {

struct aws_mqtt_request_response_client * rr_client = s_aws_mqtt_request_response_client_new(allocator, options, client->loop);
struct aws_mqtt_request_response_client *rr_client =
s_aws_mqtt_request_response_client_new(allocator, options, client->loop);

struct aws_mqtt_protocol_adapter_options adapter_options = {
.subscription_event_callback = s_aws_rr_client_protocol_adapter_subscription_event_callback,
Expand All @@ -287,21 +363,23 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_new_fr

error:

// even on construction failures we still need to walk through the async shutdown process
/* even on construction failures we still need to walk through the async shutdown process */
aws_mqtt_request_response_client_release(rr_client);

return NULL;
}

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(struct aws_mqtt_request_response_client *client) {
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_acquire(
struct aws_mqtt_request_response_client *client) {
if (client != NULL) {
aws_ref_count_acquire(&client->external_ref_count);
}

return client;
}

struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(struct aws_mqtt_request_response_client *client) {
struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release(
struct aws_mqtt_request_response_client *client) {
if (client != NULL) {
aws_ref_count_release(&client->external_ref_count);
}
Expand Down

0 comments on commit 5d3d2cd

Please sign in to comment.