diff --git a/VERSION b/VERSION index d917d3e..b1e80bb 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.1.2 +0.1.3 diff --git a/collector/nodestats_api.go b/collector/nodestats_api.go index 8bd3dc5..d2b2a6d 100644 --- a/collector/nodestats_api.go +++ b/collector/nodestats_api.go @@ -3,17 +3,19 @@ package collector // Pipeline type type Pipeline struct { Events struct { - DurationInMillis int `json:"duration_in_millis"` - In int `json:"in"` - Filtered int `json:"filtered"` - Out int `json:"out"` + DurationInMillis int `json:"duration_in_millis"` + In int `json:"in"` + Filtered int `json:"filtered"` + Out int `json:"out"` + QueuePushDurationInMillis int `json:"queue_push_duration_in_millis"` } `json:"events"` Plugins struct { Inputs []struct { ID string `json:"id"` Events struct { - In int `json:"in"` - Out int `json:"out"` + QueuePushDurationInMillis int `json:"queue_push_duration_in_millis"` + In int `json:"in"` + Out int `json:"out"` } `json:"events"` Name string `json:"name"` } `json:"inputs,omitempty"` @@ -35,8 +37,9 @@ type Pipeline struct { Outputs []struct { ID string `json:"id"` Events struct { - In int `json:"in"` - Out int `json:"out"` + DurationInMillis int `json:"duration_in_millis"` + In int `json:"in"` + Out int `json:"out"` } `json:"events"` Name string `json:"name"` } `json:"outputs"` @@ -52,6 +55,7 @@ type Pipeline struct { Events int `json:"events"` Type string `json:"type"` Capacity struct { + QueueSizeInBytes int `json:"queue_size_in_bytes"` PageCapacityInBytes int `json:"page_capacity_in_bytes"` MaxQueueSizeInBytes int64 `json:"max_queue_size_in_bytes"` MaxUnreadEvents int `json:"max_unread_events"` diff --git a/collector/nodestats_collector.go b/collector/nodestats_collector.go index 5bade5f..7578a2c 100644 --- a/collector/nodestats_collector.go +++ b/collector/nodestats_collector.go @@ -32,21 +32,24 @@ type NodeStatsCollector struct { ProcessMemTotalVirtualInBytes *prometheus.Desc ProcessCPUTotalInMillis *prometheus.Desc - PipelineDuration *prometheus.Desc - PipelineEventsIn *prometheus.Desc - PipelineEventsFiltered *prometheus.Desc - PipelineEventsOut *prometheus.Desc - - PipelinePluginEventsDuration *prometheus.Desc - PipelinePluginEventsIn *prometheus.Desc - PipelinePluginEventsOut *prometheus.Desc - PipelinePluginMatches *prometheus.Desc - PipelinePluginFailures *prometheus.Desc + PipelineDuration *prometheus.Desc + PipelineQueuePushDuration *prometheus.Desc + PipelineEventsIn *prometheus.Desc + PipelineEventsFiltered *prometheus.Desc + PipelineEventsOut *prometheus.Desc + + PipelinePluginEventsDuration *prometheus.Desc + PipelinePluginEventsQueuePushDuration *prometheus.Desc + PipelinePluginEventsIn *prometheus.Desc + PipelinePluginEventsOut *prometheus.Desc + PipelinePluginMatches *prometheus.Desc + PipelinePluginFailures *prometheus.Desc PipelineQueueEvents *prometheus.Desc PipelineQueuePageCapacity *prometheus.Desc PipelineQueueMaxQueueSize *prometheus.Desc PipelineQueueMaxUnreadEvents *prometheus.Desc + PipelineQueueSizeInBytes *prometheus.Desc PipelineDeadLetterQueueSizeInBytes *prometheus.Desc } @@ -191,6 +194,13 @@ func NewNodeStatsCollector(logstashEndpoint string) (Collector, error) { nil, ), + PipelineQueuePushDuration: prometheus.NewDesc( + prometheus.BuildFQName(Namespace, subsystem, "pipeline_queue_push_duration_seconds_total"), + "pipeline_queue_push_duration_seconds_total", + []string{"pipeline"}, + nil, + ), + PipelineEventsIn: prometheus.NewDesc( prometheus.BuildFQName(Namespace, subsystem, "pipeline_events_in_total"), "pipeline_events_in_total", @@ -219,6 +229,13 @@ func NewNodeStatsCollector(logstashEndpoint string) (Collector, error) { nil, ), + PipelinePluginEventsQueuePushDuration: prometheus.NewDesc( + prometheus.BuildFQName(Namespace, subsystem, "plugin_queue_push_duration_seconds_total"), + "plugin_queue_push_duration_seconds_total", + []string{"pipeline", "plugin", "plugin_id", "plugin_type"}, + nil, + ), + PipelinePluginEventsIn: prometheus.NewDesc( prometheus.BuildFQName(Namespace, subsystem, "plugin_events_in_total"), "plugin_events_in", @@ -275,6 +292,13 @@ func NewNodeStatsCollector(logstashEndpoint string) (Collector, error) { nil, ), + PipelineQueueSizeInBytes: prometheus.NewDesc( + prometheus.BuildFQName(Namespace, subsystem, "queue_size_bytes"), + "queue_size_bytes", + []string{"pipeline"}, + nil, + ), + PipelineDeadLetterQueueSizeInBytes: prometheus.NewDesc( prometheus.BuildFQName(Namespace, subsystem, "dead_letter_queue_size_bytes"), "dead_letter_queue_size_bytes", @@ -514,6 +538,13 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D pipelineID, ) + ch <- prometheus.MustNewConstMetric( + c.PipelineQueuePushDuration, + prometheus.CounterValue, + float64(pipeline.Events.QueuePushDurationInMillis/1000), + pipelineID, + ) + ch <- prometheus.MustNewConstMetric( c.PipelineEventsIn, prometheus.CounterValue, @@ -554,6 +585,15 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D plugin.ID, "input", ) + ch <- prometheus.MustNewConstMetric( + c.PipelinePluginEventsQueuePushDuration, + prometheus.CounterValue, + float64(plugin.Events.QueuePushDurationInMillis/1000), + pipelineID, + plugin.Name, + plugin.ID, + "input", + ) } for _, plugin := range pipeline.Plugins.Filters { @@ -623,6 +663,15 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D plugin.ID, "output", ) + ch <- prometheus.MustNewConstMetric( + c.PipelinePluginEventsDuration, + prometheus.CounterValue, + float64(plugin.Events.DurationInMillis/1000), + pipelineID, + plugin.Name, + plugin.ID, + "output", + ) } if pipeline.Queue.Type != "memory" { @@ -653,6 +702,13 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D float64(pipeline.Queue.Capacity.MaxUnreadEvents), pipelineID, ) + + ch <- prometheus.MustNewConstMetric( + c.PipelineQueueSizeInBytes, + prometheus.CounterValue, + float64(pipeline.Queue.Capacity.QueueSizeInBytes), + pipelineID, + ) } if pipeline.DeadLetterQueue.QueueSizeInBytes != 0 { diff --git a/collector/nodestats_collector_test.go b/collector/nodestats_collector_test.go index 504c235..e16fff2 100644 --- a/collector/nodestats_collector_test.go +++ b/collector/nodestats_collector_test.go @@ -186,6 +186,7 @@ var queueJSON = []byte(` "events" : 0, "type" : "persisted", "capacity" : { + "queue_size_in_bytes": 123456, "page_capacity_in_bytes" : 262144000, "max_queue_size_in_bytes" : 8589934592, "max_unread_events" : 12