Skip to content

Commit

Permalink
chore: batchRouter reads from file
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Nov 21, 2024
1 parent b2f32b6 commit 220d5f4
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 9 deletions.
70 changes: 61 additions & 9 deletions jobsdb/disk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync/atomic"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/segmentio/ksuid"
Expand All @@ -19,16 +20,66 @@ type diskManager struct {
fileReaderMap map[string]fileReaderChan
maxReadSem chan struct{}
maxWriteSem chan struct{}
closeTime time.Duration
log logger.Logger
}

func NewDiskManager() *diskManager {
return &diskManager{
func NewDiskManager(conf *config.Config) *diskManager {
d := &diskManager{
fileReaderMu: &sync.Mutex{},
fileReaderMap: make(map[string]fileReaderChan),
maxReadSem: make(chan struct{}, 50),
maxWriteSem: make(chan struct{}, 50),
maxReadSem: make(chan struct{}, config.GetInt("maxReadSem", 50)),
maxWriteSem: make(chan struct{}, config.GetInt("maxWriteSem", 50)),
closeTime: config.GetDuration("closeTime", 5, time.Second),
}
go d.readRoutine()
return d
}

// readRoutine periodically drops file readers whose goroutine has exited
// (closed flag set) from d.fileReaderMap. It sweeps every 10 * d.closeTime and
// never returns.
//
// d.closeTime must be positive: time.NewTicker panics on a non-positive
// interval.
func (d *diskManager) readRoutine() {
	ticker := time.NewTicker(10 * d.closeTime)
	defer ticker.Stop()
	for range ticker.C {
		d.evictClosedReaders()
	}
}

// evictClosedReaders performs one sweep: every reader still marked open is
// copied into a fresh map, which then replaces d.fileReaderMap.
//
// The request channel of an evicted reader is deliberately NOT closed here.
// Read fetches the channel under the lock but sends on it after unlocking, so
// a caller may still hold a reference; closing from this side would make that
// send panic. Once the entry is dropped the channel becomes unreachable and is
// garbage collected.
func (d *diskManager) evictClosedReaders() {
	d.fileReaderMu.Lock()
	defer d.fileReaderMu.Unlock()

	live := make(map[string]fileReaderChan)
	for fileName, reader := range d.fileReaderMap {
		if reader.closed.Load() {
			continue
		}
		live[fileName] = reader
	}
	d.fileReaderMap = live
}

// Read returns length bytes of fileName starting at offset. The read is
// served by a per-file reader goroutine (readFromFile) that is started on
// first use and restarted if it has since exited from idleness.
//
// Note the argument order is (length, offset), the reverse of
// ConcurrentReadFromFile's (offset, length).
//
// Readers are registered in d.fileReaderMap, the map that d.fileReaderMu
// guards and that readRoutine sweeps. The package-level fileReaderMap is not
// touched.
//
// NOTE(review): the request is sent after the lock is released, so a reader
// that times out in that window leaves this call blocked until another Read
// restarts it. Confirm whether callers need a timeout here.
func (d *diskManager) Read(fileName string, length, offset int) ([]byte, error) {
	d.fileReaderMu.Lock()
	reader, known := d.fileReaderMap[fileName]
	switch {
	case !known:
		reader = fileReaderChan{readerChan: make(chan readRequest), closed: &atomic.Bool{}}
		d.fileReaderMap[fileName] = reader
		go readFromFile(fileName, reader, d.closeTime)
	case reader.closed.Load():
		// The previous goroutine exited; revive it on the same channel.
		reader.closed.Store(false)
		go readFromFile(fileName, reader, d.closeTime)
	}
	d.fileReaderMu.Unlock()

	responseChan := make(chan payloadOrError)
	defer close(responseChan)
	reader.readerChan <- readRequest{offset: offset, length: length, response: responseChan}
	response := <-responseChan
	if response.err != nil {
		return nil, response.err
	}
	return response.payload, nil
}

func (d *diskManager) WriteToFile(jobs []*JobT) (string, error) {
Expand Down Expand Up @@ -177,12 +228,12 @@ func ConcurrentReadFromFile(fileName string, offset, length int) ([]byte, error)
if _, ok := fileReaderMap[fileName]; !ok {
fileReadChan := fileReaderChan{readerChan: make(chan readRequest), closed: &atomic.Bool{}}
fileReaderMap[fileName] = fileReadChan
go readFromFile(fileName, fileReadChan)
go readFromFile(fileName, fileReadChan, 5*time.Second)
} else {
if fileReaderMap[fileName].closed.Load() {
fileReadChan := fileReaderChan{readerChan: make(chan readRequest), closed: &atomic.Bool{}}
fileReaderMap[fileName] = fileReadChan
go readFromFile(fileName, fileReadChan)
go readFromFile(fileName, fileReadChan, 5*time.Second)
}
}
readRequestChan := fileReaderMap[fileName].readerChan
Expand All @@ -197,8 +248,9 @@ func ConcurrentReadFromFile(fileName string, offset, length int) ([]byte, error)
return response.payload, nil
}

func readFromFile(fileName string, fileReadChan fileReaderChan) {
closeTimer := time.NewTicker(5 * time.Second)
func readFromFile(fileName string, fileReadChan fileReaderChan, closeTime time.Duration) {
defer fileReadChan.closed.Store(true)
closeTimer := time.NewTicker(closeTime)
defer closeTimer.Stop()
file, err := os.Open(fileName)
if err != nil {
Expand All @@ -215,7 +267,7 @@ func readFromFile(fileName string, fileReadChan fileReaderChan) {
return
}
request.response <- payloadOrError{payload: payload, err: err}
closeTimer.Reset(5 * time.Second)
closeTimer.Reset(closeTime)
case <-closeTimer.C:
return
}
Expand Down
73 changes: 73 additions & 0 deletions jobsdb/disk_reader_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package jobsdb

import (
"context"
"fmt"
"os"
"sync"
"time"
)

// newDiskReadWorker creates a worker for the given partition that takes its
// read requests from readChan. The worker gets its own cancellable context
// (derived from context.Background) and is started before being returned.
func newDiskReadWorker(partition string, readChan chan readRequest) *worker {
	ctx, cancel := context.WithCancel(context.Background())

	dw := &worker{readChan: readChan, partition: partition}
	dw.lifecycle.ctx = ctx
	dw.lifecycle.cancel = cancel

	dw.start()
	return dw
}

// worker serves read requests against a single file.
type worker struct {
// partition is used by start as the path of the file to open.
partition string
// file is the handle opened by start; Work reads from it with ReadAt.
file *os.File

// lifecycle groups the worker's shutdown state.
lifecycle struct {
// ctx is cancelled by Stop; its cancellation triggers closing of file.
ctx context.Context
// cancel cancels ctx.
cancel context.CancelFunc
// wg tracks the goroutine launched by start that closes file.
wg sync.WaitGroup
}

// readChan delivers the read requests consumed by Work.
readChan chan readRequest
}

// start opens the file named by w.partition, stores the handle in w.file and
// launches a goroutine that closes the file once the worker's context is
// cancelled. That goroutine is tracked by w.lifecycle.wg.
//
// It panics if the file cannot be opened; the closing goroutine panics if the
// close fails.
func (w *worker) start() {
	path := w.partition

	handle, openErr := os.Open(path)
	if openErr != nil {
		panic(fmt.Errorf("open file - %s: %w", path, openErr))
	}
	w.file = handle

	closeOnCancel := func() {
		defer w.lifecycle.wg.Done()
		<-w.lifecycle.ctx.Done()
		closeErr := handle.Close()
		if closeErr != nil {
			panic(fmt.Errorf("close file - %s: %w", path, closeErr))
		}
	}

	w.lifecycle.wg.Add(1)
	go closeOnCancel()
}

// Work waits for one read request on w.readChan, reads the requested byte
// range from w.file and delivers the outcome on the request's response
// channel.
//
// It returns true when the payload was read and delivered. It returns false
// when:
//   - the worker's context is cancelled while waiting (Stop was called),
//   - w.readChan has been closed,
//   - the request carries a negative length, or
//   - the read fails; ReadAt also reports an error when fewer than length
//     bytes are available at offset.
//
// In the last two cases an error is sent on the response channel.
func (w *worker) Work() bool {
	// A nil channel never becomes ready, so a worker without a context simply
	// waits on readChan alone.
	var done <-chan struct{}
	if ctx := w.lifecycle.ctx; ctx != nil {
		done = ctx.Done()
	}

	var req readRequest
	select {
	case <-done:
		return false
	case r, ok := <-w.readChan:
		if !ok {
			// A closed channel yields a zero request whose response channel
			// is nil; sending on it would block forever.
			return false
		}
		req = r
	}

	if req.length < 0 {
		// make would panic on a negative size; report it to the caller instead.
		req.response <- payloadOrError{err: fmt.Errorf("read file - %s - %d: invalid length %d", w.partition, req.offset, req.length)}
		return false
	}

	payload := make([]byte, req.length)
	if _, err := w.file.ReadAt(payload, int64(req.offset)); err != nil {
		req.response <- payloadOrError{err: fmt.Errorf("read file - %s - %d: %w", w.partition, req.offset, err)}
		return false
	}
	req.response <- payloadOrError{payload: payload}
	return true
}

// SleepDurations reports zero for both the minimum and the maximum sleep
// duration. The original author's note says the worker pool's idle timeout
// (5 seconds) is the more relevant setting for this worker.
func (w *worker) SleepDurations() (min, max time.Duration) {
	const noSleep = time.Duration(0)
	return noSleep, noSleep
}

// Stop cancels the worker's context and then blocks until every goroutine
// registered on the lifecycle wait group has finished.
func (w *worker) Stop() {
	lc := &w.lifecycle
	lc.cancel()
	lc.wg.Wait()
}
5 changes: 5 additions & 0 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2719,6 +2719,11 @@ func (proc *Handle) Store(partition string, in *storeMessage) {

statusList, destJobs, batchDestJobs := in.statusList, in.destJobs, in.batchDestJobs
beforeStoreStatus := time.Now()
batchDestJobsFile, err := jobsdb.WriteToFile(batchDestJobs)
if err != nil {
panic(err)
}
proc.logger.Infon("batch router jobs file", logger.NewStringField("file", batchDestJobsFile))
// XX: Need to do this in a transaction
if len(batchDestJobs) > 0 {
err := misc.RetryWithNotify(
Expand Down
15 changes: 15 additions & 0 deletions router/batchrouter/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,21 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
panic(err)
}
jobs = toProcess.Jobs
for i := range jobs {
var params routerutils.JobParameters
err := json.Unmarshal(jobs[i].Parameters, &params)
if err != nil {
panic(fmt.Errorf("BRT: %s: Error unmarshalling job parameters: %v", brt.destType, err))
}
fileName := params.FileName
offset := params.Offset
length := params.Length
payload, err := jobsdb.ConcurrentReadFromFile(fileName, offset, length)
if err != nil {
panic(fmt.Errorf("BRT: %s: Error reading job payload from file: %v", brt.destType, err))
}
jobs[i].EventPayload = payload
}
limitsReached = toProcess.LimitsReached

brtQueryStat.Since(queryStart)
Expand Down

0 comments on commit 220d5f4

Please sign in to comment.