Skip to content

Commit

Permalink
Merge pull request #418 from sodomelle/tls-destination-crash
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAnno authored Dec 19, 2024
2 parents 16b9c5e + 4648b61 commit 7c7ef1f
Show file tree
Hide file tree
Showing 36 changed files with 358 additions and 72 deletions.
5 changes: 5 additions & 0 deletions lib/driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ log_driver_free(LogPipe *s)
g_free(self->group);
if (self->id)
g_free(self->id);

signal_slot_connector_free(self->signal_slot_connector);

log_pipe_free_method(s);
}

Expand All @@ -148,6 +151,8 @@ log_driver_init_instance(LogDriver *self, GlobalConfig *cfg)
self->super.init = log_driver_init_method;
self->super.deinit = log_driver_deinit_method;
self->super.post_deinit = log_driver_post_deinit_method;

self->signal_slot_connector = signal_slot_connector_new();
}

/* LogSrcDriver */
Expand Down
4 changes: 4 additions & 0 deletions lib/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "logpipe.h"
#include "logqueue.h"
#include "cfg.h"
#include "signal-slot-connector/signal-slot-connector.h"

/*
* Drivers overview
Expand Down Expand Up @@ -111,6 +112,9 @@ struct _LogDriver
gboolean optional;
gchar *group;
gchar *id;

SignalSlotConnector *signal_slot_connector;

GList *plugins;

StatsCounterItem *processed_group_messages;
Expand Down
2 changes: 0 additions & 2 deletions lib/logpipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ log_pipe_init_instance(LogPipe *self, GlobalConfig *cfg)
self->pipe_next = NULL;
self->persist_name = NULL;
self->plugin_name = NULL;
self->signal_slot_connector = signal_slot_connector_new();

self->queue = log_pipe_forward_msg;
self->free_fn = log_pipe_free_method;
Expand Down Expand Up @@ -168,7 +167,6 @@ _free(LogPipe *self)
g_free((gpointer)self->persist_name);
g_free(self->plugin_name);
g_list_free_full(self->info, g_free);
signal_slot_connector_free(self->signal_slot_connector);
g_free(self);
}

Expand Down
2 changes: 0 additions & 2 deletions lib/logpipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "cfg.h"
#include "atomic.h"
#include "messages.h"
#include "signal-slot-connector/signal-slot-connector.h"

/* notify code values */
#define NC_CLOSE 1
Expand Down Expand Up @@ -307,7 +306,6 @@ struct _LogPipe
StatsCounterItem *discarded_messages;
const gchar *persist_name;
gchar *plugin_name;
SignalSlotConnector *signal_slot_connector;
LogPipeOptions options;

gboolean (*pre_init)(LogPipe *self);
Expand Down
6 changes: 6 additions & 0 deletions lib/logwriter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,12 @@ log_writer_steal_proto(LogWriter *self)
return proto;
}

LogProtoClient *
log_writer_get_proto(LogWriter *self)
{
return self->proto;
}


/* run in the main thread in reaction to a log_writer_reopen to change
* the destination LogProtoClient instance. It needs to be ran in the main
Expand Down
1 change: 1 addition & 0 deletions lib/logwriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ gboolean log_writer_has_pending_writes(LogWriter *self);
gboolean log_writer_opened(LogWriter *self);
void log_writer_reopen(LogWriter *self, LogProtoClient *proto);
LogProtoClient *log_writer_steal_proto(LogWriter *self);
LogProtoClient *log_writer_get_proto(LogWriter *self);
void log_writer_set_queue(LogWriter *self, LogQueue *queue);
LogQueue *log_writer_get_queue(LogWriter *s);
LogWriter *log_writer_new(guint32 flags, GlobalConfig *cfg);
Expand Down
6 changes: 3 additions & 3 deletions lib/transport/transport-adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ gssize
log_transport_adapter_read_method(LogTransport *s, gpointer buf, gsize buflen, LogTransportAuxData *aux)
{
LogTransportAdapter *self = (LogTransportAdapter *) s;
LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index);
LogTransport *transport = log_transport_stack_get_or_create_transport(s->stack, self->base_index);

return log_transport_read(transport, buf, buflen, aux);
}
Expand All @@ -36,7 +36,7 @@ gssize
log_transport_adapter_write_method(LogTransport *s, const gpointer buf, gsize count)
{
LogTransportAdapter *self = (LogTransportAdapter *) s;
LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index);
LogTransport *transport = log_transport_stack_get_or_create_transport(s->stack, self->base_index);

return log_transport_write(transport, buf, count);
}
Expand All @@ -45,7 +45,7 @@ gssize
log_transport_adapter_writev_method(LogTransport *s, struct iovec *iov, gint iov_count)
{
LogTransportAdapter *self = (LogTransportAdapter *) s;
LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index);
LogTransport *transport = log_transport_stack_get_or_create_transport(s->stack, self->base_index);

return log_transport_writev(transport, iov, iov_count);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/transport/transport-stack.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ log_transport_stack_switch(LogTransportStack *self, gint index)
{
g_assert(index < LOG_TRANSPORT__MAX);
LogTransport *active_transport = log_transport_stack_get_active(self);
LogTransport *requested_transport = log_transport_stack_get_transport(self, index);
LogTransport *requested_transport = log_transport_stack_get_or_create_transport(self, index);

if (!requested_transport)
return FALSE;
Expand Down
13 changes: 11 additions & 2 deletions lib/transport/transport-stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ struct _LogTransportStack
};

static inline LogTransport *
log_transport_stack_get_transport(LogTransportStack *self, gint index)
log_transport_stack_get_or_create_transport(LogTransportStack *self, gint index)
{
g_assert(index < LOG_TRANSPORT__MAX);

Expand All @@ -127,7 +127,16 @@ log_transport_stack_get_transport(LogTransportStack *self, gint index)
static inline LogTransport *
log_transport_stack_get_active(LogTransportStack *self)
{
return log_transport_stack_get_transport(self, self->active_transport);
// TODO - Change it to log_transport_stack_get_transport after checking call sites
return log_transport_stack_get_or_create_transport(self, self->active_transport);
}

static inline LogTransport *
log_transport_stack_get_transport(LogTransportStack *self, gint index)
{
g_assert(index < LOG_TRANSPORT__MAX);

return self->transports[index];
}

void log_transport_stack_add_factory(LogTransportStack *self, LogTransportFactory *);
Expand Down
13 changes: 12 additions & 1 deletion lib/transport/transport-tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <openssl/err.h>
#include <errno.h>

const gchar *TLS_TRANSPORT_NAME = "tls";

typedef struct _LogTransportTLS
{
LogTransportSocket super;
Expand Down Expand Up @@ -239,6 +241,15 @@ log_transport_tls_write_method(LogTransport *s, const gpointer buf, gsize buflen
return -1;
}

TLSSession *
log_tansport_tls_get_session(LogTransport *s)
{
g_assert(s->name == TLS_TRANSPORT_NAME);

LogTransportTLS *self = (LogTransportTLS *)s;
return self->tls_session;
}


static void log_transport_tls_free_method(LogTransport *s);

Expand All @@ -248,7 +259,7 @@ log_transport_tls_new(TLSSession *tls_session, gint fd)
LogTransportTLS *self = g_new0(LogTransportTLS, 1);

log_transport_stream_socket_init_instance(&self->super, fd);
self->super.super.name = "tls";
self->super.super.name = TLS_TRANSPORT_NAME;
self->super.super.cond = 0;
self->super.super.read = log_transport_tls_read_method;
self->super.super.write = log_transport_tls_write_method;
Expand Down
1 change: 1 addition & 0 deletions lib/transport/transport-tls.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@
#include "transport/tls-context.h"

LogTransport *log_transport_tls_new(TLSSession *tls_session, gint fd);
TLSSession *log_tansport_tls_get_session(LogTransport *s);

#endif
96 changes: 95 additions & 1 deletion modules/afsocket/afinet-dest.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "gprocess.h"
#include "compat/openssl_support.h"
#include "afsocket-signals.h"
#include "transport/transport-tls.h"
#include "transport/transport-stack.h"

#include <sys/types.h>
#include <sys/socket.h>
Expand All @@ -53,6 +55,12 @@

static const gint MAX_UDP_PAYLOAD_SIZE = 65507;

typedef struct _AFInetDestKeptAliveConnection
{
AFSocketDestKeptAliveConnection super;
gchar *hostname;
} AFInetDestKeptAliveConnection;

typedef struct _AFInetDestDriverTLSVerifyData
{
TLSContext *tls_context;
Expand Down Expand Up @@ -222,12 +230,18 @@ afinet_dd_setup_tls_verifier(AFInetDestDriver *self)

AFInetDestDriverTLSVerifyData *verify_data;
verify_data = afinet_dd_tls_verify_data_new(transport_mapper_inet->tls_context, _afinet_dd_get_hostname(self),
self->super.super.super.super.signal_slot_connector);
self->super.super.super.signal_slot_connector);
TLSVerifier *verifier = tls_verifier_new(afinet_dd_verify_callback, verify_data, afinet_dd_tls_verify_data_free);

transport_mapper_inet_set_tls_verifier(transport_mapper_inet, verifier);
}

static AFInetDestDriverTLSVerifyData *
_get_tls_verify_data (TLSVerifier *verifier)
{
return (AFInetDestDriverTLSVerifyData *)verifier->verify_data;
}

void
afinet_dd_enable_failover(LogDriver *s)
{
Expand Down Expand Up @@ -697,6 +711,83 @@ afinet_dd_free(LogPipe *s)
afsocket_dd_free(s);
}

gboolean
afinet_dd_should_restore_connection(AFSocketDestDriver *s, AFSocketDestKeptAliveConnection *c)
{
AFInetDestDriver *self = (AFInetDestDriver *) s;
AFInetDestKeptAliveConnection *conn = (AFInetDestKeptAliveConnection *) c;

if (g_strcmp0(_afinet_dd_get_hostname(self), conn->hostname) != 0)
return FALSE;

return afsocket_dd_should_restore_connection_method(&self->super, c);
}

static void
afinet_dd_restore_connection(AFSocketDestDriver *s, AFSocketDestKeptAliveConnection *item)
{
AFInetDestDriver *self = (AFInetDestDriver *) s;

LogWriter *writer = item->writer;

if (!writer)
goto exit;

LogProtoClient *proto = log_writer_get_proto(writer);

if (!proto)
goto exit;

LogTransport *transport = log_transport_stack_get_transport(&proto->transport_stack, LOG_TRANSPORT_TLS);

if (transport)
{
TLSSession *session = log_tansport_tls_get_session(transport);
AFInetDestDriverTLSVerifyData *verify_data = _get_tls_verify_data (session->verifier);
verify_data->signal_connector = self->super.super.super.signal_slot_connector;
}

exit:
afsocket_dd_restore_connection_method(&self->super, item);
}

static void
_kept_alive_connection_free(AFSocketDestKeptAliveConnection *s)
{
AFInetDestKeptAliveConnection *self = (AFInetDestKeptAliveConnection *) s;

g_free(self->hostname);

afsocket_kept_alive_connection_free_method(&self->super);
}

static AFInetDestKeptAliveConnection *
_kept_alive_connection_new(const gchar *transport, const gchar *proto, const gchar *hostname,
GSockAddr *dest_addr, LogWriter *writer)
{
AFInetDestKeptAliveConnection *self = g_new(AFInetDestKeptAliveConnection, 1);
afsocket_kept_alive_connection_init_instance(&self->super, transport, proto, dest_addr, writer);

self->super.free_fn = _kept_alive_connection_free;

self->hostname = g_strdup(hostname);

return self;
}

static void
afinet_dd_save_connection(AFSocketDestDriver *s)
{
AFInetDestDriver *self = (AFInetDestDriver *) s;

const gchar *transport = transport_mapper_get_transport(self->super.transport_mapper);
const gchar *proto = transport_mapper_get_logproto(self->super.transport_mapper);
AFInetDestKeptAliveConnection *item = _kept_alive_connection_new(transport, proto, _afinet_dd_get_hostname(self),
self->super.dest_addr, self->super.writer);

afsocket_dd_save_connection(&self->super, &item->super);
}

static AFInetDestDriver *
afinet_dd_new_instance(TransportMapper *transport_mapper, gchar *hostname, GlobalConfig *cfg)
{
Expand All @@ -710,6 +801,9 @@ afinet_dd_new_instance(TransportMapper *transport_mapper, gchar *hostname, Globa
self->super.construct_writer = afinet_dd_construct_writer;
self->super.setup_addresses = afinet_dd_setup_addresses;
self->super.get_dest_name = afinet_dd_get_dest_name;
self->super.should_restore_connection = afinet_dd_should_restore_connection;
self->super.restore_connection = afinet_dd_restore_connection;
self->super.save_connection = afinet_dd_save_connection;

self->primary = g_strdup(hostname);

Expand Down
1 change: 0 additions & 1 deletion modules/afsocket/afinet-dest.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ typedef struct _AFInetDestDriver
gchar *bind_ip;
/* character as it can contain a service name from /etc/services */
gchar *dest_port;
/* destination hostname is stored in super.hostname */
} AFInetDestDriver;

void afinet_dd_set_localport(LogDriver *self, gchar *service);
Expand Down
Loading

0 comments on commit 7c7ef1f

Please sign in to comment.