diff --git a/config/config.yaml b/config/config.yaml index ccc45785e0..234b4b65cf 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -64,11 +64,11 @@ Archiver: JobsDB: jobDoneMigrateThres: 0.8 jobStatusMigrateThres: 5 - maxDSSize: 100000 + maxDSSize: 1 maxMigrateOnce: 10 maxMigrateDSProbe: 10 maxTableSizeInMB: 300 - migrateDSLoopSleepDuration: 30s + migrateDSLoopSleepDuration: 3s addNewDSLoopSleepDuration: 5s refreshDSListLoopSleepDuration: 5s backupCheckSleepDuration: 5s diff --git a/docker-compose.yml b/docker-compose.yml index 157caa9913..c4ee97aec8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,20 +7,20 @@ services: - build/docker.env ports: - "6432:5432" - backend: - build: - context: ./ - dockerfile: Dockerfile - depends_on: - - db - - transformer - entrypoint: sh -c '/wait-for db:5432 -- ./rudder-server' - ports: - - "8080:8080" - env_file: - - build/docker.env - environment: - - JOBS_DB_HOST=db + # backend: + # build: + # context: ./ + # dockerfile: Dockerfile + # depends_on: + # - db + # - transformer + # entrypoint: sh -c '/wait-for db:5432 -- ./rudder-server' + # ports: + # - "8080:8080" + # env_file: + # - build/docker.env + # environment: + # - JOBS_DB_HOST=db # Uncomment the following lines to mount workspaceConfig file # volumes: # - :/etc/rudderstack/workspaceConfig.json @@ -28,25 +28,25 @@ services: image: "rudderstack/rudder-transformer:latest" ports: - "9090:9090" - minio: - image: minio/minio - profiles: - - storage - ports: - - "9000:9000" - - "9001:9001" - environment: - - MINIO_ROOT_USER=root - - MINIO_ROOT_PASSWORD=password - command: server --console-address :9001 /data - etcd: - image: docker.io/bitnami/etcd:3 - profiles: - - multi-tenant - environment: - - ALLOW_NONE_AUTHENTICATION=yes - - ETCD_NAME=etcd - - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - - ETCD_ADVERTISE_CLIENT_URLS=http://mode-provider:2379 - ports: - - "2379:2379" + # minio: + # image: minio/minio + # profiles: + # - storage + # ports: + # - "9000:9000" + # - "9001:9001" + # environment: + # - MINIO_ROOT_USER=root + # - MINIO_ROOT_PASSWORD=password + # command: server --console-address :9001 /data + # etcd: + # image: docker.io/bitnami/etcd:3 + # profiles: + # - multi-tenant + # environment: + # - ALLOW_NONE_AUTHENTICATION=yes + # - ETCD_NAME=etcd + # - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 + # - ETCD_ADVERTISE_CLIENT_URLS=http://mode-provider:2379 + # ports: + # - "2379:2379" diff --git a/jobsdb/disk_manager.go b/jobsdb/disk_manager.go index 0a30bb4286..c63401a72c 100644 --- a/jobsdb/disk_manager.go +++ b/jobsdb/disk_manager.go @@ -177,15 +177,19 @@ func WriteToFile(jobs []*JobT) (string, error) { } } }() + buffer := bufio.NewWriter(file) deferredFuncs = append(deferredFuncs, func() bool { + if err := buffer.Flush(); err != nil { + panic(err) + } if err := file.Close(); err != nil { - return false + panic(err) } return true }) offset := 0 for i := range jobs { - length, err := file.Write(jobs[i].EventPayload) + length, err := buffer.Write(jobs[i].EventPayload) if err != nil { return "", fmt.Errorf("write job payload to file - %s: %w", fileName, err) } diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index a27358c841..d01aa226ca 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -319,6 +319,8 @@ type JobsDB interface { JournalMarkDone(opID int64) error IsMasterBackupEnabled() bool + + CommitFileName(tx StoreSafeTx, fileName string) error } /* @@ -825,6 +827,24 @@ func (jd *Handle) init() { jd.workersAndAuxSetup() + err = jd.WithTx(func(tx *Tx) error { + _, err := tx.Exec(`CREATE TABLE IF NOT EXISTS dataset_files ( + file_name TEXT NOT NULL, + jobs_table TEXT NOT NULL) + `) + if err != nil { + return fmt.Errorf("failed to create dataset_files table: %w", err) + } + _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS dataset_files_index ON dataset_files (jobs_table)`) + if err != nil { + return fmt.Errorf("failed to create dataset_files_index index: %w", err) + } + return nil + }) + if err != nil { + panic(fmt.Errorf("failed to setup dataset_files table: %w", err)) + } + err = jd.WithTx(func(tx *Tx) error { // only one migration should run at a time and block all other processes from adding or removing tables return jd.withDistributedLock(context.Background(), tx, "schema_migrate", func() error { @@ -2004,6 +2024,16 @@ func (jd *Handle) GetDistinctParameterValues(ctx context.Context, parameterName return val, nil } +func (jd *Handle) CommitFileName(tx StoreSafeTx, fileName string) error { + dsList := jd.getDSList() + ds := dsList[len(dsList)-1] + _, err := tx.SqlTx().Exec(fmt.Sprintf(`INSERT INTO dataset_files (file_name, jobs_table) VALUES ('%s', '%s')`, fileName, ds.JobTable)) + if err != nil { + return fmt.Errorf("error commiting file name: %w", err) + } + return nil +} + func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobList []*JobT) error { store := func() error { var stmt *sql.Stmt diff --git a/jobsdb/migration.go b/jobsdb/migration.go index 37594d6175..702475232e 100644 --- a/jobsdb/migration.go +++ b/jobsdb/migration.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/json" "fmt" + "os" "strconv" "strings" "time" @@ -156,13 +157,17 @@ func (jd *Handle) doMigrateDS(ctx context.Context) error { ), ) totalJobsMigrated += noJobsMigrated - } else { - jd.logger.Infon( - "[[ migrateDSLoop ]]: No jobs to migrate", - logger.NewStringField("from", source.ds.Index), - logger.NewStringField("to", destination.Index), - ) + _, err = tx.Exec(`UPDATE dataset_files SET jobs_table = $1 WHERE jobs_table = $2`, destination.JobTable, source.ds.JobTable) + if err != nil { + return fmt.Errorf("failed to update dataset files: %w", err) + } + continue } + jd.logger.Infon( + "[[ migrateDSLoop ]]: No jobs to migrate", + logger.NewStringField("from", source.ds.Index), + logger.NewStringField("to", destination.Index), + ) } if err = jd.createDSIndicesInTx(ctx, tx, destination); err != nil { return fmt.Errorf("create %v indices: %w", destination, err) @@ -172,6 +177,31 @@ func (jd *Handle) doMigrateDS(ctx context.Context) error { } jd.logger.Infof("[[ migrateDSLoop ]]: Total migrated %d jobs", totalJobsMigrated) } + for i := range migrateFrom { + source := migrateFrom[i].ds + if migrateFrom[i].numJobsPending == 0 { + rows, err := tx.QueryContext(ctx, `DELETE FROM dataset_files WHERE jobs_table = $1 returning file_name`, source.JobTable) + if err != nil { + return fmt.Errorf("failed to delete dataset files: %w", err) + } + for rows.Next() { + var fileName string + if err = rows.Scan(&fileName); err != nil { + return fmt.Errorf("failed to scan dataset files: %w", err) + } + jd.logger.Infon("[[ migrateDSLoop ]]: Deleting file", logger.NewStringField("file", fileName)) + defer func(fileName string) { + _ = os.Remove(fileName) + }(fileName) + } + if rows.Err() != nil { + return fmt.Errorf("rows.Err - delete dataset files: %w", rows.Err()) + } + if err := rows.Close(); err != nil { + return fmt.Errorf("failed to close rows: %w", err) + } + } + } opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFromDatasets}) if err != nil { diff --git a/mocks/jobsdb/mock_jobsdb.go b/mocks/jobsdb/mock_jobsdb.go index 8394fb6d0e..475caba625 100644 --- a/mocks/jobsdb/mock_jobsdb.go +++ b/mocks/jobsdb/mock_jobsdb.go @@ -44,6 +44,20 @@ func (m *MockJobsDB) EXPECT() *MockJobsDBMockRecorder { return m.recorder } +// CommitFileName mocks base method. +func (m *MockJobsDB) CommitFileName(tx jobsdb.StoreSafeTx, fileName string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CommitFileName", tx, fileName) + ret0, _ := ret[0].(error) + return ret0 +} + +// CommitFileName indicates an expected call of CommitFileName. +func (mr *MockJobsDBMockRecorder) CommitFileName(tx, fileName any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CommitFileName", reflect.TypeOf((*MockJobsDB)(nil).CommitFileName), tx, fileName) +} + // DeleteExecuting mocks base method. func (m *MockJobsDB) DeleteExecuting() { m.ctrl.T.Helper() diff --git a/processor/processor.go b/processor/processor.go index aecc560596..a54e8616a0 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -2719,14 +2719,14 @@ func (proc *Handle) Store(partition string, in *storeMessage) { statusList, destJobs, batchDestJobs := in.statusList, in.destJobs, in.batchDestJobs beforeStoreStatus := time.Now() - batchDestJobsFile, err := jobsdb.WriteToFile(batchDestJobs) - if err != nil { - panic(err) - } - proc.logger.Infon("batch router jobs file", logger.NewStringField("file", batchDestJobsFile)) // XX: Need to do this in a transaction if len(batchDestJobs) > 0 { - err := misc.RetryWithNotify( + batchDestJobsFile, err := jobsdb.WriteToFile(batchDestJobs) + if err != nil { + panic(err) + } + proc.logger.Infon("batch router jobs file", logger.NewStringField("file", batchDestJobsFile)) + err = misc.RetryWithNotify( context.Background(), proc.jobsDBCommandTimeout.Load(), proc.jobdDBMaxRetries.Load(), @@ -2738,6 +2738,9 @@ func (proc *Handle) Store(partition string, in *storeMessage) { if err != nil { return fmt.Errorf("storing batch router jobs: %w", err) } + if err := proc.batchRouterDB.CommitFileName(tx, batchDestJobsFile); err != nil { + return fmt.Errorf("committing batch router jobs file: %w", err) + } // rsources stats err = proc.updateRudderSourcesStats(ctx, tx, batchDestJobs) @@ -2760,13 +2763,13 @@ func (proc *Handle) Store(partition string, in *storeMessage) { ) } - destJobsFile, err := jobsdb.WriteToFile(destJobs) - if err != nil { - panic(err) - } - proc.logger.Infon("router jobs file", logger.NewStringField("file", destJobsFile)) if len(destJobs) > 0 { func() { + destJobsFile, err := jobsdb.WriteToFile(destJobs) + if err != nil { + panic(err) + } + proc.logger.Infon("router jobs file", logger.NewStringField("file", destJobsFile)) // Only one goroutine can store to a router destination at a time, otherwise we may have different transactions // committing at different timestamps which can cause events with lower jobIDs to appear after events with higher ones. // For that purpose, before storing, we lock the relevant destination IDs (in sorted order to avoid deadlocks). @@ -2787,7 +2790,7 @@ func (proc *Handle) Store(partition string, in *storeMessage) { )) } - err := misc.RetryWithNotify( + err = misc.RetryWithNotify( context.Background(), proc.jobsDBCommandTimeout.Load(), proc.jobdDBMaxRetries.Load(), @@ -2799,6 +2802,9 @@ func (proc *Handle) Store(partition string, in *storeMessage) { if err != nil { return fmt.Errorf("storing router jobs: %w", err) } + if err := proc.routerDB.CommitFileName(tx, destJobsFile); err != nil { + return fmt.Errorf("committing router jobs file: %w", err) + } // rsources stats err = proc.updateRudderSourcesStats(ctx, tx, destJobs) @@ -2841,7 +2847,7 @@ func (proc *Handle) Store(partition string, in *storeMessage) { txnStart := time.Now() in.rsourcesStats.CollectStats(statusList) - err = misc.RetryWithNotify(context.Background(), proc.jobsDBCommandTimeout.Load(), proc.jobdDBMaxRetries.Load(), func(ctx context.Context) error { + err := misc.RetryWithNotify(context.Background(), proc.jobsDBCommandTimeout.Load(), proc.jobdDBMaxRetries.Load(), func(ctx context.Context) error { return proc.gatewayDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error { err := proc.gatewayDB.UpdateJobStatusInTx(ctx, tx, statusList, []string{proc.config.GWCustomVal}, nil) if err != nil {