Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Nov 13, 2023
1 parent ff4b239 commit 3f704ba
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 156 deletions.
6 changes: 4 additions & 2 deletions bin/elastipubsub/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,8 @@ static void s_establish_subscriptions(struct app_ctx *app_ctx) {
app_ctx,
s_on_subscribe_removed,
s_on_subscribe_complete,
connection_data);
connection_data,
NULL);

if (id == 0) {
if (aws_mqtt_client_connection_disconnect(
Expand Down Expand Up @@ -574,7 +575,8 @@ static void s_publish(struct app_ctx *app_ctx) {
false,
&payload_cursor,
s_on_publish_complete,
app_ctx);
app_ctx,
NULL);

if (id == 0) {
++failed_publishes;
Expand Down
51 changes: 46 additions & 5 deletions include/aws/mqtt/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,42 @@ struct aws_mqtt_connection_options {
bool clean_session;
};

/**
* Additional publish options that are not directly part of the base API. New publish options should always go
* here.
*/
struct aws_mqtt_publish_options {
/**
* An override value for the connection-wide protocol operation timeout setting. If this is zero, then the
* connection's timeout value is used.
*/
uint32_t protocol_operation_timeout_ms_override;
};

/**
* Additional subscribe options that are not directly part of the base API. New subscribe options should always go
* here.
*/
struct aws_mqtt_subscribe_options {
/**
* An override value for the connection-wide protocol operation timeout setting. If this is zero, then the
* connection's timeout value is used.
*/
uint32_t protocol_operation_timeout_ms_override;
};

/**
* Additional unsubscribe options that are not directly part of the base API. New unsubscribe options should always go
* here.
*/
struct aws_mqtt_unsubscribe_options {
/**
* An override value for the connection-wide protocol operation timeout setting. If this is zero, then the
* connection's timeout value is used.
*/
uint32_t protocol_operation_timeout_ms_override;
};

/**
* Contains some simple statistics about the current state of the connection's queue of operations
*/
Expand Down Expand Up @@ -595,7 +631,8 @@ uint16_t aws_mqtt_client_connection_subscribe_multiple(
struct aws_mqtt_client_connection *connection,
const struct aws_array_list *topic_filters,
aws_mqtt_suback_multi_fn *on_suback,
void *on_suback_ud);
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options);

/**
* Subscribe to a single topic filter. on_publish will be called when a PUBLISH matching topic_filter is received.
Expand All @@ -621,7 +658,8 @@ uint16_t aws_mqtt_client_connection_subscribe(
void *on_publish_ud,
aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
aws_mqtt_suback_fn *on_suback,
void *on_suback_ud);
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options);

/**
* Resubscribe to all topics currently subscribed to. This is to help when resuming a connection with a clean session.
Expand All @@ -637,7 +675,8 @@ AWS_MQTT_API
uint16_t aws_mqtt_resubscribe_existing_topics(
struct aws_mqtt_client_connection *connection,
aws_mqtt_suback_multi_fn *on_suback,
void *on_suback_ud);
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options);

/**
* Unsubscribe to a topic filter.
Expand All @@ -655,7 +694,8 @@ uint16_t aws_mqtt_client_connection_unsubscribe(
struct aws_mqtt_client_connection *connection,
const struct aws_byte_cursor *topic_filter,
aws_mqtt_op_complete_fn *on_unsuback,
void *on_unsuback_ud);
void *on_unsuback_ud,
struct aws_mqtt_unsubscribe_options *unsubscribe_options);

/**
* Send a PUBLISH packet over connection.
Expand All @@ -680,7 +720,8 @@ uint16_t aws_mqtt_client_connection_publish(
bool retain,
const struct aws_byte_cursor *payload,
aws_mqtt_op_complete_fn *on_complete,
void *userdata);
void *userdata,
struct aws_mqtt_publish_options *publish_options);

/**
* Queries the connection's internal statistics for incomplete/unacked operations.
Expand Down
14 changes: 9 additions & 5 deletions include/aws/mqtt/private/client_impl_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ struct aws_mqtt_client_connection_vtable {
void *impl,
const struct aws_array_list *topic_filters,
aws_mqtt_suback_multi_fn *on_suback,
void *on_suback_ud);
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options);

uint16_t (*subscribe_fn)(
void *impl,
Expand All @@ -87,15 +88,17 @@ struct aws_mqtt_client_connection_vtable {
void *on_publish_ud,
aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
aws_mqtt_suback_fn *on_suback,
void *on_suback_ud);
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options);

uint16_t (*resubscribe_existing_topics_fn)(void *impl, aws_mqtt_suback_multi_fn *on_suback, void *on_suback_ud);
uint16_t (*resubscribe_existing_topics_fn)(void *impl, aws_mqtt_suback_multi_fn *on_suback, void *on_suback_ud, struct aws_mqtt_subscribe_options *subscribe_options);

uint16_t (*unsubscribe_fn)(
void *impl,
const struct aws_byte_cursor *topic_filter,
aws_mqtt_op_complete_fn *on_unsuback,
void *on_unsuback_ud);
void *on_unsuback_ud,
struct aws_mqtt_unsubscribe_options *unsubscribe_options);

uint16_t (*publish_fn)(
void *impl,
Expand All @@ -104,7 +107,8 @@ struct aws_mqtt_client_connection_vtable {
bool retain,
const struct aws_byte_cursor *payload,
aws_mqtt_op_complete_fn *on_complete,
void *userdata);
void *userdata,
struct aws_mqtt_publish_options *publish_options);

int (*get_stats_fn)(void *impl, struct aws_mqtt_connection_operation_statistics *stats);
};
Expand Down
12 changes: 12 additions & 0 deletions include/aws/mqtt/private/v5/mqtt5_to_mqtt3_adapter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ struct aws_mqtt5_to_mqtt3_adapter_publish_options {

aws_mqtt_op_complete_fn *on_complete;
void *on_complete_userdata;

uint32_t ack_timeout_seconds_override;
};

/*
Expand All @@ -45,6 +47,8 @@ struct aws_mqtt5_to_mqtt3_adapter_subscribe_options {

aws_mqtt_suback_multi_fn *on_multi_suback;
void *on_multi_suback_user_data;

uint32_t ack_timeout_seconds_override;
};

struct aws_mqtt5_to_mqtt3_adapter_unsubscribe_options {
Expand All @@ -54,6 +58,8 @@ struct aws_mqtt5_to_mqtt3_adapter_unsubscribe_options {

aws_mqtt_op_complete_fn *on_unsuback;
void *on_unsuback_user_data;

uint32_t ack_timeout_seconds_override;
};

enum aws_mqtt5_to_mqtt3_adapter_operation_type {
Expand Down Expand Up @@ -125,6 +131,12 @@ struct aws_mqtt5_to_mqtt3_adapter_operation_subscribe {

aws_mqtt_suback_multi_fn *on_multi_suback;
void *on_multi_suback_user_data;

/*
* Resusbscribe does not instantiate its mqtt5 subscribe operation until we're across a thread boundary, so
* we store the timeout override here during that interval.
*/
uint32_t ack_timeout_seconds_override;
};

struct aws_mqtt5_to_mqtt3_adapter_operation_unsubscribe {
Expand Down
15 changes: 10 additions & 5 deletions source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1972,7 +1972,8 @@ static uint16_t s_aws_mqtt_client_connection_311_subscribe_multiple(
void *impl,
const struct aws_array_list *topic_filters,
aws_mqtt_suback_multi_fn *on_suback,
void *on_suback_ud) {
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options) {

struct aws_mqtt_client_connection_311_impl *connection = impl;

Expand Down Expand Up @@ -2143,7 +2144,8 @@ static uint16_t s_aws_mqtt_client_connection_311_subscribe(
void *on_publish_ud,
aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
aws_mqtt_suback_fn *on_suback,
void *on_suback_ud) {
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options) {

struct aws_mqtt_client_connection_311_impl *connection = impl;

Expand Down Expand Up @@ -2430,7 +2432,8 @@ static void s_resubscribe_complete(
static uint16_t s_aws_mqtt_311_resubscribe_existing_topics(
void *impl,
aws_mqtt_suback_multi_fn *on_suback,
void *on_suback_ud) {
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options) {

struct aws_mqtt_client_connection_311_impl *connection = impl;

Expand Down Expand Up @@ -2666,7 +2669,8 @@ static uint16_t s_aws_mqtt_client_connection_311_unsubscribe(
void *impl,
const struct aws_byte_cursor *topic_filter,
aws_mqtt_op_complete_fn *on_unsuback,
void *on_unsuback_ud) {
void *on_unsuback_ud,
struct aws_mqtt_unsubscribe_options *unsubscribe_options) {

struct aws_mqtt_client_connection_311_impl *connection = impl;

Expand Down Expand Up @@ -2936,7 +2940,8 @@ static uint16_t s_aws_mqtt_client_connection_311_publish(
bool retain,
const struct aws_byte_cursor *payload,
aws_mqtt_op_complete_fn *on_complete,
void *userdata) {
void *userdata,
struct aws_mqtt_publish_options *publish_options) {

struct aws_mqtt_client_connection_311_impl *connection = impl;

Expand Down
25 changes: 15 additions & 10 deletions source/client_impl_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,10 @@ uint16_t aws_mqtt_client_connection_subscribe_multiple(
struct aws_mqtt_client_connection *connection,
const struct aws_array_list *topic_filters,
aws_mqtt_suback_multi_fn *on_suback,
void *on_suback_ud) {
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options) {

return (*connection->vtable->subscribe_multiple_fn)(connection->impl, topic_filters, on_suback, on_suback_ud);
return (*connection->vtable->subscribe_multiple_fn)(connection->impl, topic_filters, on_suback, on_suback_ud, subscribe_options);
}

uint16_t aws_mqtt_client_connection_subscribe(
Expand All @@ -162,27 +163,30 @@ uint16_t aws_mqtt_client_connection_subscribe(
void *on_publish_ud,
aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
aws_mqtt_suback_fn *on_suback,
void *on_suback_ud) {
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options) {

return (*connection->vtable->subscribe_fn)(
connection->impl, topic_filter, qos, on_publish, on_publish_ud, on_ud_cleanup, on_suback, on_suback_ud);
connection->impl, topic_filter, qos, on_publish, on_publish_ud, on_ud_cleanup, on_suback, on_suback_ud, subscribe_options);
}

uint16_t aws_mqtt_resubscribe_existing_topics(
struct aws_mqtt_client_connection *connection,
aws_mqtt_suback_multi_fn *on_suback,
void *on_suback_ud) {
void *on_suback_ud,
struct aws_mqtt_subscribe_options *subscribe_options) {

return (*connection->vtable->resubscribe_existing_topics_fn)(connection->impl, on_suback, on_suback_ud);
return (*connection->vtable->resubscribe_existing_topics_fn)(connection->impl, on_suback, on_suback_ud, subscribe_options);
}

uint16_t aws_mqtt_client_connection_unsubscribe(
struct aws_mqtt_client_connection *connection,
const struct aws_byte_cursor *topic_filter,
aws_mqtt_op_complete_fn *on_unsuback,
void *on_unsuback_ud) {
void *on_unsuback_ud,
struct aws_mqtt_unsubscribe_options *unsubscribe_options) {

return (*connection->vtable->unsubscribe_fn)(connection->impl, topic_filter, on_unsuback, on_unsuback_ud);
return (*connection->vtable->unsubscribe_fn)(connection->impl, topic_filter, on_unsuback, on_unsuback_ud, unsubscribe_options);
}

uint16_t aws_mqtt_client_connection_publish(
Expand All @@ -192,9 +196,10 @@ uint16_t aws_mqtt_client_connection_publish(
bool retain,
const struct aws_byte_cursor *payload,
aws_mqtt_op_complete_fn *on_complete,
void *userdata) {
void *userdata,
struct aws_mqtt_publish_options *publish_options) {

return (*connection->vtable->publish_fn)(connection->impl, topic, qos, retain, payload, on_complete, userdata);
return (*connection->vtable->publish_fn)(connection->impl, topic, qos, retain, payload, on_complete, userdata, publish_options);
}

int aws_mqtt_client_connection_get_stats(
Expand Down
Loading

0 comments on commit 3f704ba

Please sign in to comment.