diff --git a/cdc/cdc/kv/client_test.go b/cdc/cdc/kv/client_test.go
index 5a2f6400..5ea37c30 100644
--- a/cdc/cdc/kv/client_test.go
+++ b/cdc/cdc/kv/client_test.go
@@ -332,7 +332,6 @@ func waitRequestID(c *check.C, allocatedID uint64) {
 func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
 	wg := &sync.WaitGroup{}
 	ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
 	srv := newMockChangeDataService(c, ch2)
@@ -340,6 +339,7 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
 	defer func() {
 		close(ch2)
 		server2.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -410,22 +410,20 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
 	case <-time.After(time.Second):
 		c.Fatalf("reconnection not succeed in 1 second")
 	}
-	checkEvent(event, 1)
+	checkEvent(event, GetSafeResolvedTs(1))

 	select {
 	case event = <-eventCh:
 	case <-time.After(time.Second):
 		c.Fatalf("reconnection not succeed in 1 second")
 	}
-	checkEvent(event, ver.Ver)
+	checkEvent(event, GetSafeResolvedTs(ver.Ver))

 	// check gRPC connection active counter is updated correctly
 	bucket, ok := grpcPool.bucketConns[invalidStore]
 	c.Assert(ok, check.IsTrue)
 	empty := bucket.recycle()
 	c.Assert(empty, check.IsTrue)
-
-	cancel()
 }

 func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
@@ -438,10 +436,9 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
 	defer func() {
 		close(ch2)
 		server2.Stop()
+		cancel()
 		wg.Wait()
 	}()
-	// Cancel first, and then close the server.
-	defer cancel()

 	rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
 	c.Assert(err, check.IsNil)
@@ -510,7 +507,6 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
 		c.Fatalf("receiving message takes too long")
 	}
 	c.Assert(len(event.Val.Value), check.Equals, largeValSize)
-	cancel()
 }

 func (s *clientSuite) TestHandleError(c *check.C) {
@@ -531,6 +527,7 @@ func (s *clientSuite) TestHandleError(c *check.C) {
 		server1.Stop()
 		close(ch2)
 		server2.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -678,8 +675,6 @@ consumePreResolvedTs:
 	}
 	c.Assert(event.Resolved, check.NotNil)
 	c.Assert(event.Resolved.ResolvedTs, check.Equals, uint64(120))
-
-	cancel()
 }

 // TestCompatibilityWithSameConn tests kv client returns an error when TiKV returns
@@ -696,6 +691,7 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -744,7 +740,6 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) {
 	}}
 	ch1 <- incompatibility
 	wg2.Wait()
-	cancel()
 }

 // TestClusterIDMismatch tests kv client returns an error when TiKV returns
@@ -761,6 +756,7 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) {
 	defer func() {
 		close(changeDataCh)
 		mockService.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -816,7 +812,6 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) {
 	changeDataCh <- clusterIDMismatchEvent
 	wg2.Wait()
-	cancel()
 }

 func (s *clientSuite) testHandleFeedEvent(c *check.C) {
@@ -830,6 +825,7 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -1230,8 +1226,6 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) {
 			c.Errorf("expected event %v not received", multipleExpected)
 		}
 	}
-
-	cancel()
 }

 func (s *clientSuite) TestHandleFeedEvent(c *check.C) {
@@ -1259,8 +1253,10 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx, cancel := context.WithCancel(context.Background())
 	wg := &sync.WaitGroup{}
-	defer wg.Wait()
-	defer cancel()
+	defer func() {
+		cancel()
+		wg.Wait()
+	}()

 	server1Stopped := make(chan struct{})
 	ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
@@ -1383,6 +1379,7 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -1486,7 +1483,6 @@ eventLoop:
 		}
 	}
 	c.Assert(events, check.DeepEquals, expected)
-	cancel()
 }

 // TestStreamRecvWithErrorAndResolvedGoBack mainly tests the scenario that the `Recv` call of a gPRC
@@ -1722,6 +1718,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -1785,8 +1782,6 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
 	case <-time.After(time.Second):
 		c.Errorf("expected events are not receive")
 	}
-
-	cancel()
 }

 // TestPendingRegionError tests kv client should return an error when receiving
@@ -1803,6 +1798,7 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -1862,8 +1858,6 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) {
 	ev = <-eventCh
 	c.Assert(ev.Resolved, check.NotNil)
 	c.Assert(ev.Resolved.ResolvedTs, check.Equals, uint64(200))
-
-	cancel()
 }

 // TestDropStaleRequest tests kv client should drop an event if its request id is outdated.
@@ -1879,6 +1873,7 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -1967,7 +1962,6 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) {
 			c.Errorf("expected event %v not received", expectedEv)
 		}
 	}
-	cancel()
 }

 // TestResolveLock tests the resolve lock logic in kv client
@@ -1983,6 +1977,7 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -2044,7 +2039,7 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
 		{
 			Resolved: &model.ResolvedSpan{
 				Span:       regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-				ResolvedTs: tso,
+				ResolvedTs: GetSafeResolvedTs(tso),
 			},
 			RegionID: regionID,
 		},
@@ -2062,8 +2057,6 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
 	// sleep 10s to simulate no resolved event longer than ResolveLockInterval
 	// resolve lock check ticker is 5s.
 	time.Sleep(10 * time.Second)
-
-	cancel()
 }

 func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.ChangeDataEvent) {
@@ -2077,6 +2070,7 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan
 	defer func() {
 		close(ch1)
 		server1.Stop()
+		cancel()
 		wg.Wait()
 	}()

@@ -2128,7 +2122,6 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan
 		ch1 <- event
 	}
 	clientWg.Wait()
-	cancel()
 }

 // TODO(resolved-ts): should panic. Just logging as error now.
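The client_test.go changes above all apply the same cleanup pattern: the standalone defer cancel() is folded into the deferred cleanup closure, after the mock server is stopped and before wg.Wait(), so the kv client goroutines observe the canceled context and exit before the test blocks waiting for them. A minimal, self-contained sketch of that ordering (the goroutine here is only a hypothetical stand-in for the kv client event loop, not code from the repository):

package main

import (
	"context"
	"sync"
)

func runScenario() {
	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}

	defer func() {
		// Cancel first so the goroutine below returns, then wait for it.
		// Waiting before cancelling would block the cleanup forever.
		cancel()
		wg.Wait()
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done() // stand-in for the kv client event loop
	}()
}

func main() {
	runScenario()
}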
diff --git a/cdc/cdc/kv/region_worker.go b/cdc/cdc/kv/region_worker.go
index 360709e5..dc2f6df9 100644
--- a/cdc/cdc/kv/region_worker.go
+++ b/cdc/cdc/kv/region_worker.go
@@ -730,6 +730,15 @@ func (w *regionWorker) handleResolvedTs(
 		return nil
 	}
 	regionID := state.sri.verID.GetID()
+
+	// In TiKV, when a leader transfer occurs, the old leader may send the last
+	// resolved ts, which may be larger than the new leader's first causal
+	// timestamp after transfer. So we fall back the resolved ts by a safe
+	// interval to make sure it's correct.
+	// See https://github.com/tikv/migration/issues/193.
+	// TODO: fix the issue completely.
+	resolvedTs = GetSafeResolvedTs(resolvedTs)
+
 	// Send resolved ts update in non blocking way, since we can re-query real
 	// resolved ts from region state even if resolved ts update is discarded.
 	// NOTICE: We send any regionTsInfo to resolveLock thread to give us a chance to trigger resolveLock logic
@@ -828,3 +837,21 @@ func RunWorkerPool(ctx context.Context) error {
 	})
 	return errg.Wait()
 }
+
+func GetSafeResolvedTs(resolvedTs uint64) uint64 {
+	cfg := config.GetGlobalServerConfig().KVClient
+
+	logicalTs := oracle.ExtractLogical(resolvedTs)
+	physicalTime := oracle.GetTimeFromTS(resolvedTs)
+
+	safeTime := physicalTime.Add(-cfg.ResolvedTsSafeInterval)
+	physicalTs := oracle.GetPhysical(safeTime)
+
+	if physicalTs < 0 {
+		log.Warn("The resolvedTs is smaller than the ResolvedTsSafeInterval",
+			zap.Uint64("resolvedTs", resolvedTs))
+		return resolvedTs
+	}
+
+	return oracle.ComposeTS(physicalTs, logicalTs)
+}
diff --git a/cdc/cdc/kv/region_worker_test.go b/cdc/cdc/kv/region_worker_test.go
index 8bd4f28e..383aaf64 100644
--- a/cdc/cdc/kv/region_worker_test.go
+++ b/cdc/cdc/kv/region_worker_test.go
@@ -135,3 +135,38 @@ func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
 	size = getWorkerPoolSize()
 	c.Assert(size, check.Equals, maxWorkerPoolSize)
 }
+
+func (s *regionWorkerSuite) TestGetSafeResolvedTs(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	testCases := []struct {
+		resolvedTs uint64
+		expected   uint64
+	}{
+		{
+			resolvedTs: 0,
+			expected:   0,
+		},
+		{
+			resolvedTs: 10,
+			expected:   10,
+		},
+		{
+			resolvedTs: (1 << 18) * 3 * 1000,
+			expected:   0,
+		},
+		{
+			resolvedTs: (1<<18)*3*1000 + 1,
+			expected:   1,
+		},
+		{
+			resolvedTs: (1<<18)*4*1000 + 1,
+			expected:   (1<<18)*1000 + 1,
+		},
+	}
+
+	for _, testCase := range testCases {
+		safeResolvedTs := GetSafeResolvedTs(testCase.resolvedTs)
+		c.Assert(testCase.expected, check.Equals, safeResolvedTs)
+	}
+}
diff --git a/cdc/pkg/cmd/server/server_test.go b/cdc/pkg/cmd/server/server_test.go
index e4b2ab7f..e7beb1f1 100644
--- a/cdc/pkg/cmd/server/server_test.go
+++ b/cdc/pkg/cmd/server/server_test.go
@@ -170,9 +170,10 @@ func TestParseCfg(t *testing.T) {
 		},
 		PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
 		KVClient: &config.KVClientConfig{
-			WorkerConcurrent: 8,
-			WorkerPoolSize:   0,
-			RegionScanLimit:  40,
+			WorkerConcurrent:       8,
+			WorkerPoolSize:         0,
+			RegionScanLimit:        40,
+			ResolvedTsSafeInterval: 3 * time.Second,
 		},
 		Debug: &config.DebugConfig{
 			EnableKeySpanActor: false,
@@ -299,9 +300,10 @@ server-worker-pool-size = 16
 		Security:              &config.SecurityConfig{},
 		PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
 		KVClient: &config.KVClientConfig{
-			WorkerConcurrent: 8,
-			WorkerPoolSize:   0,
-			RegionScanLimit:  40,
+			WorkerConcurrent:       8,
+			WorkerPoolSize:         0,
+			RegionScanLimit:        40,
+			ResolvedTsSafeInterval: 3 * time.Second,
 		},
 		Debug: &config.DebugConfig{
 			EnableKeySpanActor: false,
@@ -427,9 +429,10 @@ cert-allowed-cn = ["dd","ee"]
 		},
 		PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
 		KVClient: &config.KVClientConfig{
-			WorkerConcurrent: 8,
-			WorkerPoolSize:   0,
-			RegionScanLimit:  40,
+			WorkerConcurrent:       8,
+			WorkerPoolSize:         0,
+			RegionScanLimit:        40,
+			ResolvedTsSafeInterval: 3 * time.Second,
 		},
 		Debug: &config.DebugConfig{
 			EnableKeySpanActor: false,
diff --git a/cdc/pkg/config/config_test_data.go b/cdc/pkg/config/config_test_data.go
index f82c485e..6abd9e74 100644
--- a/cdc/pkg/config/config_test_data.go
+++ b/cdc/pkg/config/config_test_data.go
@@ -80,7 +80,8 @@ const (
 	"kv-client": {
 		"worker-concurrent": 8,
 		"worker-pool-size": 0,
-		"region-scan-limit": 40
+		"region-scan-limit": 40,
+		"resolved-ts-safe-interval": 3000000000
 	},
 	"debug": {
 		"enable-keyspan-actor": false,
diff --git a/cdc/pkg/config/kvclient.go b/cdc/pkg/config/kvclient.go
index 0df0261c..1a984196 100644
--- a/cdc/pkg/config/kvclient.go
+++ b/cdc/pkg/config/kvclient.go
@@ -13,6 +13,8 @@

 package config

+import "time"
+
 // KVClientConfig represents config for kv client
 type KVClientConfig struct {
 	// how many workers will be used for a single region worker
@@ -21,4 +23,6 @@ type KVClientConfig struct {
 	WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"`
 	// region incremental scan limit for one table in a single store
 	RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"`
+	// the safe interval to move the resolved ts backward
+	ResolvedTsSafeInterval time.Duration `toml:"resolved-ts-safe-interval" json:"resolved-ts-safe-interval"`
 }
diff --git a/cdc/pkg/config/server_config.go b/cdc/pkg/config/server_config.go
index 7a4254e7..3634501a 100644
--- a/cdc/pkg/config/server_config.go
+++ b/cdc/pkg/config/server_config.go
@@ -93,9 +93,10 @@ var defaultServerConfig = &ServerConfig{
 	Security:              &SecurityConfig{},
 	PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10MB
 	KVClient: &KVClientConfig{
-		WorkerConcurrent: 8,
-		WorkerPoolSize:   0, // 0 will use NumCPU() * 2
-		RegionScanLimit:  40,
+		WorkerConcurrent:       8,
+		WorkerPoolSize:         0, // 0 will use NumCPU() * 2
+		RegionScanLimit:        40,
+		ResolvedTsSafeInterval: 3 * time.Second,
 	},
 	Debug: &DebugConfig{
 		EnableKeySpanActor: false,