Skip to content

Commit

Permalink
Merge pull request #303 from MrAnno/fix-file-writer-proto
Browse files Browse the repository at this point in the history
`file()`, `stdout()`: fix log sources getting stuck
  • Loading branch information
alltilla authored Sep 24, 2024
2 parents 1433ee6 + 024ceb9 commit fcbb8c6
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 71 deletions.
173 changes: 102 additions & 71 deletions modules/affile/logproto-file-writer.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/*
* Copyright (c) 2002-2012 Balabit
* Copyright (c) 2024 Axoflow
* Copyright (c) 1998-2012 Balázs Scheidler
* Copyright (c) 2024 László Várady
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
Expand Down Expand Up @@ -38,11 +40,92 @@ typedef struct _LogProtoFileWriter
gint buf_size;
gint buf_count;
gint fd;
gint sum_len;
gsize sum_len;
gboolean fsync;
struct iovec buffer[0];
} LogProtoFileWriter;

static inline gboolean
_flush_partial(LogProtoFileWriter *self, LogProtoStatus *status)
{
/* there is still some data from the previous file writing process */

gint len = self->partial_len - self->partial_pos;
gssize rc = log_transport_write(self->super.transport, self->partial + self->partial_pos, len);

if (rc > 0 && self->fsync)
fsync(self->fd);

if (rc < 0)
{
if (errno == EINTR || errno == EAGAIN)
{
*status = LPS_SUCCESS;
return FALSE;
}

log_proto_client_msg_rewind(&self->super);
msg_error("I/O error occurred while writing",
evt_tag_int("fd", self->super.transport->fd),
evt_tag_error(EVT_TAG_OSERROR));

*status = LPS_ERROR;
return FALSE;
}

if (rc != len)
{
self->partial_pos += rc;
*status = LPS_PARTIAL;
return FALSE;
}

log_proto_client_msg_ack(&self->super, self->partial_messages);
g_free(self->partial);
self->partial = NULL;
self->partial_messages = 0;
return TRUE;
}

static inline void
_process_partial_write(LogProtoFileWriter *self, gsize written)
{
/* partial success: not everything has been written out */

/* look for the first chunk that has been cut */
gsize sum = self->buffer[0].iov_len; /* sum is the cumulated length of the already processed items */
gint i = 0;
while (written > sum)
sum += self->buffer[++i].iov_len;

gsize first_non_written_msg_chunk_len = sum - written;
self->partial_len = first_non_written_msg_chunk_len;
gint first_non_written_chunk_index = i;
++i;

/* add the lengths of the following messages */
while (i < self->buf_count)
self->partial_len += self->buffer[i++].iov_len;

/* allocate and copy the remaining data */
self->partial = (guchar *)g_malloc(self->partial_len);
gsize ofs = first_non_written_msg_chunk_len;
gsize pos = self->buffer[first_non_written_chunk_index].iov_len - ofs;
memcpy(self->partial, (guchar *) self->buffer[first_non_written_chunk_index].iov_base + pos, ofs);
i = first_non_written_chunk_index + 1;
while (i < self->buf_count)
{
memcpy(self->partial + ofs, self->buffer[i].iov_base, self->buffer[i].iov_len);
ofs += self->buffer[i].iov_len;
++i;
}

self->partial_pos = 0;
self->partial_messages = self->buf_count - first_non_written_chunk_index;

log_proto_client_msg_ack(&self->super, self->buf_count - self->partial_messages);
}

/*
* log_proto_file_writer_flush:
*
Expand All @@ -55,99 +138,47 @@ static LogProtoStatus
log_proto_file_writer_flush(LogProtoClient *s)
{
LogProtoFileWriter *self = (LogProtoFileWriter *)s;
gint rc, i, i0, sum, ofs, pos;

if (self->partial)
{
/* there is still some data from the previous file writing process */
gint len = self->partial_len - self->partial_pos;

rc = log_transport_write(self->super.transport, self->partial + self->partial_pos, len);
if (rc > 0 && self->fsync)
fsync(self->fd);
if (rc < 0)
{
goto write_error;
}
else if (rc != len)
{
self->partial_pos += rc;
return LPS_PARTIAL;
}
else
{
log_proto_client_msg_ack(&self->super, self->partial_messages);
g_free(self->partial);
self->partial = NULL;
}
LogProtoStatus partial_flush_status;
if (!_flush_partial(self, &partial_flush_status))
return partial_flush_status;
}

/* we might be called from log_writer_deinit() without having a buffer at all */
if (self->buf_count == 0)
return LPS_SUCCESS;

rc = log_transport_writev(self->super.transport, self->buffer, self->buf_count);
gssize rc = log_transport_writev(self->super.transport, self->buffer, self->buf_count);

if (rc > 0 && self->fsync)
fsync(self->fd);

if (rc < 0)
{
goto write_error;
}
else if (rc != self->sum_len)
{
/* partial success: not everything has been written out */
/* look for the first chunk that has been cut */
sum = self->buffer[0].iov_len; /* sum is the cumulated length of the already processed items */
i = 0;
while (rc > sum)
sum += self->buffer[++i].iov_len;
self->partial_len = sum - rc; /* this is the length of the first non-written chunk */
i0 = i;
++i;
/* add the lengths of the following messages */
while (i < self->buf_count)
self->partial_len += self->buffer[i++].iov_len;
/* allocate and copy the remaining data */
self->partial = (guchar *)g_malloc(self->partial_len);
ofs = sum - rc; /* the length of the remaining (not processed) chunk in the first message */
pos = self->buffer[i0].iov_len - ofs;
memcpy(self->partial, (guchar *) self->buffer[i0].iov_base + pos, ofs);
i = i0 + 1;
while (i < self->buf_count)
{
memcpy(self->partial + ofs, self->buffer[i].iov_base, self->buffer[i].iov_len);
ofs += self->buffer[i].iov_len;
++i;
}
self->partial_pos = 0;
self->partial_messages = self->buf_count - i0;
}
else
{
log_proto_client_msg_ack(&self->super, self->buf_count);
}
if (errno == EINTR || errno == EAGAIN)
return LPS_SUCCESS;

/* free the previous message strings (the remaining part has been copied to the partial buffer) */
for (i = 0; i < self->buf_count; ++i)
g_free(self->buffer[i].iov_base);
self->buf_count = 0;
self->sum_len = 0;

return LPS_SUCCESS;

write_error:
if (errno != EINTR && errno != EAGAIN)
{
log_proto_client_msg_rewind(&self->super);
msg_error("I/O error occurred while writing",
evt_tag_int("fd", self->super.transport->fd),
evt_tag_error(EVT_TAG_OSERROR));
return LPS_ERROR;
}

return LPS_SUCCESS;
if (rc != self->sum_len)
_process_partial_write(self, rc);
else
log_proto_client_msg_ack(&self->super, self->buf_count);

/* free the previous message strings (the remaining part has been copied to the partial buffer) */
for (gint i = 0; i < self->buf_count; ++i)
g_free(self->buffer[i].iov_base);
self->buf_count = 0;
self->sum_len = 0;

return LPS_SUCCESS;
}

/*
Expand Down
8 changes: 8 additions & 0 deletions news/bugfix-303.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
`file()`, `stdout()`: fix log sources getting stuck

Due to an acknowledgment bug in the `file()` and `stdout()` destinations,
sources routed to those destinations may have gotten stuck as they were
flow-controlled incorrectly.

This issue occured only in extremely rare cases with regular files, but it
occured frequently with `/dev/stderr` and other slow pseudo-devices.

0 comments on commit fcbb8c6

Please sign in to comment.