Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy protocol moves tolog transport #156

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions lib/logproto/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ set(LOGPROTO_HEADERS
logproto/logproto-server.h
logproto/logproto-text-client.h
logproto/logproto-text-server.h
logproto/logproto-proxied-text-server.h
PARENT_SCOPE)

set(LOGPROTO_SOURCES
Expand All @@ -26,7 +25,6 @@ set(LOGPROTO_SOURCES
logproto/logproto-server.c
logproto/logproto-text-client.c
logproto/logproto-text-server.c
logproto/logproto-proxied-text-server.c
PARENT_SCOPE)

add_test_subdirectory(tests)
2 changes: 0 additions & 2 deletions lib/logproto/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ logprotoinclude_HEADERS = \
lib/logproto/logproto-framed-server.h \
lib/logproto/logproto-text-client.h \
lib/logproto/logproto-text-server.h \
lib/logproto/logproto-proxied-text-server.h \
lib/logproto/logproto-multiline-server.h \
lib/logproto/logproto-record-server.h \
lib/logproto/logproto-builtins.h \
Expand All @@ -26,7 +25,6 @@ logproto_sources = \
lib/logproto/logproto-framed-server.c \
lib/logproto/logproto-text-client.c \
lib/logproto/logproto-text-server.c \
lib/logproto/logproto-proxied-text-server.c \
lib/logproto/logproto-multiline-server.c \
lib/logproto/logproto-record-server.c \
lib/logproto/logproto-builtins.c
Expand Down
17 changes: 10 additions & 7 deletions lib/logproto/logproto-buffered-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,10 @@ log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntry
struct stat st;
gint64 ofs = 0;
LogProtoBufferedServerState *state;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
gint fd;

fd = self->super.transport->fd;
fd = transport->fd;
self->persist_handle = handle;

if (fstat(fd, &st) < 0)
Expand Down Expand Up @@ -254,7 +255,7 @@ log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntry
raw_buffer = g_alloca(state->raw_buffer_size);
}

rc = log_transport_read(self->super.transport, raw_buffer, state->raw_buffer_size, NULL);
rc = log_transport_read(transport, raw_buffer, state->raw_buffer_size, NULL);
if (rc != state->raw_buffer_size)
{
msg_notice("Error re-reading buffer contents of the file to be continued, restarting from the beginning",
Expand Down Expand Up @@ -586,8 +587,9 @@ LogProtoPrepareAction
log_proto_buffered_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
{
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

*cond = self->super.transport->cond;
*cond = transport->cond;

/* if there's no pending I/O in the transport layer, then we want to do a read */
if (*cond == 0)
Expand All @@ -600,7 +602,9 @@ static gint
log_proto_buffered_server_read_data_method(LogProtoBufferedServer *self, guchar *buf, gsize len,
LogTransportAuxData *aux)
{
return log_transport_read(self->super.transport, buf, len, aux);
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

return log_transport_read(transport, buf, len, aux);
}

static void
Expand Down Expand Up @@ -816,7 +820,7 @@ log_proto_buffered_server_fetch_into_buffer(LogProtoBufferedServer *self)
{
/* an error occurred while reading */
msg_error("I/O error occurred while reading",
evt_tag_int(EVT_TAG_FD, self->super.transport->fd),
evt_tag_int(EVT_TAG_FD, self->super.transport_stack.fd),
evt_tag_error(EVT_TAG_OSERROR));
result = G_IO_STATUS_ERROR;
}
Expand All @@ -825,7 +829,7 @@ log_proto_buffered_server_fetch_into_buffer(LogProtoBufferedServer *self)
{
/* EOF read */
msg_trace("EOF occurred while reading",
evt_tag_int(EVT_TAG_FD, self->super.transport->fd));
evt_tag_int(EVT_TAG_FD, self->super.transport_stack.fd));
if (state->raw_buffer_leftover_size > 0)
{
msg_error("EOF read on a channel with leftovers from previous character conversion, dropping input");
Expand Down Expand Up @@ -1080,7 +1084,6 @@ log_proto_buffered_server_init(LogProtoBufferedServer *self, LogTransport *trans
self->super.prepare = log_proto_buffered_server_prepare;
self->super.fetch = log_proto_buffered_server_fetch;
self->super.free_fn = log_proto_buffered_server_free_method;
self->super.transport = transport;
self->super.restart_with_state = log_proto_buffered_server_restart_with_state;
self->super.validate_options = log_proto_buffered_server_validate_options_method;
self->convert = (GIConv) -1;
Expand Down
5 changes: 0 additions & 5 deletions lib/logproto/logproto-builtins.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "logproto-dgram-server.h"
#include "logproto-text-client.h"
#include "logproto-text-server.h"
#include "logproto-proxied-text-server.h"
#include "logproto-framed-client.h"
#include "logproto-framed-server.h"
#include "plugin.h"
Expand All @@ -38,8 +37,6 @@ DEFINE_LOG_PROTO_SERVER(log_proto_dgram);
DEFINE_LOG_PROTO_CLIENT(log_proto_text);
DEFINE_LOG_PROTO_SERVER(log_proto_text);
DEFINE_LOG_PROTO_SERVER(log_proto_text_with_nuls);
DEFINE_LOG_PROTO_SERVER(log_proto_proxied_text);
DEFINE_LOG_PROTO_SERVER(log_proto_proxied_text_tls_passthrough, .use_multitransport = TRUE);
DEFINE_LOG_PROTO_CLIENT(log_proto_framed);
DEFINE_LOG_PROTO_SERVER(log_proto_framed);

Expand All @@ -51,8 +48,6 @@ static Plugin framed_server_plugins[] =
LOG_PROTO_CLIENT_PLUGIN(log_proto_text, "text"),
LOG_PROTO_SERVER_PLUGIN(log_proto_text, "text"),
LOG_PROTO_SERVER_PLUGIN(log_proto_text_with_nuls, "text-with-nuls"),
LOG_PROTO_SERVER_PLUGIN(log_proto_proxied_text, "proxied-tcp"),
LOG_PROTO_SERVER_PLUGIN(log_proto_proxied_text_tls_passthrough, "proxied-tls-passthrough"),
LOG_PROTO_CLIENT_PLUGIN(log_proto_framed, "framed"),
LOG_PROTO_SERVER_PLUGIN(log_proto_framed, "framed"),
};
Expand Down
4 changes: 2 additions & 2 deletions lib/logproto/logproto-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ log_proto_client_validate_options_method(LogProtoClient *s)
void
log_proto_client_free_method(LogProtoClient *s)
{
log_transport_free(s->transport);
log_transport_stack_deinit(&s->transport_stack);
}

void
Expand All @@ -55,7 +55,7 @@ log_proto_client_init(LogProtoClient *self, LogTransport *transport, const LogPr
self->validate_options = log_proto_client_validate_options_method;
self->free_fn = log_proto_client_free_method;
self->options = options;
self->transport = transport;
log_transport_stack_init(&self->transport_stack, transport);
}

void
Expand Down
5 changes: 2 additions & 3 deletions lib/logproto/logproto-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ struct _LogProtoClient
{
LogProtoStatus status;
const LogProtoClientOptions *options;
LogTransport *transport;
LogTransportStack transport_stack;
/* FIXME: rename to something else */
gboolean (*prepare)(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout);
LogProtoStatus (*post)(LogProtoClient *s, LogMessage *logmsg, guchar *msg, gsize msg_len, gboolean *consumed);
Expand Down Expand Up @@ -165,7 +165,7 @@ static inline gint
log_proto_client_get_fd(LogProtoClient *s)
{
/* FIXME: Layering violation */
return s->transport->fd;
return s->transport_stack.fd;
}

static inline void
Expand Down Expand Up @@ -219,7 +219,6 @@ struct _LogProtoClientFactory
{
LogProtoClient *(*construct)(LogTransport *transport, const LogProtoClientOptions *options);
gint default_inet_port;
gboolean use_multitransport;
gboolean stateful;
};

Expand Down
10 changes: 6 additions & 4 deletions lib/logproto/logproto-framed-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ static LogProtoPrepareAction
log_proto_framed_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
{
LogProtoFramedServer *self = (LogProtoFramedServer *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

*cond = self->super.transport->cond;
*cond = transport->cond;

/* there is a half message in our buffer so try to wait */
if (!self->half_message_in_buffer)
Expand All @@ -96,6 +97,7 @@ static gboolean
log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_read,
LogProtoStatus *status)
{
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
gint rc;
*status = LPS_SUCCESS;

Expand All @@ -109,15 +111,15 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea
return FALSE;

log_transport_aux_data_reinit(&self->buffer_aux);
rc = log_transport_read(self->super.transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end,
rc = log_transport_read(transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end,
&self->buffer_aux);

if (rc < 0)
{
if (errno != EAGAIN)
{
msg_error("Error reading RFC6587 style framed data",
evt_tag_int("fd", self->super.transport->fd),
evt_tag_int("fd", transport->fd),
evt_tag_error("error"));
*status = LPS_ERROR;
}
Expand All @@ -132,7 +134,7 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea
if (rc == 0)
{
msg_trace("EOF occurred while reading",
evt_tag_int(EVT_TAG_FD, self->super.transport->fd));
evt_tag_int(EVT_TAG_FD, transport->fd));
*status = LPS_EOF;
return FALSE;
}
Expand Down
5 changes: 3 additions & 2 deletions lib/logproto/logproto-record-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,17 @@ static gint
log_proto_record_server_read_data(LogProtoBufferedServer *s, guchar *buf, gsize len, LogTransportAuxData *aux)
{
LogProtoRecordServer *self = (LogProtoRecordServer *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.super.transport_stack);
gint rc;

/* assert that we have enough space in the buffer to read record_size bytes */
g_assert(len >= self->record_size);
len = self->record_size;
rc = log_transport_read(self->super.super.transport, buf, len, aux);
rc = log_transport_read(transport, buf, len, aux);
if (rc > 0 && rc != self->record_size)
{
msg_error("Record size was set, and couldn't read enough bytes",
evt_tag_int(EVT_TAG_FD, self->super.super.transport->fd),
evt_tag_int(EVT_TAG_FD, transport->fd),
evt_tag_int("record_size", self->record_size),
evt_tag_int("read", rc));
errno = EIO;
Expand Down
14 changes: 7 additions & 7 deletions lib/logproto/logproto-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,17 @@ log_proto_server_validate_options_method(LogProtoServer *s)
}

void
log_proto_server_free_method(LogProtoServer *s)
log_proto_server_free_method(LogProtoServer *self)
{
log_transport_free(s->transport);
log_transport_stack_deinit(&self->transport_stack);
}

void
log_proto_server_free(LogProtoServer *s)
log_proto_server_free(LogProtoServer *self)
{
if (s->free_fn)
s->free_fn(s);
g_free(s);
if (self->free_fn)
self->free_fn(self);
g_free(self);
}

void
Expand All @@ -141,7 +141,7 @@ log_proto_server_init(LogProtoServer *self, LogTransport *transport, const LogPr
self->validate_options = log_proto_server_validate_options_method;
self->free_fn = log_proto_server_free_method;
self->options = options;
self->transport = transport;
log_transport_stack_init(&self->transport_stack, transport);
}

gboolean
Expand Down
5 changes: 2 additions & 3 deletions lib/logproto/logproto-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct _LogProtoServer
{
LogProtoStatus status;
const LogProtoServerOptions *options;
LogTransport *transport;
LogTransportStack transport_stack;
AckTracker *ack_tracker;

LogProtoServerWakeupCallback wakeup_callback;
Expand Down Expand Up @@ -144,7 +144,7 @@ log_proto_server_get_fd(LogProtoServer *s)
/* FIXME: Layering violation, as transport may not be fd based at all.
* But LogReader assumes it is. */

return s->transport->fd;
return s->transport_stack.fd;
}

static inline void
Expand Down Expand Up @@ -213,7 +213,6 @@ struct _LogProtoServerFactory
{
LogProtoServer *(*construct)(LogTransport *transport, const LogProtoServerOptions *options);
gint default_inet_port;
gboolean use_multitransport;
};

static inline LogProtoServer *
Expand Down
18 changes: 10 additions & 8 deletions lib/logproto/logproto-text-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ static gboolean
log_proto_text_client_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout)
{
LogProtoTextClient *self = (LogProtoTextClient *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

*fd = self->super.transport->fd;
*cond = self->super.transport->cond;
*fd = transport->fd;
*cond = transport->cond;

/* if there's no pending I/O in the transport layer, then we want to do a write */
if (*cond == 0)
Expand All @@ -50,23 +51,24 @@ static LogProtoStatus
log_proto_text_client_drop_input(LogProtoClient *s)
{
LogProtoTextClient *self = (LogProtoTextClient *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
guchar buf[1024];
gint rc = -1;

do
{
rc = log_transport_read(self->super.transport, buf, sizeof(buf), NULL);
rc = log_transport_read(transport, buf, sizeof(buf), NULL);
}
while (rc > 0);

if (rc == -1 && errno != EAGAIN)
{
msg_error("Error reading data", evt_tag_int("fd", self->super.transport->fd), evt_tag_error("error"));
msg_error("Error reading data", evt_tag_int("fd", transport->fd), evt_tag_error("error"));
return LPS_ERROR;
}
else if (rc == 0)
{
msg_error("EOF occurred while idle", evt_tag_int("fd", log_proto_client_get_fd(&self->super)));
msg_error("EOF occurred while idle", evt_tag_int("fd", transport->fd));
return LPS_ERROR;
}

Expand All @@ -77,6 +79,7 @@ static LogProtoStatus
log_proto_text_client_flush(LogProtoClient *s)
{
LogProtoTextClient *self = (LogProtoTextClient *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
gint rc;

if (!self->partial)
Expand All @@ -87,13 +90,13 @@ log_proto_text_client_flush(LogProtoClient *s)
/* attempt to flush previously buffered data */
gint len = self->partial_len - self->partial_pos;

rc = log_transport_write(self->super.transport, &self->partial[self->partial_pos], len);
rc = log_transport_write(transport, &self->partial[self->partial_pos], len);
if (rc < 0)
{
if (errno != EAGAIN && errno != EINTR)
{
msg_error("I/O error occurred while writing",
evt_tag_int("fd", self->super.transport->fd),
evt_tag_int("fd", transport->fd),
evt_tag_error(EVT_TAG_OSERROR));
return LPS_ERROR;
}
Expand Down Expand Up @@ -202,7 +205,6 @@ log_proto_text_client_init(LogProtoTextClient *self, LogTransport *transport, co
self->super.process_in = log_proto_text_client_drop_input;
self->super.post = log_proto_text_client_post;
self->super.free_fn = log_proto_text_client_free;
self->super.transport = transport;
self->next_state = -1;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/logproto/logproto.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#ifndef LOGPROTO_H_INCLUDED
#define LOGPROTO_H_INCLUDED

#include "transport/logtransport.h"
#include "transport/transport-stack.h"

#define RFC6587_MAX_FRAME_LEN_DIGITS 10

Expand Down
3 changes: 1 addition & 2 deletions lib/logproto/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ set(TEST_LOGPROTO_SOURCES
test-dgram-server.c
test-framed-server.c
test-indented-multiline-server.c
test-regexp-multiline-server.c
test-proxy-proto.c)
test-regexp-multiline-server.c)

add_unit_test(LIBTEST CRITERION
TARGET test_logproto
Expand Down
3 changes: 1 addition & 2 deletions lib/logproto/tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ lib_logproto_tests_test_logproto_SOURCES = \
lib/logproto/tests/test-dgram-server.c \
lib/logproto/tests/test-framed-server.c \
lib/logproto/tests/test-indented-multiline-server.c \
lib/logproto/tests/test-regexp-multiline-server.c \
lib/logproto/tests/test-proxy-proto.c
lib/logproto/tests/test-regexp-multiline-server.c

lib_logproto_tests_test_findeom_CFLAGS = \
$(TEST_CFLAGS) \
Expand Down
Loading
Loading