diff --git a/include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h b/include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h index 1a3e2b3f..bf51b66a 100644 --- a/include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h +++ b/include/aws/mqtt/private/v5/mqtt3_to_mqtt5_adapter_impl.h @@ -281,7 +281,7 @@ struct aws_mqtt_client_connection_5_impl { struct aws_mqtt3_to_mqtt5_adapter_operation_table operational_state; - struct aws_mqtt_subscription_set subscriptions; + struct aws_mqtt_subscription_set *subscriptions; /* All fields after here are internal to the adapter event loop thread */ diff --git a/source/v5/mqtt3_to_mqtt5_adapter.c b/source/v5/mqtt3_to_mqtt5_adapter.c index 37643d41..aa0d605a 100644 --- a/source/v5/mqtt3_to_mqtt5_adapter.c +++ b/source/v5/mqtt3_to_mqtt5_adapter.c @@ -38,6 +38,7 @@ static void s_mqtt_adapter_final_destroy_task_fn(struct aws_task *task, void *ar adapter->client->config->websocket_handshake_transform_user_data = NULL; } + aws_mqtt_subscription_set_destroy(adapter->subscriptions); aws_mqtt3_to_mqtt5_adapter_operation_table_clean_up(&adapter->operational_state); adapter->client = aws_mqtt5_client_release(adapter->client); @@ -1493,7 +1494,9 @@ static void s_adapter_publish_operation_destroy(struct aws_mqtt3_to_mqtt5_adapte adapter_to_release = publish_op->base.adapter; } + /* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */ if (publish_op->publish_op != NULL) { + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(publish_op->base.adapter->loop)); publish_op->publish_op->completion_options.completion_callback = NULL; publish_op->publish_op->completion_options.completion_user_data = NULL; } @@ -1588,9 +1591,10 @@ void s_adapter_publish_submission_fn(struct aws_task *task, void *arg, enum aws_ struct aws_mqtt_client_connection_5_impl *adapter = operation->base.adapter; - aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base); aws_mqtt5_client_submit_operation_internal( adapter->client, &operation->publish_op->base, status != AWS_TASK_STATUS_RUN_READY); + + aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base); } static uint16_t s_aws_mqtt_client_connection_5_publish( @@ -1677,7 +1681,9 @@ static void s_adapter_subscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_adap adapter_to_release = subscribe_op->base.adapter; } + /* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */ if (subscribe_op->subscribe_op != NULL) { + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(subscribe_op->base.adapter->loop)); subscribe_op->subscribe_op->completion_options.completion_callback = NULL; subscribe_op->subscribe_op->completion_options.completion_user_data = NULL; } @@ -1905,18 +1911,18 @@ void s_adapter_subscribe_submission_fn(struct aws_task *task, void *arg, enum aw struct aws_mqtt_client_connection_5_impl *adapter = operation->base.adapter; - aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base); - size_t subscription_count = aws_array_list_length(&operation->subscriptions); for (size_t i = 0; i < subscription_count; ++i) { struct aws_mqtt_subscription_set_subscription_record *record = NULL; aws_array_list_get_at(&operation->subscriptions, &record, i); - aws_mqtt_subscription_set_add_subscription(&adapter->subscriptions, &record->subscription_view); + aws_mqtt_subscription_set_add_subscription(adapter->subscriptions, &record->subscription_view); } aws_mqtt5_client_submit_operation_internal( adapter->client, &operation->subscribe_op->base, status != AWS_TASK_STATUS_RUN_READY); + + aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base); } static uint16_t s_aws_mqtt_client_connection_5_subscribe( @@ -2039,7 +2045,9 @@ static void s_adapter_unsubscribe_operation_destroy(struct aws_mqtt3_to_mqtt5_ad adapter_to_release = unsubscribe_op->base.adapter; } + /* We're going away before our MQTT5 operation, make sure it doesn't try to call us back when it completes */ if (unsubscribe_op->unsubscribe_op != NULL) { + AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(unsubscribe_op->base.adapter->loop)); unsubscribe_op->unsubscribe_op->completion_options.completion_callback = NULL; unsubscribe_op->unsubscribe_op->completion_options.completion_user_data = NULL; } @@ -2131,13 +2139,13 @@ void s_adapter_unsubscribe_submission_fn(struct aws_task *task, void *arg, enum struct aws_mqtt_client_connection_5_impl *adapter = operation->base.adapter; - aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base); - aws_mqtt_subscription_set_remove_subscription( - &adapter->subscriptions, aws_byte_cursor_from_buf(&operation->topic_filter)); + adapter->subscriptions, aws_byte_cursor_from_buf(&operation->topic_filter)); aws_mqtt5_client_submit_operation_internal( adapter->client, &operation->unsubscribe_op->base, status != AWS_TASK_STATUS_RUN_READY); + + aws_mqtt3_to_mqtt5_adapter_operation_dereference_adapter(&operation->base); } static uint16_t s_aws_mqtt_client_connection_5_unsubscribe( @@ -2238,6 +2246,8 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new_from_mqtt5_cli aws_mqtt3_to_mqtt5_adapter_operation_table_init(&adapter->operational_state, allocator); + adapter->subscriptions = aws_mqtt_subscription_set_new(allocator); + /* * We start disabled to handle the case where someone passes in an mqtt5 client that is already "live." * We'll enable the adapter as soon as they try to connect via the 311 interface. This diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c31422ac..5be49eb4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -384,6 +384,15 @@ add_test_case(mqtt3to5_adapter_publish_success_qos1) add_test_case(mqtt3to5_adapter_publish_no_ack) add_test_case(mqtt3to5_adapter_publish_interrupted) +add_test_case(mqtt3to5_adapter_subscribe_single_success) +#add_test_case(mqtt3to5_adapter_subscribe_multi_success) +#add_test_case(mqtt3to5_adapter_subscribe_single_failure) +#add_test_case(mqtt3to5_adapter_subscribe_multi_failure) +#add_test_case(mqtt3to5_adapter_subscribe_single_publish) +#add_test_case(mqtt3to5_adapter_subscribe_multi_overlapping_publish) +#add_test_case(mqtt3to5_adapter_unsubscribe_success) +#add_test_case(mqtt3to5_adapter_unsubscribe_failure) + add_test_case(mqtt_subscription_set_add_empty_not_subbed) add_test_case(mqtt_subscription_set_add_single_path) add_test_case(mqtt_subscription_set_add_overlapped_branching_paths) diff --git a/tests/v5/mqtt3_to_mqtt5_adapter_tests.c b/tests/v5/mqtt3_to_mqtt5_adapter_tests.c index 408407d9..35e67cc4 100644 --- a/tests/v5/mqtt3_to_mqtt5_adapter_tests.c +++ b/tests/v5/mqtt3_to_mqtt5_adapter_tests.c @@ -59,6 +59,9 @@ struct aws_mqtt3_operation_event { enum aws_mqtt_qos qos; struct aws_byte_buf topic; struct aws_byte_buf payload; + + // subscribe complete properties + struct aws_array_list granted_subscriptions; }; static void s_aws_mqtt3_operation_event_clean_up(struct aws_mqtt3_operation_event *event) { @@ -68,6 +71,8 @@ static void s_aws_mqtt3_operation_event_clean_up(struct aws_mqtt3_operation_even aws_byte_buf_clean_up(&event->topic); aws_byte_buf_clean_up(&event->payload); + + aws_array_list_clean_up(&event->granted_subscriptions); } static int s_aws_mqtt3_operation_event_verify_equals( @@ -81,6 +86,21 @@ static int s_aws_mqtt3_operation_event_verify_equals( ASSERT_BIN_ARRAYS_EQUALS( expected->payload.buffer, expected->payload.len, actual->payload.buffer, actual->payload.len); + ASSERT_INT_EQUALS( + aws_array_list_length(&expected->granted_subscriptions), aws_array_list_length(&actual->granted_subscriptions)); + for (size_t i = 0; i < aws_array_list_length(&expected->granted_subscriptions); ++i) { + + struct aws_mqtt_topic_subscription expected_sub; + aws_array_list_get_at(&expected->granted_subscriptions, &expected_sub, i); + + struct aws_mqtt_topic_subscription actual_sub; + aws_array_list_get_at(&actual->granted_subscriptions, &actual_sub, i); + + ASSERT_INT_EQUALS(expected_sub.qos, actual_sub.qos); + ASSERT_BIN_ARRAYS_EQUALS( + expected_sub.topic.ptr, expected_sub.topic.len, actual_sub.topic.ptr, actual_sub.topic.len); + } + return AWS_OP_SUCCESS; } @@ -1024,7 +1044,7 @@ static int s_aws_mqtt5_server_disconnect_after_connect( } static int s_verify_bad_connectivity_callbacks(struct aws_mqtt3_to_mqtt5_adapter_test_fixture *fixture) { - struct aws_mqtt3_lifecycle_event expected_events[] = { + struct aws_mqtt3_lifecycle_event expected_events_start[] = { { .type = AWS_MQTT3_LET_CONNECTION_COMPLETE, }, @@ -1046,6 +1066,11 @@ static int s_verify_bad_connectivity_callbacks(struct aws_mqtt3_to_mqtt5_adapter .type = AWS_MQTT3_LET_INTERRUPTED, .error_code = AWS_ERROR_MQTT5_DISCONNECT_RECEIVED, }, + }; + ASSERT_SUCCESS(s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_lifecycle_sequence_starts_with( + fixture, AWS_ARRAY_SIZE(expected_events_start), expected_events_start)); + + struct aws_mqtt3_lifecycle_event expected_events_end[] = { { .type = AWS_MQTT3_LET_DISCONNECTION_COMPLETE, }, @@ -1053,8 +1078,9 @@ static int s_verify_bad_connectivity_callbacks(struct aws_mqtt3_to_mqtt5_adapter .type = AWS_MQTT3_LET_CLOSED, }, }; - ASSERT_SUCCESS(s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_lifecycle_sequence( - fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events))); + + ASSERT_SUCCESS(s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_lifecycle_sequence_ends_with( + fixture, AWS_ARRAY_SIZE(expected_events_end), expected_events_end)); return AWS_OP_SUCCESS; } @@ -2145,3 +2171,168 @@ static int s_mqtt3to5_adapter_publish_interrupted_fn(struct aws_allocator *alloc } AWS_TEST_CASE(mqtt3to5_adapter_publish_interrupted, s_mqtt3to5_adapter_publish_interrupted_fn) + +void s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_subscribe_complete( + struct aws_mqtt_client_connection *connection, + uint16_t packet_id, + const struct aws_byte_cursor *topic, + enum aws_mqtt_qos qos, + int error_code, + void *userdata) { + + (void)connection; + (void)packet_id; + (void)topic; + + struct aws_mqtt3_to_mqtt5_adapter_test_fixture *fixture = userdata; + + struct aws_mqtt3_operation_event operation_event = { + .type = AWS_MQTT3_OET_SUBSCRIBE_COMPLETE, + .error_code = error_code, + }; + + aws_array_list_init_dynamic( + &operation_event.granted_subscriptions, + fixture->mqtt5_fixture.allocator, + 1, + sizeof(struct aws_mqtt_topic_subscription)); + + /* + * technically it's not safe to persist the topic cursor but they way the tests are built, the cursor will stay + * valid until the events are checked (as long as we don't delete the subscription internally) + */ + struct aws_mqtt_topic_subscription sub = { + .topic = *topic, + .qos = qos, + }; + aws_array_list_push_back(&operation_event.granted_subscriptions, (void *)&sub); + + aws_mutex_lock(&fixture->lock); + aws_array_list_push_back(&fixture->operation_events, &operation_event); + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +void s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_topic_specific_publish( + struct aws_mqtt_client_connection *connection, + const struct aws_byte_cursor *topic, + const struct aws_byte_cursor *payload, + bool dup, + enum aws_mqtt_qos qos, + bool retain, + void *userdata) { + + (void)connection; + (void)topic; + (void)payload; + (void)dup; + (void)qos; + (void)retain; + (void)userdata; +} + +static int s_mqtt5_mock_server_handle_subscribe_suback_success( + void *packet, + struct aws_mqtt5_server_mock_connection_context *connection, + void *user_data) { + + (void)user_data; + + struct aws_mqtt5_packet_subscribe_view *subscribe_view = packet; + + AWS_VARIABLE_LENGTH_ARRAY( + enum aws_mqtt5_suback_reason_code, mqtt5_suback_codes, subscribe_view->subscription_count); + for (size_t i = 0; i < subscribe_view->subscription_count; ++i) { + enum aws_mqtt5_suback_reason_code *reason_code_ptr = &mqtt5_suback_codes[i]; + *reason_code_ptr = (enum aws_mqtt5_suback_reason_code)subscribe_view->subscriptions[i].qos; + } + + struct aws_mqtt5_packet_suback_view suback_view = { + .packet_id = subscribe_view->packet_id, + .reason_code_count = subscribe_view->subscription_count, + .reason_codes = mqtt5_suback_codes, + }; + + return aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_SUBACK, &suback_view); +} + +static int s_mqtt3to5_adapter_subscribe_single_success_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options test_options; + aws_mqtt5_client_test_init_default_options(&test_options); + + test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_SUBSCRIBE] = + s_mqtt5_mock_server_handle_subscribe_suback_success; + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = { + .client_options = &test_options.client_options, + .server_function_table = &test_options.server_function_table, + }; + + struct aws_mqtt3_to_mqtt5_adapter_test_fixture fixture; + ASSERT_SUCCESS(aws_mqtt3_to_mqtt5_adapter_test_fixture_init(&fixture, allocator, &test_fixture_options, NULL)); + + struct aws_mqtt_client_connection *connection = fixture.connection; + + struct aws_mqtt_connection_options connection_options; + s_init_adapter_connection_options_from_fixture(&connection_options, &fixture); + + connection_options.on_connection_complete = s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_connection_complete; + connection_options.user_data = &fixture; + + aws_mqtt_client_connection_connect(connection, &connection_options); + + s_wait_for_n_adapter_lifecycle_events(&fixture, AWS_MQTT3_LET_CONNECTION_COMPLETE, 1); + + struct aws_byte_cursor topic = aws_byte_cursor_from_c_str("derp"); + + aws_mqtt_client_connection_subscribe( + connection, + &topic, + AWS_MQTT_QOS_AT_LEAST_ONCE, + s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_topic_specific_publish, + &fixture, + NULL, + s_aws_mqtt3_to_mqtt5_adapter_test_fixture_record_subscribe_complete, + &fixture); + + s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_SUBSCRIBE_COMPLETE, 1); + + struct aws_mqtt_topic_subscription expected_subs[1] = {{ + .topic = topic, + .qos = AWS_MQTT_QOS_AT_LEAST_ONCE, + }}; + + struct aws_mqtt3_operation_event expected_events[] = {{ + .type = AWS_MQTT3_OET_SUBSCRIBE_COMPLETE, + .error_code = AWS_ERROR_SUCCESS, + }}; + aws_array_list_init_static( + &expected_events[0].granted_subscriptions, + (void *)expected_subs, + 1, + sizeof(struct aws_mqtt_topic_subscription)); + + s_aws_mqtt3_to_mqtt5_adapter_test_fixture_verify_operation_sequence( + &fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events)); + + aws_mqtt3_to_mqtt5_adapter_test_fixture_clean_up(&fixture); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(mqtt3to5_adapter_subscribe_single_success, s_mqtt3to5_adapter_subscribe_single_success_fn) + +/* +#add_test_case(mqtt3to5_adapter_subscribe_multi_success) +#add_test_case(mqtt3to5_adapter_subscribe_single_failure) +#add_test_case(mqtt3to5_adapter_subscribe_multi_failure) +#add_test_case(mqtt3to5_adapter_subscribe_single_publish) +#add_test_case(mqtt3to5_adapter_subscribe_multi_overlapping_publish) +#add_test_case(mqtt3to5_adapter_unsubscribe_success) +#add_test_case(mqtt3to5_adapter_unsubscribe_failure) + */ \ No newline at end of file