Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
140753: roachtest/mixedversion: TestTestPlanner should restore default version r=herkolategan,darrylwon a=srosenberg

We saw that `Test_maxNumPlanSteps` suddenly failed in an otherwise unrelated backport [1]. The reason turned out to be non-determinisim. That is, a different
unit test, namely `TestTestPlanner` did not
restore the default version (`clusterupgrade.TestBuildVersion`).

This change forwardports the missing restore to
ensure future test executions follow the same
PRNG sequence.

[1] #140674

Epic: none

Release note: None

140785: rac2,kvserver: start the StreamCloseScheduler r=sumeerbhola a=pav-kv

This commit enables the `StreamCloseScheduler`, which is responsible for closing RACv2 streams some time (400ms) after they enter `StateProbe`.

The initialization had to move from `NewStore` to `Store.Start()` because it needs the `stopper` to start the job.

Epic: none
Release note: none

140791: sem/tree: avoid assertion error on unimplemented builtins in views r=yuzefovich a=yuzefovich

Previously, we would hit an assertion error when trying to use an unimplemented builtin in the CREATE VIEW statement and this is now fixed. The issue is that we resolve unimplemented builtins as a definition with zero overloads, and all places that previously assumed at least one existing overload have been audited. The resolved function definition has been updated to have "unsupported with issue" integer indicating why there are no overloads.

Additionally, `UnsupportedWithIssue` property is now changed to be `uint` since we didn't use "negative value as having no corresponding issue" ability.

I decided to not include a release note since this seems like an edge case and we've only seen this a handful of times in sentry.

Fixes: #128535.

Release note: None

141009: sqlstats: use `BatchProcessLatencyBuckets` for flush latency r=xinhaoz a=dhartunian

The max of 10s on the IO latency buckets is too short to measure flush latency effectively. Previously, this metric was measuring per-statement flush latency, but this was altered in #122919.

Release note: None

Co-authored-by: Stan Rosenberg <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: David Hartunian <[email protected]>
  • Loading branch information
5 people committed Feb 10, 2025
5 parents d7dbf95 + 8a97c58 + 592dae9 + adf3ea3 + 15122d7 commit 731fff3
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 56 deletions.
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var (
const seed = 12345 // expectations are based on this seed

func TestTestPlanner(t *testing.T) {
// N.B. we must restore default versions since other tests may depend on it.
defer setDefaultVersions()
// Make some test-only mutators available to the test.
mutatorsAvailable := append([]mutator{
concurrentUserHooksMutator{},
Expand Down
3 changes: 3 additions & 0 deletions pkg/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,9 @@ func lookupFunctionID(
if len(rf.Overloads) > 1 {
return 0, errors.Newf("function %q has more than 1 overload", u.String())
}
if rf.UnsupportedWithIssue != 0 {
return 0, rf.MakeUnsupportedError()
}
fnOID := rf.Overloads[0].Oid
descID := typedesc.UserDefinedTypeOIDToID(fnOID)
if descID == 0 {
Expand Down
15 changes: 7 additions & 8 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/close_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
)

type streamCloseScheduler struct {
stopper *stop.Stopper
clock timeutil.TimeSource
scheduler RaftScheduler
// nonEmptyCh is used to signal the scheduler that there are events to
Expand All @@ -43,15 +42,15 @@ type scheduledQueue struct {
}

func NewStreamCloseScheduler(
stopper *stop.Stopper, clock timeutil.TimeSource, scheduler RaftScheduler,
clock timeutil.TimeSource, scheduler RaftScheduler,
) *streamCloseScheduler {
return &streamCloseScheduler{stopper: stopper, scheduler: scheduler, clock: clock}
return &streamCloseScheduler{scheduler: scheduler, clock: clock}
}

func (s *streamCloseScheduler) Start(ctx context.Context) error {
func (s *streamCloseScheduler) Start(ctx context.Context, stopper *stop.Stopper) error {
s.nonEmptyCh = make(chan struct{}, 1)
return s.stopper.RunAsyncTask(ctx,
"flow-control-stream-close-scheduler", s.run)
return stopper.RunAsyncTask(ctx, "flow-control-stream-close-scheduler",
func(ctx context.Context) { s.run(ctx, stopper) })
}

// streamCloseScheduler implements the rac2.ProbeToCloseTimerScheduler
Expand Down Expand Up @@ -92,7 +91,7 @@ func (s *streamCloseScheduler) ScheduleSendStreamCloseRaftMuLocked(
// constant is used to avoid the timer from signaling.
const maxStreamCloserDelay = 24 * time.Hour

func (s *streamCloseScheduler) run(_ context.Context) {
func (s *streamCloseScheduler) run(_ context.Context, stopper *stop.Stopper) {
timer := s.clock.NewTimer()
timer.Reset(s.nextDelay(s.clock.Now()))
defer timer.Stop()
Expand All @@ -102,7 +101,7 @@ func (s *streamCloseScheduler) run(_ context.Context) {
// maxStreamCloserDelay. When an event is added, the nonEmptyCh will be
// signaled and the timer will be reset to the next event's delay.
select {
case <-s.stopper.ShouldQuiesce():
case <-stopper.ShouldQuiesce():
return
case <-s.nonEmptyCh:
case <-timer.Ch():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func TestStreamCloseScheduler(t *testing.T) {
stopper = stop.NewStopper()
clock = timeutil.NewManualTime(timeutil.UnixEpoch)
raftScheduler = &testingRaftScheduler{clock: clock}
closeScheduler = NewStreamCloseScheduler(stopper, clock, raftScheduler)
require.NoError(t, closeScheduler.Start(ctx))
closeScheduler = NewStreamCloseScheduler(clock, raftScheduler)
require.NoError(t, closeScheduler.Start(ctx, stopper))
return fmt.Sprintf("now=%vs", clock.Now().Unix())

case "schedule":
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ type rangeControllerInitState struct {
// RangeControllerFactory abstracts RangeController creation for testing.
type RangeControllerFactory interface {
// New creates a new RangeController.
New(ctx context.Context, state rangeControllerInitState) rac2.RangeController
New(context.Context, rangeControllerInitState) rac2.RangeController
}

// ProcessorOptions are specified when creating a new Processor.
Expand Down
39 changes: 22 additions & 17 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1579,23 +1579,6 @@ func NewStore(
cfg.RaftSchedulerConcurrency, cfg.RaftSchedulerShardSize, cfg.RaftSchedulerConcurrencyPriority,
cfg.RaftElectionTimeoutTicks)

// kvflowRangeControllerFactory depends on the raft scheduler, so it must be
// created per-store rather than per-node like other replication admission
// control (flow control) v2 components.
s.kvflowRangeControllerFactory = replica_rac2.NewRangeControllerFactoryImpl(
s.Clock(),
s.cfg.KVFlowEvalWaitMetrics,
s.cfg.KVFlowRangeControllerMetrics,
s.cfg.KVFlowStreamTokenProvider,
replica_rac2.NewStreamCloseScheduler(
s.stopper, timeutil.DefaultTimeSource{}, s.scheduler),
(*racV2Scheduler)(s.scheduler),
s.cfg.KVFlowSendTokenWatcher,
s.cfg.KVFlowWaitForEvalConfig,
s.cfg.RaftMaxInflightBytes,
s.TestingKnobs().FlowControlTestingKnobs,
)

// Run a log SyncWaiter loop for every 32 raft scheduler goroutines.
// Experiments on c5d.12xlarge instances (48 vCPUs, the largest single-socket
// instance AWS offers) show that with fewer SyncWaiters, raft log callback
Expand Down Expand Up @@ -2274,6 +2257,28 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
}
}

scs := replica_rac2.NewStreamCloseScheduler(timeutil.DefaultTimeSource{}, s.scheduler)
if err := scs.Start(ctx, stopper); err != nil {
return err
}
// kvflowRangeControllerFactory depends on the raft scheduler, so it must be
// created per-store rather than per-node like other replication admission
// control (flow control) v2 components.
// NB: this factory is used in the Replica initialization flow (below), so we
// must create it no later than here.
s.kvflowRangeControllerFactory = replica_rac2.NewRangeControllerFactoryImpl(
s.Clock(),
s.cfg.KVFlowEvalWaitMetrics,
s.cfg.KVFlowRangeControllerMetrics,
s.cfg.KVFlowStreamTokenProvider,
scs,
(*racV2Scheduler)(s.scheduler),
s.cfg.KVFlowSendTokenWatcher,
s.cfg.KVFlowWaitForEvalConfig,
s.cfg.RaftMaxInflightBytes,
s.TestingKnobs().FlowControlTestingKnobs,
)

now := s.cfg.Clock.Now()
s.startedAt = now.WallTime

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics {
Mode: metric.HistogramModePreferHdrLatency,
Metadata: MetaSQLStatsFlushLatency,
Duration: 6 * metricsSampleInterval,
BucketConfig: metric.IOLatencyBuckets,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
SQLStatsRemovedRows: metric.NewCounter(MetaSQLStatsRemovedRows),
SQLTxnStatsCollectionOverhead: metric.NewHistogram(metric.HistogramOptions{
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/views
Original file line number Diff line number Diff line change
Expand Up @@ -1942,3 +1942,8 @@ statement error cross database type references are not supported: db106602a.publ
CREATE OR REPLACE VIEW v AS (SELECT 1 FROM (VALUES (1)) val(i) WHERE 'foo'::db106602a.e = 'foo'::db106602a.e)

subtest end

# Regression test for hitting an assertion error when using an unimplemented
# builtin in the view (#128535).
statement error pgcode 0A000 unimplemented
CREATE VIEW v128535 AS SELECT json_to_tsvector()
16 changes: 2 additions & 14 deletions pkg/sql/schema_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scbuild"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinsregistry"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
Expand Down Expand Up @@ -472,18 +470,8 @@ func (sr *schemaResolver) ResolveFunction(
case builtinDef != nil && routine != nil:
return builtinDef.MergeWith(routine, path)
case builtinDef != nil:
props, _ := builtinsregistry.GetBuiltinProperties(builtinDef.Name)
if props.UnsupportedWithIssue != 0 {
// Note: no need to embed the function name in the message; the
// caller will add the function name as prefix.
const msg = "this function is not yet supported"
var unImplErr error
if props.UnsupportedWithIssue < 0 {
unImplErr = unimplemented.New(builtinDef.Name+"()", msg)
} else {
unImplErr = unimplemented.NewWithIssueDetail(props.UnsupportedWithIssue, builtinDef.Name, msg)
}
return nil, pgerror.Wrapf(unImplErr, pgcode.InvalidParameterValue, "%s()", builtinDef.Name)
if builtinDef.UnsupportedWithIssue != 0 {
return nil, builtinDef.MakeUnsupportedError()
}
return builtinDef, nil
case routine != nil:
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sem/eval/parse_doid.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func ParseDOid(ctx context.Context, evalCtx *Context, s string, t *types.T) (*tr
return nil, pgerror.Newf(pgcode.AmbiguousAlias,
"more than one function named '%s'", funcDef.Name)
}
if funcDef.UnsupportedWithIssue != 0 {
return nil, funcDef.MakeUnsupportedError()
}
overload := funcDef.Overloads[0]
return tree.NewDOidWithTypeAndName(overload.Oid, t, funcDef.Name), nil
case oid.T_regprocedure:
Expand Down
53 changes: 40 additions & 13 deletions pkg/sql/sem/tree/function_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)
Expand Down Expand Up @@ -42,7 +43,26 @@ type ResolvedFunctionDefinition struct {
// not qualified.
Name string

// Overloads is the set of overloads for this resolved function. It can be
// empty in which case UnsupportedWithIssue is set.
Overloads []QualifiedOverload

// UnsupportedWithIssue, if non-zero, indicates the built-in is not really
// supported and provides the issue number to link.
UnsupportedWithIssue uint
}

// MakeUnsupportedError returns an implemented error if UnsupportedWithIssue is
// non-zero.
func (fd *ResolvedFunctionDefinition) MakeUnsupportedError() error {
if fd.UnsupportedWithIssue == 0 {
return nil
}
const msg = "this function is not yet supported"
return pgerror.Wrapf(
unimplemented.NewWithIssueDetail(int(fd.UnsupportedWithIssue), fd.Name, msg),
pgcode.InvalidParameterValue, "%s()", fd.Name,
)
}

type qualifiedOverloads []QualifiedOverload
Expand Down Expand Up @@ -70,11 +90,9 @@ func MakeQualifiedOverload(schema string, overload *Overload) QualifiedOverload
// FunctionProperties defines the properties of the built-in
// functions that are common across all overloads.
type FunctionProperties struct {
// UnsupportedWithIssue, if non-zero indicates the built-in is not
// really supported; the name is a placeholder. Value -1 just says
// "not supported" without an issue to link; values > 0 provide an
// issue number to link.
UnsupportedWithIssue int
// UnsupportedWithIssue, if non-zero, indicates the built-in is not really
// supported and provides the issue number to link.
UnsupportedWithIssue uint

// Undocumented, when set to true, indicates that the built-in function is
// hidden from documentation. This is currently used to hide experimental
Expand Down Expand Up @@ -505,8 +523,8 @@ func combineOverloads(a, b []QualifiedOverload, path SearchPath) []QualifiedOver
// method, function is resolved to one overload, so that we can get rid of this
// function and similar methods below.
func (fd *ResolvedFunctionDefinition) GetClass() (FunctionClass, error) {
if len(fd.Overloads) < 1 {
return 0, errors.AssertionFailedf("no overloads found for function %s", fd.Name)
if fd.UnsupportedWithIssue != 0 {
return 0, fd.MakeUnsupportedError()
}
ret := fd.Overloads[0].Class
for i := range fd.Overloads {
Expand All @@ -523,8 +541,8 @@ func (fd *ResolvedFunctionDefinition) GetClass() (FunctionClass, error) {
// different length. This is good enough since we don't create UDF with
// ReturnLabel.
func (fd *ResolvedFunctionDefinition) GetReturnLabel() ([]string, error) {
if len(fd.Overloads) < 1 {
return nil, errors.AssertionFailedf("no overloads found for function %s", fd.Name)
if fd.UnsupportedWithIssue != 0 {
return nil, fd.MakeUnsupportedError()
}
ret := fd.Overloads[0].ReturnLabels
for i := range fd.Overloads {
Expand All @@ -539,8 +557,8 @@ func (fd *ResolvedFunctionDefinition) GetReturnLabel() ([]string, error) {
// checking each overload's HasSequenceArguments flag. Ambiguous error is
// returned if there is any overload has a different flag.
func (fd *ResolvedFunctionDefinition) GetHasSequenceArguments() (bool, error) {
if len(fd.Overloads) < 1 {
return false, errors.AssertionFailedf("no overloads found for function %s", fd.Name)
if fd.UnsupportedWithIssue != 0 {
return false, fd.MakeUnsupportedError()
}
ret := fd.Overloads[0].HasSequenceArguments
for i := range fd.Overloads {
Expand All @@ -554,12 +572,21 @@ func (fd *ResolvedFunctionDefinition) GetHasSequenceArguments() (bool, error) {
// QualifyBuiltinFunctionDefinition qualified all overloads in a function
// definition with a schema name. Note that this function can only be used for
// builtin function.
//
// It must be called during the initialization of the process.
func QualifyBuiltinFunctionDefinition(
def *FunctionDefinition, schema string,
) *ResolvedFunctionDefinition {
if len(def.Definition) == 0 && def.UnsupportedWithIssue == 0 {
panic(errors.AssertionFailedf("function %s has no overloads yet UnsupportedWithIssue is not set", def.Name))
}
if len(def.Definition) > 0 && def.UnsupportedWithIssue != 0 {
panic(errors.AssertionFailedf("function %s has %d overloads yet UnsupportedWithIssue is set to %d", def.Name, len(def.Definition), def.UnsupportedWithIssue))
}
ret := &ResolvedFunctionDefinition{
Name: def.Name,
Overloads: make([]QualifiedOverload, 0, len(def.Definition)),
Name: def.Name,
Overloads: make([]QualifiedOverload, 0, len(def.Definition)),
UnsupportedWithIssue: def.UnsupportedWithIssue,
}
for _, o := range def.Definition {
ret.Overloads = append(
Expand Down

0 comments on commit 731fff3

Please sign in to comment.