Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protocol adapter skeleton #342

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ file(GLOB AWS_MQTT_PRIV_EXPOSED_HEADERS
file(GLOB AWS_MQTT_SRC
"source/*.c"
"source/v5/*.c"
"source/request-response/*.c"
)

file(GLOB MQTT_HEADERS
Expand Down
19 changes: 19 additions & 0 deletions include/aws/mqtt/private/client_impl_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@

struct aws_mqtt_client_connection;

/*
* Internal enum that indicates what type of struct the underlying impl pointer actually is. We use this
* to safely interact with private APIs on the implementation or extract the adapted 5 client directly, as
* necessary.
*/
enum aws_mqtt311_impl_type {

/* 311 connection impl can be cast to `struct aws_mqtt_client_connection_311_impl` */
AWS_MQTT311_IT_311_CONNECTION,

/* 311 connection impl can be cast to `struct aws_mqtt_client_connection_5_impl`*/
AWS_MQTT311_IT_5_ADAPTER,
};

struct aws_mqtt_client_connection_vtable {

struct aws_mqtt_client_connection *(*acquire_fn)(void *impl);
Expand Down Expand Up @@ -107,13 +121,18 @@ struct aws_mqtt_client_connection_vtable {
void *userdata);

int (*get_stats_fn)(void *impl, struct aws_mqtt_connection_operation_statistics *stats);

enum aws_mqtt311_impl_type (*get_impl_type)(void *impl);
};

struct aws_mqtt_client_connection {
struct aws_mqtt_client_connection_vtable *vtable;
void *impl;
};

AWS_MQTT_API enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(
struct aws_mqtt_client_connection *connection);

AWS_MQTT_API uint64_t aws_mqtt_hash_uint16_t(const void *item);

AWS_MQTT_API bool aws_mqtt_compare_uint16_t_eq(const void *a, const void *b);
Expand Down
186 changes: 186 additions & 0 deletions include/aws/mqtt/private/request-response/protocol_adapter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H
#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H

/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/exports.h>
#include <aws/mqtt/mqtt.h>

#include <aws/common/byte_buf.h>

struct aws_allocator;
struct aws_mqtt_client_connection;
struct aws_mqtt5_client;

/*
* The request-response protocol adapter is a translation layer that sits between the request-response native client
* implementation and a protocol client capable of subscribing, unsubscribing, and publishing MQTT messages.
* Valid protocol clients include the CRT MQTT5 client, the CRT MQTT311 client, and an eventstream RPC connection
bretambrose marked this conversation as resolved.
Show resolved Hide resolved
* that belongs to a Greengrass IPC client. Each of these protocol clients has a different (or even implicit)
* contract for carrying out pub-sub operations. The protocol adapter abstracts these details with a simple,
* minimal interface based on the requirements identified in the request-response design documents.
*/

/*
* Minimal MQTT subscribe options
*/
struct aws_protocol_adapter_subscribe_options {
struct aws_byte_cursor topic_filter;
uint32_t ack_timeout_seconds;
};

/*
* Minimal MQTT unsubscribe options
*/
struct aws_protocol_adapter_unsubscribe_options {
struct aws_byte_cursor topic_filter;
uint32_t ack_timeout_seconds;
};

/*
* Minimal MQTT publish options
*/
struct aws_protocol_adapter_publish_options {
struct aws_byte_cursor topic;
struct aws_byte_cursor payload;
uint32_t ack_timeout_seconds;

/*
* Invoked on success/failure of the publish itself. Our implementations use QoS1 which means that success
* will be on puback receipt.
*/
void (*completion_callback_fn)(bool, void *);
bretambrose marked this conversation as resolved.
Show resolved Hide resolved

/*
* User data to pass in when invoking the completion callback
*/
void *user_data;
};

/*
* Describes the type of subscription event (relative to a topic filter)
*/
enum aws_protocol_adapter_subscription_event_type {
AWS_PASET_SUBSCRIBE_SUCCESS,
AWS_PASET_SUBSCRIBE_FAILURE,
AWS_PASET_UNSUBSCRIBE_SUCCESS,
AWS_PASET_UNSUBSCRIBE_FAILURE,
};

/*
* An event emitted by the protocol adapter when a subscribe or unsubscribe is completed by the adapted protocol
* client.
*/
struct aws_protocol_adapter_subscription_event {
struct aws_byte_cursor topic_filter;
enum aws_protocol_adapter_subscription_event_type event_type;
};

/*
* An event emitted by the protocol adapter whenever a publish is received by the protocol client. This will
* potentially include messages that are completely unrelated to MQTT request-response. The topic is the first
* thing that should be checked for relevance.
*/
struct aws_protocol_adapter_incoming_publish_event {
struct aws_byte_cursor topic;
struct aws_byte_cursor payload;
};

/*
* An event emitted by the protocol adapter whenever the protocol client successfully reconnects to the broker.
*/
struct aws_protocol_adapter_session_event {
bool joined_session;
};

typedef void(
aws_protocol_adapter_subscription_event_fn)(struct aws_protocol_adapter_subscription_event *event, void *user_data);
typedef void(aws_protocol_adapter_incoming_publish_fn)(
struct aws_protocol_adapter_incoming_publish_event *publish,
void *user_data);
typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data);
typedef void(aws_protocol_adapter_session_event_fn)(struct aws_protocol_adapter_session_event *event, void *user_data);

/*
* Set of callbacks invoked by the protocol adapter. These must all be set.
*/
struct aws_mqtt_protocol_adapter_options {
aws_protocol_adapter_subscription_event_fn *subscription_event_callback;
aws_protocol_adapter_incoming_publish_fn *incoming_publish_callback;
aws_protocol_adapter_terminate_callback_fn *terminate_callback;
aws_protocol_adapter_session_event_fn *session_event_callback;

/*
* User data to pass into all singleton protocol adapter callbacks. Likely either the request-response client
* or the subscription manager component of the request-response client.
*/
void *user_data;
};

struct aws_mqtt_protocol_adapter_vtable {

void (*aws_mqtt_protocol_adapter_destroy_fn)(void *);

int (*aws_mqtt_protocol_adapter_subscribe_fn)(void *, struct aws_protocol_adapter_subscribe_options *);

int (*aws_mqtt_protocol_adapter_unsubscribe_fn)(void *, struct aws_protocol_adapter_unsubscribe_options *);

int (*aws_mqtt_protocol_adapter_publish_fn)(void *, struct aws_protocol_adapter_publish_options *);
};

struct aws_mqtt_protocol_adapter {
const struct aws_mqtt_protocol_adapter_vtable *vtable;
void *impl;
};

AWS_EXTERN_C_BEGIN

/*
* Creates a new request-response protocol adapter from an MQTT311 client
*/
AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(
bretambrose marked this conversation as resolved.
Show resolved Hide resolved
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter_options *options,
struct aws_mqtt_client_connection *connection);

/*
* Creates a new request-response protocol adapter from an MQTT5 client
*/
AWS_MQTT_API struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_5(
struct aws_allocator *allocator,
struct aws_mqtt_protocol_adapter_options *options,
struct aws_mqtt5_client *client);

/*
* Destroys a request-response protocol adapter. Destruction is an asynchronous process and the caller must
* wait for the termination callback to be invoked before assuming that no further callbacks will be invoked.
*/
AWS_MQTT_API void aws_mqtt_protocol_adapter_destroy(struct aws_mqtt_protocol_adapter *adapter);

/*
* Asks the adapted protocol client to perform an MQTT subscribe operation
*/
AWS_MQTT_API int aws_mqtt_protocol_adapter_subscribe(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_subscribe_options *options);

/*
* Asks the adapted protocol client to perform an MQTT unsubscribe operation
*/
AWS_MQTT_API int aws_mqtt_protocol_adapter_unsubscribe(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_unsubscribe_options *options);

/*
* Asks the adapted protocol client to perform an MQTT publish operation
*/
AWS_MQTT_API int aws_mqtt_protocol_adapter_publish(
struct aws_mqtt_protocol_adapter *adapter,
struct aws_protocol_adapter_publish_options *options);

AWS_EXTERN_C_END

#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_PROTOCOL_ADAPTER_H */
66 changes: 66 additions & 0 deletions include/aws/mqtt/private/request-response/weak_ref.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#ifndef AWS_MQTT_PRIVATE_REQUEST_RESPONSE_WEAK_REF_H
#define AWS_MQTT_PRIVATE_REQUEST_RESPONSE_WEAK_REF_H

/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/exports.h>

#include <aws/common/common.h>

/*
* This is a simplification of the notion of a weak reference particular to the needs of the request-response
* MQTT service clients. This is not suitable for general use but could be extended
* for general use in the future. Until then, it stays private, here.
*
* This weak reference is a ref-counted object with an opaque value. The opaque value may be cleared or
* queried. These two operations *do not* provide any thread safety.
*
* The primary use is to allow one object to safely use asynchronous callback-driven APIs on a second object, despite
* the fact that the first object may get destroyed unpredictably. The two objects must be exclusive to a single
* event loop (because there's no thread safety or mutual exclusion on the opaque value held by the weak ref).
*
* The initial use is the request-response protocol adapter submitting operations to an MQTT client or an
* eventstream RPC connection. We use a single weak ref to the protocol adapter and zero its opaque value when
* the protocol adapter is destroyed. Operation callbacks that subsequently resolve can then short circuit and do
* nothing rather than call into garbage and crash.
*
* We use this rather than explicitly tracking and zeroing all pending operations (like the 3-to-5 adapter does)
* because this approach is simpler and our usage does not require any of these callbacks to be invoked once the
* request-response client is destroyed.
*/
struct aws_weak_ref;

AWS_EXTERN_C_BEGIN

/*
* Creates a new weak reference to an opaque value.
*/
AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_new(struct aws_allocator *allocator, void *referenced);

/*
* Acquires a reference to the weak ref object.
*/
AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_acquire(struct aws_weak_ref *weak_ref);

/*
* Removes a reference to the weak ref object. When the last reference is removed, the weak ref object will be
* destroyed. This has no effect on the opaque value held by the weak ref.
*/
AWS_MQTT_API struct aws_weak_ref *aws_weak_ref_release(struct aws_weak_ref *weak_ref);

/*
* Gets the current value of the opaque data held by the weak ref.
*/
AWS_MQTT_API void *aws_weak_ref_get_reference(struct aws_weak_ref *weak_ref);

/*
* Clears the opaque data held by the weak ref.
*/
AWS_MQTT_API void aws_weak_ref_zero_reference(struct aws_weak_ref *weak_ref);

AWS_EXTERN_C_END

#endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_WEAK_REF_H */
3 changes: 3 additions & 0 deletions include/aws/mqtt/v5/mqtt5_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ struct aws_mqtt5_publish_completion_options {
aws_mqtt5_publish_completion_fn *completion_callback;
void *completion_user_data;

/** Overrides the client's ack timeout with this value, for this operation only */
uint32_t ack_timeout_seconds_override;
};

Expand All @@ -351,6 +352,7 @@ struct aws_mqtt5_subscribe_completion_options {
aws_mqtt5_subscribe_completion_fn *completion_callback;
void *completion_user_data;

/** Overrides the client's ack timeout with this value, for this operation only */
uint32_t ack_timeout_seconds_override;
};

Expand All @@ -361,6 +363,7 @@ struct aws_mqtt5_unsubscribe_completion_options {
aws_mqtt5_unsubscribe_completion_fn *completion_callback;
void *completion_user_data;

/** Overrides the client's ack timeout with this value, for this operation only */
uint32_t ack_timeout_seconds_override;
};

Expand Down
7 changes: 7 additions & 0 deletions source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -3220,6 +3220,12 @@ static void s_aws_mqtt_client_connection_311_release(void *impl) {
aws_ref_count_release(&connection->ref_count);
}

enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_3_get_impl(void *impl) {
(void)impl;

return AWS_MQTT311_IT_311_CONNECTION;
}

static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_311_vtable = {
.acquire_fn = s_aws_mqtt_client_connection_311_acquire,
.release_fn = s_aws_mqtt_client_connection_311_release,
Expand All @@ -3243,6 +3249,7 @@ static struct aws_mqtt_client_connection_vtable s_aws_mqtt_client_connection_311
.unsubscribe_fn = s_aws_mqtt_client_connection_311_unsubscribe,
.publish_fn = s_aws_mqtt_client_connection_311_publish,
.get_stats_fn = s_aws_mqtt_client_connection_311_get_stats,
.get_impl_type = s_aws_mqtt_client_connection_3_get_impl,
};

static struct aws_mqtt_client_connection_vtable *s_aws_mqtt_client_connection_311_vtable_ptr =
Expand Down
4 changes: 4 additions & 0 deletions source/client_impl_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ int aws_mqtt_client_connection_get_stats(
return (*connection->vtable->get_stats_fn)(connection->impl, stats);
}

enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(struct aws_mqtt_client_connection *connection) {
return (*connection->vtable->get_impl_type)(connection->impl);
}

uint64_t aws_mqtt_hash_uint16_t(const void *item) {
return *(uint16_t *)item;
}
Expand Down
Loading