Commit 898a92c
chore: link files to datasets and remove them when dataset is dropped
Sidddddarth committed Nov 21, 2024
1 parent 220d5f4 commit 898a92c
Showing 7 changed files with 143 additions and 59 deletions.
4 changes: 2 additions & 2 deletions config/config.yaml
@@ -64,11 +64,11 @@ Archiver:
 JobsDB:
   jobDoneMigrateThres: 0.8
   jobStatusMigrateThres: 5
-  maxDSSize: 100000
+  maxDSSize: 1
   maxMigrateOnce: 10
   maxMigrateDSProbe: 10
   maxTableSizeInMB: 300
-  migrateDSLoopSleepDuration: 30s
+  migrateDSLoopSleepDuration: 3s
   addNewDSLoopSleepDuration: 5s
   refreshDSListLoopSleepDuration: 5s
   backupCheckSleepDuration: 5s
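These look like development-time overrides: maxDSSize: 1 caps each dataset at a single job and migrateDSLoopSleepDuration: 3s runs the migration loop ten times as often, so datasets rotate and drop quickly enough to exercise the new file-cleanup path. A minimal sketch of reading these knobs outside the server, assuming gopkg.in/yaml.v3 and that durations are stored as plain strings (the server itself binds these keys through its own config package):

package main

import (
	"fmt"
	"os"
	"time"

	"gopkg.in/yaml.v3"
)

// jobsDBKnobs mirrors the JobsDB keys touched above; the field set is
// illustrative, not the server's full config struct.
type jobsDBKnobs struct {
	MaxDSSize                  int    `yaml:"maxDSSize"`
	MigrateDSLoopSleepDuration string `yaml:"migrateDSLoopSleepDuration"`
}

func main() {
	raw, err := os.ReadFile("config/config.yaml")
	if err != nil {
		panic(err)
	}
	var root struct {
		JobsDB jobsDBKnobs `yaml:"JobsDB"`
	}
	if err := yaml.Unmarshal(raw, &root); err != nil {
		panic(err)
	}
	// Durations are plain strings in the YAML ("3s"); parse explicitly.
	sleep, err := time.ParseDuration(root.JobsDB.MigrateDSLoopSleepDuration)
	if err != nil {
		panic(err)
	}
	fmt.Printf("maxDSSize=%d migrateDSLoopSleepDuration=%s\n", root.JobsDB.MaxDSSize, sleep)
}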
72 changes: 36 additions & 36 deletions docker-compose.yml
@@ -7,46 +7,46 @@ services:
       - build/docker.env
     ports:
       - "6432:5432"
-  backend:
-    build:
-      context: ./
-      dockerfile: Dockerfile
-    depends_on:
-      - db
-      - transformer
-    entrypoint: sh -c '/wait-for db:5432 -- ./rudder-server'
-    ports:
-      - "8080:8080"
-    env_file:
-      - build/docker.env
-    environment:
-      - JOBS_DB_HOST=db
+  # backend:
+  #   build:
+  #     context: ./
+  #     dockerfile: Dockerfile
+  #   depends_on:
+  #     - db
+  #     - transformer
+  #   entrypoint: sh -c '/wait-for db:5432 -- ./rudder-server'
+  #   ports:
+  #     - "8080:8080"
+  #   env_file:
+  #     - build/docker.env
+  #   environment:
+  #     - JOBS_DB_HOST=db
     # Uncomment the following lines to mount workspaceConfig file
     # volumes:
     #   - <absolute_path_to_workspace_config>:/etc/rudderstack/workspaceConfig.json
   transformer:
     image: "rudderstack/rudder-transformer:latest"
     ports:
       - "9090:9090"
-  minio:
-    image: minio/minio
-    profiles:
-      - storage
-    ports:
-      - "9000:9000"
-      - "9001:9001"
-    environment:
-      - MINIO_ROOT_USER=root
-      - MINIO_ROOT_PASSWORD=password
-    command: server --console-address :9001 /data
-  etcd:
-    image: docker.io/bitnami/etcd:3
-    profiles:
-      - multi-tenant
-    environment:
-      - ALLOW_NONE_AUTHENTICATION=yes
-      - ETCD_NAME=etcd
-      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
-      - ETCD_ADVERTISE_CLIENT_URLS=http://mode-provider:2379
-    ports:
-      - "2379:2379"
+  # minio:
+  #   image: minio/minio
+  #   profiles:
+  #     - storage
+  #   ports:
+  #     - "9000:9000"
+  #     - "9001:9001"
+  #   environment:
+  #     - MINIO_ROOT_USER=root
+  #     - MINIO_ROOT_PASSWORD=password
+  #   command: server --console-address :9001 /data
+  # etcd:
+  #   image: docker.io/bitnami/etcd:3
+  #   profiles:
+  #     - multi-tenant
+  #   environment:
+  #     - ALLOW_NONE_AUTHENTICATION=yes
+  #     - ETCD_NAME=etcd
+  #     - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
+  #     - ETCD_ADVERTISE_CLIENT_URLS=http://mode-provider:2379
+  #   ports:
+  #     - "2379:2379"
8 changes: 6 additions & 2 deletions jobsdb/disk_manager.go
@@ -177,15 +177,19 @@ func WriteToFile(jobs []*JobT) (string, error) {
 			}
 		}
 	}()
+	buffer := bufio.NewWriter(file)
 	deferredFuncs = append(deferredFuncs, func() bool {
+		if err := buffer.Flush(); err != nil {
+			panic(err)
+		}
 		if err := file.Close(); err != nil {
-			return false
+			panic(err)
 		}
 		return true
 	})
 	offset := 0
 	for i := range jobs {
-		length, err := file.Write(jobs[i].EventPayload)
+		length, err := buffer.Write(jobs[i].EventPayload)
 		if err != nil {
 			return "", fmt.Errorf("write job payload to file - %s: %w", fileName, err)
 		}
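The change inserts a bufio.Writer between the job payloads and the file, so the deferred cleanup must flush the buffer before closing the file, and both failures now panic instead of quietly returning false. A standalone sketch of the same flush-then-close pattern, not the jobsdb implementation itself:

package main

import (
	"bufio"
	"fmt"
	"os"
)

// writePayloads mirrors the buffered-write pattern: write every payload
// through a bufio.Writer, record each payload's offset, and flush before
// closing so no buffered bytes are lost.
func writePayloads(fileName string, payloads [][]byte) (offsets []int, err error) {
	file, err := os.Create(fileName)
	if err != nil {
		return nil, err
	}
	buffer := bufio.NewWriter(file)
	defer func() {
		// Flush buffered bytes first; only then is closing the file safe.
		if ferr := buffer.Flush(); err == nil && ferr != nil {
			err = ferr
		}
		if cerr := file.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()
	offset := 0
	for _, p := range payloads {
		n, werr := buffer.Write(p)
		if werr != nil {
			return nil, fmt.Errorf("write payload to file - %s: %w", fileName, werr)
		}
		offsets = append(offsets, offset)
		offset += n
	}
	return offsets, nil
}

func main() {
	offsets, err := writePayloads("payloads.dat", [][]byte{[]byte(`{"a":1}`), []byte(`{"b":2}`)})
	if err != nil {
		panic(err)
	}
	fmt.Println("payload offsets:", offsets)
}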
30 changes: 30 additions & 0 deletions jobsdb/jobsdb.go
@@ -319,6 +319,8 @@ type JobsDB interface {
 	JournalMarkDone(opID int64) error
 
 	IsMasterBackupEnabled() bool
+
+	CommitFileName(tx StoreSafeTx, fileName string) error
 }
 
 /*
@@ -825,6 +827,24 @@ func (jd *Handle) init() {
 
 	jd.workersAndAuxSetup()
 
+	err = jd.WithTx(func(tx *Tx) error {
+		_, err := tx.Exec(`CREATE TABLE IF NOT EXISTS dataset_files (
+			file_name TEXT NOT NULL,
+			jobs_table TEXT NOT NULL)
+		`)
+		if err != nil {
+			return fmt.Errorf("failed to create dataset_files table: %w", err)
+		}
+		_, err = tx.Exec(`CREATE INDEX IF NOT EXISTS dataset_files_index ON dataset_files (jobs_table)`)
+		if err != nil {
+			return fmt.Errorf("failed to create dataset_files_index index: %w", err)
+		}
+		return nil
+	})
+	if err != nil {
+		panic(fmt.Errorf("failed to setup dataset_files table: %w", err))
+	}
+
 	err = jd.WithTx(func(tx *Tx) error {
 		// only one migration should run at a time and block all other processes from adding or removing tables
 		return jd.withDistributedLock(context.Background(), tx, "schema_migrate", func() error {
@@ -2004,6 +2024,16 @@ func (jd *Handle) GetDistinctParameterValues(ctx context.Context, parameterName
 	return val, nil
 }
 
+func (jd *Handle) CommitFileName(tx StoreSafeTx, fileName string) error {
+	dsList := jd.getDSList()
+	ds := dsList[len(dsList)-1]
+	_, err := tx.SqlTx().Exec(fmt.Sprintf(`INSERT INTO dataset_files (file_name, jobs_table) VALUES ('%s', '%s')`, fileName, ds.JobTable))
+	if err != nil {
+		return fmt.Errorf("error commiting file name: %w", err)

[GitHub Actions / lint check failure on line 2032 in jobsdb/jobsdb.go: `commiting` is a misspelling of `committing` (misspell)]

+	}
+	return nil
+}
+
 func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobList []*JobT) error {
 	store := func() error {
 		var stmt *sql.Stmt
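CommitFileName records which payload file backs the newest dataset by inserting a (file_name, jobs_table) row inside the caller's store transaction. Note that the committed version splices the values into the SQL with fmt.Sprintf; a hypothetical parameterized variant, sketched below assuming only database/sql, binds them instead and sidesteps quoting and injection pitfalls (it would also satisfy the linter by spelling "committing" correctly):

package jobsdbx

import (
	"database/sql"
	"fmt"
)

// commitFileName is a hypothetical, parameterized variant of the
// CommitFileName above: file_name and jobs_table are bound as query
// parameters rather than interpolated into the SQL string.
func commitFileName(tx *sql.Tx, fileName, jobsTable string) error {
	if _, err := tx.Exec(
		`INSERT INTO dataset_files (file_name, jobs_table) VALUES ($1, $2)`,
		fileName, jobsTable,
	); err != nil {
		return fmt.Errorf("committing file name: %w", err)
	}
	return nil
}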
42 changes: 36 additions & 6 deletions jobsdb/migration.go
@@ -5,6 +5,7 @@ import (
 	"database/sql"
 	"encoding/json"
 	"fmt"
+	"os"
 	"strconv"
 	"strings"
 	"time"
@@ -156,13 +157,17 @@ func (jd *Handle) doMigrateDS(ctx context.Context) error {
 					),
 				)
 				totalJobsMigrated += noJobsMigrated
 			} else {
 				jd.logger.Infon(
 					"[[ migrateDSLoop ]]: No jobs to migrate",
 					logger.NewStringField("from", source.ds.Index),
 					logger.NewStringField("to", destination.Index),
 				)
+				_, err = tx.Exec(`UPDATE dataset_files SET jobs_table = $1 WHERE jobs_table = $2`, destination.JobTable, source.ds.JobTable)
+				if err != nil {
+					return fmt.Errorf("failed to update dataset files: %w", err)
+				}
+				continue
 			}
 		}
 		if err = jd.createDSIndicesInTx(ctx, tx, destination); err != nil {
 			return fmt.Errorf("create %v indices: %w", destination, err)
@@ -172,6 +177,31 @@ func (jd *Handle) doMigrateDS(ctx context.Context) error {
 		}
 		jd.logger.Infof("[[ migrateDSLoop ]]: Total migrated %d jobs", totalJobsMigrated)
 	}
+	for i := range migrateFrom {
+		source := migrateFrom[i].ds
+		if migrateFrom[i].numJobsPending == 0 {
+			rows, err := tx.QueryContext(ctx, `DELETE FROM dataset_files WHERE jobs_table = $1 returning file_name`, source.JobTable)
+			if err != nil {
+				return fmt.Errorf("failed to delete dataset files: %w", err)
+			}
+			for rows.Next() {
+				var fileName string
+				if err = rows.Scan(&fileName); err != nil {
+					return fmt.Errorf("failed to scan dataset files: %w", err)
+				}
+				jd.logger.Infon("[[ migrateDSLoop ]]: Deleting file", logger.NewStringField("file", fileName))
+				defer func(fileName string) {
+					_ = os.Remove(fileName)
+				}(fileName)
+			}
+			if rows.Err() != nil {
+				return fmt.Errorf("rows.Err - delete dataset files: %w", rows.Err())
+			}
+			if err := rows.Close(); err != nil {
+				return fmt.Errorf("failed to close rows: %w", err)
+			}
+		}
+	}
+
 	opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFromDatasets})
 	if err != nil {
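The new loop removes the bookkeeping rows for fully-drained datasets with DELETE ... RETURNING and defers the os.Remove of each file to the end of doMigrateDS. One way to structure the same idea is to collect the names inside the transaction and unlink only after commit, so a rollback never loses files; a sketch under those assumptions, not the rudder-server code:

package jobsdbx

import (
	"context"
	"database/sql"
	"fmt"
	"os"
)

// dropDatasetFiles deletes the dataset_files rows for a dropped jobs table
// and returns the file names, so the caller can unlink them once the
// surrounding transaction has committed.
func dropDatasetFiles(ctx context.Context, tx *sql.Tx, jobsTable string) ([]string, error) {
	rows, err := tx.QueryContext(ctx,
		`DELETE FROM dataset_files WHERE jobs_table = $1 RETURNING file_name`, jobsTable)
	if err != nil {
		return nil, fmt.Errorf("delete dataset files: %w", err)
	}
	defer rows.Close()
	var fileNames []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, fmt.Errorf("scan dataset file name: %w", err)
		}
		fileNames = append(fileNames, name)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate dataset files: %w", err)
	}
	return fileNames, nil
}

// removeFiles is the post-commit half: best-effort unlink of the returned names.
func removeFiles(fileNames []string) {
	for _, name := range fileNames {
		_ = os.Remove(name)
	}
}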
14 changes: 14 additions & 0 deletions mocks/jobsdb/mock_jobsdb.go

Generated mock file; diff not rendered by default.

32 changes: 19 additions & 13 deletions processor/processor.go
@@ -2719,14 +2719,14 @@ func (proc *Handle) Store(partition string, in *storeMessage) {
 
 	statusList, destJobs, batchDestJobs := in.statusList, in.destJobs, in.batchDestJobs
 	beforeStoreStatus := time.Now()
-	batchDestJobsFile, err := jobsdb.WriteToFile(batchDestJobs)
-	if err != nil {
-		panic(err)
-	}
-	proc.logger.Infon("batch router jobs file", logger.NewStringField("file", batchDestJobsFile))
 	// XX: Need to do this in a transaction
 	if len(batchDestJobs) > 0 {
-		err := misc.RetryWithNotify(
+		batchDestJobsFile, err := jobsdb.WriteToFile(batchDestJobs)
+		if err != nil {
+			panic(err)
+		}
+		proc.logger.Infon("batch router jobs file", logger.NewStringField("file", batchDestJobsFile))
+		err = misc.RetryWithNotify(
 			context.Background(),
 			proc.jobsDBCommandTimeout.Load(),
 			proc.jobdDBMaxRetries.Load(),
@@ -2738,6 +2738,9 @@
 			if err != nil {
 				return fmt.Errorf("storing batch router jobs: %w", err)
 			}
+			if err := proc.batchRouterDB.CommitFileName(tx, batchDestJobsFile); err != nil {
+				return fmt.Errorf("committing batch router jobs file: %w", err)
+			}
 
 			// rsources stats
 			err = proc.updateRudderSourcesStats(ctx, tx, batchDestJobs)
@@ -2760,13 +2763,13 @@
 		)
 	}
 
-	destJobsFile, err := jobsdb.WriteToFile(destJobs)
-	if err != nil {
-		panic(err)
-	}
-	proc.logger.Infon("router jobs file", logger.NewStringField("file", destJobsFile))
 	if len(destJobs) > 0 {
 		func() {
+			destJobsFile, err := jobsdb.WriteToFile(destJobs)
+			if err != nil {
+				panic(err)
+			}
+			proc.logger.Infon("router jobs file", logger.NewStringField("file", destJobsFile))
 			// Only one goroutine can store to a router destination at a time, otherwise we may have different transactions
 			// committing at different timestamps which can cause events with lower jobIDs to appear after events with higher ones.
 			// For that purpose, before storing, we lock the relevant destination IDs (in sorted order to avoid deadlocks).
@@ -2787,7 +2790,7 @@
 				))
 			}
 
-			err := misc.RetryWithNotify(
+			err = misc.RetryWithNotify(
 				context.Background(),
 				proc.jobsDBCommandTimeout.Load(),
 				proc.jobdDBMaxRetries.Load(),
@@ -2799,6 +2802,9 @@
 			if err != nil {
 				return fmt.Errorf("storing router jobs: %w", err)
 			}
+			if err := proc.routerDB.CommitFileName(tx, destJobsFile); err != nil {
+				return fmt.Errorf("committing router jobs file: %w", err)
+			}
 
 			// rsources stats
 			err = proc.updateRudderSourcesStats(ctx, tx, destJobs)
@@ -2841,7 +2847,7 @@
 
 	txnStart := time.Now()
 	in.rsourcesStats.CollectStats(statusList)
-	err = misc.RetryWithNotify(context.Background(), proc.jobsDBCommandTimeout.Load(), proc.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
+	err := misc.RetryWithNotify(context.Background(), proc.jobsDBCommandTimeout.Load(), proc.jobdDBMaxRetries.Load(), func(ctx context.Context) error {
 		return proc.gatewayDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error {
 			err := proc.gatewayDB.UpdateJobStatusInTx(ctx, tx, statusList, []string{proc.config.GWCustomVal}, nil)
 			if err != nil {
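Both store paths now follow the same ordering: write the payload file first, then store the jobs and commit the file name in one transaction, so a rollback leaves at most an orphaned file and never a committed row pointing at a missing one. A condensed sketch of that ordering with plain database/sql; the jobs-table schema and the event_payload column are assumptions for illustration, not the processor's actual API:

package processorx

import (
	"context"
	"database/sql"
	"fmt"
)

// storeJobsWithFile sketches the transactional pattern: the jobs insert and
// the dataset_files row commit or roll back together, while the payload file
// was already written by the caller before the transaction began.
func storeJobsWithFile(ctx context.Context, db *sql.DB, jobsTable, fileName string, payloads [][]byte) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() { _ = tx.Rollback() }() // no-op after a successful Commit
	for _, p := range payloads {
		// event_payload is an assumed column name for this sketch.
		if _, err := tx.ExecContext(ctx,
			fmt.Sprintf(`INSERT INTO %q (event_payload) VALUES ($1)`, jobsTable), p); err != nil {
			return fmt.Errorf("storing job: %w", err)
		}
	}
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO dataset_files (file_name, jobs_table) VALUES ($1, $2)`, fileName, jobsTable); err != nil {
		return fmt.Errorf("committing jobs file name: %w", err)
	}
	return tx.Commit()
}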
