diff --git a/CMakeLists.txt b/CMakeLists.txt index b33f24d6..0a2eeefa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,6 +57,7 @@ file(GLOB AWS_MQTT_PRIV_EXPOSED_HEADERS file(GLOB AWS_MQTT_SRC "source/*.c" "source/v5/*.c" + "source/request-response/*.c" ) file(GLOB MQTT_HEADERS diff --git a/include/aws/mqtt/private/request-response/protocol_adapter.h b/include/aws/mqtt/private/request-response/protocol_adapter.h index 3cf39a07..86d5a44f 100644 --- a/include/aws/mqtt/private/request-response/protocol_adapter.h +++ b/include/aws/mqtt/private/request-response/protocol_adapter.h @@ -6,6 +6,7 @@ * SPDX-License-Identifier: Apache-2.0. */ +#include #include #include diff --git a/source/request-response/protocol_adapter.c b/source/request-response/protocol_adapter.c index 4aae75cd..d1a649b0 100644 --- a/source/request-response/protocol_adapter.c +++ b/source/request-response/protocol_adapter.c @@ -48,7 +48,7 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(struct struct aws_mqtt_protocol_adapter_5_impl { struct aws_allocator *allocator; struct aws_mqtt_protocol_adapter base; - struct aws_protocol_adapter_weak_ref *callback_ref; + struct aws_weak_ref *callback_ref; struct aws_mqtt_protocol_adapter_options config; struct aws_event_loop *loop; @@ -68,10 +68,10 @@ struct aws_mqtt_protocol_adapter_5_subscription_op_data { struct aws_allocator *allocator; struct aws_byte_buf topic_filter; - struct aws_protocol_adapter_weak_ref *callback_ref; + struct aws_weak_ref *callback_ref; }; -static struct aws_mqtt_protocol_adapter_5_subscription_op_data *s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(struct aws_allocator *allocator, struct aws_byte_cursor topic_filter, struct aws_protocol_adapter_weak_ref *callback_ref) { +static struct aws_mqtt_protocol_adapter_5_subscription_op_data *s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(struct aws_allocator *allocator, struct aws_byte_cursor topic_filter, struct aws_weak_ref *callback_ref) { struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_subscription_op_data)); subscribe_data->allocator = allocator; @@ -207,13 +207,13 @@ int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_ad struct aws_mqtt_protocol_adapter_5_publish_op_data { struct aws_allocator *allocator; - struct aws_protocol_adapter_weak_ref *callback_ref; + struct aws_weak_ref *callback_ref; void (*completion_callback_fn)(bool, void *); void *user_data; }; -static struct aws_mqtt_protocol_adapter_5_publish_op_data *s_aws_mqtt_protocol_adapter_5_publish_op_data_new(struct aws_allocator *allocator, const struct aws_protocol_adapter_publish_options *publish_options, struct aws_protocol_adapter_weak_ref *callback_ref) { +static struct aws_mqtt_protocol_adapter_5_publish_op_data *s_aws_mqtt_protocol_adapter_5_publish_op_data_new(struct aws_allocator *allocator, const struct aws_protocol_adapter_publish_options *publish_options, struct aws_weak_ref *callback_ref) { struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_publish_op_data)); publish_data->allocator = allocator; @@ -347,8 +347,6 @@ static struct aws_mqtt_protocol_adapter_vtable s_protocol_adapter_mqtt5_vtable = }; struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter_options *options, struct aws_mqtt5_client *client) { - AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(client->loop)); - struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_impl)); adapter->allocator = allocator; @@ -373,7 +371,7 @@ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(struct aw adapter->listener = aws_mqtt5_listener_new(allocator, &listener_options); - return adapter; + return &adapter->base; } void aws_mqtt_protocol_adapter_delete(struct aws_mqtt_protocol_adapter *adapter) { @@ -390,4 +388,4 @@ int aws_mqtt_protocol_adapter_unsubscribe(struct aws_mqtt_protocol_adapter *adap int aws_mqtt_protocol_adapter_publish(struct aws_mqtt_protocol_adapter *adapter, struct aws_protocol_adapter_publish_options *options) { return (*adapter->vtable->aws_mqtt_protocol_adapter_publish_fn)(adapter->impl, options); -} \ No newline at end of file +} diff --git a/source/request-response/weak_ref.c b/source/request-response/weak_ref.c index e3e5009c..97561ea9 100644 --- a/source/request-response/weak_ref.c +++ b/source/request-response/weak_ref.c @@ -51,4 +51,4 @@ void *aws_weak_ref_get_reference(struct aws_weak_ref *weak_ref) { void aws_weak_ref_zero_reference(struct aws_weak_ref *weak_ref) { weak_ref->referenced = NULL; -} \ No newline at end of file +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9bcbb720..f1d90e22 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -442,7 +442,7 @@ add_test_case(mqtt_subscription_set_publish_single_level_wildcards) add_test_case(mqtt_subscription_set_publish_multi_level_wildcards) add_test_case(mqtt_subscription_set_get_subscriptions) -#add_test_case(request_response_mqtt5_protocol_adapter_subscribe_success) +add_test_case(request_response_mqtt5_protocol_adapter_subscribe_success) #add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_error_code) #add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_reason_code) #add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_timeout) diff --git a/tests/request_response_protocol_adapter_tests.c b/tests/request_response_protocol_adapter_tests.c new file mode 100644 index 00000000..da53747b --- /dev/null +++ b/tests/request_response_protocol_adapter_tests.c @@ -0,0 +1,372 @@ +/** +* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +* SPDX-License-Identifier: Apache-2.0. +*/ + +#include "v5/mqtt5_testing_utils.h" +#include + +#include "aws/mqtt/private/request-response/protocol_adapter.h" + +#include + +struct request_response_protocol_adapter_incoming_publish_event_record { + struct aws_byte_buf topic; + struct aws_byte_buf payload; +}; + +static void s_request_response_protocol_adapter_incoming_publish_event_record_init( + struct request_response_protocol_adapter_incoming_publish_event_record *record, + struct aws_allocator *allocator, + struct aws_byte_cursor topic, + struct aws_byte_cursor payload) { + + aws_byte_buf_init_copy_from_cursor(&record->topic, allocator, topic); + aws_byte_buf_init_copy_from_cursor(&record->payload, allocator, payload); +} + +static void s_request_response_protocol_adapter_incoming_publish_event_record_cleanup(struct request_response_protocol_adapter_incoming_publish_event_record *record) { + aws_byte_buf_clean_up(&record->topic); + aws_byte_buf_clean_up(&record->payload); +} + +struct request_response_protocol_adapter_connection_event_record { + enum aws_protocol_adapter_connection_event_type event_type; + bool rejoined_session; +}; + +struct request_response_protocol_adapter_subscription_event_record { + enum aws_protocol_adapter_subscription_event_type event_type; + struct aws_byte_buf topic_filter; +}; + +static void s_request_response_protocol_adapter_subscription_event_record_init( + struct request_response_protocol_adapter_subscription_event_record *record, + struct aws_allocator *allocator, + struct aws_byte_cursor topic_filter) { + + aws_byte_buf_init_copy_from_cursor(&record->topic_filter, allocator, topic_filter); +} + +static void s_request_response_protocol_adapter_subscription_event_record_cleanup(struct request_response_protocol_adapter_subscription_event_record *record) { + aws_byte_buf_clean_up(&record->topic_filter); +} + +struct aws_request_response_mqtt5_adapter_test_fixture { + struct aws_allocator *allocator; + struct aws_mqtt5_client_mock_test_fixture mqtt5_fixture; + + struct aws_mqtt_protocol_adapter *protocol_adapter; + + struct aws_array_list incoming_publish_events; + struct aws_array_list connection_events; + struct aws_array_list subscription_events; + + bool adapter_terminated; + + struct aws_mutex lock; + struct aws_condition_variable signal; +}; + + +static void s_rr_mqtt5_protocol_adapter_test_on_subscription_event(struct aws_protocol_adapter_subscription_event *event, void *user_data) { + struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; + + struct request_response_protocol_adapter_subscription_event_record record = { + .event_type = event->event_type + }; + s_request_response_protocol_adapter_subscription_event_record_init(&record, fixture->allocator, event->topic_filter); + + aws_mutex_lock(&fixture->lock); + aws_array_list_push_back(&fixture->subscription_events, &record); + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +static void s_rr_mqtt5_protocol_adapter_test_on_incoming_publish(struct aws_protocol_adapter_incoming_publish_event *publish, void *user_data) { + struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; + + struct request_response_protocol_adapter_incoming_publish_event_record record; + AWS_ZERO_STRUCT(record); + s_request_response_protocol_adapter_incoming_publish_event_record_init(&record, fixture->allocator, publish->topic, publish->payload); + + aws_mutex_lock(&fixture->lock); + aws_array_list_push_back(&fixture->incoming_publish_events, &record); + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +static void s_rr_mqtt5_protocol_adapter_test_on_terminate_callback(void *user_data) { + struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; + + aws_mutex_lock(&fixture->lock); + fixture->adapter_terminated = true; + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +static void s_rr_mqtt5_protocol_adapter_test_on_connection_event(struct aws_protocol_adapter_connection_event *event, void *user_data) { + struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; + + struct request_response_protocol_adapter_connection_event_record record = { + .event_type = event->event_type, + .rejoined_session = event->rejoined_session + }; + + aws_mutex_lock(&fixture->lock); + aws_array_list_push_back(&fixture->connection_events, &record); + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +static int s_aws_request_response_mqtt5_adapter_test_fixture_init( + struct aws_request_response_mqtt5_adapter_test_fixture *fixture, + struct aws_allocator *allocator, + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options *mqtt5_fixture_config) { + + AWS_ZERO_STRUCT(*fixture); + + fixture->allocator = allocator; + + if (aws_mqtt5_client_mock_test_fixture_init(&fixture->mqtt5_fixture, allocator, mqtt5_fixture_config)) { + return AWS_OP_ERR; + } + + struct aws_mqtt_protocol_adapter_options protocol_adapter_options = { + .subscription_event_callback = s_rr_mqtt5_protocol_adapter_test_on_subscription_event, + .incoming_publish_callback = s_rr_mqtt5_protocol_adapter_test_on_incoming_publish, + .terminate_callback = s_rr_mqtt5_protocol_adapter_test_on_terminate_callback, + .connection_event_callback = s_rr_mqtt5_protocol_adapter_test_on_connection_event, + .user_data = fixture + }; + + fixture->protocol_adapter = aws_mqtt_protocol_adapter_new_from_5(allocator, &protocol_adapter_options, fixture->mqtt5_fixture.client); + AWS_FATAL_ASSERT(fixture->protocol_adapter != NULL); + + aws_array_list_init_dynamic(&fixture->incoming_publish_events, allocator, 10, sizeof(struct request_response_protocol_adapter_incoming_publish_event_record)); + aws_array_list_init_dynamic(&fixture->connection_events, allocator, 10, sizeof(struct request_response_protocol_adapter_connection_event_record)); + aws_array_list_init_dynamic(&fixture->subscription_events, allocator, 10, sizeof(struct request_response_protocol_adapter_subscription_event_record)); + + aws_mutex_init(&fixture->lock); + aws_condition_variable_init(&fixture->signal); + + return AWS_OP_SUCCESS; +} + +static bool s_is_adapter_terminated(void *context) { + struct aws_request_response_mqtt5_adapter_test_fixture *fixture = context; + + return fixture->adapter_terminated; +} + +static void s_aws_request_response_mqtt5_adapter_test_fixture_clean_up(struct aws_request_response_mqtt5_adapter_test_fixture *fixture) { + aws_mqtt_protocol_adapter_delete(fixture->protocol_adapter); + + aws_mutex_lock(&fixture->lock); + aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_is_adapter_terminated, fixture); + aws_mutex_unlock(&fixture->lock); + + aws_mqtt5_client_mock_test_fixture_clean_up(&fixture->mqtt5_fixture); + + for (size_t i = 0; i < aws_array_list_length(&fixture->subscription_events); ++i) { + struct request_response_protocol_adapter_subscription_event_record record; + aws_array_list_get_at(&fixture->subscription_events, &record, i); + s_request_response_protocol_adapter_subscription_event_record_cleanup(&record); + } + aws_array_list_clean_up(&fixture->subscription_events); + + for (size_t i = 0; i < aws_array_list_length(&fixture->incoming_publish_events); ++i) { + struct request_response_protocol_adapter_incoming_publish_event_record record; + aws_array_list_get_at(&fixture->incoming_publish_events, &record, i); + s_request_response_protocol_adapter_incoming_publish_event_record_cleanup(&record); + } + aws_array_list_clean_up(&fixture->incoming_publish_events); + + aws_array_list_clean_up(&fixture->connection_events); + + aws_mutex_clean_up(&fixture->lock); + aws_condition_variable_clean_up(&fixture->signal); +} + +struct test_subscription_event_wait_context { + struct request_response_protocol_adapter_subscription_event_record *expected_event; + size_t expected_count; + struct aws_request_response_mqtt5_adapter_test_fixture *fixture; +}; + +static bool s_do_subscription_events_contain(void *context) { + struct test_subscription_event_wait_context *wait_context = context; + + size_t found = 0; + + size_t num_events = aws_array_list_length(&wait_context->fixture->subscription_events); + for (size_t i = 0; i < num_events; ++i) { + struct request_response_protocol_adapter_subscription_event_record record; + aws_array_list_get_at(&wait_context->fixture->subscription_events, &record, i); + + if (record.event_type == wait_context->expected_event->event_type) { + struct aws_byte_cursor record_topic_filter = aws_byte_cursor_from_buf(&record.topic_filter); + struct aws_byte_cursor expected_topic_filter = aws_byte_cursor_from_buf(&wait_context->expected_event->topic_filter); + if (aws_byte_cursor_eq(&record_topic_filter, &expected_topic_filter)) { + ++found; + } + } + } + + return found >= wait_context->expected_count; +} + +static void s_wait_for_subscription_events_contains(struct aws_request_response_mqtt5_adapter_test_fixture *fixture, + struct request_response_protocol_adapter_subscription_event_record *expected_event, + size_t expected_count) { + + struct test_subscription_event_wait_context context = { + .expected_event = expected_event, + .expected_count = expected_count, + .fixture = fixture, + }; + + aws_mutex_lock(&fixture->lock); + aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_do_subscription_events_contain, &context); + aws_mutex_unlock(&fixture->lock); +} + +struct test_connection_event_wait_context { + struct request_response_protocol_adapter_connection_event_record *expected_event; + size_t expected_count; + struct aws_request_response_mqtt5_adapter_test_fixture *fixture; +}; + +static bool s_do_connection_events_contain(void *context) { + struct test_connection_event_wait_context *wait_context = context; + + size_t found = 0; + + size_t num_events = aws_array_list_length(&wait_context->fixture->connection_events); + for (size_t i = 0; i < num_events; ++i) { + struct request_response_protocol_adapter_connection_event_record record; + aws_array_list_get_at(&wait_context->fixture->connection_events, &record, i); + + if (record.event_type == wait_context->expected_event->event_type && record.rejoined_session == wait_context->expected_event->rejoined_session) { + ++found; + } + } + + return found >= wait_context->expected_count; +} + +static void s_wait_for_connection_events_contains(struct aws_request_response_mqtt5_adapter_test_fixture *fixture, + struct request_response_protocol_adapter_connection_event_record *expected_event, + size_t expected_count) { + + struct test_connection_event_wait_context context = { + .expected_event = expected_event, + .expected_count = expected_count, + .fixture = fixture, + }; + + aws_mutex_lock(&fixture->lock); + aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_do_connection_events_contain, &context); + aws_mutex_unlock(&fixture->lock); +} + +struct test_incoming_publish_event_wait_context { + struct request_response_protocol_adapter_incoming_publish_event_record *expected_event; + size_t expected_count; + struct aws_request_response_mqtt5_adapter_test_fixture *fixture; +}; + +static bool s_do_incoming_publish_events_contain(void *context) { + struct test_incoming_publish_event_wait_context *wait_context = context; + + size_t found = 0; + + size_t num_events = aws_array_list_length(&wait_context->fixture->incoming_publish_events); + for (size_t i = 0; i < num_events; ++i) { + struct request_response_protocol_adapter_incoming_publish_event_record record; + aws_array_list_get_at(&wait_context->fixture->incoming_publish_events, &record, i); + + struct aws_byte_cursor record_topic = aws_byte_cursor_from_buf(&record.topic); + struct aws_byte_cursor expected_topic = aws_byte_cursor_from_buf(&wait_context->expected_event->topic); + if (!aws_byte_cursor_eq(&record_topic, &expected_topic)) { + continue; + } + + struct aws_byte_cursor record_payload = aws_byte_cursor_from_buf(&record.payload); + struct aws_byte_cursor expected_payload = aws_byte_cursor_from_buf(&wait_context->expected_event->payload); + if (!aws_byte_cursor_eq(&record_payload, &expected_payload)) { + continue; + } + + ++found; + } + + return found >= wait_context->expected_count; +} + +static void s_wait_for_incoming_publish_events_contains(struct aws_request_response_mqtt5_adapter_test_fixture *fixture, + struct request_response_protocol_adapter_incoming_publish_event_record *expected_event, + size_t expected_count) { + + struct test_incoming_publish_event_wait_context context = { + .expected_event = expected_event, + .expected_count = expected_count, + .fixture = fixture, + }; + + aws_mutex_lock(&fixture->lock); + aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_do_incoming_publish_events_contain, &context); + aws_mutex_unlock(&fixture->lock); +} + +static int s_request_response_mqtt5_protocol_adapter_subscribe_success_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options test_options; + aws_mqtt5_client_test_init_default_options(&test_options); + + test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] = + aws_mqtt5_server_send_suback_on_subscribe; + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options mqtt5_test_fixture_options = { + .client_options = &test_options.client_options, + .server_function_table = &test_options.server_function_table, + }; + + struct aws_request_response_mqtt5_adapter_test_fixture fixture; + ASSERT_SUCCESS(s_aws_request_response_mqtt5_adapter_test_fixture_init(&fixture, allocator, &mqtt5_test_fixture_options)); + + struct aws_mqtt5_client *client = fixture.mqtt5_fixture.client; + ASSERT_SUCCESS(aws_mqtt5_client_start(client)); + + aws_wait_for_connected_lifecycle_event(&fixture.mqtt5_fixture); + + struct request_response_protocol_adapter_subscription_event_record expected_outcome = { + .event_type = AWS_PASET_SUBSCRIBE_SUCCESS, + }; + + aws_byte_buf_init_copy_from_cursor(&expected_outcome.topic_filter, allocator, aws_byte_cursor_from_c_str("hello/world")); + + struct aws_protocol_adapter_subscribe_options subscribe_options = { + .topic_filter = aws_byte_cursor_from_buf(&expected_outcome.topic_filter), + .ack_timeout_seconds = 5, + }; + + aws_mqtt_protocol_adapter_subscribe(fixture.protocol_adapter, &subscribe_options); + + s_wait_for_subscription_events_contains(&fixture, &expected_outcome, 1); + + s_request_response_protocol_adapter_subscription_event_record_cleanup(&expected_outcome); + + s_aws_request_response_mqtt5_adapter_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + request_response_mqtt5_protocol_adapter_subscribe_success, + s_request_response_mqtt5_protocol_adapter_subscribe_success_fn) \ No newline at end of file diff --git a/tests/v5/mqtt5_client_tests.c b/tests/v5/mqtt5_client_tests.c index 06367d3d..7049dbea 100644 --- a/tests/v5/mqtt5_client_tests.c +++ b/tests/v5/mqtt5_client_tests.c @@ -1864,7 +1864,7 @@ static void s_wait_for_suback_received(struct aws_mqtt5_client_mock_test_fixture aws_mutex_unlock(&test_context->lock); } -static int s_aws_mqtt5_server_send_suback_on_subscribe( +int aws_mqtt5_server_send_suback_on_subscribe( void *packet, struct aws_mqtt5_server_mock_connection_context *connection, void *user_data) { @@ -1892,7 +1892,7 @@ static int s_mqtt5_client_subscribe_success_fn(struct aws_allocator *allocator, aws_mqtt5_client_test_init_default_options(&test_options); test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] = - s_aws_mqtt5_server_send_suback_on_subscribe; + aws_mqtt5_server_send_suback_on_subscribe; struct aws_mqtt5_client_mock_test_fixture test_context; @@ -3241,7 +3241,7 @@ static int s_do_sub_pub_unsub_test(struct aws_allocator *allocator, enum aws_mqt test_options.client_options.publish_received_handler_user_data = &full_test_context; test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] = - s_aws_mqtt5_server_send_suback_on_subscribe; + aws_mqtt5_server_send_suback_on_subscribe; test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_PUBLISH] = aws_mqtt5_mock_server_handle_publish_puback_and_forward; test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_UNSUBSCRIBE] = @@ -4139,7 +4139,7 @@ static int s_mqtt5_client_statistics_subscribe_fn(struct aws_allocator *allocato aws_mqtt5_client_test_init_default_options(&test_options); test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] = - s_aws_mqtt5_server_send_suback_on_subscribe; + aws_mqtt5_server_send_suback_on_subscribe; struct aws_mqtt5_client_mock_test_fixture test_context; diff --git a/tests/v5/mqtt5_testing_utils.h b/tests/v5/mqtt5_testing_utils.h index 0e30f246..74cd0f8d 100644 --- a/tests/v5/mqtt5_testing_utils.h +++ b/tests/v5/mqtt5_testing_utils.h @@ -223,6 +223,11 @@ int aws_mqtt5_mock_server_send_packet( enum aws_mqtt5_packet_type packet_type, void *packet); +int aws_mqtt5_server_send_suback_on_subscribe( + void *packet, + struct aws_mqtt5_server_mock_connection_context *connection, + void *user_data); + extern const struct aws_string *g_default_client_id; #define RECONNECT_TEST_MIN_BACKOFF 500 diff --git a/tests/v5/mqtt5_to_mqtt3_adapter_tests.c b/tests/v5/mqtt5_to_mqtt3_adapter_tests.c index e3de0802..b067267a 100644 --- a/tests/v5/mqtt5_to_mqtt3_adapter_tests.c +++ b/tests/v5/mqtt5_to_mqtt3_adapter_tests.c @@ -9,18 +9,14 @@ #include #include #include -#include #include #include #include -#include #include -#include #include #include -#include enum aws_mqtt3_lifecycle_event_type { AWS_MQTT3_LET_CONNECTION_COMPLETE, diff --git a/tests/v5/request_response_protocol_adapter_tests.c b/tests/v5/request_response_protocol_adapter_tests.c deleted file mode 100644 index f635e126..00000000 --- a/tests/v5/request_response_protocol_adapter_tests.c +++ /dev/null @@ -1,185 +0,0 @@ -/** -* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -* SPDX-License-Identifier: Apache-2.0. -*/ - -#include "mqtt5_testing_utils.h" -#include - -struct request_response_protocol_adapter_incoming_publish_event_record { - struct aws_byte_buf topic; - struct aws_byte_buf payload; -}; - -static void s_request_response_protocol_adapter_incoming_publish_event_record_init( - struct request_response_protocol_adapter_incoming_publish_event_record *record, - struct aws_allocator *allocator, - struct aws_byte_cursor topic, - struct aws_byte_cursor payload) { - - aws_byte_buf_init_copy_from_cursor(&record->topic, allocator, topic); - aws_byte_buf_init_copy_from_cursor(&record->payload, allocator, payload); -} - -static void s_request_response_protocol_adapter_incoming_publish_event_record_cleanup(struct request_response_protocol_adapter_incoming_publish_event_record *record) { - aws_byte_buf_clean_up(&record->topic); - aws_byte_buf_clean_up(&record->payload); -} - -struct request_response_protocol_adapter_connection_event_record { - enum aws_protocol_adapter_connection_event_type event_type; - bool rejoined_session; -}; - -struct request_response_protocol_adapter_subscription_event_record { - enum aws_protocol_adapter_subscription_event_type event_type; - struct aws_byte_buf topic_filter; -}; - -static void s_request_response_protocol_adapter_incoming_subscription_event_record_init( - struct request_response_protocol_adapter_subscription_event_record *record, - struct aws_allocator *allocator, - struct aws_byte_cursor topic_filter) { - - aws_byte_buf_init_copy_from_cursor(&record->topic_filter, allocator, topic_filter); -} - -static void s_request_response_protocol_adapter_incoming_subscription_event_record_cleanup(struct request_response_protocol_adapter_subscription_event_record *record) { - aws_byte_buf_clean_up(&record->topic_filter); -} - -struct aws_request_response_mqtt5_adapter_test_fixture { - struct aws_allocator *allocator; - struct aws_mqtt5_client_mock_test_fixture mqtt5_fixture; - - struct aws_mqtt_protocol_adapter *protocol_adapter; - - struct aws_array_list incoming_publish_events; - struct aws_array_list connection_events; - struct aws_array_list subscription_events; - - bool adapter_terminated; - - struct aws_mutex lock; - struct aws_condition_variable signal; -}; - - -static void s_rr_mqtt5_protocol_adapter_test_on_subscription_event(struct aws_protocol_adapter_subscription_event *event, void *user_data) { - struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; - - struct request_response_protocol_adapter_subscription_event_record record = { - .event_type = event->event_type - }; - s_request_response_protocol_adapter_incoming_subscription_event_record_init(&record, fixture->allocator, event->topic_filter); - - aws_mutex_lock(&fixture->lock); - aws_array_list_push_back(&fixture->subscription_events, &record); - aws_mutex_unlock(&fixture->lock); - aws_condition_variable_notify_all(&fixture->signal); -} - -static void s_rr_mqtt5_protocol_adapter_test_on_incoming_publish(struct aws_protocol_adapter_incoming_publish_event *publish, void *user_data) { - struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; - - struct request_response_protocol_adapter_incoming_publish_event_record record; - AWS_ZERO_STRUCT(record); - s_request_response_protocol_adapter_incoming_publish_event_record_init(&record, fixture->allocator, publish->topic, publish->payload); - - aws_mutex_lock(&fixture->lock); - aws_array_list_push_back(&fixture->incoming_publish_events, &record); - aws_mutex_unlock(&fixture->lock); - aws_condition_variable_notify_all(&fixture->signal); -} - -static void s_rr_mqtt5_protocol_adapter_test_on_terminate_callback(void *user_data) { - struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; - - aws_mutex_lock(&fixture->lock); - fixture->adapter_terminated = true; - aws_mutex_unlock(&fixture->lock); - aws_condition_variable_notify_all(&fixture->signal); -} - -static void s_rr_mqtt5_protocol_adapter_test_on_connection_event(struct aws_protocol_adapter_connection_event *event, void *user_data) { - struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; - - struct request_response_protocol_adapter_connection_event_record record = { - .event_type = event->event_type, - .rejoined_session = event->rejoined_session - }; - - aws_mutex_lock(&fixture->lock); - aws_array_list_push_back(&fixture->connection_events, &record); - aws_mutex_unlock(&fixture->lock); - aws_condition_variable_notify_all(&fixture->signal); -} - -static int s_aws_request_response_mqtt5_adapter_test_fixture_init( - struct aws_request_response_mqtt5_adapter_test_fixture *fixture, - struct aws_allocator *allocator, - struct aws_mqtt5_client_mqtt5_mock_test_fixture_options *mqtt5_fixture_config) { - - AWS_ZERO_STRUCT(*fixture); - - fixture->allocator = allocator; - - if (aws_mqtt5_client_mock_test_fixture_init(&fixture->mqtt5_fixture, allocator, mqtt5_fixture_config)) { - return AWS_OP_ERR; - } - - struct aws_mqtt_protocol_adapter_options protocol_adapter_options = { - .subscription_event_callback = s_rr_mqtt5_protocol_adapter_test_on_subscription_event, - .incoming_publish_callback = s_rr_mqtt5_protocol_adapter_test_on_incoming_publish, - .terminate_callback = s_rr_mqtt5_protocol_adapter_test_on_terminate_callback, - .connection_event_callback = s_rr_mqtt5_protocol_adapter_test_on_connection_event, - .user_data = fixture - }; - - fixture->protocol_adapter = aws_mqtt_protocol_adapter_new_from_5(allocator, &protocol_adapter_options, fixture->mqtt5_fixture.client); - AWS_FATAL_ASSERT(fixture->protocol_adapter != NULL); - - aws_array_list_init_dynamic(&fixture->incoming_publish_events, allocator, 10, sizeof(struct request_response_protocol_adapter_incoming_publish_event_record)); - aws_array_list_init_dynamic(&fixture->connection_events, allocator, 10, sizeof(struct request_response_protocol_adapter_connection_event_record)); - aws_array_list_init_dynamic(&fixture->subscription_events, allocator, 10, sizeof(struct request_response_protocol_adapter_subscription_event_record)); - - aws_mutex_init(&fixture->lock); - aws_condition_variable_init(&fixture->signal); - - return AWS_OP_SUCCESS; -} - -static bool s_is_adapter_terminated(void *context) { - struct aws_request_response_mqtt5_adapter_test_fixture *fixture = context; - - return fixture->adapter_terminated; -} - -static void s_aws_mqtt5_to_mqtt3_adapter_test_fixture_clean_up(struct aws_request_response_mqtt5_adapter_test_fixture *fixture) { - aws_mqtt_protocol_adapter_delete(fixture->protocol_adapter); - - aws_mutex_lock(&fixture->lock); - aws_condition_variable_wait_pred(&fixture->signal, &fixture->signal, s_is_adapter_terminated, fixture); - aws_mutex_unlock(&fixture->lock); - - aws_mqtt5_client_mock_test_fixture_clean_up(&fixture->mqtt5_fixture); - - for (size_t i = 0; i < aws_array_list_length(&fixture->subscription_events); ++i) { - struct request_response_protocol_adapter_subscription_event_record record; - aws_array_list_get_at(&fixture->subscription_events, &record, i); - s_request_response_protocol_adapter_incoming_subscription_event_record_cleanup(&record); - } - aws_array_list_clean_up(&fixture->subscription_events); - - for (size_t i = 0; i < aws_array_list_length(&fixture->incoming_publish_events); ++i) { - struct request_response_protocol_adapter_incoming_publish_event_record record; - aws_array_list_get_at(&fixture->incoming_publish_events, &record, i); - s_request_response_protocol_adapter_incoming_publish_event_record_cleanup(&record); - } - aws_array_list_clean_up(&fixture->incoming_publish_events); - - aws_array_list_clean_up(&fixture->connection_events); - - aws_mutex_clean_up(&fixture->lock); - aws_condition_variable_clean_up(&fixture->signal); -}