diff --git a/include/aws/mqtt/private/client_impl.h b/include/aws/mqtt/private/client_impl.h
index ce041b2d..ab9822c3 100644
--- a/include/aws/mqtt/private/client_impl.h
+++ b/include/aws/mqtt/private/client_impl.h
@@ -154,6 +154,25 @@ struct aws_mqtt_reconnect_task {
     struct aws_allocator *allocator;
 };
 
+struct request_timeout_wrapper;
+
+/* used for timeout task */
+struct request_timeout_task_arg {
+    uint16_t packet_id;
+    struct aws_mqtt_client_connection_311_impl *connection;
+    struct request_timeout_wrapper *task_arg_wrapper;
+};
+
+/*
+ * We want the timeout task to be able to destroy the forward reference from the operation's task arg structure
+ * to the timeout task. But the operation task arg structures don't have any data structure in common. So to allow
+ * the timeout to refer back to a zero-able forward pointer, we wrap a pointer to the timeout task and embed it
+ * in every operation's task arg that needs to create a timeout.
+ */
+struct request_timeout_wrapper {
+    struct request_timeout_task_arg *timeout_task_arg;
+};
+
 /* The lifetime of this struct is from subscribe -> suback */
 struct subscribe_task_arg {
 
@@ -173,6 +192,9 @@ struct subscribe_task_arg {
         aws_mqtt_suback_fn *single;
     } on_suback;
     void *on_suback_ud;
+
+    struct request_timeout_wrapper timeout_wrapper;
+    uint64_t timeout_duration_in_ns;
 };
 
 /* The lifetime of this struct is the same as the lifetime of the subscription */
@@ -429,4 +451,32 @@ void aws_mqtt_connection_statistics_change_operation_statistic_state(
 
 AWS_MQTT_API const struct aws_mqtt_client_connection_packet_handlers *aws_mqtt311_get_default_packet_handlers(void);
 
+AWS_MQTT_API uint16_t aws_mqtt_client_connection_311_unsubscribe(
+    struct aws_mqtt_client_connection_311_impl *connection,
+    const struct aws_byte_cursor *topic_filter,
+    aws_mqtt_op_complete_fn *on_unsuback,
+    void *on_unsuback_ud,
+    uint64_t timeout_ns);
+
+AWS_MQTT_API uint16_t aws_mqtt_client_connection_311_subscribe(
+    struct aws_mqtt_client_connection_311_impl *connection,
+    const struct aws_byte_cursor *topic_filter,
+    enum aws_mqtt_qos qos,
+    aws_mqtt_client_publish_received_fn *on_publish,
+    void *on_publish_ud,
+    aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
+    aws_mqtt_suback_fn *on_suback,
+    void *on_suback_ud,
+    uint64_t timeout_ns);
+
+AWS_MQTT_API uint16_t aws_mqtt_client_connection_311_publish(
+    struct aws_mqtt_client_connection_311_impl *connection,
+    const struct aws_byte_cursor *topic,
+    enum aws_mqtt_qos qos,
+    bool retain,
+    const struct aws_byte_cursor *payload,
+    aws_mqtt_op_complete_fn *on_complete,
+    void *userdata,
+    uint64_t timeout_ns);
+
 #endif /* AWS_MQTT_PRIVATE_CLIENT_IMPL_H */
diff --git a/source/client.c b/source/client.c
index 2d896aeb..5ba4ee5c 100644
--- a/source/client.c
+++ b/source/client.c
@@ -93,25 +93,6 @@ void mqtt_connection_set_state(
     connection->synced_data.state = state;
 }
 
-struct request_timeout_wrapper;
-
-/* used for timeout task */
-struct request_timeout_task_arg {
-    uint16_t packet_id;
-    struct aws_mqtt_client_connection_311_impl *connection;
-    struct request_timeout_wrapper *task_arg_wrapper;
-};
-
-/*
- * We want the timeout task to be able to destroy the forward reference from the operation's task arg structure
- * to the timeout task. But the operation task arg structures don't have any data structure in common. So to allow
- * the timeout to refer back to a zero-able forward pointer, we wrap a pointer to the timeout task and embed it
- * in every operation's task arg that needs to create a timeout.
- */
-struct request_timeout_wrapper {
-    struct request_timeout_task_arg *timeout_task_arg;
-};
-
 static void s_request_timeout(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
     (void)channel_task;
     struct request_timeout_task_arg *timeout_task_arg = arg;
@@ -139,8 +120,14 @@ static void s_request_timeout(struct aws_channel_task *channel_task, void *arg,
 
 static struct request_timeout_task_arg *s_schedule_timeout_task(
     struct aws_mqtt_client_connection_311_impl *connection,
-    uint16_t packet_id) {
-    /* schedule a timeout task to run, in case server consider the publish is not received */
+    uint16_t packet_id,
+    uint64_t timeout_duration_in_ns) {
+
+    if (timeout_duration_in_ns == UINT64_MAX || timeout_duration_in_ns == 0 || packet_id == 0) {
+        return NULL;
+    }
+
+    /* schedule a timeout task to run, in case the server never sends us an ack */
     struct aws_channel_task *request_timeout_task = NULL;
     struct request_timeout_task_arg *timeout_task_arg = NULL;
     if (!aws_mem_acquire_many(
@@ -161,7 +148,7 @@ static struct request_timeout_task_arg *s_schedule_timeout_task(
         aws_mem_release(connection->allocator, timeout_task_arg);
         return NULL;
     }
-    timestamp = aws_add_u64_saturating(timestamp, connection->operation_timeout_ns);
+    timestamp = aws_add_u64_saturating(timestamp, timeout_duration_in_ns);
     aws_channel_schedule_task_future(connection->slot->channel, request_timeout_task, timestamp);
     return timeout_task_arg;
 }
@@ -1900,6 +1887,20 @@ static enum aws_mqtt_client_request_state s_subscribe_send(uint16_t packet_id, b
         aws_mem_release(message->allocator, message);
     }
 
+    /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
+     * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
+     * fire the on_completion callbacks. */
+    struct request_timeout_task_arg *timeout_task_arg =
+        s_schedule_timeout_task(task_arg->connection, packet_id, task_arg->timeout_duration_in_ns);
+    if (timeout_task_arg) {
+        /*
+         * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
+         * "wins", does its logic, and then breaks the connection between the two.
+         */
+        task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
+        timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
+    }
+
     if (!task_arg->tree_updated) {
         aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction);
         task_arg->tree_updated = true;
@@ -1965,6 +1966,17 @@ static void s_subscribe_complete(
             error_code,
             task_arg->on_suback_ud);
     }
+
+    /*
+     * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should
+     * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later
+     * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back
+     * pointer.
+     */
+    if (task_arg->timeout_wrapper.timeout_task_arg) {
+        task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
+    }
+
     for (size_t i = 0; i < list_len; i++) {
         aws_array_list_get_at(&task_arg->topics, &topic, i);
         s_task_topic_release(topic);
@@ -1997,6 +2009,7 @@ static uint16_t s_aws_mqtt_client_connection_311_subscribe_multiple(
     task_arg->connection = connection;
     task_arg->on_suback.multi = on_suback;
     task_arg->on_suback_ud = on_suback_ud;
+    task_arg->timeout_duration_in_ns = connection->operation_timeout_ns;
 
     const size_t num_topics = aws_array_list_length(topic_filters);
 
@@ -2135,23 +2148,33 @@ static void s_subscribe_single_complete(
             error_code,
             task_arg->on_suback_ud);
     }
+
+    /*
+     * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should
+     * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later
+     * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back
+     * pointer.
+     */
+    if (task_arg->timeout_wrapper.timeout_task_arg) {
+        task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
+    }
+
     s_task_topic_release(topic);
     aws_array_list_clean_up(&task_arg->topics);
     aws_mqtt_packet_subscribe_clean_up(&task_arg->subscribe);
     aws_mem_release(task_arg->connection->allocator, task_arg);
 }
 
-static uint16_t s_aws_mqtt_client_connection_311_subscribe(
-    void *impl,
+uint16_t aws_mqtt_client_connection_311_subscribe(
+    struct aws_mqtt_client_connection_311_impl *connection,
     const struct aws_byte_cursor *topic_filter,
     enum aws_mqtt_qos qos,
     aws_mqtt_client_publish_received_fn *on_publish,
     void *on_publish_ud,
     aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
     aws_mqtt_suback_fn *on_suback,
-    void *on_suback_ud) {
-
-    struct aws_mqtt_client_connection_311_impl *connection = impl;
+    void *on_suback_ud,
+    uint64_t timeout_ns) {
 
     AWS_PRECONDITION(connection);
 
@@ -2180,6 +2203,7 @@ static uint16_t s_aws_mqtt_client_connection_311_subscribe(
     task_arg->connection = connection;
     task_arg->on_suback.single = on_suback;
     task_arg->on_suback_ud = on_suback_ud;
+    task_arg->timeout_duration_in_ns = timeout_ns;
 
     /* It stores the pointer */
     aws_array_list_init_static(&task_arg->topics, task_topic_storage, 1, sizeof(void *));
@@ -2255,6 +2279,29 @@ static uint16_t s_aws_mqtt_client_connection_311_subscribe(
     return 0;
 }
 
+static uint16_t s_aws_mqtt_client_connection_311_subscribe(
+    void *impl,
+    const struct aws_byte_cursor *topic_filter,
+    enum aws_mqtt_qos qos,
+    aws_mqtt_client_publish_received_fn *on_publish,
+    void *on_publish_ud,
+    aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
+    aws_mqtt_suback_fn *on_suback,
+    void *on_suback_ud) {
+
+    struct aws_mqtt_client_connection_311_impl *connection = impl;
+    return aws_mqtt_client_connection_311_subscribe(
+        connection,
+        topic_filter,
+        qos,
+        on_publish,
+        on_publish_ud,
+        on_ud_cleanup,
+        on_suback,
+        on_suback_ud,
+        connection->operation_timeout_ns);
+}
+
 /*******************************************************************************
  * Resubscribe
  ******************************************************************************/
@@ -2359,6 +2406,20 @@ static enum aws_mqtt_client_request_state s_resubscribe_send(
         aws_mem_release(message->allocator, message);
     }
 
+    /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
+     * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
+     * fire the on_completion callbacks. */
+    struct request_timeout_task_arg *timeout_task_arg =
+        s_schedule_timeout_task(task_arg->connection, packet_id, task_arg->timeout_duration_in_ns);
+    if (timeout_task_arg) {
+        /*
+         * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
+         * "wins", does its logic, and then breaks the connection between the two.
+         */
+        task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
+        timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
+    }
+
     return AWS_MQTT_CLIENT_REQUEST_ONGOING;
 
 handle_error:
@@ -2422,6 +2483,16 @@ static void s_resubscribe_complete(
 
 clean_up:
 
+    /*
+     * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should
+     * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later
+     * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back
+     * pointer.
+     */
+    if (task_arg->timeout_wrapper.timeout_task_arg) {
+        task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
+    }
+
     /* We need to cleanup the subscribe_task_topics, since they are not inserted into the topic tree by resubscribe. We
      * take the ownership to clean it up */
     for (size_t i = 0; i < list_len; i++) {
@@ -2451,6 +2522,7 @@ static uint16_t s_aws_mqtt_311_resubscribe_existing_topics(
     task_arg->connection = connection;
     task_arg->on_suback.multi = on_suback;
     task_arg->on_suback_ud = on_suback_ud;
+    task_arg->timeout_duration_in_ns = connection->operation_timeout_ns;
 
     /* Calculate the size of the packet.
      * The fixed header is 2 bytes and the packet ID is 2 bytes
@@ -2511,6 +2583,7 @@ struct unsubscribe_task_arg {
     void *on_unsuback_ud;
 
     struct request_timeout_wrapper timeout_wrapper;
+    uint64_t timeout_duration_in_ns;
 };
 
 static enum aws_mqtt_client_request_state s_unsubscribe_send(
@@ -2602,18 +2675,17 @@ static enum aws_mqtt_client_request_state s_unsubscribe_send(
     /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
      * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
      * fire the on_completion callbacks. */
-    struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(task_arg->connection, packet_id);
-    if (!timeout_task_arg) {
-        return AWS_MQTT_CLIENT_REQUEST_ERROR;
+    struct request_timeout_task_arg *timeout_task_arg =
+        s_schedule_timeout_task(task_arg->connection, packet_id, task_arg->timeout_duration_in_ns);
+    if (timeout_task_arg) {
+        /*
+         * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
+         * "wins", does its logic, and then breaks the connection between the two.
+         */
+        task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
+        timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
+    }
 
-    /*
-     * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
-     * "wins", does its logic, and then breaks the connection between the two.
-     */
-    task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
-    timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
-
     if (!task_arg->tree_updated) {
         aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction);
         task_arg->tree_updated = true;
@@ -2656,7 +2728,6 @@ static void s_unsubscribe_complete(
      */
     if (task_arg->timeout_wrapper.timeout_task_arg) {
         task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
-        task_arg->timeout_wrapper.timeout_task_arg = NULL;
     }
 
     if (task_arg->on_unsuback) {
@@ -2668,13 +2739,12 @@ static void s_unsubscribe_complete(
     aws_mem_release(task_arg->connection->allocator, task_arg);
 }
 
-static uint16_t s_aws_mqtt_client_connection_311_unsubscribe(
-    void *impl,
+uint16_t aws_mqtt_client_connection_311_unsubscribe(
+    struct aws_mqtt_client_connection_311_impl *connection,
     const struct aws_byte_cursor *topic_filter,
     aws_mqtt_op_complete_fn *on_unsuback,
-    void *on_unsuback_ud) {
-
-    struct aws_mqtt_client_connection_311_impl *connection = impl;
+    void *on_unsuback_ud,
+    uint64_t timeout_ns) {
 
     AWS_PRECONDITION(connection);
 
@@ -2694,6 +2764,7 @@ static uint16_t s_aws_mqtt_client_connection_311_unsubscribe(
     task_arg->filter = aws_byte_cursor_from_string(task_arg->filter_string);
     task_arg->on_unsuback = on_unsuback;
     task_arg->on_unsuback_ud = on_unsuback_ud;
+    task_arg->timeout_duration_in_ns = timeout_ns;
 
     /* Calculate the size of the unsubscribe packet.
      * The fixed header is always 2 bytes, the packet ID is always 2 bytes
@@ -2729,6 +2800,18 @@ static uint16_t s_aws_mqtt_client_connection_311_unsubscribe(
     return 0;
 }
 
+static uint16_t s_aws_mqtt_client_connection_311_unsubscribe(
+    void *impl,
+    const struct aws_byte_cursor *topic_filter,
+    aws_mqtt_op_complete_fn *on_unsuback,
+    void *on_unsuback_ud) {
+
+    struct aws_mqtt_client_connection_311_impl *connection = impl;
+
+    return aws_mqtt_client_connection_311_unsubscribe(
+        connection, topic_filter, on_unsuback, on_unsuback_ud, connection->operation_timeout_ns);
+}
+
 /*******************************************************************************
  * Publish
  ******************************************************************************/
@@ -2748,6 +2831,7 @@ struct publish_task_arg {
     aws_mqtt_op_complete_fn *on_complete;
     void *userdata;
 
+    uint64_t timeout_duration_in_ns;
     struct request_timeout_wrapper timeout_wrapper;
 };
 
@@ -2882,15 +2966,13 @@ static enum aws_mqtt_client_request_state s_publish_send(uint16_t packet_id, boo
             goto write_payload_chunk;
         }
     }
-    if (!is_qos_0 && connection->operation_timeout_ns != UINT64_MAX) {
-        /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
-         * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
-         * fire fire the on_completion callbacks. */
-        struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(connection, packet_id);
-        if (!timeout_task_arg) {
-            return AWS_MQTT_CLIENT_REQUEST_ERROR;
-        }
+    /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
+     * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
+     * fire the on_completion callbacks. */
+    struct request_timeout_task_arg *timeout_task_arg =
+        s_schedule_timeout_task(connection, packet_id, task_arg->timeout_duration_in_ns);
+    if (timeout_task_arg != NULL) {
         /*
         * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
         * "wins", does its logic, and then breaks the connection between the two.
         */
@@ -2927,7 +3009,6 @@ static void s_publish_complete(
      */
     if (task_arg->timeout_wrapper.timeout_task_arg != NULL) {
         task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
-        task_arg->timeout_wrapper.timeout_task_arg = NULL;
     }
 
     aws_byte_buf_clean_up(&task_arg->payload_buf);
@@ -2935,16 +3016,15 @@ static void s_publish_complete(
     aws_mem_release(connection->allocator, task_arg);
 }
 
-static uint16_t s_aws_mqtt_client_connection_311_publish(
-    void *impl,
+uint16_t aws_mqtt_client_connection_311_publish(
+    struct aws_mqtt_client_connection_311_impl *connection,
     const struct aws_byte_cursor *topic,
     enum aws_mqtt_qos qos,
     bool retain,
     const struct aws_byte_cursor *payload,
     aws_mqtt_op_complete_fn *on_complete,
-    void *userdata) {
-
-    struct aws_mqtt_client_connection_311_impl *connection = impl;
+    void *userdata,
+    uint64_t timeout_ns) {
 
     AWS_PRECONDITION(connection);
 
@@ -2968,6 +3048,7 @@ static uint16_t s_aws_mqtt_client_connection_311_publish(
     arg->topic = aws_byte_cursor_from_string(arg->topic_string);
     arg->qos = qos;
     arg->retain = retain;
+    arg->timeout_duration_in_ns = timeout_ns;
 
     struct aws_byte_cursor payload_cursor;
     AWS_ZERO_STRUCT(payload_cursor);
@@ -3025,6 +3106,21 @@ static uint16_t s_aws_mqtt_client_connection_311_publish(
     return 0;
 }
 
+static uint16_t s_aws_mqtt_client_connection_311_publish(
+    void *impl,
+    const struct aws_byte_cursor *topic,
+    enum aws_mqtt_qos qos,
+    bool retain,
+    const struct aws_byte_cursor *payload,
+    aws_mqtt_op_complete_fn *on_complete,
+    void *userdata) {
+
+    struct aws_mqtt_client_connection_311_impl *connection = impl;
+
+    return aws_mqtt_client_connection_311_publish(
+        connection, topic, qos, retain, payload, on_complete, userdata, connection->operation_timeout_ns);
+}
+
 /*******************************************************************************
  * Ping
  ******************************************************************************/
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index a6080135..720339ca 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -78,7 +78,13 @@ add_test_case(mqtt_clean_session_not_retry)
 add_test_case(mqtt_clean_session_discard_previous)
 add_test_case(mqtt_clean_session_keep_next_session)
 add_test_case(mqtt_connection_publish_QoS1_timeout)
-add_test_case(mqtt_connection_unsub_timeout)
+add_test_case(mqtt_connection_publish_QoS1_timeout_override)
+add_test_case(mqtt_connection_unsubscribe_timeout)
+add_test_case(mqtt_connection_unsubscribe_timeout_override)
+add_test_case(mqtt_connection_subscribe_single_timeout)
+add_test_case(mqtt_connection_subscribe_single_timeout_override)
+add_test_case(mqtt_connection_subscribe_multi_timeout)
+add_test_case(mqtt_connection_resubscribe_timeout)
 add_test_case(mqtt_connection_publish_QoS1_timeout_connection_lost_reset_time)
 add_test_case(mqtt_connection_ping_norm)
 add_test_case(mqtt_connection_ping_no)
diff --git a/tests/v3/connection_state_test.c b/tests/v3/connection_state_test.c
index 65f8b9e7..6447bb7b 100644
--- a/tests/v3/connection_state_test.c
+++ b/tests/v3/connection_state_test.c
@@ -9,6 +9,7 @@
 #include "mqtt_mock_server_handler.h"
 
 #include
+#include
 #include
 
 static struct mqtt_connection_state_test test_data = {0};
 
@@ -2376,10 +2377,70 @@ AWS_TEST_CASE_FIXTURE(
     s_clean_up_mqtt_server_fn,
     &test_data)
 
+/**
+ * Test that connection is healthy, the user sets a per-request timeout for the publish, the timeout happens, and the publish fails.
+ */
+static int s_test_mqtt_connection_publish_QoS1_timeout_override_fn(struct aws_allocator *allocator, void *ctx) {
+    (void)allocator;
+    struct mqtt_connection_state_test *state_test_data = ctx;
+
+    struct aws_mqtt_connection_options connection_options = {
+        .user_data = state_test_data,
+        .clean_session = false,
+        .client_id = aws_byte_cursor_from_c_str("client1234"),
+        .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
+        .socket_options = &state_test_data->socket_options,
+        .on_connection_complete = aws_test311_on_connection_complete_fn,
+        .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS,
+        .keep_alive_time_secs = 16960, /* basically stop automatically sending PINGREQ */
+    };
+
+    struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
+    struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1");
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
+    aws_test311_wait_for_connection_to_complete(state_test_data);
+
+    /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */
+    mqtt_mock_server_disable_auto_ack(state_test_data->mock_server);
+
+    /* make a publish with QoS 1 immediately */
+    aws_mutex_lock(&state_test_data->lock);
+    state_test_data->expected_ops_completed = 1;
+    aws_mutex_unlock(&state_test_data->lock);
+    uint16_t packet_id_1 = aws_mqtt_client_connection_311_publish(
+        state_test_data->mqtt_connection->impl,
+        &pub_topic,
+        AWS_MQTT_QOS_AT_LEAST_ONCE,
+        false,
+        &payload_1,
+        aws_test311_on_op_complete,
+        state_test_data,
+        aws_timestamp_convert(3, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL));
+    ASSERT_TRUE(packet_id_1 > 0);
+
+    /* publish should complete after the timeout */
+    aws_test311_wait_for_ops_completed(state_test_data);
+    /* Check that the publish has been completed with a timeout error */
+    ASSERT_UINT_EQUALS(state_test_data->op_complete_error, AWS_ERROR_MQTT_TIMEOUT);
+    ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect(
+        state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data));
+    aws_test311_wait_for_disconnect_to_complete(state_test_data);
+
+    return AWS_OP_SUCCESS;
+}
+
+AWS_TEST_CASE_FIXTURE(
+    mqtt_connection_publish_QoS1_timeout_override,
+    s_setup_mqtt_server_fn,
+    s_test_mqtt_connection_publish_QoS1_timeout_override_fn,
+    s_clean_up_mqtt_server_fn,
+    &test_data)
+
 /**
  * Test that connection is healthy, user set the timeout for request, and timeout happens and the unsubscribe failed.
 */
-static int s_test_mqtt_connection_unsub_timeout_fn(struct aws_allocator *allocator, void *ctx) {
+static int s_test_mqtt_connection_unsubscribe_timeout_fn(struct aws_allocator *allocator, void *ctx) {
     (void)allocator;
     struct mqtt_connection_state_test *state_test_data = ctx;
 
@@ -2412,9 +2473,9 @@ static int s_test_mqtt_connection_unsub_timeout_fn(struct aws_allocat
         state_test_data->mqtt_connection, &pub_topic, aws_test311_on_op_complete, state_test_data);
     ASSERT_TRUE(unsub_packet_id > 0);
 
-    /* publish should complete after the shutdown */
+    /* unsubscribe should complete after the timeout */
     aws_test311_wait_for_ops_completed(state_test_data);
-    /* Check the publish has been completed with timeout error */
+    /* Check that the unsubscribe has been completed with a timeout error */
     ASSERT_UINT_EQUALS(state_test_data->op_complete_error, AWS_ERROR_MQTT_TIMEOUT);
     ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect(
         state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data));
     aws_test311_wait_for_disconnect_to_complete(state_test_data);
@@ -2424,9 +2485,328 @@ static int s_test_mqtt_connection_unsub_timeout_fn(struct aws_allocat
 }
 
 AWS_TEST_CASE_FIXTURE(
-    mqtt_connection_unsub_timeout,
+    mqtt_connection_unsubscribe_timeout,
+    s_setup_mqtt_server_fn,
+    s_test_mqtt_connection_unsubscribe_timeout_fn,
+    s_clean_up_mqtt_server_fn,
+    &test_data)
+
+/**
+ * Test that connection is healthy, the user sets a per-request timeout for the unsubscribe, the timeout happens, and the unsubscribe fails.
+ */
+static int s_test_mqtt_connection_unsubscribe_timeout_override_fn(struct aws_allocator *allocator, void *ctx) {
+    (void)allocator;
+    struct mqtt_connection_state_test *state_test_data = ctx;
+
+    struct aws_mqtt_connection_options connection_options = {
+        .user_data = state_test_data,
+        .clean_session = false,
+        .client_id = aws_byte_cursor_from_c_str("client1234"),
+        .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
+        .socket_options = &state_test_data->socket_options,
+        .on_connection_complete = aws_test311_on_connection_complete_fn,
+        .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS,
+        .keep_alive_time_secs = 16960, /* basically stop automatically sending PINGREQ */
+    };
+
+    struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
+    aws_test311_wait_for_connection_to_complete(state_test_data);
+
+    /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */
+    mqtt_mock_server_disable_auto_ack(state_test_data->mock_server);
+
+    aws_mutex_lock(&state_test_data->lock);
+    state_test_data->expected_ops_completed = 1;
+    aws_mutex_unlock(&state_test_data->lock);
+
+    /* unsubscribe from the first topic */
+    uint16_t unsub_packet_id = aws_mqtt_client_connection_311_unsubscribe(
+        state_test_data->mqtt_connection->impl,
+        &pub_topic,
+        aws_test311_on_op_complete,
+        state_test_data,
+        aws_timestamp_convert(3, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL));
+    ASSERT_TRUE(unsub_packet_id > 0);
+
+    /* unsubscribe should complete after the timeout */
+    aws_test311_wait_for_ops_completed(state_test_data);
+    /* Check that the unsubscribe has been completed with a timeout error */
+    ASSERT_UINT_EQUALS(state_test_data->op_complete_error, AWS_ERROR_MQTT_TIMEOUT);
+    ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect(
+        state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data));
+    aws_test311_wait_for_disconnect_to_complete(state_test_data);
+
+    return AWS_OP_SUCCESS;
+}
+
+AWS_TEST_CASE_FIXTURE(
+    mqtt_connection_unsubscribe_timeout_override,
+    s_setup_mqtt_server_fn,
+    s_test_mqtt_connection_unsubscribe_timeout_override_fn,
+    s_clean_up_mqtt_server_fn,
+    &test_data)
+
+/**
+ * Test that connection is healthy, the connection-level operation timeout is set, the timeout happens, and the subscribe fails.
+ */
+static int s_test_mqtt_connection_subscribe_single_timeout_fn(struct aws_allocator *allocator, void *ctx) {
+    (void)allocator;
+    struct mqtt_connection_state_test *state_test_data = ctx;
+
+    struct aws_mqtt_connection_options connection_options = {
+        .user_data = state_test_data,
+        .clean_session = false,
+        .client_id = aws_byte_cursor_from_c_str("client1234"),
+        .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
+        .socket_options = &state_test_data->socket_options,
+        .on_connection_complete = aws_test311_on_connection_complete_fn,
+        .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS,
+        .protocol_operation_timeout_ms = 3000,
+        .keep_alive_time_secs = 16960, /* basically stop automatically sending PINGREQ */
+    };
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
+    aws_test311_wait_for_connection_to_complete(state_test_data);
+
+    /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */
+    mqtt_mock_server_disable_auto_ack(state_test_data->mock_server);
+
+    /* subscribe */
+    struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
+    uint16_t sub_packet_id = aws_mqtt_client_connection_subscribe(
+        state_test_data->mqtt_connection,
+        &pub_topic,
+        AWS_MQTT_QOS_AT_LEAST_ONCE,
+        aws_test311_on_publish_received,
+        state_test_data,
+        NULL,
+        aws_test311_on_suback,
+        state_test_data);
+    ASSERT_TRUE(sub_packet_id > 0);
+
+    /* subscribe should complete after the timeout */
+    aws_test311_wait_for_subscribe_to_complete(state_test_data);
+    /* Check that the subscribe has been completed with a timeout error */
+    ASSERT_UINT_EQUALS(state_test_data->subscribe_complete_error, AWS_ERROR_MQTT_TIMEOUT);
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect(
+        state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data));
+    aws_test311_wait_for_disconnect_to_complete(state_test_data);
+
+    return AWS_OP_SUCCESS;
+}
+
+AWS_TEST_CASE_FIXTURE(
+    mqtt_connection_subscribe_single_timeout,
+    s_setup_mqtt_server_fn,
+    s_test_mqtt_connection_subscribe_single_timeout_fn,
+    s_clean_up_mqtt_server_fn,
+    &test_data)
+
+/**
+ * Test that connection is healthy, the user sets a per-request timeout for the subscribe, the timeout happens, and the subscribe fails.
+ */
+static int s_test_mqtt_connection_subscribe_single_timeout_override_fn(struct aws_allocator *allocator, void *ctx) {
+    (void)allocator;
+    struct mqtt_connection_state_test *state_test_data = ctx;
+
+    struct aws_mqtt_connection_options connection_options = {
+        .user_data = state_test_data,
+        .clean_session = false,
+        .client_id = aws_byte_cursor_from_c_str("client1234"),
+        .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
+        .socket_options = &state_test_data->socket_options,
+        .on_connection_complete = aws_test311_on_connection_complete_fn,
+        .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS,
+        .keep_alive_time_secs = 16960, /* basically stop automatically sending PINGREQ */
+    };
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
+    aws_test311_wait_for_connection_to_complete(state_test_data);
+
+    /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */
+    mqtt_mock_server_disable_auto_ack(state_test_data->mock_server);
+
+    /* subscribe */
+    struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
+    uint16_t sub_packet_id = aws_mqtt_client_connection_311_subscribe(
+        state_test_data->mqtt_connection->impl,
+        &pub_topic,
+        AWS_MQTT_QOS_AT_LEAST_ONCE,
+        aws_test311_on_publish_received,
+        state_test_data,
+        NULL,
+        aws_test311_on_suback,
+        state_test_data,
+        aws_timestamp_convert(3, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL));
+    ASSERT_TRUE(sub_packet_id > 0);
+
+    /* subscribe should complete after the timeout */
+    aws_test311_wait_for_subscribe_to_complete(state_test_data);
+    /* Check that the subscribe has been completed with a timeout error */
+    ASSERT_UINT_EQUALS(state_test_data->subscribe_complete_error, AWS_ERROR_MQTT_TIMEOUT);
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect(
+        state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data));
+    aws_test311_wait_for_disconnect_to_complete(state_test_data);
+
+    return AWS_OP_SUCCESS;
+}
+
+AWS_TEST_CASE_FIXTURE(
+    mqtt_connection_subscribe_single_timeout_override,
+    s_setup_mqtt_server_fn,
+    s_test_mqtt_connection_subscribe_single_timeout_override_fn,
+    s_clean_up_mqtt_server_fn,
+    &test_data)
+
+/**
+ * Test that connection is healthy, the connection-level operation timeout is set, the timeout happens, and the
+ * multi-topic subscribe fails.
+ */
+static int s_test_mqtt_connection_subscribe_multi_timeout_fn(struct aws_allocator *allocator, void *ctx) {
+    (void)allocator;
+    struct mqtt_connection_state_test *state_test_data = ctx;
+
+    struct aws_mqtt_connection_options connection_options = {
+        .user_data = state_test_data,
+        .clean_session = false,
+        .client_id = aws_byte_cursor_from_c_str("client1234"),
+        .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
+        .socket_options = &state_test_data->socket_options,
+        .on_connection_complete = aws_test311_on_connection_complete_fn,
+        .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS,
+        .protocol_operation_timeout_ms = 3000,
+        .keep_alive_time_secs = 16960, /* basically stop automatically sending PINGREQ */
+    };
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
+    aws_test311_wait_for_connection_to_complete(state_test_data);
+
+    /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */
+    mqtt_mock_server_disable_auto_ack(state_test_data->mock_server);
+
+    struct aws_byte_cursor sub_topic_1 = aws_byte_cursor_from_c_str("/test/topic1");
+    struct aws_byte_cursor sub_topic_2 = aws_byte_cursor_from_c_str("/test/topic2");
+
+    struct aws_mqtt_topic_subscription sub1 = {
+        .topic = sub_topic_1,
+        .qos = AWS_MQTT_QOS_AT_LEAST_ONCE,
+        .on_publish = aws_test311_on_publish_received,
+        .on_cleanup = NULL,
+        .on_publish_ud = state_test_data,
+    };
+    struct aws_mqtt_topic_subscription sub2 = {
+        .topic = sub_topic_2,
+        .qos = AWS_MQTT_QOS_AT_LEAST_ONCE,
+        .on_publish = aws_test311_on_publish_received,
+        .on_cleanup = NULL,
+        .on_publish_ud = state_test_data,
+    };
+
+    struct aws_array_list topic_filters;
+    size_t list_len = 2;
+    AWS_VARIABLE_LENGTH_ARRAY(uint8_t, static_buf, list_len * sizeof(struct aws_mqtt_topic_subscription));
+    aws_array_list_init_static(&topic_filters, static_buf, list_len, sizeof(struct aws_mqtt_topic_subscription));
+
+    aws_array_list_push_back(&topic_filters, &sub1);
+    aws_array_list_push_back(&topic_filters, &sub2);
+
+    uint16_t sub_packet_id = aws_mqtt_client_connection_subscribe_multiple(
+        state_test_data->mqtt_connection, &topic_filters, aws_test311_on_multi_suback, state_test_data);
+
+    ASSERT_TRUE(sub_packet_id > 0);
+
+    /* subscribe should complete after the timeout */
+    aws_test311_wait_for_subscribe_to_complete(state_test_data);
+    /* Check that the subscribe has been completed with a timeout error */
+    ASSERT_UINT_EQUALS(state_test_data->subscribe_complete_error, AWS_ERROR_MQTT_TIMEOUT);
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect(
+        state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data));
+    aws_test311_wait_for_disconnect_to_complete(state_test_data);
+
+    return AWS_OP_SUCCESS;
+}
+
+AWS_TEST_CASE_FIXTURE(
+    mqtt_connection_subscribe_multi_timeout,
+    s_setup_mqtt_server_fn,
+    s_test_mqtt_connection_subscribe_multi_timeout_fn,
+    s_clean_up_mqtt_server_fn,
+    &test_data)
+
+/**
+ * Test that connection is healthy, the connection-level operation timeout is set, the timeout happens, and the resubscribe fails.
+ */
+static int s_test_mqtt_connection_resubscribe_timeout_fn(struct aws_allocator *allocator, void *ctx) {
+    (void)allocator;
+    struct mqtt_connection_state_test *state_test_data = ctx;
+
+    struct aws_mqtt_connection_options connection_options = {
+        .user_data = state_test_data,
+        .clean_session = false,
+        .client_id = aws_byte_cursor_from_c_str("client1234"),
+        .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address),
+        .socket_options = &state_test_data->socket_options,
+        .on_connection_complete = aws_test311_on_connection_complete_fn,
+        .ping_timeout_ms = DEFAULT_TEST_PING_TIMEOUT_MS,
+        .protocol_operation_timeout_ms = 3000,
+        .keep_alive_time_secs = 16960, /* basically stop automatically sending PINGREQ */
+    };
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
+    aws_test311_wait_for_connection_to_complete(state_test_data);
+
+    /* subscribe */
+    struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic");
+    uint16_t sub_packet_id = aws_mqtt_client_connection_subscribe(
+        state_test_data->mqtt_connection,
+        &pub_topic,
+        AWS_MQTT_QOS_AT_LEAST_ONCE,
+        aws_test311_on_publish_received,
+        state_test_data,
+        NULL,
+        aws_test311_on_suback,
+        state_test_data);
+    ASSERT_TRUE(sub_packet_id > 0);
+
+    /* the initial subscribe should complete successfully */
+    aws_test311_wait_for_subscribe_to_complete(state_test_data);
+    ASSERT_UINT_EQUALS(state_test_data->subscribe_complete_error, AWS_ERROR_SUCCESS);
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect(
+        state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data));
+    aws_test311_wait_for_disconnect_to_complete(state_test_data);
+
+    /* reconnect to the same server */
+    ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options));
+    aws_test311_wait_for_connection_to_complete(state_test_data);
+
+    /* Disable the auto ACK packets sent by the server, which blocks the requests to complete */
+    mqtt_mock_server_disable_auto_ack(state_test_data->mock_server);
+
+    uint16_t resub_packet_id = aws_mqtt_resubscribe_existing_topics(
+        state_test_data->mqtt_connection, aws_test311_on_multi_suback, state_test_data);
+    ASSERT_TRUE(resub_packet_id > 0);
+    aws_test311_wait_for_subscribe_to_complete(state_test_data);
+
+    /* Check that the resubscribe has been completed with a timeout error */
+    ASSERT_UINT_EQUALS(state_test_data->subscribe_complete_error, AWS_ERROR_MQTT_TIMEOUT);
+
+    ASSERT_SUCCESS(aws_mqtt_client_connection_disconnect(
+        state_test_data->mqtt_connection, aws_test311_on_disconnect_fn, state_test_data));
+    aws_test311_wait_for_disconnect_to_complete(state_test_data);
+
+    return AWS_OP_SUCCESS;
+}
+
+AWS_TEST_CASE_FIXTURE(
+    mqtt_connection_resubscribe_timeout,
     s_setup_mqtt_server_fn,
-    s_test_mqtt_connection_unsub_timeout_fn,
+    s_test_mqtt_connection_resubscribe_timeout_fn,
     s_clean_up_mqtt_server_fn,
     &test_data)
diff --git a/tests/v3/mqtt311_testing_utils.c b/tests/v3/mqtt311_testing_utils.c
index 52a55e92..f6019c0e 100644
--- a/tests/v3/mqtt311_testing_utils.c
+++ b/tests/v3/mqtt311_testing_utils.c
@@ -538,6 +538,8 @@ void aws_test311_on_multi_suback(
 
     aws_mutex_lock(&state_test_data->lock);
     state_test_data->subscribe_completed = true;
+    state_test_data->subscribe_complete_error = error_code;
+
     if (!error_code) {
         size_t length = aws_array_list_length(topic_subacks);
         for (size_t i = 0; i < length; ++i) {
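
For reference, here is a minimal standalone sketch (not part of the patch, and not the aws-c-mqtt API; all names below are hypothetical) of the cancellation pattern the patch relies on: the operation's task arg embeds a wrapper holding a forward pointer to the timeout task arg, the timeout task arg holds a back pointer to that wrapper, and whichever side runs first zeroes the other's pointer so the loser becomes a no-op.

/* Illustrative sketch only; compiles on its own and mirrors request_timeout_wrapper /
 * request_timeout_task_arg with hypothetical names. */
#include <stdio.h>

struct fake_timeout_wrapper;

/* Mirrors request_timeout_task_arg: the timeout side keeps a back pointer to the wrapper. */
struct fake_timeout {
    struct fake_timeout_wrapper *wrapper; /* NULL once the operation has completed */
};

/* Mirrors request_timeout_wrapper: embedded in every operation arg, holds the forward pointer. */
struct fake_timeout_wrapper {
    struct fake_timeout *timeout;
};

struct fake_operation {
    struct fake_timeout_wrapper timeout_wrapper;
};

/* Operation completes first: break the link so the pending timeout becomes harmless. */
static void fake_operation_complete(struct fake_operation *op) {
    if (op->timeout_wrapper.timeout != NULL) {
        op->timeout_wrapper.timeout->wrapper = NULL;
    }
    printf("operation completed\n");
}

/* Timeout fires first: only acts if the back pointer is still set. */
static void fake_timeout_fire(struct fake_timeout *timeout) {
    if (timeout->wrapper != NULL) {
        timeout->wrapper->timeout = NULL; /* break the forward link */
        printf("operation timed out\n");
    } else {
        printf("timeout is a no-op; operation already completed\n");
    }
}

int main(void) {
    struct fake_operation op = {.timeout_wrapper = {NULL}};
    struct fake_timeout timeout = {NULL};

    /* Mutual references, as set up in s_subscribe_send / s_unsubscribe_send / s_publish_send. */
    op.timeout_wrapper.timeout = &timeout;
    timeout.wrapper = &op.timeout_wrapper;

    fake_operation_complete(&op); /* runs first, "wins" */
    fake_timeout_fire(&timeout);  /* runs later, harmless */
    return 0;
}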