diff --git a/lib/driver.c b/lib/driver.c index 1481879dfe..d7f1d2d466 100644 --- a/lib/driver.c +++ b/lib/driver.c @@ -134,6 +134,9 @@ log_driver_free(LogPipe *s) g_free(self->group); if (self->id) g_free(self->id); + + signal_slot_connector_free(self->signal_slot_connector); + log_pipe_free_method(s); } @@ -148,6 +151,8 @@ log_driver_init_instance(LogDriver *self, GlobalConfig *cfg) self->super.init = log_driver_init_method; self->super.deinit = log_driver_deinit_method; self->super.post_deinit = log_driver_post_deinit_method; + + self->signal_slot_connector = signal_slot_connector_new(); } /* LogSrcDriver */ diff --git a/lib/driver.h b/lib/driver.h index 56570acd44..207159b0d2 100644 --- a/lib/driver.h +++ b/lib/driver.h @@ -29,6 +29,7 @@ #include "logpipe.h" #include "logqueue.h" #include "cfg.h" +#include "signal-slot-connector/signal-slot-connector.h" /* * Drivers overview @@ -111,6 +112,9 @@ struct _LogDriver gboolean optional; gchar *group; gchar *id; + + SignalSlotConnector *signal_slot_connector; + GList *plugins; StatsCounterItem *processed_group_messages; diff --git a/lib/logpipe.c b/lib/logpipe.c index e9fae97466..7241e098b5 100644 --- a/lib/logpipe.c +++ b/lib/logpipe.c @@ -126,7 +126,6 @@ log_pipe_init_instance(LogPipe *self, GlobalConfig *cfg) self->pipe_next = NULL; self->persist_name = NULL; self->plugin_name = NULL; - self->signal_slot_connector = signal_slot_connector_new(); self->queue = log_pipe_forward_msg; self->free_fn = log_pipe_free_method; @@ -168,7 +167,6 @@ _free(LogPipe *self) g_free((gpointer)self->persist_name); g_free(self->plugin_name); g_list_free_full(self->info, g_free); - signal_slot_connector_free(self->signal_slot_connector); g_free(self); } diff --git a/lib/logpipe.h b/lib/logpipe.h index 8d5ae07f72..b15c8f7517 100644 --- a/lib/logpipe.h +++ b/lib/logpipe.h @@ -31,7 +31,6 @@ #include "cfg.h" #include "atomic.h" #include "messages.h" -#include "signal-slot-connector/signal-slot-connector.h" /* notify code values */ #define NC_CLOSE 1 @@ -307,7 +306,6 @@ struct _LogPipe StatsCounterItem *discarded_messages; const gchar *persist_name; gchar *plugin_name; - SignalSlotConnector *signal_slot_connector; LogPipeOptions options; gboolean (*pre_init)(LogPipe *self); diff --git a/lib/logwriter.c b/lib/logwriter.c index 236b046837..146eace4bf 100644 --- a/lib/logwriter.c +++ b/lib/logwriter.c @@ -1741,6 +1741,12 @@ log_writer_steal_proto(LogWriter *self) return proto; } +LogProtoClient * +log_writer_get_proto(LogWriter *self) +{ + return self->proto; +} + /* run in the main thread in reaction to a log_writer_reopen to change * the destination LogProtoClient instance. It needs to be ran in the main diff --git a/lib/logwriter.h b/lib/logwriter.h index ee2da062b0..aac2bb82ef 100644 --- a/lib/logwriter.h +++ b/lib/logwriter.h @@ -86,6 +86,7 @@ gboolean log_writer_has_pending_writes(LogWriter *self); gboolean log_writer_opened(LogWriter *self); void log_writer_reopen(LogWriter *self, LogProtoClient *proto); LogProtoClient *log_writer_steal_proto(LogWriter *self); +LogProtoClient *log_writer_get_proto(LogWriter *self); void log_writer_set_queue(LogWriter *self, LogQueue *queue); LogQueue *log_writer_get_queue(LogWriter *s); LogWriter *log_writer_new(guint32 flags, GlobalConfig *cfg); diff --git a/lib/transport/transport-adapter.c b/lib/transport/transport-adapter.c index 7b1e472e39..3a70a84391 100644 --- a/lib/transport/transport-adapter.c +++ b/lib/transport/transport-adapter.c @@ -27,7 +27,7 @@ gssize log_transport_adapter_read_method(LogTransport *s, gpointer buf, gsize buflen, LogTransportAuxData *aux) { LogTransportAdapter *self = (LogTransportAdapter *) s; - LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index); + LogTransport *transport = log_transport_stack_get_or_create_transport(s->stack, self->base_index); return log_transport_read(transport, buf, buflen, aux); } @@ -36,7 +36,7 @@ gssize log_transport_adapter_write_method(LogTransport *s, const gpointer buf, gsize count) { LogTransportAdapter *self = (LogTransportAdapter *) s; - LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index); + LogTransport *transport = log_transport_stack_get_or_create_transport(s->stack, self->base_index); return log_transport_write(transport, buf, count); } @@ -45,7 +45,7 @@ gssize log_transport_adapter_writev_method(LogTransport *s, struct iovec *iov, gint iov_count) { LogTransportAdapter *self = (LogTransportAdapter *) s; - LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index); + LogTransport *transport = log_transport_stack_get_or_create_transport(s->stack, self->base_index); return log_transport_writev(transport, iov, iov_count); } diff --git a/lib/transport/transport-stack.c b/lib/transport/transport-stack.c index 556265d862..cc1fbe9166 100644 --- a/lib/transport/transport-stack.c +++ b/lib/transport/transport-stack.c @@ -59,7 +59,7 @@ log_transport_stack_switch(LogTransportStack *self, gint index) { g_assert(index < LOG_TRANSPORT__MAX); LogTransport *active_transport = log_transport_stack_get_active(self); - LogTransport *requested_transport = log_transport_stack_get_transport(self, index); + LogTransport *requested_transport = log_transport_stack_get_or_create_transport(self, index); if (!requested_transport) return FALSE; diff --git a/lib/transport/transport-stack.h b/lib/transport/transport-stack.h index a819ae065b..05c0aec421 100644 --- a/lib/transport/transport-stack.h +++ b/lib/transport/transport-stack.h @@ -108,7 +108,7 @@ struct _LogTransportStack }; static inline LogTransport * -log_transport_stack_get_transport(LogTransportStack *self, gint index) +log_transport_stack_get_or_create_transport(LogTransportStack *self, gint index) { g_assert(index < LOG_TRANSPORT__MAX); @@ -127,7 +127,16 @@ log_transport_stack_get_transport(LogTransportStack *self, gint index) static inline LogTransport * log_transport_stack_get_active(LogTransportStack *self) { - return log_transport_stack_get_transport(self, self->active_transport); + // TODO - Change it to log_transport_stack_get_transport after checking call sites + return log_transport_stack_get_or_create_transport(self, self->active_transport); +} + +static inline LogTransport * +log_transport_stack_get_transport(LogTransportStack *self, gint index) +{ + g_assert(index < LOG_TRANSPORT__MAX); + + return self->transports[index]; } void log_transport_stack_add_factory(LogTransportStack *self, LogTransportFactory *); diff --git a/lib/transport/transport-tls.c b/lib/transport/transport-tls.c index 3a632d0073..e328a6c16e 100644 --- a/lib/transport/transport-tls.c +++ b/lib/transport/transport-tls.c @@ -32,6 +32,8 @@ #include #include +const gchar *TLS_TRANSPORT_NAME = "tls"; + typedef struct _LogTransportTLS { LogTransportSocket super; @@ -239,6 +241,15 @@ log_transport_tls_write_method(LogTransport *s, const gpointer buf, gsize buflen return -1; } +TLSSession * +log_tansport_tls_get_session(LogTransport *s) +{ + g_assert(s->name == TLS_TRANSPORT_NAME); + + LogTransportTLS *self = (LogTransportTLS *)s; + return self->tls_session; +} + static void log_transport_tls_free_method(LogTransport *s); @@ -248,7 +259,7 @@ log_transport_tls_new(TLSSession *tls_session, gint fd) LogTransportTLS *self = g_new0(LogTransportTLS, 1); log_transport_stream_socket_init_instance(&self->super, fd); - self->super.super.name = "tls"; + self->super.super.name = TLS_TRANSPORT_NAME; self->super.super.cond = 0; self->super.super.read = log_transport_tls_read_method; self->super.super.write = log_transport_tls_write_method; diff --git a/lib/transport/transport-tls.h b/lib/transport/transport-tls.h index 682667e1ab..2a67765ab3 100644 --- a/lib/transport/transport-tls.h +++ b/lib/transport/transport-tls.h @@ -28,5 +28,6 @@ #include "transport/tls-context.h" LogTransport *log_transport_tls_new(TLSSession *tls_session, gint fd); +TLSSession *log_tansport_tls_get_session(LogTransport *s); #endif diff --git a/modules/afsocket/afinet-dest.c b/modules/afsocket/afinet-dest.c index 62016830e2..5d2713a609 100644 --- a/modules/afsocket/afinet-dest.c +++ b/modules/afsocket/afinet-dest.c @@ -28,6 +28,8 @@ #include "gprocess.h" #include "compat/openssl_support.h" #include "afsocket-signals.h" +#include "transport/transport-tls.h" +#include "transport/transport-stack.h" #include #include @@ -53,6 +55,12 @@ static const gint MAX_UDP_PAYLOAD_SIZE = 65507; +typedef struct _AFInetDestKeptAliveConnection +{ + AFSocketDestKeptAliveConnection super; + gchar *hostname; +} AFInetDestKeptAliveConnection; + typedef struct _AFInetDestDriverTLSVerifyData { TLSContext *tls_context; @@ -222,12 +230,18 @@ afinet_dd_setup_tls_verifier(AFInetDestDriver *self) AFInetDestDriverTLSVerifyData *verify_data; verify_data = afinet_dd_tls_verify_data_new(transport_mapper_inet->tls_context, _afinet_dd_get_hostname(self), - self->super.super.super.super.signal_slot_connector); + self->super.super.super.signal_slot_connector); TLSVerifier *verifier = tls_verifier_new(afinet_dd_verify_callback, verify_data, afinet_dd_tls_verify_data_free); transport_mapper_inet_set_tls_verifier(transport_mapper_inet, verifier); } +static AFInetDestDriverTLSVerifyData * +_get_tls_verify_data (TLSVerifier *verifier) +{ + return (AFInetDestDriverTLSVerifyData *)verifier->verify_data; +} + void afinet_dd_enable_failover(LogDriver *s) { @@ -697,6 +711,83 @@ afinet_dd_free(LogPipe *s) afsocket_dd_free(s); } +gboolean +afinet_dd_should_restore_connection(AFSocketDestDriver *s, AFSocketDestKeptAliveConnection *c) +{ + AFInetDestDriver *self = (AFInetDestDriver *) s; + AFInetDestKeptAliveConnection *conn = (AFInetDestKeptAliveConnection *) c; + + if (g_strcmp0(_afinet_dd_get_hostname(self), conn->hostname) != 0) + return FALSE; + + return afsocket_dd_should_restore_connection_method(&self->super, c); +} + +static void +afinet_dd_restore_connection(AFSocketDestDriver *s, AFSocketDestKeptAliveConnection *item) +{ + AFInetDestDriver *self = (AFInetDestDriver *) s; + + LogWriter *writer = item->writer; + + if (!writer) + goto exit; + + LogProtoClient *proto = log_writer_get_proto(writer); + + if (!proto) + goto exit; + + LogTransport *transport = log_transport_stack_get_transport(&proto->transport_stack, LOG_TRANSPORT_TLS); + + if (transport) + { + TLSSession *session = log_tansport_tls_get_session(transport); + AFInetDestDriverTLSVerifyData *verify_data = _get_tls_verify_data (session->verifier); + verify_data->signal_connector = self->super.super.super.signal_slot_connector; + } + +exit: + afsocket_dd_restore_connection_method(&self->super, item); +} + +static void +_kept_alive_connection_free(AFSocketDestKeptAliveConnection *s) +{ + AFInetDestKeptAliveConnection *self = (AFInetDestKeptAliveConnection *) s; + + g_free(self->hostname); + + afsocket_kept_alive_connection_free_method(&self->super); +} + +static AFInetDestKeptAliveConnection * +_kept_alive_connection_new(const gchar *transport, const gchar *proto, const gchar *hostname, + GSockAddr *dest_addr, LogWriter *writer) +{ + AFInetDestKeptAliveConnection *self = g_new(AFInetDestKeptAliveConnection, 1); + afsocket_kept_alive_connection_init_instance(&self->super, transport, proto, dest_addr, writer); + + self->super.free_fn = _kept_alive_connection_free; + + self->hostname = g_strdup(hostname); + + return self; +} + +static void +afinet_dd_save_connection(AFSocketDestDriver *s) +{ + AFInetDestDriver *self = (AFInetDestDriver *) s; + + const gchar *transport = transport_mapper_get_transport(self->super.transport_mapper); + const gchar *proto = transport_mapper_get_logproto(self->super.transport_mapper); + AFInetDestKeptAliveConnection *item = _kept_alive_connection_new(transport, proto, _afinet_dd_get_hostname(self), + self->super.dest_addr, self->super.writer); + + afsocket_dd_save_connection(&self->super, &item->super); +} + static AFInetDestDriver * afinet_dd_new_instance(TransportMapper *transport_mapper, gchar *hostname, GlobalConfig *cfg) { @@ -710,6 +801,9 @@ afinet_dd_new_instance(TransportMapper *transport_mapper, gchar *hostname, Globa self->super.construct_writer = afinet_dd_construct_writer; self->super.setup_addresses = afinet_dd_setup_addresses; self->super.get_dest_name = afinet_dd_get_dest_name; + self->super.should_restore_connection = afinet_dd_should_restore_connection; + self->super.restore_connection = afinet_dd_restore_connection; + self->super.save_connection = afinet_dd_save_connection; self->primary = g_strdup(hostname); diff --git a/modules/afsocket/afinet-dest.h b/modules/afsocket/afinet-dest.h index a6d489c88a..c72220bf2f 100644 --- a/modules/afsocket/afinet-dest.h +++ b/modules/afsocket/afinet-dest.h @@ -57,7 +57,6 @@ typedef struct _AFInetDestDriver gchar *bind_ip; /* character as it can contain a service name from /etc/services */ gchar *dest_port; - /* destination hostname is stored in super.hostname */ } AFInetDestDriver; void afinet_dd_set_localport(LogDriver *self, gchar *service); diff --git a/modules/afsocket/afsocket-dest.c b/modules/afsocket/afsocket-dest.c index 974179af42..5cc08e4d0b 100644 --- a/modules/afsocket/afsocket-dest.c +++ b/modules/afsocket/afsocket-dest.c @@ -37,25 +37,29 @@ #include #include -typedef struct _ReloadStoreItem + +void +afsocket_kept_alive_connection_init_instance(AFSocketDestKeptAliveConnection *s, const gchar *transport, + const gchar *proto, GSockAddr *dest_addr, LogWriter *writer) { - LogProtoClientFactory *proto_factory; - GSockAddr *dest_addr; - LogWriter *writer; -} ReloadStoreItem; + s->transport = g_strdup(transport); + s->proto = g_strdup(proto); + s->writer = writer; + s->dest_addr = g_sockaddr_ref(dest_addr); + s->free_fn = afsocket_kept_alive_connection_free_method; +} -static ReloadStoreItem * -_reload_store_item_new(AFSocketDestDriver *afsocket_dd) +static AFSocketDestKeptAliveConnection * +_kept_alive_connection_new(const gchar *transport, const gchar *proto, GSockAddr *dest_addr, LogWriter *writer) { - ReloadStoreItem *item = g_new(ReloadStoreItem, 1); - item->proto_factory = afsocket_dd->proto_factory; - item->writer = afsocket_dd->writer; - item->dest_addr = g_sockaddr_ref(afsocket_dd->dest_addr); - return item; + AFSocketDestKeptAliveConnection *conn = g_new(AFSocketDestKeptAliveConnection, 1); + afsocket_kept_alive_connection_init_instance(conn, transport, proto, dest_addr, writer); + + return conn; } -static void -_reload_store_item_free(ReloadStoreItem *self) +void +afsocket_kept_alive_connection_free_method(AFSocketDestKeptAliveConnection *self) { if (!self) return; @@ -64,11 +68,12 @@ _reload_store_item_free(ReloadStoreItem *self) log_pipe_unref((LogPipe *) self->writer); g_sockaddr_unref(self->dest_addr); - g_free(self); + g_free(self->transport); + g_free(self->proto); } static LogWriter * -_reload_store_item_release_writer(ReloadStoreItem *self) +_kept_alive_connection_steal_writer(AFSocketDestKeptAliveConnection *self) { LogWriter *writer = self->writer; self->writer = NULL; @@ -76,10 +81,11 @@ _reload_store_item_release_writer(ReloadStoreItem *self) return writer; } -static inline gboolean -_is_protocol_compatible_with_writer_after_reload(AFSocketDestDriver *self, ReloadStoreItem *item) +gboolean +afsocket_dd_should_restore_connection_method(AFSocketDestDriver *self, AFSocketDestKeptAliveConnection *c) { - return (self->proto_factory->construct == item->proto_factory->construct); + return g_strcmp0(transport_mapper_get_transport(self->transport_mapper), c->transport) == 0 + && g_strcmp0(transport_mapper_get_logproto(self->transport_mapper), c->proto) == 0; } void @@ -460,23 +466,32 @@ _afsocket_dd_try_to_restore_connection_state(AFSocketDestDriver *self) if (self->writer) return TRUE; - ReloadStoreItem *item = cfg_persist_config_fetch( - log_pipe_get_config(&self->super.super.super), - afsocket_dd_format_connections_name(self)); + AFSocketDestKeptAliveConnection *item = cfg_persist_config_fetch(log_pipe_get_config(&self->super.super.super), + afsocket_dd_format_connections_name(self)); /* We don't have an item stored in the reload cache, which means */ /* it is the first time when we try to initialize the writer */ if (!item) return FALSE; - if (_is_protocol_compatible_with_writer_after_reload(self, item)) - self->writer = _reload_store_item_release_writer(item); + if (!self->should_restore_connection(self, item)) + { + afsocket_kept_alive_connection_free(item); + return FALSE; + } - self->dest_addr = g_sockaddr_ref(item->dest_addr); - _reload_store_item_free(item); + self->restore_connection(self, item); + afsocket_kept_alive_connection_free(item); return TRUE; } +void +afsocket_dd_restore_connection_method(AFSocketDestDriver *self, AFSocketDestKeptAliveConnection *item) +{ + self->writer = _kept_alive_connection_steal_writer(item); + self->dest_addr = g_sockaddr_ref(item->dest_addr); +} + LogWriter * afsocket_dd_construct_writer_method(AFSocketDestDriver *self) { @@ -704,18 +719,24 @@ afsocket_dd_stop_writer(AFSocketDestDriver *self) log_pipe_deinit((LogPipe *) self->writer); } -static void -afsocket_dd_save_connection(AFSocketDestDriver *self) +void +afsocket_dd_save_connection(AFSocketDestDriver *self, AFSocketDestKeptAliveConnection *c) { GlobalConfig *cfg = log_pipe_get_config(&self->super.super.super); - if (self->connections_kept_alive_across_reloads) - { - ReloadStoreItem *item = _reload_store_item_new(self); - cfg_persist_config_add(cfg, afsocket_dd_format_connections_name(self), item, - (GDestroyNotify)_reload_store_item_free); - self->writer = NULL; - } + cfg_persist_config_add(cfg, afsocket_dd_format_connections_name(self), c, + (GDestroyNotify) afsocket_kept_alive_connection_free); + self->writer = NULL; +} + +static void +afsocket_dd_save_connection_method(AFSocketDestDriver *self) +{ + const gchar *transport = transport_mapper_get_transport(self->transport_mapper); + const gchar *proto = transport_mapper_get_logproto(self->transport_mapper); + AFSocketDestKeptAliveConnection *item = _kept_alive_connection_new(transport, proto, self->dest_addr, self->writer); + + afsocket_dd_save_connection(self, item); } gboolean @@ -726,9 +747,9 @@ afsocket_dd_deinit(LogPipe *s) afsocket_dd_stop_watches(self); afsocket_dd_stop_writer(self); - if (self->connection_initialized) + if (self->connection_initialized && self->connections_kept_alive_across_reloads) { - afsocket_dd_save_connection(self); + self->save_connection(self); } afsocket_dd_unregister_stats(self); @@ -788,6 +809,9 @@ afsocket_dd_init_instance(AFSocketDestDriver *self, self->super.super.super.generate_persist_name = afsocket_dd_format_name; self->setup_addresses = afsocket_dd_setup_addresses_method; self->construct_writer = afsocket_dd_construct_writer_method; + self->should_restore_connection = afsocket_dd_should_restore_connection_method; + self->restore_connection = afsocket_dd_restore_connection_method; + self->save_connection = afsocket_dd_save_connection_method; self->transport_mapper = transport_mapper; self->socket_options = socket_options; self->connections_kept_alive_across_reloads = TRUE; diff --git a/modules/afsocket/afsocket-dest.h b/modules/afsocket/afsocket-dest.h index 83e9997a90..ba934284d7 100644 --- a/modules/afsocket/afsocket-dest.h +++ b/modules/afsocket/afsocket-dest.h @@ -33,6 +33,7 @@ #include typedef struct _AFSocketDestDriver AFSocketDestDriver; +typedef struct _AFSocketDestKeptAliveConnection AFSocketDestKeptAliveConnection; struct _AFSocketDestDriver { @@ -62,8 +63,24 @@ struct _AFSocketDestDriver LogWriter *(*construct_writer)(AFSocketDestDriver *self); gboolean (*setup_addresses)(AFSocketDestDriver *s); const gchar *(*get_dest_name)(const AFSocketDestDriver *s); + + void (*save_connection)(AFSocketDestDriver *s); + gboolean (*should_restore_connection)(AFSocketDestDriver *s, AFSocketDestKeptAliveConnection *c); + void (*restore_connection)(AFSocketDestDriver *s, AFSocketDestKeptAliveConnection *c); +}; + +struct _AFSocketDestKeptAliveConnection +{ + gchar *transport; + gchar *proto; + + GSockAddr *dest_addr; + LogWriter *writer; + + void (*free_fn)(AFSocketDestKeptAliveConnection *s); }; + static inline LogWriter * afsocket_dd_construct_writer(AFSocketDestDriver *self) { @@ -96,4 +113,22 @@ gboolean afsocket_dd_deinit(LogPipe *s); void afsocket_dd_free(LogPipe *s); void afsocket_dd_connected_with_fd(gpointer self, gint fd, GSockAddr *saddr); + +void afsocket_dd_save_connection(AFSocketDestDriver *self, AFSocketDestKeptAliveConnection *c); +gboolean afsocket_dd_should_restore_connection_method(AFSocketDestDriver *self, AFSocketDestKeptAliveConnection *c); +void afsocket_dd_restore_connection_method(AFSocketDestDriver *self, AFSocketDestKeptAliveConnection *item); + +void afsocket_kept_alive_connection_init_instance(AFSocketDestKeptAliveConnection *s, + const gchar *transport, const gchar *proto, + GSockAddr *dest_addr, LogWriter *writer); + +void afsocket_kept_alive_connection_free_method(AFSocketDestKeptAliveConnection *s); + +static inline void +afsocket_kept_alive_connection_free(AFSocketDestKeptAliveConnection *self) +{ + self->free_fn(self); + g_free(self); +} + #endif diff --git a/modules/afsocket/afsocket-source.c b/modules/afsocket/afsocket-source.c index 1b4e14fd60..4c5d67b1dc 100644 --- a/modules/afsocket/afsocket-source.c +++ b/modules/afsocket/afsocket-source.c @@ -1070,7 +1070,7 @@ afsocket_sd_open_socket(AFSocketSourceDriver *self, gint *sock) AFSocketSetupSocketSignalData signal_data = {0}; signal_data.sock = *sock; - EMIT(self->super.super.super.signal_slot_connector, signal_afsocket_setup_socket, &signal_data); + EMIT(self->super.super.signal_slot_connector, signal_afsocket_setup_socket, &signal_data); return !signal_data.failure; } diff --git a/modules/afsocket/transport-mapper-inet.c b/modules/afsocket/transport-mapper-inet.c index cc05b69a86..c113f1d038 100644 --- a/modules/afsocket/transport-mapper-inet.c +++ b/modules/afsocket/transport-mapper-inet.c @@ -364,14 +364,13 @@ static gboolean transport_mapper_network_apply_transport(TransportMapper *s, GlobalConfig *cfg) { TransportMapperInet *self = (TransportMapperInet *) s; - const gchar *transport; /* determine default port, apply transport setting to afsocket flags */ if (!transport_mapper_apply_transport_method(s, cfg)) return FALSE; - transport = self->super.transport; + const gchar *transport = self->super.transport; self->server_port = NETWORK_PORT; if (strcasecmp(transport, "udp") == 0) { diff --git a/modules/afsocket/transport-mapper.h b/modules/afsocket/transport-mapper.h index 3569a7581e..777b9617c5 100644 --- a/modules/afsocket/transport-mapper.h +++ b/modules/afsocket/transport-mapper.h @@ -72,6 +72,18 @@ void transport_mapper_init_instance(TransportMapper *self, const gchar *transpor void transport_mapper_free(TransportMapper *self); void transport_mapper_free_method(TransportMapper *self); +static inline const gchar * +transport_mapper_get_transport(TransportMapper *self) +{ + return self->transport; +} + +static inline const gchar * +transport_mapper_get_logproto(TransportMapper *self) +{ + return self->logproto; +} + static inline const gchar * transport_mapper_get_transport_name(TransportMapper *self, gsize *len) { diff --git a/modules/azure-auth-header/azure-auth-header.c b/modules/azure-auth-header/azure-auth-header.c index 21c2ff8750..aa8693f7a5 100644 --- a/modules/azure-auth-header/azure-auth-header.c +++ b/modules/azure-auth-header/azure-auth-header.c @@ -181,7 +181,7 @@ _attach(LogDriverPlugin *s, LogDriver *driver) { AzureAuthHeaderPlugin *self = (AzureAuthHeaderPlugin *)s; - SignalSlotConnector *ssc = driver->super.signal_slot_connector; + SignalSlotConnector *ssc = driver->signal_slot_connector; msg_debug("AzureAuthHeaderPlugin::attach()", evt_tag_printf("SignalSlotConnector", "%p", ssc), evt_tag_printf("AzureAuthHeaderPlugin", "%p", s)); @@ -196,7 +196,7 @@ _detach(LogDriverPlugin *s, LogDriver *driver) { AzureAuthHeaderPlugin *self = (AzureAuthHeaderPlugin *)s; - SignalSlotConnector *ssc = driver->super.signal_slot_connector; + SignalSlotConnector *ssc = driver->signal_slot_connector; DISCONNECT(ssc, signal_http_header_request, _slot_append_headers, self); } @@ -226,5 +226,3 @@ azure_auth_header_plugin_new(void) return self; } - - diff --git a/modules/cloud-auth/cloud-auth.c b/modules/cloud-auth/cloud-auth.c index 4af4004b40..58d89865a8 100644 --- a/modules/cloud-auth/cloud-auth.c +++ b/modules/cloud-auth/cloud-auth.c @@ -40,7 +40,7 @@ _attach(LogDriverPlugin *s, LogDriver *d) if (!cloud_authenticator_init(self->authenticator)) return FALSE; - SignalSlotConnector *ssc = driver->super.super.signal_slot_connector; + SignalSlotConnector *ssc = driver->super.signal_slot_connector; CONNECT(ssc, signal_http_header_request, cloud_authenticator_handle_http_header_request, self->authenticator); return TRUE; @@ -54,7 +54,7 @@ _detach(LogDriverPlugin *s, LogDriver *d) cloud_authenticator_deinit(self->authenticator); - SignalSlotConnector *ssc = driver->super.super.signal_slot_connector; + SignalSlotConnector *ssc = driver->super.signal_slot_connector; DISCONNECT(ssc, signal_http_header_request, cloud_authenticator_handle_http_header_request, self->authenticator); } diff --git a/modules/ebpf/ebpf-reuseport.c b/modules/ebpf/ebpf-reuseport.c index 150d47d357..b534b09d45 100644 --- a/modules/ebpf/ebpf-reuseport.c +++ b/modules/ebpf/ebpf-reuseport.c @@ -76,7 +76,7 @@ _attach(LogDriverPlugin *s, LogDriver *driver) } self->random->bss->number_of_sockets = self->number_of_sockets; - SignalSlotConnector *ssc = driver->super.signal_slot_connector; + SignalSlotConnector *ssc = driver->signal_slot_connector; CONNECT(ssc, signal_afsocket_setup_socket, _slot_setup_socket, self); return TRUE; @@ -87,7 +87,7 @@ _detach(LogDriverPlugin *s, LogDriver *driver) { EBPFReusePort *self = (EBPFReusePort *)s; - SignalSlotConnector *ssc = driver->super.signal_slot_connector; + SignalSlotConnector *ssc = driver->signal_slot_connector; DISCONNECT(ssc, signal_afsocket_setup_socket, _slot_setup_socket, self); } diff --git a/modules/examples/inner-destinations/http-test-slots/http-test-slots.c b/modules/examples/inner-destinations/http-test-slots/http-test-slots.c index a6a0f23b1f..5bc682c510 100644 --- a/modules/examples/inner-destinations/http-test-slots/http-test-slots.c +++ b/modules/examples/inner-destinations/http-test-slots/http-test-slots.c @@ -48,7 +48,7 @@ _slot_append_test_headers(HttpTestSlotsPlugin *self, HttpHeaderRequestSignalData static gboolean _attach(LogDriverPlugin *s, LogDriver *driver) { - SignalSlotConnector *ssc = driver->super.signal_slot_connector; + SignalSlotConnector *ssc = driver->signal_slot_connector; msg_debug("HttpTestSlotsPlugin::attach()", evt_tag_printf("SignalSlotConnector", "%p", ssc)); @@ -61,7 +61,7 @@ _attach(LogDriverPlugin *s, LogDriver *driver) static void _detach(LogDriverPlugin *s, LogDriver *driver) { - SignalSlotConnector *ssc = driver->super.signal_slot_connector; + SignalSlotConnector *ssc = driver->signal_slot_connector; msg_debug("HttpTestSlotsPlugin::detach()", evt_tag_printf("SignalSlotConnector", "%p", ssc)); diff --git a/modules/examples/inner-destinations/tls-test-validation/tls-test-validation.c b/modules/examples/inner-destinations/tls-test-validation/tls-test-validation.c index 2e4694173a..0fb6061004 100644 --- a/modules/examples/inner-destinations/tls-test-validation/tls-test-validation.c +++ b/modules/examples/inner-destinations/tls-test-validation/tls-test-validation.c @@ -52,7 +52,7 @@ _slot_append_test_identity(TlsTestValidationPlugin *self, AFSocketTLSCertificate static gboolean _attach(LogDriverPlugin *s, LogDriver *driver) { - SignalSlotConnector *ssc = driver->super.signal_slot_connector; + SignalSlotConnector *ssc = driver->signal_slot_connector; msg_debug("TlsTestValidationPlugin::attach()", evt_tag_printf("SignalSlotConnector", "%p", ssc)); @@ -65,7 +65,7 @@ _attach(LogDriverPlugin *s, LogDriver *driver) static void _detach(LogDriverPlugin *s, LogDriver *driver) { - SignalSlotConnector *ssc = driver->super.signal_slot_connector; + SignalSlotConnector *ssc = driver->signal_slot_connector; msg_debug("TlsTestValidationPlugin::detach()", evt_tag_printf("SignalSlotConnector", "%p", ssc)); diff --git a/modules/http/http-worker.c b/modules/http/http-worker.c index 58092905a3..13f489adc4 100644 --- a/modules/http/http-worker.c +++ b/modules/http/http-worker.c @@ -241,7 +241,7 @@ _collect_rest_headers(HTTPDestinationWorker *self, GError **error) HTTPDestinationDriver *owner = (HTTPDestinationDriver *) self->super.owner; - EMIT(owner->super.super.super.super.signal_slot_connector, signal_http_header_request, &signal_data); + EMIT(owner->super.super.super.signal_slot_connector, signal_http_header_request, &signal_data); _set_error_from_slot_result(signal_http_header_request, signal_data.result, error); } @@ -683,7 +683,7 @@ _flush_on_target(HTTPDestinationWorker *self, const gchar *url) .http_code = http_code }; - EMIT(owner->super.super.super.super.signal_slot_connector, signal_http_response_received, &signal_data); + EMIT(owner->super.super.super.signal_slot_connector, signal_http_response_received, &signal_data); if (signal_data.result == HTTP_SLOT_RESOLVED) { diff --git a/modules/http/tests/test_http-signal_slot.c b/modules/http/tests/test_http-signal_slot.c index 7a8a3285a2..17a66f0d0f 100644 --- a/modules/http/tests/test_http-signal_slot.c +++ b/modules/http/tests/test_http-signal_slot.c @@ -104,7 +104,7 @@ _check(const gchar *expected_body, HttpHeaderRequestSignalData *data) Test(test_http_signal_slot, basic) { - SignalSlotConnector *ssc = driver->super.super.super.super.signal_slot_connector; + SignalSlotConnector *ssc = driver->super.super.super.signal_slot_connector; const gchar *test_msg = "test message"; CONNECT(ssc, signal_http_header_request, _check, test_msg); @@ -123,7 +123,7 @@ Test(test_http_signal_slot, single_with_prefix_suffix) http_dd_set_body_suffix((LogDriver *)driver, "]"); http_dd_set_delimiter((LogDriver *)driver, ","); - SignalSlotConnector *ssc = driver->super.super.super.super.signal_slot_connector; + SignalSlotConnector *ssc = driver->super.super.super.signal_slot_connector; CONNECT(ssc, signal_http_header_request, _check, "[almafa]"); @@ -143,7 +143,7 @@ Test(test_http_signal_slot, batch_with_prefix_suffix) log_threaded_dest_driver_set_batch_lines((LogDriver *)driver, 2); log_threaded_dest_driver_set_batch_timeout((LogDriver *)driver, 1000); - SignalSlotConnector *ssc = driver->super.super.super.super.signal_slot_connector; + SignalSlotConnector *ssc = driver->super.super.super.signal_slot_connector; CONNECT(ssc, signal_http_header_request, _check, "[1,2]"); diff --git a/modules/python/python-http-header.c b/modules/python/python-http-header.c index d74fa4d525..fb059cd1f3 100644 --- a/modules/python/python-http-header.c +++ b/modules/python/python-http-header.c @@ -382,7 +382,7 @@ _attach(LogDriverPlugin *s, LogDriver *driver) return FALSE; } - SignalSlotConnector *ssc = driver->super.signal_slot_connector; + SignalSlotConnector *ssc = driver->signal_slot_connector; CONNECT(ssc, signal_http_header_request, _append_headers, self); CONNECT(ssc, signal_http_response_received, _on_http_response_received, self); @@ -394,7 +394,7 @@ static void _detach(LogDriverPlugin *s, LogDriver *driver) { PythonHttpHeaderPlugin *self = (PythonHttpHeaderPlugin *) s; - SignalSlotConnector *ssc = driver->super.signal_slot_connector; + SignalSlotConnector *ssc = driver->signal_slot_connector; DISCONNECT(ssc, signal_http_header_request, _append_headers, self); DISCONNECT(ssc, signal_http_response_received, _on_http_response_received, self); diff --git a/news/bugfix-418.md b/news/bugfix-418.md new file mode 100644 index 0000000000..f243bcab62 --- /dev/null +++ b/news/bugfix-418.md @@ -0,0 +1,4 @@ +`network(), syslog()`: Fixed a potential crash for TLS destinations during reload + +In case of a TLS connection, if the handshake didn't happen before reloading AxoSyslog, +it crashed on the first message sent to that destination. \ No newline at end of file diff --git a/tests/copyright/policy b/tests/copyright/policy index caeb3e1ebf..100aab0bb7 100644 --- a/tests/copyright/policy +++ b/tests/copyright/policy @@ -277,6 +277,7 @@ tests/light/functional_tests/source_drivers/syslog_source/auto/test_auto_proto\. tests/light/src/syslog_ng_ctl/prometheus_stats_handler.py tests/light/src/syslog_ng_config/statements/template/template\.py tests/light/src/syslog_ng_config/statements/__init__\.py +tests/light/functional_tests/destination_drivers/network_destination/test_kept_alive_tls_connection_doing_handshake_after_reload\.py modules/correlation/id-counter\.[ch]$ modules/correlation/group-lines.h modules/xml/windows-eventlog-xml-parser\.h diff --git a/tests/light/functional_tests/destination_drivers/network_destination/test_kept_alive_tls_connection_doing_handshake_after_reload.py b/tests/light/functional_tests/destination_drivers/network_destination/test_kept_alive_tls_connection_doing_handshake_after_reload.py new file mode 100644 index 0000000000..f88f6672d4 --- /dev/null +++ b/tests/light/functional_tests/destination_drivers/network_destination/test_kept_alive_tls_connection_doing_handshake_after_reload.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +############################################################################# +# Copyright (c) 2024 Axoflow +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License version 2 as published +# by the Free Software Foundation, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# +# As an additional exemption you are allowed to compile & link against the +# OpenSSL libraries as published by the OpenSSL project. See the file +# COPYING for details. +# +############################################################################# +from src.common.file import copy_shared_file + + +def test_kept_alive_tls_connection_doing_handshake_after_reload(config, syslog_ng, port_allocator, testcase_parameters): + ca = copy_shared_file(testcase_parameters, "valid-ca.crt") + + network_source = config.create_network_source(port=port_allocator()) + tls_network_destination = config.create_network_destination( + ip="localhost", port=port_allocator(), transport="tls", + keep_alive="yes", + tls={ + "ca-file": ca, + "peer-verify": "yes", + }, + ) + + config.create_logpath(statements=[network_source, tls_network_destination]) + + tls_network_destination.start_listener() + syslog_ng.start(config) + + syslog_ng.reload(config) + network_source.write_log("test msg") + + assert tls_network_destination.read_until_logs(["test msg"]) diff --git a/tests/light/shared_files/valid-ca.crt b/tests/light/shared_files/valid-ca.crt new file mode 100644 index 0000000000..7511001b28 --- /dev/null +++ b/tests/light/shared_files/valid-ca.crt @@ -0,0 +1,12 @@ +-----BEGIN CERTIFICATE----- +MIIBrzCCAWGgAwIBAgIUcnaPg4elxLxduWuz285K04ySXm8wBQYDK2VwMEwxCzAJ +BgNVBAYTAkhVMQswCQYDVQQIDAJCUDERMA8GA1UEBwwIQnVkYXBlc3QxEDAOBgNV +BAoMB0F4b2Zsb3cxCzAJBgNVBAMMAkNBMCAXDTI0MTIxNDEyMDgxMVoYDzIxMjQx +MTIwMTIwODExWjBMMQswCQYDVQQGEwJIVTELMAkGA1UECAwCQlAxETAPBgNVBAcM +CEJ1ZGFwZXN0MRAwDgYDVQQKDAdBeG9mbG93MQswCQYDVQQDDAJDQTAqMAUGAytl +cAMhABy7FT1AbxGnqrainAD583ToCDPgewE9KEhcOyoOjx+fo1MwUTAdBgNVHQ4E +FgQUvb+QlzW+T0PrjRBlQ8xFaqzdFpYwHwYDVR0jBBgwFoAUvb+QlzW+T0PrjRBl +Q8xFaqzdFpYwDwYDVR0TAQH/BAUwAwEB/zAFBgMrZXADQQA6zlO5N4zE1EveBX8p +0qt1pszyettEDG6SOMGQsZmo0DvBo/d8IUN0pvmupP8ODCbRtBm2BFYNpwkaMnv5 +iHkH +-----END CERTIFICATE----- diff --git a/tests/light/shared_files/valid-ca.key b/tests/light/shared_files/valid-ca.key new file mode 100644 index 0000000000..c46257b0e1 --- /dev/null +++ b/tests/light/shared_files/valid-ca.key @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIIJdHC0MTNkrdOUeEMsbdsjB8XFE18fsTW85Gi79tdfy +-----END PRIVATE KEY----- diff --git a/tests/light/shared_files/valid-localhost.crt b/tests/light/shared_files/valid-localhost.crt new file mode 100644 index 0000000000..b8a3d868cb --- /dev/null +++ b/tests/light/shared_files/valid-localhost.crt @@ -0,0 +1,10 @@ +-----BEGIN CERTIFICATE----- +MIIBXDCCAQ4CFHPirqN81POlHqbX64hq/v7yULDPMAUGAytlcDBMMQswCQYDVQQG +EwJIVTELMAkGA1UECAwCQlAxETAPBgNVBAcMCEJ1ZGFwZXN0MRAwDgYDVQQKDAdB +eG9mbG93MQswCQYDVQQDDAJDQTAgFw0yNDEyMTQxMjA5NDNaGA8yMTI0MTEyMDEy +MDk0M1owUzELMAkGA1UEBhMCSFUxCzAJBgNVBAgMAkJQMREwDwYDVQQHDAhCdWRh +cGVzdDEQMA4GA1UECgwHQXhvZmxvdzESMBAGA1UEAwwJbG9jYWxob3N0MCowBQYD +K2VwAyEAiDEFvid9kcpBbPgxCBaVWuj8YJZ3qZB1nBdsBuBMwEkwBQYDK2VwA0EA +vuyKnc/DhVcZgn1BInCRMMDU/15kpgwoI6y36IO3ftCUmSVR79zn//YiotaeSBf9 +2HRhksjez10aoLLJvDErDg== +-----END CERTIFICATE----- diff --git a/tests/light/shared_files/valid-localhost.key b/tests/light/shared_files/valid-localhost.key new file mode 100644 index 0000000000..5b6e1e24bb --- /dev/null +++ b/tests/light/shared_files/valid-localhost.key @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIGfwI2Ogf0umSj1p5yOosbwOnDqhOOP9rolfsky9gytI +-----END PRIVATE KEY----- diff --git a/tests/light/src/common/file.py b/tests/light/src/common/file.py index 51c3c74a80..edf1386e85 100644 --- a/tests/light/src/common/file.py +++ b/tests/light/src/common/file.py @@ -45,6 +45,11 @@ def copy_shared_file(testcase_parameters, shared_file_name): return Path(Path.cwd(), shared_file_name) +def get_shared_file(shared_file_name): + absolute_framework_dir = Path(__file__).parents[2].resolve() + return absolute_framework_dir / "shared_files" / shared_file_name + + def delete_session_file(shared_file_name): shared_file_name = Path(shared_file_name) shared_file_name.unlink() diff --git a/tests/light/src/driver_io/network/network_io.py b/tests/light/src/driver_io/network/network_io.py index 5a27c4941a..e455045f4d 100644 --- a/tests/light/src/driver_io/network/network_io.py +++ b/tests/light/src/driver_io/network/network_io.py @@ -21,6 +21,7 @@ # ############################################################################# import socket +import ssl from enum import Enum from enum import IntEnum from pathlib import Path @@ -28,6 +29,7 @@ from src.common.asynchronous import BackgroundEventLoop from src.common.blocking import DEFAULT_TIMEOUT from src.common.file import File +from src.common.file import get_shared_file from src.common.network import SingleConnectionTCPServer from src.common.network import UDPServer from src.common.random_id import get_unique_id @@ -84,8 +86,12 @@ def construct(self, port, host=None, ip_proto_version=None): def _construct(server, reader_class): return reader_class(server), server + tls = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + tls.load_cert_chain(get_shared_file("valid-localhost.crt"), get_shared_file("valid-localhost.key")) + transport_mapping = { NetworkIO.Transport.TCP: lambda: _construct(SingleConnectionTCPServer(port, host, ip_proto_version), message_readers.SingleLineStreamReader), + NetworkIO.Transport.TLS: lambda: _construct(SingleConnectionTCPServer(port, host, ip_proto_version, tls), message_readers.SingleLineStreamReader), NetworkIO.Transport.UDP: lambda: _construct(UDPServer(port, host, ip_proto_version), message_readers.DatagramReader), } return transport_mapping[self]() diff --git a/tests/light/src/syslog_ng_config/statements/destinations/network_destination.py b/tests/light/src/syslog_ng_config/statements/destinations/network_destination.py index 1cb8c53a09..be16e80274 100644 --- a/tests/light/src/syslog_ng_config/statements/destinations/network_destination.py +++ b/tests/light/src/syslog_ng_config/statements/destinations/network_destination.py @@ -28,6 +28,7 @@ def map_transport(transport): mapping = { "tcp": NetworkIO.Transport.TCP, "udp": NetworkIO.Transport.UDP, + "tls": NetworkIO.Transport.TLS, } transport = transport.replace("_", "-").replace("'", "").replace('"', "").lower()