Skip to content

Commit

Permalink
Update subscription manager and associated systems to support durable…
Browse files Browse the repository at this point in the history
…/indefined streaming operations
  • Loading branch information
bretambrose committed Mar 13, 2024
1 parent 15e8a32 commit 8de5a00
Show file tree
Hide file tree
Showing 8 changed files with 798 additions and 137 deletions.
39 changes: 32 additions & 7 deletions include/aws/mqtt/private/request-response/request_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,49 @@
#include <aws/mqtt/mqtt.h>

/*
* Describes a change to the state of a request-response client subscription
* Describes a change to the state of a request operation subscription
*/
enum aws_rr_subscription_event_type {

/*
* A subscribe succeeded
* A request subscription subscribe succeeded
*/
ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS,
ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS,

/*
* A subscribe failed
* A request subscription subscribe failed
*/
ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE,
ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE,

/*
* A previously successful subscription has ended (generally due to a failure to resume a session)
* A previously successful request subscription has ended.
*
* Under normal circumstances this can happen when
*
* (1) failure to rejoin a session
* (2) a successful unsubscribe when the subscription is no longer needed
*/
ARRSET_SUBSCRIPTION_ENDED
ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED,

/*
* A streaming subscription subscribe succeeded
*/
ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED,

/*
* The protocol client failed to rejoin a session containing a previously-established streaming subscription
*/
ARRSET_STREAMING_SUBSCRIPTION_LOST,

/*
* A streaming subscription subscribe attempt resulted in an error or reason code that the client has determined
* will result in indefinite failures to subscribe. In this case, we stop attempting to resubscribe.
*
* Situations that can lead to this:
* (1) Permission failures
* (2) Invalid topic filter
*/
ARRSET_STREAMING_SUBSCRIPTION_HALTED
};

#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_H */
30 changes: 30 additions & 0 deletions source/request-response/protocol_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ static void s_protocol_adapter_311_subscribe_completion(
.topic_filter = aws_byte_cursor_from_buf(&subscribe_data->topic_filter),
.event_type = AWS_PASET_SUBSCRIBE,
.error_code = error_code,
.retryable = true,
};

(*adapter->config.subscription_event_callback)(&subscribe_event, adapter->config.user_data);
Expand Down Expand Up @@ -487,6 +488,28 @@ static void s_aws_mqtt_protocol_adapter_5_destroy(void *impl) {

/* Subscribe */

static bool s_is_retryable_subscribe(enum aws_mqtt5_suback_reason_code reason_code, int error_code) {
if (error_code == AWS_ERROR_MQTT5_PACKET_VALIDATION || error_code == AWS_ERROR_MQTT5_SUBSCRIBE_OPTIONS_VALIDATION) {
return false;
} else if (error_code != AWS_ERROR_SUCCESS) {
return true;
}

switch (reason_code) {
case AWS_MQTT5_SARC_GRANTED_QOS_0:
case AWS_MQTT5_SARC_GRANTED_QOS_1:
case AWS_MQTT5_SARC_GRANTED_QOS_2:
case AWS_MQTT5_SARC_UNSPECIFIED_ERROR:
case AWS_MQTT5_SARC_PACKET_IDENTIFIER_IN_USE:
case AWS_MQTT5_SARC_IMPLEMENTATION_SPECIFIC_ERROR:
case AWS_MQTT5_SARC_QUOTA_EXCEEDED:
return true;

default:
return false;
}
}

static void s_protocol_adapter_5_subscribe_completion(
const struct aws_mqtt5_packet_suback_view *suback,
int error_code,
Expand All @@ -498,6 +521,12 @@ static void s_protocol_adapter_5_subscribe_completion(
goto done;
}

enum aws_mqtt5_suback_reason_code reason_code = AWS_MQTT5_SARC_GRANTED_QOS_0;
if (suback != NULL && suback->reason_code_count > 0) {
reason_code = suback->reason_codes[0];
}
bool is_retryable = s_is_retryable_subscribe(reason_code, error_code);

if (error_code == AWS_ERROR_SUCCESS) {
if (suback == NULL || suback->reason_code_count != 1 || suback->reason_codes[0] >= 128) {
error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE;
Expand All @@ -508,6 +537,7 @@ static void s_protocol_adapter_5_subscribe_completion(
.topic_filter = aws_byte_cursor_from_buf(&subscribe_data->topic_filter),
.event_type = AWS_PASET_SUBSCRIBE,
.error_code = error_code,
.retryable = is_retryable,
};

(*adapter->config.subscription_event_callback)(&subscribe_event, adapter->config.user_data);
Expand Down
5 changes: 1 addition & 4 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,7 @@ static void s_streaming_operation_on_client_shutdown(struct aws_mqtt_rr_client_o
operation->storage.streaming_storage.options.subscription_status_callback;
void *user_data = operation->storage.streaming_storage.options.user_data;
if (subscription_status_callback != NULL) {
enum aws_rr_subscription_event_type status_type = (operation->state == AWS_MRROS_SUBSCRIBED)
? ARRSET_SUBSCRIPTION_ENDED
: ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE;
(*subscription_status_callback)(status_type, error_code, user_data);
(*subscription_status_callback)(ARRSET_STREAMING_SUBSCRIPTION_HALTED, error_code, user_data);
}
}

Expand Down
Loading

0 comments on commit 8de5a00

Please sign in to comment.