Skip to content

Commit

Permalink
Mqtt5 operation timeout (#333)
Browse files Browse the repository at this point in the history
Co-authored-by: Bret Ambrose <[email protected]>
  • Loading branch information
bretambrose and Bret Ambrose authored Nov 7, 2023
1 parent c475ef1 commit ff4b239
Show file tree
Hide file tree
Showing 8 changed files with 570 additions and 63 deletions.
6 changes: 6 additions & 0 deletions include/aws/mqtt/private/v5/mqtt5_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ struct aws_mqtt5_client_operational_state {
struct aws_linked_list unacked_operations;
struct aws_linked_list write_completion_operations;

/*
* heap of operation pointers where the timeout is the sort value. Elements are added/removed from this
* data structure in exact synchronization with unacked_operations_table.
*/
struct aws_priority_queue operations_by_ack_timeout;

/*
* Is there an io message in transit (to the socket) that has not invoked its write completion callback yet?
* The client implementation only allows one in-transit message at a time, and so if this is true, we don't
Expand Down
7 changes: 6 additions & 1 deletion include/aws/mqtt/private/v5/mqtt5_options_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ struct aws_mqtt5_operation_vtable {
int (*aws_mqtt5_operation_validate_vs_connection_settings_fn)(
const void *operation_packet_view,
const struct aws_mqtt5_client *client);

uint32_t (*aws_mqtt5_operation_get_ack_timeout_override_fn)(const struct aws_mqtt5_operation *operation);
};

/* Flags that indicate the way in which an operation is currently affecting the statistics of the client */
Expand All @@ -64,6 +66,7 @@ struct aws_mqtt5_operation {
const struct aws_mqtt5_operation_vtable *vtable;
struct aws_ref_count ref_count;
uint64_t ack_timeout_timepoint_ns;
struct aws_priority_queue_node priority_queue_node;
struct aws_linked_list_node node;

enum aws_mqtt5_packet_type packet_type;
Expand Down Expand Up @@ -163,7 +166,7 @@ struct aws_mqtt5_client_options_storage {
uint64_t max_reconnect_delay_ms;
uint64_t min_connected_time_to_reset_reconnect_delay_ms;

uint64_t ack_timeout_seconds;
uint32_t ack_timeout_seconds;

uint32_t ping_timeout_ms;
uint32_t connack_timeout_ms;
Expand Down Expand Up @@ -208,6 +211,8 @@ AWS_MQTT_API int aws_mqtt5_operation_validate_vs_connection_settings(
const struct aws_mqtt5_operation *operation,
const struct aws_mqtt5_client *client);

AWS_MQTT_API uint32_t aws_mqtt5_operation_get_ack_timeout_override(const struct aws_mqtt5_operation *operation);

/* Connect */

AWS_MQTT_API struct aws_mqtt5_operation_connect *aws_mqtt5_operation_connect_new(
Expand Down
14 changes: 10 additions & 4 deletions include/aws/mqtt/v5/mqtt5_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,31 +344,37 @@ typedef void(aws_mqtt5_client_termination_completion_fn)(void *complete_ctx);
/* operation completion options structures */

/**
* Completion callback options for the Publish operation
* Completion options for the Publish operation
*/
struct aws_mqtt5_publish_completion_options {
aws_mqtt5_publish_completion_fn *completion_callback;
void *completion_user_data;

uint32_t ack_timeout_seconds_override;
};

/**
* Completion callback options for the Subscribe operation
* Completion options for the Subscribe operation
*/
struct aws_mqtt5_subscribe_completion_options {
aws_mqtt5_subscribe_completion_fn *completion_callback;
void *completion_user_data;

uint32_t ack_timeout_seconds_override;
};

/**
* Completion callback options for the Unsubscribe operation
* Completion options for the Unsubscribe operation
*/
struct aws_mqtt5_unsubscribe_completion_options {
aws_mqtt5_unsubscribe_completion_fn *completion_callback;
void *completion_user_data;

uint32_t ack_timeout_seconds_override;
};

/**
* Public completion callback options for the a DISCONNECT operation
* Completion options for the a DISCONNECT operation
*/
struct aws_mqtt5_disconnect_completion_options {
aws_mqtt5_disconnect_completion_fn *completion_callback;
Expand Down
172 changes: 116 additions & 56 deletions source/v5/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ static void s_complete_operation(
const void *view) {
if (client != NULL) {
aws_mqtt5_client_statistics_change_operation_statistic_state(client, operation, AWS_MQTT5_OSS_NONE);
if (aws_priority_queue_node_is_in_queue(&operation->priority_queue_node)) {
struct aws_mqtt5_operation *queued_operation = NULL;
aws_priority_queue_remove(
&client->operational_state.operations_by_ack_timeout,
&queued_operation,
&operation->priority_queue_node);
}
}

aws_mqtt5_operation_complete(operation, error_code, packet_type, view);
Expand All @@ -197,43 +204,47 @@ static void s_complete_operation_list(
}

static void s_check_timeouts(struct aws_mqtt5_client *client, uint64_t now) {
if (client->config->ack_timeout_seconds == 0) {
return;
}
struct aws_priority_queue *timeout_queue = &client->operational_state.operations_by_ack_timeout;

bool done = aws_priority_queue_size(timeout_queue) == 0;
while (!done) {
struct aws_mqtt5_operation **next_operation_by_timeout_ptr = NULL;
aws_priority_queue_top(timeout_queue, (void **)&next_operation_by_timeout_ptr);
AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL);
struct aws_mqtt5_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr;
AWS_FATAL_ASSERT(next_operation_by_timeout != NULL);

// If the top of the heap hasn't timed out than nothing has
if (next_operation_by_timeout->ack_timeout_timepoint_ns > now) {
break;
}

struct aws_linked_list_node *node = aws_linked_list_begin(&client->operational_state.unacked_operations);
while (node != aws_linked_list_end(&client->operational_state.unacked_operations)) {
struct aws_mqtt5_operation *operation = AWS_CONTAINER_OF(node, struct aws_mqtt5_operation, node);
node = aws_linked_list_next(node);
if (operation->ack_timeout_timepoint_ns < now) {
/* Timeout for this packet has been reached */
aws_mqtt5_packet_id_t packet_id = aws_mqtt5_operation_get_packet_id(operation);
AWS_LOGF_INFO(
AWS_LS_MQTT5_CLIENT,
"id=%p: %s packet with id:%d has timed out",
(void *)client,
aws_mqtt5_packet_type_to_c_string(operation->packet_type),
(int)packet_id);

struct aws_hash_element *elem = NULL;
aws_hash_table_find(&client->operational_state.unacked_operations_table, &packet_id, &elem);

if (elem == NULL || elem->value == NULL) {
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CLIENT,
"id=%p: timeout for unknown operation with id %d",
(void *)client,
(int)packet_id);
return;
}
/* Ack timeout for this operation has been reached */
aws_priority_queue_pop(timeout_queue, &next_operation_by_timeout);

aws_linked_list_remove(&operation->node);
aws_hash_table_remove(&client->operational_state.unacked_operations_table, &packet_id, NULL, NULL);
aws_mqtt5_packet_id_t packet_id = aws_mqtt5_operation_get_packet_id(next_operation_by_timeout);
AWS_LOGF_INFO(
AWS_LS_MQTT5_CLIENT,
"id=%p: %s packet with id:%d has timed out",
(void *)client,
aws_mqtt5_packet_type_to_c_string(next_operation_by_timeout->packet_type),
(int)packet_id);

s_complete_operation(client, operation, AWS_ERROR_MQTT_TIMEOUT, AWS_MQTT5_PT_NONE, NULL);
} else {
break;
struct aws_hash_element *elem = NULL;
aws_hash_table_find(&client->operational_state.unacked_operations_table, &packet_id, &elem);

if (elem == NULL || elem->value == NULL) {
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CLIENT, "id=%p: timeout for unknown operation with id %d", (void *)client, (int)packet_id);
return;
}

aws_linked_list_remove(&next_operation_by_timeout->node);
aws_hash_table_remove(&client->operational_state.unacked_operations_table, &packet_id, NULL, NULL);

s_complete_operation(client, next_operation_by_timeout, AWS_ERROR_MQTT_TIMEOUT, AWS_MQTT5_PT_NONE, NULL);

done = aws_priority_queue_size(timeout_queue) == 0;
}
}

Expand Down Expand Up @@ -412,7 +423,11 @@ static uint64_t s_compute_next_service_time_client_mqtt_connect(struct aws_mqtt5
return aws_min_u64(client->next_mqtt_connect_packet_timeout_time, operation_processing_time);
}

static uint64_t s_min_non_0_64(uint64_t a, uint64_t b) {
/*
* Returns the minimum of two numbers, ignoring zero. Zero is returned only if both are zero. Useful when we're
* computing (next service) timepoints and zero means "no timepoint"
*/
static uint64_t s_min_non_zero_u64(uint64_t a, uint64_t b) {
if (a == 0) {
return b;
}
Expand All @@ -424,6 +439,19 @@ static uint64_t s_min_non_0_64(uint64_t a, uint64_t b) {
return aws_min_u64(a, b);
}

/*
* If there are unacked operations, returns the earliest point in time that one could timeout.
*/
static uint64_t s_get_unacked_operation_timeout_for_next_service_time(struct aws_mqtt5_client *client) {
if (aws_priority_queue_size(&client->operational_state.operations_by_ack_timeout) > 0) {
struct aws_mqtt5_operation **operation = NULL;
aws_priority_queue_top(&client->operational_state.operations_by_ack_timeout, (void **)&operation);
return (*operation)->ack_timeout_timepoint_ns;
}

return 0;
}

static uint64_t s_compute_next_service_time_client_connected(struct aws_mqtt5_client *client, uint64_t now) {

/* ping and ping timeout */
Expand All @@ -432,13 +460,8 @@ static uint64_t s_compute_next_service_time_client_connected(struct aws_mqtt5_cl
next_service_time = aws_min_u64(next_service_time, client->next_ping_timeout_time);
}

/* unacked operations timeout */
if (client->config->ack_timeout_seconds != 0 &&
!aws_linked_list_empty(&client->operational_state.unacked_operations)) {
struct aws_linked_list_node *node = aws_linked_list_begin(&client->operational_state.unacked_operations);
struct aws_mqtt5_operation *operation = AWS_CONTAINER_OF(node, struct aws_mqtt5_operation, node);
next_service_time = aws_min_u64(next_service_time, operation->ack_timeout_timepoint_ns);
}
next_service_time =
s_min_non_zero_u64(next_service_time, s_get_unacked_operation_timeout_for_next_service_time(client));

if (client->desired_state != AWS_MCS_CONNECTED) {
next_service_time = now;
Expand All @@ -447,29 +470,21 @@ static uint64_t s_compute_next_service_time_client_connected(struct aws_mqtt5_cl
uint64_t operation_processing_time =
s_aws_mqtt5_client_compute_operational_state_service_time(&client->operational_state, now);

next_service_time = s_min_non_0_64(operation_processing_time, next_service_time);
next_service_time = s_min_non_zero_u64(operation_processing_time, next_service_time);

/* reset reconnect delay interval */
next_service_time = s_min_non_0_64(client->next_reconnect_delay_reset_time_ns, next_service_time);
next_service_time = s_min_non_zero_u64(client->next_reconnect_delay_reset_time_ns, next_service_time);

return next_service_time;
}

static uint64_t s_compute_next_service_time_client_clean_disconnect(struct aws_mqtt5_client *client, uint64_t now) {
uint64_t ack_timeout_time = 0;

/* unacked operations timeout */
if (client->config->ack_timeout_seconds != 0 &&
!aws_linked_list_empty(&client->operational_state.unacked_operations)) {
struct aws_linked_list_node *node = aws_linked_list_begin(&client->operational_state.unacked_operations);
struct aws_mqtt5_operation *operation = AWS_CONTAINER_OF(node, struct aws_mqtt5_operation, node);
ack_timeout_time = operation->ack_timeout_timepoint_ns;
}
uint64_t ack_timeout_time = s_get_unacked_operation_timeout_for_next_service_time(client);

uint64_t operation_processing_time =
s_aws_mqtt5_client_compute_operational_state_service_time(&client->operational_state, now);

return s_min_non_0_64(ack_timeout_time, operation_processing_time);
return s_min_non_zero_u64(ack_timeout_time, operation_processing_time);
}

static uint64_t s_compute_next_service_time_client_channel_shutdown(struct aws_mqtt5_client *client, uint64_t now) {
Expand Down Expand Up @@ -587,8 +602,10 @@ static void s_aws_mqtt5_client_operational_state_reset(
s_complete_operation_list(client, &client_operational_state->unacked_operations, completion_error_code);

if (is_final) {
aws_priority_queue_clean_up(&client_operational_state->operations_by_ack_timeout);
aws_hash_table_clean_up(&client_operational_state->unacked_operations_table);
} else {
aws_priority_queue_clear(&client->operational_state.operations_by_ack_timeout);
aws_hash_table_clear(&client_operational_state->unacked_operations_table);
}
}
Expand Down Expand Up @@ -2497,6 +2514,25 @@ int aws_mqtt5_operation_bind_packet_id(
return AWS_OP_ERR;
}

/*
* Priority queue comparison function for ack timeout processing
*/
static int s_compare_operation_timeouts(const void *a, const void *b) {
const struct aws_mqtt5_operation **operation_a_ptr = (void *)a;
const struct aws_mqtt5_operation *operation_a = *operation_a_ptr;

const struct aws_mqtt5_operation **operation_b_ptr = (void *)b;
const struct aws_mqtt5_operation *operation_b = *operation_b_ptr;

if (operation_a->ack_timeout_timepoint_ns < operation_b->ack_timeout_timepoint_ns) {
return -1;
} else if (operation_a->ack_timeout_timepoint_ns > operation_b->ack_timeout_timepoint_ns) {
return 1;
} else {
return 0;
}
}

int aws_mqtt5_client_operational_state_init(
struct aws_mqtt5_client_operational_state *client_operational_state,
struct aws_allocator *allocator,
Expand All @@ -2517,6 +2553,15 @@ int aws_mqtt5_client_operational_state_init(
return AWS_OP_ERR;
}

if (aws_priority_queue_init_dynamic(
&client_operational_state->operations_by_ack_timeout,
allocator,
100,
sizeof(struct aws_mqtt5_operation *),
s_compare_operation_timeouts)) {
return AWS_OP_ERR;
}

client_operational_state->next_mqtt_packet_id = 1;
client_operational_state->current_operation = NULL;
client_operational_state->client = client;
Expand Down Expand Up @@ -2631,6 +2676,7 @@ void aws_mqtt5_client_on_disconnection_update_operational_state(struct aws_mqtt5
client, &operations_to_fail, AWS_ERROR_MQTT5_OPERATION_FAILED_DUE_TO_OFFLINE_QUEUE_POLICY);

aws_hash_table_clear(&client->operational_state.unacked_operations_table);
aws_priority_queue_clear(&client->operational_state.operations_by_ack_timeout);

/*
* Prevents inbound resolution on the highly unlikely, illegal server behavior of sending a PUBLISH before
Expand Down Expand Up @@ -3065,10 +3111,24 @@ int aws_mqtt5_client_service_operational_state(struct aws_mqtt5_client_operation
break;
}

if (client->config->ack_timeout_seconds != 0) {
uint32_t ack_timeout_seconds = aws_mqtt5_operation_get_ack_timeout_override(current_operation);
if (ack_timeout_seconds == 0) {
ack_timeout_seconds = client->config->ack_timeout_seconds;
}

if (ack_timeout_seconds > 0) {
current_operation->ack_timeout_timepoint_ns =
now + aws_timestamp_convert(
client->config->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
now + aws_timestamp_convert(ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
} else {
current_operation->ack_timeout_timepoint_ns = UINT64_MAX;
}

if (aws_priority_queue_push_ref(
&client_operational_state->operations_by_ack_timeout,
(void *)&current_operation,
&current_operation->priority_queue_node)) {
operational_error_code = aws_last_error();
break;
}

aws_linked_list_push_back(&client_operational_state->unacked_operations, &current_operation->node);
Expand Down
Loading

0 comments on commit ff4b239

Please sign in to comment.