Skip to content

Commit

Permalink
test: fix more it cases (pingcap#839)
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen authored Jan 10, 2025
1 parent 0f6d382 commit 66ad4cc
Show file tree
Hide file tree
Showing 19 changed files with 256 additions and 244 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=autorandom
# - name: Test availability
# if: ${{ success() }}
# run: |
# export TICDC_NEWARCH=true && make integration_test CASE=availability
- name: Test availability
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=availability
- name: Test bank
if: ${{ success() }}
Expand Down Expand Up @@ -282,7 +282,8 @@ jobs:
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=partition_table
# The 3th case in this group
- name: Test multi_tables_ddl
if: ${{ success() }}
run: |
Expand Down
14 changes: 9 additions & 5 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/ticdc/utils/threadpool"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand All @@ -52,6 +52,7 @@ type Controller struct {
operatorController *operator.Controller
changefeedDB *changefeed.ChangefeedDB
backend changefeed.Backend
eventCh *chann.DrainableChann[*Event]

bootstrapped *atomic.Bool
bootstrapper *bootstrap.Bootstrapper[heartbeatpb.CoordinatorBootstrapResponse]
Expand All @@ -60,7 +61,6 @@ type Controller struct {
nodeChanged bool
nodeManager *watcher.NodeManager

stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler]
taskScheduler threadpool.ThreadPool
taskHandlers []*threadpool.TaskHandle
messageCenter messaging.MessageCenter
Expand All @@ -85,7 +85,7 @@ func NewController(
updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed,
stateChangedCh chan *ChangefeedStateChangeEvent,
backend changefeed.Backend,
stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler],
eventCh *chann.DrainableChann[*Event],
taskScheduler threadpool.ThreadPool,
batchSize int, balanceInterval time.Duration,
) *Controller {
Expand All @@ -101,11 +101,11 @@ func NewController(
scheduler.BasicScheduler: scheduler.NewBasicScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, oc.NewAddMaintainerOperator),
scheduler.BalanceScheduler: scheduler.NewBalanceScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, balanceInterval, oc.NewMoveMaintainerOperator),
}),
eventCh: eventCh,
operatorController: oc,
messageCenter: mc,
changefeedDB: changefeedDB,
nodeManager: nodeManager,
stream: stream,
taskScheduler: taskScheduler,
backend: backend,
nodeChanged: false,
Expand Down Expand Up @@ -137,6 +137,10 @@ func NewController(

// HandleEvent implements the event-driven process mode
func (c *Controller) HandleEvent(event *Event) bool {
if event == nil {
return false
}

start := time.Now()
defer func() {
duration := time.Since(start)
Expand Down Expand Up @@ -514,7 +518,7 @@ func (c *Controller) RemoveNode(id node.ID) {

func (c *Controller) submitPeriodTask() {
task := func() time.Time {
c.stream.Push("coordinator", &Event{eventType: EventPeriod})
c.eventCh.In() <- &Event{eventType: EventPeriod}
return time.Now().Add(time.Millisecond * 500)
}
periodTaskhandler := c.taskScheduler.SubmitFunc(task, time.Now().Add(time.Millisecond*500))
Expand Down
73 changes: 50 additions & 23 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/coordinator/changefeed"
"github.com/pingcap/ticdc/pkg/common"
Expand All @@ -27,14 +28,16 @@ import (
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/server"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/ticdc/utils/threadpool"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -52,18 +55,19 @@ type coordinator struct {
controller *Controller

mc messaging.MessageCenter
stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler]
taskScheduler threadpool.ThreadPool

gcManager gc.Manager
pdClient pd.Client
pdClock pdutil.Clock

eventCh *chann.DrainableChann[*Event]
updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed
stateChangedCh chan *ChangefeedStateChangeEvent
backend changefeed.Backend

cancel func()
closed atomic.Bool
}

func New(node *node.Info,
Expand All @@ -81,41 +85,53 @@ func New(node *node.Info,
nodeInfo: node,
lastTickTime: time.Now(),
gcManager: gc.NewManager(clusterID, pdClient, pdClock),
eventCh: chann.NewAutoDrainChann[*Event](),
pdClient: pdClient,
pdClock: pdClock,
mc: mc,
updatedChangefeedCh: make(chan map[common.ChangeFeedID]*changefeed.Changefeed, 1024),
stateChangedCh: make(chan *ChangefeedStateChangeEvent, 8),
backend: backend,
}
c.stream = dynstream.NewDynamicStream(NewStreamHandler())
c.stream.Start()
c.taskScheduler = threadpool.NewThreadPoolDefault()
c.closed.Store(false)

controller := NewController(
c.version,
c.nodeInfo,
c.updatedChangefeedCh,
c.stateChangedCh,
backend,
c.stream,
c.eventCh,
c.taskScheduler,
batchSize,
balanceCheckInterval,
)

c.controller = controller
if err := c.stream.AddPath("coordinator", controller); err != nil {
log.Panic("failed to add path",
zap.Error(err))
}

// receive messages
mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages)

nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)

nodeManager.RegisterOwnerChangeHandler(string(c.nodeInfo.ID), func(newCoordinatorID string) {
if newCoordinatorID != string(c.nodeInfo.ID) {
log.Info("Coordinator changed, and I am not the coordinator, stop myself",
zap.String("selfID", string(c.nodeInfo.ID)),
zap.String("newCoordinatorID", newCoordinatorID))
c.AsyncStop()
}
})

return c
}

func (c *coordinator) recvMessages(_ context.Context, msg *messaging.TargetMessage) error {
c.stream.Push("coordinator", &Event{message: msg})
if c.closed.Load() {
return nil
}
c.eventCh.In() <- &Event{message: msg}
return nil
}

Expand All @@ -133,6 +149,12 @@ func (c *coordinator) Run(ctx context.Context) error {
updateMetricsTicker := time.NewTicker(time.Second * 1)
defer updateMetricsTicker.Stop()

go c.runHandleEvent(ctx)

failpoint.Inject("coordinator-run-with-error", func() error {
return errors.New("coordinator run with error")
})

for {
select {
case <-ctx.Done():
Expand All @@ -153,8 +175,17 @@ func (c *coordinator) Run(ctx context.Context) error {
if err := c.handleStateChangedEvent(ctx, event); err != nil {
return errors.Trace(err)
}
case <-updateMetricsTicker.C:
c.updateMetricsOnce()
}
}
}

func (c *coordinator) runHandleEvent(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case event := <-c.eventCh.Out():
c.controller.HandleEvent(event)
}
}
}
Expand Down Expand Up @@ -261,11 +292,13 @@ func shouldRunChangefeed(state model.FeedState) bool {
}

func (c *coordinator) AsyncStop() {
c.mc.DeRegisterHandler(messaging.CoordinatorTopic)
c.controller.Stop()
c.taskScheduler.Stop()
c.stream.Close()
c.cancel()
if c.closed.CompareAndSwap(false, true) {
c.mc.DeRegisterHandler(messaging.CoordinatorTopic)
c.controller.Stop()
c.taskScheduler.Stop()
c.eventCh.CloseAndDrain()
c.cancel()
}
}

func (c *coordinator) sendMessages(msgs []*messaging.TargetMessage) {
Expand Down Expand Up @@ -294,9 +327,3 @@ func (c *coordinator) updateGCSafepoint(
err := c.gcManager.TryUpdateGCSafePoint(ctx, gcSafepointUpperBound, false)
return errors.Trace(err)
}

func (c *coordinator) updateMetricsOnce() {
dsMetrics := c.stream.GetMetrics()
metricsDSInputChanLen.Set(float64(dsMetrics.EventChanSize))
metricsDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen))
}
31 changes: 25 additions & 6 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/messaging/proto"
"github.com/pingcap/ticdc/pkg/node"
Expand Down Expand Up @@ -230,9 +231,10 @@ func TestCoordinatorScheduling(t *testing.T) {
}()

ctx := context.Background()
nodeManager := watcher.NewNodeManager(nil, nil)
info := node.NewInfo("127.0.0.1:8700", "")
etcdClient := newMockEtcdClient(string(info.ID))
nodeManager := watcher.NewNodeManager(nil, etcdClient)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
info := node.NewInfo("127.0.0.1:38300", "")
nodeManager.GetAliveNodes()[info.ID] = info
mc := messaging.NewMessageCenter(ctx,
info.ID, 100, config.NewDefaultMessageCenterConfig())
Expand Down Expand Up @@ -292,9 +294,10 @@ func TestCoordinatorScheduling(t *testing.T) {

func TestScaleNode(t *testing.T) {
ctx := context.Background()
nodeManager := watcher.NewNodeManager(nil, nil)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
info := node.NewInfo("127.0.0.1:28300", "")
etcdClient := newMockEtcdClient(string(info.ID))
nodeManager := watcher.NewNodeManager(nil, etcdClient)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
nodeManager.GetAliveNodes()[info.ID] = info
mc1 := messaging.NewMessageCenter(ctx, info.ID, 0, config.NewDefaultMessageCenterConfig())
appcontext.SetService(appcontext.MessageCenter, mc1)
Expand Down Expand Up @@ -367,9 +370,10 @@ func TestScaleNode(t *testing.T) {

func TestBootstrapWithUnStoppedChangefeed(t *testing.T) {
ctx := context.Background()
nodeManager := watcher.NewNodeManager(nil, nil)
info := node.NewInfo("127.0.0.1:28301", "")
etcdClient := newMockEtcdClient(string(info.ID))
nodeManager := watcher.NewNodeManager(nil, etcdClient)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
info := node.NewInfo("127.0.0.1:8700", "")
nodeManager.GetAliveNodes()[info.ID] = info
mc1 := messaging.NewMessageCenter(ctx, info.ID, 0, config.NewDefaultMessageCenterConfig())
appcontext.SetService(appcontext.MessageCenter, mc1)
Expand Down Expand Up @@ -485,3 +489,18 @@ func startMaintainerNode(ctx context.Context,
manager: maintainerM,
}
}

type mockEtcdClient struct {
ownerID string
etcd.CDCEtcdClient
}

func newMockEtcdClient(ownerID string) *mockEtcdClient {
return &mockEtcdClient{
ownerID: ownerID,
}
}

func (m *mockEtcdClient) GetOwnerID(ctx context.Context) (model.CaptureID, error) {
return model.CaptureID(m.ownerID), nil
}
28 changes: 0 additions & 28 deletions coordinator/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package coordinator

import (
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/utils/dynstream"
)

const (
Expand All @@ -29,30 +28,3 @@ type Event struct {
eventType int
message *messaging.TargetMessage
}

// StreamHandler implements the dynstream Handler, no real logic, just forward event
type StreamHandler struct{}

func NewStreamHandler() *StreamHandler {
return &StreamHandler{}
}

func (m *StreamHandler) Path(_ *Event) string {
return "coordinator"
}

func (m *StreamHandler) Handle(dest *Controller, events ...*Event) bool {
if len(events) != 1 {
// TODO: Support batch
panic("unexpected event count")
}
event := events[0]
return dest.HandleEvent(event)
}

func (m *StreamHandler) GetSize(_ *Event) int { return 0 }
func (m *StreamHandler) GetArea(_ string, _ *Controller) int { return 0 }
func (m *StreamHandler) GetTimestamp(_ *Event) dynstream.Timestamp { return 0 }
func (m *StreamHandler) GetType(_ *Event) dynstream.EventType { return dynstream.DefaultEventType }
func (m *StreamHandler) IsPaused(_ *Event) bool { return false }
func (m *StreamHandler) OnDrop(_ *Event) {}
27 changes: 0 additions & 27 deletions coordinator/helper_test.go

This file was deleted.

Loading

0 comments on commit 66ad4cc

Please sign in to comment.