Skip to content

Commit

Permalink
Added more context to error of producer, producer counters, bump mess…
Browse files Browse the repository at this point in the history
…aging (pulsar client) (#45)

* added more context to error of producer

Signed-off-by: Amir Malka <[email protected]>

* fix

Signed-off-by: Amir Malka <[email protected]>

* bump messaging with new pulsar client

Signed-off-by: Amir Malka <[email protected]>

* fix

Signed-off-by: Amir Malka <[email protected]>

* increase sleep in test

Signed-off-by: Amir Malka <[email protected]>

---------

Signed-off-by: Amir Malka <[email protected]>
  • Loading branch information
amirmalka authored Feb 5, 2024
1 parent 5100fed commit 35e9604
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 12 deletions.
16 changes: 15 additions & 1 deletion adapters/backend/v1/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,26 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
prometheusStatusLabel = "status"

prometheusStatusLabelValueSuccess = "success"
prometheusStatusLabelValueError = "error"
)

var (
connectedClientsGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "synchronizer_connected_clients_count",
Help: "The number of connected clients",
})

pulsarProducerMessagesProducedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "synchronizer_pulsar_producer_messages_produced_count",
Help: "The total number of messages produced to pulsar",
}, []string{prometheusStatusLabel})
pulsarProducerMessagePayloadBytesProducedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "synchronizer_pulsar_producer_message_payload_bytes_produced_count",
Help: "Counter of bytes published to pulsar (message payload) successfully",
}, []string{prometheusStatusLabel})
/*
messagesReceivedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "synchronizer_messages_received_count",
Expand Down
15 changes: 14 additions & 1 deletion adapters/backend/v1/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kubescape/synchronizer/domain"
"github.com/kubescape/synchronizer/messaging"
"github.com/kubescape/synchronizer/utils"
"github.com/prometheus/client_golang/prometheus"
)

const (
Expand Down Expand Up @@ -306,11 +307,23 @@ func (p *PulsarMessageProducer) ProduceMessage(ctx context.Context, id domain.Cl
}

func logPulsarSyncAsyncErrors(msgID pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
var metricLabels prometheus.Labels
if err != nil {
logger.L().Error("failed to send message to pulsar", helpers.Error(err))
metricLabels = prometheus.Labels{prometheusStatusLabel: prometheusStatusLabelValueError}
logger.L().Error("failed to send message to pulsar",
helpers.Error(err),
helpers.String("messageID", msgID.String()),
helpers.Int("payloadBytes", len(message.Payload)),
helpers.Interface("messageProperties", message.Properties))
} else {
metricLabels = prometheus.Labels{prometheusStatusLabel: prometheusStatusLabelValueSuccess}
logger.L().Debug("successfully sent message to pulsar", helpers.String("messageID", msgID.String()), helpers.Interface("messageProperties", message.Properties))
}

pulsarProducerMessagesProducedCounter.With(metricLabels).Inc()
if message != nil {
pulsarProducerMessagePayloadBytesProducedCounter.With(metricLabels).Add(float64(len(message.Payload)))
}
}

func NewProducerMessage(producerMessageKey, account, cluster, eventType string, payload []byte) *pulsar.ProducerMessage {
Expand Down
2 changes: 1 addition & 1 deletion core/synchronizer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ func TestSynchronizer_TC08(t *testing.T) {
// add applicationprofile to k8s
_, err = td.clusters[0].storageclient.ApplicationProfiles(namespace).Create(context.TODO(), td.clusters[0].applicationprofile, metav1.CreateOptions{})
require.NoError(t, err)
time.Sleep(15 * time.Second)
time.Sleep(20 * time.Second)
// check object in postgres
_, objFound, err := td.processor.GetObjectFromPostgres(td.clusters[0].account, td.clusters[0].cluster, "spdx.softwarecomposition.kubescape.io/v1beta1/applicationprofiles", namespace, name)
assert.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.21.5

require (
github.com/SergJa/jsonhash v0.0.0-20210531165746-fc45f346aa74
github.com/apache/pulsar-client-go v0.11.1
github.com/apache/pulsar-client-go v0.12.0
github.com/armosec/armoapi-go v0.0.292
github.com/armosec/armosec-infra v0.0.16
github.com/armosec/event-ingester-service v0.1.1-0.20240103121040-e464c2791b07
Expand All @@ -23,7 +23,7 @@ require (
github.com/kinbiko/jsonassert v1.1.1
github.com/kubescape/backend v0.0.14
github.com/kubescape/go-logger v0.0.22
github.com/kubescape/messaging v0.0.21
github.com/kubescape/messaging v0.0.22
github.com/kubescape/storage v0.0.57
github.com/panjf2000/ants/v2 v2.8.2
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
Expand Down Expand Up @@ -92,7 +92,7 @@ require (
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/facebookincubator/nvdtools v0.1.5 // indirect
github.com/fatih/color v1.16.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/pulsar-client-go v0.11.1 h1:WxLitlPG4Dz62BblGlx51wm0rw76eRefJsWdawI22QM=
github.com/apache/pulsar-client-go v0.11.1/go.mod h1:FoijqJwgjroSKptIWp1vvK1CXs8dXnQiL8I+MHOri4A=
github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I=
github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
Expand Down Expand Up @@ -246,8 +246,8 @@ github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 h1:iFaUwBSo5Svw6L
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5/go.mod h1:qssHWj60/X5sZFNxpG4HBPDHVqxNm4DfnCKgrbZOT+s=
github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM=
github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -564,8 +564,8 @@ github.com/kubescape/go-logger v0.0.22 h1:gle7wH6emOiGv9ljdpVi82pWLQ3jGucrUucvil
github.com/kubescape/go-logger v0.0.22/go.mod h1:x3HBpZo3cMT/WIdy18BxvVVd5D0e/PWFVk/HiwBNu3g=
github.com/kubescape/k8s-interface v0.0.154 h1:D6TRgSBjbD/eTf2FKswSB9rdd9dsW2AQJL0RUm3NPH8=
github.com/kubescape/k8s-interface v0.0.154/go.mod h1:5sz+5Cjvo98lTbTVDiDA4MmlXxeHSVMW/wR0V3hV4K8=
github.com/kubescape/messaging v0.0.21 h1:BifsDivMrdTUybVmpq5xvdNLRTBpYaz60F779oenbHs=
github.com/kubescape/messaging v0.0.21/go.mod h1:Kug87F/xHtK21V5N6E0Y6dREWdwsMPpAv54QKhDpmy4=
github.com/kubescape/messaging v0.0.22 h1:817bf++aPlxfAzaTTHz+sj44FtLF36Il+hbSsL7Y7B4=
github.com/kubescape/messaging v0.0.22/go.mod h1:tuayUHsKEvxA6XyruJZPkxZpbYbIpvQpQDpxO88Icss=
github.com/kubescape/opa-utils v0.0.272 h1:hqEuYGf/B2HuqbdVUtSsUGJopfXbQOgl3+KvFAu2Gd8=
github.com/kubescape/opa-utils v0.0.272/go.mod h1:VmplJnkhei6mDna+6z183k/HX6GOPgsXiwIlDW8mhKw=
github.com/kubescape/storage v0.0.57 h1:ybKh0gQLhSHzm0MEhqjROMPtgE1/7K7PKBfNqOEm5WQ=
Expand Down

0 comments on commit 35e9604

Please sign in to comment.