Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage/socket): Serialize reports with the json-c library #62

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libczmq-dev libpfm4-dev libmongoc-dev
sudo apt-get install -y libczmq-dev libpfm4-dev libjson-c-dev libmongoc-dev

- name: Configure CMake
run: cmake -B ${{github.workspace}}/build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} -DCMAKE_C_COMPILER=${{matrix.compiler}} -DCMAKE_C_CLANG_TIDY=clang-tidy
Expand Down
5 changes: 3 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ set(SENSOR_SOURCES
find_package(LibPFM REQUIRED)
find_package(PkgConfig)
pkg_check_modules(CZMQ REQUIRED libczmq)
pkg_check_modules(JSONC REQUIRED json-c)

if(WITH_MONGODB)
pkg_check_modules(MONGOC REQUIRED libmongoc-1.0)
Expand All @@ -48,5 +49,5 @@ if(DEFINED ENV{GIT_TAG} AND DEFINED ENV{GIT_REV})
endif()

add_executable(hwpc-sensor "${SENSOR_SOURCES}")
target_include_directories(hwpc-sensor SYSTEM PRIVATE "${LIBPFM_INCLUDE_DIRS}" "${CZMQ_INCLUDE_DIRS}" "${MONGOC_INCLUDE_DIRS}")
target_link_libraries(hwpc-sensor "${LIBPFM_LIBRARIES}" "${CZMQ_LIBRARIES}" "${MONGOC_LIBRARIES}")
target_include_directories(hwpc-sensor SYSTEM PRIVATE "${LIBPFM_INCLUDE_DIRS}" "${CZMQ_INCLUDE_DIRS}" "${JSONC_INCLUDE_DIRS}" "${MONGOC_INCLUDE_DIRS}")
target_link_libraries(hwpc-sensor "${LIBPFM_LIBRARIES}" "${CZMQ_LIBRARIES}" "${JSONC_LIBRARIES}" "${MONGOC_LIBRARIES}")
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ ENV DEBIAN_FRONTEND=noninteractive
ARG BUILD_TYPE=Debug
ARG MONGODB_SUPPORT=ON
RUN apt update && \
apt install -y build-essential git clang-tidy cmake pkg-config libczmq-dev libsystemd-dev uuid-dev && \
apt install -y build-essential git clang-tidy cmake pkg-config libczmq-dev libjson-c-dev libsystemd-dev uuid-dev && \
echo "${MONGODB_SUPPORT}" |grep -iq "on" && apt install -y libmongoc-dev || true
COPY --from=libpfm-builder /root/libpfm4*.deb /tmp/
RUN dpkg -i /tmp/libpfm4_*.deb /tmp/libpfm4-dev_*.deb && \
Expand All @@ -34,7 +34,7 @@ ARG MONGODB_SUPPORT=ON
ARG FILE_CAPABILITY=CAP_SYS_ADMIN
RUN useradd -d /opt/powerapi -m powerapi && \
apt update && \
apt install -y libczmq4 libcap2-bin && \
apt install -y libczmq4 libjson-c5 libcap2-bin && \
echo "${MONGODB_SUPPORT}" |grep -iq "on" && apt install -y libmongoc-1.0-0 || true && \
echo "${BUILD_TYPE}" |grep -iq "debug" && apt install -y libasan6 libubsan1 || true && \
rm -rf /var/lib/apt/lists/*
Expand Down
80 changes: 37 additions & 43 deletions src/storage_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
#include <sys/socket.h>
#include <sys/uio.h>
#include <netdb.h>
#include <bson.h>
#include <json.h>

#include "perf.h"
#include "report.h"
Expand Down Expand Up @@ -87,7 +87,7 @@ socket_resolve_and_connect(struct socket_context *ctx)
snprintf(port_str, PORT_STR_BUFFER_SIZE, "%d", ctx->config.port);

if (getaddrinfo(ctx->config.address, port_str, &hints, &result)) {
zsys_error("socket: unable to resolve address: %s", ctx->config.address);
zsys_error("socket: Unable to resolve address: %s", ctx->config.address);
goto error_no_getaddrinfo;
}

Expand All @@ -99,7 +99,7 @@ socket_resolve_and_connect(struct socket_context *ctx)

ret = connect(sfd, rp->ai_addr, rp->ai_addrlen);
if (!ret) {
zsys_info("socket: successfully connected to %s:%d", ctx->config.address, ctx->config.port);
zsys_info("socket: Successfully connected to %s:%d", ctx->config.address, ctx->config.port);
break;
}

Expand All @@ -108,7 +108,7 @@ socket_resolve_and_connect(struct socket_context *ctx)

/* no connection have been established */
if (ret == -1) {
zsys_error("socket: failed to connect to %s:%d", ctx->config.address, ctx->config.port);
zsys_error("socket: Failed to connect to %s:%d", ctx->config.address, ctx->config.port);
goto error_not_connected;
}

Expand Down Expand Up @@ -164,13 +164,13 @@ socket_try_reconnect(struct socket_context *ctx)
ctx->retry_backoff_time = ctx->retry_backoff_time * 2 + (nbrand != -1 ? rand_jitter % 10 : 0);
}

zsys_error("socket: failed to reconnect, next try will be in %d seconds", ctx->retry_backoff_time);
zsys_error("socket: Failed to reconnect, next try will be in %d seconds", ctx->retry_backoff_time);
return -1;
} else {
ctx->last_retry_time = 0;
ctx->retry_backoff_time = 1;

zsys_info("socket: connection recovered, resuming operation");
zsys_info("socket: Connection recovered, resuming operation");
return 0;
}
}
Expand All @@ -182,21 +182,20 @@ static int
socket_store_report(struct storage_module *module, struct payload *payload)
{
struct socket_context *ctx = module->context;
bson_t document = BSON_INITIALIZER;
char timestamp_str[TIMESTAMP_STR_BUFFER_SIZE] = {0};
bson_t doc_groups;
struct json_object *jobj = NULL;
struct json_object *jobj_groups = NULL;
struct payload_group_data *group_data = NULL;
const char *group_name = NULL;
bson_t doc_group;
struct json_object *jobj_group = NULL;
struct payload_pkg_data *pkg_data = NULL;
const char *pkg_id = NULL;
bson_t doc_pkg;
struct json_object *jobj_pkg = NULL;
struct payload_cpu_data *cpu_data = NULL;
const char *cpu_id = NULL;
bson_t doc_cpu;
struct json_object *jobj_cpu = NULL;
const char *event_name = NULL;
uint64_t *event_value = NULL;
char *json_report = NULL;
const char *json_report = NULL;
size_t json_report_length = 0;
struct iovec socket_iov[2] = {0};
ssize_t nbsend;
Expand All @@ -211,7 +210,7 @@ socket_store_report(struct storage_module *module, struct payload *payload)

/*
* {
* "timestamp": "1529868713854",
* "timestamp": 1529868713854,
* "sensor": "test.cluster.lan",
* "target": "example",
* "groups": {
Expand All @@ -220,7 +219,7 @@ socket_store_report(struct storage_module *module, struct payload *payload)
* "cpu_id": {
* "time_enabled": 12345,
* "time_running": 12345,
* "event_name": 123456789.0,
* "event_name": 1234567890,
* more events...
* },
* more cpus...
Expand All @@ -231,51 +230,48 @@ socket_store_report(struct storage_module *module, struct payload *payload)
* }
* }
*/
snprintf(timestamp_str, TIMESTAMP_STR_BUFFER_SIZE, "%" PRIu64, payload->timestamp);
BSON_APPEND_UTF8(&document, "timestamp", timestamp_str);
jobj = json_object_new_object();

BSON_APPEND_UTF8(&document, "sensor", ctx->config.sensor_name);
BSON_APPEND_UTF8(&document, "target", payload->target_name);
json_object_object_add(jobj, "timestamp", json_object_new_uint64(payload->timestamp));
json_object_object_add(jobj, "sensor", json_object_new_string(ctx->config.sensor_name));
json_object_object_add(jobj, "target", json_object_new_string(payload->target_name));

BSON_APPEND_DOCUMENT_BEGIN(&document, "groups", &doc_groups);
jobj_groups = json_object_new_object();
json_object_object_add(jobj, "groups", jobj_groups);
for (group_data = zhashx_first(payload->groups); group_data; group_data = zhashx_next(payload->groups)) {
group_name = zhashx_cursor(payload->groups);
BSON_APPEND_DOCUMENT_BEGIN(&doc_groups, group_name, &doc_group);

jobj_group = json_object_new_object();
group_name = zhashx_cursor(payload->groups);
json_object_object_add(jobj_groups, group_name, jobj_group);
for (pkg_data = zhashx_first(group_data->pkgs); pkg_data; pkg_data = zhashx_next(group_data->pkgs)) {
pkg_id = zhashx_cursor(group_data->pkgs);
BSON_APPEND_DOCUMENT_BEGIN(&doc_group, pkg_id, &doc_pkg);

jobj_pkg = json_object_new_object();
pkg_id = zhashx_cursor(group_data->pkgs);
json_object_object_add(jobj_group, pkg_id, jobj_pkg);
for (cpu_data = zhashx_first(pkg_data->cpus); cpu_data; cpu_data = zhashx_next(pkg_data->cpus)) {
jobj_cpu = json_object_new_object();
cpu_id = zhashx_cursor(pkg_data->cpus);
BSON_APPEND_DOCUMENT_BEGIN(&doc_pkg, cpu_id, &doc_cpu);
json_object_object_add(jobj_pkg, cpu_id, jobj_cpu);

for (event_value = zhashx_first(cpu_data->events); event_value; event_value = zhashx_next(cpu_data->events)) {
event_name = zhashx_cursor(cpu_data->events);
BSON_APPEND_DOUBLE(&doc_cpu, event_name, *event_value);
json_object_object_add(jobj_cpu, event_name, json_object_new_uint64(*event_value));
}

bson_append_document_end(&doc_pkg, &doc_cpu);
}

bson_append_document_end(&doc_group, &doc_pkg);
}

bson_append_document_end(&doc_groups, &doc_group);
}
bson_append_document_end(&document, &doc_groups);

json_report = bson_as_json(&document, &json_report_length);
json_report = json_object_to_json_string_length(jobj, JSON_C_TO_STRING_PLAIN, &json_report_length);
if (json_report == NULL) {
zsys_error("socket: failed to convert report to json string");
goto error_bson_to_json;
zsys_error("socket: Failed to convert report to json string");
goto error_json_to_string;
}

/*
* PowerAPI socketdb requires a newline character at the end of the json document.
* Using POSIX IOV allows to efficiently append it at the end of the json string.
*/
socket_iov[0].iov_base = json_report;
socket_iov[0].iov_base = (void *) json_report;
socket_iov[0].iov_len = json_report_length;
socket_iov[1].iov_base = "\n";
socket_iov[1].iov_len = 1;
Expand All @@ -289,10 +285,10 @@ socket_store_report(struct storage_module *module, struct payload *payload)
errno = 0;
nbsend = writev(ctx->socket_fd, socket_iov, 2);
if (nbsend == -1) {
zsys_error("socket: sending the report failed with error: %s", strerror(errno));
zsys_error("socket: Sending the report failed with error: %s", strerror(errno));

if (retry_once) {
zsys_info("socket: connection has been lost, attempting to reconnect...");
zsys_info("socket: Connection has been lost, attempting to reconnect...");
if (!socket_try_reconnect(ctx))
continue;
}
Expand All @@ -303,10 +299,9 @@ socket_store_report(struct storage_module *module, struct payload *payload)

ret = 0;

error_json_to_string:
error_socket_disconnected:
bson_free(json_report);
error_bson_to_json:
bson_destroy(&document);
json_object_put(jobj);
return ret;
}

Expand Down Expand Up @@ -363,4 +358,3 @@ storage_socket_create(struct config *config)
free(module);
return NULL;
}

7 changes: 0 additions & 7 deletions src/storage_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@
*/
#define PORT_STR_BUFFER_SIZE 8

/*
* TIMESTAMP_STR_BUFFER_SIZE stores the maximum lenght of the buffer used to convert
* the timestamp stored as uint64_t to a null terminated string.
* It should have enough space to store all digits of UINT64_MAX and the null character.
*/
#define TIMESTAMP_STR_BUFFER_SIZE 32

/*
* MAX_DURATION_CONNECTION_RETRY stores the maximal value of a connection retry. (in seconds)
*/
Expand Down