Skip to content

Commit

Permalink
puller: close kvclient correctly when stopping a processor (#11957) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 20, 2025
1 parent bf93006 commit 9ef86ba
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 14 deletions.
6 changes: 5 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,4 +1089,8 @@ func (d *ddlHandler) Run(ctx context.Context, _ ...chan<- error) error {

func (d *ddlHandler) WaitForReady(_ context.Context) {}

func (d *ddlHandler) Close() {}
func (d *ddlHandler) Close() {
if d.puller != nil {
d.puller.Close()
}
}
14 changes: 9 additions & 5 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,15 @@ func (m *SourceManager) Close() {
zap.String("changefeed", m.changefeedID.ID))

start := time.Now()
m.tablePullers.Range(func(span tablepb.Span, value interface{}) bool {
value.(pullerwrapper.Wrapper).Close()
return true
})
log.Info("All pullers have been closed",
if m.multiplexing {
m.multiplexingPuller.puller.Close()
} else {
m.tablePullers.Range(func(span tablepb.Span, value interface{}) bool {
value.(pullerwrapper.Wrapper).Close()
return true
})
}
log.Info("SourceManager puller have been closed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))
Expand Down
29 changes: 21 additions & 8 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ func (p *ddlJobPullerImpl) Run(ctx context.Context, _ ...chan<- error) error {
return p.run(ctx)
}

// WaitForReady implements util.Runnable.
func (p *ddlJobPullerImpl) WaitForReady(_ context.Context) {}

// Close implements util.Runnable.
func (p *ddlJobPullerImpl) Close() {
if p.multiplexing {
p.multiplexingPuller.Close()
}
}

func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model.RawKVEntry) error {
if ddlRawKV == nil {
return nil
Expand Down Expand Up @@ -185,12 +195,6 @@ func (p *ddlJobPullerImpl) runMultiplexing(ctx context.Context) error {
return eg.Wait()
}

// WaitForReady implements util.Runnable.
func (p *ddlJobPullerImpl) WaitForReady(_ context.Context) {}

// Close implements util.Runnable.
func (p *ddlJobPullerImpl) Close() {}

// Output the DDL job entry, it contains the DDL job and the error.
func (p *ddlJobPullerImpl) Output() <-chan *model.DDLJobEntry {
return p.outputCh
Expand Down Expand Up @@ -794,6 +798,12 @@ func (h *ddlPullerImpl) Run(ctx context.Context) error {
zap.String("changefeed", h.changefeedID.ID),
zap.Uint64("resolvedTS", atomic.LoadUint64(&h.resolvedTS)))

defer func() {
log.Info("DDL puller stopped",
zap.String("namespace", h.changefeedID.Namespace),
zap.String("changefeed", h.changefeedID.ID))
}()

return g.Wait()
}

Expand All @@ -811,10 +821,13 @@ func (h *ddlPullerImpl) PopFrontDDL() (uint64, *timodel.Job) {

// Close the ddl puller, release all resources.
func (h *ddlPullerImpl) Close() {
log.Info("close the ddl puller",
h.cancel()
if h.ddlJobPuller != nil {
h.ddlJobPuller.Close()
}
log.Info("DDL puller closed",
zap.String("namespace", h.changefeedID.Namespace),
zap.String("changefeed", h.changefeedID.ID))
h.cancel()
}

func (h *ddlPullerImpl) ResolvedTs() uint64 {
Expand Down
10 changes: 10 additions & 0 deletions cdc/puller/multiplexing_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,16 @@ func (p *MultiplexingPuller) run(ctx context.Context, includeClient bool) error
return g.Wait()
}

// Close closes the puller.
func (p *MultiplexingPuller) Close() {
if p.client != nil {
p.client.Close()
}
log.Info("MultiplexingPuller is closed",
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID))
}

func (p *MultiplexingPuller) handleInputCh(ctx context.Context, inputCh <-chan kv.MultiplexingEvent) error {
for {
var e kv.MultiplexingEvent
Expand Down

0 comments on commit 9ef86ba

Please sign in to comment.