From 7ebff408e45c1c021469724fe49a5d98206a158b Mon Sep 17 00:00:00 2001 From: Brian McGee Date: Thu, 12 Oct 2023 12:26:45 +0100 Subject: [PATCH] feat: add store init command Separates creation of NATS streams into a separate dedicated command. Signed-off-by: Brian McGee --- internal/cli/store/init.go | 59 +++++++++++++++++++++++++++++++++++++ internal/cli/store/run.go | 15 +++------- internal/cli/store/store.go | 3 +- nix/dev/nvix.nix | 14 +++++++-- pkg/blob/grpc.go | 14 --------- pkg/blob/grpc_test.go | 11 ++++++- pkg/cli/nats.go | 19 ++++++++++++ pkg/directory/grpc.go | 14 --------- pkg/pathinfo/grpc.go | 14 --------- pkg/store/cache.go | 7 +++++ pkg/store/cdc.go | 7 +++++ pkg/store/nats.go | 9 ++++++ pkg/store/types.go | 1 + 13 files changed, 130 insertions(+), 57 deletions(-) create mode 100644 internal/cli/store/init.go create mode 100644 pkg/cli/nats.go diff --git a/internal/cli/store/init.go b/internal/cli/store/init.go new file mode 100644 index 0000000..b249b6d --- /dev/null +++ b/internal/cli/store/init.go @@ -0,0 +1,59 @@ +package store + +import ( + "context" + + "github.com/brianmcgee/nvix/pkg/blob" + "github.com/brianmcgee/nvix/pkg/cli" + "github.com/brianmcgee/nvix/pkg/directory" + "github.com/brianmcgee/nvix/pkg/pathinfo" + "github.com/charmbracelet/log" +) + +type Init struct { + Log cli.LogOptions `embed:""` + Nats cli.NatsOptions `embed:""` +} + +func (i *Init) Run() error { + i.Log.ConfigureLogger() + conn := i.Nats.Connect() + + ctx := context.Background() + + log.Info("initialising stores") + + log.Info("initialising blob chunk store") + if err := blob.NewChunkStore(conn).Init(ctx); err != nil { + log.Errorf("failed to initialise blob chunk store: %v", err) + return err + } + + log.Info("initialising blob meta store") + if err := blob.NewMetaStore(conn).Init(ctx); err != nil { + log.Errorf("failed to initialise blob meta store: %v", err) + return err + } + + log.Info("initialising directory store") + if err := directory.NewDirectoryStore(conn).Init(ctx); err != nil { + log.Errorf("failed to initialise directory store: %v", err) + return err + } + + log.Info("initialising path info store") + if err := pathinfo.NewPathInfoStore(conn).Init(ctx); err != nil { + log.Errorf("failed to initialise path info store: %v", err) + return err + } + + log.Info("initialising path info out idx store") + if err := pathinfo.NewPathInfoOutIdxStore(conn).Init(ctx); err != nil { + log.Errorf("failed to initialise path info out idx store: %v", err) + return err + } + + log.Info("initialising stores complete") + + return nil +} diff --git a/internal/cli/store/run.go b/internal/cli/store/run.go index 681a0be..6ee45f6 100644 --- a/internal/cli/store/run.go +++ b/internal/cli/store/run.go @@ -22,10 +22,8 @@ import ( "github.com/brianmcgee/nvix/pkg/blob" - grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" - "github.com/nats-io/nats.go" - pb "code.tvl.fyi/tvix/castore/protos" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/charmbracelet/log" "github.com/ztrue/shutdown" @@ -34,10 +32,8 @@ import ( ) type Run struct { - Log cli.LogOptions `embed:"" short:"v"` - - NatsUrl string `short:"n" env:"NVIX_STORE_NATS_URL" default:"nats://localhost:4222"` - NatsCredentials string `short:"c" env:"NVIX_STORE_NATS_CREDENTIALS_FILE" required:"" type:"path"` + Log cli.LogOptions `embed:""` + Nats cli.NatsOptions `embed:""` ListenAddr string `short:"l" env:"NVIX_STORE_LISTEN_ADDR" default:"localhost:5000"` MetricsAddr string `short:"m" env:"NVIX_STORE_METRICS_ADDR" default:"localhost:5050"` @@ -46,10 +42,7 @@ type Run struct { func (r *Run) Run() error { r.Log.ConfigureLogger() - conn, err := nats.Connect(r.NatsUrl, nats.UserCredentials(r.NatsCredentials)) - if err != nil { - log.Fatalf("failed to connect to nats: %v", err) - } + conn := r.Nats.Connect() blobServer, err := blob.NewServer(conn) if err != nil { diff --git a/internal/cli/store/store.go b/internal/cli/store/store.go index 8d63439..abf34b1 100644 --- a/internal/cli/store/store.go +++ b/internal/cli/store/store.go @@ -1,5 +1,6 @@ package store type Cli struct { - Run Run `cmd:"" default:""` + Run Run `cmd:"" default:""` + Init Init `cmd:""` } diff --git a/nix/dev/nvix.nix b/nix/dev/nvix.nix index 1d2da90..404e8b1 100644 --- a/nix/dev/nvix.nix +++ b/nix/dev/nvix.nix @@ -2,16 +2,26 @@ perSystem = {self', ...}: { config.process-compose = { dev.settings.processes = { - nvix-store = { + nvix-store-init = { depends_on = { nats-server.condition = "process_healthy"; nsc-push.condition = "process_completed_successfully"; }; working_dir = "$PRJ_DATA_DIR"; + environment = { + NVIX_STORE_NATS_CREDENTIALS_FILE = "./nsc/creds/Tvix/Store/Admin.creds"; + }; + command = "${lib.getExe self'.packages.nvix} store init -v"; + }; + nvix-store = { + depends_on = { + nvix-store-init.condition = "process_completed_successfully"; + }; + working_dir = "$PRJ_DATA_DIR"; environment = { NVIX_STORE_NATS_CREDENTIALS_FILE = "./nsc/creds/Tvix/Store/Server.creds"; }; - command = "${lib.getExe self'.packages.nvix} store"; + command = "${lib.getExe self'.packages.nvix} store run -v"; # TODO readiness probe }; }; diff --git a/pkg/blob/grpc.go b/pkg/blob/grpc.go index 4216669..e699260 100644 --- a/pkg/blob/grpc.go +++ b/pkg/blob/grpc.go @@ -8,26 +8,12 @@ import ( "github.com/brianmcgee/nvix/pkg/store" "github.com/charmbracelet/log" - "github.com/juju/errors" "github.com/nats-io/nats.go" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func NewServer(conn *nats.Conn) (*Server, error) { - js, err := conn.JetStream() - if err != nil { - return nil, errors.Annotate(err, "failed to create a JetStream context") - } - - if _, err := js.AddStream(&DiskBasedStreamConfig); err != nil { - return nil, errors.Annotate(err, "failed to create disk based stream") - } - - if _, err := js.AddStream(&MemoryBasedStreamConfig); err != nil { - return nil, errors.Annotate(err, "failed to create memory based stream") - } - return &Server{ conn: conn, store: &store.CdcStore{ diff --git a/pkg/blob/grpc_test.go b/pkg/blob/grpc_test.go index 61d3761..4ccd451 100644 --- a/pkg/blob/grpc_test.go +++ b/pkg/blob/grpc_test.go @@ -43,7 +43,16 @@ var sizes = []bytesize.ByteSize{ func blobServer(s *server.Server, t test.TestingT) (*grpc.Server, net.Listener) { t.Helper() - blobService, err := NewServer(test.NatsConn(t, s)) + conn := test.NatsConn(t, s) + + ctx := context.Background() + if err := NewMetaStore(conn).Init(ctx); err != nil { + t.Fatal(err) + } else if err := NewChunkStore(conn).Init(ctx); err != nil { + t.Fatal(err) + } + + blobService, err := NewServer(conn) if err != nil { t.Fatalf("failed to create blob service: %v", err) } diff --git a/pkg/cli/nats.go b/pkg/cli/nats.go new file mode 100644 index 0000000..d102898 --- /dev/null +++ b/pkg/cli/nats.go @@ -0,0 +1,19 @@ +package cli + +import ( + "github.com/charmbracelet/log" + "github.com/nats-io/nats.go" +) + +type NatsOptions struct { + NatsUrl string `short:"n" env:"NVIX_STORE_NATS_URL" default:"nats://localhost:4222"` + NatsCredentials string `short:"c" env:"NVIX_STORE_NATS_CREDENTIALS_FILE" required:"" type:"path"` +} + +func (no *NatsOptions) Connect() *nats.Conn { + conn, err := nats.Connect(no.NatsUrl, nats.UserCredentials(no.NatsCredentials)) + if err != nil { + log.Fatalf("failed to connect to nats: %v", err) + } + return conn +} diff --git a/pkg/directory/grpc.go b/pkg/directory/grpc.go index 46bf950..76098ca 100644 --- a/pkg/directory/grpc.go +++ b/pkg/directory/grpc.go @@ -11,7 +11,6 @@ import ( "github.com/brianmcgee/nvix/pkg/store" "github.com/charmbracelet/log" "github.com/golang/protobuf/proto" - "github.com/juju/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -19,19 +18,6 @@ import ( ) func NewServer(conn *nats.Conn) (*Server, error) { - js, err := conn.JetStream() - if err != nil { - return nil, errors.Annotate(err, "failed to create a JetStream context") - } - - if _, err := js.AddStream(&DiskBasedStreamConfig); err != nil { - return nil, errors.Annotate(err, "failed to create disk based stream") - } - - if _, err := js.AddStream(&MemoryBasedStreamConfig); err != nil { - return nil, errors.Annotate(err, "failed to create memory based stream") - } - return &Server{ conn: conn, store: NewDirectoryStore(conn), diff --git a/pkg/pathinfo/grpc.go b/pkg/pathinfo/grpc.go index 594cff0..8419ed4 100644 --- a/pkg/pathinfo/grpc.go +++ b/pkg/pathinfo/grpc.go @@ -13,7 +13,6 @@ import ( "github.com/brianmcgee/nvix/pkg/store" "github.com/charmbracelet/log" "github.com/golang/protobuf/proto" - "github.com/juju/errors" multihash "github.com/multiformats/go-multihash/core" "github.com/nats-io/nats.go" "github.com/nix-community/go-nix/pkg/hash" @@ -23,19 +22,6 @@ import ( ) func NewServer(conn *nats.Conn, blob *blob.Server, directory *directory.Server) (*Service, error) { - js, err := conn.JetStream() - if err != nil { - return nil, errors.Annotate(err, "failed to create a JetStream context") - } - - if _, err := js.AddStream(&DiskBasedStreamConfig); err != nil { - return nil, errors.Annotate(err, "failed to create disk based stream") - } - - if _, err := js.AddStream(&MemoryBasedStreamConfig); err != nil { - return nil, errors.Annotate(err, "failed to create memory based stream") - } - return &Service{ conn: conn, store: NewPathInfoStore(conn), diff --git a/pkg/store/cache.go b/pkg/store/cache.go index 028cc7d..8b3961d 100644 --- a/pkg/store/cache.go +++ b/pkg/store/cache.go @@ -17,6 +17,13 @@ type CachingStore struct { Memory Store } +func (c *CachingStore) Init(ctx context.Context) error { + if err := c.Disk.Init(ctx); err != nil { + return err + } + return c.Memory.Init(ctx) +} + func (c *CachingStore) List(ctx context.Context) (util.Iterator[io.ReadCloser], error) { return c.Disk.List(ctx) } diff --git a/pkg/store/cdc.go b/pkg/store/cdc.go index 298f360..d744ca2 100644 --- a/pkg/store/cdc.go +++ b/pkg/store/cdc.go @@ -30,6 +30,13 @@ type CdcStore struct { Chunks Store } +func (c *CdcStore) Init(ctx context.Context) error { + if err := c.Meta.Init(ctx); err != nil { + return err + } + return c.Chunks.Init(ctx) +} + func (c *CdcStore) getMeta(key string, ctx context.Context) (*pb.BlobMeta, error) { reader, err := c.Meta.Get(key, ctx) if err != nil { diff --git a/pkg/store/nats.go b/pkg/store/nats.go index 76256dd..d70ab10 100644 --- a/pkg/store/nats.go +++ b/pkg/store/nats.go @@ -18,6 +18,15 @@ type NatsStore struct { SubjectPrefix string } +func (n *NatsStore) Init(ctx context.Context) error { + js, err := n.js(ctx) + if err != nil { + return err + } + _, err = js.AddStream(n.StreamConfig) + return err +} + func (n *NatsStore) Stat(key string, ctx context.Context) (ok bool, err error) { var reader io.ReadCloser reader, err = n.Get(key, ctx) diff --git a/pkg/store/types.go b/pkg/store/types.go index 9006d4c..f300e35 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -23,6 +23,7 @@ func (d Digest) String() string { } type Store interface { + Init(ctx context.Context) error Get(key string, ctx context.Context) (io.ReadCloser, error) Put(key string, reader io.ReadCloser, ctx context.Context) error PutAsync(key string, reader io.ReadCloser, ctx context.Context) (nats.PubAckFuture, error)