diff --git a/include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h b/include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h index bf51b66a..adae3f8d 100644 --- a/include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h +++ b/include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h @@ -20,8 +20,6 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_base; struct aws_mqtt3_to_mqtt5_adapter_operation_vtable { void (*destroy_fn)(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation); - void (*fail_fn)(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation, int error_code); - void (*complete_fn)(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation, void *completion_data); }; struct aws_mqtt3_to_mqtt5_adapter_publish_options { @@ -66,7 +64,7 @@ enum aws_mqtt3_to_mqtt5_adapter_operation_type { struct aws_mqtt3_to_mqtt5_adapter_operation_base { struct aws_allocator *allocator; - const struct aws_mqtt3_to_mqtt5_adapter_operation_vtable *vtable; + struct aws_ref_count ref_count; void *impl; /* @@ -342,7 +340,10 @@ AWS_MQTT_API struct aws_mqtt3_to_mqtt5_adapter_operation_unsubscribe * struct aws_allocator *allocator, const struct aws_mqtt3_to_mqtt5_adapter_unsubscribe_options *options); -AWS_MQTT_API void aws_mqtt3_to_mqtt5_adapter_operation_destroy( +AWS_MQTT_API struct aws_mqtt3_to_mqtt5_adapter_operation_base *aws_mqtt3_to_mqtt5_adapter_operation_release( + struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation); + +AWS_MQTT_API struct aws_mqtt3_to_mqtt5_adapter_operation_base *aws_mqtt3_to_mqtt5_adapter_operation_acquire( struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation); AWS_MQTT_API void aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter( diff --git a/source/v5/mqtt3_to_mqtt5_adapter.c b/source/v5/mqtt3_to_mqtt5_adapter.c index b6069cb0..59c84dd3 100644 --- a/source/v5/mqtt3_to_mqtt5_adapter.c +++ b/source/v5/mqtt3_to_mqtt5_adapter.c @@ -1505,7 +1505,8 @@ static void s_aws_mqtt_client_connection_5_release(void *impl) { aws_ref_count_release(&adapter->external_refs); } -static void s_adapter_publish_operation_destroy(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) { +static void s_adapter_publish_operation_destroy(void *context) { + struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation = context; if (operation == NULL) { return; } @@ -1532,14 +1533,6 @@ static void s_adapter_publish_operation_destroy(struct aws_mqtt3_to_mqtt5_adapte } } -static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable s_publish_operation_vtable = { - .destroy_fn = s_adapter_publish_operation_destroy, - .fail_fn = NULL, - .complete_fn = NULL, -}; - -static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable *s_publish_operation_vtable_ptr = &s_publish_operation_vtable; - static void s_aws_mqtt3_to_mqtt5_adapter_publish_completion_fn( enum aws_mqtt5_packet_type packet_type, const void *packet, @@ -1570,7 +1563,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_publish *aws_mqtt3_to_mqtt5_adapter_ aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt3_to_mqtt5_adapter_operation_publish)); publish_op->base.allocator = allocator; - publish_op->base.vtable = s_publish_operation_vtable_ptr; + aws_ref_count_init(&publish_op->base.ref_count, publish_op, s_adapter_publish_operation_destroy); publish_op->base.impl = publish_op; publish_op->base.type = AWS_MQTT3TO5_AOT_PUBLISH; publish_op->base.adapter = options->adapter; @@ -1601,7 +1594,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_publish *aws_mqtt3_to_mqtt5_adapter_ error: - aws_mqtt3_to_mqtt5_adapter_operation_destroy(&publish_op->base); + aws_mqtt3_to_mqtt5_adapter_operation_release(&publish_op->base); return NULL; } @@ -1616,7 +1609,12 @@ void s_adapter_publish_submission_fn(struct aws_task *task, void *arg, enum aws_ aws_mqtt5_client_submit_operation_internal( adapter->client, &operation->publish_op->base, status != AWS_TASK_STATUS_RUN_READY); + /* + * We hold an internal ref to the adapter and an additional ref to the operation while the operation is in transit + * to the event loop thread. Release those now that we've handed the operation off. + */ aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base); + aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base); } static uint16_t s_aws_mqtt_client_connection_5_publish( @@ -1663,8 +1661,13 @@ static uint16_t s_aws_mqtt_client_connection_5_publish( } uint16_t synthetic_id = operation->base.id; + + /* transient ref on adapter while moving to the event loop thread */ aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(&operation->base); + /* transient ref on operation while moving to the event loop thread */ + aws_mqtt3_to_mqtt5_adapter_operation_acquire(&operation->base); + aws_task_init( &operation->base.submission_task, s_adapter_publish_submission_fn, @@ -1677,12 +1680,13 @@ static uint16_t s_aws_mqtt_client_connection_5_publish( error: - aws_mqtt3_to_mqtt5_adapter_operation_destroy(&operation->base); + aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base); return 0; } -static void s_adapter_subscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) { +static void s_adapter_subscribe_operation_destroy(void *context) { + struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation = context; if (operation == NULL) { return; } @@ -1718,15 +1722,6 @@ static void s_adapter_subscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_adap } } -static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable s_subscribe_operation_vtable = { - .destroy_fn = s_adapter_subscribe_operation_destroy, - .fail_fn = NULL, - .complete_fn = NULL, -}; - -static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable *s_subscribe_operation_vtable_ptr = - &s_subscribe_operation_vtable; - static enum aws_mqtt_qos s_convert_mqtt5_suback_reason_code_to_mqtt3_granted_qos( enum aws_mqtt5_suback_reason_code reason_code) { switch (reason_code) { @@ -1851,7 +1846,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *aws_mqtt3_to_mqtt5_adapte aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe)); subscribe_op->base.allocator = allocator; - subscribe_op->base.vtable = s_subscribe_operation_vtable_ptr; + aws_ref_count_init(&subscribe_op->base.ref_count, subscribe_op, s_adapter_subscribe_operation_destroy); subscribe_op->base.impl = subscribe_op; subscribe_op->base.type = AWS_MQTT3TO5_AOT_SUBSCRIBE; subscribe_op->base.adapter = options->adapter; @@ -1917,7 +1912,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *aws_mqtt3_to_mqtt5_adapte error: - aws_mqtt3_to_mqtt5_adapter_operation_destroy(&subscribe_op->base); + aws_mqtt3_to_mqtt5_adapter_operation_release(&subscribe_op->base); return NULL; } @@ -1940,7 +1935,11 @@ void s_adapter_subscribe_submission_fn(struct aws_task *task, void *arg, enum aw aws_mqtt5_client_submit_operation_internal( adapter->client, &operation->subscribe_op->base, status != AWS_TASK_STATUS_RUN_READY); + /* release the transient adapter reference for the move to the event loop */ aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base); + + /* release the transient operation reference for the move to the event loop */ + aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base); } static uint16_t s_aws_mqtt_client_connection_5_subscribe( @@ -1982,8 +1981,13 @@ static uint16_t s_aws_mqtt_client_connection_5_subscribe( } uint16_t synthetic_id = operation->base.id; + + /* add a transient reference to the adapter until this operation is passed into the event loop */ aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(&operation->base); + /* add a transient reference to the operation until this operation is passed into the event loop */ + aws_mqtt3_to_mqtt5_adapter_operation_acquire(&operation->base); + aws_task_init( &operation->base.submission_task, s_adapter_subscribe_submission_fn, @@ -1996,7 +2000,7 @@ static uint16_t s_aws_mqtt_client_connection_5_subscribe( error: - aws_mqtt3_to_mqtt5_adapter_operation_destroy(&operation->base); + aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base); return 0; } @@ -2030,8 +2034,13 @@ static uint16_t s_aws_mqtt_client_connection_5_subscribe_multiple( } uint16_t synthetic_id = operation->base.id; + + /* add a transient reference to the adapter until this operation is passed into the event loop */ aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(&operation->base); + /* add a transient reference to the operation until this operation is passed into the event loop */ + aws_mqtt3_to_mqtt5_adapter_operation_acquire(&operation->base); + aws_task_init( &operation->base.submission_task, s_adapter_subscribe_submission_fn, @@ -2044,12 +2053,13 @@ static uint16_t s_aws_mqtt_client_connection_5_subscribe_multiple( error: - aws_mqtt3_to_mqtt5_adapter_operation_destroy(&operation->base); + aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base); return 0; } -static void s_adapter_unsubscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) { +static void s_adapter_unsubscribe_operation_destroy(void *context) { + struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation = context; if (operation == NULL) { return; } @@ -2078,15 +2088,6 @@ static void s_adapter_unsubscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_ad } } -static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable s_unsubscribe_operation_vtable = { - .destroy_fn = s_adapter_unsubscribe_operation_destroy, - .fail_fn = NULL, - .complete_fn = NULL, -}; - -static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable *s_unsubscribe_operation_vtable_ptr = - &s_unsubscribe_operation_vtable; - static void s_aws_mqtt3_to_mqtt5_adapter_unsubscribe_completion_fn( const struct aws_mqtt5_packet_unsuback_view *unsuback, int error_code, @@ -2115,7 +2116,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_unsubscribe *aws_mqtt3_to_mqtt5_adap aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt3_to_mqtt5_adapter_operation_unsubscribe)); unsubscribe_op->base.allocator = allocator; - unsubscribe_op->base.vtable = s_unsubscribe_operation_vtable_ptr; + aws_ref_count_init(&unsubscribe_op->base.ref_count, unsubscribe_op, s_adapter_unsubscribe_operation_destroy); unsubscribe_op->base.impl = unsubscribe_op; unsubscribe_op->base.type = AWS_MQTT3TO5_AOT_UNSUBSCRIBE; unsubscribe_op->base.adapter = options->adapter; @@ -2146,7 +2147,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_unsubscribe *aws_mqtt3_to_mqtt5_adap error: - aws_mqtt3_to_mqtt5_adapter_operation_destroy(&unsubscribe_op->base); + aws_mqtt3_to_mqtt5_adapter_operation_release(&unsubscribe_op->base); return NULL; } @@ -2164,7 +2165,11 @@ void s_adapter_unsubscribe_submission_fn(struct aws_task *task, void *arg, enum aws_mqtt5_client_submit_operation_internal( adapter->client, &operation->unsubscribe_op->base, status != AWS_TASK_STATUS_RUN_READY); + /* release the transient adapter reference for the move to the event loop */ aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base); + + /* release the transient operation reference for the move to the event loop */ + aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base); } static uint16_t s_aws_mqtt_client_connection_5_unsubscribe( @@ -2198,8 +2203,13 @@ static uint16_t s_aws_mqtt_client_connection_5_unsubscribe( } uint16_t synthetic_id = operation->base.id; + + /* add a transient reference to the adapter until this operation is passed into the event loop */ aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(&operation->base); + /* add a transient reference to the operation until this operation is passed into the event loop */ + aws_mqtt3_to_mqtt5_adapter_operation_acquire(&operation->base); + aws_task_init( &operation->base.submission_task, s_adapter_unsubscribe_submission_fn, @@ -2212,7 +2222,7 @@ static uint16_t s_aws_mqtt_client_connection_5_unsubscribe( error: - aws_mqtt3_to_mqtt5_adapter_operation_destroy(&operation->base); + aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base); return 0; } @@ -2315,7 +2325,7 @@ static int s_adapter_operation_clean_up(void *context, struct aws_hash_element * struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation = operation_element->value; - aws_mqtt3_to_mqtt5_adapter_operation_destroy(operation); + aws_mqtt3_to_mqtt5_adapter_operation_release(operation); return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; } @@ -2382,12 +2392,26 @@ void aws_mqtt3_to_mqtt5_adapter_operation_table_remove_operation( struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation = existing_element.value; if (operation != NULL) { - aws_mqtt3_to_mqtt5_adapter_operation_destroy(operation); + aws_mqtt3_to_mqtt5_adapter_operation_release(operation); + } +} + +struct aws_mqtt3_to_mqtt5_adapter_operation_base *aws_mqtt3_to_mqtt5_adapter_operation_release( + struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) { + if (operation != NULL) { + aws_ref_count_release(&operation->ref_count); } + + return NULL; } -void aws_mqtt3_to_mqtt5_adapter_operation_destroy(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) { - (*operation->vtable->destroy_fn)(operation); +struct aws_mqtt3_to_mqtt5_adapter_operation_base *aws_mqtt3_to_mqtt5_adapter_operation_acquire( + struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) { + if (operation != NULL) { + aws_ref_count_acquire(&operation->ref_count); + } + + return operation; } void aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter( diff --git a/tests/v5/mqtt3_to_mqtt5_adapter_tests.c b/tests/v5/mqtt3_to_mqtt5_adapter_tests.c index a842c551..6334e272 100644 --- a/tests/v5/mqtt3_to_mqtt5_adapter_tests.c +++ b/tests/v5/mqtt3_to_mqtt5_adapter_tests.c @@ -1827,7 +1827,7 @@ static int s_mqtt3to5_adapter_operation_allocation_exhaustion_fn(struct aws_allo aws_mqtt3_to_mqtt5_adapter_operation_new_publish(allocator, &publish_options); if (i >= UINT16_MAX) { ASSERT_FAILS(aws_mqtt3_to_mqtt5_adapter_operation_table_add_operation(operational_state, &publish->base)); - aws_mqtt3_to_mqtt5_adapter_operation_destroy(&publish->base); + aws_mqtt3_to_mqtt5_adapter_operation_release(&publish->base); continue; } @@ -2820,6 +2820,7 @@ static int s_mqtt3to5_adapter_subscribe_single_publish_fn(struct aws_allocator * s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_PUBLISH_COMPLETE, 1); s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_PUBLISH_RECEIVED_SUBSCRIBED, 1); + s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_PUBLISH_RECEIVED_ANY, 1); struct aws_mqtt3_operation_event expected_events[] = { {