From 3f82711625dbb4a505880b90ad1a5b8fc2f2bbed Mon Sep 17 00:00:00 2001 From: Kiran Pamnany Date: Mon, 27 May 2024 15:39:36 -0400 Subject: [PATCH] Write heartbeat thread output to safe crash log (#155) * Add `jl_inside_heartbeat_thread()` To allow `jl_safe_printf()` to determine whether it's being called from the heartbeat thread. * Write heartbeat thread output to the safe crash log In `jl_safe_printf()`, we're already writing to the safe crash log if we're in signal handler context (in addition to writing to `stderr`). Now we do the same if we're in the heartbeat thread. * Refactor JSON printing code Concurrent `write()` calls to the same file descriptor should be thread-safe, but can result in interleaving. Refactor the JSON printing code used from `jl_safe_printf()` to assemble the message into a buffer and use a single `write()` call. --- src/jl_uv.c | 83 ++++++++++++++++++++++++++++++++----------------- src/threading.c | 17 ++++++++-- 2 files changed, 69 insertions(+), 31 deletions(-) diff --git a/src/jl_uv.c b/src/jl_uv.c index 5c51c3f8a6cf4..fd8718ebbea26 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -678,56 +678,83 @@ JL_DLLEXPORT int jl_printf(uv_stream_t *s, const char *format, ...) return c; } -STATIC_INLINE void print_error_msg_as_json(char *buf) JL_NOTSAFEPOINT +STATIC_INLINE int copystp(char *dest, const char *src) { - // Our telemetry on SPCS expects a JSON object per line - // The following lines prepare the timestamp string and the JSON object + char *d = stpcpy(dest, src); + return (int)(d - dest); +} + +// RAI-specific +STATIC_INLINE void write_to_safe_crash_log(char *buf) JL_NOTSAFEPOINT +{ + int buflen = strlen(buf); + // Our telemetry on SPCS expects a JSON object per line. + // We ignore write failures because there is nothing we can do. + // We'll use a 2K byte buffer: 69 bytes for JSON message decorations, + // 1 byte for the terminating NUL character, and 3 bytes for an + // ellipsis if we have to truncate the message leaves `max_b` bytes + // for the message. + const int wbuflen = 2048; + const int max_b = wbuflen - 70 - 3; + char wbuf[wbuflen]; + bzero(wbuf, wbuflen); + int wlen = 0; + + // JSON preamble (32 bytes) + wlen += copystp(&wbuf[wlen], "\n{\"level\":\"Error\", \"timestamp\":\""); + + // Timestamp (19 bytes) struct timeval tv; struct tm* tm_info; - char timestamp_buffer[50]; - // Get current time gettimeofday(&tv, NULL); tm_info = gmtime(&tv.tv_sec); - // Format time - int offset = strftime(timestamp_buffer, 25, "%Y-%m-%dT%H:%M:%S", tm_info); - // Append milliseconds - snprintf(timestamp_buffer + offset, 25, ".%03d", tv.tv_usec / 1000); - const char *json_preamble_p1 = "\n{\"level\":\"Error\", \"timestamp\":\""; - const char *json_preamble_p2 = "\", \"message\": \""; - const char *json_postamble = "\"}\n"; - // Ignore write failures because there is nothing we can do - write(jl_sig_fd, json_preamble_p1, strlen(json_preamble_p1)); - write(jl_sig_fd, timestamp_buffer, strlen(timestamp_buffer)); - write(jl_sig_fd, json_preamble_p2, strlen(json_preamble_p2)); - // JSON escape the input string - for(size_t i = 0; i < strlen(buf); i += 1) { + wlen += strftime(&wbuf[wlen], 42, "%Y-%m-%dT%H:%M:%S", tm_info); + sprintf(&wbuf[wlen], ".%03ld", (long)tv.tv_usec / 1000); + wlen += 4; + + // JSON preamble to message (15 bytes) + wlen += copystp(&wbuf[wlen], "\", \"message\": \""); + + // Message + // Each iteration will advance wlen by 1 or 2 + for (size_t i = 0; i < buflen; i++) { + // Truncate the message if the write buffer is full + if (wlen == max_b || wlen == max_b - 1) { + wlen += copystp(&wbuf[wlen], "..."); + break; + } switch (buf[i]) { case '"': - write(jl_sig_fd, "\\\"", 2); + wlen += copystp(&wbuf[wlen], "\\\""); break; case '\b': - write(jl_sig_fd, "\\b", 2); + wlen += copystp(&wbuf[wlen], "\\b"); break; case '\n': - write(jl_sig_fd, "\\n", 2); + wlen += copystp(&wbuf[wlen], "\\n"); break; case '\r': - write(jl_sig_fd, "\\r", 2); + wlen += copystp(&wbuf[wlen], "\\r"); break; case '\t': - write(jl_sig_fd, "\\t", 2); + wlen += copystp(&wbuf[wlen], "\\t"); break; case '\\': - write(jl_sig_fd, "\\\\", 2); + wlen += copystp(&wbuf[wlen], "\\\\"); break; default: - write(jl_sig_fd, buf + i, 1); + wbuf[wlen++] = buf[i]; + break; } } - write(jl_sig_fd, json_postamble, strlen(json_postamble)); + // JSON completion (3 bytes) + wlen += copystp(&wbuf[wlen], "\"}\n"); + write(jl_sig_fd, wbuf, wlen); fdatasync(jl_sig_fd); } +extern int jl_inside_heartbeat_thread(void); + JL_DLLEXPORT void jl_safe_printf(const char *fmt, ...) { static char buf[1000]; @@ -747,8 +774,8 @@ JL_DLLEXPORT void jl_safe_printf(const char *fmt, ...) // order is important here: we want to ensure that the threading infra // has been initialized before we start trying to print to the // safe crash log file - if (jl_sig_fd != 0 && jl_inside_signal_handler()) { - print_error_msg_as_json(buf); + if (jl_sig_fd != 0 && (jl_inside_signal_handler() || jl_inside_heartbeat_thread())) { + write_to_safe_crash_log(buf); } if (write(STDERR_FILENO, buf, strlen(buf)) < 0) { // nothing we can do; ignore the failure diff --git a/src/threading.c b/src/threading.c index 77b24dcb5208c..b0b88cfb7c2a5 100644 --- a/src/threading.c +++ b/src/threading.c @@ -951,6 +951,7 @@ JL_DLLEXPORT int jl_alignment(size_t sz) #include volatile int heartbeat_enabled; +uv_thread_t heartbeat_uvtid; uv_sem_t heartbeat_on_sem, // jl_heartbeat_enable -> thread heartbeat_off_sem; // thread -> jl_heartbeat_enable int heartbeat_interval_s, @@ -965,12 +966,17 @@ void jl_heartbeat_threadfun(void *arg); // start the heartbeat thread with heartbeats disabled void jl_init_heartbeat(void) { - uv_thread_t uvtid; heartbeat_enabled = 0; uv_sem_init(&heartbeat_on_sem, 0); uv_sem_init(&heartbeat_off_sem, 0); - uv_thread_create(&uvtid, jl_heartbeat_threadfun, NULL); - uv_thread_detach(&uvtid); + uv_thread_create(&heartbeat_uvtid, jl_heartbeat_threadfun, NULL); + uv_thread_detach(&heartbeat_uvtid); +} + +int jl_inside_heartbeat_thread(void) +{ + uv_thread_t curr_uvtid = uv_thread_self(); + return curr_uvtid == heartbeat_uvtid; } // enable/disable heartbeats @@ -1143,6 +1149,11 @@ void jl_init_heartbeat(void) { } +int jl_inside_heartbeat_thread(void) +{ + return 0; +} + JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n, int reset_after_n) {