Skip to content

Commit

Permalink
Merge #137932
Browse files Browse the repository at this point in the history
137932: backup: move file sst sink to separate package r=msbutler a=kev-cao

This patch moves `file_sst_sink` into its own package, `backupsink`, and exports a sink writer interface for writing to the sink. This gives us the flexibility to provide multiple ways of writing to the sink.

Epic: none

Release note: None

Co-authored-by: Kevin Cao <[email protected]>
  • Loading branch information
craig[bot] and kev-cao committed Dec 23, 2024
2 parents 8d61a42 + f3810b3 commit d58f071
Show file tree
Hide file tree
Showing 9 changed files with 418 additions and 339 deletions.
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ ALL_TESTS = [
"//pkg/backup/backuppb:backuppb_test",
"//pkg/backup/backuprand:backuprand_test",
"//pkg/backup/backupresolver:backupresolver_test",
"//pkg/backup/backupsink:backupsink_test",
"//pkg/backup/backuputils:backuputils_test",
"//pkg/backup:backup_test",
"//pkg/base:base_test",
Expand Down Expand Up @@ -812,6 +813,8 @@ GO_TARGETS = [
"//pkg/backup/backuprand:backuprand_test",
"//pkg/backup/backupresolver:backupresolver",
"//pkg/backup/backupresolver:backupresolver_test",
"//pkg/backup/backupsink:backupsink",
"//pkg/backup/backupsink:backupsink_test",
"//pkg/backup/backuptestutils:backuptestutils",
"//pkg/backup/backuputils:backuputils",
"//pkg/backup/backuputils:backuputils_test",
Expand Down
5 changes: 1 addition & 4 deletions pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ go_library(
"backup_span_coverage.go",
"backup_telemetry.go",
"create_scheduled_backup.go",
"file_sst_sink.go",
"generative_split_and_scatter_processor.go",
"key_rewriter.go",
"restoration_data.go",
Expand Down Expand Up @@ -44,6 +43,7 @@ go_library(
"//pkg/backup/backupinfo",
"//pkg/backup/backuppb",
"//pkg/backup/backupresolver",
"//pkg/backup/backupsink",
"//pkg/backup/backuputils",
"//pkg/base",
"//pkg/build",
Expand Down Expand Up @@ -113,7 +113,6 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/schemachanger/scbackup",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
Expand Down Expand Up @@ -158,7 +157,6 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//types",
"@com_github_kr_pretty//:pretty",
"@com_github_robfig_cron_v3//:cron",
"@org_golang_x_exp//maps",
],
Expand All @@ -180,7 +178,6 @@ go_test(
"create_scheduled_backup_test.go",
"data_driven_generated_test.go", # keep
"datadriven_test.go",
"file_sst_sink_test.go",
"full_cluster_backup_restore_test.go",
"generative_split_and_scatter_processor_test.go",
"key_rewriter_test.go",
Expand Down
54 changes: 20 additions & 34 deletions pkg/backup/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
"github.com/cockroachdb/cockroach/pkg/backup/backupsink"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -68,12 +69,6 @@ var (
time.Minute*5,
settings.NonNegativeDuration,
settings.WithPublic)
targetFileSize = settings.RegisterByteSizeSetting(
settings.ApplicationLevel,
"bulkio.backup.file_size",
"target size for individual data files produced during BACKUP",
128<<20,
settings.WithPublic)

preSplitExports = settings.RegisterBoolSetting(
settings.ApplicationLevel,
Expand Down Expand Up @@ -301,14 +296,6 @@ type spanAndTime struct {
finishesSpec bool
}

type exportedSpan struct {
metadata backuppb.BackupManifest_File
dataSST []byte
revStart hlc.Timestamp
completedSpans int32
resumeKey roachpb.Key
}

func runBackupProcessor(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand Down Expand Up @@ -403,11 +390,12 @@ func runBackupProcessor(
return err
}

sinkConf := sstSinkConf{
id: flowCtx.NodeID.SQLInstanceID(),
enc: spec.Encryption,
progCh: progCh,
settings: &flowCtx.Cfg.Settings.SV,
sinkConf := backupsink.SSTSinkConf{
ID: flowCtx.NodeID.SQLInstanceID(),
Enc: spec.Encryption,
ProgCh: progCh,
Settings: &flowCtx.Cfg.Settings.SV,
ElideMode: spec.ElidePrefix,
}
storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest, cloud.WithClientName("backup"))
if err != nil {
Expand Down Expand Up @@ -473,16 +461,14 @@ func runBackupProcessor(
// It is safe to close a nil pacer.
defer pacer.Close()

sink := makeFileSSTSink(sinkConf, storage, pacer)
sink := backupsink.MakeFileSSTSink(sinkConf, storage, pacer)
defer func() {
if err := sink.flush(ctx); err != nil {
if err := sink.Flush(ctx); err != nil {
log.Warningf(ctx, "failed to flush SST sink: %s", err)
}
logClose(ctx, sink, "SST sink")
}()

sink.elideMode = spec.ElidePrefix

// priority becomes true when we're sending re-attempts of reads far enough
// in the past that we want to run them with priority.
var priority bool
Expand Down Expand Up @@ -671,40 +657,40 @@ func runBackupProcessor(
// Even if the ExportRequest did not export any data we want to report
// the span as completed for accurate progress tracking.
if len(resp.Files) == 0 {
sink.writeWithNoData(exportedSpan{completedSpans: completedSpans})
sink.WriteWithNoData(backupsink.ExportedSpan{CompletedSpans: completedSpans})
}
for i, file := range resp.Files {
entryCounts := countRows(file.Exported, spec.PKIDs)

ret := exportedSpan{
ret := backupsink.ExportedSpan{
// BackupManifest_File just happens to contain the exact fields
// to store the metadata we need, but there's no actual File
// on-disk anywhere yet.
metadata: backuppb.BackupManifest_File{
Metadata: backuppb.BackupManifest_File{
Span: file.Span,
EntryCounts: entryCounts,
LocalityKV: destLocalityKV,
ApproximatePhysicalSize: uint64(len(file.SST)),
},
dataSST: file.SST,
revStart: resp.StartTime,
DataSST: file.SST,
RevStart: resp.StartTime,
}
if resp.ResumeSpan != nil {
ret.resumeKey = resumeSpan.span.Key
ret.ResumeKey = resumeSpan.span.Key
}
if span.start != spec.BackupStartTime {
ret.metadata.StartTime = span.start
ret.metadata.EndTime = span.end
ret.Metadata.StartTime = span.start
ret.Metadata.EndTime = span.end
}
// If multiple files were returned for this span, only one -- the
// last -- should count as completing the requested span.
if i == len(resp.Files)-1 {
ret.completedSpans = completedSpans
ret.CompletedSpans = completedSpans
}

// Cannot set the error to err, which is shared across workers.
var writeErr error
resumeSpan.span.Key, writeErr = sink.write(ctx, ret)
resumeSpan.span.Key, writeErr = sink.Write(ctx, ret)
if writeErr != nil {
return err
}
Expand All @@ -719,7 +705,7 @@ func runBackupProcessor(
// still be running and may still push new work (a retry) on to todo but
// that is OK, since that also means it is still running and thus can
// pick up that work on its next iteration.
return sink.flush(ctx)
return sink.Flush(ctx)
}
}
})
Expand Down
55 changes: 55 additions & 0 deletions pkg/backup/backupsink/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "backupsink",
srcs = [
"file_sst_sink.go",
"sink_utils.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/backup/backupsink",
visibility = ["//visibility:public"],
deps = [
"//pkg/backup/backuppb",
"//pkg/base",
"//pkg/ccl/storageccl",
"//pkg/cloud",
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/builtins",
"//pkg/storage",
"//pkg/util/admission",
"//pkg/util/hlc",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//types",
"@com_github_kr_pretty//:pretty",
],
)

go_test(
name = "backupsink_test",
srcs = ["file_sst_sink_test.go"],
embed = [":backupsink"],
deps = [
"//pkg/backup/backuppb",
"//pkg/ccl/storageccl",
"//pkg/cloud",
"//pkg/cloud/cloudpb",
"//pkg/cloud/nodelocal",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql/execinfrapb",
"//pkg/storage",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//types",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit d58f071

Please sign in to comment.