Skip to content

Commit

Permalink
refactor(storage/socket): Serialize reports with the json-c library
Browse files Browse the repository at this point in the history
  • Loading branch information
gfieni committed Apr 24, 2024
1 parent 1395f2f commit 8b3d30a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 50 deletions.
79 changes: 36 additions & 43 deletions src/storage_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
#include <sys/socket.h>
#include <sys/uio.h>
#include <netdb.h>
#include <bson.h>
#include <json.h>

#include "perf.h"
#include "report.h"
Expand Down Expand Up @@ -87,7 +87,7 @@ socket_resolve_and_connect(struct socket_context *ctx)
snprintf(port_str, PORT_STR_BUFFER_SIZE, "%d", ctx->config.port);

if (getaddrinfo(ctx->config.address, port_str, &hints, &result)) {
zsys_error("socket: unable to resolve address: %s", ctx->config.address);
zsys_error("socket: Unable to resolve address: %s", ctx->config.address);
goto error_no_getaddrinfo;
}

Expand All @@ -99,7 +99,7 @@ socket_resolve_and_connect(struct socket_context *ctx)

ret = connect(sfd, rp->ai_addr, rp->ai_addrlen);
if (!ret) {
zsys_info("socket: successfully connected to %s:%d", ctx->config.address, ctx->config.port);
zsys_info("socket: Successfully connected to %s:%d", ctx->config.address, ctx->config.port);
break;
}

Expand All @@ -108,7 +108,7 @@ socket_resolve_and_connect(struct socket_context *ctx)

/* no connection have been established */
if (ret == -1) {
zsys_error("socket: failed to connect to %s:%d", ctx->config.address, ctx->config.port);
zsys_error("socket: Failed to connect to %s:%d", ctx->config.address, ctx->config.port);
goto error_not_connected;
}

Expand Down Expand Up @@ -164,13 +164,13 @@ socket_try_reconnect(struct socket_context *ctx)
ctx->retry_backoff_time = ctx->retry_backoff_time * 2 + (nbrand != -1 ? rand_jitter % 10 : 0);
}

zsys_error("socket: failed to reconnect, next try will be in %d seconds", ctx->retry_backoff_time);
zsys_error("socket: Failed to reconnect, next try will be in %d seconds", ctx->retry_backoff_time);
return -1;
} else {
ctx->last_retry_time = 0;
ctx->retry_backoff_time = 1;

zsys_info("socket: connection recovered, resuming operation");
zsys_info("socket: Connection recovered, resuming operation");
return 0;
}
}
Expand All @@ -182,22 +182,20 @@ static int
socket_store_report(struct storage_module *module, struct payload *payload)
{
struct socket_context *ctx = module->context;
bson_t document = BSON_INITIALIZER;
char timestamp_str[TIMESTAMP_STR_BUFFER_SIZE] = {0};
bson_t doc_groups;
struct json_object *jobj = NULL;
struct json_object *jobj_groups = NULL;
struct payload_group_data *group_data = NULL;
const char *group_name = NULL;
bson_t doc_group;
struct json_object *jobj_group = NULL;
struct payload_pkg_data *pkg_data = NULL;
const char *pkg_id = NULL;
bson_t doc_pkg;
struct json_object *jobj_pkg = NULL;
struct payload_cpu_data *cpu_data = NULL;
const char *cpu_id = NULL;
bson_t doc_cpu;
struct json_object *jobj_cpu = NULL;
const char *event_name = NULL;
uint64_t *event_value = NULL;
char *json_report = NULL;
size_t json_report_length = 0;
const char *json_report = NULL;
struct iovec socket_iov[2] = {0};
ssize_t nbsend;
int retry_once = 1;
Expand All @@ -211,7 +209,7 @@ socket_store_report(struct storage_module *module, struct payload *payload)

/*
* {
* "timestamp": "1529868713854",
* "timestamp": 1529868713854,
* "sensor": "test.cluster.lan",
* "target": "example",
* "groups": {
Expand All @@ -220,7 +218,7 @@ socket_store_report(struct storage_module *module, struct payload *payload)
* "cpu_id": {
* "time_enabled": 12345,
* "time_running": 12345,
* "event_name": 123456789.0,
* "event_name": 1234567890,
* more events...
* },
* more cpus...
Expand All @@ -231,52 +229,49 @@ socket_store_report(struct storage_module *module, struct payload *payload)
* }
* }
*/
snprintf(timestamp_str, TIMESTAMP_STR_BUFFER_SIZE, "%" PRIu64, payload->timestamp);
BSON_APPEND_UTF8(&document, "timestamp", timestamp_str);
jobj = json_object_new_object();

BSON_APPEND_UTF8(&document, "sensor", ctx->config.sensor_name);
BSON_APPEND_UTF8(&document, "target", payload->target_name);
json_object_object_add(jobj, "timestamp", json_object_new_uint64(payload->timestamp));
json_object_object_add(jobj, "sensor", json_object_new_string(ctx->config.sensor_name));
json_object_object_add(jobj, "target", json_object_new_string(payload->target_name));

BSON_APPEND_DOCUMENT_BEGIN(&document, "groups", &doc_groups);
jobj_groups = json_object_new_object();
json_object_object_add(jobj, "groups", jobj_groups);
for (group_data = zhashx_first(payload->groups); group_data; group_data = zhashx_next(payload->groups)) {
group_name = zhashx_cursor(payload->groups);
BSON_APPEND_DOCUMENT_BEGIN(&doc_groups, group_name, &doc_group);

jobj_group = json_object_new_object();
group_name = zhashx_cursor(payload->groups);
json_object_object_add(jobj_groups, group_name, jobj_group);
for (pkg_data = zhashx_first(group_data->pkgs); pkg_data; pkg_data = zhashx_next(group_data->pkgs)) {
pkg_id = zhashx_cursor(group_data->pkgs);
BSON_APPEND_DOCUMENT_BEGIN(&doc_group, pkg_id, &doc_pkg);

jobj_pkg = json_object_new_object();
pkg_id = zhashx_cursor(group_data->pkgs);
json_object_object_add(jobj_group, pkg_id, jobj_pkg);
for (cpu_data = zhashx_first(pkg_data->cpus); cpu_data; cpu_data = zhashx_next(pkg_data->cpus)) {
jobj_cpu = json_object_new_object();
cpu_id = zhashx_cursor(pkg_data->cpus);
BSON_APPEND_DOCUMENT_BEGIN(&doc_pkg, cpu_id, &doc_cpu);
json_object_object_add(jobj_pkg, cpu_id, jobj_cpu);

for (event_value = zhashx_first(cpu_data->events); event_value; event_value = zhashx_next(cpu_data->events)) {
event_name = zhashx_cursor(cpu_data->events);
BSON_APPEND_DOUBLE(&doc_cpu, event_name, *event_value);
json_object_object_add(jobj_cpu, event_name, json_object_new_uint64(*event_value));
}

bson_append_document_end(&doc_pkg, &doc_cpu);
}

bson_append_document_end(&doc_group, &doc_pkg);
}

bson_append_document_end(&doc_groups, &doc_group);
}
bson_append_document_end(&document, &doc_groups);

json_report = bson_as_json(&document, &json_report_length);
json_report = json_object_to_json_string_ext(jobj, JSON_C_TO_STRING_PLAIN);
if (json_report == NULL) {
zsys_error("socket: failed to convert report to json string");
zsys_error("socket: Failed to convert report to json string");
goto error_bson_to_json;
}

/*
* PowerAPI socketdb requires a newline character at the end of the json document.
* Using POSIX IOV allows to efficiently append it at the end of the json string.
*/
socket_iov[0].iov_base = json_report;
socket_iov[0].iov_len = json_report_length;
socket_iov[0].iov_base = (void *) json_report;
socket_iov[0].iov_len = strlen(json_report);
socket_iov[1].iov_base = "\n";
socket_iov[1].iov_len = 1;

Expand All @@ -289,10 +284,10 @@ socket_store_report(struct storage_module *module, struct payload *payload)
errno = 0;
nbsend = writev(ctx->socket_fd, socket_iov, 2);
if (nbsend == -1) {
zsys_error("socket: sending the report failed with error: %s", strerror(errno));
zsys_error("socket: Sending the report failed with error: %s", strerror(errno));

if (retry_once) {
zsys_info("socket: connection has been lost, attempting to reconnect...");
zsys_info("socket: Connection has been lost, attempting to reconnect...");
if (!socket_try_reconnect(ctx))
continue;
}
Expand All @@ -304,9 +299,8 @@ socket_store_report(struct storage_module *module, struct payload *payload)
ret = 0;

error_socket_disconnected:
bson_free(json_report);
error_bson_to_json:
bson_destroy(&document);
json_object_put(jobj);
return ret;
}

Expand Down Expand Up @@ -363,4 +357,3 @@ storage_socket_create(struct config *config)
free(module);
return NULL;
}

7 changes: 0 additions & 7 deletions src/storage_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@
*/
#define PORT_STR_BUFFER_SIZE 8

/*
* TIMESTAMP_STR_BUFFER_SIZE stores the maximum lenght of the buffer used to convert
* the timestamp stored as uint64_t to a null terminated string.
* It should have enough space to store all digits of UINT64_MAX and the null character.
*/
#define TIMESTAMP_STR_BUFFER_SIZE 32

/*
* MAX_DURATION_CONNECTION_RETRY stores the maximal value of a connection retry. (in seconds)
*/
Expand Down

0 comments on commit 8b3d30a

Please sign in to comment.