Skip to content

Commit

Permalink
CR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Jul 11, 2023
1 parent a09799a commit 7d5665c
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 16 deletions.
62 changes: 50 additions & 12 deletions include/aws/mqtt/private/mqtt_subscription_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include "aws/mqtt/v5/mqtt5_types.h"
#include <aws/common/hash_table.h>

/**
* (Transient) configuration options about a single persistent MQTT topic filter subscription
*/
struct aws_mqtt_subscription_set_subscription_options {
struct aws_byte_cursor topic_filter;

Expand All @@ -21,19 +24,28 @@ struct aws_mqtt_subscription_set_subscription_options {
bool retain_as_published;
enum aws_mqtt5_retain_handling_type retain_handling_type;

/* Callback invoked when this subscription matches an incoming publish */
aws_mqtt_client_publish_received_fn *on_publish_received;

/* Callback invoked when this subscription is removed from the set */
aws_mqtt_userdata_cleanup_fn *on_cleanup;

void *callback_user_data;
};

/**
* Persistent structure to track a single MQTT topic filter subscription
*/
struct aws_mqtt_subscription_set_subscription_record {
struct aws_allocator *allocator;
struct aws_byte_buf topic_filter;

struct aws_mqtt_subscription_set_subscription_options subscription_view;
};

/**
* (Transient) configuration options about an incoming publish message
*/
struct aws_mqtt_subscription_set_publish_received_options {
struct aws_mqtt_client_connection *connection;

Expand All @@ -45,15 +57,38 @@ struct aws_mqtt_subscription_set_publish_received_options {
struct aws_byte_cursor payload;
};

/**
* A node in the topic trie maintained by the subscription set. Each node represents a single "path segment" in a
* topic filter "path." Segments can be empty.
*
* Some examples (topic filter -> path segments):
*
* "hello/world" -> [ "hello", "world" ]
* "a/b/" -> [ "a", "b", "" ]
* "/b/" -> [ "", "b", "" ]
* "a/#/c" -> [ "a", "#", "c" ]
*
* On incoming publish, we walk the tree invoking callbacks based on topic vs. topic filter matching, segment by
* segment.
*
*/
struct aws_mqtt_subscription_set_topic_tree_node {
struct aws_allocator *allocator;

struct aws_byte_cursor topic_segment_cursor; // segment can be empty
struct aws_byte_cursor topic_segment_cursor; /* segment can be empty */
struct aws_byte_buf topic_segment;

struct aws_mqtt_subscription_set_topic_tree_node *parent;
struct aws_hash_table children; // (a node's byte_cursor -> node)

struct aws_hash_table children; /* (embedded topic_segment -> containing node) */

/*
* A node starts with a ref count of one and is incremented every time a new, overlapping path is added
* to the subscription set. When the ref count goes to zero, that means there are not subscriptions using the
* segment (or path suffix) represented by this node and therefor it can be deleted without any additional
* analysis.
*
* Replacing an existing path does not change the ref count.
*/
size_t ref_count;

bool is_subscription;
Expand All @@ -78,7 +113,7 @@ struct aws_mqtt_subscription_set {
/* a permanent ref */
struct aws_mqtt_subscription_set_topic_tree_node *root;

/* topic_filter_cursor -> persistent subscription */
/* embedded topic_filter_cursor -> persistent subscription */
struct aws_hash_table subscriptions;
};

Expand Down Expand Up @@ -107,7 +142,7 @@ AWS_MQTT_API void aws_mqtt_subscription_set_destroy(struct aws_mqtt_subscription
* @return true if the topic filter exists in the table of subscriptions, false otherwise
*/
AWS_MQTT_API bool aws_mqtt_subscription_set_is_subscribed(
struct aws_mqtt_subscription_set *subscription_set,
const struct aws_mqtt_subscription_set *subscription_set,
struct aws_byte_cursor topic_filter);

/**
Expand All @@ -118,7 +153,7 @@ AWS_MQTT_API bool aws_mqtt_subscription_set_is_subscribed(
* @return true if the set's topic tree contains a publish received callback for the topic filter, false otherwise
*/
AWS_MQTT_API bool aws_mqtt_subscription_set_is_in_topic_tree(
struct aws_mqtt_subscription_set *subscription_set,
const struct aws_mqtt_subscription_set *subscription_set,
struct aws_byte_cursor topic_filter);

/**
Expand Down Expand Up @@ -149,7 +184,7 @@ AWS_MQTT_API void aws_mqtt_subscription_set_remove_subscription(
* @param publish_options received publish message properties
*/
AWS_MQTT_API void aws_mqtt_subscription_set_on_publish_received(
struct aws_mqtt_subscription_set *subscription_set,
const struct aws_mqtt_subscription_set *subscription_set,
const struct aws_mqtt_subscription_set_publish_received_options *publish_options);

/**
Expand All @@ -161,25 +196,28 @@ AWS_MQTT_API void aws_mqtt_subscription_set_on_publish_received(
*
* The caller must invoke the cleanup function for array lists on the result. The list elements are of type
* 'struct aws_mqtt_subscription_set_subscription_options' and the topic filter cursor points to the subscription set's
* internal record.
* internal record. This means that the result must be used and cleaned up in local scope.
*/
AWS_MQTT_API void aws_mqtt_subscription_set_get_subscriptions(
struct aws_mqtt_subscription_set *subscription_set,
struct aws_array_list *subscriptions);

/**
* Creates a new subscription record. A subscription record tracks all information about a single MQTT topic filter
* subscription
*
* @param allocator
* @param subscription
* @return
* @param allocator memory allocator to use
* @param subscription all relevant information about the subscription
* @return a new persistent subscription record
*/
AWS_MQTT_API struct aws_mqtt_subscription_set_subscription_record *aws_mqtt_subscription_set_subscription_record_new(
struct aws_allocator *allocator,
const struct aws_mqtt_subscription_set_subscription_options *subscription);

/**
* Destroys a subscription record
*
* @param record
* @param record subscription record to destroy
*/
AWS_MQTT_API void aws_mqtt_subscription_set_subscription_record_destroy(
struct aws_mqtt_subscription_set_subscription_record *record);
Expand Down
8 changes: 4 additions & 4 deletions source/mqtt_subscription_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void aws_mqtt_subscription_set_destroy(struct aws_mqtt_subscription_set *subscri
}

static struct aws_mqtt_subscription_set_topic_tree_node *s_aws_mqtt_subscription_set_get_existing_subscription_node(
struct aws_mqtt_subscription_set *subscription_set,
const struct aws_mqtt_subscription_set *subscription_set,
struct aws_byte_cursor topic_filter) {

struct aws_mqtt_subscription_set_topic_tree_node *current_node = subscription_set->root;
Expand All @@ -155,7 +155,7 @@ static struct aws_mqtt_subscription_set_topic_tree_node *s_aws_mqtt_subscription
}

bool aws_mqtt_subscription_set_is_subscribed(
struct aws_mqtt_subscription_set *subscription_set,
const struct aws_mqtt_subscription_set *subscription_set,
struct aws_byte_cursor topic_filter) {

struct aws_hash_element *element = NULL;
Expand All @@ -165,7 +165,7 @@ bool aws_mqtt_subscription_set_is_subscribed(
}

bool aws_mqtt_subscription_set_is_in_topic_tree(
struct aws_mqtt_subscription_set *subscription_set,
const struct aws_mqtt_subscription_set *subscription_set,
struct aws_byte_cursor topic_filter) {
struct aws_mqtt_subscription_set_topic_tree_node *existing_node =
s_aws_mqtt_subscription_set_get_existing_subscription_node(subscription_set, topic_filter);
Expand Down Expand Up @@ -354,7 +354,7 @@ static void s_invoke_on_publish_received(
}

void aws_mqtt_subscription_set_on_publish_received(
struct aws_mqtt_subscription_set *subscription_set,
const struct aws_mqtt_subscription_set *subscription_set,
const struct aws_mqtt_subscription_set_publish_received_options *publish_options) {

struct aws_byte_cursor slw_cursor = aws_byte_cursor_from_string(s_single_level_wildcard);
Expand Down
1 change: 1 addition & 0 deletions tests/v5/mqtt_subscription_set_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ static int s_mqtt_subscription_set_get_subscriptions_fn(struct aws_allocator *al
ASSERT_TRUE(aws_mqtt_subscription_set_is_subscribed(subscription_set, aws_byte_cursor_from_c_str("a/b/c")));
ASSERT_FALSE(aws_mqtt_subscription_set_is_subscribed(subscription_set, aws_byte_cursor_from_c_str("/#")));
ASSERT_FALSE(aws_mqtt_subscription_set_is_subscribed(subscription_set, aws_byte_cursor_from_c_str("/")));
ASSERT_FALSE(aws_mqtt_subscription_set_is_subscribed(subscription_set, aws_byte_cursor_from_c_str("a")));

struct aws_array_list subscriptions;
aws_mqtt_subscription_set_get_subscriptions(subscription_set, &subscriptions);
Expand Down

0 comments on commit 7d5665c

Please sign in to comment.