diff --git a/jobsdb/disk_manager.go b/jobsdb/disk_manager.go index 54163f8034..0a30bb4286 100644 --- a/jobsdb/disk_manager.go +++ b/jobsdb/disk_manager.go @@ -9,6 +9,7 @@ import ( "sync/atomic" "time" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/segmentio/ksuid" @@ -19,16 +20,66 @@ type diskManager struct { fileReaderMap map[string]fileReaderChan maxReadSem chan struct{} maxWriteSem chan struct{} + closeTime time.Duration log logger.Logger } -func NewDiskManager() *diskManager { - return &diskManager{ +func NewDiskManager(conf *config.Config) *diskManager { + d := &diskManager{ fileReaderMu: &sync.Mutex{}, fileReaderMap: make(map[string]fileReaderChan), - maxReadSem: make(chan struct{}, 50), - maxWriteSem: make(chan struct{}, 50), + maxReadSem: make(chan struct{}, config.GetInt("maxReadSem", 50)), + maxWriteSem: make(chan struct{}, config.GetInt("maxWriteSem", 50)), + closeTime: config.GetDuration("closeTime", 5, time.Second), } + go d.readRoutine() + return d +} + +func (d *diskManager) readRoutine() { + ticker := time.NewTicker(10 * d.closeTime) + defer ticker.Stop() + for { + select { + case <-ticker.C: + d.fileReaderMu.Lock() + newMap := make(map[string]fileReaderChan) + for fileName, fileReadChan := range d.fileReaderMap { + if fileReadChan.closed.Load() { + close(fileReadChan.readerChan) + delete(d.fileReaderMap, fileName) + continue + } + newMap[fileName] = fileReadChan + } + d.fileReaderMap = newMap + d.fileReaderMu.Unlock() + } + } +} + +func (d *diskManager) Read(fileName string, length, offset int) ([]byte, error) { + d.fileReaderMu.Lock() + if _, ok := fileReaderMap[fileName]; !ok { + fileReadChan := fileReaderChan{readerChan: make(chan readRequest), closed: &atomic.Bool{}} + fileReaderMap[fileName] = fileReadChan + go readFromFile(fileName, fileReadChan, d.closeTime) + } else { + if fileReaderMap[fileName].closed.Load() { + fileReaderMap[fileName].closed.Store(false) + go readFromFile(fileName, fileReaderMap[fileName], d.closeTime) + } + } + readRequestChan := fileReaderMap[fileName].readerChan + d.fileReaderMu.Unlock() + responseChan := make(chan payloadOrError) + defer close(responseChan) + readRequestChan <- readRequest{offset: offset, length: length, response: responseChan} + response := <-responseChan + if response.err != nil { + return nil, response.err + } + return response.payload, nil } func (d *diskManager) WriteToFile(jobs []*JobT) (string, error) { @@ -177,12 +228,12 @@ func ConcurrentReadFromFile(fileName string, offset, length int) ([]byte, error) if _, ok := fileReaderMap[fileName]; !ok { fileReadChan := fileReaderChan{readerChan: make(chan readRequest), closed: &atomic.Bool{}} fileReaderMap[fileName] = fileReadChan - go readFromFile(fileName, fileReadChan) + go readFromFile(fileName, fileReadChan, 5*time.Second) } else { if fileReaderMap[fileName].closed.Load() { fileReadChan := fileReaderChan{readerChan: make(chan readRequest), closed: &atomic.Bool{}} fileReaderMap[fileName] = fileReadChan - go readFromFile(fileName, fileReadChan) + go readFromFile(fileName, fileReadChan, 5*time.Second) } } readRequestChan := fileReaderMap[fileName].readerChan @@ -197,8 +248,9 @@ func ConcurrentReadFromFile(fileName string, offset, length int) ([]byte, error) return response.payload, nil } -func readFromFile(fileName string, fileReadChan fileReaderChan) { - closeTimer := time.NewTicker(5 * time.Second) +func readFromFile(fileName string, fileReadChan fileReaderChan, closeTime time.Duration) { + defer fileReadChan.closed.Store(true) + closeTimer := time.NewTicker(closeTime) defer closeTimer.Stop() file, err := os.Open(fileName) if err != nil { @@ -215,7 +267,7 @@ func readFromFile(fileName string, fileReadChan fileReaderChan) { return } request.response <- payloadOrError{payload: payload, err: err} - closeTimer.Reset(5 * time.Second) + closeTimer.Reset(closeTime) case <-closeTimer.C: return } diff --git a/jobsdb/disk_reader_worker.go b/jobsdb/disk_reader_worker.go new file mode 100644 index 0000000000..bf1387873f --- /dev/null +++ b/jobsdb/disk_reader_worker.go @@ -0,0 +1,73 @@ +package jobsdb + +import ( + "context" + "fmt" + "os" + "sync" + "time" +) + +func newDiskReadWorker(partition string, readChan chan readRequest) *worker { + w := &worker{ + partition: partition, + readChan: readChan, + } + w.lifecycle.ctx, w.lifecycle.cancel = context.WithCancel(context.Background()) + w.start() + return w + +} + +type worker struct { + partition string + file *os.File + + lifecycle struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + } + + readChan chan readRequest +} + +func (w *worker) start() { + fileName := w.partition + file, err := os.Open(fileName) + if err != nil { + panic(fmt.Errorf("open file - %s: %w", fileName, err)) + } + w.file = file + w.lifecycle.wg.Add(1) + go func() { + defer w.lifecycle.wg.Done() + <-w.lifecycle.ctx.Done() + if err := file.Close(); err != nil { + panic(fmt.Errorf("close file - %s: %w", fileName, err)) + } + }() +} + +func (w *worker) Work() bool { + readReq := <-w.readChan + offset, length := readReq.offset, readReq.length + response := readReq.response + payload := make([]byte, length) + _, err := w.file.ReadAt(payload, int64(offset)) + if err != nil { + response <- payloadOrError{err: fmt.Errorf("read file - %s - %d: %w", w.partition, offset, err)} + return false + } + response <- payloadOrError{payload: payload} + return true +} + +func (w *worker) SleepDurations() (min, max time.Duration) { + return 0, 0 // set workerpool idle timeout to 5 seconds - that is more relevant here +} + +func (w *worker) Stop() { + w.lifecycle.cancel() + w.lifecycle.wg.Wait() +} diff --git a/processor/processor.go b/processor/processor.go index d8e62ea277..aecc560596 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -2719,6 +2719,11 @@ func (proc *Handle) Store(partition string, in *storeMessage) { statusList, destJobs, batchDestJobs := in.statusList, in.destJobs, in.batchDestJobs beforeStoreStatus := time.Now() + batchDestJobsFile, err := jobsdb.WriteToFile(batchDestJobs) + if err != nil { + panic(err) + } + proc.logger.Infon("batch router jobs file", logger.NewStringField("file", batchDestJobsFile)) // XX: Need to do this in a transaction if len(batchDestJobs) > 0 { err := misc.RetryWithNotify( diff --git a/router/batchrouter/handle.go b/router/batchrouter/handle.go index d3feb21656..9f3e501073 100644 --- a/router/batchrouter/handle.go +++ b/router/batchrouter/handle.go @@ -213,6 +213,21 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob panic(err) } jobs = toProcess.Jobs + for i := range jobs { + var params routerutils.JobParameters + err := json.Unmarshal(jobs[i].Parameters, ¶ms) + if err != nil { + panic(fmt.Errorf("BRT: %s: Error unmarshalling job parameters: %v", brt.destType, err)) + } + fileName := params.FileName + offset := params.Offset + length := params.Length + payload, err := jobsdb.ConcurrentReadFromFile(fileName, offset, length) + if err != nil { + panic(fmt.Errorf("BRT: %s: Error reading job payload from file: %v", brt.destType, err)) + } + jobs[i].EventPayload = payload + } limitsReached = toProcess.LimitsReached brtQueryStat.Since(queryStart)