Skip to content

Commit

Permalink
Merge pull request #295 from MrAnno/remove-legacy-fifo-size
Browse files Browse the repository at this point in the history
logqueue-fifo: remove legacy flow-control+log-fifo-size() coupling
  • Loading branch information
bazsi authored Sep 20, 2024
2 parents 80402c6 + a7347e4 commit fe6cfcc
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 64 deletions.
11 changes: 0 additions & 11 deletions lib/driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,6 @@ _create_memory_queue(LogDestDriver *self, const gchar *persist_name, gint stats_

gint log_fifo_size = self->log_fifo_size < 0 ? cfg->log_fifo_size : self->log_fifo_size;

if (cfg_is_config_version_older(cfg, VERSION_VALUE_3_22))
{
msg_warning_once("WARNING: log-fifo-size() works differently starting with " VERSION_3_22 " to avoid dropping "
"flow-controlled messages when log-fifo-size() is misconfigured. From now on, log-fifo-size() "
"only affects messages that are not flow-controlled. (Flow-controlled log paths have the "
"flags(flow-control) option set.) To enable the new behaviour, update the @version string in "
"your configuration and consider lowering the value of log-fifo-size().");

return log_queue_fifo_legacy_new(log_fifo_size, persist_name, stats_level, driver_sck_builder, queue_sck_builder);
}

return log_queue_fifo_new(log_fifo_size, persist_name, stats_level, driver_sck_builder, queue_sck_builder);
}

Expand Down
57 changes: 7 additions & 50 deletions lib/logqueue-fifo.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ typedef struct _LogQueueFifo

gint log_fifo_size;

/* legacy: flow-controlled messages are included in the log_fifo_size limit */
gboolean use_legacy_fifo_size;

struct
{
StatsClusterKey *capacity_sc_key;
Expand Down Expand Up @@ -193,7 +190,7 @@ log_queue_fifo_drop_messages_from_input_queue(LogQueueFifo *self, InputQueue *in
path_options.ack_needed = node->ack_needed;
path_options.flow_control_requested = node->flow_control_requested;

if (!self->use_legacy_fifo_size && path_options.flow_control_requested)
if (path_options.flow_control_requested)
continue;

iv_list_del(&node->list);
Expand All @@ -202,13 +199,9 @@ log_queue_fifo_drop_messages_from_input_queue(LogQueueFifo *self, InputQueue *in
log_msg_free_queue_node(node);

LogMessage *msg = node->msg;
if (path_options.flow_control_requested)
log_msg_drop(msg, &path_options, AT_SUSPENDED);
else
{
input_queue->non_flow_controlled_len--;
log_msg_drop(msg, &path_options, AT_PROCESSED);
}

input_queue->non_flow_controlled_len--;
log_msg_drop(msg, &path_options, AT_PROCESSED);

dropped++;
}
Expand Down Expand Up @@ -236,19 +229,8 @@ log_queue_fifo_calculate_num_of_messages_to_drop(LogQueueFifo *self, InputQueue
* justify proper locking in this case.
*/

guint16 input_queue_len;
gint queue_len;

if (G_UNLIKELY(self->use_legacy_fifo_size))
{
queue_len = log_queue_fifo_get_length(&self->super);
input_queue_len = input_queue->len;
}
else
{
queue_len = log_queue_fifo_get_non_flow_controlled_length(self);
input_queue_len = input_queue->non_flow_controlled_len;
}
gint queue_len = log_queue_fifo_get_non_flow_controlled_length(self);
guint16 input_queue_len = input_queue->non_flow_controlled_len;

gboolean drop_messages = queue_len + input_queue_len > self->log_fifo_size;
if (!drop_messages)
Expand Down Expand Up @@ -312,25 +294,10 @@ log_queue_fifo_move_input(gpointer user_data)
static inline gboolean
_message_has_to_be_dropped(LogQueueFifo *self, const LogPathOptions *path_options)
{
if (G_UNLIKELY(self->use_legacy_fifo_size))
return log_queue_fifo_get_length(&self->super) >= self->log_fifo_size;

return !path_options->flow_control_requested
&& log_queue_fifo_get_non_flow_controlled_length(self) >= self->log_fifo_size;
}

static inline void
_drop_message(LogMessage *msg, const LogPathOptions *path_options)
{
if (path_options->flow_control_requested)
{
log_msg_drop(msg, path_options, AT_SUSPENDED);
return;
}

log_msg_drop(msg, path_options, AT_PROCESSED);
}

/**
* Assumed to be called from one of the input threads. If the thread_index
* cannot be determined, the item is put directly in the wait queue.
Expand Down Expand Up @@ -406,7 +373,7 @@ log_queue_fifo_push_tail(LogQueue *s, LogMessage *msg, const LogPathOptions *pat
log_queue_dropped_messages_inc(&self->super);
g_mutex_unlock(&self->super.lock);

_drop_message(msg, path_options);
log_msg_drop(msg, path_options, AT_PROCESSED);

msg_debug("Destination queue full, dropping message",
evt_tag_int("queue_len", log_queue_fifo_get_length(&self->super)),
Expand Down Expand Up @@ -748,16 +715,6 @@ log_queue_fifo_new(gint log_fifo_size, const gchar *persist_name, gint stats_lev
return &self->super;
}

LogQueue *
log_queue_fifo_legacy_new(gint log_fifo_size, const gchar *persist_name, gint stats_level,
StatsClusterKeyBuilder *driver_sck_builder, StatsClusterKeyBuilder *queue_sck_builder)
{
LogQueueFifo *self = (LogQueueFifo *) log_queue_fifo_new(log_fifo_size, persist_name, stats_level,
driver_sck_builder, queue_sck_builder);
self->use_legacy_fifo_size = TRUE;
return &self->super;
}

QueueType
log_queue_fifo_get_type(void)
{
Expand Down
3 changes: 0 additions & 3 deletions lib/logqueue-fifo.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
LogQueue *log_queue_fifo_new(gint log_fifo_size, const gchar *persist_name, gint stats_level,
StatsClusterKeyBuilder *driver_sck_builder,
StatsClusterKeyBuilder *queue_sck_builder);
LogQueue *log_queue_fifo_legacy_new(gint log_fifo_size, const gchar *persist_name, gint stats_level,
StatsClusterKeyBuilder *driver_sck_builder,
StatsClusterKeyBuilder *queue_sck_builder);

QueueType log_queue_fifo_get_type(void);

Expand Down

0 comments on commit fe6cfcc

Please sign in to comment.