Skip to content

Commit

Permalink
Sync point
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jan 22, 2024
1 parent 29016f1 commit 76364e9
Show file tree
Hide file tree
Showing 4 changed files with 443 additions and 0 deletions.
4 changes: 4 additions & 0 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <aws/mqtt/private/client_impl_shared.h>
#include <aws/mqtt/private/fixed_header.h>
#include <aws/mqtt/private/mqtt311_decoder.h>
#include <aws/mqtt/private/mqtt311_listener.h>
#include <aws/mqtt/private/topic_tree.h>

#include <aws/common/hash_table.h>
Expand Down Expand Up @@ -255,6 +256,9 @@ struct aws_mqtt_client_connection_311_impl {
aws_mqtt_on_operation_statistics_fn *on_any_operation_statistics;
void *on_any_operation_statistics_ud;

/* listener callbacks */
struct aws_mqtt311_callback_set_manager callback_manager;

/* Connection tasks. */
struct aws_mqtt_reconnect_task *reconnect_task;
struct aws_channel_task ping_task;
Expand Down
183 changes: 183 additions & 0 deletions include/aws/mqtt/private/mqtt311_listener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#ifndef AWS_MQTT_MQTT311_LISTENER_H
#define AWS_MQTT_MQTT311_LISTENER_H

#include <aws/mqtt/mqtt.h>

#include <aws/common/rw_lock.h>
#include <aws/mqtt/client.h>

AWS_PUSH_SANE_WARNING_LEVEL

/**
* Callback signature for when an mqtt311 listener has completely destroyed itself.
*/
typedef void(aws_mqtt311_listener_termination_completion_fn)(void *complete_ctx);

/**
* A record that tracks MQTT311 client connection callbacks which can be dynamically injected via a listener.
*
* All the callbacks that are supported here are invoked only on the 311 connection's event loop. With the
* add/remove callback set also on the event loop, everything is correctly serialized without data races.
*/
struct aws_mqtt311_callback_set {

/* Called from s_packet_handler_publish which is event-loop invoked */
aws_mqtt_client_publish_received_fn *publish_received_handler;

/* Called from s_mqtt_client_shutdown which is event-loop invoked */
aws_mqtt_client_on_connection_interrupted_fn *connection_interrupted_handler;

/* Called from s_packet_handler_connack which is event-loop invoked */
aws_mqtt_client_on_connection_interrupted_fn *connection_resumed_handler;

/* Also called from s_packet_handler_connack which is event-loop invoked */
aws_mqtt_client_on_connection_success_fn *connection_success_handler;

void *user_data;
};

/**
* An internal type for managing chains of callbacks attached to an mqtt311 client connection. Supports chains for
* lifecycle events and incoming publish packet handling.
*
* Assumed to be owned and used only by an MQTT311 client connection.
*/
struct aws_mqtt311_callback_set_manager {
struct aws_allocator *allocator;

struct aws_mqtt_client_connection *connection;

struct aws_linked_list callback_set_entries;

uint64_t next_callback_set_entry_id;
};


/**
* Configuration options for MQTT311 listener objects.
*/
struct aws_mqtt311_listener_config {

/**
* MQTT311 client connection to listen to events on
*/
struct aws_mqtt_client_connection *connection;

/**
* Callbacks to invoke when events occur on the MQTT311 client connection
*/
struct aws_mqtt311_callback_set listener_callbacks;

/**
* Listener destruction is asynchronous and thus requires a termination callback and associated user data
* to notify the user that the listener has been fully destroyed and no further events will be received.
*/
aws_mqtt311_listener_termination_completion_fn *termination_callback;
void *termination_callback_user_data;
};

AWS_EXTERN_C_BEGIN

/**
* Creates a new MQTT311 listener object. For as long as the listener lives, incoming publishes and lifecycle events
* will be forwarded to the callbacks configured on the listener.
*
* @param allocator allocator to use
* @param config listener configuration
* @return a new aws_mqtt311_listener object
*/
AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_new(
struct aws_allocator *allocator,
struct aws_mqtt311_listener_config *config);

/**
* Adds a reference to an mqtt311 listener.
*
* @param listener listener to add a reference to
* @return the listener object
*/
AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_acquire(struct aws_mqtt311_listener *listener);

/**
* Removes a reference to an mqtt311 listener. When the reference count drops to zero, the listener's asynchronous
* destruction will be started.
*
* @param listener listener to remove a reference from
* @return NULL
*/
AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_release(struct aws_mqtt311_listener *listener);


/**
* Initializes a callback set manager
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_init(
struct aws_mqtt311_callback_set_manager *manager,
struct aws_allocator *allocator,
struct aws_mqtt_client_connection *connection);

/**
* Cleans up a callback set manager.
*
* aws_mqtt311_callback_set_manager_init must have been previously called or this will crash.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_clean_up(struct aws_mqtt311_callback_set_manager *manager);

/**
* Adds a callback set to the front of the handler chain. Returns an integer id that can be used to selectively
* remove the callback set from the manager.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
uint64_t aws_mqtt311_callback_set_manager_push_front(
struct aws_mqtt311_callback_set_manager *manager,
struct aws_mqtt311_callback_set *callback_set);

/**
* Removes a callback set from the handler chain.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_remove(struct aws_mqtt311_callback_set_manager *manager, uint64_t callback_set_id);

/**
* Walks the incoming publish handler chain for an MQTT311 connection. The chain's callbacks will be invoked
* until either the end is reached or one of the callbacks returns true.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_publish_received(
struct aws_mqtt311_callback_set_manager *manager,
const struct aws_byte_cursor *topic,
const struct aws_byte_cursor *payload,
bool dup,
enum aws_mqtt_qos qos,
bool retain);

AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_connection_interrupted(
struct aws_mqtt311_callback_set_manager *manager);

AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_connection_resumed(
struct aws_mqtt311_callback_set_manager *manager);

AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_connection_success(
struct aws_mqtt311_callback_set_manager *manager);

AWS_EXTERN_C_END

AWS_POP_SANE_WARNING_LEVEL

#endif /* AWS_MQTT_MQTT311_LISTENER_H */
5 changes: 5 additions & 0 deletions source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ static void s_mqtt_client_shutdown(
(void)channel;

struct aws_mqtt_client_connection_311_impl *connection = user_data;
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection->loop));

AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code);
Expand Down Expand Up @@ -801,6 +802,8 @@ static void s_mqtt_client_connection_destroy_final(struct aws_mqtt_client_connec
termination_handler_user_data = connection->on_termination_ud;
}

aws_mqtt311_callback_set_manager_clean_up(&connection->callback_manager);

/* If the reconnect_task isn't freed, free it */
if (connection->reconnect_task) {
aws_mem_release(connection->reconnect_task->allocator, connection->reconnect_task);
Expand Down Expand Up @@ -3351,6 +3354,8 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt
connection->handler.vtable = aws_mqtt_get_client_channel_vtable();
connection->handler.impl = connection;

aws_mqtt311_callback_set_manager_init(&connection->callback_manager, connection->allocator, connection);

return &connection->base;

failed_init_outstanding_requests_table:
Expand Down
Loading

0 comments on commit 76364e9

Please sign in to comment.