Skip to content

Commit

Permalink
Write heartbeat thread output to safe crash log (#155)
Browse files Browse the repository at this point in the history
* Add `jl_inside_heartbeat_thread()`

To allow `jl_safe_printf()` to determine whether it's being called
from the heartbeat thread.

* Write heartbeat thread output to the safe crash log

In `jl_safe_printf()`, we're already writing to the safe crash log
if we're in signal handler context (in addition to writing to
`stderr`). Now we do the same if we're in the heartbeat thread.

* Refactor JSON printing code

Concurrent `write()` calls to the same file descriptor should be
thread-safe, but can result in interleaving. Refactor the JSON
printing code used from `jl_safe_printf()` to assemble the message
into a buffer and use a single `write()` call.
  • Loading branch information
kpamnany authored and Drvi committed Jun 7, 2024
1 parent ff2ff0d commit f0e042d
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 31 deletions.
83 changes: 55 additions & 28 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,56 +678,83 @@ JL_DLLEXPORT int jl_printf(uv_stream_t *s, const char *format, ...)
return c;
}

STATIC_INLINE void print_error_msg_as_json(char *buf) JL_NOTSAFEPOINT
STATIC_INLINE int copystp(char *dest, const char *src)
{
// Our telemetry on SPCS expects a JSON object per line
// The following lines prepare the timestamp string and the JSON object
char *d = stpcpy(dest, src);
return (int)(d - dest);
}

// RAI-specific
STATIC_INLINE void write_to_safe_crash_log(char *buf) JL_NOTSAFEPOINT
{
int buflen = strlen(buf);
// Our telemetry on SPCS expects a JSON object per line.
// We ignore write failures because there is nothing we can do.
// We'll use a 2K byte buffer: 69 bytes for JSON message decorations,
// 1 byte for the terminating NUL character, and 3 bytes for an
// ellipsis if we have to truncate the message leaves `max_b` bytes
// for the message.
const int wbuflen = 2048;
const int max_b = wbuflen - 70 - 3;
char wbuf[wbuflen];
bzero(wbuf, wbuflen);
int wlen = 0;

// JSON preamble (32 bytes)
wlen += copystp(&wbuf[wlen], "\n{\"level\":\"Error\", \"timestamp\":\"");

// Timestamp (19 bytes)
struct timeval tv;
struct tm* tm_info;
char timestamp_buffer[50];
// Get current time
gettimeofday(&tv, NULL);
tm_info = gmtime(&tv.tv_sec);
// Format time
int offset = strftime(timestamp_buffer, 25, "%Y-%m-%dT%H:%M:%S", tm_info);
// Append milliseconds
snprintf(timestamp_buffer + offset, 25, ".%03d", tv.tv_usec / 1000);
const char *json_preamble_p1 = "\n{\"level\":\"Error\", \"timestamp\":\"";
const char *json_preamble_p2 = "\", \"message\": \"";
const char *json_postamble = "\"}\n";
// Ignore write failures because there is nothing we can do
write(jl_sig_fd, json_preamble_p1, strlen(json_preamble_p1));
write(jl_sig_fd, timestamp_buffer, strlen(timestamp_buffer));
write(jl_sig_fd, json_preamble_p2, strlen(json_preamble_p2));
// JSON escape the input string
for(size_t i = 0; i < strlen(buf); i += 1) {
wlen += strftime(&wbuf[wlen], 42, "%Y-%m-%dT%H:%M:%S", tm_info);
sprintf(&wbuf[wlen], ".%03ld", (long)tv.tv_usec / 1000);
wlen += 4;

// JSON preamble to message (15 bytes)
wlen += copystp(&wbuf[wlen], "\", \"message\": \"");

// Message
// Each iteration will advance wlen by 1 or 2
for (size_t i = 0; i < buflen; i++) {
// Truncate the message if the write buffer is full
if (wlen == max_b || wlen == max_b - 1) {
wlen += copystp(&wbuf[wlen], "...");
break;
}
switch (buf[i]) {
case '"':
write(jl_sig_fd, "\\\"", 2);
wlen += copystp(&wbuf[wlen], "\\\"");
break;
case '\b':
write(jl_sig_fd, "\\b", 2);
wlen += copystp(&wbuf[wlen], "\\b");
break;
case '\n':
write(jl_sig_fd, "\\n", 2);
wlen += copystp(&wbuf[wlen], "\\n");
break;
case '\r':
write(jl_sig_fd, "\\r", 2);
wlen += copystp(&wbuf[wlen], "\\r");
break;
case '\t':
write(jl_sig_fd, "\\t", 2);
wlen += copystp(&wbuf[wlen], "\\t");
break;
case '\\':
write(jl_sig_fd, "\\\\", 2);
wlen += copystp(&wbuf[wlen], "\\\\");
break;
default:
write(jl_sig_fd, buf + i, 1);
wbuf[wlen++] = buf[i];
break;
}
}
write(jl_sig_fd, json_postamble, strlen(json_postamble));
// JSON completion (3 bytes)
wlen += copystp(&wbuf[wlen], "\"}\n");
write(jl_sig_fd, wbuf, wlen);
fdatasync(jl_sig_fd);
}

extern int jl_inside_heartbeat_thread(void);

JL_DLLEXPORT void jl_safe_printf(const char *fmt, ...)
{
static char buf[1000];
Expand All @@ -747,8 +774,8 @@ JL_DLLEXPORT void jl_safe_printf(const char *fmt, ...)
// order is important here: we want to ensure that the threading infra
// has been initialized before we start trying to print to the
// safe crash log file
if (jl_sig_fd != 0 && jl_inside_signal_handler()) {
print_error_msg_as_json(buf);
if (jl_sig_fd != 0 && (jl_inside_signal_handler() || jl_inside_heartbeat_thread())) {
write_to_safe_crash_log(buf);
}
if (write(STDERR_FILENO, buf, strlen(buf)) < 0) {
// nothing we can do; ignore the failure
Expand Down
17 changes: 14 additions & 3 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,7 @@ JL_DLLEXPORT int jl_alignment(size_t sz)
#include <time.h>

volatile int heartbeat_enabled;
uv_thread_t heartbeat_uvtid;
uv_sem_t heartbeat_on_sem, // jl_heartbeat_enable -> thread
heartbeat_off_sem; // thread -> jl_heartbeat_enable
int heartbeat_interval_s,
Expand All @@ -965,12 +966,17 @@ void jl_heartbeat_threadfun(void *arg);
// start the heartbeat thread with heartbeats disabled
void jl_init_heartbeat(void)
{
uv_thread_t uvtid;
heartbeat_enabled = 0;
uv_sem_init(&heartbeat_on_sem, 0);
uv_sem_init(&heartbeat_off_sem, 0);
uv_thread_create(&uvtid, jl_heartbeat_threadfun, NULL);
uv_thread_detach(&uvtid);
uv_thread_create(&heartbeat_uvtid, jl_heartbeat_threadfun, NULL);
uv_thread_detach(&heartbeat_uvtid);
}

int jl_inside_heartbeat_thread(void)
{
uv_thread_t curr_uvtid = uv_thread_self();
return curr_uvtid == heartbeat_uvtid;
}

// enable/disable heartbeats
Expand Down Expand Up @@ -1143,6 +1149,11 @@ void jl_init_heartbeat(void)
{
}

int jl_inside_heartbeat_thread(void)
{
return 0;
}

JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n,
int reset_after_n)
{
Expand Down

0 comments on commit f0e042d

Please sign in to comment.