Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch commits #669

Merged
merged 5 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/fabric/fabricdev/core/fabricdev/channelprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (p *provider) NewChannel(nw driver.FabricNetworkService, channelName string
}

// Vault
vault, txIDStore, err := p.newVault(nw.ConfigService(), channelName, p.drivers, p.tracerProvider)
vault, txIDStore, err := p.newVault(nw.ConfigService(), channelName, p.drivers, p.metricsProvider, p.tracerProvider)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions docs/fabric/fabricdev/core/fabricdev/vault/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db"
dbdriver "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
)

var logger = flogging.MustGetLogger("fabric-sdk.core.vault")

func New(configService driver.ConfigService, channel string, drivers []dbdriver.NamedDriver, tracerProvider trace.TracerProvider) (*Vault, driver.TXIDStore, error) {
func New(configService driver.ConfigService, channel string, drivers []dbdriver.NamedDriver, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) (*Vault, driver.TXIDStore, error) {
var d dbdriver.Driver
for _, driver := range drivers {
if driver.Name == configService.VaultPersistenceType() {
Expand Down Expand Up @@ -51,10 +52,10 @@ func New(configService driver.ConfigService, channel string, drivers []dbdriver.
if txIDStoreCacheSize > 0 {
logger.Debugf("creating txID store second cache with size [%d]", txIDStoreCacheSize)
c := txidstore.NewCache(txidStore, secondcache.NewTyped[*txidstore.Entry](txIDStoreCacheSize), logger)
return NewVault(persistence, c, tracerProvider), c, nil
return NewVault(persistence, c, metricsProvider, tracerProvider), c, nil
} else {
logger.Debugf("txID store without cache selected")
c := txidstore.NewNoCache(txidStore)
return NewVault(persistence, c, tracerProvider), c, nil
return NewVault(persistence, c, metricsProvider, tracerProvider), c, nil
}
}
4 changes: 3 additions & 1 deletion docs/fabric/fabricdev/core/fabricdev/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
fdriver "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -23,14 +24,15 @@ type (
)

// NewVault returns a new instance of Vault
func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, tracerProvider trace.TracerProvider) *Vault {
func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) *Vault {
return vault.New[fdriver.ValidationCode](
flogging.MustGetLogger("fabric-sdk.generic.vault"),
store,
txIDStore,
&fdriver.ValidationCodeProvider{},
newInterceptor,
&populator{},
metricsProvider,
tracerProvider,
)
}
Expand Down
126 changes: 126 additions & 0 deletions pkg/runner/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package runner

import (
"sync"
"sync/atomic"
"time"

"github.com/pkg/errors"
)

type batcher[I any, O any] struct {
idx uint32
inputs []chan I
outputs []chan O
locks []sync.Mutex
len uint32
executor ExecuteFunc[I, O]
timeout time.Duration
}

func newBatcher[I any, O any](executor func([]I) []O, capacity int, timeout time.Duration) *batcher[I, O] {
inputs := make([]chan I, capacity)
outputs := make([]chan O, capacity)
locks := make([]sync.Mutex, capacity)
for i := 0; i < capacity; i++ {
inputs[i] = make(chan I)
outputs[i] = make(chan O)
locks[i] = sync.Mutex{}
}

e := &batcher[I, O]{
inputs: inputs,
outputs: outputs,
locks: locks,
len: uint32(capacity),
executor: executor,
timeout: timeout,
}
go e.start()
return e
}

func (r *batcher[I, O]) start() {
var inputs []I
ticker := time.NewTicker(r.timeout)
firstIdx := uint32(0) // Points to the first element of a new cycle
for {
// If we fill a whole cycle, the elements will be from firstIdx % r.len to lastIdx % r.len
var lastIdx uint32
var lastElement I
select {
case lastElement = <-r.inputs[(firstIdx+r.len-1)%r.len]:
lastIdx = firstIdx + r.len
logger.Debugf("Execute because %d input channels are full", r.len)
case <-ticker.C:
lastIdx = atomic.LoadUint32(&r.idx)
if lastIdx == firstIdx {
logger.Debugf("No new elements. Skip execution...")
continue
}
lastElement = <-r.inputs[(lastIdx-1)%r.len] // We read the lastElement here just to avoid code repetition
logger.Debugf("Execute because timeout of %v passed", r.timeout)
}
logger.Debugf("Read batch range [%d,%d)", firstIdx, lastIdx)

inputs = make([]I, lastIdx-firstIdx)
for i := uint32(0); i < lastIdx-firstIdx-1; i++ {
inputs[i] = <-r.inputs[(i+firstIdx)%r.len]
}
inputs[lastIdx-firstIdx-1] = lastElement
ticker.Reset(r.timeout)

logger.Debugf("Start execution for %d inputs", len(inputs))
outs := r.executor(inputs)
logger.Debugf("Execution finished with %d outputs", len(outs))
if len(inputs) != len(outs) {
panic(errors.Errorf("expected %d outputs, but got %d", len(inputs), len(outs)))
}
for i, err := range outs {
r.outputs[(firstIdx+uint32(i))%r.len] <- err
}
logger.Debugf("Results distributed for range [%d,%d)", firstIdx, lastIdx)
firstIdx = lastIdx
}
}

func (r *batcher[I, O]) call(input I) O {
idx := atomic.AddUint32(&r.idx, 1) - 1
r.locks[idx%r.len].Lock()
defer r.locks[idx%r.len].Unlock()
r.inputs[idx%r.len] <- input
logger.Debugf("Enqueued input [%d] and waiting for result", idx)
defer logger.Debugf("Return result of output [%d]", idx)
return <-r.outputs[idx%r.len]
}

type batchExecutor[I any, O any] struct {
*batcher[I, Output[O]]
}

func NewBatchExecutor[I any, O any](executor ExecuteFunc[I, Output[O]], capacity int, timeout time.Duration) BatchExecutor[I, O] {
return &batchExecutor[I, O]{batcher: newBatcher(executor, capacity, timeout)}
}

func (r *batchExecutor[I, O]) Execute(input I) (O, error) {
o := r.batcher.call(input)
return o.Val, o.Err
}

type batchRunner[V any] struct {
*batcher[V, error]
}

func NewBatchRunner[V any](runner func([]V) []error, capacity int, timeout time.Duration) BatchRunner[V] {
return &batchRunner[V]{batcher: newBatcher(runner, capacity, timeout)}
}

func (r *batchRunner[V]) Run(val V) error {
return r.call(val)
}
83 changes: 83 additions & 0 deletions pkg/runner/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package runner

import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

var counter uint32

func TestBatchRunner(t *testing.T) {
atomic.StoreUint32(&counter, 0)
runner, m, locksObtained := newBatchRunner()

run(t, runner, 1000)
assert.Len(t, m, 1000)
assert.Equal(t, "val_10", m["key_10"])
assert.Equal(t, 10, int(atomic.LoadUint32(locksObtained)))
}

func TestBatchRunnerFewRequests(t *testing.T) {
atomic.StoreUint32(&counter, 0)
runner, m, locksObtained := newBatchRunner()

run(t, runner, 1)

assert.Len(t, m, 1)
assert.Equal(t, "val_1", m["key_1"])
assert.Equal(t, 1, int(atomic.LoadUint32(locksObtained)))

run(t, runner, 3)
assert.Len(t, m, 4)
assert.Equal(t, 2, int(atomic.LoadUint32(locksObtained)))
}

func newBatchRunner() (BatchRunner[int], map[string]string, *uint32) {
var locksObtained uint32
m := make(map[string]string)
var mu sync.RWMutex
runner := NewBatchRunner(func(vs []int) []error {
mu.Lock()
atomic.AddUint32(&locksObtained, 1)
defer mu.Unlock()
errs := make([]error, len(vs))
for i, v := range vs {
m[fmt.Sprintf("key_%d", v)] = fmt.Sprintf("val_%d", v)
if v%10 == 0 {
errs[i] = errors.Errorf("error_%d", v)
}
}
return errs
}, 100, 10*time.Millisecond)
return runner, m, &locksObtained
}

func run(t *testing.T, runner BatchRunner[int], times int) {
var wg sync.WaitGroup
wg.Add(times)
for i := 0; i < times; i++ {
v := int(atomic.AddUint32(&counter, 1))
go func() {
defer wg.Done()
err := runner.Run(v)
if v%10 == 0 {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}()
}
wg.Wait()
}
26 changes: 26 additions & 0 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package runner

import "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"

var logger = flogging.MustGetLogger("batch-executor")

type BatchExecutor[I any, O any] interface {
Execute(input I) (O, error)
}

type BatchRunner[V any] interface {
Run(v V) error
}

type Output[O any] struct {
Val O
Err error
}

type ExecuteFunc[I any, O any] func([]I) []O
32 changes: 32 additions & 0 deletions pkg/runner/serial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package runner

func NewSerialRunner[V any](runner ExecuteFunc[V, error]) BatchRunner[V] {
return &serialRunner[V]{executor: runner}
}

type serialRunner[V any] struct {
executor ExecuteFunc[V, error]
}

func (r *serialRunner[V]) Run(val V) error {
return r.executor([]V{val})[0]
}

func NewSerialExecutor[I any, O any](executor ExecuteFunc[I, Output[O]]) BatchExecutor[I, O] {
return &serialExecutor[I, O]{executor: executor}
}

type serialExecutor[I any, O any] struct {
executor ExecuteFunc[I, Output[O]]
}

func (r *serialExecutor[I, O]) Execute(input I) (O, error) {
res := r.executor([]I{input})[0]
return res.Val, res.Err
}
2 changes: 1 addition & 1 deletion platform/common/core/generic/vault/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (i *Inspector) SetStateMetadata(driver.Namespace, driver.PKey, driver.Metad
panic("programming error: the rwset inspector is read-only")
}

func (i *Inspector) SetStateMetadatas(ns driver.Namespace, kvs map[driver.PKey]driver.Metadata, block driver.BlockNum, txnum driver.TxNum) map[driver.PKey]error {
func (i *Inspector) SetStateMetadatas(ns driver.Namespace, kvs map[driver.PKey]driver.VersionedMetadataValue) map[driver.PKey]error {
panic("programming error: the rwset inspector is read-only")
}

Expand Down
3 changes: 2 additions & 1 deletion platform/common/core/generic/vault/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
)
Expand Down Expand Up @@ -45,7 +46,7 @@ func (qe mockQE) Done() {
type mockTXIDStoreReader struct {
}

func (m mockTXIDStoreReader) Iterator(pos interface{}) (driver.TxIDIterator[int], error) {
func (m mockTXIDStoreReader) Iterator(interface{}) (collections.Iterator[*driver.ByNum[int]], error) {
panic("not implemented")
}
func (m mockTXIDStoreReader) Get(txID driver.TxID) (int, string, error) {
Expand Down
21 changes: 20 additions & 1 deletion platform/common/core/generic/vault/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,35 @@ SPDX-License-Identifier: Apache-2.0
package vault

import (
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/tracing"
"go.opentelemetry.io/otel/trace"
)

type Metrics struct {
CommitDuration metrics.Histogram
BatchedCommitDuration metrics.Histogram

Vault trace.Tracer
}

func NewMetrics(p trace.TracerProvider) *Metrics {
func NewMetrics(m metrics.Provider, p trace.TracerProvider) *Metrics {
return &Metrics{
CommitDuration: m.NewHistogram(metrics.HistogramOpts{
Namespace: "vault",
Name: "commit",
Help: "Histogram for the duration of commit",
Buckets: utils.ExponentialBucketTimeRange(0, 5*time.Second, 15),
}),
BatchedCommitDuration: m.NewHistogram(metrics.HistogramOpts{
Namespace: "vault",
Name: "batched_commit",
Help: "Histogram for the duration of commit with the batching overhead",
Buckets: utils.ExponentialBucketTimeRange(0, 5*time.Second, 15),
}),
Vault: p.Tracer("vault", tracing.WithMetricsOpts(tracing.MetricsOpts{
Namespace: "coresdk",
LabelNames: []tracing.LabelName{},
Expand Down
Loading