140379: changefeedccl: restore fine grained checkpoint from job proto r=aerfrei,asg0451 a=wenyihu6

As part of the fine-grained checkpointing project, the changefeed now saves
resolved spans above the high-water mark together with their timestamps. This
patch finalizes the restoration process for fine-grained checkpoints: upon
resuming, the changefeed reloads span-level checkpoints from the job proto and
restores progress during startup.

Restoration is required in three key areas:
1. When aggregators are initialized, including the initialization of each
aggregator frontier.
2. When `rangeFeedResumeFrontier` is initialized during every `kvFeed.run`.
3. When the kvfeed determines which checkpointed spans to filter out of the
set requiring backfill.
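
For illustration, here is a minimal, self-contained Go sketch of what restoring a span-level checkpoint into a frontier looks like conceptually: every (timestamp, spans) entry saved in the checkpoint is forwarded into the resume frontier so that already-checkpointed spans do not start over below their saved timestamps. The `frontier`, `span`, and `restore` names below are simplified stand-ins, not the actual `checkpoint.Restore` or `span.Frontier` implementation.

```go
package main

import "fmt"

// Simplified stand-ins for hlc.Timestamp and roachpb.Span.
type timestamp int64
type span struct{ key, endKey string }

// frontier tracks a resolved timestamp per span. A real span.Frontier merges
// and splits overlapping spans; this sketch keys on exact spans for brevity.
type frontier map[span]timestamp

// forward advances a span's timestamp, never moving it backwards.
func (f frontier) forward(sp span, ts timestamp) {
	if ts > f[sp] {
		f[sp] = ts
	}
}

// restore replays a span-level checkpoint (timestamp -> spans) into the
// frontier, mirroring what the changefeed does on startup for the aggregator
// frontiers and the kvfeed resume frontier.
func restore(f frontier, checkpoint map[timestamp][]span) {
	for ts, spans := range checkpoint {
		for _, sp := range spans {
			f.forward(sp, ts)
		}
	}
}

func main() {
	f := frontier{
		{key: "a", endKey: "b"}: 0, // still at the initial high-water mark
		{key: "b", endKey: "c"}: 0,
	}
	// Span-level checkpoint saved in the job proto before the restart.
	ckpt := map[timestamp][]span{
		5: {{key: "a", endKey: "b"}},
		7: {{key: "b", endKey: "c"}},
	}
	restore(f, ckpt)
	fmt.Println(f) // map[{a b}:5 {b c}:7]
}
```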

Note: This remains unused until issue #137692 is resolved.
NB: `reconcileJobStateWithLocalState` is intentionally left unchanged. We
decided to update it as part of issue #137692, as it depends on
`cachedState.SetCheckpoint`.

Resolves: #137693
Release Note: None

140619: schemachanger/scbuild: port over some partitioning helpers r=annrpom a=annrpom

Some precursor work to remove REGIONAL BY ROW (RBR) fallbacks for `ADD COLUMN`.

Epic: CRDB-31282
Informs: #80545

Release note: None

140675: ci/roachtest: fix typo in roachtest nightly wrapper r=darrylwong a=srosenberg

A previous PR [1] had a typo that caused RC runs to fail due to an invalid CLI argument.

[1] #137653

Epic: none

Release note: None
Release Justification: ci-only change

Co-authored-by: Wenyi Hu <[email protected]>
Co-authored-by: Annie Pompa <[email protected]>
Co-authored-by: Stan Rosenberg <[email protected]>
4 people committed Feb 7, 2025
4 parents 38581e1 + 965fecf + 8d33e46 + 60a16bc commit cf3cf8f
Showing 17 changed files with 408 additions and 54 deletions.
@@ -51,7 +51,7 @@ elif [[ "${TC_BUILD_BRANCH}" =~ ${release_branch_regex}\.[0-9]{1,2}-rc$ ]]; then
# NOTE: in the future, instead of choosing the tests randomly as we
# do here, we plan to utilize a smarter test selection strategy (see
# #119630).
select_probability="--select_probability=0.4"
select_probability="--select-probability=0.4"
elif [[ "${TC_BUILD_BRANCH}" =~ ^release- && "${ROACHTEST_FORCE_RUN_INVALID_RELEASE_BRANCH}" != "true" ]]; then
# The only valid release branches are the ones handled above. That
# said, from time to time we might have cases where a branch with
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
@@ -805,6 +805,8 @@ func generateNewProgress(
//lint:ignore SA1019 deprecated usage
Checkpoint: &jobspb.ChangefeedProgress_Checkpoint{
Spans: existingTargetSpans,
// TODO(#140509): ALTER CHANGEFEED should handle fine-grained
// progress and checkpointed timestamp properly.
},
ProtectedTimestampRecord: ptsRecord,
},
24 changes: 15 additions & 9 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -263,8 +263,12 @@ func startDistChangefeed(
if progress := localState.progress.GetChangefeed(); progress != nil && progress.Checkpoint != nil {
checkpoint = progress.Checkpoint
}
var spanLevelCheckpoint *jobspb.TimestampSpansMap
if progress := localState.progress.GetChangefeed(); progress != nil && progress.SpanLevelCheckpoint != nil {
spanLevelCheckpoint = progress.SpanLevelCheckpoint
}
p, planCtx, err := makePlan(execCtx, jobID, details, description, initialHighWater,
trackedSpans, checkpoint, localState.drainingNodes)(ctx, dsp)
trackedSpans, checkpoint, spanLevelCheckpoint, localState.drainingNodes)(ctx, dsp)
if err != nil {
return err
}
@@ -382,6 +386,7 @@ func makePlan(
trackedSpans []roachpb.Span,
//lint:ignore SA1019 deprecated usage
checkpoint *jobspb.ChangefeedProgress_Checkpoint,
spanLevelCheckpoint *jobspb.TimestampSpansMap,
drainingNodes []roachpb.NodeID,
) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
@@ -501,14 +506,15 @@
}

aggregatorSpecs[i] = &execinfrapb.ChangeAggregatorSpec{
InitialHighWater: initialHighWaterPtr,
Watches: watches,
Checkpoint: aggregatorCheckpoint,
Feed: details,
UserProto: execCtx.User().EncodeProto(),
JobID: jobID,
Select: execinfrapb.Expression{Expr: details.Select},
Description: description,
Watches: watches,
Checkpoint: aggregatorCheckpoint,
InitialHighWater: initialHighWaterPtr,
SpanLevelCheckpoint: spanLevelCheckpoint,
Feed: details,
UserProto: execCtx.User().EncodeProto(),
JobID: jobID,
Select: execinfrapb.Expression{Expr: details.Select},
Description: description,
}
}

9 changes: 5 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -14,6 +14,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/resolvedspan"
@@ -520,6 +521,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
Spans: spans,
CheckpointSpans: ca.spec.Checkpoint.Spans,
CheckpointTimestamp: ca.spec.Checkpoint.Timestamp,
SpanLevelCheckpoint: ca.spec.SpanLevelCheckpoint,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
MM: memMon,
@@ -617,10 +619,8 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
}
// Checkpointed spans are spans that were above the highwater mark, and we
// must preserve that information in the frontier for future checkpointing.
for _, checkpointedSpan := range ca.spec.Checkpoint.Spans {
if _, err := ca.frontier.Forward(checkpointedSpan, checkpointedSpanTs); err != nil {
return nil, err
}
if err := checkpoint.Restore(ca.frontier, ca.spec.Checkpoint.Spans, checkpointedSpanTs, ca.spec.SpanLevelCheckpoint); err != nil {
return nil, err
}
return spans, nil
}
@@ -1777,6 +1777,7 @@ func (cf *changeFrontier) checkpointJobProgress(
}

cf.localState.SetHighwater(frontier)
// TODO(#137692): SetCheckpoint should take in old and new checkpoints proto.
cf.localState.SetCheckpoint(checkpoint)

return true, nil
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors_test.go
@@ -23,6 +23,7 @@ func TestSetupSpansAndFrontier(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

statementTime := hlc.Timestamp{WallTime: 10}
for _, tc := range []struct {
name string
expectedFrontier hlc.Timestamp
@@ -125,6 +126,7 @@ func TestSetupSpansAndFrontier(t *testing.T) {
Watches: tc.watches,
},
}
ca.spec.Feed.StatementTime = statementTime
_, err := ca.setupSpansAndFrontier()
require.NoError(t, err)
require.Equal(t, tc.expectedFrontier, ca.frontier.Frontier())
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -1475,6 +1475,7 @@ func reloadDest(ctx context.Context, id jobspb.JobID, execCfg *sql.ExecutorConfi

// reconcileJobStateWithLocalState ensures that the job progress information
// is consistent with the state present in the local state.
// TODO(#137692): SetCheckpoint should take in old and new checkpoints proto.
func reconcileJobStateWithLocalState(
ctx context.Context, jobID jobspb.JobID, localState *cachedState, execCfg *sql.ExecutorConfig,
) error {
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/checkpoint",
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/schemafeed",
"//pkg/ccl/changefeedccl/timers",
@@ -74,6 +75,7 @@ go_test(
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/tree",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
46 changes: 39 additions & 7 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
@@ -60,6 +61,7 @@ type Config struct {
Spans []roachpb.Span
CheckpointSpans []roachpb.Span
CheckpointTimestamp hlc.Timestamp
SpanLevelCheckpoint *jobspb.TimestampSpansMap
Targets changefeedbase.Targets
Writer kvevent.Writer
Metrics *kvevent.Metrics
@@ -129,7 +131,7 @@ func Run(ctx context.Context, cfg Config) error {

g := ctxgroup.WithContext(ctx)
f := newKVFeed(
cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp,
cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp, cfg.SpanLevelCheckpoint,
cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
cfg.WithFrontierQuantize,
@@ -256,6 +258,7 @@ type kvFeed struct {
spans []roachpb.Span
checkpoint []roachpb.Span
checkpointTimestamp hlc.Timestamp
spanLevelCheckpoint *jobspb.TimestampSpansMap
withFrontierQuantize time.Duration
withDiff bool
withFiltering bool
@@ -288,6 +291,7 @@ func newKVFeed(
spans []roachpb.Span,
checkpoint []roachpb.Span,
checkpointTimestamp hlc.Timestamp,
spanLevelCheckpoint *jobspb.TimestampSpansMap,
schemaChangeEvents changefeedbase.SchemaChangeEventClass,
schemaChangePolicy changefeedbase.SchemaChangePolicy,
withInitialBackfill, withDiff, withFiltering bool,
@@ -309,6 +313,7 @@
spans: spans,
checkpoint: checkpoint,
checkpointTimestamp: checkpointTimestamp,
spanLevelCheckpoint: spanLevelCheckpoint,
withInitialBackfill: withInitialBackfill,
withDiff: withDiff,
withFiltering: withFiltering,
@@ -393,6 +398,7 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
if initialScan {
f.checkpoint = nil
f.checkpointTimestamp = hlc.Timestamp{}
f.spanLevelCheckpoint = nil
}

boundaryTS := rangeFeedResumeFrontier.Frontier()
@@ -472,6 +478,26 @@ func filterCheckpointSpans(spans []roachpb.Span, completed []roachpb.Span) []roa
return sg.Slice()
}

// filterCheckpointSpansFromCheckpoint filters out spans which have already
// been completed based on the given checkpoint information and returns the
// list of spans that still need to be done. If newCheckpoint is nil, it
// filters using the given oldCheckpoint directly. Otherwise, it extracts and
// flattens the spans from newCheckpoint.Entries into a single slice and
// filters out those completed spans.
func filterCheckpointSpansFromCheckpoint(
spansToScan []roachpb.Span, oldCheckpoint []roachpb.Span, newCheckpoint *jobspb.TimestampSpansMap,
) []roachpb.Span {
if newCheckpoint == nil {
return filterCheckpointSpans(spansToScan, oldCheckpoint)
}

completed := make([]roachpb.Span, 0)
for _, sp := range newCheckpoint.Entries {
completed = append(completed, sp.Spans...)
}
return filterCheckpointSpans(spansToScan, completed)
}

// scanIfShould performs a scan of KV pairs in watched span if
// - this is the initial scan, or
// - table schema is changed (a column is added/dropped) and a re-scan is needed.
@@ -544,7 +570,7 @@ func (f *kvFeed) scanIfShould(

// If we have initial checkpoint information specified, filter out
// spans which we no longer need to scan.
spansToBackfill := filterCheckpointSpans(spansToScan, f.checkpoint)
spansToBackfill := filterCheckpointSpansFromCheckpoint(spansToScan, f.checkpoint, f.spanLevelCheckpoint)
if len(spansToBackfill) == 0 {
return spansToScan, scanTime, nil
}
@@ -592,11 +618,17 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
}()

// We have catchup scan checkpoint. Advance frontier.
if startFrom.Less(f.checkpointTimestamp) {
for _, s := range f.checkpoint {
if _, err := resumeFrontier.Forward(s, f.checkpointTimestamp); err != nil {
return err
}
// TODO(#140686): check if we can get rid of this condition and the empty
// checkpointTimestamp check in Restore.
if f.spanLevelCheckpoint != nil || startFrom.Less(f.checkpointTimestamp) {
// Usually, f.checkpointTimestamp should be above the frontier timestamp
// startFrom. During ALTER CHANGEFEED and in some testing scenarios,
// f.checkpointTimestamp could be empty. Make sure Restore does not regress
// the frontier when forwarding the checkpointed spans to checkpointTimestamp
// in those cases. This is not a problem for spanLevelCheckpoint because it
// always sets the checkpointed timestamp for spans properly.
if err := checkpoint.Restore(resumeFrontier, f.checkpoint, f.checkpointTimestamp, f.spanLevelCheckpoint); err != nil {
return err
}
}
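
As context for the `filterCheckpointSpansFromCheckpoint` helper added above, here is a minimal, self-contained sketch of the filtering idea: the new checkpoint's (timestamp, spans) entries are flattened into one completed-span list, which is then subtracted from the spans that would otherwise be backfilled. The string-keyed `span` type and exact-match subtraction below are illustrative stand-ins; the real code works on `roachpb.Span` ranges via a span group.

```go
package main

import "fmt"

// span is an illustrative stand-in for roachpb.Span.
type span struct{ key, endKey string }

// flattenCheckpoint collects every span from a (timestamp -> spans) map into
// one slice, mirroring how the new-style checkpoint entries are flattened.
func flattenCheckpoint(ckpt map[int64][]span) []span {
	var completed []span
	for _, spans := range ckpt {
		completed = append(completed, spans...)
	}
	return completed
}

// filterCompleted returns the spans that still need to be scanned. The real
// kvfeed subtracts overlapping ranges with a span group; this sketch only
// removes exact matches for brevity.
func filterCompleted(toScan, completed []span) []span {
	done := make(map[span]bool, len(completed))
	for _, sp := range completed {
		done[sp] = true
	}
	var remaining []span
	for _, sp := range toScan {
		if !done[sp] {
			remaining = append(remaining, sp)
		}
	}
	return remaining
}

func main() {
	toScan := []span{{"a", "b"}, {"b", "c"}, {"c", "d"}}
	ckpt := map[int64][]span{5: {{"a", "b"}}, 7: {{"c", "d"}}}
	fmt.Println(filterCompleted(toScan, flattenCheckpoint(ckpt))) // [{b c}]
}
```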

17 changes: 14 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -107,6 +108,7 @@ func TestKVFeed(t *testing.T) {
endTime hlc.Timestamp
spans []roachpb.Span
checkpoint []roachpb.Span
spanLevelCheckpoint *jobspb.TimestampSpansMap
events []kvpb.RangeFeedEvent

descs []catalog.TableDescriptor
@@ -145,7 +147,7 @@
ref := rawEventFeed(tc.events)
tf := newRawTableFeed(tc.descs, tc.initialHighWater)
st := timers.New(time.Minute).GetOrCreateScopedTimers("")
f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{},
f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{}, nil,
tc.schemaChangeEvents, tc.schemaChangePolicy,
tc.needsInitialScan, tc.withDiff, true /* withFiltering */, tc.withFrontierQuantize,
0, /* consumerID */
@@ -162,7 +164,7 @@

// Assert that each scanConfig pushed to the channel `scans` by `f.run()`
// is what we expected (as specified in the test case).
spansToScan := filterCheckpointSpans(tc.spans, tc.checkpoint)
spansToScan := filterCheckpointSpansFromCheckpoint(tc.spans, tc.checkpoint, tc.spanLevelCheckpoint)
testG := ctxgroup.WithContext(ctx)
testG.GoCtx(func(ctx context.Context) error {
for expScans := tc.expScans; len(expScans) > 0; expScans = expScans[1:] {
@@ -250,6 +252,9 @@ func TestKVFeed(t *testing.T) {
checkpoint: []roachpb.Span{
tableSpan(codec, 42),
},
spanLevelCheckpoint: jobspb.NewTimestampSpansMap(map[hlc.Timestamp]roachpb.Spans{
ts(2).Next(): {tableSpan(codec, 42)},
}),
events: []kvpb.RangeFeedEvent{
kvEvent(codec, 42, "a", "b", ts(3)),
},
@@ -268,6 +273,9 @@
checkpoint: []roachpb.Span{
makeSpan(codec, 42, "a", "q"),
},
spanLevelCheckpoint: jobspb.NewTimestampSpansMap(map[hlc.Timestamp]roachpb.Spans{
ts(2).Next(): {makeSpan(codec, 42, "a", "q")},
}),
events: []kvpb.RangeFeedEvent{
kvEvent(codec, 42, "a", "val", ts(3)),
kvEvent(codec, 42, "d", "val", ts(3)),
@@ -402,7 +410,10 @@
expEventsCount: 4,
},
} {
t.Run(tc.name, func(t *testing.T) {
testutils.RunTrueAndFalse(t, tc.name, func(t *testing.T, useNewCheckpoint bool) {
if !useNewCheckpoint {
tc.spanLevelCheckpoint = nil
}
runTest(t, tc)
})
}
1 change: 1 addition & 0 deletions pkg/ccl/partitionccl/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/utilccl",
"//pkg/config/zonepb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
31 changes: 6 additions & 25 deletions pkg/ccl/partitionccl/partition.go
@@ -10,6 +10,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -49,7 +50,7 @@ func valueEncodePartitionTuple(
// We are operating in a context where the expressions cannot
// refer to table columns, so these two names are unambiguously
// referring to the desired partition boundaries.
maybeTuple, _ = tree.WalkExpr(replaceMinMaxValVisitor{}, maybeTuple)
maybeTuple, _ = tree.WalkExpr(zonepb.ReplaceMinMaxValVisitor{}, maybeTuple)

tuple, ok := maybeTuple.(*tree.Tuple)
if !ok {
@@ -98,7 +99,8 @@ func valueEncodePartitionTuple(
}

var semaCtx tree.SemaContext
typedExpr, err := schemaexpr.SanitizeVarFreeExpr(ctx, expr, cols[i].GetType(), "partition",
colTyp := cols[i].GetType()
typedExpr, err := schemaexpr.SanitizeVarFreeExpr(ctx, expr, colTyp, "partition",
&semaCtx,
volatility.Immutable,
false, /*allowAssignmentCast*/
@@ -114,7 +116,8 @@
if err != nil {
return nil, errors.Wrapf(err, "evaluating %s", typedExpr)
}
if err := colinfo.CheckDatumTypeFitsColumnType(cols[i], datum.ResolvedType()); err != nil {
err = colinfo.CheckDatumTypeFitsColumnType(cols[i].GetName(), colTyp, datum.ResolvedType())
if err != nil {
return nil, err
}
value, scratch, err = valueside.EncodeWithScratch(value, valueside.NoColumnID, datum, scratch[:0])
@@ -125,28 +128,6 @@
return value, nil
}

// replaceMinMaxValVisitor replaces occurrences of the unqualified
// identifiers "minvalue" and "maxvalue" in the partitioning
// (sub-)exprs by the symbolic values tree.PartitionMinVal and
// tree.PartitionMaxVal.
type replaceMinMaxValVisitor struct{}

// VisitPre satisfies the tree.Visitor interface.
func (v replaceMinMaxValVisitor) VisitPre(expr tree.Expr) (recurse bool, newExpr tree.Expr) {
if t, ok := expr.(*tree.UnresolvedName); ok && t.NumParts == 1 {
switch t.Parts[0] {
case "minvalue":
return false, tree.PartitionMinVal{}
case "maxvalue":
return false, tree.PartitionMaxVal{}
}
}
return true, expr
}

// VisitPost satisfies the Visitor interface.
func (replaceMinMaxValVisitor) VisitPost(expr tree.Expr) tree.Expr { return expr }

func createPartitioningImpl(
ctx context.Context,
evalCtx *eval.Context,