Skip to content

Commit

Permalink
fixup! Batcher runner implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Sep 20, 2024
1 parent 1eb8edc commit 9d79baa
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 22 deletions.
24 changes: 2 additions & 22 deletions pkg/runner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,16 @@ import (
"sync/atomic"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/pkg/errors"
)

var logger = flogging.MustGetLogger("batch-executor")

type BatchExecutor[I any, O any] interface {
Execute(input I) (O, error)
}

type BatchRunner[V any] interface {
Run(v V) error
}

type Output[O any] struct {
Val O
Err error
}

type batcher[I any, O any] struct {
idx uint32
inputs []chan I
outputs []chan O
locks []sync.Mutex
len uint32
executor func([]I) []O
executor ExecuteFunc[I, O]
timeout time.Duration
}

Expand Down Expand Up @@ -120,7 +104,7 @@ type batchExecutor[I any, O any] struct {
*batcher[I, Output[O]]
}

func NewBatchExecutor[I any, O any](executor func([]I) []Output[O], capacity int, timeout time.Duration) BatchExecutor[I, O] {
func NewBatchExecutor[I any, O any](executor ExecuteFunc[I, Output[O]], capacity int, timeout time.Duration) BatchExecutor[I, O] {
return &batchExecutor[I, O]{batcher: newBatcher(executor, capacity, timeout)}
}

Expand All @@ -133,10 +117,6 @@ type batchRunner[V any] struct {
*batcher[V, error]
}

func NewSerialRunner[V any](runner func([]V) []error) BatchRunner[V] {
return NewBatchRunner(runner, 1, 1*time.Hour)
}

func NewBatchRunner[V any](runner func([]V) []error, capacity int, timeout time.Duration) BatchRunner[V] {
return &batchRunner[V]{batcher: newBatcher(runner, capacity, timeout)}
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package runner

import "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"

var logger = flogging.MustGetLogger("batch-executor")

type BatchExecutor[I any, O any] interface {
Execute(input I) (O, error)
}

type BatchRunner[V any] interface {
Run(v V) error
}

type Output[O any] struct {
Val O
Err error
}

type ExecuteFunc[I any, O any] func([]I) []O
32 changes: 32 additions & 0 deletions pkg/runner/serial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package runner

func NewSerialRunner[V any](runner ExecuteFunc[V, error]) BatchRunner[V] {
return &serialRunner[V]{executor: runner}
}

type serialRunner[V any] struct {
executor ExecuteFunc[V, error]
}

func (r *serialRunner[V]) Run(val V) error {
return r.executor([]V{val})[0]
}

func NewSerialExecutor[I any, O any](executor ExecuteFunc[I, Output[O]]) BatchExecutor[I, O] {
return &serialExecutor[I, O]{executor: executor}
}

type serialExecutor[I any, O any] struct {
executor ExecuteFunc[I, Output[O]]
}

func (r *serialExecutor[I, O]) Execute(input I) (O, error) {
res := r.executor([]I{input})[0]
return res.Val, res.Err
}

0 comments on commit 9d79baa

Please sign in to comment.