Skip to content

Commit

Permalink
🐛 Restart Dendrite after Nats completes a restart.
Browse files Browse the repository at this point in the history
  • Loading branch information
Danieloni1 committed Jul 18, 2024
1 parent cd43a34 commit 68c0408
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dendrite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:

# Run golangci-lint
lint:
timeout-minutes: 5
timeout-minutes: 8
name: Linting
runs-on: ubuntu-latest
steps:
Expand Down
63 changes: 3 additions & 60 deletions setup/jetstream/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,9 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
logrus.WithError(err).Error("nats connection: disconnected")
}),
natsclient.ReconnectHandler(func(_ *natsclient.Conn) {
logrus.Info("nats connection: client reconnected")
for _, stream := range []*nats.StreamConfig{
streams[6],
streams[10],
} {
err = configureStream(stream, cfg, s)
if err != nil {
logrus.WithError(err).WithField("stream", stream.Name).Error("unable to configure a stream")
}

}
logrus.Info("nats connection: nats is back up, restarting...")
process.ShutdownDendrite()
process.WaitForShutdown()
}),
natsclient.ClosedHandler(func(_ *natsclient.Conn) {
logrus.Info("nats connection: client closed")
Expand Down Expand Up @@ -258,52 +250,3 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc

return s, nc
}

func configureStream(stream *nats.StreamConfig, cfg *config.JetStream, s nats.JetStreamContext) error {
name := cfg.Prefixed(stream.Name)
info, err := s.StreamInfo(name)
if err != nil && err != natsclient.ErrStreamNotFound {
return fmt.Errorf("get stream info: %w", err)
}
subjects := stream.Subjects
if len(subjects) == 0 {
// By default we want each stream to listen for the subjects
// that are either an exact match for the stream name, or where
// the first part of the subject is the stream name. ">" is a
// wildcard in NATS for one or more subject tokens. In the case
// that the stream is called "Foo", this will match any message
// with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc.
subjects = []string{name, name + ".>"}
}
if info != nil {
switch {
case !reflect.DeepEqual(info.Config.Subjects, subjects):
fallthrough
case info.Config.Retention != stream.Retention:
fallthrough
case info.Config.Storage != stream.Storage:
if err = s.DeleteStream(name); err != nil {
return fmt.Errorf("delete stream: %w", err)
}
info = nil
}
}
if info == nil {
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
stream.Storage = natsclient.MemoryStorage
}

// Namespace the streams without modifying the original streams
// array, otherwise we end up with namespaces on namespaces.
namespaced := *stream
namespaced.Name = name
namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil {
return fmt.Errorf("add stream: %w", err)
}
logrus.Infof("stream created: %s", stream.Name)
}
return nil
}

0 comments on commit 68c0408

Please sign in to comment.