Skip to content

Commit

Permalink
feature: new api
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Oct 17, 2023
1 parent 04cb9da commit 95e3291
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 233 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.21.1
require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.1
github.com/nats-io/nats.go v1.30.2
github.com/nats-io/nats.go v1.31.0
github.com/roadrunner-server/api/v4 v4.9.0
github.com/roadrunner-server/endure/v2 v2.4.3
github.com/roadrunner-server/errors v1.3.0
Expand All @@ -22,8 +22,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/nats-io/nats-server/v2 v2.10.1 // indirect
github.com/klauspost/compress v1.17.1 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
Expand All @@ -32,4 +31,5 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
)
22 changes: 6 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,14 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ=
github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g=
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down Expand Up @@ -57,9 +49,7 @@ golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
283 changes: 275 additions & 8 deletions go.work.sum

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions natsjobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

const (
pipeSubject string = "subject"
pipeStream string = "stream"
pipeStream string = "streamID"
pipePrefetch string = "prefetch"
pipeDeleteAfterAck string = "delete_after_ack"
pipeDeliverNew string = "deliver_new"
Expand All @@ -21,7 +21,7 @@ type config struct {

Priority int64 `mapstructure:"priority"`
Subject string `mapstructure:"subject"`
Stream string `mapstructure:"stream"`
Stream string `mapstructure:"streamID"`
Prefetch int `mapstructure:"prefetch"`
RateLimit uint64 `mapstructure:"rate_limit"`
DeleteAfterAck bool `mapstructure:"delete_after_ack"`
Expand All @@ -43,7 +43,7 @@ func (c *config) InitDefaults() {
}

if c.Stream == "" {
c.Stream = "default-stream"
c.Stream = "default-streamID"
}

if c.Subject == "" {
Expand Down
128 changes: 55 additions & 73 deletions natsjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package natsjobs

import (
"context"
stderr "errors"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
"github.com/roadrunner-server/errors"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
Expand All @@ -29,7 +29,7 @@ var _ jobs.Driver = (*Driver)(nil)
type Configurer interface {
// UnmarshalKey takes a single key and unmarshal it into a Struct.
UnmarshalKey(name string, out any) error
// Has checks if config section exists.
// Has checks if a config section exists.
Has(name string) bool
}

Expand All @@ -45,22 +45,29 @@ type Driver struct {
stopped uint64

// nats
conn *nats.Conn
sub *nats.Subscription
msgCh chan *nats.Msg
js nats.JetStreamContext
consumer *consumer
conn *nats.Conn
stream jetstream.Stream
jetstream jetstream.JetStream
msgCh chan jetstream.Msg

// config
priority int64
subject string
stream string
streamID string
prefetch int
rateLimit uint64
deleteAfterAck bool
deliverNew bool
deleteStreamOnStop bool
}

type consumer struct {
id string
jsc jetstream.Consumer
context jetstream.ConsumeContext
}

func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipe jobs.Pipeline, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_nats_consumer")

Expand Down Expand Up @@ -107,29 +114,17 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
return nil, errors.E(op, err)
}

js, err := conn.JetStream()
js, err := jetstream.New(conn, jetstream.WithPublishAsyncMaxPending(1000))
if err != nil {
return nil, errors.E(op, err)
}

var si *nats.StreamInfo
si, err = js.StreamInfo(conf.Stream)
stream, err := js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
Name: conf.Stream,
Subjects: []string{conf.Subject},
})
if err != nil {
if stderr.Is(err, nats.ErrStreamNotFound) {
si, err = js.AddStream(&nats.StreamConfig{
Name: conf.Stream,
Subjects: []string{conf.Subject},
})
if err != nil {
return nil, errors.E(op, err)
}
} else {
return nil, errors.E(op, err)
}
}

if si == nil {
return nil, errors.E(op, errors.Str("failed to create a stream"))
return nil, err
}

cs := &Driver{
Expand All @@ -140,17 +135,19 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
stopped: 0,
queue: pq,

conn: conn,
js: js,
conn: conn,
stream: stream,
jetstream: js,

priority: conf.Priority,
subject: conf.Subject,
stream: conf.Stream,
streamID: conf.Stream,
deleteAfterAck: conf.DeleteAfterAck,
deleteStreamOnStop: conf.DeleteStreamOnStop,
prefetch: conf.Prefetch,
deliverNew: conf.DeliverNew,
rateLimit: conf.RateLimit,
msgCh: make(chan *nats.Msg, conf.Prefetch),
msgCh: make(chan jetstream.Msg, conf.Prefetch),
}

cs.pipeline.Store(&pipe)
Expand Down Expand Up @@ -195,29 +192,17 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
return nil, errors.E(op, err)
}

js, err := conn.JetStream()
js, err := jetstream.New(conn)
if err != nil {
return nil, errors.E(op, err)
}

var si *nats.StreamInfo
si, err = js.StreamInfo(pipe.String(pipeStream, "default-stream"))
stream, err := js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
Name: pipe.String(pipeStream, "default-stream"),
Subjects: []string{pipe.String(pipeSubject, "default")},
})
if err != nil {
if stderr.Is(err, nats.ErrStreamNotFound) {
si, err = js.AddStream(&nats.StreamConfig{
Name: pipe.String(pipeStream, "default-stream"),
Subjects: []string{pipe.String(pipeSubject, "default")},
})
if err != nil {
return nil, errors.E(op, err)
}
} else {
return nil, errors.E(op, err)
}
}

if si == nil {
return nil, errors.E(op, errors.Str("failed to create a stream"))
return nil, err
}

cs := &Driver{
Expand All @@ -228,17 +213,18 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
stopCh: make(chan struct{}),
stopped: 0,

conn: conn,
js: js,
conn: conn,
stream: stream,

priority: pipe.Priority(),
subject: pipe.String(pipeSubject, "default"),
stream: pipe.String(pipeStream, "default-stream"),
streamID: pipe.String(pipeStream, "default-streamID"),
prefetch: pipe.Int(pipePrefetch, 100),
deleteAfterAck: pipe.Bool(pipeDeleteAfterAck, false),
deliverNew: pipe.Bool(pipeDeliverNew, false),
deleteStreamOnStop: pipe.Bool(pipeDeleteStreamOnStop, false),
rateLimit: uint64(pipe.Int(pipeRateLimit, 1000)),
msgCh: make(chan *nats.Msg, pipe.Int(pipePrefetch, 100)),
msgCh: make(chan jetstream.Msg, pipe.Int(pipePrefetch, 100)),
}

cs.pipeline.Store(&pipe)
Expand All @@ -264,7 +250,10 @@ func (c *Driver) Push(ctx context.Context, job jobs.Message) error {
return errors.E(op, err)
}

_, err = c.js.Publish(c.subject, data)
_, err = c.jetstream.PublishMsg(ctx, &nats.Msg{
Data: data,
Subject: c.subject,
})
if err != nil {
return errors.E(op, err)
}
Expand Down Expand Up @@ -293,7 +282,7 @@ func (c *Driver) Run(ctx context.Context, p jobs.Pipeline) error {
}

atomic.AddUint32(&c.listeners, 1)
err := c.listenerInit()
err := c.listenerInit(c.stream, c.streamID, c.rateLimit)
if err != nil {
return errors.E(op, err)
}
Expand Down Expand Up @@ -324,15 +313,8 @@ func (c *Driver) Pause(ctx context.Context, p string) error {
// remove listener
atomic.AddUint32(&c.listeners, ^uint32(0))

if c.sub != nil {
err := c.sub.Drain()
if err != nil {
c.log.Error("drain error", zap.Error(err))
}
}

c.consumer.context.Stop()
c.stopCh <- struct{}{}
c.sub = nil

c.log.Debug("pipeline was paused", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))

Expand All @@ -356,7 +338,7 @@ func (c *Driver) Resume(ctx context.Context, p string) error {
return errors.Str("nats listener is already in the active state")
}

err := c.listenerInit()
err := c.listenerInit(c.stream, c.streamID, c.rateLimit)
if err != nil {
return err
}
Expand Down Expand Up @@ -384,8 +366,8 @@ func (c *Driver) State(ctx context.Context) (*jobs.State, error) {
Ready: ready(atomic.LoadUint32(&c.listeners)),
}

if c.sub != nil {
ci, err := c.sub.ConsumerInfo()
if c.consumer != nil && c.consumer.jsc != nil {
ci, err := c.consumer.jsc.Info(ctx)
if err != nil {
return nil, err
}
Expand All @@ -412,18 +394,15 @@ func (c *Driver) Stop(ctx context.Context) error {
_ = c.queue.Remove(pipe.Name())

if atomic.LoadUint32(&c.listeners) > 0 {
if c.sub != nil {
err := c.sub.Drain()
if err != nil {
c.log.Error("drain error", zap.Error(err))
}
err := c.stream.Purge(ctx)
if err != nil {
c.log.Error("drain error", zap.Error(err))
}

c.stopCh <- struct{}{}
}

if c.deleteStreamOnStop {
err := c.js.DeleteStream(c.stream)
err := c.jetstream.DeleteStream(ctx, c.streamID)
if err != nil {
return err
}
Expand Down Expand Up @@ -453,13 +432,16 @@ func (c *Driver) requeue(item *Item) error {
return errors.E(op, err)
}

_, err = c.js.Publish(c.subject, data)
_, err = c.jetstream.PublishMsg(context.Background(), &nats.Msg{
Subject: c.subject,
Data: data,
})
if err != nil {
return errors.E(op, err)
}

// delete the old message
_ = c.js.DeleteMsg(c.stream, item.Options.seq)
_ = c.stream.DeleteMsg(context.Background(), item.Options.seq)

item = nil
return nil
Expand Down
Loading

0 comments on commit 95e3291

Please sign in to comment.