diff --git a/lib/filterx/filterx-eval.h b/lib/filterx/filterx-eval.h index ddccbcbf2..14b0313a8 100644 --- a/lib/filterx/filterx-eval.h +++ b/lib/filterx/filterx-eval.h @@ -94,7 +94,6 @@ filterx_eval_prepare_for_fork(FilterXEvalContext *context, LogMessage **pmsg, co filterx_eval_sync_message(context, pmsg, path_options); if (context) filterx_scope_write_protect(context->scope); - log_msg_write_protect(*pmsg); } #endif diff --git a/lib/logmpx.c b/lib/logmpx.c index 2ac1d8397..6cd119756 100644 --- a/lib/logmpx.c +++ b/lib/logmpx.c @@ -94,6 +94,7 @@ log_multiplexer_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op * data we still need */ filterx_eval_prepare_for_fork(path_options->filterx_context, &msg, path_options); + log_msg_write_protect(msg); } for (fallback = 0; (fallback == 0) || (fallback == 1 && self->fallback_exists && !delivered); fallback++) { diff --git a/lib/logthrsource/logthrsourcedrv.c b/lib/logthrsource/logthrsourcedrv.c index 6817fbd60..e495fccf7 100644 --- a/lib/logthrsource/logthrsourcedrv.c +++ b/lib/logthrsource/logthrsourcedrv.c @@ -431,6 +431,10 @@ log_threaded_source_worker_blocking_post(LogThreadedSourceWorker *self, LogMessa { log_threaded_source_worker_post(self, msg); + /* unlocked, as only this thread can decrease the window size */ + if (!self->control->auto_close_batches && !log_threaded_source_worker_free_to_send(self)) + log_threaded_source_worker_close_batch(self); + /* * The wakeup lock must be held before calling free_to_send() and suspend(), * otherwise g_cond_signal() might be called between free_to_send() and diff --git a/lib/logthrsource/logthrsourcedrv.h b/lib/logthrsource/logthrsourcedrv.h index 67baf2a44..cf204426c 100644 --- a/lib/logthrsource/logthrsourcedrv.h +++ b/lib/logthrsource/logthrsourcedrv.h @@ -127,7 +127,7 @@ void log_threaded_source_worker_close_batch(LogThreadedSourceWorker *self); /* blocking API */ void log_threaded_source_worker_blocking_post(LogThreadedSourceWorker *self, LogMessage *msg); -/* non-blocking API, use it wisely (thread boundaries) */ +/* non-blocking API, use it wisely (thread boundaries); call close_batch() at least before suspending */ void log_threaded_source_worker_post(LogThreadedSourceWorker *self, LogMessage *msg); gboolean log_threaded_source_worker_free_to_send(LogThreadedSourceWorker *self); diff --git a/news/bugfix-314.md b/news/bugfix-314.md new file mode 100644 index 000000000..564b0ac00 --- /dev/null +++ b/news/bugfix-314.md @@ -0,0 +1 @@ +`opentelemetry()`, `axosyslog-otlp()` sources: fix source hang-up on flow-controlled paths