Skip to content

Commit

Permalink
Merge pull request #7 from snapp-incubator/fix/streams
Browse files Browse the repository at this point in the history
fix: fix stream update and create
  • Loading branch information
kianaza authored Jun 29, 2024
2 parents 96d0ce0 + f41f226 commit 28db558
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 43 deletions.
6 changes: 4 additions & 2 deletions internal/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ func Default() Config {
},
NATS: natsclient.Config{
NewStreamAllow: true,
Stream: natsclient.Stream{
Name: "stream",
Streams: []natsclient.Stream{{
Name: "test",
Subject: "test",
},
},
URL: "localhost:4222",
PublishInterval: 2 * time.Second,
RequestTimeout: 50 * time.Millisecond,
MaxPubAcksInflight: 1000,
QueueSubscriptionGroup: "group",
FlushTimeout: 2 * time.Second,
ClientName: "localhost",
},
Metric: metric.Config{
Server: metric.Server{Address: ":8080"},
Expand Down
3 changes: 2 additions & 1 deletion internal/natsclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import "time"

type Config struct {
NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"`
Stream Stream `json:"stream,omitempty" koanf:"stream"`
Streams []Stream `json:"stream,omitempty" koanf:"stream"`
URL string `json:"url,omitempty" koanf:"url"`
PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"`
RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"`
MaxPubAcksInflight int `json:"max_pub_acks_inflight" koanf:"max_pub_acks_inflight"`
QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"`
FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"`
ClientName string `json:"client_name" koanf:"client_name"`
}

type Stream struct {
Expand Down
91 changes: 62 additions & 29 deletions internal/natsclient/jetstream.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package natsclient

import (
"slices"
"time"

"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

var (
successfulSubscribe = "successful subscribe"
failedPublish = "failed publish"
successfulPublish = "successful publish"
)

type Message struct {
Subject string
Data []byte
Expand All @@ -26,14 +34,14 @@ func NewJetstream(config Config, logger *zap.Logger) *Jetstream {
j := &Jetstream{
config: &config,
logger: logger,
metrics: NewMetrics(),
metrics: NewMetrics(config.ClientName),
}

j.connect()

j.createJetstreamContext()

j.createStream()
j.UpdateOrCreateStream()

return j
}
Expand Down Expand Up @@ -64,33 +72,49 @@ func (j *Jetstream) createJetstreamContext() {
}
}

func (j *Jetstream) createStream() {
_, err := j.jetstream.StreamInfo(j.config.Stream.Name)
if err == nil {
_, err = j.jetstream.UpdateStream(&nats.StreamConfig{
Name: j.config.Stream.Name,
Subjects: []string{j.config.Stream.Subject},
})
if err != nil {
j.logger.Panic("could not add subject to existing stream", zap.Error(err))
}
} else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow {
_, err = j.jetstream.AddStream(&nats.StreamConfig{
Name: j.config.Stream.Name,
Subjects: []string{j.config.Stream.Subject},
})
if err != nil {
j.logger.Panic("could not add stream", zap.Error(err))
func (j *Jetstream) UpdateOrCreateStream() {
for _, stream := range j.config.Streams {
info, err := j.jetstream.StreamInfo(stream.Name)
if err == nil {
j.updateStream(stream, info)
} else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow {
j.createStream(stream)
} else {
j.logger.Panic("could not add subject", zap.Error(err))
}
} else {
j.logger.Panic("could not add subject", zap.Error(err))
}
}
func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) {
subjects := append(info.Config.Subjects, stream.Subject)
slices.Sort(subjects)
subjects = slices.Compact(subjects)
_, err := j.jetstream.UpdateStream(&nats.StreamConfig{
Name: stream.Name,
Subjects: subjects,
})
if err != nil {
j.logger.Panic("could not add subject to existing stream", zap.Error(err))
}
j.logger.Info("stream updated")
}

func (j *Jetstream) createStream(stream Stream) {
_, err := j.jetstream.AddStream(&nats.StreamConfig{
Name: stream.Name,
Subjects: []string{stream.Subject},
})
if err != nil {
j.logger.Panic("could not add stream", zap.Error(err))
}
j.logger.Info("add new stream")
}

func (j *Jetstream) StartBlackboxTest() {
messageChannel := j.createSubscribe(j.config.Stream.Subject)
go j.jetstreamPublish(j.config.Stream.Subject)
go j.jetstreamSubscribe(messageChannel)
for _, stream := range j.config.Streams {
messageChannel := j.createSubscribe(stream.Subject)
go j.jetstreamPublish(stream.Subject, stream.Name)
go j.jetstreamSubscribe(messageChannel, stream.Name)
}
}

// Subscribe subscribes to a list of subjects and returns a channel with incoming messages
Expand All @@ -115,7 +139,7 @@ func (j *Jetstream) createSubscribe(subject string) chan *Message {

}

func (j *Jetstream) jetstreamSubscribe(h chan *Message) {
func (j *Jetstream) jetstreamSubscribe(h chan *Message, streamName string) {
for msg := range h {
var publishTime time.Time
err := publishTime.UnmarshalBinary(msg.Data)
Expand All @@ -126,20 +150,26 @@ func (j *Jetstream) jetstreamSubscribe(h chan *Message) {
}
latency := time.Since(publishTime).Seconds()
j.metrics.Latency.Observe(latency)
j.metrics.SuccessCounter.WithLabelValues("successful subscribe").Add(1)
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": successfulSubscribe,
"stream": streamName,
}).Add(1)
j.logger.Info("Received message: ", zap.String("subject", msg.Subject), zap.Float64("latency", latency))
}
}

func (j *Jetstream) jetstreamPublish(subject string) {
func (j *Jetstream) jetstreamPublish(subject string, streamName string) {
for {
t, err := time.Now().MarshalBinary()
if err != nil {
j.logger.Error("could not marshal current time.", zap.Error(err))
}

if ack, err := j.jetstream.Publish(subject, t); err != nil {
j.metrics.SuccessCounter.WithLabelValues("failed publish").Add(1)
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": failedPublish,
"stream": streamName,
}).Add(1)
if err == nats.ErrTimeout {
j.logger.Error("Request timeout: No response received within the timeout period.")
} else if err == nats.ErrNoStreamResponse {
Expand All @@ -148,7 +178,10 @@ func (j *Jetstream) jetstreamPublish(subject string) {
j.logger.Error("Request failed: %v", zap.Error(err))
}
} else {
j.metrics.SuccessCounter.WithLabelValues("successful publish").Add(1)
j.metrics.SuccessCounter.With(prometheus.Labels{
"type": successfulPublish,
"stream": streamName,
}).Add(1)
j.logger.Info("receive ack", zap.String("stream", ack.Stream))
}

Expand Down
22 changes: 11 additions & 11 deletions internal/natsclient/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import (

const (
Namespace = "nats_blackbox_exporter"
Subsystem = "client"
)

var latencyBuckets = []float64{
0.0001,
0.0005,
0.0007,
0.001,
0.0013,
0.0015,
0.0017,
0.002,
0.0023,
0.0025,
0.0027,
0.003,
0.004,
0.005,
0.006,
}

// Metrics has all the client metrics.
Expand Down Expand Up @@ -68,30 +68,30 @@ func newCounterVec(counterOpts prometheus.CounterOpts, labelNames []string) prom
return *ev
}

func NewMetrics() Metrics {
func NewMetrics(clinetName string) Metrics {
return Metrics{
Connection: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Subsystem: clinetName,
Name: "connection_errors_total",
Help: "total number of disconnections and reconnections",
ConstLabels: nil,
}, []string{"type"}),
// nolint: exhaustruct
Latency: newHistogram(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Subsystem: clinetName,
Name: "latency",
Help: "from publish to consume duration in seconds",
ConstLabels: nil,
Buckets: latencyBuckets,
}),
SuccessCounter: newCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Subsystem: clinetName,
Name: "success_counter",
Help: "publish and consume success rate",
ConstLabels: nil,
}, []string{"type"}),
}, []string{"type", "stream"}),
}
}

0 comments on commit 28db558

Please sign in to comment.