Skip to content

Commit

Permalink
Add timeout and service task support to the request-response client
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Mar 14, 2024
1 parent 3d50374 commit ccf6ac8
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 10 deletions.
1 change: 1 addition & 0 deletions include/aws/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ enum aws_mqtt_error {
AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE,
AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE,
AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN,
AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT,

AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID),
};
Expand Down
4 changes: 4 additions & 0 deletions source/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter)
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN,
"Request operation failed due to client shut down"),
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT,
"Request operation failed due to timeout"),

};
/* clang-format on */
#undef AWS_DEFINE_ERROR_INFO_MQTT
Expand Down
163 changes: 159 additions & 4 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <aws/mqtt/request-response/request_response_client.h>

#include <aws/common/clock.h>
#include <aws/common/ref_count.h>
#include <aws/common/task_scheduler.h>
#include <aws/io/event_loop.h>
Expand All @@ -13,6 +14,8 @@
#include <aws/mqtt/private/request-response/subscription_manager.h>
#include <aws/mqtt/private/v5/mqtt5_client_impl.h>

#include <inttypes.h>

#define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50

enum aws_mqtt_request_response_operation_type {
Expand Down Expand Up @@ -303,6 +306,9 @@ struct aws_mqtt_rr_client_operation {
struct aws_mqtt_request_operation_storage request_storage;
} storage;

uint64_t timeout_timepoint_ns;
struct aws_priority_queue_node priority_queue_node;

/* Sometimes this is client->operation_queue, other times it is an entry in the client's topic_filter table */
struct aws_linked_list_node node;

Expand Down Expand Up @@ -370,6 +376,9 @@ struct aws_mqtt_request_response_client {
struct aws_task external_shutdown_task;
struct aws_task internal_shutdown_task;

uint64_t scheduled_service_timepoint_ns;
struct aws_task service_task;

enum aws_request_response_client_state state;

struct aws_atomic_var next_id;
Expand All @@ -378,6 +387,13 @@ struct aws_mqtt_request_response_client {

/* &operation->id -> &operation */
struct aws_hash_table operations;

/*
* heap of operation pointers where the timeout is the sort value. Elements are added to this on operation
* submission and removed on operation timeout/completion/termination. Request-response operations have actual
* timeouts, while streaming operations have UINT64_MAX timeouts.
*/
struct aws_priority_queue operations_by_timeout;
};

static void s_aws_rr_client_on_zero_internal_ref_count(void *context) {
Expand All @@ -401,6 +417,8 @@ static void s_mqtt_request_response_client_final_destroy(struct aws_mqtt_request
AWS_FATAL_ASSERT(aws_hash_table_get_entry_count(&client->operations) == 0);
aws_hash_table_clean_up(&client->operations);

aws_priority_queue_clean_up(&client->operations_by_timeout);

aws_mem_release(client->allocator, client);

if (terminate_callback != NULL) {
Expand All @@ -421,6 +439,15 @@ static void s_mqtt_request_response_client_internal_shutdown_task_fn(
s_mqtt_request_response_client_final_destroy(client);
}

static void s_remove_operation_from_timeout_queue(struct aws_mqtt_rr_client_operation *operation) {
struct aws_mqtt_request_response_client *client = operation->client_internal_ref;

if (aws_priority_queue_node_is_in_queue(&operation->priority_queue_node)) {
struct aws_mqtt_rr_client_operation *queued_operation = NULL;
aws_priority_queue_remove(&client->operations_by_timeout, &queued_operation, &operation->priority_queue_node);
}
}

static void s_complete_request_operation_with_failure(struct aws_mqtt_rr_client_operation *operation, int error_code) {
AWS_FATAL_ASSERT(operation->type == AWS_MRROT_REQUEST);
AWS_FATAL_ASSERT(error_code != AWS_ERROR_SUCCESS);
Expand Down Expand Up @@ -515,6 +542,20 @@ static void s_mqtt_request_response_client_external_shutdown_task_fn(
aws_ref_count_release(&client->internal_ref_count);
}

static void s_mqtt_request_response_client_wake_service(struct aws_mqtt_request_response_client *client) {
uint64_t now = 0;
aws_high_res_clock_get_ticks(&now);

if (client->scheduled_service_timepoint_ns == 0 || now < client->scheduled_service_timepoint_ns) {
if (now < client->scheduled_service_timepoint_ns) {
aws_event_loop_cancel_task(client->loop, &client->service_task);
}

client->scheduled_service_timepoint_ns = now;
aws_event_loop_schedule_task_now(client->loop, &client->service_task);
}
}

static void s_aws_rr_client_subscription_status_event_callback(
const struct aws_rr_subscription_status_event *event,
void *userdata) {
Expand Down Expand Up @@ -595,6 +636,22 @@ bool aws_mqtt_compare_uint64_t_eq(const void *a, const void *b) {
return *(uint64_t *)a == *(uint64_t *)b;
}

static int s_compare_rr_operation_timeouts(const void *a, const void *b) {
const struct aws_mqtt_rr_client_operation **operation_a_ptr = (void *)a;
const struct aws_mqtt_rr_client_operation *operation_a = *operation_a_ptr;

const struct aws_mqtt_rr_client_operation **operation_b_ptr = (void *)b;
const struct aws_mqtt_rr_client_operation *operation_b = *operation_b_ptr;

if (operation_a->timeout_timepoint_ns < operation_b->timeout_timepoint_ns) {
return -1;
} else if (operation_a->timeout_timepoint_ns > operation_b->timeout_timepoint_ns) {
return 1;
} else {
return 0;
}
}

static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_client_new(
struct aws_allocator *allocator,
const struct aws_mqtt_request_response_client_options *options,
Expand Down Expand Up @@ -632,6 +689,13 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie
NULL,
NULL);

aws_priority_queue_init_dynamic(
&rr_client->operations_by_timeout,
allocator,
100,
sizeof(struct aws_mqtt_rr_client_operation *),
s_compare_rr_operation_timeouts);

aws_linked_list_init(&rr_client->operation_queue);

aws_task_init(
Expand Down Expand Up @@ -673,6 +737,81 @@ static void s_aws_rr_client_init_subscription_manager(
&rr_client->subscription_manager, allocator, rr_client->client_adapter, &subscription_manager_options);
}

static void s_check_for_operation_timeouts(struct aws_mqtt_request_response_client *client) {
uint64_t now = 0;
aws_high_res_clock_get_ticks(&now);

struct aws_priority_queue *timeout_queue = &client->operations_by_timeout;

bool done = aws_priority_queue_size(timeout_queue) == 0;
while (!done) {
struct aws_mqtt_rr_client_operation **next_operation_by_timeout_ptr = NULL;
aws_priority_queue_top(timeout_queue, (void **)&next_operation_by_timeout_ptr);
AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL);
struct aws_mqtt_rr_client_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr;
AWS_FATAL_ASSERT(next_operation_by_timeout != NULL);

// If the current top of the heap hasn't timed out than nothing has
if (next_operation_by_timeout->timeout_timepoint_ns > now) {
break;
}

/* Ack timeout for this operation has been reached */
aws_priority_queue_pop(timeout_queue, &next_operation_by_timeout);

AWS_LOGF_INFO(
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: request operation with id:%" PRIu64 " has timed out",
(void *)client,
next_operation_by_timeout->id);

s_complete_request_operation_with_failure(next_operation_by_timeout, AWS_ERROR_MQTT_REQUEST_RESPONSE_TIMEOUT);

done = aws_priority_queue_size(timeout_queue) == 0;
}
}

static uint64_t s_mqtt_request_response_client_get_next_service_time(struct aws_mqtt_request_response_client *client) {
if (aws_priority_queue_size(&client->operations_by_timeout) > 0) {
struct aws_mqtt_rr_client_operation **next_operation_by_timeout_ptr = NULL;
aws_priority_queue_top(&client->operations_by_timeout, (void **)&next_operation_by_timeout_ptr);
AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL);
struct aws_mqtt_rr_client_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr;
AWS_FATAL_ASSERT(next_operation_by_timeout != NULL);

return next_operation_by_timeout->timeout_timepoint_ns;
}

return UINT64_MAX;
}

static void s_mqtt_request_response_service_task_fn(
struct aws_task *task,
void *arg,
enum aws_task_status task_status) {
(void)task;

if (task_status == AWS_TASK_STATUS_CANCELED) {
return;
}

struct aws_mqtt_request_response_client *client = arg;
client->scheduled_service_timepoint_ns = 0;

if (client->state == AWS_RRCS_ACTIVE) {

// timeouts
s_check_for_operation_timeouts(client);

// TODO: operation intake and service

// schedule next service
client->scheduled_service_timepoint_ns = s_mqtt_request_response_client_get_next_service_time(client);
aws_event_loop_schedule_task_future(
client->loop, &client->service_task, client->scheduled_service_timepoint_ns);
}
}

static void s_mqtt_request_response_client_initialize_task_fn(
struct aws_task *task,
void *arg,
Expand All @@ -687,6 +826,11 @@ static void s_mqtt_request_response_client_initialize_task_fn(
s_aws_rr_client_init_subscription_manager(client, client->allocator);

client->state = AWS_RRCS_ACTIVE;

aws_task_init(&client->service_task, s_mqtt_request_response_service_task_fn, client, "mqtt_rr_client_service");

aws_event_loop_schedule_task_future(client->loop, &client->service_task, UINT64_MAX);
client->scheduled_service_timepoint_ns = UINT64_MAX;
}

if (client->config.initialized_callback != NULL) {
Expand Down Expand Up @@ -913,24 +1057,26 @@ static void s_mqtt_rr_client_submit_operation(struct aws_task *task, void *arg,
(void)task;

struct aws_mqtt_rr_client_operation *operation = arg;
struct aws_mqtt_request_response_client *client = operation->client_internal_ref;

if (status == AWS_TASK_STATUS_CANCELED) {
goto done;
}

// add appropriate client table entries
aws_hash_table_put(&operation->client_internal_ref->operations, &operation->id, operation, NULL);
aws_hash_table_put(&client->operations, &operation->id, operation, NULL);

// NYI other tables

// NYI set up timeout
// add to timeout priority queue
aws_priority_queue_push_ref(&client->operations_by_timeout, (void *)&operation, &operation->priority_queue_node);

// enqueue
aws_linked_list_push_back(&operation->client_internal_ref->operation_queue, &operation->node);

operation->state = AWS_MRROS_QUEUED;

// NYI wake service
s_mqtt_request_response_client_wake_service(operation->client_internal_ref);

done:

Expand Down Expand Up @@ -972,6 +1118,7 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg,
struct aws_mqtt_request_response_client *client = operation->client_internal_ref;

aws_hash_table_remove(&client->operations, &operation->id, NULL, NULL);
s_remove_operation_from_timeout_queue(operation);

aws_linked_list_remove(&operation->node);

Expand All @@ -986,7 +1133,6 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg,
/*
NYI:
Remove from timeout tracking
Remove from topic filter table
Remove from correlation token table
Expand Down Expand Up @@ -1115,11 +1261,19 @@ int aws_mqtt_request_response_client_submit_request(
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}

uint64_t now = 0;
if (aws_high_res_clock_get_ticks(&now)) {
return aws_raise_error(AWS_ERROR_CLOCK_FAILURE);
}

struct aws_allocator *allocator = client->allocator;
struct aws_mqtt_rr_client_operation *operation =
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation));
operation->allocator = allocator;
operation->type = AWS_MRROT_REQUEST;
operation->timeout_timepoint_ns =
now +
aws_timestamp_convert(client->config.operation_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);

s_aws_mqtt_request_operation_storage_init_from_options(
&operation->storage.request_storage, allocator, request_options);
Expand Down Expand Up @@ -1163,6 +1317,7 @@ struct aws_mqtt_rr_client_operation *aws_mqtt_request_response_client_create_str
aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_rr_client_operation));
operation->allocator = allocator;
operation->type = AWS_MRROT_STREAMING;
operation->timeout_timepoint_ns = UINT64_MAX;

s_aws_mqtt_streaming_operation_storage_init_from_options(
&operation->storage.streaming_storage, allocator, streaming_options);
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ add_test_case(rrc_submit_streaming_operation_failure_invalid_subscription_topic_

add_test_case(rrc_submit_request_operation_failure_by_shutdown)
add_test_case(rrc_submit_streaming_operation_and_shutdown)
add_test_case(rrc_submit_request_operation_failure_by_timeout)

generate_test_driver(${PROJECT_NAME}-tests)

Expand Down
Loading

0 comments on commit ccf6ac8

Please sign in to comment.