diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go
index 2d728509e7..0035cf86f7 100644
--- a/jobsdb/jobsdb.go
+++ b/jobsdb/jobsdb.go
@@ -73,6 +73,15 @@ const (
 	pgErrorCodeTableReadonly = "RS001"
 )
 
+type payloadColumnType int
+
+const (
+	JSONB payloadColumnType = iota
+	BYTEA
+	TEXT
+	// JSON // Explore afterwards?
+)
+
 // QueryConditions holds jobsdb query conditions
 type QueryConditions struct {
 	// if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used
@@ -499,6 +508,7 @@ type Handle struct {
 
 	config *config.Config
 	conf   struct {
+		payloadColumnType         payloadColumnType
 		maxTableSize              config.ValueLoader[int64]
 		cacheExpiration           config.ValueLoader[time.Duration]
 		addNewDSLoopSleepDuration config.ValueLoader[time.Duration]
@@ -702,6 +712,18 @@ func WithStats(s stats.Stats) OptsFunc {
 	}
 }
 
+func WithBinaryPayload() OptsFunc {
+	return func(jd *Handle) {
+		jd.conf.payloadColumnType = BYTEA
+	}
+}
+
+func WithTextPayload() OptsFunc {
+	return func(jd *Handle) {
+		jd.conf.payloadColumnType = TEXT
+	}
+}
+
 func WithSkipMaintenanceErr(ignore bool) OptsFunc {
 	return func(jd *Handle) {
 		jd.conf.skipMaintenanceError = ignore
@@ -770,6 +792,8 @@ func (jd *Handle) init() {
 		jd.config = config.Default
 	}
 
+	jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", jd.tablePrefix+".payloadColumnType"))
+
 	if jd.stats == nil {
 		jd.stats = stats.Default
 	}
@@ -1429,6 +1453,17 @@ func (jd *Handle) createDSInTx(tx *Tx, newDS dataSetT) error {
 }
 
 func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT) error {
+	var payloadColumnType string
+	switch jd.conf.payloadColumnType {
+	case JSONB:
+		payloadColumnType = "JSONB"
+	case BYTEA:
+		payloadColumnType = "BYTEA"
+	case TEXT:
+		payloadColumnType = "TEXT"
+	default: // guard against out-of-range config values instead of emitting invalid SQL
+		payloadColumnType = "JSONB"
+	}
 	if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %q (
 		job_id BIGSERIAL PRIMARY KEY,
 		workspace_id TEXT NOT NULL DEFAULT '',
@@ -1436,7 +1471,7 @@
 		user_id TEXT NOT NULL,
 		parameters JSONB NOT NULL,
 		custom_val VARCHAR(64) NOT NULL,
-		event_payload JSONB NOT NULL,
+		event_payload `+payloadColumnType+` NOT NULL,
 		event_count INTEGER NOT NULL DEFAULT 1,
 		created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
 		expire_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW());`, newDS.JobTable)); err != nil {
@@ -2215,6 +2250,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
 	resultsetStates := map[string]struct{}{}
 	for rows.Next() {
 		var job JobT
+		var payload Payload
 		var jsState sql.NullString
 		var jsAttemptNum sql.NullInt64
 		var jsExecTime sql.NullTime
@@ -2223,13 +2259,14 @@
 		var jsErrorResponse []byte
 		var jsParameters []byte
 		err := rows.Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal,
-			&job.EventPayload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PayloadSize, &runningEventCount, &runningPayloadSize,
+			&payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PayloadSize, &runningEventCount, &runningPayloadSize,
 			&jsState, &jsAttemptNum,
 			&jsExecTime, &jsRetryTime,
 			&jsErrorCode, &jsErrorResponse, &jsParameters)
 		if err != nil {
 			return JobsResult{}, false, err
 		}
+		job.EventPayload = payload.PayloadBytes()
 		if jsState.Valid {
 			resultsetStates[jsState.String] = struct{}{}
 			job.LastJobStatus.JobState = jsState.String
@@ -2289,6 +2326,32 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
 	}, true, nil
 }
 
+type Payload struct {
+	S string
+	B []byte
+}
+
+func (p *Payload) PayloadBytes() []byte {
+	if p.B != nil {
+		return p.B
+	}
+	return []byte(p.S)
+}
+
+// Scan implements sql.Scanner: drivers return []byte for jsonb/bytea columns
+// and string for text columns, so accept both representations.
+func (p *Payload) Scan(src interface{}) error {
+	switch v := src.(type) {
+	case []byte:
+		p.B = v
+	case string:
+		p.S = v
+	default:
+		return fmt.Errorf("unsupported payload type %T: neither string nor bytes", src)
+	}
+	return nil
+}
+
 func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT, statusList []*JobStatusT, tags statTags) (updatedStates map[string]map[string]map[ParameterFilterT]struct{}, err error) {
 	if len(statusList) == 0 {
 		return
diff --git a/jobsdb/migration.go b/jobsdb/migration.go
index 37594d6175..ea3c8bf825 100644
--- a/jobsdb/migration.go
+++ b/jobsdb/migration.go
@@ -459,18 +459,86 @@ func (jd *Handle) getMigrationList(dsList []dataSetT) (migrateFrom []dsWithPendi
 	return
 }
 
+func getColumnConversion(srcType, destType string) string {
+	if srcType == destType {
+		return "j.event_payload"
+	}
+	switch srcType {
+	case "jsonb":
+		switch destType {
+		case "text":
+			return "j.event_payload::TEXT"
+		case "bytea":
+			return "convert_to(j.event_payload::TEXT, 'UTF8')"
+		default:
+			panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType))
+		}
+	case "bytea":
+		switch destType {
+		case "text":
+			return "convert_from(j.event_payload, 'UTF8')"
+		case "jsonb":
+			return "convert_from(j.event_payload, 'UTF8')::jsonb"
+		default:
+			panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType))
+		}
+	case "text":
+		switch destType {
+		case "jsonb":
+			return "j.event_payload::jsonb"
+		case "bytea":
+			return "convert_to(j.event_payload, 'UTF8')"
+		default:
+			panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType))
+		}
+	default:
+		panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType))
+	}
+}
+
 func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dataSetT) (int, error) {
 	defer jd.getTimerStat(
 		"migration_jobs",
 		&statTags{CustomValFilters: []string{jd.tablePrefix}},
 	).RecordDuration()()
 
+	columnTypeMap := map[string]string{srcDS.JobTable: "jsonb", destDS.JobTable: "jsonb"}
+	// find column types first - to differentiate between `jsonb`, `bytea` and `text`
+	rows, err := tx.QueryContext(
+		ctx,
+		fmt.Sprintf(
+			`select table_name, data_type
+			from information_schema.columns
+			where table_name IN ('%[1]s', '%[2]s') and column_name='event_payload';`,
+			srcDS.JobTable, destDS.JobTable,
+		),
+	)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get column types: %w", err)
+	}
+	defer rows.Close()
+	var jobsTable, columnType string
+	for rows.Next() {
+		if err = rows.Scan(&jobsTable, &columnType); err != nil {
+			return 0, fmt.Errorf("failed to scan column types: %w", err)
+		}
+		if columnType != "bytea" && columnType != "jsonb" && columnType != "text" {
+			return 0, fmt.Errorf("unsupported column type %s", columnType)
+		}
+		columnTypeMap[jobsTable] = columnType
+	}
+	if err = rows.Err(); err != nil {
+		return 0, fmt.Errorf("rows.Err() on column types: %w", err)
+	}
+	payloadLiteral := getColumnConversion(columnTypeMap[srcDS.JobTable], columnTypeMap[destDS.JobTable])
+	jd.logger.Infof("migrating jobs with payload conversion: %s", payloadLiteral)
+
 	compactDSQuery := fmt.Sprintf(
 		`with last_status as (select * from "v_last_%[1]s"),
 		inserted_jobs as
 		(
 			insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
-			(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, j.event_payload, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
+			(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, %[6]s, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
 			where js.job_id is null or js.job_state = ANY('{%[5]s}') order by j.job_id) returning job_id
 		),
 		insertedStatuses as
@@ -484,6 +552,7 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat
 		destDS.JobTable,
 		destDS.JobStatusTable,
 		strings.Join(validNonTerminalStates, ","),
+		payloadLiteral,
 	)
 
 	var numJobsMigrated int64
diff --git a/jobsdb/migration_test.go b/jobsdb/migration_test.go
index 8036df455e..b834fe0a5b 100644
--- a/jobsdb/migration_test.go
+++ b/jobsdb/migration_test.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/rudderlabs/rudder-go-kit/config"
 	"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
+	"github.com/rudderlabs/rudder-server/utils/tx"
 )
 
 func TestMigration(t *testing.T) {
@@ -326,4 +327,317 @@ func TestMigration(t *testing.T) {
 		updatedTableSizes := getTableSizes(jobDB.getDSList())
 		require.Equal(t, newTableSizes[fmt.Sprintf("%s_job_status_1", tablePrefix)], updatedTableSizes[fmt.Sprintf("%s_job_status_1", tablePrefix)])
 	})
+
+	t.Run("migration between different table types (jsonb, text, bytea)", func(t *testing.T) {
+		config.Reset()
+		c := config.New()
+		c.Set("JobsDB.maxDSSize", 1)
+
+		_ = startPostgres(t)
+
+		triggerAddNewDS := make(chan time.Time)
+		triggerMigrateDS := make(chan time.Time)
+
+		jobDB := Handle{
+			TriggerAddNewDS: func() <-chan time.Time {
+				return triggerAddNewDS
+			},
+			TriggerMigrateDS: func() <-chan time.Time {
+				return triggerMigrateDS
+			},
+			config: c,
+		}
+		tablePrefix := strings.ToLower(rand.String(5))
+		err := jobDB.Setup(
+			ReadWrite,
+			true,
+			tablePrefix,
+		)
+		require.NoError(t, err)
+		defer jobDB.TearDown()
+
+		c.Set("JobsDB."+tablePrefix+"."+"maxDSRetention", "1ms")
+
+		customVal := rand.String(5)
+		jobs := genJobs(defaultWorkspaceID, customVal, 30, 1)
+		require.NoError(t, jobDB.Store(context.Background(), jobs[:10]))
+
+		// let 8 jobs succeed, and 2 repeatedly fail
+		require.NoError(
+			t,
+			jobDB.UpdateJobStatus(
+				context.Background(),
+				genJobStatuses(jobs[:8], "executing"),
+				[]string{customVal},
+				[]ParameterFilterT{},
+			),
+		)
+		require.NoError(
+			t,
+			jobDB.UpdateJobStatus(
+				context.Background(),
+				genJobStatuses(jobs[:8], "succeeded"),
+				[]string{customVal},
+				[]ParameterFilterT{},
+			),
+		)
+
+		require.NoError(
+			t,
+			jobDB.UpdateJobStatus(
+				context.Background(),
+				genJobStatuses(jobs[8:10], "executing"),
+				[]string{customVal},
+				[]ParameterFilterT{},
+			),
+			`status update failed in 1st DS`,
+		)
+		require.NoError(
+			t,
+			jobDB.UpdateJobStatus(
+				context.Background(),
+				genJobStatuses(jobs[8:10], "failed"),
+				[]string{customVal},
+				[]ParameterFilterT{},
+			),
+			`status update failed in 1st DS`,
+		)
+		require.EqualValues(t, 1, jobDB.GetMaxDSIndex())
+
+		jobDB.conf.payloadColumnType = BYTEA
+		triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run
+		triggerAddNewDS <- time.Now() // second time, waits for the first loop to finish
+		require.EqualValues(t, 2, jobDB.GetMaxDSIndex())
+
+		var payloadType string
+		secondTableName := fmt.Sprintf("%s_jobs_2", tablePrefix)
+		err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, secondTableName)).Scan(&payloadType)
+		require.NoError(t, err)
+		require.EqualValues(t, "bytea", payloadType)
+
+		// add some more jobs to the new DS
+		require.NoError(t, jobDB.Store(context.Background(), jobs[10:20]))
+
+		triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run
+		triggerAddNewDS <- time.Now() // second time, waits for the first loop to finish
+		require.EqualValues(t, 3, jobDB.GetMaxDSIndex())
+		thirdTableName := fmt.Sprintf("%s_jobs_3", tablePrefix)
+		err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, thirdTableName)).Scan(&payloadType)
+		require.NoError(t, err)
+		require.EqualValues(t, "bytea", payloadType)
+
+		// last DS
+		// should have enough statuses for a cleanup to be triggered
+		// all non-terminal
+		require.NoError(t, jobDB.Store(context.Background(), jobs[20:30]))
+		for i := 0; i < 10; i++ {
+			require.NoError(
+				t,
+				jobDB.UpdateJobStatus(
+					context.Background(),
+					genJobStatuses(jobs[20:30], "executing"),
+					[]string{customVal},
+					[]ParameterFilterT{},
+				),
+				`status update failed in 3rd DS`,
+			)
+			require.NoError(
+				t,
+				jobDB.UpdateJobStatus(
+					context.Background(),
+					genJobStatuses(jobs[20:30], "failed"),
+					[]string{customVal},
+					[]ParameterFilterT{},
+				),
+				`status update failed in 3rd DS`,
+			)
+		}
+
+		c.Set("JobsDB.maxDSSize", 100000)
+		jobDB.conf.payloadColumnType = TEXT
+		triggerMigrateDS <- time.Now() // trigger migrateDSLoop to run
+		triggerMigrateDS <- time.Now() // waits for last loop to finish
+
+		// data moved from both jsonb and bytea columns to a text column
+
+		// we should see that of the three DSs we had,
+		// the first two are compacted into a single 2_1 DS: the two non-terminal jobs
+		// from the first DS (with only their last status) plus all ten jobs from the second
+		// the third (last) DS is left in place with all its jobs and statuses
+
+		// check the compacted DS
+		dsList := jobDB.getDSList()
+		require.Len(t, dsList, 2) // 2_1, 3
+		require.Equal(t, `2_1`, dsList[0].Index)
+		var count int64
+		err = jobDB.dbHandle.QueryRow(
+			fmt.Sprintf(
+				`SELECT COUNT(*) FROM %[1]s_jobs_2_1 WHERE %[1]s_jobs_2_1.custom_val = $1`,
+				tablePrefix,
+			),
+			customVal,
+		).Scan(&count)
+		require.NoError(t, err)
+		require.EqualValues(t, 12, count)
+
+		err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, tablePrefix+"_jobs_2_1")).Scan(&payloadType)
+		require.NoError(t, err)
+		require.EqualValues(t, "text", payloadType)
+
+		require.Equal(t, `3`, dsList[1].Index)
+		err = jobDB.dbHandle.QueryRow(
+			fmt.Sprintf(
+				`SELECT COUNT(*) FROM %[1]s_jobs_3 WHERE %[1]s_jobs_3.custom_val = $1`,
+				tablePrefix,
+			),
+			customVal,
+		).Scan(&count)
+		require.NoError(t, err)
+		require.EqualValues(t, 10, count)
+
+		err = jobDB.dbHandle.QueryRow(
+			fmt.Sprintf(
+				`SELECT COUNT(*) FROM %[1]s_job_status_3 where job_state = 'failed';`,
+				tablePrefix,
+			),
+		).Scan(&count)
+		require.NoError(t, err)
+		require.EqualValues(t, 100, count)
+
+		getJobs, err := jobDB.GetToProcess(context.Background(), GetQueryParams{
+			IgnoreCustomValFiltersInQuery: true,
+			EventsLimit:                   1,
+			JobsLimit:                     1,
+		}, nil)
+		require.NoError(t, err)
+		require.Equal(t, 1, getJobs.EventsCount)
+		require.JSONEq(
+			t,
+			`{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"anonymousId":"anon_id","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`,
+			string(getJobs.Jobs[0].EventPayload),
+		)
+	})
 }
+
+func TestPayloadLiteral(t *testing.T) {
+	config.Reset()
+	c := config.New()
+	c.Set("JobsDB.maxDSSize", 1)
+
+	pg := startPostgres(t)
+	db := pg.DB
+
+	byteJD := Handle{
+		config: c,
+	}
+	byteJD.conf.payloadColumnType = BYTEA
+	require.NoError(t, byteJD.Setup(
+		ReadWrite,
+		true,
+		"bytea",
+	))
+	defer byteJD.TearDown()
+
+	jsonbJD := Handle{
+		config: c,
+	}
+	jsonbJD.conf.payloadColumnType = JSONB
+	require.NoError(t, jsonbJD.Setup(
+		ReadWrite,
+		true,
+		"jsonb",
+	))
+	defer jsonbJD.TearDown()
+
+	textJD := Handle{
+		config: c,
+	}
+	textJD.conf.payloadColumnType = TEXT
+	require.NoError(t, textJD.Setup(
+		ReadWrite,
+		true,
+		"text",
+	))
+	defer textJD.TearDown()
+
+	ctx := context.Background()
+	jobs := genJobs("wsid", "cv", 1, 1)
+	require.NoError(t, byteJD.Store(ctx, jobs))
+	require.NoError(t, textJD.Store(ctx, jobs))
+	require.NoError(t, jsonbJD.Store(ctx, jobs))
+
+	prefixes := []string{"text", "jsonb", "bytea"}
+	// drop the PK and FK constraints: migrateJobsInTx moves job IDs too, and here we're only interested in moving jobs between different column types
+	for i := range prefixes {
+		_, err := db.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %[1]s_job_status_1 DROP CONSTRAINT fk_%[1]s_job_status_1_job_id`, prefixes[i]))
+		require.NoError(t, err)
+		_, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %[1]s_jobs_1 DROP CONSTRAINT %[1]s_jobs_1_pkey`, prefixes[i]))
+		require.NoError(t, err)
+	}
+	txn, err := db.Begin()
+	require.NoError(t, err)
+	for i := range prefixes {
+		for j := range prefixes {
+			if i == j {
+				continue
+			}
+			src := prefixes[i]
+			dest := prefixes[j]
+			_, err := textJD.migrateJobsInTx(
+				ctx,
+				&tx.Tx{Tx: txn},
+				dataSetT{
+					JobTable:       src + "_jobs_1",
+					JobStatusTable: src + "_job_status_1",
+					Index:          "1",
+				},
+				dataSetT{
+					JobTable:       dest + "_jobs_1",
+					JobStatusTable: dest + "_job_status_1",
+					Index:          "1",
+				},
+			)
+			require.NoError(t, err)
+		}
+	}
+	require.NoError(t, txn.Commit())
+
+	byteJobs, err := byteJD.GetUnprocessed(ctx, GetQueryParams{
+		EventsLimit: 100, JobsLimit: 100, IgnoreCustomValFiltersInQuery: true,
+	})
+	require.NoError(t, err)
+	textJobs, err := textJD.GetUnprocessed(ctx, GetQueryParams{
+		EventsLimit: 100, JobsLimit: 100, IgnoreCustomValFiltersInQuery: true,
+	})
+	require.NoError(t, err)
+	jsonbJobs, err := jsonbJD.GetUnprocessed(ctx, GetQueryParams{
+		EventsLimit: 100, JobsLimit: 100, IgnoreCustomValFiltersInQuery: true,
+	})
+	require.NoError(t, err)
+	require.Equal(t, 4, byteJobs.EventsCount)
+	require.Equal(t, 7, textJobs.EventsCount)
+	require.Equal(t, 6, jsonbJobs.EventsCount)
+	expectedPayload := `{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"anonymousId":"anon_id","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`
+
+	for i := range byteJobs.Jobs {
+		require.JSONEq(
+			t,
+			expectedPayload,
+			string(byteJobs.Jobs[i].EventPayload),
+		)
+	}
+	for i := range textJobs.Jobs {
+		require.JSONEq(
+			t,
+			expectedPayload,
+			string(textJobs.Jobs[i].EventPayload),
+		)
+	}
+	for i := range jsonbJobs.Jobs {
+		require.JSONEq(
+			t,
+			expectedPayload,
+			string(jsonbJobs.Jobs[i].EventPayload),
+		)
+	}
+}
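
The new `payloadColumnType` knob can be wired in two ways: programmatically through the `WithBinaryPayload()`/`WithTextPayload()` options, or through the `<tablePrefix>.payloadColumnType` config key read in `init()` (0 = JSONB, 1 = BYTEA, 2 = TEXT; JSONB stays the default). A minimal sketch of the programmatic path, assuming the existing `NewForReadWrite` constructor and `Start`/`Stop` lifecycle; the `"gw"` and `"proc"` prefixes are only for illustration:

```go
package main

import (
	"log"

	"github.com/rudderlabs/rudder-server/jobsdb"
)

func main() {
	// BYTEA-backed event_payload for datasets created by this handle;
	// omitting the option (and the config key) keeps the JSONB default.
	gwDB := jobsdb.NewForReadWrite("gw", jobsdb.WithBinaryPayload())
	if err := gwDB.Start(); err != nil {
		log.Fatal(err)
	}
	defer gwDB.Stop()

	// TEXT-backed variant.
	procDB := jobsdb.NewForReadWrite("proc", jobsdb.WithTextPayload())
	if err := procDB.Start(); err != nil {
		log.Fatal(err)
	}
	defer procDB.Stop()
}
```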
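The `Payload` scanner exists because the driver's raw value depends on the column type: `jsonb` and `bytea` columns come back as `[]byte`, while `text` columns come back as `string`. Keeping whichever representation arrived and converting lazily in `PayloadBytes()` avoids an extra copy per row. A self-contained sketch of the same pattern against plain `database/sql`:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"
)

// payload mirrors the Payload scanner in jobsdb.go.
type payload struct {
	s string
	b []byte
}

var _ sql.Scanner = (*payload)(nil) // compile-time interface check

func (p *payload) Scan(src interface{}) error {
	switch v := src.(type) {
	case []byte: // jsonb / bytea columns
		p.b = v
	case string: // text columns
		p.s = v
	default:
		return fmt.Errorf("unsupported payload type %T: neither string nor bytes", src)
	}
	return nil
}

// bytes converts lazily, so []byte results are returned without copying.
func (p *payload) bytes() []byte {
	if p.b != nil {
		return p.b
	}
	return []byte(p.s)
}

func main() {
	var p payload
	if err := p.Scan("from a text column"); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s\n", p.bytes()) // from a text column
}
```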
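Since `migrateJobsInTx` can now move payloads between any pair of column types, the conversion matrix in `getColumnConversion` is worth pinning down with a table test. A sketch; the test name and placement are suggestions, but the expected strings are taken verbatim from the function above:

```go
package jobsdb

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestGetColumnConversion pins the SQL snippets the migration query
// interpolates for each src/dest pair of payload column types.
func TestGetColumnConversion(t *testing.T) {
	cases := map[[2]string]string{
		{"jsonb", "jsonb"}: "j.event_payload",
		{"text", "text"}:   "j.event_payload",
		{"bytea", "bytea"}: "j.event_payload",
		{"jsonb", "text"}:  "j.event_payload::TEXT",
		{"jsonb", "bytea"}: "convert_to(j.event_payload::TEXT, 'UTF8')",
		{"bytea", "text"}:  "convert_from(j.event_payload, 'UTF8')",
		{"bytea", "jsonb"}: "convert_from(j.event_payload, 'UTF8')::jsonb",
		{"text", "jsonb"}:  "j.event_payload::jsonb",
		{"text", "bytea"}:  "convert_to(j.event_payload, 'UTF8')",
	}
	for in, want := range cases {
		require.Equal(t, want, getColumnConversion(in[0], in[1]), "src=%s dest=%s", in[0], in[1])
	}
}
```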
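The column-type probe relies on `information_schema.columns` reporting `data_type` as exactly `jsonb`, `bytea`, or `text` for the `event_payload` column; anything else makes the migration fail fast. To eyeball what the probe returns against a live database (the connection string and table name below are placeholders):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // the Postgres driver rudder-server already uses
)

func main() {
	// Placeholder DSN; point it at the database under inspection.
	db, err := sql.Open("postgres", "postgres://rudder:password@localhost:5432/jobsdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var dataType string
	// Same lookup as migrateJobsInTx, parameterized for a single table;
	// "gw_jobs_1" is a placeholder table name.
	err = db.QueryRow(
		`select data_type from information_schema.columns
		 where table_name = $1 and column_name = 'event_payload'`,
		"gw_jobs_1",
	).Scan(&dataType)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(dataType) // "jsonb", "bytea" or "text"
}
```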