-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathmain.c
222 lines (180 loc) · 6.75 KB
/
main.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/* Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT License. */
#include <applibs/log.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "eventgrid_config.h"
#include "eventloop_timer_utilities.h"
#include "exitcodes.h"
#include "mqtt_connection.h"
#include "mqtt.h"
#include "options.h"
// Application exit status. It is written by the SIGTERM handler and by the callbacks below,
// and polled by the main loop, which keeps running only while it equals ExitCode_Success.
static volatile sig_atomic_t exitCode = ExitCode_Success;
// Timer / polling
// Event loop created in InitPeripheralsAndHandlers and driven from main.
static EventLoop *eventLoop = NULL;
// Periodic timer whose handler publishes one telemetry message per tick.
static EventLoopTimer *publishMessageTimer = NULL;
// Forward declarations for the static helpers defined later in this file.
static void PublishMessageTimerHandler(EventLoopTimer *eventLoopTimer);
static void ExitCodeCallbackHandler(ExitCode ec);
static void UpdateTelemetry(char *mqttMessageToPublish, size_t messageBufferSize);
static void PublishCallback(void **unused, struct mqtt_response_publish *published);
static bool BuildUtcDateTimeString(char *outputBuffer, size_t outputBufferSize, time_t t);
// Outgoing message: PublishMessageTimerHandler fills .message and .message_length on each tick.
static MQTT_MESSAGE mqtt_msg;
// Connection settings handed to InitializeMqtt. The hostname is left NULL here and is
// filled in from the parsed options in InitPeripheralsAndHandlers.
// NOTE(review): 8883 is the port conventionally used for MQTT over TLS; messageSize and
// topicSize look like buffer sizes in bytes - confirm against mqtt_connection.h.
static MQTT_Context mqttContext = {.hostname = NULL,
.port = "8883",
.publishTopic = EVENT_GRID_PUBLISH_TOPIC,
.subscribeTopic = EVENT_GRID_SUBSCRIBE_TOPIC,
.ca_cert = EVENT_GRID_CA_CERTIFICATE,
.messageSize = 128,
.messageQOS = MQTT_PUBLISH_QOS_1,
.topicSize = 256 };
/// <summary>
///     Periodic timer callback. Consumes the timer event, regenerates the telemetry text
///     held in mqtt_msg and passes it to SendTelemetry together with the topic returned by
///     GetPublishTopicName.
/// </summary>
/// <param name="eventLoopTimer">The timer that fired.</param>
static void PublishMessageTimerHandler(EventLoopTimer *eventLoopTimer)
{
    if (ConsumeEventLoopTimerEvent(eventLoopTimer) == 0) {
        const size_t capacity = sizeof(mqtt_msg.message);

        // Rebuild the payload in place, then record how many bytes of it are in use.
        UpdateTelemetry(mqtt_msg.message, capacity);
        mqtt_msg.message_length = strnlen(mqtt_msg.message, capacity);

        SendTelemetry(mqtt_msg.message, mqtt_msg.message_length, GetPublishTopicName());
    } else {
        // The timer event could not be consumed: ask the main loop to stop.
        exitCode = ExitCode_PublishMessageTimer_Consume;
    }
}
/// <summary>
///     Callback invoked with a message received on the subscribed topic. The payload is
///     copied into a temporary NUL-terminated buffer so that it can be logged as a string.
/// </summary>
/// <param name="unused">Not used by this handler.</param>
/// <param name="published">Received publish; its application message and size are read.</param>
static void PublishCallback(void **unused, struct mqtt_response_publish *published)
{
    (void)unused;

    const size_t payloadSize = published->application_message_size;

    // One extra byte for the terminator: the payload itself is not a C string.
    char *text = malloc(payloadSize + 1);
    if (text == NULL) {
        Log_Debug("Error: Cannot print received message. Memory allocation failed.\n");
        return;
    }

    memcpy(text, published->application_message, payloadSize);
    text[payloadSize] = '\0';

    Log_Debug("Message Received: %s", text);
    free(text);
}
/// <summary>
///     SIGTERM handler. It only records the termination exit code, which keeps it
///     async-signal-safe; the main loop notices the change and shuts down.
/// </summary>
/// <param name="signalNumber">Number of the delivered signal (not used).</param>
static void TerminationHandler(int signalNumber)
{
    (void)signalNumber;

    // Log_Debug is deliberately not called here: it is not guaranteed to be
    // async-signal-safe.
    exitCode = ExitCode_TermHandler_SigTerm;
}
/// <summary>
///     Callback registered with InitializeMqtt. Stores the reported exit code in the
///     file-scope exitCode, which the main loop checks on every iteration.
/// </summary>
/// <param name="ec">Exit code to record.</param>
static void ExitCodeCallbackHandler(ExitCode ec)
{
exitCode = ec;
}
/// <summary>
///     Builds the telemetry text to publish: a UTC timestamp followed by a simulated
///     temperature that drifts by up to +/-1.0 on every call.
/// </summary>
/// <param name="mqttMessageToPublish">Buffer that receives the NUL-terminated text.</param>
/// <param name="messageBufferSize">Size of the buffer in bytes.</param>
/// <remarks>
///     If the timestamp cannot be produced the message is still generated, with an empty
///     timestamp. If the buffer is NULL or has zero size nothing is written.
/// </remarks>
static void UpdateTelemetry(char *mqttMessageToPublish, size_t messageBufferSize)
{
    static float temperature = 50.f;
    char dateTimeBuffer[DATETIME_BUFFER_SIZE] = {0};

    if (!mqttMessageToPublish || !messageBufferSize) {
        Log_Debug("Error: Publish message buffer is null or empty.\n");
        return;
    }

    memset(mqttMessageToPublish, 0, messageBufferSize);

    const time_t now = time(NULL);
    if (now == (time_t)-1 ||
        !BuildUtcDateTimeString(dateTimeBuffer, sizeof(dateTimeBuffer), now)) {
        // strftime leaves the buffer contents indeterminate when it fails, so make sure
        // an empty string (and not stale bytes) ends up in the message.
        dateTimeBuffer[0] = '\0';
    }

    // Generate a simulated temperature.
    const float delta = ((float)(rand() % 41)) / 20.0f - 1.0f; // between -1.0 and +1.0
    temperature += delta;

    const int written = snprintf(mqttMessageToPublish, messageBufferSize,
                                 "%s: Temperature %f\n", dateTimeBuffer, temperature);
    if (written < 0) {
        // Formatting failed: do not publish whatever partial output may be in the buffer.
        mqttMessageToPublish[0] = '\0';
        Log_Debug("Error: Could not format telemetry message.\n");
    } else if ((size_t)written >= messageBufferSize) {
        // snprintf still NUL-terminated the buffer; the text is just cut short.
        Log_Debug("Warning: Telemetry message truncated to fit the publish buffer.\n");
    }
}
/// <summary>
///     Formats a time value as an ISO 8601 UTC string (for example 2024-01-31T12:34:56Z).
/// </summary>
/// <param name="outputBuffer">Buffer that receives the NUL-terminated string.</param>
/// <param name="outputBufferSize">Size of the buffer in bytes.</param>
/// <param name="t">Time to format.</param>
/// <returns>
///     true on success. false if the buffer is NULL or empty, the time cannot be converted,
///     or the buffer is too small; in the latter two cases the buffer is set to an empty
///     string.
/// </returns>
static bool BuildUtcDateTimeString(char *outputBuffer, size_t outputBufferSize, time_t t)
{
    // Format string to create an ISO 8601 time. This corresponds to the DTDL datetime schema item.
    static const char *const ISO8601Format = "%Y-%m-%dT%H:%M:%SZ";

    if (outputBuffer == NULL || outputBufferSize == 0) {
        Log_Debug("ERROR: BuildUtcDateTimeString: output buffer is null or empty.\n");
        return false;
    }

    const struct tm *currentTimeTm = gmtime(&t);
    if (currentTimeTm == NULL) {
        // Capture errno before calling anything else that might overwrite it.
        const int savedErrno = errno;
        Log_Debug("ERROR: gmtime: %s (%d)\n", strerror(savedErrno), savedErrno);
        outputBuffer[0] = '\0';
        return false;
    }

    // A zero return means the result (including the terminator) did not fit. strftime does
    // not report this through errno and leaves the buffer contents indeterminate.
    if (strftime(outputBuffer, outputBufferSize, ISO8601Format, currentTimeTm) == 0) {
        Log_Debug("ERROR: strftime: output buffer too small for UTC timestamp.\n");
        outputBuffer[0] = '\0';
        return false;
    }

    return true;
}
/// <summary>
/// Initialize peripherals, device twins, direct methods, timers.
/// </summary>
static ExitCode InitPeripheralsAndHandlers(void)
{
struct sigaction action;
memset(&action, 0, sizeof(struct sigaction));
action.sa_handler = TerminationHandler;
sigaction(SIGTERM, &action, NULL);
eventLoop = EventLoop_Create();
if (eventLoop == NULL) {
Log_Debug("Could not create event loop.\n");
return ExitCode_Init_EventLoop;
}
// Publish telemetry message every one second.
struct timespec publishMessagePeriod = {.tv_sec = 1, .tv_nsec = 0};
publishMessageTimer =
CreateEventLoopPeriodicTimer(eventLoop, &PublishMessageTimerHandler,
&publishMessagePeriod);
if (publishMessageTimer == NULL) {
return ExitCode_Init_PublishMessageTimer;
}
mqttContext.hostname = Options_GetAzureEventGridHostname();
int ret = InitializeMqtt(eventLoop, PublishCallback, ExitCodeCallbackHandler,
&mqttContext);
if (ret != ExitCode_Success) {
return ret;
}
// Create timers needed for MQTT connection.
CreateMqttTimers();
return ExitCode_Success;
}
/// <summary>
///     Tears down what InitPeripheralsAndHandlers set up: disconnects MQTT, disposes the
///     MQTT timers and the publish timer, and finally closes the event loop.
/// </summary>
/// <remarks>
///     NOTE(review): main calls this even when initialization failed part-way, so
///     publishMessageTimer and eventLoop may still be NULL here - confirm that every
///     callee tolerates that.
/// </remarks>
static void ClosePeripheralsAndHandlers(void)
{
DisconnectMqtt();
DisposeMqttTimers();
// The timers are disposed before the event loop they were created on is closed.
DisposeEventLoopTimer(publishMessageTimer);
EventLoop_Close(eventLoop);
}
/// <summary>
///     Application entry point: parses the command line, initializes handlers, starts the
///     MQTT connection and runs the event loop until an exit code other than
///     ExitCode_Success is recorded.
/// </summary>
/// <returns>The final exit code.</returns>
int main(int argc, char *argv[])
{
    Log_Debug("Azure Event Grid Application starting.\n");

    // Nothing has been set up yet, so an argument error can return straight away.
    exitCode = Options_ParseArgs(argc, argv);
    if (exitCode != ExitCode_Success) {
        return exitCode;
    }

    exitCode = InitPeripheralsAndHandlers();
    if (exitCode == ExitCode_Success) {
        ConnectMqtt();
    }

    // Main loop
    while (exitCode == ExitCode_Success) {
        if (EventLoop_Run(eventLoop, -1, true) != EventLoop_Run_Failed) {
            continue;
        }
        // Continue if interrupted by signal, e.g. due to breakpoint being set.
        if (errno != EINTR) {
            exitCode = ExitCode_Main_EventLoopFail;
        }
    }

    // Cleanup runs on every path that reaches this point, including failed initialization.
    ClosePeripheralsAndHandlers();
    Log_Debug("Application exiting.\n");
    return exitCode;
}