diff --git a/.github/workflows/dendrite.yml b/.github/workflows/dendrite.yml index 17c3b62e80..b51ace79e9 100644 --- a/.github/workflows/dendrite.yml +++ b/.github/workflows/dendrite.yml @@ -62,7 +62,7 @@ jobs: # Run golangci-lint lint: - timeout-minutes: 5 + timeout-minutes: 8 name: Linting runs-on: ubuntu-latest steps: diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index ce1f1379a2..635dacfb30 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -98,17 +98,9 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc logrus.WithError(err).Error("nats connection: disconnected") }), natsclient.ReconnectHandler(func(_ *natsclient.Conn) { - logrus.Info("nats connection: client reconnected") - for _, stream := range []*nats.StreamConfig{ - streams[6], - streams[10], - } { - err = configureStream(stream, cfg, s) - if err != nil { - logrus.WithError(err).WithField("stream", stream.Name).Error("unable to configure a stream") - } - - } + logrus.Info("nats connection: nats is back up, restarting...") + process.ShutdownDendrite() + process.WaitForShutdown() }), natsclient.ClosedHandler(func(_ *natsclient.Conn) { logrus.Info("nats connection: client closed") @@ -258,52 +250,3 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc return s, nc } - -func configureStream(stream *nats.StreamConfig, cfg *config.JetStream, s nats.JetStreamContext) error { - name := cfg.Prefixed(stream.Name) - info, err := s.StreamInfo(name) - if err != nil && err != natsclient.ErrStreamNotFound { - return fmt.Errorf("get stream info: %w", err) - } - subjects := stream.Subjects - if len(subjects) == 0 { - // By default we want each stream to listen for the subjects - // that are either an exact match for the stream name, or where - // the first part of the subject is the stream name. ">" is a - // wildcard in NATS for one or more subject tokens. In the case - // that the stream is called "Foo", this will match any message - // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc. - subjects = []string{name, name + ".>"} - } - if info != nil { - switch { - case !reflect.DeepEqual(info.Config.Subjects, subjects): - fallthrough - case info.Config.Retention != stream.Retention: - fallthrough - case info.Config.Storage != stream.Storage: - if err = s.DeleteStream(name); err != nil { - return fmt.Errorf("delete stream: %w", err) - } - info = nil - } - } - if info == nil { - // If we're trying to keep everything in memory (e.g. unit tests) - // then overwrite the storage policy. - if cfg.InMemory { - stream.Storage = natsclient.MemoryStorage - } - - // Namespace the streams without modifying the original streams - // array, otherwise we end up with namespaces on namespaces. - namespaced := *stream - namespaced.Name = name - namespaced.Subjects = subjects - if _, err = s.AddStream(&namespaced); err != nil { - return fmt.Errorf("add stream: %w", err) - } - logrus.Infof("stream created: %s", stream.Name) - } - return nil -}