Skip to content

Commit

Permalink
Checkpoint, first subscribe test
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jul 6, 2023
1 parent 744ca5e commit 8224cc9
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 11 deletions.
2 changes: 1 addition & 1 deletion include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ struct aws_mqtt_client_connection_5_impl {

struct aws_mqtt3_to_mqtt5_adapter_operation_table operational_state;

struct aws_mqtt_subscription_set subscriptions;
struct aws_mqtt_subscription_set *subscriptions;

/* All fields after here are internal to the adapter event loop thread */

Expand Down
24 changes: 17 additions & 7 deletions source/v5/mqtt3_to_mqtt5_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ static void s_mqtt_adapter_final_destroy_task_fn(struct aws_task *task, void *ar
adapter->client->config->websocket_handshake_transform_user_data = NULL;
}

aws_mqtt_subscription_set_destroy(adapter->subscriptions);
aws_mqtt3_to_mqtt5_adapter_operation_table_clean_up(&adapter->operational_state);

adapter->client = aws_mqtt5_client_release(adapter->client);
Expand Down Expand Up @@ -1493,7 +1494,9 @@ static void s_adapter_publish_operation_destroy(struct aws_mqtt3_to_mqtt5_adapte
adapter_to_release = publish_op->base.adapter;
}

/* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */
if (publish_op->publish_op != NULL) {
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(publish_op->base.adapter->loop));
publish_op->publish_op->completion_options.completion_callback = NULL;
publish_op->publish_op->completion_options.completion_user_data = NULL;
}
Expand Down Expand Up @@ -1588,9 +1591,10 @@ void s_adapter_publish_submission_fn(struct aws_task *task, void *arg, enum aws_

struct aws_mqtt_client_connection_5_impl *adapter = operation->base.adapter;

aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base);
aws_mqtt5_client_submit_operation_internal(
adapter->client, &operation->publish_op->base, status != AWS_TASK_STATUS_RUN_READY);

aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base);
}

static uint16_t s_aws_mqtt_client_connection_5_publish(
Expand Down Expand Up @@ -1677,7 +1681,9 @@ static void s_adapter_subscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_adap
adapter_to_release = subscribe_op->base.adapter;
}

/* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */
if (subscribe_op->subscribe_op != NULL) {
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(subscribe_op->base.adapter->loop));
subscribe_op->subscribe_op->completion_options.completion_callback = NULL;
subscribe_op->subscribe_op->completion_options.completion_user_data = NULL;
}
Expand Down Expand Up @@ -1905,18 +1911,18 @@ void s_adapter_subscribe_submission_fn(struct aws_task *task, void *arg, enum aw

struct aws_mqtt_client_connection_5_impl *adapter = operation->base.adapter;

aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base);

size_t subscription_count = aws_array_list_length(&operation->subscriptions);
for (size_t i = 0; i < subscription_count; ++i) {
struct aws_mqtt_subscription_set_subscription_record *record = NULL;
aws_array_list_get_at(&operation->subscriptions, &record, i);

aws_mqtt_subscription_set_add_subscription(&adapter->subscriptions, &record->subscription_view);
aws_mqtt_subscription_set_add_subscription(adapter->subscriptions, &record->subscription_view);
}

aws_mqtt5_client_submit_operation_internal(
adapter->client, &operation->subscribe_op->base, status != AWS_TASK_STATUS_RUN_READY);

aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base);
}

static uint16_t s_aws_mqtt_client_connection_5_subscribe(
Expand Down Expand Up @@ -2039,7 +2045,9 @@ static void s_adapter_unsubscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_ad
adapter_to_release = unsubscribe_op->base.adapter;
}

/* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */
if (unsubscribe_op->unsubscribe_op != NULL) {
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(unsubscribe_op->base.adapter->loop));
unsubscribe_op->unsubscribe_op->completion_options.completion_callback = NULL;
unsubscribe_op->unsubscribe_op->completion_options.completion_user_data = NULL;
}
Expand Down Expand Up @@ -2131,13 +2139,13 @@ void s_adapter_unsubscribe_submission_fn(struct aws_task *task, void *arg, enum

struct aws_mqtt_client_connection_5_impl *adapter = operation->base.adapter;

aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base);

aws_mqtt_subscription_set_remove_subscription(
&adapter->subscriptions, aws_byte_cursor_from_buf(&operation->topic_filter));
adapter->subscriptions, aws_byte_cursor_from_buf(&operation->topic_filter));

aws_mqtt5_client_submit_operation_internal(
adapter->client, &operation->unsubscribe_op->base, status != AWS_TASK_STATUS_RUN_READY);

aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base);
}

static uint16_t s_aws_mqtt_client_connection_5_unsubscribe(
Expand Down Expand Up @@ -2238,6 +2246,8 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new_from_mqtt5_cli

aws_mqtt3_to_mqtt5_adapter_operation_table_init(&adapter->operational_state, allocator);

adapter->subscriptions = aws_mqtt_subscription_set_new(allocator);

/*
* We start disabled to handle the case where someone passes in an mqtt5 client that is already "live."
* We'll enable the adapter as soon as they try to connect via the 311 interface. This
Expand Down
9 changes: 9 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,15 @@ add_test_case(mqtt3to5_adapter_publish_success_qos1)
add_test_case(mqtt3to5_adapter_publish_no_ack)
add_test_case(mqtt3to5_adapter_publish_interrupted)

add_test_case(mqtt3to5_adapter_subscribe_single_success)
#add_test_case(mqtt3to5_adapter_subscribe_multi_success)
#add_test_case(mqtt3to5_adapter_subscribe_single_failure)
#add_test_case(mqtt3to5_adapter_subscribe_multi_failure)
#add_test_case(mqtt3to5_adapter_subscribe_single_publish)
#add_test_case(mqtt3to5_adapter_subscribe_multi_overlapping_publish)
#add_test_case(mqtt3to5_adapter_unsubscribe_success)
#add_test_case(mqtt3to5_adapter_unsubscribe_failure)

add_test_case(mqtt_subscription_set_add_empty_not_subbed)
add_test_case(mqtt_subscription_set_add_single_path)
add_test_case(mqtt_subscription_set_add_overlapped_branching_paths)
Expand Down
197 changes: 194 additions & 3 deletions tests/v5/mqtt3_to_mqtt5_adapter_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ struct aws_mqtt3_operation_event {
enum aws_mqtt_qos qos;
struct aws_byte_buf topic;
struct aws_byte_buf payload;

// subscribe complete properties
struct aws_array_list granted_subscriptions;
};

static void s_aws_mqtt3_operation_event_clean_up(struct aws_mqtt3_operation_event *event) {
Expand All @@ -68,6 +71,8 @@ static void s_aws_mqtt3_operation_event_clean_up(struct aws_mqtt3_operation_even

aws_byte_buf_clean_up(&event->topic);
aws_byte_buf_clean_up(&event->payload);

aws_array_list_clean_up(&event->granted_subscriptions);
}

static int s_aws_mqtt3_operation_event_verify_equals(
Expand All @@ -81,6 +86,21 @@ static int s_aws_mqtt3_operation_event_verify_equals(
ASSERT_BIN_ARRAYS_EQUALS(
expected->payload.buffer, expected->payload.len, actual->payload.buffer, actual->payload.len);

ASSERT_INT_EQUALS(
aws_array_list_length(&expected->granted_subscriptions), aws_array_list_length(&actual->granted_subscriptions));
for (size_t i = 0; i < aws_array_list_length(&expected->granted_subscriptions); ++i) {

struct aws_mqtt_topic_subscription expected_sub;
aws_array_list_get_at(&expected->granted_subscriptions, &expected_sub, i);

struct aws_mqtt_topic_subscription actual_sub;
aws_array_list_get_at(&actual->granted_subscriptions, &actual_sub, i);

ASSERT_INT_EQUALS(expected_sub.qos, actual_sub.qos);
ASSERT_BIN_ARRAYS_EQUALS(
expected_sub.topic.ptr, expected_sub.topic.len, actual_sub.topic.ptr, actual_sub.topic.len);
}

return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -1024,7 +1044,7 @@ static int s_aws_mqtt5_server_disconnect_after_connect(
}

static int s_verify_bad_connectivity_callbacks(struct aws_mqtt3_to_mqtt5_adapter_test_fixture *fixture) {
struct aws_mqtt3_lifecycle_event expected_events[] = {
struct aws_mqtt3_lifecycle_event expected_events_start[] = {
{
.type = AWS_MQTT3_LET_CONNECTION_COMPLETE,
},
Expand All @@ -1046,15 +1066,21 @@ static int s_verify_bad_connectivity_callbacks(struct aws_mqtt3_to_mqtt5_adapter
.type = AWS_MQTT3_LET_INTERRUPTED,
.error_code = AWS_ERROR_MQTT5_DISCONNECT_RECEIVED,
},
};
ASSERT_SUCCESS(s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_lifecycle_sequence_starts_with(
fixture, AWS_ARRAY_SIZE(expected_events_start), expected_events_start));

struct aws_mqtt3_lifecycle_event expected_events_end[] = {
{
.type = AWS_MQTT3_LET_DISCONNECTION_COMPLETE,
},
{
.type = AWS_MQTT3_LET_CLOSED,
},
};
ASSERT_SUCCESS(s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_lifecycle_sequence(
fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events)));

ASSERT_SUCCESS(s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_lifecycle_sequence_ends_with(
fixture, AWS_ARRAY_SIZE(expected_events_end), expected_events_end));

return AWS_OP_SUCCESS;
}
Expand Down Expand Up @@ -2145,3 +2171,168 @@ static int s_mqtt3to5_adapter_publish_interrupted_fn(struct aws_allocator *alloc
}

AWS_TEST_CASE(mqtt3to5_adapter_publish_interrupted, s_mqtt3to5_adapter_publish_interrupted_fn)

void s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_subscribe_complete(
struct aws_mqtt_client_connection *connection,
uint16_t packet_id,
const struct aws_byte_cursor *topic,
enum aws_mqtt_qos qos,
int error_code,
void *userdata) {

(void)connection;
(void)packet_id;
(void)topic;

struct aws_mqtt3_to_mqtt5_adapter_test_fixture *fixture = userdata;

struct aws_mqtt3_operation_event operation_event = {
.type = AWS_MQTT3_OET_SUBSCRIBE_COMPLETE,
.error_code = error_code,
};

aws_array_list_init_dynamic(
&operation_event.granted_subscriptions,
fixture->mqtt5_fixture.allocator,
1,
sizeof(struct aws_mqtt_topic_subscription));

/*
* technically it's not safe to persist the topic cursor but they way the tests are built, the cursor will stay
* valid until the events are checked (as long as we don't delete the subscription internally)
*/
struct aws_mqtt_topic_subscription sub = {
.topic = *topic,
.qos = qos,
};
aws_array_list_push_back(&operation_event.granted_subscriptions, (void *)&sub);

aws_mutex_lock(&fixture->lock);
aws_array_list_push_back(&fixture->operation_events, &operation_event);
aws_mutex_unlock(&fixture->lock);
aws_condition_variable_notify_all(&fixture->signal);
}

void s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_topic_specific_publish(
struct aws_mqtt_client_connection *connection,
const struct aws_byte_cursor *topic,
const struct aws_byte_cursor *payload,
bool dup,
enum aws_mqtt_qos qos,
bool retain,
void *userdata) {

(void)connection;
(void)topic;
(void)payload;
(void)dup;
(void)qos;
(void)retain;
(void)userdata;
}

static int s_mqtt5_mock_server_handle_subscribe_suback_success(
void *packet,
struct aws_mqtt5_server_mock_connection_context *connection,
void *user_data) {

(void)user_data;

struct aws_mqtt5_packet_subscribe_view *subscribe_view = packet;

AWS_VARIABLE_LENGTH_ARRAY(
enum aws_mqtt5_suback_reason_code, mqtt5_suback_codes, subscribe_view->subscription_count);
for (size_t i = 0; i < subscribe_view->subscription_count; ++i) {
enum aws_mqtt5_suback_reason_code *reason_code_ptr = &mqtt5_suback_codes[i];
*reason_code_ptr = (enum aws_mqtt5_suback_reason_code)subscribe_view->subscriptions[i].qos;
}

struct aws_mqtt5_packet_suback_view suback_view = {
.packet_id = subscribe_view->packet_id,
.reason_code_count = subscribe_view->subscription_count,
.reason_codes = mqtt5_suback_codes,
};

return aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_SUBACK, &suback_view);
}

static int s_mqtt3to5_adapter_subscribe_single_success_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

aws_mqtt_library_init(allocator);

struct mqtt5_client_test_options test_options;
aws_mqtt5_client_test_init_default_options(&test_options);

test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] =
s_mqtt5_mock_server_handle_subscribe_suback_success;

struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = {
.client_options = &test_options.client_options,
.server_function_table = &test_options.server_function_table,
};

struct aws_mqtt3_to_mqtt5_adapter_test_fixture fixture;
ASSERT_SUCCESS(aws_mqtt3_to_mqtt5_adapter_test_fixture_init(&fixture, allocator, &test_fixture_options, NULL));

struct aws_mqtt_client_connection *connection = fixture.connection;

struct aws_mqtt_connection_options connection_options;
s_init_adapter_connection_options_from_fixture(&connection_options, &fixture);

connection_options.on_connection_complete = s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_connection_complete;
connection_options.user_data = &fixture;

aws_mqtt_client_connection_connect(connection, &connection_options);

s_wait_for_n_adapter_lifecycle_events(&fixture, AWS_MQTT3_LET_CONNECTION_COMPLETE, 1);

struct aws_byte_cursor topic = aws_byte_cursor_from_c_str("derp");

aws_mqtt_client_connection_subscribe(
connection,
&topic,
AWS_MQTT_QOS_AT_LEAST_ONCE,
s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_topic_specific_publish,
&fixture,
NULL,
s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_subscribe_complete,
&fixture);

s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_SUBSCRIBE_COMPLETE, 1);

struct aws_mqtt_topic_subscription expected_subs[1] = {{
.topic = topic,
.qos = AWS_MQTT_QOS_AT_LEAST_ONCE,
}};

struct aws_mqtt3_operation_event expected_events[] = {{
.type = AWS_MQTT3_OET_SUBSCRIBE_COMPLETE,
.error_code = AWS_ERROR_SUCCESS,
}};
aws_array_list_init_static(
&expected_events[0].granted_subscriptions,
(void *)expected_subs,
1,
sizeof(struct aws_mqtt_topic_subscription));

s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_operation_sequence(
&fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events));

aws_mqtt3_to_mqtt5_adapter_test_fixture_clean_up(&fixture);
aws_mqtt_library_clean_up();

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(mqtt3to5_adapter_subscribe_single_success, s_mqtt3to5_adapter_subscribe_single_success_fn)

/*
#add_test_case(mqtt3to5_adapter_subscribe_multi_success)
#add_test_case(mqtt3to5_adapter_subscribe_single_failure)
#add_test_case(mqtt3to5_adapter_subscribe_multi_failure)
#add_test_case(mqtt3to5_adapter_subscribe_single_publish)
#add_test_case(mqtt3to5_adapter_subscribe_multi_overlapping_publish)
#add_test_case(mqtt3to5_adapter_unsubscribe_success)
#add_test_case(mqtt3to5_adapter_unsubscribe_failure)
*/

0 comments on commit 8224cc9

Please sign in to comment.