diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 5822bdb8..9b937c43 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -21,30 +21,35 @@ enum aws_mqtt_request_response_operation_type { }; enum aws_mqtt_request_response_operation_state { - AWS_MRROS_NONE, // creation -> in event loop enqueue - AWS_MRROS_QUEUED, // in event loop queue -> non blocked response from subscription manager + AWS_MRROS_NONE, // creation -> in event loop enqueue + AWS_MRROS_QUEUED, // in event loop queue -> non blocked response from subscription manager AWS_MRROS_PENDING_SUBSCRIPTION, // subscribing response from sub manager -> subscription success/failure event - AWS_MRROS_AWAITING_RESPONSE, // (request only) subscription success -> (publish failure OR correlated response received) + AWS_MRROS_AWAITING_RESPONSE, // (request only) subscription success -> (publish failure OR correlated response + // received) AWS_MRROS_SUBSCRIBED, // (streaming only) subscription success -> (operation finished OR subscription ended event) - AWS_MRROS_TERMINAL, // (streaming only) (subscription failure OR subscription ended) -> operation close/terminate - AWS_MRROS_PENDING_DESTROY, // (request only) the request operation's destroy task has been scheduled but not yet executed + AWS_MRROS_TERMINAL, // (streaming only) (subscription failure OR subscription ended) -> operation close/terminate + AWS_MRROS_PENDING_DESTROY, // (request only) the request operation's destroy task has been scheduled but not yet + // executed }; /* -Tables/Lookups +Client Tables/Lookups (Authoritative operation container) 1. &operation.id -> &operation // added on in-thread enqueue, removed on operation completion/destruction (Response topic -> Correlation token extraction info) - 2. &topic -> &{topic, topic_buffer, correlation token json path buffer} // per-message-path add/replace on in-thread enqueue, removed on client destruction + 2. &topic -> &{topic, topic_buffer, correlation token json path buffer} // per-message-path add/replace on in-thread +enqueue, removed on client destruction (Correlation token -> request operation) - 3. &operation.correlation token -> (request) &operation // added on in-thread request op move to awaiting response state, removed on operation completion/destruction + 3. &operation.correlation token -> (request) &operation // added on in-thread request op move to awaiting response +state, removed on operation completion/destruction (Subscription filter -> all operations using that filter) - 4. &topic_filter -> &{topic_filter, linked_list} // added on in-thread pop from queue, removed from list on operation completion/destruction also checked for empty and removed from table + 4. &topic_filter -> &{topic_filter, linked_list} // added on in-thread pop from queue, removed from list on +operation completion/destruction also checked for empty and removed from table */ @@ -116,9 +121,8 @@ Tables/Lookups Remove from client's correlation token table Zero publish completion weak ref wrapper around operation dec-ref weak-ref-operation-wrapper - Check client's topic filter table entry for empty list, remove entry if so. (intrusive list removal already unlinked it from table) - If client is not shutting down - remove from subscription manager (otherwise it's already been cleaned up) + Check client's topic filter table entry for empty list, remove entry if so. (intrusive list removal already unlinked it + from table) If client is not shutting down remove from subscription manager (otherwise it's already been cleaned up) WakeServiceTask // queue may now be unblocked, does nothing if shutting down (Streaming) Invoke termination callback @@ -299,9 +303,6 @@ struct aws_mqtt_rr_client_operation { struct aws_mqtt_request_operation_storage request_storage; } storage; - uint64_t ack_timeout_timepoint_ns; - struct aws_priority_queue_node priority_queue_node; - /* Sometimes this is client->operation_queue, other times it is an entry in the client's topic_filter table */ struct aws_linked_list_node node; @@ -377,9 +378,6 @@ struct aws_mqtt_request_response_client { /* &operation->id -> &operation */ struct aws_hash_table operations; - - struct aws_task service_task; - uint64_t next_service_scheduled_timepoint; }; static void s_aws_rr_client_on_zero_internal_ref_count(void *context) { @@ -419,12 +417,7 @@ static void s_mqtt_request_response_client_internal_shutdown_task_fn( struct aws_mqtt_request_response_client *client = arg; - /* - * All internal and external refs are gone; it is safe to clean up synchronously. - * - * The subscription manager is cleaned up and the protocol adapter has been shut down. No tasks targeting the - * client are active (other than this one). All that's left is to free memory. - */ + /* All internal and external refs are gone; it is safe to clean up synchronously. */ s_mqtt_request_response_client_final_destroy(client); } @@ -436,7 +429,8 @@ static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_ return; } - aws_mqtt_request_operation_completion_fn *completion_callback = operation->storage.request_storage.options.completion_callback; + aws_mqtt_request_operation_completion_fn *completion_callback = + operation->storage.request_storage.options.completion_callback; void *user_data = operation->storage.request_storage.options.user_data; if (completion_callback != NULL) { @@ -448,14 +442,17 @@ static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_ aws_mqtt_rr_client_operation_release(operation); } -static void s_complete_request_operation_with_success(struct aws_mqtt_rr_client_operation *operation, struct aws_byte_cursor payload) { +static void s_complete_request_operation_with_success( + struct aws_mqtt_rr_client_operation *operation, + struct aws_byte_cursor payload) { AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST); if (operation->state == AWS_MRROS_PENDING_DESTROY) { return; } - aws_mqtt_request_operation_completion_fn *completion_callback = operation->storage.request_storage.options.completion_callback; + aws_mqtt_request_operation_completion_fn *completion_callback = + operation->storage.request_storage.options.completion_callback; void *user_data = operation->storage.request_storage.options.user_data; if (completion_callback != NULL) { @@ -475,10 +472,13 @@ static void s_streaming_operation_on_client_shutdown(struct aws_mqtt_rr_client_o case AWS_MRROS_QUEUED: case AWS_MRROS_PENDING_SUBSCRIPTION: case AWS_MRROS_SUBSCRIBED: { - aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback = operation->storage.streaming_storage.options.subscription_status_callback; + aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback = + operation->storage.streaming_storage.options.subscription_status_callback; void *user_data = operation->storage.streaming_storage.options.user_data; if (subscription_status_callback != NULL) { - enum aws_rr_subscription_event_type status_type = (operation->state == AWS_MRROS_SUBSCRIBED) ? ARRSET_SUBSCRIPTION_ENDED : ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE; + enum aws_rr_subscription_event_type status_type = (operation->state == AWS_MRROS_SUBSCRIBED) + ? ARRSET_SUBSCRIPTION_ENDED + : ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE; (*subscription_status_callback)(status_type, error_code, user_data); } } @@ -498,7 +498,7 @@ static int s_rr_client_clean_up_operation(void *context, struct aws_hash_element /* Complete the request operation as a failure */ s_complete_request_operation_with_failure(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN); } else { - /* Streaming operations that are not subscribed should emit a subscription failure event */ + /* Non-terminal streaming operations should a subscription failure or ended event */ s_streaming_operation_on_client_shutdown(operation, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN); } @@ -526,9 +526,12 @@ static void s_mqtt_request_response_client_external_shutdown_task_fn( /* * It is a client invariant that when external shutdown starts, it must be the case that there are no in-flight - * operations with un-executed submit tasks. This means it safe to assume that all tracked operations are - * either in the process of cleaning up already (state == AWS_MRROS_PENDING_DESTROY, requests only) or can be - * completed now (state != ??). Actual operation destruction and ref-count release is done by a scheduled task + * operations with un-executed submit tasks. This means it safe to assume that all tracked request operations are + * either in the process of cleaning up already (state == AWS_MRROS_PENDING_DESTROY) or can be + * completed now (state != AWS_MRROS_PENDING_DESTROY). Non-terminal streaming operations are moved into + * a terminal state and emit an appropriate failure/ended event. + * + * Actual operation destruction and client ref-count release is done by a scheduled task * on the operation that is triggered by dec-refing it (assuming streaming operations get closed by the binding * client). */ @@ -550,6 +553,11 @@ static void s_aws_rr_client_subscription_status_event_callback( * These tasks hold an internal reference while they exist. */ + struct aws_mqtt_request_response_client *rr_client = userdata; + + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(rr_client->loop)); + AWS_FATAL_ASSERT(rr_client->state != AWS_RRCS_SHUTTING_DOWN); + /* NYI */ } @@ -604,22 +612,6 @@ static void s_aws_rr_client_protocol_adapter_connection_event_callback( aws_rr_subscription_manager_on_protocol_adapter_connection_event(&rr_client->subscription_manager, event); } -static void s_wake_rr_client_service_task(struct aws_mqtt_request_response_client *rr_client) { - (void)rr_client; - - // NYI -} - -static void s_mqtt_rr_client_service_task_fn( struct aws_task *task, - void *arg, - enum aws_task_status task_status) { - (void)task; - (void)arg; - (void)task_status; - - // NYI -} - uint64_t aws_mqtt_hash_uint64_t(const void *item) { return *(uint64_t *)item; } @@ -653,30 +645,27 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie rr_client->loop = loop; rr_client->state = AWS_RRCS_UNINITIALIZED; - aws_hash_table_init(&rr_client->operations, - allocator, - MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE, - aws_mqtt_hash_uint64_t, - aws_mqtt_compare_uint64_t_eq, - NULL, - NULL); + aws_hash_table_init( + &rr_client->operations, + allocator, + MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE, + aws_mqtt_hash_uint64_t, + aws_mqtt_compare_uint64_t_eq, + NULL, + NULL); aws_task_init( &rr_client->external_shutdown_task, s_mqtt_request_response_client_external_shutdown_task_fn, rr_client, "mqtt_rr_client_external_shutdown"); + aws_task_init( &rr_client->internal_shutdown_task, s_mqtt_request_response_client_internal_shutdown_task_fn, rr_client, "mqtt_rr_client_internal_shutdown"); - aws_task_init(&rr_client->service_task, - s_mqtt_rr_client_service_task_fn, - rr_client, - "mqtt_rr_client_service"); - /* The initial external ref belongs to the caller */ aws_ref_count_init(&rr_client->external_ref_count, rr_client, s_aws_rr_client_on_zero_external_ref_count); @@ -734,6 +723,7 @@ static void s_setup_cross_thread_initialization(struct aws_mqtt_request_response /* 1 internal ref belongs to the initialize task until it runs */ aws_ref_count_acquire(&rr_client->internal_ref_count); + aws_task_init( &rr_client->initialize_task, s_mqtt_request_response_client_initialize_task_fn, @@ -846,58 +836,80 @@ struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_rele ///////////////////////////////////////////////// -static bool s_are_request_operation_options_valid(const struct aws_mqtt_request_response_client *client, const struct aws_mqtt_request_operation_options *request_options) { +static bool s_are_request_operation_options_valid( + const struct aws_mqtt_request_response_client *client, + const struct aws_mqtt_request_operation_options *request_options) { if (request_options == NULL) { AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client - NULL request options", (void *)client); return false; } if (request_options->response_path_count == 0) { - AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - no response paths supplied", (void *)client); + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client request options - no response paths supplied", + (void *)client); return false; } for (size_t i = 0; i < request_options->response_path_count; ++i) { const struct aws_mqtt_request_operation_response_path *path = &request_options->response_paths[i]; if (!aws_mqtt_is_valid_topic(&path->topic)) { - AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - "PRInSTR " is not a valid topic", (void *)client, AWS_BYTE_CURSOR_PRI(path->topic)); + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client request options - " PRInSTR " is not a valid topic", + (void *)client, + AWS_BYTE_CURSOR_PRI(path->topic)); return false; } } if (request_options->correlation_token.len == 0) { - AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - empty correlation token", (void *)client); + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - empty correlation token", (void *)client); return false; } if (!aws_mqtt_is_valid_topic(&request_options->publish_topic)) { - AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - "PRInSTR " is not a valid topic", (void *)client, AWS_BYTE_CURSOR_PRI(request_options->publish_topic)); + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client request options - " PRInSTR " is not a valid topic", + (void *)client, + AWS_BYTE_CURSOR_PRI(request_options->publish_topic)); return false; } if (request_options->serialized_request.len == 0) { - AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - empty request payload", (void *)client); + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - empty request payload", (void *)client); return false; } return true; } -static bool s_are_streaming_operation_options_valid(struct aws_mqtt_request_response_client *client, const struct aws_mqtt_streaming_operation_options *streaming_options) { +static bool s_are_streaming_operation_options_valid( + struct aws_mqtt_request_response_client *client, + const struct aws_mqtt_streaming_operation_options *streaming_options) { if (streaming_options == NULL) { AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client - NULL streaming options", (void *)client); return false; } if (!aws_mqtt_is_valid_topic_filter(&streaming_options->topic_filter)) { - AWS_LOGF_ERROR(AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client streaming options - "PRInSTR " is not a valid topic filter", (void *)client, AWS_BYTE_CURSOR_PRI(streaming_options->topic_filter)); + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client streaming options - " PRInSTR " is not a valid topic filter", + (void *)client, + AWS_BYTE_CURSOR_PRI(streaming_options->topic_filter)); return false; } return true; } -static uint64_t s_aws_mqtt_request_response_client_allocate_operation_id(struct aws_mqtt_request_response_client *client) { +static uint64_t s_aws_mqtt_request_response_client_allocate_operation_id( + struct aws_mqtt_request_response_client *client) { return aws_atomic_fetch_add(&client->next_id, 1); } @@ -922,7 +934,7 @@ static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg, operation->state = AWS_MRROS_QUEUED; - s_wake_rr_client_service_task(operation->client_internal_ref); + // NYI wake service done: @@ -947,6 +959,15 @@ static void s_aws_mqtt_request_operation_storage_clean_up(struct aws_mqtt_reques aws_byte_buf_clean_up(&storage->operation_data); } +static struct aws_byte_cursor s_aws_mqtt_rr_operation_get_subscription_topic_filter( + struct aws_mqtt_rr_client_operation *operation) { + if (operation->type == AWS_MRROT_REQUEST) { + return operation->storage.request_storage.options.subscription_topic_filter; + } else { + return operation->storage.streaming_storage.options.topic_filter; + } +} + static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg, enum aws_task_status status) { (void)task; (void)status; @@ -960,8 +981,8 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg, if (client->state != AWS_RRCS_SHUTTING_DOWN) { struct aws_rr_release_subscription_options release_options = { - .topic_filter = ??, - .id = operation->id, + .topic_filter = s_aws_mqtt_rr_operation_get_subscription_topic_filter(operation), + .operation_id = operation->id, }; aws_rr_subscription_manager_release_subscription(&client->subscription_manager, &release_options); } @@ -1003,7 +1024,9 @@ static void s_on_mqtt_rr_client_operation_zero_ref_count(void *context) { aws_event_loop_schedule_task_now(operation->client_internal_ref->loop, &operation->destroy_task); } -static void s_aws_mqtt_rr_client_operation_init_shared(struct aws_mqtt_rr_client_operation *operation, struct aws_mqtt_request_response_client *client) { +static void s_aws_mqtt_rr_client_operation_init_shared( + struct aws_mqtt_rr_client_operation *operation, + struct aws_mqtt_request_response_client *client) { operation->allocator = client->allocator; aws_ref_count_init(&operation->ref_count, operation, s_on_mqtt_rr_client_operation_zero_ref_count); @@ -1017,11 +1040,22 @@ static void s_aws_mqtt_rr_client_operation_init_shared(struct aws_mqtt_rr_client operation->id = s_aws_mqtt_request_response_client_allocate_operation_id(client); operation->state = AWS_MRROS_NONE; - aws_task_init(&operation->submit_task, s_mqtt_rr_client_submit_operation, operation, "MQTTRequestResponseClientOperationSubmit"); - aws_task_init(&operation->destroy_task, s_mqtt_rr_client_destroy_operation, operation, "MQTTRequestResponseClientOperationDestroy"); + aws_task_init( + &operation->submit_task, + s_mqtt_rr_client_submit_operation, + operation, + "MQTTRequestResponseClientOperationSubmit"); + aws_task_init( + &operation->destroy_task, + s_mqtt_rr_client_destroy_operation, + operation, + "MQTTRequestResponseClientOperationDestroy"); } -void s_aws_mqtt_request_operation_storage_init_from_options(struct aws_mqtt_request_operation_storage *storage, struct aws_allocator *allocator, const struct aws_mqtt_request_operation_options *request_options) { +void s_aws_mqtt_request_operation_storage_init_from_options( + struct aws_mqtt_request_operation_storage *storage, + struct aws_allocator *allocator, + const struct aws_mqtt_request_operation_options *request_options) { size_t bytes_needed = 0; bytes_needed += request_options->publish_topic.len; @@ -1039,18 +1073,32 @@ void s_aws_mqtt_request_operation_storage_init_from_options(struct aws_mqtt_requ storage->options = *request_options; aws_byte_buf_init(&storage->operation_data, allocator, bytes_needed); - aws_array_list_init_dynamic(&storage->operation_response_paths, allocator, request_options->response_path_count, sizeof(struct aws_mqtt_request_operation_response_path)); - - AWS_FATAL_ASSERT(aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.publish_topic) == AWS_OP_SUCCESS); - AWS_FATAL_ASSERT(aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.serialized_request) == AWS_OP_SUCCESS); - AWS_FATAL_ASSERT(aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.correlation_token) == AWS_OP_SUCCESS); - AWS_FATAL_ASSERT(aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.subscription_topic_filter) == AWS_OP_SUCCESS); + aws_array_list_init_dynamic( + &storage->operation_response_paths, + allocator, + request_options->response_path_count, + sizeof(struct aws_mqtt_request_operation_response_path)); + + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.publish_topic) == AWS_OP_SUCCESS); + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.serialized_request) == + AWS_OP_SUCCESS); + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.correlation_token) == + AWS_OP_SUCCESS); + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.subscription_topic_filter) == + AWS_OP_SUCCESS); for (size_t i = 0; i < request_options->response_path_count; ++i) { struct aws_mqtt_request_operation_response_path response_path = request_options->response_paths[i]; - AWS_FATAL_ASSERT(aws_byte_buf_append_and_update(&storage->operation_data, &response_path.topic) == AWS_OP_SUCCESS); - AWS_FATAL_ASSERT(aws_byte_buf_append_and_update(&storage->operation_data, &response_path.correlation_token_json_path) == AWS_OP_SUCCESS); + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &response_path.topic) == AWS_OP_SUCCESS); + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &response_path.correlation_token_json_path) == + AWS_OP_SUCCESS); aws_array_list_push_back(&storage->operation_response_paths, &response_path); } @@ -1072,11 +1120,13 @@ int aws_mqtt_request_response_client_submit_request( } struct aws_allocator *allocator = client->allocator; - struct aws_mqtt_rr_client_operation *operation = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation)); + struct aws_mqtt_rr_client_operation *operation = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation)); operation->allocator = allocator; operation->type = AWS_MRROT_REQUEST; - s_aws_mqtt_request_operation_storage_init_from_options(&operation->storage.request_storage, allocator, request_options); + s_aws_mqtt_request_operation_storage_init_from_options( + &operation->storage.request_storage, allocator, request_options); s_aws_mqtt_rr_client_operation_init_shared(operation, client); aws_event_loop_schedule_task_now(client->loop, &operation->submit_task); @@ -1084,13 +1134,17 @@ int aws_mqtt_request_response_client_submit_request( return AWS_OP_SUCCESS; } -void s_aws_mqtt_streaming_operation_storage_init_from_options(struct aws_mqtt_streaming_operation_storage *storage, struct aws_allocator *allocator, const struct aws_mqtt_streaming_operation_options *streaming_options) { +void s_aws_mqtt_streaming_operation_storage_init_from_options( + struct aws_mqtt_streaming_operation_storage *storage, + struct aws_allocator *allocator, + const struct aws_mqtt_streaming_operation_options *streaming_options) { size_t bytes_needed = streaming_options->topic_filter.len; storage->options = *streaming_options; aws_byte_buf_init(&storage->operation_data, allocator, bytes_needed); - AWS_FATAL_ASSERT(aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.topic_filter) == AWS_OP_SUCCESS); + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.topic_filter) == AWS_OP_SUCCESS); } struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_streaming_operation( @@ -1109,11 +1163,13 @@ struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_str } struct aws_allocator *allocator = client->allocator; - struct aws_mqtt_rr_client_operation *operation = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation)); + struct aws_mqtt_rr_client_operation *operation = + aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation)); operation->allocator = allocator; operation->type = AWS_MRROT_STREAMING; - s_aws_mqtt_streaming_operation_storage_init_from_options(&operation->storage.streaming_storage, allocator, streaming_options); + s_aws_mqtt_streaming_operation_storage_init_from_options( + &operation->storage.streaming_storage, allocator, streaming_options); s_aws_mqtt_rr_client_operation_init_shared(operation, client); aws_event_loop_schedule_task_now(client->loop, &operation->submit_task); @@ -1121,7 +1177,8 @@ struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_str return operation; } -struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire(struct aws_mqtt_rr_client_operation *operation) { +struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire( + struct aws_mqtt_rr_client_operation *operation) { if (operation != NULL) { aws_ref_count_acquire(&operation->ref_count); } @@ -1129,7 +1186,8 @@ struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_acquire(struct return operation; } -struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_release(struct aws_mqtt_rr_client_operation *operation) { +struct aws_mqtt_rr_client_operation *aws_mqtt_rr_client_operation_release( + struct aws_mqtt_rr_client_operation *operation) { if (operation != NULL) { aws_ref_count_release(&operation->ref_count); }