Skip to content

Commit

Permalink
fix: Ristretto cache for Numscript initialization modification (#417)
Browse files Browse the repository at this point in the history
* fix: first

* fix: cache capacity to 100 numscripts

* fix: flag name

* fix: cleanup
  • Loading branch information
Antoine Gelloz authored and flemzord committed Jan 26, 2023
1 parent 3f88d83 commit 2a799fa
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 20 deletions.
3 changes: 2 additions & 1 deletion cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ func NewContainer(v *viper.Viper, userOptions ...fx.Option) *fx.App {
))

// Handle resolver
options = append(options, ledger.ResolveModule())
options = append(options, ledger.ResolveModule(
v.GetInt64(numscriptCacheCapacity)))

// Api middlewares
options = append(options, routes.ProvidePerLedgerMiddleware(func(tp trace.TracerProvider) []gin.HandlerFunc {
Expand Down
2 changes: 1 addition & 1 deletion cmd/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ func TestContainers(t *testing.T) {

name := uuid.New()
l, err := resolver.GetLedger(ctx, name)

if err != nil {
return err
}
Expand Down Expand Up @@ -251,6 +250,7 @@ func TestContainers(t *testing.T) {
// Default options
v.Set(storageDriverFlag, sqlstorage.SQLite.String())
v.Set(storageDirFlag, "/tmp")
v.Set(numscriptCacheCapacity, 100)
//v.Set(storageSQLiteDBNameFlag, uuid.New())
tc.init(v)
app := NewContainer(v, options...)
Expand Down
4 changes: 4 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
authBearerUseScopesFlag = "auth-bearer-use-scopes"

commitPolicyFlag = "commit-policy"

numscriptCacheCapacity = "numscript-cache-capacity"
)

var (
Expand Down Expand Up @@ -138,6 +140,8 @@ func NewRootCommand() *cobra.Command {
root.PersistentFlags().Bool(authBearerUseScopesFlag, false, "Use scopes as defined by rfc https://datatracker.ietf.org/doc/html/rfc8693")
root.PersistentFlags().String(commitPolicyFlag, "", "Transaction commit policy (default or allow-past-timestamps)")

root.PersistentFlags().Int(numscriptCacheCapacity, 100, "Capacity of the cache storing Numscript in RAM")

otlptraces.InitOTLPTracesFlags(root.PersistentFlags())
internal.InitHTTPBasicFlags(root)
internal.InitAnalyticsFlags(root, DefaultSegmentWriteKey)
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/internal/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func RunTest(t *testing.T, options ...fx.Option) {

options = append([]fx.Option{
api.Module(api.Config{StorageDriver: "sqlite", Version: "latest", UseScopes: true}),
ledger.ResolveModule(),
ledger.ResolveModule(100),
ledgertesting.ProvideLedgerStorageDriver(),
fx.Invoke(func(driver storage.Driver[ledger.Store], lc fx.Lifecycle) {
lc.Append(fx.Hook{
Expand Down
11 changes: 1 addition & 10 deletions pkg/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,7 @@ func WithPastTimestamps(l *Ledger) {
l.allowPastTimestamps = true
}

func NewLedger(store Store, monitor Monitor, options ...LedgerOption) (*Ledger, error) {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e7, // number of keys to track frequency of (10M).
MaxCost: 1 << 30, // maximum cost of cache (1GB).
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
return nil, errors.Wrap(err, "creating ledger cache")
}

func NewLedger(store Store, monitor Monitor, cache *ristretto.Cache, options ...LedgerOption) (*Ledger, error) {
l := &Ledger{
store: store,
monitor: monitor,
Expand Down
12 changes: 11 additions & 1 deletion pkg/ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"testing"
"time"

"github.com/dgraph-io/ristretto"
"github.com/mitchellh/mapstructure"
"github.com/numary/ledger/pkg/core"
"github.com/numary/ledger/pkg/ledger"
"github.com/numary/ledger/pkg/ledgertesting"
"github.com/numary/ledger/pkg/storage"
"github.com/pborman/uuid"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -69,7 +71,15 @@ func runOnLedger(f func(l *ledger.Ledger), ledgerOptions ...ledger.LedgerOption)
if err != nil {
return err
}
l, err := ledger.NewLedger(store, ledger.NewNoOpMonitor(), ledgerOptions...)
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e7, // number of keys to track frequency of (10M).
MaxCost: 100, // maximum cost of cache.
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
panic(errors.Wrap(err, "creating ledger cache"))
}
l, err := ledger.NewLedger(store, ledger.NewNoOpMonitor(), cache, ledgerOptions...)
if err != nil {
panic(err)
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/ledger/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

"github.com/dgraph-io/ristretto"
"github.com/numary/ledger/pkg/storage"
"github.com/pkg/errors"
"go.uber.org/fx"
Expand Down Expand Up @@ -35,21 +36,32 @@ type Resolver struct {
initializedStores map[string]struct{}
monitor Monitor
ledgerOptions []LedgerOption
cache *ristretto.Cache
}

func NewResolver(
storageFactory storage.Driver[Store],
ledgerOptions []LedgerOption,
numscriptCacheCapacity int64,
options ...ResolverOption,
) *Resolver {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e7, // number of keys to track frequency of (10M).
MaxCost: numscriptCacheCapacity, // maximum cost of cache.
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
panic(errors.Wrap(err, "creating ledger cache"))
}

options = append(DefaultResolverOptions, options...)
r := &Resolver{
storageDriver: storageFactory,
initializedStores: map[string]struct{}{},
cache: cache,
}
for _, opt := range options {
err := opt.apply(r)
if err != nil {
if err := opt.apply(r); err != nil {
panic(errors.Wrap(err, "applying option on resolver"))
}
}
Expand All @@ -68,7 +80,7 @@ func (r *Resolver) GetLedger(ctx context.Context, name string) (*Ledger, error)
_, ok := r.initializedStores[name]
r.lock.RUnlock()
if ok {
return NewLedger(store, r.monitor, r.ledgerOptions...)
return NewLedger(store, r.monitor, r.cache, r.ledgerOptions...)
}

r.lock.Lock()
Expand All @@ -82,7 +94,7 @@ func (r *Resolver) GetLedger(ctx context.Context, name string) (*Ledger, error)
r.initializedStores[name] = struct{}{}
}

return NewLedger(store, r.monitor, r.ledgerOptions...)
return NewLedger(store, r.monitor, r.cache, r.ledgerOptions...)
}

const ResolverOptionsKey = `group:"_ledgerResolverOptions"`
Expand All @@ -94,10 +106,12 @@ func ProvideResolverOption(provider interface{}) fx.Option {
)
}

func ResolveModule() fx.Option {
func ResolveModule(numscriptCacheCapacity int64) fx.Option {
return fx.Options(
fx.Provide(
fx.Annotate(NewResolver, fx.ParamTags("", ResolverLedgerOptionsKey, ResolverOptionsKey)),
fx.Annotate(func(storageFactory storage.Driver[Store], ledgerOptions []LedgerOption, options ...ResolverOption) *Resolver {
return NewResolver(storageFactory, ledgerOptions, numscriptCacheCapacity, options...)
}, fx.ParamTags("", ResolverLedgerOptionsKey, ResolverOptionsKey)),
),
)
}

0 comments on commit 2a799fa

Please sign in to comment.