Skip to content

Commit

Permalink
Adapter resubscribe and get_stats
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Jul 12, 2023
1 parent 0f6268f commit b134351
Show file tree
Hide file tree
Showing 4 changed files with 475 additions and 126 deletions.
5 changes: 5 additions & 0 deletions include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ struct aws_mqtt3_to_mqtt5_adapter_publish_options {
void *on_complete_userdata;
};

/*
* A subscribe with no subscriptions represents a re-subscribe of all internally tracked topics. While this
* is a bit hacky, the alternative is to copy-and-paste almost the entire multi-subscribe adapter operation and
* supporting logic, which is approximately 300 lines.
*/
struct aws_mqtt3_to_mqtt5_adapter_subscribe_options {
struct aws_mqtt_client_connection_5_impl *adapter;

Expand Down
221 changes: 139 additions & 82 deletions source/v5/mqtt3_to_mqtt5_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1520,10 +1520,8 @@ static void s_adapter_publish_operation_destroy(void *context) {
}

/* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */
if (publish_op->publish_op != NULL) {
publish_op->publish_op->completion_options.completion_callback = NULL;
publish_op->publish_op->completion_options.completion_user_data = NULL;
}
publish_op->publish_op->completion_options.completion_callback = NULL;
publish_op->publish_op->completion_options.completion_user_data = NULL;

aws_mqtt5_operation_release(&publish_op->publish_op->base);

Expand Down Expand Up @@ -1712,9 +1710,9 @@ static void s_adapter_subscribe_operation_destroy(void *context) {
if (subscribe_op->subscribe_op != NULL) {
subscribe_op->subscribe_op->completion_options.completion_callback = NULL;
subscribe_op->subscribe_op->completion_options.completion_user_data = NULL;
}

aws_mqtt5_operation_release(&subscribe_op->subscribe_op->base);
aws_mqtt5_operation_release(&subscribe_op->subscribe_op->base);
}

aws_mem_release(operation->allocator, operation);

Expand Down Expand Up @@ -1820,10 +1818,6 @@ static void s_aws_mqtt3_to_mqtt5_adapter_subscribe_completion_fn(
}

static int s_validate_adapter_subscribe_options(const struct aws_mqtt3_to_mqtt5_adapter_subscribe_options *options) {
if (options->subscription_count == 0) {
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}

for (size_t i = 0; i < options->subscription_count; ++i) {
struct aws_mqtt_topic_subscription *subscription_options = &options->subscriptions[i];
enum aws_mqtt_qos qos = subscription_options->qos;
Expand All @@ -1835,32 +1829,20 @@ static int s_validate_adapter_subscribe_options(const struct aws_mqtt3_to_mqtt5_
return AWS_OP_SUCCESS;
}

struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *aws_mqtt3_to_mqtt5_adapter_operation_new_subscribe(
struct aws_allocator *allocator,
const struct aws_mqtt3_to_mqtt5_adapter_subscribe_options *options) {

if (s_validate_adapter_subscribe_options(options)) {
return NULL;
}

struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *subscribe_op =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe));

subscribe_op->base.allocator = allocator;
aws_ref_count_init(&subscribe_op->base.ref_count, subscribe_op, s_adapter_subscribe_operation_destroy);
subscribe_op->base.impl = subscribe_op;
subscribe_op->base.type = AWS_MQTT3TO5_AOT_SUBSCRIBE;
subscribe_op->base.adapter = options->adapter;
subscribe_op->base.holding_adapter_ref = false;
static int s_aws_mqtt3_to_mqtt5_adapter_build_subscribe(
struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *subscribe_op,
size_t subscription_count,
struct aws_mqtt_topic_subscription *subscriptions) {
struct aws_allocator *allocator = subscribe_op->base.allocator;

/* make persistent adapter sub array */
aws_array_list_init_dynamic(
&subscribe_op->subscriptions,
allocator,
options->subscription_count,
subscription_count,
sizeof(struct aws_mqtt_subscription_set_subscription_record *));
for (size_t i = 0; i < options->subscription_count; ++i) {
struct aws_mqtt_topic_subscription *subscription_options = &options->subscriptions[i];
for (size_t i = 0; i < subscription_count; ++i) {
struct aws_mqtt_topic_subscription *subscription_options = &subscriptions[i];

struct aws_mqtt_subscription_set_subscription_options subscription_record_options = {
.topic_filter = subscription_options->topic,
Expand All @@ -1876,20 +1858,19 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *aws_mqtt3_to_mqtt5_adapte
}

/* make temp mqtt5 subscription view array */
AWS_VARIABLE_LENGTH_ARRAY(
struct aws_mqtt5_subscription_view, mqtt5_subscription_buffer, options->subscription_count);
AWS_VARIABLE_LENGTH_ARRAY(struct aws_mqtt5_subscription_view, mqtt5_subscription_buffer, subscription_count);
struct aws_mqtt5_subscription_view *subscription_ptr = mqtt5_subscription_buffer;
for (size_t i = 0; i < options->subscription_count; ++i) {
for (size_t i = 0; i < subscription_count; ++i) {
struct aws_mqtt5_subscription_view *subscription = subscription_ptr + i;
AWS_ZERO_STRUCT(*subscription);

subscription->topic_filter = options->subscriptions[i].topic;
subscription->qos = (enum aws_mqtt5_qos)options->subscriptions[i].qos;
subscription->topic_filter = subscriptions[i].topic;
subscription->qos = (enum aws_mqtt5_qos)subscriptions[i].qos;
}

struct aws_mqtt5_packet_subscribe_view subscribe_view = {
.subscriptions = subscription_ptr,
.subscription_count = options->subscription_count,
.subscription_count = subscription_count,
};

struct aws_mqtt5_subscribe_completion_options subscribe_completion_options = {
Expand All @@ -1898,10 +1879,43 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *aws_mqtt3_to_mqtt5_adapte
};

subscribe_op->subscribe_op = aws_mqtt5_operation_subscribe_new(
allocator, options->adapter->client, &subscribe_view, &subscribe_completion_options);
allocator, subscribe_op->base.adapter->client, &subscribe_view, &subscribe_completion_options);

if (subscribe_op->subscribe_op == NULL) {
goto error;
/* subscribe options validation will have been raised as the error */
return AWS_OP_ERR;
}

return AWS_OP_SUCCESS;
}

struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *aws_mqtt3_to_mqtt5_adapter_operation_new_subscribe(
struct aws_allocator *allocator,
const struct aws_mqtt3_to_mqtt5_adapter_subscribe_options *options) {

if (s_validate_adapter_subscribe_options(options)) {
return NULL;
}

struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *subscribe_op =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe));

subscribe_op->base.allocator = allocator;
aws_ref_count_init(&subscribe_op->base.ref_count, subscribe_op, s_adapter_subscribe_operation_destroy);
subscribe_op->base.impl = subscribe_op;
subscribe_op->base.type = AWS_MQTT3TO5_AOT_SUBSCRIBE;
subscribe_op->base.adapter = options->adapter;
subscribe_op->base.holding_adapter_ref = false;

/*
* If we're a regular subscribe, build the mqtt5 operation now. Otherwise, we have to wait until
* we're on the event loop thread and it's safe to query the subscription set.
*/
if (options->subscription_count > 0) {
if (s_aws_mqtt3_to_mqtt5_adapter_build_subscribe(
subscribe_op, options->subscription_count, options->subscriptions)) {
goto error;
}
}

subscribe_op->on_suback = options->on_suback;
Expand All @@ -1918,13 +1932,62 @@ struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *aws_mqtt3_to_mqtt5_adapte
return NULL;
}

static int s_aws_mqtt3_to_mqtt5_adapter_build_resubscribe(
struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *subscribe_op,
struct aws_array_list *full_subscriptions) {
size_t subscription_count = aws_array_list_length(full_subscriptions);

AWS_VARIABLE_LENGTH_ARRAY(struct aws_mqtt_topic_subscription, multi_sub_subscriptions, subscription_count);

for (size_t i = 0; i < subscription_count; ++i) {
struct aws_mqtt_subscription_set_subscription_options *existing_subscription = NULL;
aws_array_list_get_at_ptr(full_subscriptions, (void **)&existing_subscription, i);

multi_sub_subscriptions[i].topic = existing_subscription->topic_filter;
multi_sub_subscriptions[i].qos = (enum aws_mqtt_qos)existing_subscription->qos;
multi_sub_subscriptions[i].on_publish = existing_subscription->on_publish_received;
multi_sub_subscriptions[i].on_cleanup = existing_subscription->on_cleanup;
multi_sub_subscriptions[i].on_publish_ud = existing_subscription->callback_user_data;
}

return s_aws_mqtt3_to_mqtt5_adapter_build_subscribe(subscribe_op, subscription_count, multi_sub_subscriptions);
}

void s_adapter_subscribe_submission_fn(struct aws_task *task, void *arg, enum aws_task_status status) {
(void)task;

struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *operation = arg;

struct aws_mqtt_client_connection_5_impl *adapter = operation->base.adapter;

struct aws_array_list full_subscriptions;
AWS_ZERO_STRUCT(full_subscriptions);

/* If we're a re-subscribe, it's now safe to build the subscription set and MQTT5 subscribe op */
if (operation->subscribe_op == NULL) {
aws_mqtt_subscription_set_get_subscriptions(adapter->subscriptions, &full_subscriptions);
size_t subscription_count = aws_array_list_length(&full_subscriptions);
if (subscription_count == 0 || s_aws_mqtt3_to_mqtt5_adapter_build_resubscribe(operation, &full_subscriptions)) {
/* There's either nothing to do (no subscriptions) or we failed to build the op (should never happen) */
int error_code = aws_last_error();
if (subscription_count == 0) {
error_code = AWS_ERROR_MQTT_CONNECTION_RESUBSCRIBE_NO_TOPICS;
}

if (operation->on_multi_suback) {
(*operation->on_multi_suback)(
&adapter->base, operation->base.id, NULL, error_code, operation->on_multi_suback_user_data);
}

/*
* Remove the persistent ref represented by being seated in the incomplete operations table.
* The other (transient) ref gets released at the end of the function.
*/
aws_mqtt3_to_mqtt5_adapter_operation_table_remove_operation(
&adapter->operational_state, operation->base.id);
goto complete;
}
}

size_t subscription_count = aws_array_list_length(&operation->subscriptions);
for (size_t i = 0; i < subscription_count; ++i) {
struct aws_mqtt_subscription_set_subscription_record *record = NULL;
Expand All @@ -1936,6 +1999,10 @@ void s_adapter_subscribe_submission_fn(struct aws_task *task, void *arg, enum aw
aws_mqtt5_client_submit_operation_internal(
adapter->client, &operation->subscribe_op->base, status != AWS_TASK_STATUS_RUN_READY);

complete:

aws_array_list_clean_up(&full_subscriptions);

/* release the transient adapter reference for the move to the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base);

Expand Down Expand Up @@ -2075,10 +2142,8 @@ static void s_adapter_unsubscribe_operation_destroy(void *context) {
}

/* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */
if (unsubscribe_op->unsubscribe_op != NULL) {
unsubscribe_op->unsubscribe_op->completion_options.completion_callback = NULL;
unsubscribe_op->unsubscribe_op->completion_options.completion_user_data = NULL;
}
unsubscribe_op->unsubscribe_op->completion_options.completion_callback = NULL;
unsubscribe_op->unsubscribe_op->completion_options.completion_user_data = NULL;

aws_mqtt5_operation_release(&unsubscribe_op->unsubscribe_op->base);

Expand Down Expand Up @@ -2276,59 +2341,51 @@ static int s_aws_mqtt_client_connection_5_get_stats(
static uint16_t s_aws_mqtt_5_resubscribe_existing_topics(
void *impl,
aws_mqtt_suback_multi_fn *on_suback,
void *on_suback_ud) {

(void)on_suback;
(void)on_suback_ud;

uint16_t packet_id = 0;
struct aws_mqtt_client_connection_5_impl *connection = impl;

struct aws_array_list full_subscriptions;
AWS_ZERO_STRUCT(full_subscriptions);
void *on_suback_user_data) {

aws_mqtt_subscription_set_get_subscriptions(connection->subscriptions, &full_subscriptions);
struct aws_mqtt_client_connection_5_impl *adapter = impl;

size_t subscription_count = aws_array_list_length(&full_subscriptions);
struct aws_mqtt3_to_mqtt5_adapter_subscribe_options subscribe_options = {
.adapter = adapter,
.subscriptions = NULL,
.subscription_count = 0,
.on_multi_suback = on_suback,
.on_multi_suback_user_data = on_suback_user_data,
};

AWS_VARIABLE_LENGTH_ARRAY(struct aws_mqtt_topic_subscription, multi_sub_subscriptions, subscription_count);
AWS_VARIABLE_LENGTH_ARRAY(struct aws_mqtt_topic_subscription *, multi_sub_subscription_ptrs, subscription_count);
struct aws_mqtt3_to_mqtt5_adapter_operation_subscribe *operation =
aws_mqtt3_to_mqtt5_adapter_operation_new_subscribe(adapter->allocator, &subscribe_options);
if (operation == NULL) {
return 0;
}

if (subscription_count == 0) {
aws_raise_error(AWS_ERROR_MQTT_CONNECTION_RESUBSCRIBE_NO_TOPICS);
goto done;
if (aws_mqtt3_to_mqtt5_adapter_operation_table_add_operation(&adapter->operational_state, &operation->base)) {
goto error;
}

for (size_t i = 0; i < subscription_count; ++i) {
struct aws_mqtt_subscription_set_subscription_options *existing_subscription = NULL;
aws_array_list_get_at_ptr(&full_subscriptions, (void **)&existing_subscription, i);
uint16_t synthetic_id = operation->base.id;

multi_sub_subscriptions[i].topic = existing_subscription->topic_filter;
multi_sub_subscriptions[i].qos = (enum aws_mqtt_qos)existing_subscription->qos;
multi_sub_subscriptions[i].on_publish = existing_subscription->on_publish_received;
multi_sub_subscriptions[i].on_cleanup = existing_subscription->on_cleanup;
multi_sub_subscriptions[i].on_publish_ud = existing_subscription->callback_user_data;
/* add a transient reference to the adapter until this operation is passed into the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_reference_adapter(&operation->base);

multi_sub_subscription_ptrs[i] = &multi_sub_subscriptions[i];
}
/* add a transient reference to the operation until this operation is passed into the event loop */
aws_mqtt3_to_mqtt5_adapter_operation_acquire(&operation->base);

struct aws_array_list multi_subscription_ptr_list;
AWS_ZERO_STRUCT(multi_subscription_ptr_list);
aws_task_init(
&operation->base.submission_task,
s_adapter_subscribe_submission_fn,
operation,
"Mqtt3ToMqtt5AdapterSubscribeResubscribe");

aws_array_list_init_static_from_initialized(
&multi_subscription_ptr_list,
multi_sub_subscription_ptrs,
subscription_count,
sizeof(struct aws_mqtt_topic_subscription *));
aws_event_loop_schedule_task_now(adapter->loop, &operation->base.submission_task);

packet_id =
s_aws_mqtt_client_connection_5_subscribe_multiple(impl, &multi_subscription_ptr_list, on_suback, on_suback_ud);
return synthetic_id;

done:
error:

aws_array_list_clean_up(&full_subscriptions);
aws_mqtt3_to_mqtt5_adapter_operation_release(&operation->base);

return packet_id;
return 0;
}

static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_5_vtable = {
Expand Down
17 changes: 3 additions & 14 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -392,20 +392,9 @@ add_test_case(mqtt3to5_adapter_subscribe_multi_overlapping_publish)
add_test_case(mqtt3to5_adapter_unsubscribe_success)
add_test_case(mqtt3to5_adapter_unsubscribe_failure)
add_test_case(mqtt3to5_adapter_unsubscribe_overlapped)

add_test_case(mqtt_subscription_set_add_empty_not_subbed)
add_test_case(mqtt_subscription_set_add_single_path)
add_test_case(mqtt_subscription_set_add_overlapped_branching_paths)
add_test_case(mqtt_subscription_set_remove_overlapping_path)
add_test_case(mqtt_subscription_set_remove_branching_path)
add_test_case(mqtt_subscription_set_remove_invalid)
add_test_case(mqtt_subscription_set_remove_empty_segments)
add_test_case(mqtt_subscription_set_add_remove_repeated)
add_test_case(mqtt_subscription_set_publish_single_path)
add_test_case(mqtt_subscription_set_publish_multi_path)
add_test_case(mqtt_subscription_set_publish_single_level_wildcards)
add_test_case(mqtt_subscription_set_publish_multi_level_wildcards)
add_test_case(mqtt_subscription_set_get_subscriptions)
add_test_case(mqtt3to5_adapter_get_stats)
add_test_case(mqtt3to5_adapter_resubscribe_nothing)
add_test_case(mqtt3to5_adapter_resubscribe_something)

add_test_case(mqtt_subscription_set_add_empty_not_subbed)
add_test_case(mqtt_subscription_set_add_single_path)
Expand Down
Loading

0 comments on commit b134351

Please sign in to comment.