From 2a799fa0a192ddabd4108e5662c44e4af1a85ecd Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Thu, 26 Jan 2023 16:22:05 +0100 Subject: [PATCH] fix: Ristretto cache for Numscript initialization modification (#417) * fix: first * fix: cache capacity to 100 numscripts * fix: flag name * fix: cleanup --- cmd/container.go | 3 ++- cmd/container_test.go | 2 +- cmd/root.go | 4 ++++ pkg/api/internal/testing.go | 2 +- pkg/ledger/ledger.go | 11 +---------- pkg/ledger/ledger_test.go | 12 +++++++++++- pkg/ledger/resolver.go | 26 ++++++++++++++++++++------ 7 files changed, 40 insertions(+), 20 deletions(-) diff --git a/cmd/container.go b/cmd/container.go index 03d62dc80..562e05f66 100644 --- a/cmd/container.go +++ b/cmd/container.go @@ -168,7 +168,8 @@ func NewContainer(v *viper.Viper, userOptions ...fx.Option) *fx.App { )) // Handle resolver - options = append(options, ledger.ResolveModule()) + options = append(options, ledger.ResolveModule( + v.GetInt64(numscriptCacheCapacity))) // Api middlewares options = append(options, routes.ProvidePerLedgerMiddleware(func(tp trace.TracerProvider) []gin.HandlerFunc { diff --git a/cmd/container_test.go b/cmd/container_test.go index 759f4563c..c19806b6d 100644 --- a/cmd/container_test.go +++ b/cmd/container_test.go @@ -209,7 +209,6 @@ func TestContainers(t *testing.T) { name := uuid.New() l, err := resolver.GetLedger(ctx, name) - if err != nil { return err } @@ -251,6 +250,7 @@ func TestContainers(t *testing.T) { // Default options v.Set(storageDriverFlag, sqlstorage.SQLite.String()) v.Set(storageDirFlag, "/tmp") + v.Set(numscriptCacheCapacity, 100) //v.Set(storageSQLiteDBNameFlag, uuid.New()) tc.init(v) app := NewContainer(v, options...) diff --git a/cmd/root.go b/cmd/root.go index 8c77b9385..a000595c0 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -49,6 +49,8 @@ const ( authBearerUseScopesFlag = "auth-bearer-use-scopes" commitPolicyFlag = "commit-policy" + + numscriptCacheCapacity = "numscript-cache-capacity" ) var ( @@ -138,6 +140,8 @@ func NewRootCommand() *cobra.Command { root.PersistentFlags().Bool(authBearerUseScopesFlag, false, "Use scopes as defined by rfc https://datatracker.ietf.org/doc/html/rfc8693") root.PersistentFlags().String(commitPolicyFlag, "", "Transaction commit policy (default or allow-past-timestamps)") + root.PersistentFlags().Int(numscriptCacheCapacity, 100, "Capacity of the cache storing Numscript in RAM") + otlptraces.InitOTLPTracesFlags(root.PersistentFlags()) internal.InitHTTPBasicFlags(root) internal.InitAnalyticsFlags(root, DefaultSegmentWriteKey) diff --git a/pkg/api/internal/testing.go b/pkg/api/internal/testing.go index 8a1eca484..96c538c0b 100644 --- a/pkg/api/internal/testing.go +++ b/pkg/api/internal/testing.go @@ -248,7 +248,7 @@ func RunTest(t *testing.T, options ...fx.Option) { options = append([]fx.Option{ api.Module(api.Config{StorageDriver: "sqlite", Version: "latest", UseScopes: true}), - ledger.ResolveModule(), + ledger.ResolveModule(100), ledgertesting.ProvideLedgerStorageDriver(), fx.Invoke(func(driver storage.Driver[ledger.Store], lc fx.Lifecycle) { lc.Append(fx.Hook{ diff --git a/pkg/ledger/ledger.go b/pkg/ledger/ledger.go index 283d73fda..6653604e8 100644 --- a/pkg/ledger/ledger.go +++ b/pkg/ledger/ledger.go @@ -39,16 +39,7 @@ func WithPastTimestamps(l *Ledger) { l.allowPastTimestamps = true } -func NewLedger(store Store, monitor Monitor, options ...LedgerOption) (*Ledger, error) { - cache, err := ristretto.NewCache(&ristretto.Config{ - NumCounters: 1e7, // number of keys to track frequency of (10M). - MaxCost: 1 << 30, // maximum cost of cache (1GB). - BufferItems: 64, // number of keys per Get buffer. - }) - if err != nil { - return nil, errors.Wrap(err, "creating ledger cache") - } - +func NewLedger(store Store, monitor Monitor, cache *ristretto.Cache, options ...LedgerOption) (*Ledger, error) { l := &Ledger{ store: store, monitor: monitor, diff --git a/pkg/ledger/ledger_test.go b/pkg/ledger/ledger_test.go index 25b210bc1..fbfb93428 100644 --- a/pkg/ledger/ledger_test.go +++ b/pkg/ledger/ledger_test.go @@ -10,12 +10,14 @@ import ( "testing" "time" + "github.com/dgraph-io/ristretto" "github.com/mitchellh/mapstructure" "github.com/numary/ledger/pkg/core" "github.com/numary/ledger/pkg/ledger" "github.com/numary/ledger/pkg/ledgertesting" "github.com/numary/ledger/pkg/storage" "github.com/pborman/uuid" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -69,7 +71,15 @@ func runOnLedger(f func(l *ledger.Ledger), ledgerOptions ...ledger.LedgerOption) if err != nil { return err } - l, err := ledger.NewLedger(store, ledger.NewNoOpMonitor(), ledgerOptions...) + cache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e7, // number of keys to track frequency of (10M). + MaxCost: 100, // maximum cost of cache. + BufferItems: 64, // number of keys per Get buffer. + }) + if err != nil { + panic(errors.Wrap(err, "creating ledger cache")) + } + l, err := ledger.NewLedger(store, ledger.NewNoOpMonitor(), cache, ledgerOptions...) if err != nil { panic(err) } diff --git a/pkg/ledger/resolver.go b/pkg/ledger/resolver.go index 530dca53c..339ce91d1 100644 --- a/pkg/ledger/resolver.go +++ b/pkg/ledger/resolver.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/dgraph-io/ristretto" "github.com/numary/ledger/pkg/storage" "github.com/pkg/errors" "go.uber.org/fx" @@ -35,21 +36,32 @@ type Resolver struct { initializedStores map[string]struct{} monitor Monitor ledgerOptions []LedgerOption + cache *ristretto.Cache } func NewResolver( storageFactory storage.Driver[Store], ledgerOptions []LedgerOption, + numscriptCacheCapacity int64, options ...ResolverOption, ) *Resolver { + cache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e7, // number of keys to track frequency of (10M). + MaxCost: numscriptCacheCapacity, // maximum cost of cache. + BufferItems: 64, // number of keys per Get buffer. + }) + if err != nil { + panic(errors.Wrap(err, "creating ledger cache")) + } + options = append(DefaultResolverOptions, options...) r := &Resolver{ storageDriver: storageFactory, initializedStores: map[string]struct{}{}, + cache: cache, } for _, opt := range options { - err := opt.apply(r) - if err != nil { + if err := opt.apply(r); err != nil { panic(errors.Wrap(err, "applying option on resolver")) } } @@ -68,7 +80,7 @@ func (r *Resolver) GetLedger(ctx context.Context, name string) (*Ledger, error) _, ok := r.initializedStores[name] r.lock.RUnlock() if ok { - return NewLedger(store, r.monitor, r.ledgerOptions...) + return NewLedger(store, r.monitor, r.cache, r.ledgerOptions...) } r.lock.Lock() @@ -82,7 +94,7 @@ func (r *Resolver) GetLedger(ctx context.Context, name string) (*Ledger, error) r.initializedStores[name] = struct{}{} } - return NewLedger(store, r.monitor, r.ledgerOptions...) + return NewLedger(store, r.monitor, r.cache, r.ledgerOptions...) } const ResolverOptionsKey = `group:"_ledgerResolverOptions"` @@ -94,10 +106,12 @@ func ProvideResolverOption(provider interface{}) fx.Option { ) } -func ResolveModule() fx.Option { +func ResolveModule(numscriptCacheCapacity int64) fx.Option { return fx.Options( fx.Provide( - fx.Annotate(NewResolver, fx.ParamTags("", ResolverLedgerOptionsKey, ResolverOptionsKey)), + fx.Annotate(func(storageFactory storage.Driver[Store], ledgerOptions []LedgerOption, options ...ResolverOption) *Resolver { + return NewResolver(storageFactory, ledgerOptions, numscriptCacheCapacity, options...) + }, fx.ParamTags("", ResolverLedgerOptionsKey, ResolverOptionsKey)), ), ) }