Skip to content

Commit

Permalink
remove batch id
Browse files Browse the repository at this point in the history
  • Loading branch information
felixsebastian committed Jun 3, 2024
1 parent 370731d commit ba7363e
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 59 deletions.
4 changes: 2 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ package microbatch
// of events. Each batch will be called from its own goroutine, so be careful
// when sharing state with BatchProcessor.
type BatchProcessor[E any, R any] interface {
Run(batch []E, batchId int) R
Run(batch []E) R
}

// ResultHandler will be called after each batch is done processing. This is
// called from the same goroutine as WaitForResults(). Use this if you don't
// want to think too much about parallelism.
type ResultHandler[R any] interface {
Run(result R, batchId int)
Run(result R)
}

// Config is required to call Start(), to start the MicroBatcher.
Expand Down
2 changes: 1 addition & 1 deletion exampleclient/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

type LettersBatchProcessor struct{}

func (LettersBatchProcessor) Run(batch []LetterEvent, _ int) TotalResult {
func (LettersBatchProcessor) Run(batch []LetterEvent) TotalResult {
mappings := map[string]int{"a": 1, "b": 2, "d": 4}
var sum int

Expand Down
6 changes: 3 additions & 3 deletions exampleclient/resulthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (

type TotalResultHandler struct{}

func (TotalResultHandler) Run(totalResult TotalResult, batchId int) {
func (TotalResultHandler) Run(totalResult TotalResult) {
if totalResult.err != nil {
fmt.Printf("Batch %d failed with error: %s\n", batchId, totalResult.err)
fmt.Printf("Batch failed with error: %s\n", totalResult.err)
} else {
fmt.Printf("Batch %d total was %d\n", batchId, totalResult.total)
fmt.Printf("Batch total was %d\n", totalResult.total)
}
}
79 changes: 33 additions & 46 deletions microbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,17 @@ import (
)

type eventQueue[E any] struct {
events []E
mu sync.Mutex
}

type batchCompletedMessage[R any] struct {
batchId int
result R
slice []E
mu sync.Mutex
}

// MicroBatcher is the object that manages microbatching of an event stream.
type MicroBatcher[E any, R any] struct {
config Config[E, R]
eventQueue eventQueue[E]
stopChan chan bool
batchCompletedChan chan batchCompletedMessage[R]
lastBatchId int
sendMu sync.Mutex
batchWg sync.WaitGroup
config Config[E, R]
eventQueue eventQueue[E]
stopChan chan bool
resultChan chan R
batchWg sync.WaitGroup
}

// Start will create a MicroBatcher and start listening for events.
Expand All @@ -35,10 +28,10 @@ func Start[E any, R any](config Config[E, R]) *MicroBatcher[E, R] {
// Useful for testing.
func StartWithTicker[E any, R any](config Config[E, R], ticker Ticker) *MicroBatcher[E, R] {
mb := &MicroBatcher[E, R]{
config: config,
eventQueue: eventQueue[E]{},
stopChan: make(chan bool),
batchCompletedChan: make(chan batchCompletedMessage[R]),
config: config,
eventQueue: eventQueue[E]{},
stopChan: make(chan bool),
resultChan: make(chan R),
}

if ticker == nil {
Expand All @@ -50,30 +43,28 @@ func StartWithTicker[E any, R any](config Config[E, R], ticker Ticker) *MicroBat
}

// SubmitJob should be called to submit new events to be batched.
func (mb *MicroBatcher[E, R]) SubmitJob(event E) error {
func (mb *MicroBatcher[E, R]) SubmitJob(event E) {
mb.eventQueue.mu.Lock()
defer mb.eventQueue.mu.Unlock()
mb.eventQueue.events = append(mb.eventQueue.events, event)
mb.eventQueue.slice = append(mb.eventQueue.slice, event)
maxReached := len(mb.eventQueue.slice) >= mb.config.MaxSize

if len(mb.eventQueue.events) >= mb.config.MaxSize {
mb.send()
if maxReached {
mb.send(false)
}

return nil
}

// WaitForResults should be called to send results to the ResultHandler.
// This will block until Stop() is called and all remaining batches have
// finished processing.
func (mb *MicroBatcher[E, R]) WaitForResults() {
for result := range mb.batchCompletedChan {
mb.config.ResultHandler.Run(result.result, result.batchId)
for result := range mb.resultChan {
mb.config.ResultHandler.Run(result)
}
}

// Stop can be called if we want to stop listening for events.
func (mb *MicroBatcher[E, R]) Stop() {
mb.stopChan <- true // stop the timer

}

func (mb *MicroBatcher[E, R]) listen(ticker Ticker) {
Expand All @@ -83,40 +74,36 @@ func (mb *MicroBatcher[E, R]) listen(ticker Ticker) {
for {
select {
case <-tickerChannel:
mb.send()
mb.send(true)
case <-mb.stopChan:
ticker.Stop() // stop the timer
mb.send() // send remaining events
mb.batchWg.Wait() // wait before closing the results channel
close(mb.batchCompletedChan) // this will unblock WaitForResults()
close(mb.stopChan) // no longer needed
ticker.Stop() // stop the timer
mb.send(true) // send remaining events
mb.batchWg.Wait() // wait before closing the results channel
close(mb.resultChan) // this will unblock WaitForResults()
close(mb.stopChan) // no longer needed
return
}
}
}()
}

// send will send all events currently in the queue to the batch processor.
func (mb *MicroBatcher[E, R]) send() {
mb.sendMu.Lock()
defer mb.sendMu.Unlock()
func (mb *MicroBatcher[E, R]) send(lock bool) {
if lock {
mb.eventQueue.mu.Lock()
defer mb.eventQueue.mu.Unlock()
}

if len(mb.eventQueue.events) == 0 {
if len(mb.eventQueue.slice) == 0 {
return
}

batch := mb.eventQueue.events
mb.eventQueue.events = make([]E, 0)
newBatchId := mb.lastBatchId + 1
mb.lastBatchId = newBatchId
batch := mb.eventQueue.slice
mb.eventQueue.slice = make([]E, 0)
mb.batchWg.Add(1)

go func() {
defer mb.batchWg.Done()

mb.batchCompletedChan <- batchCompletedMessage[R]{
result: mb.config.BatchProcessor.Run(batch, newBatchId),
batchId: newBatchId,
}
mb.resultChan <- mb.config.BatchProcessor.Run(batch)
}()
}
9 changes: 2 additions & 7 deletions mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@ package microbatch

type FakeBatchProcessor struct {
calls [][]int
chans map[int]chan bool
}

func NewFakeBatchProcessor() *FakeBatchProcessor {
return &FakeBatchProcessor{calls: make([][]int, 0)}
}

func (fbp *FakeBatchProcessor) TellJobToFinish(batchId int) {
fbp.chans[batchId] <- true
}

func (fbp *FakeBatchProcessor) Run(batch []int, batchId int) string {
func (fbp *FakeBatchProcessor) Run(batch []int) string {
fbp.calls = append(fbp.calls, batch)
return "some result"
}
Expand All @@ -24,6 +19,6 @@ func NewFakeResultHandler() *FakeResultHandler {
return &FakeResultHandler{calls: make([]string, 0)}
}

func (frh *FakeResultHandler) Run(result string, eventId int) {
func (frh *FakeResultHandler) Run(result string) {
frh.calls = append(frh.calls, result)
}

0 comments on commit ba7363e

Please sign in to comment.