From 35e960480978c90ac91fcd60f80780525969989e Mon Sep 17 00:00:00 2001 From: Amir Malka Date: Mon, 5 Feb 2024 11:16:13 +0200 Subject: [PATCH] Added more context to error of producer, producer counters, bump messaging (pulsar client) (#45) * added more context to error of producer Signed-off-by: Amir Malka * fix Signed-off-by: Amir Malka * bump messaging with new pulsar client Signed-off-by: Amir Malka * fix Signed-off-by: Amir Malka * increase sleep in test Signed-off-by: Amir Malka --------- Signed-off-by: Amir Malka --- adapters/backend/v1/metrics.go | 16 +++++++++++++++- adapters/backend/v1/pulsar.go | 15 ++++++++++++++- core/synchronizer_integration_test.go | 2 +- go.mod | 6 +++--- go.sum | 12 ++++++------ 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/adapters/backend/v1/metrics.go b/adapters/backend/v1/metrics.go index 3434615..05fd081 100644 --- a/adapters/backend/v1/metrics.go +++ b/adapters/backend/v1/metrics.go @@ -5,12 +5,26 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) +const ( + prometheusStatusLabel = "status" + + prometheusStatusLabelValueSuccess = "success" + prometheusStatusLabelValueError = "error" +) + var ( connectedClientsGauge = promauto.NewGauge(prometheus.GaugeOpts{ Name: "synchronizer_connected_clients_count", Help: "The number of connected clients", }) - + pulsarProducerMessagesProducedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "synchronizer_pulsar_producer_messages_produced_count", + Help: "The total number of messages produced to pulsar", + }, []string{prometheusStatusLabel}) + pulsarProducerMessagePayloadBytesProducedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "synchronizer_pulsar_producer_message_payload_bytes_produced_count", + Help: "Counter of bytes published to pulsar (message payload) successfully", + }, []string{prometheusStatusLabel}) /* messagesReceivedCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "synchronizer_messages_received_count", diff --git a/adapters/backend/v1/pulsar.go b/adapters/backend/v1/pulsar.go index 095a39f..94d7fa8 100644 --- a/adapters/backend/v1/pulsar.go +++ b/adapters/backend/v1/pulsar.go @@ -16,6 +16,7 @@ import ( "github.com/kubescape/synchronizer/domain" "github.com/kubescape/synchronizer/messaging" "github.com/kubescape/synchronizer/utils" + "github.com/prometheus/client_golang/prometheus" ) const ( @@ -306,11 +307,23 @@ func (p *PulsarMessageProducer) ProduceMessage(ctx context.Context, id domain.Cl } func logPulsarSyncAsyncErrors(msgID pulsar.MessageID, message *pulsar.ProducerMessage, err error) { + var metricLabels prometheus.Labels if err != nil { - logger.L().Error("failed to send message to pulsar", helpers.Error(err)) + metricLabels = prometheus.Labels{prometheusStatusLabel: prometheusStatusLabelValueError} + logger.L().Error("failed to send message to pulsar", + helpers.Error(err), + helpers.String("messageID", msgID.String()), + helpers.Int("payloadBytes", len(message.Payload)), + helpers.Interface("messageProperties", message.Properties)) } else { + metricLabels = prometheus.Labels{prometheusStatusLabel: prometheusStatusLabelValueSuccess} logger.L().Debug("successfully sent message to pulsar", helpers.String("messageID", msgID.String()), helpers.Interface("messageProperties", message.Properties)) } + + pulsarProducerMessagesProducedCounter.With(metricLabels).Inc() + if message != nil { + pulsarProducerMessagePayloadBytesProducedCounter.With(metricLabels).Add(float64(len(message.Payload))) + } } func NewProducerMessage(producerMessageKey, account, cluster, eventType string, payload []byte) *pulsar.ProducerMessage { diff --git a/core/synchronizer_integration_test.go b/core/synchronizer_integration_test.go index c929b37..0b400e8 100644 --- a/core/synchronizer_integration_test.go +++ b/core/synchronizer_integration_test.go @@ -987,7 +987,7 @@ func TestSynchronizer_TC08(t *testing.T) { // add applicationprofile to k8s _, err = td.clusters[0].storageclient.ApplicationProfiles(namespace).Create(context.TODO(), td.clusters[0].applicationprofile, metav1.CreateOptions{}) require.NoError(t, err) - time.Sleep(15 * time.Second) + time.Sleep(20 * time.Second) // check object in postgres _, objFound, err := td.processor.GetObjectFromPostgres(td.clusters[0].account, td.clusters[0].cluster, "spdx.softwarecomposition.kubescape.io/v1beta1/applicationprofiles", namespace, name) assert.NoError(t, err) diff --git a/go.mod b/go.mod index 41ed9c6..9e84095 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ toolchain go1.21.5 require ( github.com/SergJa/jsonhash v0.0.0-20210531165746-fc45f346aa74 - github.com/apache/pulsar-client-go v0.11.1 + github.com/apache/pulsar-client-go v0.12.0 github.com/armosec/armoapi-go v0.0.292 github.com/armosec/armosec-infra v0.0.16 github.com/armosec/event-ingester-service v0.1.1-0.20240103121040-e464c2791b07 @@ -23,7 +23,7 @@ require ( github.com/kinbiko/jsonassert v1.1.1 github.com/kubescape/backend v0.0.14 github.com/kubescape/go-logger v0.0.22 - github.com/kubescape/messaging v0.0.21 + github.com/kubescape/messaging v0.0.22 github.com/kubescape/storage v0.0.57 github.com/panjf2000/ants/v2 v2.8.2 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 @@ -92,7 +92,7 @@ require ( github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect - github.com/dvsekhvalnov/jose2go v1.5.0 // indirect + github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/facebookincubator/nvdtools v0.1.5 // indirect github.com/fatih/color v1.16.0 // indirect diff --git a/go.sum b/go.sum index aebd712..de0949a 100644 --- a/go.sum +++ b/go.sum @@ -109,8 +109,8 @@ github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/apache/pulsar-client-go v0.11.1 h1:WxLitlPG4Dz62BblGlx51wm0rw76eRefJsWdawI22QM= -github.com/apache/pulsar-client-go v0.11.1/go.mod h1:FoijqJwgjroSKptIWp1vvK1CXs8dXnQiL8I+MHOri4A= +github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I= +github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4= github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -246,8 +246,8 @@ github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 h1:iFaUwBSo5Svw6L github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5/go.mod h1:qssHWj60/X5sZFNxpG4HBPDHVqxNm4DfnCKgrbZOT+s= github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM= -github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= +github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= +github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -564,8 +564,8 @@ github.com/kubescape/go-logger v0.0.22 h1:gle7wH6emOiGv9ljdpVi82pWLQ3jGucrUucvil github.com/kubescape/go-logger v0.0.22/go.mod h1:x3HBpZo3cMT/WIdy18BxvVVd5D0e/PWFVk/HiwBNu3g= github.com/kubescape/k8s-interface v0.0.154 h1:D6TRgSBjbD/eTf2FKswSB9rdd9dsW2AQJL0RUm3NPH8= github.com/kubescape/k8s-interface v0.0.154/go.mod h1:5sz+5Cjvo98lTbTVDiDA4MmlXxeHSVMW/wR0V3hV4K8= -github.com/kubescape/messaging v0.0.21 h1:BifsDivMrdTUybVmpq5xvdNLRTBpYaz60F779oenbHs= -github.com/kubescape/messaging v0.0.21/go.mod h1:Kug87F/xHtK21V5N6E0Y6dREWdwsMPpAv54QKhDpmy4= +github.com/kubescape/messaging v0.0.22 h1:817bf++aPlxfAzaTTHz+sj44FtLF36Il+hbSsL7Y7B4= +github.com/kubescape/messaging v0.0.22/go.mod h1:tuayUHsKEvxA6XyruJZPkxZpbYbIpvQpQDpxO88Icss= github.com/kubescape/opa-utils v0.0.272 h1:hqEuYGf/B2HuqbdVUtSsUGJopfXbQOgl3+KvFAu2Gd8= github.com/kubescape/opa-utils v0.0.272/go.mod h1:VmplJnkhei6mDna+6z183k/HX6GOPgsXiwIlDW8mhKw= github.com/kubescape/storage v0.0.57 h1:ybKh0gQLhSHzm0MEhqjROMPtgE1/7K7PKBfNqOEm5WQ=