Skip to content

Commit

Permalink
Test fix + rework adapter operations to ref counted
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jul 10, 2023
1 parent d346ba7 commit 642c224
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 48 deletions.
9 changes: 5 additions & 4 deletions include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_base;

struct aws_mqtt3_to_mqtt5_adapter_operation_vtable {
void (*destroy_fn)(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation);
void (*fail_fn)(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation, int error_code);
void (*complete_fn)(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation, void *completion_data);
};

struct aws_mqtt3_to_mqtt5_adapter_publish_options {
Expand Down Expand Up @@ -66,7 +64,7 @@ enum aws_mqtt3_to_mqtt5_adapter_operation_type {

struct aws_mqtt3_to_mqtt5_adapter_operation_base {
struct aws_allocator *allocator;
const struct aws_mqtt3_to_mqtt5_adapter_operation_vtable *vtable;
struct aws_ref_count ref_count;
void *impl;

/*
Expand Down Expand Up @@ -342,7 +340,10 @@ AWS_MQTT_API struct aws_mqtt3_to_mqtt5_adapter_operation_unsubscribe *
struct aws_allocator *allocator,
const struct aws_mqtt3_to_mqtt5_adapter_unsubscribe_options *options);

AWS_MQTT_API void aws_mqtt3_to_mqtt5_adapter_operation_destroy(
AWS_MQTT_API struct aws_mqtt3_to_mqtt5_adapter_operation_base *aws_mqtt3_to_mqtt5_adapter_operation_release(
struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation);

AWS_MQTT_API struct aws_mqtt3_to_mqtt5_adapter_operation_base *aws_mqtt3_to_mqtt5_adapter_operation_acquire(
struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation);

AWS_MQTT_API void aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(
Expand Down
110 changes: 67 additions & 43 deletions source/v5/mqtt3_to_mqtt5_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,8 @@ static void s_aws_mqtt_client_connection_5_release(void *impl) {
aws_ref_count_release(&adapter->external_refs);
}

static void s_adapter_publish_operation_destroy(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) {
static void s_adapter_publish_operation_destroy(void *context) {
struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation = context;
if (operation == NULL) {
return;
}
Expand All @@ -1532,14 +1533,6 @@ static void s_adapter_publish_operation_destroy(struct aws_mqtt3_to_mqtt5_adapte
}
}

static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable s_publish_operation_vtable = {
.destroy_fn = s_adapter_publish_operation_destroy,
.fail_fn = NULL,
.complete_fn = NULL,
};

static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable *s_publish_operation_vtable_ptr = &s_publish_operation_vtable;

static void s_aws_mqtt3_to_mqtt5_adapter_publish_completion_fn(
enum aws_mqtt5_packet_type packet_type,
const void *packet,
Expand Down Expand Up @@ -1570,7 +1563,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_publish *aws_mqtt3_to_mqtt5_adapter_
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt3_to_mqtt5_adapter_operation_publish));

publish_op->base.allocator = allocator;
publish_op->base.vtable = s_publish_operation_vtable_ptr;
aws_ref_count_init(&publish_op->base.ref_count, publish_op, s_adapter_publish_operation_destroy);
publish_op->base.impl = publish_op;
publish_op->base.type = AWS_MQTT3TO5_AOT_PUBLISH;
publish_op->base.adapter = options->adapter;
Expand Down Expand Up @@ -1601,7 +1594,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_publish *aws_mqtt3_to_mqtt5_adapter_

error:

aws_mqtt3_to_mqtt5_adapter_operation_destroy(&publish_op->base);
aws_mqtt3_to_mqtt5_adapter_operation_release(&publish_op->base);

return NULL;
}
Expand All @@ -1616,7 +1609,12 @@ void s_adapter_publish_submission_fn(struct aws_task *task, void *arg, enum aws_
aws_mqtt5_client_submit_operation_internal(
adapter->client, &operation->publish_op->base, status != AWS_TASK_STATUS_RUN_READY);

/*
* We hold an internal ref to the adapter and an additional ref to the operation while the operation is in transit
* to the event loop thread. Release those now that we've handed the operation off.
*/
aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base);
aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base);
}

static uint16_t s_aws_mqtt_client_connection_5_publish(
Expand Down Expand Up @@ -1663,8 +1661,13 @@ static uint16_t s_aws_mqtt_client_connection_5_publish(
}

uint16_t synthetic_id = operation->base.id;

/* transient ref on adapter while moving to the event loop thread */
aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(&operation->base);

/* transient ref on operation while moving to the event loop thread */
aws_mqtt3_to_mqtt5_adapter_operation_acquire(&operation->base);

aws_task_init(
&operation->base.submission_task,
s_adapter_publish_submission_fn,
Expand All @@ -1677,12 +1680,13 @@ static uint16_t s_aws_mqtt_client_connection_5_publish(

error:

aws_mqtt3_to_mqtt5_adapter_operation_destroy(&operation->base);
aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base);

return 0;
}

static void s_adapter_subscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) {
static void s_adapter_subscribe_operation_destroy(void *context) {
struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation = context;
if (operation == NULL) {
return;
}
Expand Down Expand Up @@ -1718,15 +1722,6 @@ static void s_adapter_subscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_adap
}
}

static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable s_subscribe_operation_vtable = {
.destroy_fn = s_adapter_subscribe_operation_destroy,
.fail_fn = NULL,
.complete_fn = NULL,
};

static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable *s_subscribe_operation_vtable_ptr =
&s_subscribe_operation_vtable;

static enum aws_mqtt_qos s_convert_mqtt5_suback_reason_code_to_mqtt3_granted_qos(
enum aws_mqtt5_suback_reason_code reason_code) {
switch (reason_code) {
Expand Down Expand Up @@ -1851,7 +1846,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *aws_mqtt3_to_mqtt5_adapte
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe));

subscribe_op->base.allocator = allocator;
subscribe_op->base.vtable = s_subscribe_operation_vtable_ptr;
aws_ref_count_init(&subscribe_op->base.ref_count, subscribe_op, s_adapter_subscribe_operation_destroy);
subscribe_op->base.impl = subscribe_op;
subscribe_op->base.type = AWS_MQTT3TO5_AOT_SUBSCRIBE;
subscribe_op->base.adapter = options->adapter;
Expand Down Expand Up @@ -1917,7 +1912,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *aws_mqtt3_to_mqtt5_adapte

error:

aws_mqtt3_to_mqtt5_adapter_operation_destroy(&subscribe_op->base);
aws_mqtt3_to_mqtt5_adapter_operation_release(&subscribe_op->base);

return NULL;
}
Expand All @@ -1940,7 +1935,11 @@ void s_adapter_subscribe_submission_fn(struct aws_task *task, void *arg, enum aw
aws_mqtt5_client_submit_operation_internal(
adapter->client, &operation->subscribe_op->base, status != AWS_TASK_STATUS_RUN_READY);

/* release the transient adapter reference for the move to the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base);

/* release the transient operation reference for the move to the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base);
}

static uint16_t s_aws_mqtt_client_connection_5_subscribe(
Expand Down Expand Up @@ -1982,8 +1981,13 @@ static uint16_t s_aws_mqtt_client_connection_5_subscribe(
}

uint16_t synthetic_id = operation->base.id;

/* add a transient reference to the adapter until this operation is passed into the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(&operation->base);

/* add a transient reference to the operation until this operation is passed into the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_acquire(&operation->base);

aws_task_init(
&operation->base.submission_task,
s_adapter_subscribe_submission_fn,
Expand All @@ -1996,7 +2000,7 @@ static uint16_t s_aws_mqtt_client_connection_5_subscribe(

error:

aws_mqtt3_to_mqtt5_adapter_operation_destroy(&operation->base);
aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base);

return 0;
}
Expand Down Expand Up @@ -2030,8 +2034,13 @@ static uint16_t s_aws_mqtt_client_connection_5_subscribe_multiple(
}

uint16_t synthetic_id = operation->base.id;

/* add a transient reference to the adapter until this operation is passed into the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(&operation->base);

/* add a transient reference to the operation until this operation is passed into the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_acquire(&operation->base);

aws_task_init(
&operation->base.submission_task,
s_adapter_subscribe_submission_fn,
Expand All @@ -2044,12 +2053,13 @@ static uint16_t s_aws_mqtt_client_connection_5_subscribe_multiple(

error:

aws_mqtt3_to_mqtt5_adapter_operation_destroy(&operation->base);
aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base);

return 0;
}

static void s_adapter_unsubscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) {
static void s_adapter_unsubscribe_operation_destroy(void *context) {
struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation = context;
if (operation == NULL) {
return;
}
Expand Down Expand Up @@ -2078,15 +2088,6 @@ static void s_adapter_unsubscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_ad
}
}

static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable s_unsubscribe_operation_vtable = {
.destroy_fn = s_adapter_unsubscribe_operation_destroy,
.fail_fn = NULL,
.complete_fn = NULL,
};

static struct aws_mqtt3_to_mqtt5_adapter_operation_vtable *s_unsubscribe_operation_vtable_ptr =
&s_unsubscribe_operation_vtable;

static void s_aws_mqtt3_to_mqtt5_adapter_unsubscribe_completion_fn(
const struct aws_mqtt5_packet_unsuback_view *unsuback,
int error_code,
Expand Down Expand Up @@ -2115,7 +2116,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_unsubscribe *aws_mqtt3_to_mqtt5_adap
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt3_to_mqtt5_adapter_operation_unsubscribe));

unsubscribe_op->base.allocator = allocator;
unsubscribe_op->base.vtable = s_unsubscribe_operation_vtable_ptr;
aws_ref_count_init(&unsubscribe_op->base.ref_count, unsubscribe_op, s_adapter_unsubscribe_operation_destroy);
unsubscribe_op->base.impl = unsubscribe_op;
unsubscribe_op->base.type = AWS_MQTT3TO5_AOT_UNSUBSCRIBE;
unsubscribe_op->base.adapter = options->adapter;
Expand Down Expand Up @@ -2146,7 +2147,7 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_unsubscribe *aws_mqtt3_to_mqtt5_adap

error:

aws_mqtt3_to_mqtt5_adapter_operation_destroy(&unsubscribe_op->base);
aws_mqtt3_to_mqtt5_adapter_operation_release(&unsubscribe_op->base);

return NULL;
}
Expand All @@ -2164,7 +2165,11 @@ void s_adapter_unsubscribe_submission_fn(struct aws_task *task, void *arg, enum
aws_mqtt5_client_submit_operation_internal(
adapter->client, &operation->unsubscribe_op->base, status != AWS_TASK_STATUS_RUN_READY);

/* release the transient adapter reference for the move to the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base);

/* release the transient operation reference for the move to the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base);
}

static uint16_t s_aws_mqtt_client_connection_5_unsubscribe(
Expand Down Expand Up @@ -2198,8 +2203,13 @@ static uint16_t s_aws_mqtt_client_connection_5_unsubscribe(
}

uint16_t synthetic_id = operation->base.id;

/* add a transient reference to the adapter until this operation is passed into the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(&operation->base);

/* add a transient reference to the operation until this operation is passed into the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_acquire(&operation->base);

aws_task_init(
&operation->base.submission_task,
s_adapter_unsubscribe_submission_fn,
Expand All @@ -2212,7 +2222,7 @@ static uint16_t s_aws_mqtt_client_connection_5_unsubscribe(

error:

aws_mqtt3_to_mqtt5_adapter_operation_destroy(&operation->base);
aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base);

return 0;
}
Expand Down Expand Up @@ -2315,7 +2325,7 @@ static int s_adapter_operation_clean_up(void *context, struct aws_hash_element *

struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation = operation_element->value;

aws_mqtt3_to_mqtt5_adapter_operation_destroy(operation);
aws_mqtt3_to_mqtt5_adapter_operation_release(operation);

return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
}
Expand Down Expand Up @@ -2382,12 +2392,26 @@ void aws_mqtt3_to_mqtt5_adapter_operation_table_remove_operation(

struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation = existing_element.value;
if (operation != NULL) {
aws_mqtt3_to_mqtt5_adapter_operation_destroy(operation);
aws_mqtt3_to_mqtt5_adapter_operation_release(operation);
}
}

struct aws_mqtt3_to_mqtt5_adapter_operation_base *aws_mqtt3_to_mqtt5_adapter_operation_release(
struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) {
if (operation != NULL) {
aws_ref_count_release(&operation->ref_count);
}

return NULL;
}

void aws_mqtt3_to_mqtt5_adapter_operation_destroy(struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) {
(*operation->vtable->destroy_fn)(operation);
struct aws_mqtt3_to_mqtt5_adapter_operation_base *aws_mqtt3_to_mqtt5_adapter_operation_acquire(
struct aws_mqtt3_to_mqtt5_adapter_operation_base *operation) {
if (operation != NULL) {
aws_ref_count_acquire(&operation->ref_count);
}

return operation;
}

void aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(
Expand Down
3 changes: 2 additions & 1 deletion tests/v5/mqtt3_to_mqtt5_adapter_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,7 @@ static int s_mqtt3to5_adapter_operation_allocation_exhaustion_fn(struct aws_allo
aws_mqtt3_to_mqtt5_adapter_operation_new_publish(allocator, &publish_options);
if (i >= UINT16_MAX) {
ASSERT_FAILS(aws_mqtt3_to_mqtt5_adapter_operation_table_add_operation(operational_state, &publish->base));
aws_mqtt3_to_mqtt5_adapter_operation_destroy(&publish->base);
aws_mqtt3_to_mqtt5_adapter_operation_release(&publish->base);
continue;
}

Expand Down Expand Up @@ -2820,6 +2820,7 @@ static int s_mqtt3to5_adapter_subscribe_single_publish_fn(struct aws_allocator *

s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_PUBLISH_COMPLETE, 1);
s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_PUBLISH_RECEIVED_SUBSCRIBED, 1);
s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_PUBLISH_RECEIVED_ANY, 1);

struct aws_mqtt3_operation_event expected_events[] = {
{
Expand Down

0 comments on commit 642c224

Please sign in to comment.