Skip to content

Commit

Permalink
Subscribe tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jul 7, 2023
1 parent 09c958a commit 5c2fad2
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 18 deletions.
6 changes: 1 addition & 5 deletions source/v5/mqtt3_to_mqtt5_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1496,7 +1496,6 @@ static void s_adapter_publish_operation_destroy(struct aws_mqtt3_to_mqtt5_adapte

/* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */
if (publish_op->publish_op != NULL) {
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(publish_op->base.adapter->loop));
publish_op->publish_op->completion_options.completion_callback = NULL;
publish_op->publish_op->completion_options.completion_user_data = NULL;
}
Expand Down Expand Up @@ -1683,7 +1682,6 @@ static void s_adapter_subscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_adap

/* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */
if (subscribe_op->subscribe_op != NULL) {
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(subscribe_op->base.adapter->loop));
subscribe_op->subscribe_op->completion_options.completion_callback = NULL;
subscribe_op->subscribe_op->completion_options.completion_user_data = NULL;
}
Expand Down Expand Up @@ -1767,7 +1765,7 @@ static void s_aws_mqtt3_to_mqtt5_adapter_subscribe_completion_fn(
&multi_sub_list,
multi_sub_subscription_ptr_buf,
suback->reason_code_count,
sizeof(struct aws_mqtt_topic_subscription));
sizeof(struct aws_mqtt_topic_subscription *));

size_t subscription_count = aws_array_list_length(&subscribe_op->subscriptions);

Expand Down Expand Up @@ -1796,7 +1794,6 @@ static void s_aws_mqtt3_to_mqtt5_adapter_subscribe_completion_fn(
&multi_sub_list,
error_code,
subscribe_op->on_multi_suback_user_data);
aws_array_list_clean_up(&multi_sub_list);
}

aws_mqtt3_to_mqtt5_adapter_operation_table_remove_operation(
Expand Down Expand Up @@ -2045,7 +2042,6 @@ static void s_adapter_unsubscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_ad

/* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */
if (unsubscribe_op->unsubscribe_op != NULL) {
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(unsubscribe_op->base.adapter->loop));
unsubscribe_op->unsubscribe_op->completion_options.completion_callback = NULL;
unsubscribe_op->unsubscribe_op->completion_options.completion_user_data = NULL;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,8 @@ add_test_case(mqtt3to5_adapter_publish_interrupted)

add_test_case(mqtt3to5_adapter_subscribe_single_success)
add_test_case(mqtt3to5_adapter_subscribe_multi_success)
#add_test_case(mqtt3to5_adapter_subscribe_single_failure)
#add_test_case(mqtt3to5_adapter_subscribe_multi_failure)
add_test_case(mqtt3to5_adapter_subscribe_single_failure)
add_test_case(mqtt3to5_adapter_subscribe_multi_failure)
#add_test_case(mqtt3to5_adapter_subscribe_single_publish)
#add_test_case(mqtt3to5_adapter_subscribe_multi_overlapping_publish)
#add_test_case(mqtt3to5_adapter_unsubscribe_success)
Expand Down
207 changes: 196 additions & 11 deletions tests/v5/mqtt3_to_mqtt5_adapter_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ struct aws_mqtt3_operation_event {

// subscribe complete properties
struct aws_array_list granted_subscriptions;

struct aws_byte_buf topic_storage;
};

static void s_aws_mqtt3_operation_event_clean_up(struct aws_mqtt3_operation_event *event) {
Expand All @@ -71,6 +73,7 @@ static void s_aws_mqtt3_operation_event_clean_up(struct aws_mqtt3_operation_even

aws_byte_buf_clean_up(&event->topic);
aws_byte_buf_clean_up(&event->payload);
aws_byte_buf_clean_up(&event->topic_storage);

aws_array_list_clean_up(&event->granted_subscriptions);
}
Expand Down Expand Up @@ -2197,12 +2200,14 @@ void s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_subscribe_complete(
1,
sizeof(struct aws_mqtt_topic_subscription));

aws_byte_buf_init_copy_from_cursor(&operation_event.topic_storage, fixture->mqtt5_fixture.allocator, *topic);

/*
* technically it's not safe to persist the topic cursor but they way the tests are built, the cursor will stay
* valid until the events are checked (as long as we don't delete the subscription internally)
*/
struct aws_mqtt_topic_subscription sub = {
.topic = *topic,
.topic = aws_byte_cursor_from_buf(&operation_event.topic_storage),
.qos = qos,
};
aws_array_list_push_back(&operation_event.granted_subscriptions, (void *)&sub);
Expand Down Expand Up @@ -2316,8 +2321,8 @@ static int s_mqtt3to5_adapter_subscribe_single_success_fn(struct aws_allocator *
1,
sizeof(struct aws_mqtt_topic_subscription));

s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_operation_sequence(
&fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events));
ASSERT_SUCCESS(s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_operation_sequence(
&fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events)));

aws_mqtt3_to_mqtt5_adapter_test_fixture_clean_up(&fixture);
aws_mqtt_library_clean_up();
Expand Down Expand Up @@ -2349,15 +2354,22 @@ static void s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_subscribe_multi_com
granted_count,
sizeof(struct aws_mqtt_topic_subscription));

size_t topic_length = 0;
for (size_t i = 0; i < granted_count; ++i) {
struct aws_mqtt_topic_subscription *granted_sub = NULL;
aws_array_list_get_at(topic_subacks, &granted_sub, i);

/*
* technically it's not safe to persist the topic cursor but they way the tests are built, the cursor will stay
* valid until the events are checked (as long as we don't delete the subscription internally)
*/
aws_array_list_push_back(&operation_event.granted_subscriptions, (void *)granted_sub);
topic_length += granted_sub->topic.len;
}

aws_byte_buf_init(&operation_event.topic_storage, fixture->mqtt5_fixture.allocator, topic_length);

for (size_t i = 0; i < granted_count; ++i) {
struct aws_mqtt_topic_subscription *granted_sub = NULL;
aws_array_list_get_at_ptr(&operation_event.granted_subscriptions, (void **)&granted_sub, i);

aws_byte_buf_append_and_update(&operation_event.topic_storage, &granted_sub->topic);
}

aws_mutex_lock(&fixture->lock);
Expand Down Expand Up @@ -2429,8 +2441,8 @@ static int s_mqtt3to5_adapter_subscribe_multi_success_fn(struct aws_allocator *a
2,
sizeof(struct aws_mqtt_topic_subscription));

s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_operation_sequence(
&fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events));
ASSERT_SUCCESS(s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_operation_sequence(
&fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events)));

aws_mqtt3_to_mqtt5_adapter_test_fixture_clean_up(&fixture);
aws_mqtt_library_clean_up();
Expand All @@ -2440,9 +2452,182 @@ static int s_mqtt3to5_adapter_subscribe_multi_success_fn(struct aws_allocator *a

AWS_TEST_CASE(mqtt3to5_adapter_subscribe_multi_success, s_mqtt3to5_adapter_subscribe_multi_success_fn)

static int s_mqtt5_mock_server_handle_subscribe_suback_failure(
void *packet,
struct aws_mqtt5_server_mock_connection_context *connection,
void *user_data) {

(void)user_data;

struct aws_mqtt5_packet_subscribe_view *subscribe_view = packet;

AWS_VARIABLE_LENGTH_ARRAY(
enum aws_mqtt5_suback_reason_code, mqtt5_suback_codes, subscribe_view->subscription_count);
for (size_t i = 0; i < subscribe_view->subscription_count; ++i) {
enum aws_mqtt5_suback_reason_code *reason_code_ptr = &mqtt5_suback_codes[i];
*reason_code_ptr = (i % 2) ? (enum aws_mqtt5_suback_reason_code)subscribe_view->subscriptions[i].qos
: AWS_MQTT5_SARC_QUOTA_EXCEEDED;
}

struct aws_mqtt5_packet_suback_view suback_view = {
.packet_id = subscribe_view->packet_id,
.reason_code_count = subscribe_view->subscription_count,
.reason_codes = mqtt5_suback_codes,
};

return aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_SUBACK, &suback_view);
}

static int s_mqtt3to5_adapter_subscribe_single_failure_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

aws_mqtt_library_init(allocator);

struct mqtt5_client_test_options test_options;
aws_mqtt5_client_test_init_default_options(&test_options);

test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] =
s_mqtt5_mock_server_handle_subscribe_suback_failure;

struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = {
.client_options = &test_options.client_options,
.server_function_table = &test_options.server_function_table,
};

struct aws_mqtt3_to_mqtt5_adapter_test_fixture fixture;
ASSERT_SUCCESS(aws_mqtt3_to_mqtt5_adapter_test_fixture_init(&fixture, allocator, &test_fixture_options, NULL));

struct aws_mqtt_client_connection *connection = fixture.connection;

struct aws_mqtt_connection_options connection_options;
s_init_adapter_connection_options_from_fixture(&connection_options, &fixture);

connection_options.on_connection_complete = s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_connection_complete;
connection_options.user_data = &fixture;

aws_mqtt_client_connection_connect(connection, &connection_options);

s_wait_for_n_adapter_lifecycle_events(&fixture, AWS_MQTT3_LET_CONNECTION_COMPLETE, 1);

struct aws_byte_cursor topic = aws_byte_cursor_from_c_str("derp");

aws_mqtt_client_connection_subscribe(
connection,
&topic,
AWS_MQTT_QOS_AT_LEAST_ONCE,
s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_topic_specific_publish,
&fixture,
NULL,
s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_subscribe_complete,
&fixture);

s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_SUBSCRIBE_COMPLETE, 1);

struct aws_mqtt_topic_subscription expected_subs[1] = {{
.topic = topic,
.qos = AWS_MQTT_QOS_FAILURE,
}};

struct aws_mqtt3_operation_event expected_events[] = {{
.type = AWS_MQTT3_OET_SUBSCRIBE_COMPLETE,
.error_code = AWS_ERROR_SUCCESS,
}};

aws_array_list_init_static_from_initialized(
&expected_events[0].granted_subscriptions,
(void *)expected_subs,
1,
sizeof(struct aws_mqtt_topic_subscription));

ASSERT_SUCCESS(s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_operation_sequence(
&fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events)));

aws_mqtt3_to_mqtt5_adapter_test_fixture_clean_up(&fixture);
aws_mqtt_library_clean_up();

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(mqtt3to5_adapter_subscribe_single_failure, s_mqtt3to5_adapter_subscribe_single_failure_fn)

static int s_mqtt3to5_adapter_subscribe_multi_failure_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

aws_mqtt_library_init(allocator);

struct mqtt5_client_test_options test_options;
aws_mqtt5_client_test_init_default_options(&test_options);

test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] =
s_mqtt5_mock_server_handle_subscribe_suback_failure;

struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = {
.client_options = &test_options.client_options,
.server_function_table = &test_options.server_function_table,
};

struct aws_mqtt3_to_mqtt5_adapter_test_fixture fixture;
ASSERT_SUCCESS(aws_mqtt3_to_mqtt5_adapter_test_fixture_init(&fixture, allocator, &test_fixture_options, NULL));

struct aws_mqtt_client_connection *connection = fixture.connection;

struct aws_mqtt_connection_options connection_options;
s_init_adapter_connection_options_from_fixture(&connection_options, &fixture);

connection_options.on_connection_complete = s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_connection_complete;
connection_options.user_data = &fixture;

aws_mqtt_client_connection_connect(connection, &connection_options);

s_wait_for_n_adapter_lifecycle_events(&fixture, AWS_MQTT3_LET_CONNECTION_COMPLETE, 1);

struct aws_mqtt_topic_subscription subscriptions[] = {
{
.topic = aws_byte_cursor_from_c_str("topic/1"),
.qos = AWS_MQTT_QOS_AT_LEAST_ONCE,
},
{
.topic = aws_byte_cursor_from_c_str("topic/2"),
.qos = AWS_MQTT_QOS_AT_MOST_ONCE,
}};

struct aws_array_list subscription_list;
aws_array_list_init_static_from_initialized(
&subscription_list, subscriptions, 2, sizeof(struct aws_mqtt_topic_subscription));

aws_mqtt_client_connection_subscribe_multiple(
connection,
&subscription_list,
s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_subscribe_multi_complete,
&fixture);

s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_SUBSCRIBE_COMPLETE, 1);

/* reuse the subscriptions array for validation, but the first one will fail */
subscriptions[0].qos = AWS_MQTT_QOS_FAILURE;

struct aws_mqtt3_operation_event expected_events[] = {{
.type = AWS_MQTT3_OET_SUBSCRIBE_COMPLETE,
.error_code = AWS_ERROR_SUCCESS,
}};
aws_array_list_init_static_from_initialized(
&expected_events[0].granted_subscriptions,
(void *)subscriptions,
2,
sizeof(struct aws_mqtt_topic_subscription));

ASSERT_SUCCESS(s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_operation_sequence(
&fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events)));

aws_mqtt3_to_mqtt5_adapter_test_fixture_clean_up(&fixture);
aws_mqtt_library_clean_up();

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(mqtt3to5_adapter_subscribe_multi_failure, s_mqtt3to5_adapter_subscribe_multi_failure_fn)

/*
#add_test_case(mqtt3to5_adapter_subscribe_single_failure)
#add_test_case(mqtt3to5_adapter_subscribe_multi_failure)
#add_test_case(mqtt3to5_adapter_subscribe_single_publish)
#add_test_case(mqtt3to5_adapter_subscribe_multi_overlapping_publish)
#add_test_case(mqtt3to5_adapter_unsubscribe_success)
Expand Down

0 comments on commit 5c2fad2

Please sign in to comment.