From 192025a2174eb360c4524df93857ca9e988b0848 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" <82795818+Sidddddarth@users.noreply.github.com> Date: Mon, 14 Oct 2024 17:36:46 +0530 Subject: [PATCH 1/3] chore: fix jobsdb flaky test (#5197) --- jobsdb/jobsdb_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/jobsdb/jobsdb_test.go b/jobsdb/jobsdb_test.go index d958c2eeda..2c88a605b3 100644 --- a/jobsdb/jobsdb_test.go +++ b/jobsdb/jobsdb_test.go @@ -1281,6 +1281,7 @@ func TestMaxAgeCleanup(t *testing.T) { false, tablePrefix, )) + defer jobsDB.TearDown() abortedJobs, err := jobsDB.GetAborted( context.Background(), From af08cfa917f227816c20b94b57373d5b415bc163 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 15 Oct 2024 10:42:24 +0530 Subject: [PATCH 2/3] chore: remove old replay code (#5198) --- app/app.go | 5 - app/apphandlers/embeddedAppHandler.go | 19 -- app/features.go | 15 - cmd/backupfilemigrator/backup_file.go | 46 --- .../backup_migrator_integration_test.go | 183 ----------- cmd/backupfilemigrator/file_migrator.go | 284 ----------------- cmd/backupfilemigrator/main.go | 146 --------- ...0.1703893976586.1703894226969.workspace.gz | Bin 4246 -> 0 bytes ...0.1703893976586.1703894226969.workspace.gz | Bin 645 -> 0 bytes enterprise/replay/dumpsloader/dumpsloader.go | 110 ------- enterprise/replay/dumpsloader/gwreplay.go | 125 -------- enterprise/replay/dumpsloader/procreplay.go | 96 ------ enterprise/replay/replayer/replay.go | 185 ----------- enterprise/replay/replayer/sourceWorker.go | 286 ------------------ enterprise/replay/replayer/transformer.go | 67 ---- enterprise/replay/setup.go | 120 -------- enterprise/replay/utils/utils.go | 58 ---- 17 files changed, 1745 deletions(-) delete mode 100644 cmd/backupfilemigrator/backup_file.go delete mode 100644 cmd/backupfilemigrator/backup_migrator_integration_test.go delete mode 100644 cmd/backupfilemigrator/file_migrator.go delete mode 100644 cmd/backupfilemigrator/main.go delete mode 100644 cmd/backupfilemigrator/testdata/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz delete mode 100644 cmd/backupfilemigrator/testdata/gw_jobs_52307.1311260446.1311289420.1703893976586.1703894226969.workspace.gz delete mode 100644 enterprise/replay/dumpsloader/dumpsloader.go delete mode 100644 enterprise/replay/dumpsloader/gwreplay.go delete mode 100644 enterprise/replay/dumpsloader/procreplay.go delete mode 100644 enterprise/replay/replayer/replay.go delete mode 100644 enterprise/replay/replayer/sourceWorker.go delete mode 100644 enterprise/replay/replayer/transformer.go delete mode 100644 enterprise/replay/setup.go delete mode 100644 enterprise/replay/utils/utils.go diff --git a/app/app.go b/app/app.go index c41fc7b902..ef3c2c4036 100644 --- a/app/app.go +++ b/app/app.go @@ -17,7 +17,6 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/backend-config" configenv "github.com/rudderlabs/rudder-server/enterprise/config-env" - "github.com/rudderlabs/rudder-server/enterprise/replay" "github.com/rudderlabs/rudder-server/enterprise/reporting" suppression "github.com/rudderlabs/rudder-server/enterprise/suppress-user" "github.com/rudderlabs/rudder-server/jobsdb" @@ -86,10 +85,6 @@ func (a *app) initFeatures() { EnterpriseToken: a.options.EnterpriseToken, Log: enterpriseLogger.Child("reporting"), }, - Replay: &replay.Factory{ - EnterpriseToken: a.options.EnterpriseToken, - Log: enterpriseLogger.Child("replay"), - }, ConfigEnv: &configenv.Factory{ EnterpriseToken: a.options.EnterpriseToken, Log: 
enterpriseLogger.Child("config-env"), diff --git a/app/apphandlers/embeddedAppHandler.go b/app/apphandlers/embeddedAppHandler.go index aef1468585..e4d128df30 100644 --- a/app/apphandlers/embeddedAppHandler.go +++ b/app/apphandlers/embeddedAppHandler.go @@ -50,7 +50,6 @@ type embeddedApp struct { versionHandler func(w http.ResponseWriter, r *http.Request) log logger.Logger config struct { - enableReplay bool processorDSLimit config.ValueLoader[int] routerDSLimit config.ValueLoader[int] batchRouterDSLimit config.ValueLoader[int] @@ -59,7 +58,6 @@ type embeddedApp struct { } func (a *embeddedApp) Setup() error { - a.config.enableReplay = config.GetBoolVar(types.DefaultReplayEnabled, "Replay.enabled") a.config.processorDSLimit = config.GetReloadableIntVar(0, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit") a.config.gatewayDSLimit = config.GetReloadableIntVar(0, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit") a.config.routerDSLimit = config.GetReloadableIntVar(0, 1, "Router.jobsDB.dsLimit", "JobsDB.dsLimit") @@ -378,23 +376,6 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) g.Go(func() error { return gw.StartWebHandler(ctx) }) - if a.config.enableReplay { - var replayDB jobsdb.Handle - err := replayDB.Setup( - jobsdb.ReadWrite, options.ClearDB, "replay", - ) - if err != nil { - return fmt.Errorf("could not setup replayDB: %w", err) - } - replay, err := a.app.Features().Replay.Setup(ctx, config, &replayDB, gatewayDB, routerDB, batchRouterDB) - if err != nil { - return err - } - if err := replay.Start(); err != nil { - return fmt.Errorf("could not start replay: %w", err) - } - defer func() { _ = replay.Stop() }() - } g.Go(func() error { // This should happen only after setupDatabaseTables() is called and journal table migrations are done diff --git a/app/features.go b/app/features.go index e85a84664c..c5f7b0be4d 100644 --- a/app/features.go +++ b/app/features.go @@ -9,8 +9,6 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" backendconfig "github.com/rudderlabs/rudder-server/backend-config" - "github.com/rudderlabs/rudder-server/enterprise/replay" - "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/utils/types" ) @@ -37,24 +35,11 @@ type ReportingFeature interface { Setup(cxt context.Context, backendConfig backendconfig.BackendConfig) types.Reporting } -/********************************* -Replay Feature -*********************************/ - -// ReplayFeature handles inserting of failed jobs into respective gw/rt jobsdb -type ReplayFeature interface { - Setup(ctx context.Context, config *config.Config, replayDB, gwDB, routerDB, batchRouterDB *jobsdb.Handle) (replay.Replay, error) -} - -// ReplayFeatureSetup is a function that initializes a Replay feature -type ReplayFeatureSetup func(App) ReplayFeature - // Features contains optional implementations of Enterprise only features. 
type Features struct { SuppressUser SuppressUserFeature ConfigEnv ConfigEnvFeature Reporting ReportingFeature - Replay ReplayFeature TrackedUsers TrackedUsersFeature } diff --git a/cmd/backupfilemigrator/backup_file.go b/cmd/backupfilemigrator/backup_file.go deleted file mode 100644 index 7c73a3bdf8..0000000000 --- a/cmd/backupfilemigrator/backup_file.go +++ /dev/null @@ -1,46 +0,0 @@ -package main - -import ( - "fmt" - "regexp" - "strconv" -) - -var backupFileNamePattern = regexp.MustCompile(`^(/)?((?P<prefix>[^/]+)/)?(?P<instance>[^/]+)/gw_jobs_(?P<tableIDx>[0-9]+).(?P<minJobID>[0-9]+).(?P<maxJobID>[0-9]+).(?P<minJobCreatedAt>[0-9]+).(?P<maxJobCreatedAt>[0-9]+).(?P<workspaceID>[^.]+).gz$`) - -type backupFileInfo struct { - filePath string - prefix string - instance string - tableIDx int64 - minJobID int64 - maxJobID int64 - minJobCreatedAt int64 - maxJobCreatedAt int64 - workspaceID string -} - -func newBackupFileInfo(backupFilePath string) (*backupFileInfo, error) { - matches := backupFileNamePattern.FindStringSubmatch(backupFilePath) - if matches == nil { - return nil, fmt.Errorf("unable to parse object name %s", backupFilePath) - } - - info := &backupFileInfo{ - filePath: backupFilePath, - prefix: matches[backupFileNamePattern.SubexpIndex("prefix")], - instance: matches[backupFileNamePattern.SubexpIndex("instance")], - workspaceID: matches[backupFileNamePattern.SubexpIndex("workspaceID")], - } - var err error - info.tableIDx, err = strconv.ParseInt(matches[backupFileNamePattern.SubexpIndex("tableIDx")], 10, 64) - if err != nil { - return nil, fmt.Errorf("invalid table name in filename %w", err) - } - info.minJobID, _ = strconv.ParseInt(matches[backupFileNamePattern.SubexpIndex("minJobID")], 10, 64) - info.maxJobID, _ = strconv.ParseInt(matches[backupFileNamePattern.SubexpIndex("maxJobID")], 10, 64) - info.minJobCreatedAt, _ = strconv.ParseInt(matches[backupFileNamePattern.SubexpIndex("minJobCreatedAt")], 10, 64) - info.maxJobCreatedAt, _ = strconv.ParseInt(matches[backupFileNamePattern.SubexpIndex("maxJobCreatedAt")], 10, 64) - - return info, nil -} diff --git a/cmd/backupfilemigrator/backup_migrator_integration_test.go b/cmd/backupfilemigrator/backup_migrator_integration_test.go deleted file mode 100644 index 4f29615de8..0000000000 --- a/cmd/backupfilemigrator/backup_migrator_integration_test.go +++ /dev/null @@ -1,183 +0,0 @@ -package main - -import ( - "context" - "os" - "testing" - "time" - - "github.com/rudderlabs/rudder-go-kit/filemanager" - "github.com/rudderlabs/rudder-go-kit/logger" - - "github.com/ory/dockertest/v3" - "github.com/stretchr/testify/require" - - "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio" -) - -func TestBackupFileMigrationIntegration(t *testing.T) { - pool, err := dockertest.NewPool("") - require.NoError(t, err) - - prepareFileManager := func(t testing.TB) filemanager.FileManager { - minioContainer, err := minio.Setup(pool, t) - require.NoError(t, err) - - t.Setenv("JOBS_BACKUP_STORAGE_PROVIDER", "MINIO") - t.Setenv("MINIO_ENDPOINT", minioContainer.Endpoint) - t.Setenv("JOBS_BACKUP_BUCKET", minioContainer.BucketName) - t.Setenv("MINIO_SECRET_ACCESS_KEY", minioContainer.AccessKeySecret) - t.Setenv("MINIO_ACCESS_KEY_ID", minioContainer.AccessKeyID) - - fm, err := filemanager.NewMinioManager(minioContainer.ToFileManagerConfig(""), logger.NewLogger(), func() time.Duration { - return time.Minute - }) - require.NoError(t, err) - return fm - } - - t.Run("test successful migration - batch size 5", func(t *testing.T) { - fm := prepareFileManager(t) - f, err :=
os.Open("testdata/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz") - require.NoError(t, err) - _, err = fm.Upload(context.Background(), f, "dummy", "dummy-v0-rudderstack-10") - require.NoError(t, err) - - require.NoError(t, app.Run([]string{ - "migration-tool", - "migrate", - "-startTime", "2023-12-29T23:52:55.726942+00:00", - "-endTime", "2023-12-29T23:53:59.809574+00:00", - "-uploadBatchSize", "5", - "-backupFileNamePrefix", "dummy", - })) - - itr := filemanager.IterateFilesWithPrefix(context.Background(), "", "", 100, fm) - listOfTransformedFiles := make([]string, 0) - for itr.Next() { - listOfTransformedFiles = append(listOfTransformedFiles, itr.Get().Key) - } - require.Len(t, listOfTransformedFiles, 9) // 8 converted files and one original file - require.ElementsMatch(t, listOfTransformedFiles, []string{ - "dummy/dummy-v0-rudderstack-10/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz", - "dummy/source_id_1/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703893984_1703894012_workspace.json.gz", - "dummy/source_id_2/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703893979_1703893986_workspace.json.gz", - "dummy/source_id_2/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703893988_1703893993_workspace.json.gz", - "dummy/source_id_2/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703893994_1703894000_workspace.json.gz", - "dummy/source_id_2/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703894001_1703894008_workspace.json.gz", - "dummy/source_id_2/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703894009_1703894014_workspace.json.gz", - "dummy/source_id_3/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703893981_1703893995_workspace.json.gz", - "dummy/source_id_3/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703893997_1703894004_workspace.json.gz", - }) - }) - - t.Run("test successful migration - batch size 12", func(t *testing.T) { - fm := prepareFileManager(t) - f, err := os.Open("testdata/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz") - require.NoError(t, err) - _, err = fm.Upload(context.Background(), f, "dummy", "dummy-v0-rudderstack-10") - require.NoError(t, err) - - require.NoError(t, app.Run([]string{ - "migration-tool", - "migrate", - "-startTime", "2023-12-29T23:52:55.726942+00:00", - "-endTime", "2023-12-29T23:53:59.809574+00:00", - "-uploadBatchSize", "12", - "-backupFileNamePrefix", "dummy", - })) - - itr := filemanager.IterateFilesWithPrefix(context.Background(), "", "", 100, fm) - listOfTransformedFiles := make([]string, 0) - for itr.Next() { - listOfTransformedFiles = append(listOfTransformedFiles, itr.Get().Key) - } - require.Len(t, listOfTransformedFiles, 6) // 5 converted files and 1 original file - require.ElementsMatch(t, listOfTransformedFiles, []string{ - "dummy/dummy-v0-rudderstack-10/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz", - "dummy/source_id_2/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703893979_1703893996_workspace.json.gz", - "dummy/source_id_2/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703893998_1703894013_workspace.json.gz", - "dummy/source_id_2/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703894014_1703894014_workspace.json.gz", - "dummy/source_id_3/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703893981_1703894004_workspace.json.gz", - "dummy/source_id_1/gw/2023-12-29/23/dummy-v0-rudderstack-10/1703893984_1703894012_workspace.json.gz", - }) - }) - - t.Run("no migration - out of time range", func(t *testing.T) { - fm := prepareFileManager(t) - f, err := 
os.Open("testdata/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz") - require.NoError(t, err) - _, err = fm.Upload(context.Background(), f, "dummy", "dummy-v0-rudderstack-10") - require.NoError(t, err) - - require.NoError(t, app.Run([]string{ - "migration-tool", - "migrate", - "-startTime", "2023-11-29T23:52:55.726942+00:00", - "-endTime", "2023-11-29T23:53:59.809574+00:00", - "-uploadBatchSize", "12", - "-backupFileNamePrefix", "dummy", - })) - - itr := filemanager.IterateFilesWithPrefix(context.Background(), "", "", 100, fm) - listOfTransformedFiles := make([]string, 0) - for itr.Next() { - listOfTransformedFiles = append(listOfTransformedFiles, itr.Get().Key) - } - require.ElementsMatch(t, listOfTransformedFiles, []string{ - "dummy/dummy-v0-rudderstack-10/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz", - }) - }) - - t.Run("no migration - no file with prefix", func(t *testing.T) { - fm := prepareFileManager(t) - f, err := os.Open("testdata/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz") - require.NoError(t, err) - _, err = fm.Upload(context.Background(), f, "dummy", "dummy-v0-rudderstack-10") - require.NoError(t, err) - - require.NoError(t, app.Run([]string{ - "migration-tool", - "migrate", - "-startTime", "2023-11-29T23:52:55.726942+00:00", - "-endTime", "2023-11-29T23:53:59.809574+00:00", - "-uploadBatchSize", "12", - "-backupFileNamePrefix", "randomPrefix", - })) - - itr := filemanager.IterateFilesWithPrefix(context.Background(), "", "", 100, fm) - listOfTransformedFiles := make([]string, 0) - for itr.Next() { - listOfTransformedFiles = append(listOfTransformedFiles, itr.Get().Key) - } - require.ElementsMatch(t, listOfTransformedFiles, []string{ - "dummy/dummy-v0-rudderstack-10/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz", - }) - }) - - t.Run("failed to migrate file - parsing fail", func(t *testing.T) { - fm := prepareFileManager(t) - f, err := os.Open("testdata/gw_jobs_52307.1311260446.1311289420.1703893976586.1703894226969.workspace.gz") - require.NoError(t, err) - _, err = fm.Upload(context.Background(), f, "dummy", "dummy-v0-rudderstack-10") - require.NoError(t, err) - - require.Error(t, app.Run([]string{ - "migration-tool", - "migrate", - "-startTime", "2023-12-29T23:52:55.726942+00:00", - "-endTime", "2023-12-29T23:53:59.809574+00:00", - "-uploadBatchSize", "12", - "-backupFileNamePrefix", "dummy", - })) - - itr := filemanager.IterateFilesWithPrefix(context.Background(), "", "", 100, fm) - listOfTransformedFiles := make([]string, 0) - for itr.Next() { - listOfTransformedFiles = append(listOfTransformedFiles, itr.Get().Key) - } - require.ElementsMatch(t, listOfTransformedFiles, []string{ - "dummy/dummy-v0-rudderstack-10/gw_jobs_52307.1311260446.1311289420.1703893976586.1703894226969.workspace.gz", - }) - }) -} diff --git a/cmd/backupfilemigrator/file_migrator.go b/cmd/backupfilemigrator/file_migrator.go deleted file mode 100644 index a2d808e96c..0000000000 --- a/cmd/backupfilemigrator/file_migrator.go +++ /dev/null @@ -1,284 +0,0 @@ -package main - -import ( - "bufio" - "compress/gzip" - "context" - "encoding/json" - "fmt" - "os" - "path" - "path/filepath" - "strings" - "time" - - "github.com/rudderlabs/rudder-go-kit/stringify" - obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" - - kitconfig "github.com/rudderlabs/rudder-go-kit/config" - - "github.com/rudderlabs/rudder-go-kit/bytesize" - - "github.com/rudderlabs/rudder-server/utils/types" - 
- jsoniter "github.com/json-iterator/go" - - "github.com/samber/lo" - - "github.com/rudderlabs/rudder-server/utils/misc" - - "github.com/tidwall/gjson" - - "github.com/rudderlabs/rudder-go-kit/filemanager" - kitlogger "github.com/rudderlabs/rudder-go-kit/logger" - - "github.com/rudderlabs/rudder-server/enterprise/replay/utils" -) - -const ( - gatewayJobsFilePrefix = "gw_jobs_" - localDumpDirName = "/rudder-s3-dumps/" -) - -var jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary - -type fileMigrator struct { - conf *config - logger kitlogger.Logger - fileManager filemanager.FileManager -} - -type config struct { - startTime time.Time - endTime time.Time - backupFileNamePrefix string - uploadBatchSize int -} - -// list all the files that needs to be migrated -func (m *fileMigrator) listFilePathToMigrate(ctx context.Context) []*backupFileInfo { - listOfFiles := make([]*backupFileInfo, 0) - - startTimeMilli := m.conf.startTime.UnixNano() / int64(time.Millisecond) - endTimeMilli := m.conf.endTime.UnixNano() / int64(time.Millisecond) - - iterator := filemanager.IterateFilesWithPrefix(ctx, - m.conf.backupFileNamePrefix, - "", - 100, - m.fileManager, - ) - for iterator.Next() { - object := iterator.Get() - filePath := object.Key - if !strings.Contains(filePath, gatewayJobsFilePrefix) { - continue - } - - // file name should be of format gw_jobs_9710.974705928.974806056.1604871241214.1604872598504.gz - fileInfo, err := newBackupFileInfo(filePath) - if err != nil { - m.logger.Warnn("unable to parse file name", kitlogger.NewStringField("filePath", filePath), obskit.Error(err)) - continue - } - - // gw dump file name format gw_jobs_..._.gz - // ex: gw_jobs_9710.974705928.974806056.1604871241214.1604872598504.gz - minJobCreatedAt, maxJobCreatedAt, err := utils.GetMinMaxCreatedAt(filePath) - var pass bool - if err == nil { - pass = maxJobCreatedAt >= startTimeMilli && minJobCreatedAt <= endTimeMilli - } else { - m.logger.Warnn("parsing failed, fallback to comparing start and end time stamps with gw dump last modified.", kitlogger.NewStringField("filePath", filePath), obskit.Error(err)) - pass = object.LastModified.After(m.conf.startTime) && object.LastModified.Before(m.conf.endTime) - } - if pass { - listOfFiles = append(listOfFiles, fileInfo) - } - } - return listOfFiles -} - -// convert old file line entry to new file format -func (m *fileMigrator) convertToNewFormat(lineBytes []byte, createdAt time.Time) ([]*newFileFormat, error) { - var gatewayBatchReq []types.SingularEventT - err := jsonfast.Unmarshal([]byte(gjson.GetBytes(lineBytes, "event_payload.batch").String()), &gatewayBatchReq) - if err != nil { - return nil, fmt.Errorf("unable to unmarshall gateway events: %w", err) - } - listOfNewEvents := make([]*newFileFormat, 0, len(gatewayBatchReq)) - userID := gjson.GetBytes(lineBytes, "user_id").String() - for _, singleEvent := range gatewayBatchReq { - payloadBytes, err := json.Marshal(singleEvent) - if err != nil { - return nil, fmt.Errorf("unable to marshall single event: %w", err) - } - j := &newFileFormat{} - j.UserID = userID - j.EventPayload = payloadBytes - j.CreatedAt = createdAt - j.MessageID = stringify.Any(singleEvent["messageId"]) - listOfNewEvents = append(listOfNewEvents, j) - } - return listOfNewEvents, nil -} - -// uploadFile creates a new format file and upload to storage -func (m *fileMigrator) uploadFile(ctx context.Context, jobs []*newFileFormat, sourceId, workspaceId, instanceId string) error { - firstJobCreatedAt := jobs[0].CreatedAt.UTC() - lastJobCreatedAt := 
jobs[len(jobs)-1].CreatedAt.UTC() - - localFilePath := path.Join( - lo.Must(misc.CreateTMPDIR()), - "rudder-backups", - sourceId, - fmt.Sprintf("%d_%d_%s.json.gz", firstJobCreatedAt.Unix(), lastJobCreatedAt.Unix(), workspaceId), - ) - if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil { - return fmt.Errorf("creating gz file %q: mkdir error: %w", localFilePath, err) - } - writer, err := misc.CreateGZ(localFilePath) - if err != nil { - return fmt.Errorf("failed to create gz writer: %w", err) - } - defer func() { _ = os.Remove(localFilePath) }() - - for _, job := range jobs { - jobBytes, err := json.Marshal(job) - if err != nil { - _ = writer.Close() - return fmt.Errorf("failed to marshal job: %w", err) - } - if _, err := writer.Write(append(jobBytes, '\n')); err != nil { - _ = writer.Close() - return fmt.Errorf("write to local file failed: %w", err) - } - } - if err := writer.Close(); err != nil { - return fmt.Errorf("failed to close writer: %w", err) - } - - localFile, err := os.Open(localFilePath) - if err != nil { - return fmt.Errorf("opening local file: %w", err) - } - defer func() { _ = localFile.Close() }() - prefixes := []string{ - sourceId, - "gw", - firstJobCreatedAt.Format("2006-01-02"), - fmt.Sprintf("%d", firstJobCreatedAt.Hour()), - instanceId, - } - _, err = m.fileManager.Upload(ctx, localFile, prefixes...) - if err != nil { - return fmt.Errorf("failed to upload file: %w", err) - } - return nil -} - -// download file locally -func (m *fileMigrator) downloadFile(ctx context.Context, filePath string) (string, error) { - filePathTokens := strings.Split(filePath, "/") - // e.g. rudder-saas/dummy/dummy-v0-rudderstack-10/gw_jobs_11796.317963152.317994396.1703547948443.1703548519552.dummy-workspace-id.gz - tmpdirPath, err := misc.CreateTMPDIR() - if err != nil { - return "", fmt.Errorf("failed to create tmp directory: %w", err) - } - tempPath := path.Join(tmpdirPath, localDumpDirName, filePathTokens[len(filePathTokens)-1]) - - err = os.MkdirAll(filepath.Dir(tempPath), os.ModePerm) - if err != nil { - return "", fmt.Errorf("failed to make local directory: %w", err) - } - - file, err := os.Create(tempPath) - if err != nil { - return "", fmt.Errorf("make local file: %w", err) - } - defer func() { - err = file.Close() - if err != nil { - m.logger.Errorn("closing local file", kitlogger.NewStringField("tempFilePath", tempPath), obskit.Error(err)) - } - }() - - err = m.fileManager.Download(ctx, file, filePath) - if err != nil { - return "", fmt.Errorf("downloading file: %w", err) - } - m.logger.Debugf("file downloaded at %s", tempPath) - return tempPath, nil -} - -func (m *fileMigrator) processFile(ctx context.Context, fileInfo *backupFileInfo) error { - instanceID := fileInfo.instance - workspaceID := fileInfo.workspaceID - - localFilePath, err := m.downloadFile(ctx, fileInfo.filePath) - if err != nil { - return fmt.Errorf("failed to download file locally, err: %w", err) - } - defer func() { _ = os.Remove(localFilePath) }() - - rawFile, err := os.Open(localFilePath) - if err != nil { - return fmt.Errorf("creating gzip reader, err: %w", err) - } - - reader, err := gzip.NewReader(rawFile) - if err != nil { - return fmt.Errorf("failed to get new gzip reader, err: %w", err) - } - - sc := bufio.NewScanner(reader) - // default scanner buffer maxCapacity is 64K - // set it to higher value to avoid read stop on read size error - maxCapacity := kitconfig.GetInt64("FileMigrator.maxScannerCapacity", 10*bytesize.MB) // 10MB - buf := make([]byte, maxCapacity) - sc.Buffer(buf, 
int(maxCapacity)) - - defer func() { _ = rawFile.Close() }() - - eventsToDump := make(map[string][]*newFileFormat) - for sc.Scan() { - lineBytes := sc.Bytes() - copyLineBytes := make([]byte, len(lineBytes)) - copy(copyLineBytes, lineBytes) - timeStamp := gjson.GetBytes(copyLineBytes, "created_at").String() - createdAt, err := time.Parse(time.RFC3339Nano, timeStamp) - if err != nil { - return fmt.Errorf("failed to parse created_at, err: %w", err) - } - if !(m.conf.startTime.Before(createdAt) && m.conf.endTime.After(createdAt)) { - continue - } - - // convert to new format - newFormatFileEntries, err := m.convertToNewFormat(copyLineBytes, createdAt) - if err != nil { - return fmt.Errorf("failed to convert to new file format, err: %w", err) - } - sourceID := gjson.GetBytes(copyLineBytes, "parameters.source_id").String() - if eventsToDump[sourceID] == nil { - eventsToDump[sourceID] = make([]*newFileFormat, 0) - } - // prepare batch dump to a new file - eventsToDump[sourceID] = append(eventsToDump[sourceID], newFormatFileEntries...) - // save to new file - if len(eventsToDump[sourceID]) >= m.conf.uploadBatchSize { - err := m.uploadFile(ctx, eventsToDump[sourceID], sourceID, workspaceID, instanceID) - if err != nil { - return fmt.Errorf("failed to upload file, sourceID:%s, err: %w", sourceID, err) - } - delete(eventsToDump, sourceID) - } - } - for sourceID, jobs := range eventsToDump { - err := m.uploadFile(ctx, jobs, sourceID, workspaceID, instanceID) - if err != nil { - return fmt.Errorf("failed to upload file, sourceID:%s, err: %w", sourceID, err) - } - } - return nil -} diff --git a/cmd/backupfilemigrator/main.go b/cmd/backupfilemigrator/main.go deleted file mode 100644 index 3dd62207f3..0000000000 --- a/cmd/backupfilemigrator/main.go +++ /dev/null @@ -1,146 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "os" - "os/signal" - "sort" - "strings" - "syscall" - "time" - - obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" - - "github.com/urfave/cli/v2" - - kitconfig "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/filemanager" - kitlogger "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-server/utils/filemanagerutil" -) - -type newFileFormat struct { - UserID string `json:"userId"` - EventPayload json.RawMessage `json:"payload"` - CreatedAt time.Time `json:"createdAt"` - MessageID string `json:"messageId"` -} - -var app = &cli.App{ - Name: "migration-tool", - Usage: "Backup file migration tool", - Commands: []*cli.Command{ - { - Name: "migrate", - Usage: "Backup file migration", - Flags: []cli.Flag{ - &cli.TimestampFlag{ - Name: "startTime", - Layout: time.RFC3339Nano, - Usage: "start time from which we need to convert files in RFC3339 format", - Required: true, - }, - &cli.TimestampFlag{ - Name: "endTime", - Layout: time.RFC3339Nano, - Usage: "end time upto which we need to convert files in RFC3339 format", - Required: true, - }, - &cli.IntFlag{ - Name: "uploadBatchSize", - Usage: "no. 
of messages to upload in single file", - Required: true, - }, - &cli.StringFlag{ - Name: "backupFileNamePrefix", - Usage: "prefix to identify files in the bucket", - Required: true, - }, - }, - Action: func(c *cli.Context) error { - return migrate(c) - }, - }, - }, -} - -func init() { - sort.Sort(cli.FlagsByName(app.Flags)) - sort.Sort(cli.CommandsByName(app.Commands)) -} - -func main() { - if err := app.Run(os.Args); err != nil { - fmt.Println(err) - os.Exit(1) - } -} - -func migrate(c *cli.Context) error { - ctx := c.Context - startTime := c.Timestamp("startTime") - endTime := c.Timestamp("endTime") - uploadBatchSize := c.Int("uploadBatchSize") - backupFileNamePrefix := c.String("backupFileNamePrefix") - conf := kitconfig.New() - logger := kitlogger.NewFactory(conf).NewLogger().Child("backup-file-migrator") - - return run(ctx, conf, logger, *startTime, *endTime, uploadBatchSize, backupFileNamePrefix) -} - -func run( - ctx context.Context, - conf *kitconfig.Config, - logger kitlogger.Logger, - startTime time.Time, - endTime time.Time, - uploadBatchSize int, - backupFileNamePrefix string, -) error { - ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) - defer cancel() - - backupFileNamePrefix = strings.TrimSpace(backupFileNamePrefix) - if backupFileNamePrefix == "" { - return fmt.Errorf("backupFileNamePrefix should not be empty") - } - - storageProvider := kitconfig.GetString("JOBS_BACKUP_STORAGE_PROVIDER", "S3") - fileManagerOpts := filemanagerutil.ProviderConfigOpts(ctx, storageProvider, conf) - fileManagerOpts.Prefix = backupFileNamePrefix - fileManager, err := filemanager.New(&filemanager.Settings{ - Provider: storageProvider, - Config: filemanager.GetProviderConfigFromEnv(fileManagerOpts), - Conf: conf, - }) - if err != nil { - return fmt.Errorf("creating file manager, err: %w", err) - } - - migratorConfig := &config{ - startTime: startTime, - endTime: endTime, - backupFileNamePrefix: backupFileNamePrefix, - uploadBatchSize: uploadBatchSize, - } - - migrator := &fileMigrator{ - conf: migratorConfig, - logger: logger, - fileManager: fileManager, - } - // list all files to migrate - listOfFiles := migrator.listFilePathToMigrate(ctx) - - // step 2: download data from file - for _, fileInfo := range listOfFiles { - err := migrator.processFile(ctx, fileInfo) - if err != nil { - logger.Errorn("fail to process file", kitlogger.NewStringField("filePath", fileInfo.filePath), obskit.Error(err)) - return err - } - } - return nil -} diff --git a/cmd/backupfilemigrator/testdata/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz b/cmd/backupfilemigrator/testdata/gw_jobs_52306.1311260446.1311289420.1703893976586.1703894226969.workspace.gz deleted file mode 100644 index b62d3f1dd9639cd75cb1a34abaa2e3f7638a2e65..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4246 zcmV;H5NYopiwFn}QrBew17~+%YHwn5Uo|o_Fg7kRGchqTHZU|aHZCzUF)=bYIW#gb zE-^PSGdMXjIX5;nI5sXZH!w3eIW#gdHaRvqE;2}AYC?8-d1g>oGj1?ZPE%o7NH=9g zSadOZE^2dcZUF6_Yj4{~mWIFYuNdU(ZZ+zDY0M1Z9?62dCbi1a7RbdSsH-JPOfrtr zw%F+ZzNh4*XIfDf65s}mF=HT>DN!V=`0!lnty90e`TThm-u(EdH&GaD9V1AKq!DMH zA?+kZMg;2>rO7D%=0E=Qrv2dF?e`#z5Gpa}61IN+6z`ulzbX2f6qEV^es7ymvo3ZR z7v0;{^7xao)k-N4t&TyL*^#_xSv2 z`=NXHv)$x9CX2l7qrJCX+{XJ)u(a;eP$;qy5K0Z1-^!(bvrgw*I!?N4wj6uwj2@ z&Tjn@Uj5H6Z??OSG4Hy)w|@QGr}LKE>3DJR)AV{;)%z2J%@^Nk9`OT~y4g3&(!PHm z_iX~*{qxE{olU&YZ{B|nA>L2-ry3~SNHPc{yjRejN+M@$gg&*DdWjh}KJW7m8q@~6 zwNFj@^qq_SSKdNHuRgrrpVAME{($$b)WuJq>|K013KLdv$uaUyCB!|k$XNnMvCIX_ 
zFx7(OquBQ!-tW2(pFe%s?Sy+R@Alz-b+>wNH`NM`x`oF7f1Bsm|IJ-?zuLzCjJy2F z{cE(2JG4KE`}p_I@HNx_%I}dM_CHVCeO&zVehxqm1 ze)$b?3>=3j5+X@LgJCL=)ut{Gk6W>L1YyTAM;<7-7yh6;k1vC#Cy8e}mA%#ATJ1`s{69? z+hTF^P=h6(O>O}5t9Diu+omo0vc1DKz3GclvA5*X+aQ@xh(v4=*U=#74MT}Bskx5~ z?utJQaXMJ?0rA&>IK@J<1TH!Y6ez=9BTJM+A*Ez2cGe|)6mfxnk9f1Vp-ngMo6&Sz z)8cWK5no)bud8(rPW!g#F4uKiv$894JihvB$zue24F^Zw6N9*M;8ZYW5w*cdB|S8~ zI|Fm*UwS&`Y_Q}5=C1*B&z%brOsG;JJM3H`?W0182C%F^JP+YG=HxIY_qb@seNjyx zimqTqb+aqyPbfRZDeMPV;uR58{9)MgXT+8a*bdmfRBVlk(QND` zHJTZsB}dc|jkvYYTLNR0unW*tjIekf;_M6uTnrBQfcR@bJP2o$(iBM?!7{@wk4)qQ z@rbDm7;DVFW0~23W$wqUY(1%~>9(MNIym6PXuZ3xHV}K?ZOeYSF8kZA=+^>%w;5ee zlvKh}#H<$J{Tqsm4H_vOlt|1NCC*8sorn05A+!PU0r7u5#0l4uxFnHK0lGyH=oYwQ z1aJ$NQ)xUM&z1{*kVae7xUTMS(T}&y;_apYD_(%*zFv%X&8P+|J{C>87Ih!mqN~Z* z7+WN6HD?6Hke@LWL(|hBQ6eT8vG=yAiifhgS~Zg!+DxuC z`)qlVGI~J(Y;e&z<{D{9fp~@mbs@^&;yH+$bK-G5Sn>h!*MK;;##rGKA`lwGl~V@| zhBcxj8XAJoBzz|xKTxmD7H`|SYBqJX-WKgmUuMK7b9TMB>gvAOmZLk|EH2yG;&xfk ztG%;sg10hSgQ#G6G!A?TmppWU=Fxf+q0>%x)+Lq%pQ>I1#0T}-fcUFG9Gr2mqskzg zVnjSZaBP!HD01Tkrc~-Ajw61cUYk_~n^jj`F#(I+fwf*?+>F-rdNJ+Gss_sr)_ii) z&#F68PKu|L8&ifRY*BJ>NDht!a!g2MwZxG@e9iqCjyjEgr)8?;&oR)T<2{20-X27n3a9G zDn^AU!Q$^zZpy;}l}AZ3=q;Bp(#*gaSf!Z5%!qJ?6+aX4=Txo@h!2RrT*SErkgY== zymCN1CS(=i$Ox$ceH()I#}PkJt~J$EfE8bsqxrVTdh@J7uP!hk{y580mww&Xi<^h0 zU4u0*{~>Fk#!$;SV$?&VATURoXo|dHHsWL?_NU8QB+DV&(0)9HTD+xE;8CvD_M|zyJq=l(Jc`2GO>S%%bRdOahY-O*+o)fpV?P z4)|!()xF=&7MIH`Ybi&Y_4Q)>*i7cItz7Pww$mXX zYW5Jfo{U^MXMz9u*vt zYXgXLfSm-Nq#-6+tN$>>2kqK`_^Uu1uA2KGLN1CV5zk{&R!{Vwa@mGO(h+ruh*9c>p4XM;fN2ZaBl0rZ%38%;Fr%Lk;;-svaO)kS!EXuZS z=eYioR&z-R8I23XdqDVfEt3E(KBpFKsMRuH`(m*LHd*j8Ad`CpTxyA|7ZmZB zn5I&56OR{iP~TKid)pP&<*I1c;Amew=34|`JvEeScX|r#T*L=4+JN}0K-?zDByk1_;{{Nrp}}Y%h#2WG zbvh@rjw5~`Ml0LRYBsvs6jiRva`(7*zN=}T8@4ODdRx;mExY-yrp>D8ZL)nc}2Hgz|BEc>UT zYY@I01#vNoIL}Sf$wP>3;Zl<^DLO}#IDH1FqVg=npHsFr`04}VFBWmImnzyMP*f@A zQfqmhHQ`8klMp+31Fr`}GUve7}A{ce4c_ofNbXMSHg^ zMtA!}Ij0;e4@_$z;hq5E^V$Uw;)Xm3PBq2)^hCL05*vLw;)8H)K>Sr89%9fk5QeN{ z5D^mzR-7?{yo4B?C!xGNUaCbM7G3K-t`~E_H%|@QEVH7e?CZxTMa#{$nShrLuj{S^ ztNwPK(N&7uKr!Fn1c)m*xaHtXge0To&O6J)>4QJ;cDxOvu$_tH+?)J-*6#Y-#_2Oz-0OEC9?3(JT zD<@a0W;EYV57R8KG?dFJ>byJ&!|AEF9HWt@TyCAqKMe6fv^F6ADiFuYMeU;ju#L$L z=8?{mx&*k*F<=rDIX?YU9f;P-%0CwU{Gl0*w?#W<&ELR^?@P68)^)oE>~Cl>`l&0s z+f6z8X}?^kP=s+D2%~BQf-mqqpHK_Hj+^;}1&!t*eN$YJ%%XN{3 zYi(GUqjI%()P->Z8W=H21>z=f1WrXl%EQAYQWZEmV`%(5#MvNP8xVgLh!dJwb`czP zqp~5DNONHkm5L++Edf_@9Pw|K3th8WwOo~X5?Mdr&7Rf~%;`2f1LwUMjp?lNtc1rv z-THkR%^GF269y?xOSr8?i2}H3b<5d zp}~~PqP09>Q@JEq2-5TKApVVLtz6}DsO98s*Nj3}=JgIPzKGVm<6iKPf=WX3)9W`V zh1KbT7B*f!{ROT2J5sby1&l>d0<*l3 sQA!$dp#^d-ab%cQSb7bQlG7)FoJsglK5ZcUUzqU!1M1)ITPKSE08IliqW}N^ diff --git a/cmd/backupfilemigrator/testdata/gw_jobs_52307.1311260446.1311289420.1703893976586.1703894226969.workspace.gz b/cmd/backupfilemigrator/testdata/gw_jobs_52307.1311260446.1311289420.1703893976586.1703894226969.workspace.gz deleted file mode 100644 index 1031f7e5342fe05fcfc2fe15c73d8a348bd45427..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 645 zcmV;00($))iwFqEpWI~t17~+%YHwn5Uo|o_Fg7kRGchqTHZU|aHZCzUF)=bYIW#gb zE-^PSGdMXjIX5;nI5sXZH!w3eIW#gdHaRvqE;2}AYC?8-d1g>oGj1?ZPE%o7NH=9g zSadOZE^2dcZUBu^?`zvI5dBw*zitY$WGQwig^rdI){Id?Lo!A|QQHX5*oqjhE#Y_Ee1S zR`KX&Zz+aof;2f-P#wg*mD1%^8Uu0x|Gup=1<$Mck&v=-4Yy$>Lv}hWK?V_e`jl#o| zPnS9KsfVj~%CyxX=PY2MS)E7T{j0WE*}1CQ%BzeW@>-i!vw=TNpxmsH9AV$W-2 z{$tlXtv1C=QNem#xUSZj|4)0q`-z#t;%Dr9VmOl(b|h#p__K9YhF^Uw?uBoUD=fCC fFPZPQS@KzU`+4=_E*zU&Z1MgZE8aGR;R65wLq{vl diff --git a/enterprise/replay/dumpsloader/dumpsloader.go 
b/enterprise/replay/dumpsloader/dumpsloader.go deleted file mode 100644 index 411411a861..0000000000 --- a/enterprise/replay/dumpsloader/dumpsloader.go +++ /dev/null @@ -1,110 +0,0 @@ -package dumpsloader - -import ( - "context" - "fmt" - "sort" - "strings" - "time" - - "golang.org/x/sync/errgroup" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/filemanager" - "github.com/rudderlabs/rudder-server/enterprise/replay/utils" - - "github.com/tidwall/gjson" - - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-server/jobsdb" -) - -type DumpsLoader interface { - Start() - Stop() error -} - -// DumpsLoaderHandleT - dumps-loader handle -type dumpsLoader struct { - ctx context.Context - cancel context.CancelFunc - log logger.Logger - dbHandle *jobsdb.Handle - uploader filemanager.FileManager - config dumpsConfig -} - -type dumpsConfig struct { - prefix string - bucket string - startAfterKey string - startTime time.Time - endTime time.Time -} - -// procErrorRequestHandler is an empty struct to capture Proc Error re-stream request handling functionality -type procErrorRequestHandler struct { - *dumpsLoader - g errgroup.Group - tablePrefix string -} - -// gwReplayRequestHandler is an empty struct to capture Gateway replay handling functionality -type gwReplayRequestHandler struct { - *dumpsLoader - g errgroup.Group - tablePrefix string -} - -type OrderedJobs struct { - SortIndex int - Job *jobsdb.JobT -} - -func storeJobs(ctx context.Context, objects []OrderedJobs, dbHandle *jobsdb.Handle, log logger.Logger) error { - // sorting dumps list on index - sort.Slice(objects, func(i, j int) bool { - return objects[i].SortIndex < objects[j].SortIndex - }) - - var jobs []*jobsdb.JobT - for _, object := range objects { - jobs = append(jobs, object.Job) - } - - log.Info("Total dumps count : ", len(objects)) - err := dbHandle.Store(ctx, jobs) - if err != nil { - return fmt.Errorf("failed to write dumps locations to DB with error: %w", err) - } - return nil -} - -// Setup sets up dumps-loader. 
-func Setup(ctx context.Context, config *config.Config, db *jobsdb.Handle, tablePrefix string, uploader filemanager.FileManager, bucket string, log logger.Logger) (DumpsLoader, error) { - ctx, cancel := context.WithCancel(ctx) - dumpHandler := &dumpsLoader{ - ctx: ctx, - cancel: cancel, - log: log, - dbHandle: db, - uploader: uploader, - config: dumpsConfig{ - prefix: strings.TrimSpace(config.GetString("JOBS_REPLAY_BACKUP_PREFIX", "")), - bucket: bucket, - startAfterKey: gjson.GetBytes(db.GetLastJob(ctx).EventPayload, "location").String(), - }, - } - startTime, endTime, err := utils.GetStartAndEndTime(config) - if err != nil { - return nil, err - } - dumpHandler.config.startTime = startTime - dumpHandler.config.endTime = endTime - switch tablePrefix { - case "gw": - return &gwReplayRequestHandler{dumpHandler, errgroup.Group{}, tablePrefix}, nil - default: - return &procErrorRequestHandler{dumpHandler, errgroup.Group{}, tablePrefix}, nil - } -} diff --git a/enterprise/replay/dumpsloader/gwreplay.go b/enterprise/replay/dumpsloader/gwreplay.go deleted file mode 100644 index c71348091b..0000000000 --- a/enterprise/replay/dumpsloader/gwreplay.go +++ /dev/null @@ -1,125 +0,0 @@ -package dumpsloader - -import ( - "context" - "fmt" - "strconv" - "strings" - "time" - - "github.com/google/uuid" - - "github.com/rudderlabs/rudder-server/enterprise/replay/utils" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/filemanager" - "github.com/rudderlabs/rudder-server/jobsdb" -) - -func (g *gwReplayRequestHandler) Start() { - g.handleRecovery() - g.g.Go(func() error { - return g.fetchDumpsList(g.ctx) - }) -} - -func (g *gwReplayRequestHandler) Stop() error { - g.cancel() - return g.g.Wait() -} - -func (g *gwReplayRequestHandler) handleRecovery() { - // remove dangling executing - g.dbHandle.FailExecuting() -} - -func (g *gwReplayRequestHandler) fetchDumpsList(ctx context.Context) error { - maxItems := config.GetInt64("MAX_ITEMS", 1000) // MAX_ITEMS is the max number of files to be fetched in one iteration from object storage - uploadMaxItems := config.GetInt("UPLOAD_MAX_ITEMS", 1) // UPLOAD_MAX_ITEMS is the max number of objects to be uploaded to postgres - - g.log.Info("Fetching gw dump files list") - objects := make([]OrderedJobs, 0) - iter := filemanager.IterateFilesWithPrefix(ctx, - g.config.prefix, - g.config.startAfterKey, - maxItems, - g.uploader, - ) - for iter.Next() { - object := iter.Get() - filePath := object.Key - if strings.Contains(filePath, "gw_jobs_") { - startTimeMilli := g.config.startTime.UnixNano() / int64(time.Millisecond) - endTimeMilli := g.config.endTime.UnixNano() / int64(time.Millisecond) - key := object.Key - tokens := strings.Split(key, "gw_jobs_") - tokens = strings.Split(tokens[1], ".") - var idx int - var err error - if idx, err = strconv.Atoi(tokens[0]); err != nil { - continue - } - - // gw dump file name format gw_jobs_..._.gz - // ex: gw_jobs_9710.974705928.974806056.1604871241214.1604872598504.gz - minJobCreatedAt, maxJobCreatedAt, err := utils.GetMinMaxCreatedAt(object.Key) - var pass bool - if err == nil { - pass = maxJobCreatedAt >= startTimeMilli && minJobCreatedAt <= endTimeMilli - } else { - g.log.Infof("gw dump name(%s) is not of the expected format. 
Parse failed with error %w", object.Key, err) - g.log.Info("Falling back to comparing start and end time stamps with gw dump last modified.") - pass = object.LastModified.After(g.config.startTime) && object.LastModified.Before(g.config.endTime) - } - if pass { - job := jobsdb.JobT{ - UUID: uuid.New(), - UserID: fmt.Sprintf(`random-%s`, uuid.New()), - Parameters: []byte(`{}`), - CustomVal: "replay", - EventPayload: []byte(fmt.Sprintf(`{"location": %q}`, object.Key)), - } - objects = append(objects, OrderedJobs{Job: &job, SortIndex: idx}) - } - } else { - startTimeMilli := g.config.startTime.Unix() - fileName := strings.Split(filePath, "/")[len(strings.Split(filePath, "/"))-1] - firstEventAt, err := strconv.ParseInt(strings.Split(fileName, "_")[0], 10, 64) - if err != nil { - g.log.Info("Failed to parse firstEventAt from file name: ", fileName) - continue - } - if firstEventAt >= startTimeMilli { - job := jobsdb.JobT{ - UUID: uuid.New(), - UserID: fmt.Sprintf(`random-%s`, uuid.New()), - Parameters: []byte(`{}`), - CustomVal: "replay", - EventPayload: []byte(fmt.Sprintf(`{"location": %q}`, object.Key)), - } - objects = append(objects, OrderedJobs{Job: &job, SortIndex: int(firstEventAt)}) - } - } - - if len(objects) >= uploadMaxItems { - err := storeJobs(ctx, objects, g.dbHandle, g.log) - if err != nil { - return err - } - objects = nil - } - } - - if iter.Err() != nil { - return fmt.Errorf("failed to iterate gw dump files with error: %w", iter.Err()) - } - if len(objects) != 0 { - err := storeJobs(ctx, objects, g.dbHandle, g.log) - if err != nil { - return err - } - } - - g.log.Info("Dumps loader job is done") - return nil -} diff --git a/enterprise/replay/dumpsloader/procreplay.go b/enterprise/replay/dumpsloader/procreplay.go deleted file mode 100644 index dabd4c4d29..0000000000 --- a/enterprise/replay/dumpsloader/procreplay.go +++ /dev/null @@ -1,96 +0,0 @@ -package dumpsloader - -import ( - "context" - "fmt" - "strconv" - "strings" - - "golang.org/x/sync/errgroup" - - "github.com/google/uuid" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/filemanager" - "github.com/rudderlabs/rudder-server/jobsdb" -) - -func (p *procErrorRequestHandler) Start() { - p.g = errgroup.Group{} - p.handleRecovery() - p.g.Go(func() error { - return p.fetchDumpsList(p.ctx) - }) -} - -func (p *procErrorRequestHandler) Stop() error { - p.cancel() - return p.g.Wait() -} - -func (p *procErrorRequestHandler) handleRecovery() { - // remove dangling executing - p.dbHandle.FailExecuting() -} - -func (p *procErrorRequestHandler) fetchDumpsList(ctx context.Context) error { - objects := make([]OrderedJobs, 0) - p.log.Info("Fetching proc err files list") - var err error - maxItems := config.GetInt64("MAX_ITEMS", 1000) // MAX_ITEMS is the max number of files to be fetched in one iteration from object storage - uploadMaxItems := config.GetInt("UPLOAD_MAX_ITEMS", 1) // UPLOAD_MAX_ITEMS is the max number of objects to be uploaded to postgres - - iter := filemanager.IterateFilesWithPrefix(ctx, - p.config.prefix, - p.config.startAfterKey, - maxItems, - p.uploader, - ) - for iter.Next() { - object := iter.Get() - if strings.Contains(object.Key, "rudder-proc-err-logs") { - if object.LastModified.Before(p.config.startTime) || (object.LastModified.Sub(p.config.endTime).Hours() > 1) { - p.log.Debugf("Skipping object: %v ObjectLastModifiedTime: %v", object.Key, object.LastModified) - continue - } - key := object.Key - tokens := strings.Split(key, "proc-err") - tokens = strings.Split(tokens[1], "/") 
- tokens = strings.Split(tokens[len(tokens)-1], ".") - tokens = strings.Split(tokens[2], "-") - var idx int - if idx, err = strconv.Atoi(tokens[0]); err != nil { - continue - } - - job := jobsdb.JobT{ - UUID: uuid.New(), - UserID: fmt.Sprintf(`random-%s`, uuid.New()), - Parameters: []byte(`{}`), - CustomVal: "replay", - EventPayload: []byte(fmt.Sprintf(`{"location": %q}`, object.Key)), - } - objects = append(objects, OrderedJobs{Job: &job, SortIndex: idx}) - } - if len(objects) >= uploadMaxItems { - err := storeJobs(ctx, objects, p.dbHandle, p.log) - if err != nil { - return err - } - objects = nil - } - - } - if iter.Err() != nil { - return fmt.Errorf("failed to iterate proc err files with error: %w", iter.Err()) - } - if len(objects) != 0 { - err := storeJobs(ctx, objects, p.dbHandle, p.log) - if err != nil { - return err - } - } - - p.log.Info("Dumps loader job is done") - return nil -} diff --git a/enterprise/replay/replayer/replay.go b/enterprise/replay/replayer/replay.go deleted file mode 100644 index abe4e2cd5a..0000000000 --- a/enterprise/replay/replayer/replay.go +++ /dev/null @@ -1,185 +0,0 @@ -package replayer - -import ( - "context" - "math/rand" - "sort" - "time" - - "golang.org/x/sync/errgroup" - - "github.com/rudderlabs/rudder-go-kit/stats" - "github.com/rudderlabs/rudder-server/enterprise/replay/dumpsloader" - "github.com/rudderlabs/rudder-server/enterprise/replay/utils" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/filemanager" - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-server/jobsdb" - "github.com/rudderlabs/rudder-server/processor/transformer" -) - -type Replay interface { - Start() - Stop() error -} - -type Replayer struct { - ctx context.Context - cancel context.CancelFunc - g errgroup.Group - log logger.Logger - db *jobsdb.Handle - toDB *jobsdb.Handle - dumpsLoader dumpsloader.DumpsLoader - uploader filemanager.FileManager - config replayConfig - workers []*SourceWorkerT - initSourceWorkersChannel chan bool -} - -type replayConfig struct { - bucket string - startTime time.Time - endTime time.Time - noOfWorkers int - dbReadSize int - tablePrefix string -} - -func (handle *Replayer) generatorLoop(ctx context.Context) error { - handle.log.Infof("generator reading from replay_jobs_* started") - for { - select { - case <-ctx.Done(): - handle.log.Infof("generator reading from replay_jobs_* stopped:Context cancelled") - return nil - case <-handle.initSourceWorkersChannel: - } - - queryParams := jobsdb.GetQueryParams{ - CustomValFilters: []string{"replay"}, - JobsLimit: handle.config.dbReadSize, - } - jobsResult, err := handle.db.GetJobs(ctx, []string{jobsdb.Unprocessed.State, jobsdb.Failed.State}, queryParams) - if err != nil { - handle.log.Errorf("Error getting to retry jobs: %v", err) - return err - } - combinedList := jobsResult.Jobs - - handle.log.Infof("length of combinedList : %d", len(combinedList)) - - sort.Slice(combinedList, func(i, j int) bool { - return combinedList[i].JobID < combinedList[j].JobID - }) - - // List of jobs which can be processed mapped per channel - type workerJobT struct { - worker *SourceWorkerT - job *jobsdb.JobT - } - - var statusList []*jobsdb.JobStatusT - var toProcess []workerJobT - - for _, job := range combinedList { - w := handle.workers[rand.Intn(handle.config.noOfWorkers)] - status := jobsdb.JobStatusT{ - JobID: job.JobID, - JobState: jobsdb.Executing.State, - ExecTime: time.Now(), - RetryTime: time.Now(), - ErrorCode: "", - ErrorResponse: []byte(`{}`), // 
check - Parameters: []byte(`{}`), // check - JobParameters: job.Parameters, - } - statusList = append(statusList, &status) - toProcess = append(toProcess, workerJobT{worker: w, job: job}) - } - - // Mark the jobs as executing - err = handle.db.UpdateJobStatus(ctx, statusList, []string{"replay"}, nil) - if err != nil { - return err - } - - // Send the jobs to the jobQ - for _, wrkJob := range toProcess { - wrkJob.worker.channel <- wrkJob.job - } - time.Sleep(5 * time.Second) - } -} - -func (handle *Replayer) initSourceWorkers(ctx context.Context) { - handle.workers = make([]*SourceWorkerT, handle.config.noOfWorkers) - for i := 0; i < handle.config.noOfWorkers; i++ { - worker := &SourceWorkerT{ - log: handle.log, - channel: make(chan *jobsdb.JobT, handle.config.dbReadSize), - workerID: i, - replayHandler: handle, - tablePrefix: handle.config.tablePrefix, - uploader: handle.uploader, - } - handle.workers[i] = worker - worker.transformer = transformer.NewTransformer(config.Default, handle.log, stats.Default) - handle.g.Go(func() error { - err := worker.workerProcess(ctx) - if err != nil { - return err - } - return nil - }) - } - close(handle.initSourceWorkersChannel) -} - -func Setup(ctx context.Context, config *config.Config, dumpsLoader dumpsloader.DumpsLoader, db, toDB *jobsdb.Handle, tablePrefix string, uploader filemanager.FileManager, bucket string, log logger.Logger) (Replay, error) { - ctx, cancel := context.WithCancel(ctx) - handle := &Replayer{ - ctx: ctx, - cancel: cancel, - g: errgroup.Group{}, - log: log, - db: db, - toDB: toDB, - dumpsLoader: dumpsLoader, - uploader: uploader, - config: replayConfig{ - bucket: bucket, - tablePrefix: tablePrefix, - noOfWorkers: config.GetInt("WORKERS_PER_SOURCE", 4), - dbReadSize: config.GetInt("DB_READ_SIZE", 10), - }, - } - handle.initSourceWorkersChannel = make(chan bool) - startTime, endTime, err := utils.GetStartAndEndTime(config) - if err != nil { - return nil, err - } - handle.config.startTime = startTime - handle.config.endTime = endTime - return handle, nil -} - -func (handle *Replayer) Start() { - handle.g.Go(func() error { - handle.initSourceWorkers(handle.ctx) - return nil - }) - handle.g.Go(func() error { - return handle.generatorLoop(handle.ctx) - }) -} - -func (handle *Replayer) Stop() error { - handle.cancel() - for _, worker := range handle.workers { - handle.log.Infof("Closing worker channels") - close(worker.channel) - } - return handle.g.Wait() -} diff --git a/enterprise/replay/replayer/sourceWorker.go b/enterprise/replay/replayer/sourceWorker.go deleted file mode 100644 index 67259f430c..0000000000 --- a/enterprise/replay/replayer/sourceWorker.go +++ /dev/null @@ -1,286 +0,0 @@ -package replayer - -import ( - "bufio" - "compress/gzip" - "context" - "encoding/json" - "fmt" - "os" - "path/filepath" - "strings" - "time" - - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-server/utils/misc" - - "github.com/google/uuid" - "github.com/tidwall/gjson" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/filemanager" - backendconfig "github.com/rudderlabs/rudder-server/backend-config" - "github.com/rudderlabs/rudder-server/jobsdb" - "github.com/rudderlabs/rudder-server/processor/transformer" -) - -type SourceWorkerT struct { - log logger.Logger - channel chan *jobsdb.JobT - workerID int - replayHandler *Replayer - tablePrefix string - transformer transformer.Transformer - uploader filemanager.FileManager -} - -func (worker *SourceWorkerT) workerProcess(ctx context.Context) 
error { - worker.log.Debugf("worker started %d", worker.workerID) - for job := range worker.channel { - worker.log.Debugf("job received: %s", job.EventPayload) - - err := worker.replayJobsInFile(ctx, gjson.GetBytes(job.EventPayload, "location").String()) - if err != nil { - worker.log.Errorf("failed to replay job with error: %w", err) - return err - } - - status := jobsdb.JobStatusT{ - JobID: job.JobID, - JobState: jobsdb.Succeeded.State, - ExecTime: time.Now(), - RetryTime: time.Now(), - ErrorCode: "", - ErrorResponse: []byte(`{}`), // check - Parameters: []byte(`{}`), // check - JobParameters: job.Parameters, - } - err = worker.replayHandler.db.UpdateJobStatus(ctx, []*jobsdb.JobStatusT{&status}, []string{"replay"}, nil) - if err != nil { - return err - } - } - return nil -} - -func (worker *SourceWorkerT) replayJobsInFile(ctx context.Context, filePath string) error { - filePathTokens := strings.Split(filePath, "/") - - var err error - dumpDownloadPathDirName := "/rudder-s3-dumps/" - tmpdirPath := strings.TrimSuffix(config.GetString("RUDDER_TMPDIR", "/tmp"), "/") - if tmpdirPath == "" { - tmpdirPath, err = os.UserHomeDir() - if err != nil { - return err - } - } - path := fmt.Sprintf( - `%v%v%v`, tmpdirPath, dumpDownloadPathDirName, filePathTokens[len(filePathTokens)-1]) - - err = os.MkdirAll(filepath.Dir(path), os.ModePerm) - if err != nil { - return err - } - - file, err := os.Create(path) - if err != nil { - return err // Cannot open file to write - } - - err = worker.uploader.Download(ctx, file, filePath) - if err != nil { - return err // failed to download - } - worker.log.Debugf("file downloaded at %s", path) - defer func() { _ = file.Close() }() - - rawf, err := os.Open(path) - if err != nil { - return err // failed to open file - } - - reader, err := gzip.NewReader(rawf) - if err != nil { - return err // failed to read gzip file - } - - sc := bufio.NewScanner(reader) - // default scanner buffer maxCapacity is 64K - // set it to higher value to avoid read stop on read size error - maxCapacity := 10240 * 1024 // 10MB - buf := make([]byte, maxCapacity) - sc.Buffer(buf, maxCapacity) - - defer func() { _ = rawf.Close() }() - - var jobs []*jobsdb.JobT - - var transEvents []transformer.TransformerEvent - transformationVersionID := config.GetString("TRANSFORMATION_VERSION_ID", "") - regexMatch := strings.Contains(filePath, "gw_jobs_") || strings.Contains(filePath, "rudder-proc-err-logs") - for sc.Scan() { - lineBytes := sc.Bytes() - copyLineBytes := make([]byte, len(lineBytes)) - copy(copyLineBytes, lineBytes) - if !regexMatch { - copyLineBytes, err = transformArchivalToBackup(copyLineBytes, filePath) - if err != nil { - worker.log.Errorf("failed to transform archival to backup: %s", err) - continue - } - } - if transformationVersionID == "" { - timeStamp := gjson.GetBytes(copyLineBytes, worker.getFieldIdentifier(createdAt)).String() - createdAt, err := time.Parse(misc.NOTIMEZONEFORMATPARSE, getFormattedTimeStamp(timeStamp)) - if err != nil { - worker.log.Errorf("failed to parse created at: %s", err) - continue - } - if !(worker.replayHandler.config.startTime.Before(createdAt) && worker.replayHandler.config.endTime.After(createdAt)) { - continue - } - job := jobsdb.JobT{ - UUID: uuid.New(), - UserID: gjson.GetBytes(copyLineBytes, worker.getFieldIdentifier(userID)).String(), - Parameters: []byte(gjson.GetBytes(copyLineBytes, worker.getFieldIdentifier(parameters)).String()), - CustomVal: gjson.GetBytes(copyLineBytes, worker.getFieldIdentifier(customVal)).String(), - EventPayload: 
[]byte(gjson.GetBytes(copyLineBytes, worker.getFieldIdentifier(eventPayload)).String()), - WorkspaceId: gjson.GetBytes(copyLineBytes, worker.getFieldIdentifier(workspaceID)).String(), - } - jobs = append(jobs, &job) - continue - } - - message, ok := gjson.ParseBytes(copyLineBytes).Value().(map[string]interface{}) - if !ok { - worker.log.Errorf("EventPayload not a json: %v", copyLineBytes) - continue - } - - messageID := uuid.New().String() - - metadata := transformer.Metadata{ - MessageID: messageID, - DestinationID: gjson.GetBytes(copyLineBytes, "parameters.destination_id").String(), - } - - transformation := backendconfig.TransformationT{VersionID: config.GetString("TRANSFORMATION_VERSION_ID", "")} - - transEvent := transformer.TransformerEvent{ - Message: message, - Metadata: metadata, - Destination: backendconfig.DestinationT{Transformations: []backendconfig.TransformationT{transformation}}, - } - - transEvents = append(transEvents, transEvent) - } - - userTransformBatchSize := config.GetReloadableIntVar(200, 1, "Processor.userTransformBatchSize") - if transformationVersionID != "" { - response := worker.transformer.UserTransform(context.TODO(), transEvents, userTransformBatchSize.Load()) - - for _, ev := range response.Events { - destEventJSON, err := json.Marshal(ev.Output[worker.getFieldIdentifier(eventPayload)]) - if err != nil { - worker.log.Errorf("Error unmarshalling transformer output: %v", err) - continue - } - createdAtString, ok := ev.Output[worker.getFieldIdentifier(createdAt)].(string) - if !ok { - worker.log.Errorf("Error getting created at from transformer output: %v", err) - continue - } - createdAt, err := time.Parse(misc.NOTIMEZONEFORMATPARSE, getFormattedTimeStamp(createdAtString)) - if err != nil { - worker.log.Errorf("failed to parse created at: %s", err) - continue - } - if !(worker.replayHandler.config.startTime.Before(createdAt) && worker.replayHandler.config.endTime.After(createdAt)) { - continue - } - params, err := json.Marshal(ev.Output[worker.getFieldIdentifier(parameters)]) - if err != nil { - worker.log.Errorf("Error unmarshalling transformer output: %v", err) - continue - } - job := jobsdb.JobT{ - UUID: uuid.New(), - UserID: ev.Output[worker.getFieldIdentifier(userID)].(string), - Parameters: params, - CustomVal: ev.Output[worker.getFieldIdentifier(customVal)].(string), - EventPayload: destEventJSON, - WorkspaceId: ev.Output[worker.getFieldIdentifier(workspaceID)].(string), - } - jobs = append(jobs, &job) - } - - for _, failedEv := range response.FailedEvents { - worker.log.Errorf(`Event failed in transformer with err: %v`, failedEv.Error) - } - - } - worker.log.Infof("brt-debug: TO_DB=%s", worker.replayHandler.toDB.Identifier()) - - err = worker.replayHandler.toDB.Store(ctx, jobs) - if err != nil { - return err - } - - err = os.Remove(path) - if err != nil { - worker.log.Errorf("[%s]: failed to remove file with error: %w", err) - return err - } - return nil -} - -const ( - userID = "userID" - parameters = "parameters" - customVal = "customVal" - eventPayload = "eventPayload" - workspaceID = "workspaceID" - createdAt = "createdAt" -) - -func (worker *SourceWorkerT) getFieldIdentifier(field string) string { - if worker.tablePrefix == "gw" { - switch field { - case userID: - return "user_id" - case parameters: - return "parameters" - case customVal: - return "custom_val" - case eventPayload: - return "event_payload" - case workspaceID: - return "workspace_id" - case createdAt: - return "created_at" - default: - return "" - } - } - switch field { - case 
userID: - return "UserID" - case parameters: - return "Parameters" - case customVal: - return "CustomVal" - case eventPayload: - return "EventPayload" - case workspaceID: - return "WorkspaceID" - case createdAt: - return "CreatedAt" - default: - return "" - } -} - -func getFormattedTimeStamp(timeStamp string) string { - return strings.Split(strings.TrimSuffix(timeStamp, "Z"), ".")[0] -} diff --git a/enterprise/replay/replayer/transformer.go b/enterprise/replay/replayer/transformer.go deleted file mode 100644 index 1dc987064f..0000000000 --- a/enterprise/replay/replayer/transformer.go +++ /dev/null @@ -1,67 +0,0 @@ -package replayer - -import ( - "encoding/json" - "strings" - "time" - - "github.com/google/uuid" -) - -type Payload struct { - Batch json.RawMessage `json:"batch"` - ReceivedAt string `json:"receivedAt"` - RequestIP string `json:"requestIP"` - WriteKey string `json:"writeKey"` -} -type Params struct { - SourceID string `json:"source_id"` -} - -func transformArchivalToBackup(input []byte, path string) ([]byte, error) { - var originalPayload struct { - CreatedAt time.Time `json:"createdAt"` - MessageID string `json:"messageId"` - Payload json.RawMessage `json:"payload"` - UserID string `json:"userId"` - } - var sourceId string - err := json.Unmarshal(input, &originalPayload) - if err != nil { - return nil, err - } - if len(strings.Split(path, "/")) == 7 { - sourceId = strings.Split(path, "/")[1] - } else { - sourceId = strings.Split(path, "/")[0] - } - desiredPayload := struct { - CreatedAt time.Time `json:"created_at"` - CustomVal string `json:"custom_val"` - EventCount int `json:"event_count"` - EventPayload Payload `json:"event_payload"` - ExpireAt string `json:"expire_at"` - JobID int `json:"job_id"` - Parameters Params `json:"parameters"` - UserID string `json:"user_id"` - UUID string `json:"uuid"` - WorkspaceID string `json:"workspace_id"` - }{ - CreatedAt: originalPayload.CreatedAt, - CustomVal: "GW", - EventPayload: Payload{ - Batch: originalPayload.Payload, - }, - UUID: uuid.NewString(), - Parameters: Params{ - SourceID: sourceId, - }, - EventCount: 1, - UserID: originalPayload.UserID, - } - desiredJSON, err := json.Marshal(desiredPayload) - if err != nil { - return nil, err - } - return desiredJSON, nil -} diff --git a/enterprise/replay/setup.go b/enterprise/replay/setup.go deleted file mode 100644 index 06a78a08c5..0000000000 --- a/enterprise/replay/setup.go +++ /dev/null @@ -1,120 +0,0 @@ -package replay - -import ( - "context" - "errors" - "strings" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/filemanager" - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-server/enterprise/replay/dumpsloader" - "github.com/rudderlabs/rudder-server/enterprise/replay/replayer" - "github.com/rudderlabs/rudder-server/jobsdb" - "github.com/rudderlabs/rudder-server/utils/filemanagerutil" - "github.com/rudderlabs/rudder-server/utils/types" -) - -func initFileManager(ctx context.Context, config *config.Config, log logger.Logger) (filemanager.FileManager, string, error) { - bucket := strings.TrimSpace(config.GetString("JOBS_REPLAY_BACKUP_BUCKET", "")) - if bucket == "" { - log.Error("[[ Replay ]] JOBS_REPLAY_BACKUP_BUCKET is not set") - return nil, "", errors.New("JOBS_REPLAY_BACKUP_BUCKET is not set") - } - - provider := config.GetString("JOBS_BACKUP_STORAGE_PROVIDER", "S3") - uploader, err := filemanager.New(&filemanager.Settings{ - Provider: provider, - Config: 
filemanager.GetProviderConfigFromEnv(filemanagerutil.ProviderConfigOpts(ctx, provider, config)), - Conf: config, - }) - if err != nil { - log.Errorf("[[ Replay ]] Error creating file manager: %s", err.Error()) - return nil, "", err - } - - return uploader, bucket, nil -} - -type Factory struct { - EnterpriseToken string - Log logger.Logger -} -type replay struct { - toDB *jobsdb.Handle - dumpsLoader dumpsloader.DumpsLoader - replayer replayer.Replay -} -type Replay interface { - Start() error - Stop() error -} - -// Setup initializes Replay feature -func (m *Factory) Setup(ctx context.Context, config *config.Config, replayDB, gwDB, routerDB, batchRouterDB *jobsdb.Handle) (Replay, error) { - if m.Log == nil { - m.Log = logger.NewLogger().Child("enterprise").Child("replay") - } - if m.EnterpriseToken == "" { - return nil, errors.New("enterprise token is not set") - } - if !config.GetBool("Replay.enabled", types.DefaultReplayEnabled) { - return nil, errors.New("replay is not enabled") - } - m.Log.Info("[[ Replay ]] Setting up Replay") - tablePrefix := config.GetString("TO_REPLAY", "gw") - replayToDB := config.GetString("REPLAY_TO_DB", "gw") - m.Log.Infof("TO_REPLAY=%s and REPLAY_TO_DB=%s", tablePrefix, replayToDB) - uploader, bucket, err := initFileManager(ctx, config, m.Log) - if err != nil { - return nil, err - } - - dumpsLoader, err := dumpsloader.Setup(ctx, config, replayDB, tablePrefix, uploader, bucket, m.Log) - if err != nil { - return nil, err - } - var toDB *jobsdb.Handle - switch replayToDB { - case "gw": - toDB = gwDB - case "rt": - toDB = routerDB - case "brt": - toDB = batchRouterDB - default: - toDB = routerDB - } - setup, err := replayer.Setup(ctx, config, dumpsLoader, replayDB, toDB, tablePrefix, uploader, bucket, m.Log) - if err != nil { - return nil, err - } - return &replay{ - toDB: toDB, - dumpsLoader: dumpsLoader, - replayer: setup, - }, nil -} - -func (r *replay) Start() error { - err := r.toDB.Start() - if err != nil { - return err - } - r.dumpsLoader.Start() - r.replayer.Start() - return nil -} - -func (r *replay) Stop() error { - err := r.replayer.Stop() - if err != nil { - return err - } - err = r.dumpsLoader.Stop() - if err != nil { - return err - } - r.toDB.Stop() - return nil -} diff --git a/enterprise/replay/utils/utils.go b/enterprise/replay/utils/utils.go deleted file mode 100644 index 4c257a01cd..0000000000 --- a/enterprise/replay/utils/utils.go +++ /dev/null @@ -1,58 +0,0 @@ -package utils - -import ( - "fmt" - "strconv" - "strings" - "time" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-server/utils/misc" -) - -func GetStartAndEndTime(config *config.Config) (time.Time, time.Time, error) { - var startTime, endTime time.Time - parse, err := time.Parse(misc.RFC3339Milli, strings.TrimSpace(config.GetString("START_TIME", "2000-10-02T15:04:05.000Z"))) - if err != nil { - return time.Time{}, time.Time{}, err - } - startTime = parse - endTimeStr := strings.TrimSpace(config.GetString("END_TIME", "")) - if endTimeStr == "" { - endTime = time.Now() - } else { - endTime, err = time.Parse(misc.RFC3339Milli, endTimeStr) - if err != nil { - return time.Time{}, time.Time{}, fmt.Errorf("invalid END_TIME. 
Err: %w", err) - } - } - return startTime, endTime, nil -} - -func GetMinMaxCreatedAt(key string) (int64, int64, error) { - var err error - var minJobCreatedAt, maxJobCreatedAt int64 - keyTokens := strings.Split(key, "_") - if len(keyTokens) != 3 { - return minJobCreatedAt, maxJobCreatedAt, fmt.Errorf("%s 's parse with _ gave tokens more than 3. Expected 3", key) - } - keyTokens = strings.Split(keyTokens[2], ".") - if len(keyTokens) > 7 { - return minJobCreatedAt, maxJobCreatedAt, fmt.Errorf("%s 's parse with . gave tokens more than 7. Expected 6 or 7", keyTokens[2]) - } - - if len(keyTokens) < 6 { // for backward compatibility TODO: remove this check after some time - return minJobCreatedAt, maxJobCreatedAt, fmt.Errorf("%s 's parse with . gave tokens less than 6. Expected 6 or 7", keyTokens[2]) - } - minJobCreatedAt, err = strconv.ParseInt(keyTokens[3], 10, 64) - if err != nil { - return minJobCreatedAt, maxJobCreatedAt, fmt.Errorf("ParseInt of %s failed with err: %w", keyTokens[3], err) - } - - maxJobCreatedAt, err = strconv.ParseInt(keyTokens[4], 10, 64) - if err != nil { - return minJobCreatedAt, maxJobCreatedAt, fmt.Errorf("ParseInt of %s failed with err: %w", keyTokens[4], err) - } - - return minJobCreatedAt, maxJobCreatedAt, nil -} From d0671a650704ff536fc4c1ab6ddcd3b04b873784 Mon Sep 17 00:00:00 2001 From: devops-github-rudderstack <88187154+devops-github-rudderstack@users.noreply.github.com> Date: Tue, 15 Oct 2024 11:37:35 +0530 Subject: [PATCH 3/3] chore: release 1.36.0 (#5196) --- CHANGELOG.md | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bf422e72a8..6a809d5438 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,41 @@ # Changelog +## [1.36.0](https://github.com/rudderlabs/rudder-server/compare/v1.35.1...v1.36.0) (2024-10-14) + + +### Features + +* introduce merge window for snowflake ingestion ([#5160](https://github.com/rudderlabs/rudder-server/issues/5160)) ([0e44f18](https://github.com/rudderlabs/rudder-server/commit/0e44f186675fd57c954a23b81c8c00c0b28410f2)) + + +### Bug Fixes + +* bigquery validations for partition column and type ([#5168](https://github.com/rudderlabs/rudder-server/issues/5168)) ([72443b2](https://github.com/rudderlabs/rudder-server/commit/72443b2e3d8a69f194a571ff3c0f83d9fdb55b48)) +* change retl check to source category ([#5167](https://github.com/rudderlabs/rudder-server/issues/5167)) ([72443b2](https://github.com/rudderlabs/rudder-server/commit/72443b2e3d8a69f194a571ff3c0f83d9fdb55b48)) +* clickhouse temporary files deletion happening twice ([#5182](https://github.com/rudderlabs/rudder-server/issues/5182)) ([9f52106](https://github.com/rudderlabs/rudder-server/commit/9f52106dc180649e2169d0529d1eefeded602ace)) +* set hosted secret as auth instead of workspace token ([#5181](https://github.com/rudderlabs/rudder-server/issues/5181)) ([519f3c6](https://github.com/rudderlabs/rudder-server/commit/519f3c641597404f236acc37cdbe9d1f715c5e3c)) +* sourceID and originalSourceID not flipped before transformation ([#5177](https://github.com/rudderlabs/rudder-server/issues/5177)) ([72443b2](https://github.com/rudderlabs/rudder-server/commit/72443b2e3d8a69f194a571ff3c0f83d9fdb55b48)) + + +### Miscellaneous + +* add metrics and logs for source webhooks ([#5078](https://github.com/rudderlabs/rudder-server/issues/5078)) ([e7cccae](https://github.com/rudderlabs/rudder-server/commit/e7cccae19377281d41eb50f8635857032081dbf8)) +* add stats to dedup module 
([#5190](https://github.com/rudderlabs/rudder-server/issues/5190)) ([f305282](https://github.com/rudderlabs/rudder-server/commit/f305282f98ded5b2edb1c000af8ff2c69e94405d)) +* cleanup old jobs(beyond maxAge) at startup ([#5188](https://github.com/rudderlabs/rudder-server/issues/5188)) ([3dff5e6](https://github.com/rudderlabs/rudder-server/commit/3dff5e649ac8ab5c5a3f991e54b0cbf5401cb8ad)) +* cleanup warehouse ([#5150](https://github.com/rudderlabs/rudder-server/issues/5150)) ([7818610](https://github.com/rudderlabs/rudder-server/commit/78186105fe2422de9e14b8a8562b690c55351868)) +* **deps:** bump cloud.google.com/go/bigquery from 1.63.0 to 1.63.1 in the frequent group ([#5161](https://github.com/rudderlabs/rudder-server/issues/5161)) ([3d4ebf8](https://github.com/rudderlabs/rudder-server/commit/3d4ebf82588a87ea9cc933bf37b8bce0b0331b2c)) +* **deps:** bump cloud.google.com/go/storage from 1.43.0 to 1.44.0 in the frequent group ([#5173](https://github.com/rudderlabs/rudder-server/issues/5173)) ([a136618](https://github.com/rudderlabs/rudder-server/commit/a136618545a2f89449ea2f95a63b6cbd24650a1d)) +* **deps:** bump github.com/snowflakedb/gosnowflake from 1.11.1 to 1.11.2 in the go-deps group ([#5174](https://github.com/rudderlabs/rudder-server/issues/5174)) ([bf45d23](https://github.com/rudderlabs/rudder-server/commit/bf45d23f18e1c0d2dae8f944006d5644210dbf4b)) +* **deps:** bump rudderlabs/pr-description-enforcer from 1.0.0 to 1.1.0 ([#5162](https://github.com/rudderlabs/rudder-server/issues/5162)) ([a52a071](https://github.com/rudderlabs/rudder-server/commit/a52a071f5ce5da00a2ed8ce574d9633664671a02)) +* **deps:** bump the go-deps group across 1 directory with 8 updates ([#5189](https://github.com/rudderlabs/rudder-server/issues/5189)) ([0561ee2](https://github.com/rudderlabs/rudder-server/commit/0561ee26cb6c2eee8268b5e950fcf77ceb14be36)) +* **deps:** bump the go-deps group across 1 directory with 9 updates ([#5158](https://github.com/rudderlabs/rudder-server/issues/5158)) ([6171604](https://github.com/rudderlabs/rudder-server/commit/6171604437e08869078876769e5bd8d2c8b8a98d)) +* fix jobsdb flaky test ([#5197](https://github.com/rudderlabs/rudder-server/issues/5197)) ([192025a](https://github.com/rudderlabs/rudder-server/commit/192025a2174eb360c4524df93857ca9e988b0848)) +* fix stuck runUploadJobAllocator ([#5191](https://github.com/rudderlabs/rudder-server/issues/5191)) ([ae9d984](https://github.com/rudderlabs/rudder-server/commit/ae9d984046988069adf2b6743bdd0c376742d1e5)) +* one (*sql.DB) pool for all jobsdb ([#5170](https://github.com/rudderlabs/rudder-server/issues/5170)) ([261aa60](https://github.com/rudderlabs/rudder-server/commit/261aa60e1119ea26c6b7ced7dbf87c64fda9f5e8)) +* sync release v1.35.2 to main branch ([#5172](https://github.com/rudderlabs/rudder-server/issues/5172)) ([2fa0fa0](https://github.com/rudderlabs/rudder-server/commit/2fa0fa07b13fd22a46fa63088bcc2cd34e2a040c)) +* trim eventNames sent to reporting if length exceeds 50 characters ([#5171](https://github.com/rudderlabs/rudder-server/issues/5171)) ([87283c4](https://github.com/rudderlabs/rudder-server/commit/87283c4a8de9f96eb6cc9e1232a6711845e6cf1e)) +* update event_delivery_time histogram buckets ([#5186](https://github.com/rudderlabs/rudder-server/issues/5186)) ([68b1ad5](https://github.com/rudderlabs/rudder-server/commit/68b1ad55491d7fe05aaea1a1360c2d3e354e9a05)) + ## [1.35.3](https://github.com/rudderlabs/rudder-server/compare/v1.35.2...v1.35.3) (2024-10-08)
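
Note on the backup dump keys handled by the removed replay code: the deleted GetMinMaxCreatedAt helper (enterprise/replay/utils/utils.go, removed above) split a dump object key on "_" and "." and read the min/max created-at epoch milliseconds from positions 3 and 4 of the dotted suffix. The following is a minimal standalone sketch of that parsing only; the parseDumpKey name, the main wrapper, and the example key values are illustrative and not part of the codebase, and the meaning of the other dotted tokens is not asserted here.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseDumpKey mirrors the parsing the removed GetMinMaxCreatedAt helper
// performed: split on "_" (expecting exactly 3 tokens), split the last token
// on "." (expecting 6 or 7 tokens), and read the min/max created-at epoch
// milliseconds from positions 3 and 4.
func parseDumpKey(key string) (minCreatedAtMs, maxCreatedAtMs int64, err error) {
	parts := strings.Split(key, "_")
	if len(parts) != 3 {
		return 0, 0, fmt.Errorf("expected 3 '_'-separated tokens in %q, got %d", key, len(parts))
	}
	dotted := strings.Split(parts[2], ".")
	if len(dotted) < 6 || len(dotted) > 7 {
		return 0, 0, fmt.Errorf("expected 6 or 7 '.'-separated tokens in %q, got %d", parts[2], len(dotted))
	}
	minCreatedAtMs, err = strconv.ParseInt(dotted[3], 10, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("parsing %q: %w", dotted[3], err)
	}
	maxCreatedAtMs, err = strconv.ParseInt(dotted[4], 10, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("parsing %q: %w", dotted[4], err)
	}
	return minCreatedAtMs, maxCreatedAtMs, nil
}

func main() {
	// Illustrative key only; the numeric values are made up but follow the
	// token layout the removed helper expected.
	key := "gw_jobs_1.100.200.1700000000000.1700000360000.workspace.gz"
	minMs, maxMs, err := parseDumpKey(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(minMs, maxMs) // prints: 1700000000000 1700000360000
}

With the example key, positions 3 and 4 of the dotted suffix decode to the min and max created-at milliseconds, which is the only information the removed helper extracted from a dump key.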