Skip to content

Commit

Permalink
ddl_notifier: add subscription API (#56182)
Browse files Browse the repository at this point in the history
ref #55722
  • Loading branch information
lance6716 authored Sep 24, 2024
1 parent 53a3d20 commit 1787505
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 74 deletions.
9 changes: 8 additions & 1 deletion pkg/ddl/notifier/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ go_library(
"events.go",
"publish.go",
"store.go",
"subscribe.go",
],
importpath = "github.com/pingcap/tidb/pkg/ddl/notifier",
visibility = ["//visibility:public"],
deps = [
"//pkg/ddl/session",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/util/intest",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
],
)

Expand All @@ -22,14 +27,16 @@ go_test(
timeout = "short",
srcs = [
"events_test.go",
"publish_testkit_test.go",
"testkit_test.go",
],
embed = [":notifier"],
flaky = True,
shard_count = 3,
deps = [
"//pkg/ddl/session",
"//pkg/meta/model",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/testkit",
"@com_github_stretchr_testify//require",
],
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/notifier/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (s *SchemaChangeEvent) MarshalJSON() ([]byte, error) {
func (s *SchemaChangeEvent) UnmarshalJSON(b []byte) error {
var j jsonSchemaChangeEvent
err := json.Unmarshal(b, &j)
if err != nil {
if err == nil {
s.inner = &j
}
return err
Expand Down
62 changes: 0 additions & 62 deletions pkg/ddl/notifier/publish_testkit_test.go

This file was deleted.

33 changes: 23 additions & 10 deletions pkg/ddl/notifier/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type Store interface {
ctx context.Context,
se *sess.Session,
ddlJobID int64,
multiSchemaChangeID int,
multiSchemaChangeID int64,
processedBy uint64,
) error
Delete(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int) error
List(ctx context.Context, se *sess.Session, limit int) ([]*schemaChange, error)
List(ctx context.Context, se *sess.Session) ([]*schemaChange, error)
}

// DefaultStore is the system table store. Still WIP now.
Expand Down Expand Up @@ -64,29 +64,42 @@ func (t *tableStore) Insert(ctx context.Context, s *sess.Session, change *schema
return err
}

//revive:disable

func (t *tableStore) UpdateProcessed(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int, processedBy uint64) error {
//TODO implement me
panic("implement me")
// UpdateProcessed overwrites processed_by_flag with processedBy for the row
// identified by (ddlJobID, multiSchemaChangeID). The statement is executed
// through se, so it takes part in whatever transaction se currently has open.
func (t *tableStore) UpdateProcessed(
	ctx context.Context,
	se *sess.Session,
	ddlJobID int64,
	multiSchemaChangeID int64,
	processedBy uint64,
) error {
	// Only integers and the store's own db/table names are interpolated.
	stmt := fmt.Sprintf(`
UPDATE %s.%s
SET processed_by_flag = %d
WHERE ddl_job_id = %d AND multi_schema_change_seq = %d`,
		t.db, t.table,
		processedBy,
		ddlJobID, multiSchemaChangeID)
	if _, execErr := se.Execute(ctx, stmt, "ddl_notifier"); execErr != nil {
		return execErr
	}
	return nil
}

//revive:disable

// Delete is meant to remove the row identified by (ddlJobID,
// multiSchemaChangeID). It is not implemented yet and panics when called.
// NOTE(review): multiSchemaChangeID is int here but int64 in UpdateProcessed;
// confirm whether the two should agree.
func (t *tableStore) Delete(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int) error {
	//TODO implement me
	panic("implement me")
}

//revive:enable

func (t *tableStore) List(ctx context.Context, se *sess.Session, limit int) ([]*schemaChange, error) {
func (t *tableStore) List(ctx context.Context, se *sess.Session) ([]*schemaChange, error) {
sql := fmt.Sprintf(`
SELECT
ddl_job_id,
multi_schema_change_seq,
schema_change,
processed_by_flag
FROM %s.%s ORDER BY ddl_job_id, multi_schema_change_seq LIMIT %d`,
t.db, t.table, limit)
FROM %s.%s ORDER BY ddl_job_id, multi_schema_change_seq`,
t.db, t.table)
rows, err := se.Execute(ctx, sql, "ddl_notifier")
if err != nil {
return nil, err
Expand Down
210 changes: 210 additions & 0 deletions pkg/ddl/notifier/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notifier

import (
"context"
goerr "errors"
"fmt"
"time"

"github.com/pingcap/errors"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

// SchemaChangeHandler is the callback subscribers implement to handle a
// SchemaChangeEvent generated by the publisher (currently the DDL module). It
// is called at least once for every SchemaChange.
//
// A pessimistic transaction has already been started on sctx when the handler
// is invoked. The handler should use sctx to run the SQL modifications that
// must take effect exactly once. After the handler returns, the subscribing
// framework commits that transaction together with its internal flag
// modification, which is what provides exactly-once delivery.
//
// Handlers are invoked periodically; there is no guarantee about the latency
// between the time a SchemaChangeEvent happens and the time it is handled.
//
// The handler must be registered with RegisterHandler before the ddlNotifier is
// started. If the handler cannot serve events immediately after registering, it
// can either return nil to make the ddlNotifier treat the change as handled, or
// return ErrNotReadyRetryLater to hold the change and have it delivered again
// later.
type SchemaChangeHandler func(
	ctx context.Context,
	sctx sessionctx.Context,
	change *SchemaChangeEvent,
) error

// ErrNotReadyRetryLater should be returned by a registered handler that is not
// yet ready to process events. The change is left unmarked for that handler so
// it is offered again on a later poll, and the ddlNotifier does not log this
// error as a processing failure.
var ErrNotReadyRetryLater = errors.New("not ready, retry later")

// HandlerID is the type of the persistent ID used to register a handler. Every
// ID occupies one bit in a BIGINT column, so at most 64 IDs can exist. To avoid
// duplicate IDs, all IDs should be defined in the declaration below.
type HandlerID int

const (
	// TestHandlerID is used for testing only.
	TestHandlerID HandlerID = 0
)

// RegisterHandler binds handler to id in the global ddlNotifier. Each handler
// needs its own fixed HandlerID; an ID outside [0, 64) or one that is already
// taken causes a panic. It should not be called once the global ddlNotifier has
// been started.
//
// RegisterHandler is not concurrency-safe.
func RegisterHandler(id HandlerID, handler SchemaChangeHandler) {
	// Every ID selects one bit of processedByFlag, which is stored as BIGINT
	// UNSIGNED, hence the limit of 64 distinct IDs.
	const maxHandlerCount = 64
	if n := int(id); n < 0 || n >= maxHandlerCount {
		panic(fmt.Sprintf("illegal HandlerID: %d", id))
	}

	registry := globalDDLNotifier.handlers
	if _, exists := registry[id]; exists {
		panic(fmt.Sprintf("HandlerID %d already registered", id))
	}
	registry[id] = handler
}

// ddlNotifier polls the store for schema changes and dispatches them to the
// registered handlers.
type ddlNotifier struct {
	// ownedSCtx is the session context handed over by InitDDLNotifier; it backs
	// the sessions used for listing changes and for running handlers.
	ownedSCtx sessionctx.Context
	// store is where pending changes are listed from and where the
	// processed-by flag is updated.
	store Store
	// handlers maps each registered HandlerID to its callback.
	handlers map[HandlerID]SchemaChangeHandler
	// pollInterval is the period of the ticker that drives Start.
	pollInterval time.Duration
}

// globalDDLNotifier is the package-wide notifier instance. It is nil until
// InitDDLNotifier is called and is set back to nil by ResetDDLNotifier.
var globalDDLNotifier *ddlNotifier

// InitDDLNotifier creates the global ddlNotifier with an empty handler set. Call
// it exactly once, and before any RegisterHandler call. Ownership of sctx is
// transferred to the ddlNotifier.
func InitDDLNotifier(
	sctx sessionctx.Context,
	store Store,
	pollInterval time.Duration,
) {
	notifier := new(ddlNotifier)
	notifier.ownedSCtx = sctx
	notifier.store = store
	notifier.pollInterval = pollInterval
	notifier.handlers = map[HandlerID]SchemaChangeHandler{}

	globalDDLNotifier = notifier
}

// ResetDDLNotifier drops the global ddlNotifier so that it can be initialized
// again. It is used for testing only.
func ResetDDLNotifier() {
	globalDDLNotifier = nil
}

// StartDDLNotifier starts the global ddlNotifier by delegating to its Start
// method. It blocks until the context is canceled. The global ddlNotifier must
// have been set up with InitDDLNotifier beforehand; it is nil otherwise.
func StartDDLNotifier(ctx context.Context) {
	globalDDLNotifier.Start(ctx)
}

// Start runs the polling loop of the ddlNotifier: on every tick of
// pollInterval it processes the pending events, logging (but not propagating)
// any error. It blocks until the context is canceled.
func (n *ddlNotifier) Start(ctx context.Context) {
	ctx = kv.WithInternalSourceType(ctx, kv.InternalDDLNotifier)

	tick := time.NewTicker(n.pollInterval)
	defer tick.Stop()

	canceled := ctx.Done()
	for {
		select {
		case <-canceled:
			return
		case <-tick.C:
		}

		// A tick fired: run one round of event processing.
		pollErr := n.processEvents(ctx)
		if pollErr == nil {
			continue
		}
		logutil.Logger(ctx).Error("Error processing events", zap.Error(pollErr))
	}
}

// processEvents lists the stored schema changes and offers each of them, in
// listing order, to every registered handler. Only a failure to list the
// changes is returned; per-handler failures are logged and cause that handler
// to be skipped for the rest of this round.
func (n *ddlNotifier) processEvents(ctx context.Context) error {
	listSession := sess.NewSession(n.ownedSCtx)
	pending, err := n.store.List(ctx, listSession)
	if err != nil {
		return errors.Trace(err)
	}

	// Events must reach a handler in order, so once a handler fails on one
	// event it must not be given any later event in this round.
	failedHandlers := make(map[HandlerID]struct{})
	for _, change := range pending {
		for id, handle := range n.handlers {
			if _, failed := failedHandlers[id]; failed {
				continue
			}

			handleErr := n.processEventForHandler(ctx, change, id, handle)
			if handleErr == nil {
				continue
			}
			failedHandlers[id] = struct{}{}

			// "Not ready" is an expected answer and is not worth an error log.
			if goerr.Is(handleErr, ErrNotReadyRetryLater) {
				continue
			}
			logutil.Logger(ctx).Error("Error processing change",
				zap.Int64("ddlJobID", change.ddlJobID),
				zap.Int64("multiSchemaChangeSeq", change.multiSchemaChangeSeq),
				zap.Int("handlerID", int(id)),
				zap.Error(handleErr))
		}
		// TODO: remove the processed change after all handlers processed it.
	}

	return nil
}

// processEventForHandler delivers change to a single handler, unless the bit
// of handlerID is already set in change.processedByFlag, in which case it
// returns nil without doing anything.
//
// The handler call and the update of the persisted processed-by flag run in
// one transaction: it is committed when both succeed and rolled back
// otherwise. The returned error is the first failure among beginning the
// transaction, the handler itself, the flag update, and the commit.
//
// The in-memory change.processedByFlag is updated only after the commit has
// succeeded. The persisted flag is written as an absolute value derived from
// the in-memory one, so setting the bit earlier would let a later handler's
// update mark this handler as done even though its transaction was rolled
// back, and the event would never be redelivered to it.
func (n *ddlNotifier) processEventForHandler(
	ctx context.Context,
	change *schemaChange,
	handlerID HandlerID,
	handler SchemaChangeHandler,
) (err error) {
	handlerBit := uint64(1) << handlerID
	if change.processedByFlag&handlerBit != 0 {
		return nil
	}
	newFlag := change.processedByFlag | handlerBit

	se := sess.NewSession(n.ownedSCtx)

	if err = se.Begin(ctx); err != nil {
		return errors.Trace(err)
	}
	defer func() {
		if err != nil {
			se.Rollback()
			return
		}
		if err = errors.Trace(se.Commit(ctx)); err != nil {
			return
		}
		// The transaction is committed; only now may other handlers build
		// their flag value on top of this handler's bit.
		change.processedByFlag = newFlag
	}()

	// TODO: Should we attach a timeout to this ctx?
	if err = handler(ctx, n.ownedSCtx, change.event); err != nil {
		return errors.Trace(err)
	}

	if err = n.store.UpdateProcessed(
		ctx,
		se,
		change.ddlJobID,
		change.multiSchemaChangeSeq,
		newFlag,
	); err != nil {
		return errors.Trace(err)
	}

	return nil
}
Loading

0 comments on commit 1787505

Please sign in to comment.