Skip to content

Commit

Permalink
feat: add store init command
Browse files Browse the repository at this point in the history
Separates creation of NATS streams into a separate dedicated command.

Signed-off-by: Brian McGee <[email protected]>
  • Loading branch information
brianmcgee committed Oct 12, 2023
1 parent f6832b8 commit 7ebff40
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 57 deletions.
59 changes: 59 additions & 0 deletions internal/cli/store/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package store

import (
"context"

"github.com/brianmcgee/nvix/pkg/blob"
"github.com/brianmcgee/nvix/pkg/cli"
"github.com/brianmcgee/nvix/pkg/directory"
"github.com/brianmcgee/nvix/pkg/pathinfo"
"github.com/charmbracelet/log"
)

type Init struct {
Log cli.LogOptions `embed:""`
Nats cli.NatsOptions `embed:""`
}

func (i *Init) Run() error {
i.Log.ConfigureLogger()
conn := i.Nats.Connect()

ctx := context.Background()

log.Info("initialising stores")

log.Info("initialising blob chunk store")
if err := blob.NewChunkStore(conn).Init(ctx); err != nil {
log.Errorf("failed to initialise blob chunk store: %v", err)
return err
}

log.Info("initialising blob meta store")
if err := blob.NewMetaStore(conn).Init(ctx); err != nil {
log.Errorf("failed to initialise blob meta store: %v", err)
return err
}

log.Info("initialising directory store")
if err := directory.NewDirectoryStore(conn).Init(ctx); err != nil {
log.Errorf("failed to initialise directory store: %v", err)
return err
}

log.Info("initialising path info store")
if err := pathinfo.NewPathInfoStore(conn).Init(ctx); err != nil {
log.Errorf("failed to initialise path info store: %v", err)
return err
}

log.Info("initialising path info out idx store")
if err := pathinfo.NewPathInfoOutIdxStore(conn).Init(ctx); err != nil {
log.Errorf("failed to initialise path info out idx store: %v", err)
return err
}

log.Info("initialising stores complete")

return nil
}
15 changes: 4 additions & 11 deletions internal/cli/store/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ import (

"github.com/brianmcgee/nvix/pkg/blob"

grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/nats-io/nats.go"

pb "code.tvl.fyi/tvix/castore/protos"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"

"github.com/charmbracelet/log"
"github.com/ztrue/shutdown"
Expand All @@ -34,10 +32,8 @@ import (
)

type Run struct {
Log cli.LogOptions `embed:"" short:"v"`

NatsUrl string `short:"n" env:"NVIX_STORE_NATS_URL" default:"nats://localhost:4222"`
NatsCredentials string `short:"c" env:"NVIX_STORE_NATS_CREDENTIALS_FILE" required:"" type:"path"`
Log cli.LogOptions `embed:""`
Nats cli.NatsOptions `embed:""`

ListenAddr string `short:"l" env:"NVIX_STORE_LISTEN_ADDR" default:"localhost:5000"`
MetricsAddr string `short:"m" env:"NVIX_STORE_METRICS_ADDR" default:"localhost:5050"`
Expand All @@ -46,10 +42,7 @@ type Run struct {
func (r *Run) Run() error {
r.Log.ConfigureLogger()

conn, err := nats.Connect(r.NatsUrl, nats.UserCredentials(r.NatsCredentials))
if err != nil {
log.Fatalf("failed to connect to nats: %v", err)
}
conn := r.Nats.Connect()

blobServer, err := blob.NewServer(conn)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion internal/cli/store/store.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package store

type Cli struct {
Run Run `cmd:"" default:""`
Run Run `cmd:"" default:""`
Init Init `cmd:""`
}
14 changes: 12 additions & 2 deletions nix/dev/nvix.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,26 @@
perSystem = {self', ...}: {
config.process-compose = {
dev.settings.processes = {
nvix-store = {
nvix-store-init = {
depends_on = {
nats-server.condition = "process_healthy";
nsc-push.condition = "process_completed_successfully";
};
working_dir = "$PRJ_DATA_DIR";
environment = {
NVIX_STORE_NATS_CREDENTIALS_FILE = "./nsc/creds/Tvix/Store/Admin.creds";
};
command = "${lib.getExe self'.packages.nvix} store init -v";
};
nvix-store = {
depends_on = {
nvix-store-init.condition = "process_completed_successfully";
};
working_dir = "$PRJ_DATA_DIR";
environment = {
NVIX_STORE_NATS_CREDENTIALS_FILE = "./nsc/creds/Tvix/Store/Server.creds";
};
command = "${lib.getExe self'.packages.nvix} store";
command = "${lib.getExe self'.packages.nvix} store run -v";
# TODO readiness probe
};
};
Expand Down
14 changes: 0 additions & 14 deletions pkg/blob/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,12 @@ import (

"github.com/brianmcgee/nvix/pkg/store"
"github.com/charmbracelet/log"
"github.com/juju/errors"
"github.com/nats-io/nats.go"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func NewServer(conn *nats.Conn) (*Server, error) {
js, err := conn.JetStream()
if err != nil {
return nil, errors.Annotate(err, "failed to create a JetStream context")
}

if _, err := js.AddStream(&DiskBasedStreamConfig); err != nil {
return nil, errors.Annotate(err, "failed to create disk based stream")
}

if _, err := js.AddStream(&MemoryBasedStreamConfig); err != nil {
return nil, errors.Annotate(err, "failed to create memory based stream")
}

return &Server{
conn: conn,
store: &store.CdcStore{
Expand Down
11 changes: 10 additions & 1 deletion pkg/blob/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,16 @@ var sizes = []bytesize.ByteSize{
func blobServer(s *server.Server, t test.TestingT) (*grpc.Server, net.Listener) {
t.Helper()

blobService, err := NewServer(test.NatsConn(t, s))
conn := test.NatsConn(t, s)

ctx := context.Background()
if err := NewMetaStore(conn).Init(ctx); err != nil {
t.Fatal(err)
} else if err := NewChunkStore(conn).Init(ctx); err != nil {
t.Fatal(err)
}

blobService, err := NewServer(conn)
if err != nil {
t.Fatalf("failed to create blob service: %v", err)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/cli/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package cli

import (
"github.com/charmbracelet/log"
"github.com/nats-io/nats.go"
)

type NatsOptions struct {
NatsUrl string `short:"n" env:"NVIX_STORE_NATS_URL" default:"nats://localhost:4222"`
NatsCredentials string `short:"c" env:"NVIX_STORE_NATS_CREDENTIALS_FILE" required:"" type:"path"`
}

func (no *NatsOptions) Connect() *nats.Conn {
conn, err := nats.Connect(no.NatsUrl, nats.UserCredentials(no.NatsCredentials))
if err != nil {
log.Fatalf("failed to connect to nats: %v", err)
}
return conn
}
14 changes: 0 additions & 14 deletions pkg/directory/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,13 @@ import (
"github.com/brianmcgee/nvix/pkg/store"
"github.com/charmbracelet/log"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/nats-io/nats.go"
)

func NewServer(conn *nats.Conn) (*Server, error) {
js, err := conn.JetStream()
if err != nil {
return nil, errors.Annotate(err, "failed to create a JetStream context")
}

if _, err := js.AddStream(&DiskBasedStreamConfig); err != nil {
return nil, errors.Annotate(err, "failed to create disk based stream")
}

if _, err := js.AddStream(&MemoryBasedStreamConfig); err != nil {
return nil, errors.Annotate(err, "failed to create memory based stream")
}

return &Server{
conn: conn,
store: NewDirectoryStore(conn),
Expand Down
14 changes: 0 additions & 14 deletions pkg/pathinfo/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/brianmcgee/nvix/pkg/store"
"github.com/charmbracelet/log"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
multihash "github.com/multiformats/go-multihash/core"
"github.com/nats-io/nats.go"
"github.com/nix-community/go-nix/pkg/hash"
Expand All @@ -23,19 +22,6 @@ import (
)

func NewServer(conn *nats.Conn, blob *blob.Server, directory *directory.Server) (*Service, error) {
js, err := conn.JetStream()
if err != nil {
return nil, errors.Annotate(err, "failed to create a JetStream context")
}

if _, err := js.AddStream(&DiskBasedStreamConfig); err != nil {
return nil, errors.Annotate(err, "failed to create disk based stream")
}

if _, err := js.AddStream(&MemoryBasedStreamConfig); err != nil {
return nil, errors.Annotate(err, "failed to create memory based stream")
}

return &Service{
conn: conn,
store: NewPathInfoStore(conn),
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ type CachingStore struct {
Memory Store
}

func (c *CachingStore) Init(ctx context.Context) error {
if err := c.Disk.Init(ctx); err != nil {
return err
}
return c.Memory.Init(ctx)
}

func (c *CachingStore) List(ctx context.Context) (util.Iterator[io.ReadCloser], error) {
return c.Disk.List(ctx)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ type CdcStore struct {
Chunks Store
}

func (c *CdcStore) Init(ctx context.Context) error {
if err := c.Meta.Init(ctx); err != nil {
return err
}
return c.Chunks.Init(ctx)
}

func (c *CdcStore) getMeta(key string, ctx context.Context) (*pb.BlobMeta, error) {
reader, err := c.Meta.Get(key, ctx)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions pkg/store/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ type NatsStore struct {
SubjectPrefix string
}

func (n *NatsStore) Init(ctx context.Context) error {
js, err := n.js(ctx)
if err != nil {
return err
}
_, err = js.AddStream(n.StreamConfig)
return err
}

func (n *NatsStore) Stat(key string, ctx context.Context) (ok bool, err error) {
var reader io.ReadCloser
reader, err = n.Get(key, ctx)
Expand Down
1 change: 1 addition & 0 deletions pkg/store/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func (d Digest) String() string {
}

type Store interface {
Init(ctx context.Context) error
Get(key string, ctx context.Context) (io.ReadCloser, error)
Put(key string, reader io.ReadCloser, ctx context.Context) error
PutAsync(key string, reader io.ReadCloser, ctx context.Context) (nats.PubAckFuture, error)
Expand Down

0 comments on commit 7ebff40

Please sign in to comment.