Skip to content

Commit

Permalink
return an error code when an mqtt5 to mqtt3 adapter puback has a fail…
Browse files Browse the repository at this point in the history
…ing reason code (#352)
  • Loading branch information
sbSteveK authored Feb 29, 2024
1 parent d4346e0 commit 74da9ca
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 3 deletions.
1 change: 1 addition & 0 deletions include/aws/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ enum aws_mqtt_error {
AWS_ERROR_MQTT_CONNECTION_RESET_FOR_ADAPTER_CONNECT,
AWS_ERROR_MQTT_CONNECTION_RESUBSCRIBE_NO_TOPICS,
AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE,
AWS_ERROR_MQTT_ACK_REASON_CODE_FAILURE,

AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID),
};
Expand Down
3 changes: 3 additions & 0 deletions source/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ bool aws_mqtt_is_valid_topic_filter(const struct aws_byte_cursor *topic_filter)
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE,
"MQTT subscribe operation failed"),
AWS_DEFINE_ERROR_INFO_MQTT(
AWS_ERROR_MQTT_ACK_REASON_CODE_FAILURE,
"MQTT ack packet received with a failing reason code"),
};
/* clang-format on */
#undef AWS_DEFINE_ERROR_INFO_MQTT
Expand Down
12 changes: 9 additions & 3 deletions source/v5/mqtt5_to_mqtt3_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1823,16 +1823,22 @@ static void s_aws_mqtt5_to_mqtt3_adapter_publish_completion_fn(
int error_code,
void *complete_ctx) {

(void)packet_type;
(void)packet;
int error_code_final = error_code;

if (error_code_final == AWS_ERROR_SUCCESS && packet_type == AWS_MQTT5_PT_PUBACK) {
const struct aws_mqtt5_packet_puback_view *puback_view = packet;
if (puback_view->reason_code >= 128) {
error_code_final = AWS_ERROR_MQTT_ACK_REASON_CODE_FAILURE;
}
}

struct aws_mqtt5_to_mqtt3_adapter_operation_publish *publish_op = complete_ctx;

if (publish_op->on_publish_complete != NULL) {
(*publish_op->on_publish_complete)(
&publish_op->base.adapter->base,
publish_op->base.id,
error_code,
error_code_final,
publish_op->on_publish_complete_user_data);
}

Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ add_test_case(mqtt5to3_adapter_publish_failure_invalid)
add_test_case(mqtt5to3_adapter_publish_failure_offline_queue_policy)
add_test_case(mqtt5to3_adapter_publish_success_qos0)
add_test_case(mqtt5to3_adapter_publish_success_qos1)
add_test_case(mqtt5to3_adapter_publish_qos1_fail_ack)
add_test_case(mqtt5to3_adapter_publish_no_ack)
add_test_case(mqtt5to3_adapter_publish_interrupted)
add_test_case(mqtt5to3_adapter_subscribe_single_success)
Expand Down
80 changes: 80 additions & 0 deletions tests/v5/mqtt5_to_mqtt3_adapter_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -2368,6 +2368,86 @@ static int s_mqtt5to3_adapter_publish_success_qos1_fn(struct aws_allocator *allo

AWS_TEST_CASE(mqtt5to3_adapter_publish_success_qos1, s_mqtt5to3_adapter_publish_success_qos1_fn)

int aws_mqtt5_mock_server_handle_not_authorized_publish_puback(
void *packet,
struct aws_mqtt5_server_mock_connection_context *connection,
void *user_data) {

(void)user_data;

struct aws_mqtt5_packet_publish_view *publish_view = packet;
if (publish_view->qos != AWS_MQTT5_QOS_AT_LEAST_ONCE) {
return AWS_OP_SUCCESS;
}

struct aws_mqtt5_packet_puback_view puback_view = {
.packet_id = publish_view->packet_id,
.reason_code = AWS_MQTT5_PARC_NOT_AUTHORIZED,
};

return aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_PUBACK, &puback_view);
}

static int s_mqtt5to3_adapter_publish_qos1_fail_ack_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

aws_mqtt_library_init(allocator);

struct mqtt5_client_test_options test_options;
aws_mqtt5_client_test_init_default_options(&test_options);

/* Return a fail qos1 puback */
test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_PUBLISH] =
aws_mqtt5_mock_server_handle_not_authorized_publish_puback;

struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = {
.client_options = &test_options.client_options,
.server_function_table = &test_options.server_function_table,
};

struct aws_mqtt5_to_mqtt3_adapter_test_fixture fixture;
ASSERT_SUCCESS(aws_mqtt5_to_mqtt3_adapter_test_fixture_init(&fixture, allocator, &test_fixture_options));

struct aws_mqtt_client_connection *connection = fixture.connection;

struct aws_mqtt_connection_options connection_options;
s_init_adapter_connection_options_from_fixture(&connection_options, &fixture);

connection_options.on_connection_complete = s_aws_mqtt5_to_mqtt3_adapter_test_fixture_record_connection_complete;
connection_options.user_data = &fixture;

aws_mqtt_client_connection_connect(connection, &connection_options);

s_wait_for_n_adapter_lifecycle_events(&fixture, AWS_MQTT3_LET_CONNECTION_COMPLETE, 1);

struct aws_byte_cursor topic = aws_byte_cursor_from_c_str("derp");

aws_mqtt_client_connection_publish(
connection,
&topic,
AWS_MQTT_QOS_AT_LEAST_ONCE,
false,
NULL,
s_aws_mqtt5_to_mqtt3_adapter_test_fixture_record_publish_complete,
&fixture);

s_wait_for_n_adapter_operation_events(&fixture, AWS_MQTT3_OET_PUBLISH_COMPLETE, 1);

struct aws_mqtt3_operation_event expected_events[] = {{
.type = AWS_MQTT3_OET_PUBLISH_COMPLETE,
.error_code = AWS_ERROR_MQTT_ACK_REASON_CODE_FAILURE,
}};
s_aws_mqtt5_to_mqtt3_adapter_test_fixture_verify_operation_sequence(
&fixture, AWS_ARRAY_SIZE(expected_events), expected_events, AWS_ARRAY_SIZE(expected_events));

aws_mqtt5_to_mqtt3_adapter_test_fixture_clean_up(&fixture);
aws_mqtt_library_clean_up();

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(mqtt5to3_adapter_publish_qos1_fail_ack, s_mqtt5to3_adapter_publish_qos1_fail_ack_fn)

static int s_mqtt5to3_adapter_publish_no_ack_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

Expand Down

0 comments on commit 74da9ca

Please sign in to comment.