From 545aba0b0397a01b7260e8884ce85d509aa5f0ca Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Wed, 6 Mar 2024 15:28:40 -0800 Subject: [PATCH] Checkpoint --- .../request-response/request_response.h | 33 +++ .../request-response/subscription_manager.h | 22 +- .../request_response_client.h | 71 ++++- .../request_response_client.c | 268 ++++++++++++++++++ 4 files changed, 371 insertions(+), 23 deletions(-) create mode 100644 include/aws/mqtt/private/request-response/request_response.h diff --git a/include/aws/mqtt/private/request-response/request_response.h b/include/aws/mqtt/private/request-response/request_response.h new file mode 100644 index 00000000..ef58ce9d --- /dev/null +++ b/include/aws/mqtt/private/request-response/request_response.h @@ -0,0 +1,33 @@ +#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_H +#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_H + +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include + +/* + * Describes a change to the state of a request-response client subscription + */ +enum aws_rr_subscription_event_type { + + /* + * A subscribe succeeded + */ + ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + + /* + * A subscribe failed + */ + ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE, + + /* + * A previously successful subscription has ended (generally due to a failure to resume a session) + */ + ARRSET_SUBSCRIPTION_ENDED +}; + +#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_H */ + diff --git a/include/aws/mqtt/private/request-response/subscription_manager.h b/include/aws/mqtt/private/request-response/subscription_manager.h index 605a4994..61d0e400 100644 --- a/include/aws/mqtt/private/request-response/subscription_manager.h +++ b/include/aws/mqtt/private/request-response/subscription_manager.h @@ -9,32 +9,12 @@ #include #include +#include struct aws_mqtt_protocol_adapter; struct aws_protocol_adapter_connection_event; struct aws_protocol_adapter_subscription_event; -/* - * The kind of subscription event being emitted. - */ -enum aws_rr_subscription_event_type { - - /* - * A subscribe succeeded - */ - ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, - - /* - * A subscribe failed - */ - ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE, - - /* - * A previously successful subscription has ended (generally due to a failure to resume a session) - */ - ARRSET_SUBSCRIPTION_ENDED -}; - struct aws_rr_subscription_status_event { enum aws_rr_subscription_event_type type; struct aws_byte_cursor topic_filter; diff --git a/include/aws/mqtt/request-response/request_response_client.h b/include/aws/mqtt/request-response/request_response_client.h index 001f20e2..7c0e570f 100644 --- a/include/aws/mqtt/request-response/request_response_client.h +++ b/include/aws/mqtt/request-response/request_response_client.h @@ -6,11 +6,66 @@ * SPDX-License-Identifier: Apache-2.0. */ -#include "aws/mqtt/mqtt.h" +#include + +#include struct aws_mqtt_request_response_client; struct aws_mqtt_client_connection; struct aws_mqtt5_client; +struct aws_mqtt_streaming_operation; + +struct aws_mqtt_request_operation_message_path { + struct aws_byte_cursor topic; + + /* potential point of expansion into an abstract "extractor" if we ever need to support non-JSON payloads */ + struct aws_byte_cursor correlation_token_json_path; +}; + +typedef void(aws_mqtt_request_operation_completion_fn)(int error_code, struct aws_byte_cursor payload, void *user_data); + +struct aws_mqtt_request_operation_options { + struct aws_byte_cursor subscription_topic_filter; + + struct aws_mqtt_request_operation_message_path *message_paths; + size_t message_path_count; + + struct aws_byte_cursor publish_topic; + struct aws_byte_cursor serialized_request; + struct aws_byte_cursor correlation_token; + + aws_mqtt_request_operation_completion_fn *completion_callback; + void *user_data; +}; + +struct aws_mqtt_request_operation_storage { + struct aws_mqtt_request_operation_options options; + + struct aws_array_list operation_message_paths; + + struct aws_byte_buf operation_data; +}; + +typedef void(aws_mqtt_streaming_operation_subscription_status_fn)(enum aws_rr_subscription_event_type status, int error_code, void *user_data); +typedef void(aws_mqtt_streaming_operation_incoming_publish_fn)(struct aws_byte_cursor payload, void *user_data); +typedef void(aws_mqtt_streaming_operation_terminated_fn)(void *user_data); + +struct aws_mqtt_streaming_operation_options { + struct aws_byte_cursor topic_filter; + + aws_mqtt_streaming_operation_subscription_status_fn *subscription_status_callback; + aws_mqtt_streaming_operation_incoming_publish_fn *incoming_publish_callback; + aws_mqtt_streaming_operation_terminated_fn *terminated_callback; + + void *user_data; +}; + +struct aws_mqtt_streaming_operation_storage { + struct aws_mqtt_streaming_operation_options options; + + struct aws_byte_buf operation_data; +}; + typedef void(aws_mqtt_request_response_client_initialized_callback_fn)(void *user_data); typedef void(aws_mqtt_request_response_client_terminated_callback_fn)(void *user_data); @@ -21,8 +76,8 @@ struct aws_mqtt_request_response_client_options { /* Do not bind the initialized callback; it exists mostly for tests and should not be exposed */ aws_mqtt_request_response_client_initialized_callback_fn *initialized_callback; - aws_mqtt_request_response_client_terminated_callback_fn *terminated_callback; + void *user_data; }; @@ -56,6 +111,18 @@ AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_ AWS_MQTT_API struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_release( struct aws_mqtt_request_response_client *client); +AWS_MQTT_API int aws_mqtt_request_response_client_submit_request( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_request_operation_options *request_options); + +AWS_MQTT_API struct aws_mqtt_streaming_operation *aws_mqtt_request_response_client_create_streaming_operation( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_streaming_operation_options *streaming_options); + +AWS_MQTT_API struct aws_mqtt_streaming_operation *aws_mqtt_streaming_operation_acquire(struct aws_mqtt_streaming_operation *operation); + +AWS_MQTT_API struct aws_mqtt_streaming_operation *aws_mqtt_streaming_operation_release(struct aws_mqtt_streaming_operation *operation); + AWS_EXTERN_C_END #endif /* AWS_MQTT_REQUEST_RESPONSE_REQUEST_RESPONSE_CLIENT_H */ diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 7b43f452..7ad5b6db 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -13,6 +13,243 @@ #include #include +enum aws_mqtt_request_response_operation_type { + AWS_MRROT_REQUEST, + AWS_MRROT_STREAMING, +}; + +enum aws_mqtt_request_response_operation_state { + AWS_MRROS_NONE, // creation -> in event loop enqueue + AWS_MRROS_QUEUED, // in event loop queue -> non blocked response from subscription manager + AWS_MRROS_PENDING_SUBSCRIPTION, // subscribing response from sub manager -> subscription success/failure event + AWS_MRROS_AWAITING_RESPONSE, // (request only) subscription success -> (publish failure OR correlated response received) + AWS_MRROS_SUBSCRIBED, // (streaming only) subscription success -> (operation finished OR subscription ended event) + AWS_MRROS_TERMINAL, // (streaming only) (subscription failure OR subscription ended) -> operation close/terminate +}; + +/* + +Tables/Lookups + + (Authoritative operation container) + 1. &operation.id -> &operation // added on in-thread enqueue, removed on operation completion/destruction + + (Response topic -> Correlation token extraction info) + 2. &topic -> &{topic, topic_buffer, correlation token json path buffer} // per-message-path add/replace on in-thread enqueue, removed on client destruction + + (Correlation token -> request operation) + 3. &operation.correlation token -> (request) &operation // added on in-thread request op move to awaiting response state, removed on operation completion/destruction + + (Subscription filter -> all operations using that filter) + 4. &topic_filter -> &{topic_filter, linked_list} // added on in-thread pop from queue, removed from list on operation completion/destruction also checked for empty and removed from table + +*/ + +/* All operations have an internal ref to the client they are a part of */ + +/* + On submit request operation API [Anywhere]: + + Allocate id + Create operation + Submit cross-thread task + + */ + +/* + On submit streaming operation API [Anywhere]: + + Allocate id + Create operation + Submit cross-thread task + Return (ref-counted) operation + + */ + +/* + On receive operation [Event Loop, top-level task]: + + Add to operations table + (Request) Add message paths to path table if no exist or different value + Add to timeout priority queue + Add operation to end of queue list + state <- QUEUED + Wake service task + + */ + +/* + Complete (request) operation [Event Loop]: + + Completion Callback (Success/Failure) + State <- TERMINAL + Decref operation + */ + +/* + On operation ref to zero [Anywhere]: + + Submit cross-thread task to destroy operation (operation terminate callback chains directly to the binding) + */ + +/* + On operation destroy [Event Loop, top-level task]: + + Remove from operations table + Remove from intrusive list + (Request only) Remove from correlation token table + (Streaming only) Check streaming topic table for empty list, remove entry if so + Remove from timeout priority queue + Remove from subscription manager + Wake service task // What if this is the last internal ref? Should service task have an internal reference while scheduled? + (Streaming) Invoke termination callback + Release client internal ref + + */ + +/* + On incoming publish [Event Loop]: + + If topic in streaming routes table + for all streaming operations in list + if operation.state == SUBSCRIBED + invoke publish received callback + + If topic in paths table: + If correlation token extraction success + If entry exists in correlation token table + Complete operation with publish payload + */ + +/* + On Publish completion [Event Loop]: + + If Error + Complete and Fail Operation(id) + + */ + + +/* + On protocol adapter connection event [Event Loop]: + + Notify subscription manager + Wake service task + */ + +/* + On subscription status event [Event Loop, top-level task]: + + For all streaming operations in topic_filter table list: + If Success and state == SUBSCRIBING + state <- SUBSCRIBED + Invoke Success/Failure callback with success + Else If Failure and state == SUBSCRIBING + state <- TERMINAL + Invoke Success/Failure callback with failure + Else if Subscription Ended and state != TERMINAL + state <- TERMINAL + Invoke Ended callback + If Failure or Ended: + sub manager release_subscription(operation id) + + For all request operations in request topic filter list: + If Success and state == SUBSCRIBING + MakeRequest(op) + + If Failure or Ended + Complete operation with failure + + */ + +/* + MakeRequest(op) [Event Loop]: + + state <- AWAITING_RESPONSE + if publish fails synchronously + Complete operation with failure + Decref(op) + */ + +/* + Handle acquire sub result(op, result) [Event Loop, Service Task Loop]: + + If result == {No Capacity, Failure} + If op is streaming + Invoke failure callback + state <- TERMINAL + else + Complete operation with failure + Decref + return + + If streaming + Add operation to topic filter table + State <- {SUBSCRIBING, SUBSCRIBED} + + If request + Add operation to topic filter table + if result == SUBSCRIBING + state <- SUBSCRIBING + else // (SUBSCRIBED) + MakeRequest(op) + + + */ +/* + Service task [Event Loop]: + + For all timed out operations: + Invoke On Operation Timeout + + While OperationQueue is not empty: + op = peek queue + result = subscription manager acquire sub(op) + if result == Blocked + break + pop op + handle acquire sub result (op, result) + + Reschedule Service for next timeout if it exists + */ + +/* + On operation timeout [Event Loop, Service Task Loop]: + + If request + Complete with failure + Decref-op + If streaming and state != {SUBSCRIBED, TERMINAL} + state <- TERMINAL + Invoke failure callback + + */ + +struct aws_mqtt_rr_client_operation { + struct aws_allocator *allocator; + + struct aws_ref_count ref_count; + + struct aws_mqtt_request_response_client *internal_client_ref; + + uint64_t id; + + enum aws_mqtt_request_response_operation_type type; + + union { + struct aws_mqtt_streaming_operation_storage streaming_storage; + struct aws_mqtt_request_operation_storage request_storage; + } storage; + + uint64_t ack_timeout_timepoint_ns; + struct aws_priority_queue_node priority_queue_node; + struct aws_linked_list_node node; + + enum aws_mqtt_request_response_operation_state state; +}; + +/*******************************************************************************************/ + /* Tracks the current state of the request-response client */ enum aws_request_response_client_state { @@ -386,3 +623,34 @@ struct aws_mqtt_request_response_client *aws_mqtt_request_response_client_releas return NULL; } + +int aws_mqtt_request_response_client_submit_request( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_request_operation_options *request_options) { + (void)client; + (void)request_options; + + return aws_raise_error(AWS_ERROR_UNIMPLEMENTED); +} + +AWS_MQTT_API struct aws_mqtt_streaming_operation *aws_mqtt_request_response_client_create_streaming_operation( + struct aws_mqtt_request_response_client *client, + struct aws_mqtt_streaming_operation_options *streaming_options) { + + (void)client; + (void)streaming_options; + + return NULL; +} + +AWS_MQTT_API struct aws_mqtt_streaming_operation *aws_mqtt_streaming_operation_acquire(struct aws_mqtt_streaming_operation *operation) { + (void)operation; + + return NULL; +} + +AWS_MQTT_API struct aws_mqtt_streaming_operation *aws_mqtt_streaming_operation_release(struct aws_mqtt_streaming_operation *operation) { + (void)operation; + + return NULL; +}