From eaaee1a1b463f13ed2347315d279e2b321bb2a8a Mon Sep 17 00:00:00 2001 From: SimFG Date: Tue, 22 Oct 2024 17:10:21 +0800 Subject: [PATCH] add the target info to the ts manager key Signed-off-by: SimFG --- core/config/reader.go | 1 + core/reader/replicate_channel_manager.go | 53 +++++++++----- core/reader/replicate_channel_manager_test.go | 12 +++- core/reader/ts_manager.go | 71 +++++++++++++------ server/cdc_impl.go | 1 + 5 files changed, 94 insertions(+), 44 deletions(-) diff --git a/core/config/reader.go b/core/config/reader.go index 1795355..93a635b 100644 --- a/core/config/reader.go +++ b/core/config/reader.go @@ -24,4 +24,5 @@ type ReaderConfig struct { Retry RetrySettings SourceChannelNum int TargetChannelNum int + ReplicateID string } diff --git a/core/reader/replicate_channel_manager.go b/core/reader/replicate_channel_manager.go index 0d4838a..51d5820 100644 --- a/core/reader/replicate_channel_manager.go +++ b/core/reader/replicate_channel_manager.go @@ -57,6 +57,7 @@ var replicatePool = conc.NewPool[struct{}](10, conc.WithExpiryDuration(time.Minu type replicateChannelManager struct { replicateCtx context.Context + replicateID string streamDispatchClient msgdispatcher.Client streamCreator StreamCreator targetClient api.TargetAPI @@ -108,6 +109,7 @@ func NewReplicateChannelManagerWithDispatchClient( downstream string, ) (api.ChannelManager, error) { return &replicateChannelManager{ + replicateID: readConfig.ReplicateID, streamDispatchClient: dispatchClient, streamCreator: NewDisptachClientStreamCreator(factory, dispatchClient), targetClient: client, @@ -468,7 +470,7 @@ func (r *replicateChannelManager) AddPartition(ctx context.Context, dbInfo *mode } r.channelLock.RUnlock() if len(handlers) == 0 { - partitionLog.Info("waiting handler", zap.Int64("collection_id", collectionID)) + partitionLog.Info("waiting handler") return errors.New("no handler found") } return nil @@ -478,7 +480,7 @@ func (r *replicateChannelManager) AddPartition(ctx context.Context, dbInfo *mode } if len(handlers) == 0 { - partitionLog.Warn("no handler found", zap.Int64("collection_id", collectionID)) + partitionLog.Warn("no handler found") return errors.New("no handler found") } @@ -591,11 +593,11 @@ func (r *replicateChannelManager) StopReadCollection(ctx context.Context, info * } func (r *replicateChannelManager) GetChannelChan() <-chan string { - return GetTSManager().GetTargetChannelChan() + return GetTSManager().GetTargetChannelChan(r.replicateID) } func (r *replicateChannelManager) GetMsgChan(pChannel string) <-chan *api.ReplicateMsg { - return GetTSManager().GetTargetMsgChan(pChannel) + return GetTSManager().GetTargetMsgChan(r.replicateID, pChannel) } func (r *replicateChannelManager) GetEventChan() <-chan *api.ReplicateAPIEvent { @@ -641,6 +643,7 @@ func (r *replicateChannelManager) startReadChannel(sourceInfo *model.SourceColle channelHandler.forwardMsgFunc = r.forwardMsg channelHandler.isDroppedCollection = r.isDroppedCollection channelHandler.isDroppedPartition = r.isDroppedPartition + channelHandler.replicateID = r.replicateID diffValueForKey := r.channelMapping.CheckKeyNotExist(sourceInfo.PChannel, targetInfo.PChannel) if !diffValueForKey { @@ -658,9 +661,6 @@ func (r *replicateChannelManager) startReadChannel(sourceInfo *model.SourceColle } return nil, nil } - if sourceInfo.SeekPosition != nil { - GetTSManager().CollectTS(channelHandler.targetPChannel, sourceInfo.SeekPosition.GetTimestamp()) - } if !r.channelMapping.CheckKeyExist(sourceInfo.PChannel, targetInfo.PChannel) { log.Info("diff target pchannel", zap.String("source_channel", sourceInfo.PChannel), @@ -831,6 +831,7 @@ func (r *replicateChannelManager) stopReadChannel(pChannelName string, collectio type replicateChannelHandler struct { replicateCtx context.Context + replicateID string sourcePChannel string targetPChannel string targetClient api.TargetAPI @@ -867,6 +868,7 @@ type replicateChannelHandler struct { func (r *replicateChannelHandler) AddCollection(sourceInfo *model.SourceCollectionInfo, targetInfo *model.TargetCollectionInfo) { <-r.startReadChan + r.collectionSourceSeekPosition(sourceInfo.SeekPosition) collectionID := sourceInfo.CollectionID streamChan, closeStreamFunc, err := r.streamCreator.GetStreamChan(r.replicateCtx, sourceInfo.VChannel, sourceInfo.SeekPosition) if err != nil { @@ -1145,6 +1147,10 @@ func (r *replicateChannelHandler) Close() { // r.stream.Close() } +func (r *replicateChannelHandler) getTSManagerChannelKey(channelName string) string { + return FormatChanKey(r.replicateID, channelName) +} + func (r *replicateChannelHandler) innerHandleReplicateMsg(forward bool, msg *api.ReplicateMsg) { msgPack := msg.MsgPack p := r.handlePack(forward, msgPack) @@ -1154,20 +1160,28 @@ func (r *replicateChannelHandler) innerHandleReplicateMsg(forward bool, msg *api p.CollectionID = msg.CollectionID p.CollectionName = msg.CollectionName p.PChannelName = msg.PChannelName - GetTSManager().SendTargetMsg(r.targetPChannel, p) + GetTSManager().SendTargetMsg(r.getTSManagerChannelKey(r.targetPChannel), p) +} + +func (r *replicateChannelHandler) collectionSourceSeekPosition(sourceSeekPosition *msgstream.MsgPosition) { + if sourceSeekPosition == nil { + return + } + GetTSManager().CollectTS(r.getTSManagerChannelKey(r.targetPChannel), sourceSeekPosition.GetTimestamp()) } func (r *replicateChannelHandler) startReadChannel() { - close(r.startReadChan) var cts uint64 = math.MaxUint64 if r.sourceSeekPosition != nil { cts = r.sourceSeekPosition.GetTimestamp() } - GetTSManager().InitTSInfo(r.targetPChannel, time.Duration(r.handlerOpts.TTInterval)*time.Millisecond, cts, r.handlerOpts.MessageBufferSize) + GetTSManager().InitTSInfo(r.replicateID, r.targetPChannel, time.Duration(r.handlerOpts.TTInterval)*time.Millisecond, cts, r.handlerOpts.MessageBufferSize) log.Info("start read channel", zap.String("channel_name", r.sourcePChannel), zap.String("target_channel", r.targetPChannel), ) + close(r.startReadChan) + r.collectionSourceSeekPosition(r.sourceSeekPosition) go func() { for { select { @@ -1289,6 +1303,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa return pack.Msgs[i].BeginTs() < pack.Msgs[j].BeginTs() || (pack.Msgs[i].BeginTs() == pack.Msgs[j].BeginTs() && pack.Msgs[i].Type() == commonpb.MsgType_Delete) }) + tsManagerChannelKey := r.getTSManagerChannelKey(r.targetPChannel) r.addCollectionLock.RLock() if *r.addCollectionCnt != 0 { @@ -1326,7 +1341,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa log.Warn("begin timestamp is 0", zap.Uint64("end_ts", pack.EndTs), zap.Any("hasValidMsg", hasValidMsg)) } } - GetTSManager().CollectTS(r.targetPChannel, beginTS) + GetTSManager().CollectTS(tsManagerChannelKey, beginTS) r.addCollectionLock.RUnlock() if r.msgPackCallback != nil { @@ -1562,26 +1577,26 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa position.ChannelName = pChannel } - maxTS, _ := GetTSManager().GetMaxTS(r.targetPChannel) + maxTS, _ := GetTSManager().GetMaxTS(tsManagerChannelKey) resetTS := resetMsgPackTimestamp(newPack, maxTS) if resetTS { - GetTSManager().CollectTS(r.targetPChannel, newPack.EndTs) + GetTSManager().CollectTS(tsManagerChannelKey, newPack.EndTs) } - GetTSManager().LockTargetChannel(r.targetPChannel) - defer GetTSManager().UnLockTargetChannel(r.targetPChannel) + GetTSManager().LockTargetChannel(tsManagerChannelKey) + defer GetTSManager().UnLockTargetChannel(tsManagerChannelKey) - if !needTsMsg && len(newPack.Msgs) == 0 && !GetTSManager().UnsafeShouldSendTSMsg(r.targetPChannel) { + if !needTsMsg && len(newPack.Msgs) == 0 && !GetTSManager().UnsafeShouldSendTSMsg(tsManagerChannelKey) { return api.EmptyMsgPack } - generateTS, ok := GetTSManager().UnsafeGetMaxTS(r.targetPChannel) + generateTS, ok := GetTSManager().UnsafeGetMaxTS(tsManagerChannelKey) if !ok { log.Warn("not found the max ts", zap.String("channel", r.targetPChannel)) r.sendErrEvent(fmt.Errorf("not found the max ts")) return nil } - GetTSManager().UnsafeUpdatePackTS(r.targetPChannel, newPack.BeginTs, func(newTS uint64) (uint64, bool) { + GetTSManager().UnsafeUpdatePackTS(tsManagerChannelKey, newPack.BeginTs, func(newTS uint64) (uint64, bool) { generateTS = newTS reset := resetMsgPackTimestamp(newPack, newTS) return newPack.EndTs, reset @@ -1611,7 +1626,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa } newPack.Msgs = append(newPack.Msgs, timeTickMsg) - GetTSManager().UnsafeUpdateTSInfo(r.targetPChannel, generateTS, resetLastTs) + GetTSManager().UnsafeUpdateTSInfo(tsManagerChannelKey, generateTS, resetLastTs) msgTime, _ := tsoutil.ParseHybridTs(generateTS) TSMetricVec.WithLabelValues(r.targetPChannel).Set(float64(msgTime)) r.ttRateLog.Debug("time tick msg", zap.String("channel", r.targetPChannel), zap.Uint64("max_ts", generateTS)) diff --git a/core/reader/replicate_channel_manager_test.go b/core/reader/replicate_channel_manager_test.go index 1f59d79..5f282f0 100644 --- a/core/reader/replicate_channel_manager_test.go +++ b/core/reader/replicate_channel_manager_test.go @@ -224,6 +224,7 @@ func TestStartReadCollectionForMilvus(t *testing.T) { InitBackOff: 1, MaxBackOff: 1, }, + ReplicateID: "127.0.0.1:19530", }, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) { }, "milvus") assert.NoError(t, err) @@ -410,6 +411,7 @@ func TestStartReadCollectionForKafka(t *testing.T) { InitBackOff: 1, MaxBackOff: 1, }, + ReplicateID: "127.0.0.1:19530", }, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) { }, "kafka") assert.NoError(t, err) @@ -665,6 +667,7 @@ func TestReplicateChannelHandler(t *testing.T) { factory := msgstream.NewMockFactory(t) stream := msgstream.NewMockMsgStream(t) targetClient := mocks.NewTargetAPI(t) + replicateID := "127.0.0.1:19530" factory.EXPECT().NewMsgStream(mock.Anything).Return(stream, nil) stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(4) @@ -689,6 +692,7 @@ func TestReplicateChannelHandler(t *testing.T) { handler.isDroppedPartition = func(i int64) bool { return false } + handler.replicateID = replicateID time.Sleep(100 * time.Millisecond) handler.startReadChannel() @@ -718,7 +722,7 @@ func TestReplicateChannelHandler(t *testing.T) { handler.RemovePartitionInfo(2, "p2", 10002) assert.False(t, handler.IsEmpty()) - assert.NotNil(t, GetTSManager().GetTargetMsgChan(handler.targetPChannel)) + assert.NotNil(t, GetTSManager().GetTargetMsgChan(replicateID, handler.targetPChannel)) // test updateTargetPartitionInfo targetClient.EXPECT().GetPartitionInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mock error 2")).Once() @@ -751,6 +755,7 @@ func TestReplicateChannelHandler(t *testing.T) { stream := msgstream.NewMockMsgStream(t) targetClient := mocks.NewTargetAPI(t) streamChan := make(chan *msgstream.MsgPack) + replicateID := "127.0.0.1:19530" factory.EXPECT().NewMsgStream(mock.Anything).Return(stream, nil) stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice() @@ -782,6 +787,7 @@ func TestReplicateChannelHandler(t *testing.T) { TTInterval: 10000, }) assert.NoError(t, err) + handler.replicateID = replicateID handler.startReadChannel() handler.isDroppedCollection = func(i int64) bool { @@ -790,7 +796,7 @@ func TestReplicateChannelHandler(t *testing.T) { handler.isDroppedPartition = func(i int64) bool { return false } - GetTSManager().InitTSInfo(handler.targetPChannel, 100*time.Millisecond, math.MaxUint64, 10) + GetTSManager().InitTSInfo(replicateID, handler.targetPChannel, 100*time.Millisecond, math.MaxUint64, 10) err = handler.AddPartitionInfo(&pb.CollectionInfo{ ID: 1, @@ -807,7 +813,7 @@ func TestReplicateChannelHandler(t *testing.T) { noRetry(handler) done := make(chan struct{}) - targetMsgChan := GetTSManager().GetTargetMsgChan(handler.targetPChannel) + targetMsgChan := GetTSManager().GetTargetMsgChan(replicateID, handler.targetPChannel) go func() { defer close(done) diff --git a/core/reader/ts_manager.go b/core/reader/ts_manager.go index a989d40..84c3c66 100644 --- a/core/reader/ts_manager.go +++ b/core/reader/ts_manager.go @@ -20,7 +20,9 @@ package reader import ( "context" + "fmt" "math" + "strings" "sync" "time" @@ -73,21 +75,21 @@ type tsManager struct { // deprecated lastMsgTS util.Map[string, uint64] - channelTS2 *typeutil.ConcurrentMap[string, *tsInfo] - channelTSLocks *lock.KeyLock[string] - rateLog *log.RateLog - targetChannelChan chan string + channelTS2 *typeutil.ConcurrentMap[string, *tsInfo] + channelTSLocks *lock.KeyLock[string] + rateLog *log.RateLog + targetChannelChans *typeutil.ConcurrentMap[string, chan string] } func GetTSManager() *tsManager { tsOnce.Do(func() { tsInstance = &tsManager{ - retryOptions: util.GetRetryOptions(config.GetCommonConfig().Retry), - lastTS: util.NewValue[uint64](0), - rateLog: log.NewRateLog(1, log.L()), - channelTS2: typeutil.NewConcurrentMap[string, *tsInfo](), - channelTSLocks: lock.NewKeyLock[string](), - targetChannelChan: make(chan string, 10), + retryOptions: util.GetRetryOptions(config.GetCommonConfig().Retry), + lastTS: util.NewValue[uint64](0), + rateLog: log.NewRateLog(1, log.L()), + channelTS2: typeutil.NewConcurrentMap[string, *tsInfo](), + channelTSLocks: lock.NewKeyLock[string](), + targetChannelChans: typeutil.NewConcurrentMap[string, chan string](), } }) return tsInstance @@ -134,6 +136,7 @@ func (m *tsManager) CollectTS(channelName string, currentTS uint64) { } ts2, ok := m.channelTS2.Get(channelName) if !ok { + log.Warn("collect ts failed, channel not exist", zap.String("channelName", channelName)) m.channelTS2.Insert(channelName, &tsInfo{ cts: currentTS, }) @@ -240,15 +243,19 @@ func (m *tsManager) UnLockTargetChannel(channelName string) { m.channelTSLocks.Unlock(channelName) } -func (m *tsManager) GetTargetChannelChan() <-chan string { - return m.targetChannelChan +func (m *tsManager) GetTargetChannelChan(replicateID string) <-chan string { + m.channelTSLocks.RLock(replicateID) + defer m.channelTSLocks.RUnlock(replicateID) + c, _ := m.targetChannelChans.Get(replicateID) + return c } -func (m *tsManager) GetTargetMsgChan(channelName string) <-chan *api.ReplicateMsg { - m.channelTSLocks.RLock(channelName) - defer m.channelTSLocks.RUnlock(channelName) +func (m *tsManager) GetTargetMsgChan(replicateID string, channelName string) <-chan *api.ReplicateMsg { + channelKey := FormatChanKey(replicateID, channelName) + m.channelTSLocks.RLock(channelKey) + defer m.channelTSLocks.RUnlock(channelKey) - ts, ok := m.channelTS2.Get(channelName) + ts, ok := m.channelTS2.Get(channelKey) if !ok { return nil } @@ -267,17 +274,25 @@ func (m *tsManager) SendTargetMsg(channelName string, msg *api.ReplicateMsg) { ts.targetMsgChan <- msg } -func (m *tsManager) InitTSInfo(channelName string, p time.Duration, c uint64, channeBufferSize int) { - m.channelTSLocks.Lock(channelName) - defer m.channelTSLocks.Unlock(channelName) +func (m *tsManager) InitTSInfo(replicateID string, channelName string, p time.Duration, c uint64, channeBufferSize int) { + channelKey := FormatChanKey(replicateID, channelName) + m.channelTSLocks.Lock(channelKey) + defer m.channelTSLocks.Unlock(channelKey) if c == math.MaxUint64 { c = 0 } t := time.Now().Add(-p) - ts, ok := m.channelTS2.Get(channelName) + ts, ok := m.channelTS2.Get(channelKey) if !ok { - m.targetChannelChan <- channelName - m.channelTS2.Insert(channelName, &tsInfo{ + m.channelTSLocks.Lock(replicateID) + targetChannelChan, ok := m.targetChannelChans.Get(replicateID) + if !ok { + targetChannelChan = make(chan string, 10) + m.targetChannelChans.Insert(replicateID, targetChannelChan) + } + m.channelTSLocks.Unlock(replicateID) + targetChannelChan <- channelName + m.channelTS2.Insert(channelKey, &tsInfo{ cts: c, sts: t, period: p, @@ -343,3 +358,15 @@ func (m *tsManager) UnsafeGetMaxTS(channelName string) (uint64, bool) { } return ts.cts, true } + +func FormatChanKey(replicateID, channelName string) string { + return fmt.Sprintf("%s.%s", replicateID, channelName) +} + +func ParseChanKey(key string) (string, string) { + lastSplit := strings.LastIndex(key, ".") + if lastSplit == -1 { + panic("invalid key") + } + return key[:lastSplit], key[lastSplit+1:] +} diff --git a/server/cdc_impl.go b/server/cdc_impl.go index 78c2866..3622057 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -785,6 +785,7 @@ func (e *MetaCDC) newReplicateEntity(info *meta.TaskInfo) (*ReplicateEntity, err Retry: e.config.Retry, SourceChannelNum: e.config.SourceConfig.ChannelNum, TargetChannelNum: info.MilvusConnectParam.ChannelNum, + ReplicateID: uKey, }, metaOp, func(s string, pack *msgstream.MsgPack) { replicateMetric(info, s, pack, metrics.OPTypeRead) }, downstream)