[fix #193] fix cdc losing data by adding a resolved ts safe interval (#196)
* resolvedts - x

Signed-off-by: zeminzhou <[email protected]>

* resolvedts - x

Signed-off-by: zeminzhou <[email protected]>

* resolvedts - x

Signed-off-by: zeminzhou <[email protected]>

* fix comment & ut

Signed-off-by: zeminzhou <[email protected]>

* add ut

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix comment

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* remove tmp

Signed-off-by: zeminzhou <[email protected]>

* fix comment

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix kv ut timeout

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix check

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>
zeminzhou authored Aug 8, 2022
1 parent 86810be commit 67011f4
Showing 7 changed files with 103 additions and 39 deletions.
45 changes: 19 additions & 26 deletions cdc/cdc/kv/client_test.go
@@ -332,14 +332,14 @@ func waitRequestID(c *check.C, allocatedID uint64) {
func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
defer testleak.AfterTest(c)()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := &sync.WaitGroup{}
ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
srv := newMockChangeDataService(c, ch2)
server2, addr := newMockService(ctx, c, srv, wg)
defer func() {
close(ch2)
server2.Stop()
cancel()
wg.Wait()
}()

@@ -410,22 +410,20 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
case <-time.After(time.Second):
c.Fatalf("reconnection not succeed in 1 second")
}
checkEvent(event, 1)
checkEvent(event, GetSafeResolvedTs(1))

select {
case event = <-eventCh:
case <-time.After(time.Second):
c.Fatalf("reconnection not succeed in 1 second")
}
checkEvent(event, ver.Ver)
checkEvent(event, GetSafeResolvedTs(ver.Ver))

// check gRPC connection active counter is updated correctly
bucket, ok := grpcPool.bucketConns[invalidStore]
c.Assert(ok, check.IsTrue)
empty := bucket.recycle()
c.Assert(empty, check.IsTrue)

cancel()
}

func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
@@ -438,10 +436,9 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
defer func() {
close(ch2)
server2.Stop()
cancel()
wg.Wait()
}()
// Cancel first, and then close the server.
defer cancel()

rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
@@ -510,7 +507,6 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
c.Fatalf("receiving message takes too long")
}
c.Assert(len(event.Val.Value), check.Equals, largeValSize)
cancel()
}

func (s *clientSuite) TestHandleError(c *check.C) {
Expand All @@ -531,6 +527,7 @@ func (s *clientSuite) TestHandleError(c *check.C) {
server1.Stop()
close(ch2)
server2.Stop()
cancel()
wg.Wait()
}()

@@ -678,8 +675,6 @@ consumePreResolvedTs:
}
c.Assert(event.Resolved, check.NotNil)
c.Assert(event.Resolved.ResolvedTs, check.Equals, uint64(120))

cancel()
}

// TestCompatibilityWithSameConn tests kv client returns an error when TiKV returns
@@ -696,6 +691,7 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -744,7 +740,6 @@ func (s *clientSuite) TestCompatibilityWithSameConn(c *check.C) {
}}
ch1 <- incompatibility
wg2.Wait()
cancel()
}

// TestClusterIDMismatch tests kv client returns an error when TiKV returns
@@ -761,6 +756,7 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) {
defer func() {
close(changeDataCh)
mockService.Stop()
cancel()
wg.Wait()
}()

@@ -816,7 +812,6 @@ func (s *clientSuite) TestClusterIDMismatch(c *check.C) {
changeDataCh <- clusterIDMismatchEvent

wg2.Wait()
cancel()
}

func (s *clientSuite) testHandleFeedEvent(c *check.C) {
@@ -830,6 +825,7 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -1230,8 +1226,6 @@ func (s *clientSuite) testHandleFeedEvent(c *check.C) {
c.Errorf("expected event %v not received", multipleExpected)
}
}

cancel()
}

func (s *clientSuite) TestHandleFeedEvent(c *check.C) {
@@ -1259,8 +1253,10 @@ func (s *clientSuite) TestStreamSendWithError(c *check.C) {
defer testleak.AfterTest(c)()
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
defer wg.Wait()
defer cancel()
defer func() {
cancel()
wg.Wait()
}()

server1Stopped := make(chan struct{})
ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
@@ -1383,6 +1379,7 @@ func (s *clientSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -1486,7 +1483,6 @@ eventLoop:
}
}
c.Assert(events, check.DeepEquals, expected)
cancel()
}

// TestStreamRecvWithErrorAndResolvedGoBack mainly tests the scenario that the `Recv` call of a gRPC
@@ -1722,6 +1718,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -1785,8 +1782,6 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
case <-time.After(time.Second):
c.Errorf("expected events are not receive")
}

cancel()
}

// TestPendingRegionError tests kv client should return an error when receiving
@@ -1803,6 +1798,7 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -1862,8 +1858,6 @@ func (s *clientSuite) TestNoPendingRegionError(c *check.C) {
ev = <-eventCh
c.Assert(ev.Resolved, check.NotNil)
c.Assert(ev.Resolved.ResolvedTs, check.Equals, uint64(200))

cancel()
}

// TestDropStaleRequest tests kv client should drop an event if its request id is outdated.
@@ -1879,6 +1873,7 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -1967,7 +1962,6 @@ func (s *clientSuite) TestDropStaleRequest(c *check.C) {
c.Errorf("expected event %v not received", expectedEv)
}
}
cancel()
}

// TestResolveLock tests the resolve lock logic in kv client
@@ -1983,6 +1977,7 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -2044,7 +2039,7 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: tso,
ResolvedTs: GetSafeResolvedTs(tso),
},
RegionID: regionID,
},
@@ -2062,8 +2057,6 @@ func (s *clientSuite) TestResolveLock(c *check.C) {
// sleep 10s to simulate no resolved event longer than ResolveLockInterval
// resolve lock check ticker is 5s.
time.Sleep(10 * time.Second)

cancel()
}

func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.ChangeDataEvent) {
@@ -2077,6 +2070,7 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan
defer func() {
close(ch1)
server1.Stop()
cancel()
wg.Wait()
}()

@@ -2128,7 +2122,6 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan
ch1 <- event
}
clientWg.Wait()
cancel()
}

// TODO(resolved-ts): should panic. Just logging as error now.
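Nearly all of the `client_test.go` hunks above are the same mechanical fix: `cancel()` moves out of the test body (or a standalone `defer`) into the deferred cleanup, ahead of `wg.Wait()`, so the context is cancelled before the test waits for its goroutines. A minimal sketch of the ordering (illustrative code, not lifted from the patch):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}

	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done() // the worker exits only when the context is cancelled
	}()

	defer func() {
		// close(ch) / server.Stop() would come first in the real tests.
		cancel()  // unblock the worker...
		wg.Wait() // ...then wait for it, instead of deadlocking
	}()

	fmt.Println("test body runs here")
}
```

Cancelling after `wg.Wait()` (the old ordering) can hang a test forever whenever a worker goroutine blocks on `ctx.Done()`.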
27 changes: 27 additions & 0 deletions cdc/cdc/kv/region_worker.go
@@ -730,6 +730,15 @@ func (w *regionWorker) handleResolvedTs(
return nil
}
regionID := state.sri.verID.GetID()

// In TiKV, when a leader transfer occurs, the old leader may send the last
// resolved ts, which may be larger than the new leader's first causal
// timestamp after transfer. So we fallback the resolved ts to a safe
// interval to make sure it's correct.
// See https://github.com/tikv/migration/issues/193.
// TODO: fix the issue completely.
resolvedTs = GetSafeResolvedTs(resolvedTs)

// Send resolved ts update in non blocking way, since we can re-query real
// resolved ts from region state even if resolved ts update is discarded.
// NOTICE: We send any regionTsInfo to resolveLock thread to give us a chance to trigger resolveLock logic
@@ -828,3 +837,21 @@ func RunWorkerPool(ctx context.Context) error {
})
return errg.Wait()
}

func GetSafeResolvedTs(resolvedTs uint64) uint64 {
cfg := config.GetGlobalServerConfig().KVClient

logicalTs := oracle.ExtractLogical(resolvedTs)
physicalTime := oracle.GetTimeFromTS(resolvedTs)

safeTime := physicalTime.Add(-cfg.ResolvedTsSafeInterval)
physicalTs := oracle.GetPhysical(safeTime)

if physicalTs < 0 {
log.Warn("The resolvedTs is smaller than the ResolvedTsSafeInterval",
zap.Uint64("resolvedTs", resolvedTs))
return resolvedTs
}

return oracle.ComposeTS(physicalTs, logicalTs)
}
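The helper leans on the TSO encoding: the high bits of a timestamp carry a physical clock reading in milliseconds and the low 18 bits carry a logical counter, so rewinding only the physical part by `ResolvedTsSafeInterval` (3 s by default, per the config changes below) preserves ordering within a millisecond. A stdlib-only sketch of the same arithmetic, assuming that standard layout rather than calling the `oracle` package:

```go
package main

import "fmt"

const (
	logicalBits       = 18
	logicalMask       = (1 << logicalBits) - 1
	safeIntervalMilli = 3000 // default resolved-ts-safe-interval: 3s
)

func getSafeResolvedTs(resolvedTs uint64) uint64 {
	logical := resolvedTs & logicalMask          // cf. oracle.ExtractLogical
	physical := int64(resolvedTs >> logicalBits) // physical part, in milliseconds

	safePhysical := physical - safeIntervalMilli
	if safePhysical < 0 {
		// Too close to the epoch to move backward; keep the original ts,
		// matching the log-and-return branch above.
		return resolvedTs
	}
	return uint64(safePhysical)<<logicalBits | logical // cf. oracle.ComposeTS
}

func main() {
	fmt.Println(getSafeResolvedTs(0))                  // 0 (unchanged)
	fmt.Println(getSafeResolvedTs((1<<18)*4*1000 + 1)) // (1<<18)*1000 + 1
}
```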
35 changes: 35 additions & 0 deletions cdc/cdc/kv/region_worker_test.go
@@ -135,3 +135,38 @@ func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) {
size = getWorkerPoolSize()
c.Assert(size, check.Equals, maxWorkerPoolSize)
}

func (s *regionWorkerSuite) TestGetSafeResolvedTs(c *check.C) {
defer testleak.AfterTest(c)()

testCases := []struct {
resolvedTs uint64
expected uint64
}{
{
resolvedTs: 0,
expected: 0,
},
{
resolvedTs: 10,
expected: 10,
},
{
resolvedTs: (1 << 18) * 3 * 1000,
expected: 0,
},
{
resolvedTs: (1<<18)*3*1000 + 1,
expected: 1,
},
{
resolvedTs: (1<<18)*4*1000 + 1,
expected: (1<<18)*1000 + 1,
},
}

for _, testCase := range testCases {
safeResolvedTs := GetSafeResolvedTs(testCase.resolvedTs)
c.Assert(testCase.expected, check.Equals, safeResolvedTs)
}
}
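The test values follow directly from that layout: `(1<<18)*3*1000` has logical part 0 and physical part 3000 ms, so rewinding by the 3 s default lands exactly on 0; `(1<<18)*3*1000 + 1` differs only in its logical part, which survives as 1; `(1<<18)*4*1000 + 1` rewinds from 4000 ms to 1000 ms, giving `(1<<18)*1000 + 1`; and 0 and 10 sit less than 3 s after the epoch, so `GetSafeResolvedTs` returns them unchanged.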
21 changes: 12 additions & 9 deletions cdc/pkg/cmd/server/server_test.go
@@ -170,9 +170,10 @@ func TestParseCfg(t *testing.T) {
},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &config.DebugConfig{
EnableKeySpanActor: false,
@@ -299,9 +300,10 @@ server-worker-pool-size = 16
Security: &config.SecurityConfig{},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &config.DebugConfig{
EnableKeySpanActor: false,
@@ -427,9 +429,10 @@ cert-allowed-cn = ["dd","ee"]
},
PerKeySpanMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
WorkerConcurrent: 8,
WorkerPoolSize: 0,
RegionScanLimit: 40,
ResolvedTsSafeInterval: 3 * time.Second,
},
Debug: &config.DebugConfig{
EnableKeySpanActor: false,
3 changes: 2 additions & 1 deletion cdc/pkg/config/config_test_data.go
@@ -80,7 +80,8 @@ const (
"kv-client": {
"worker-concurrent": 8,
"worker-pool-size": 0,
"region-scan-limit": 40
"region-scan-limit": 40,
"resolved-ts-safe-interval": 3000000000
},
"debug": {
"enable-keyspan-actor": false,
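The serialized value looks surprising only until you recall that `time.Duration` is an `int64` nanosecond count and `encoding/json` gives it no special treatment, so 3 s marshals as 3000000000. A quick demonstration with a hypothetical struct mirroring the field's JSON tag:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Hypothetical struct mirroring the relevant field of KVClientConfig.
type kvClient struct {
	ResolvedTsSafeInterval time.Duration `json:"resolved-ts-safe-interval"`
}

func main() {
	b, _ := json.Marshal(kvClient{ResolvedTsSafeInterval: 3 * time.Second})
	fmt.Println(string(b)) // {"resolved-ts-safe-interval":3000000000}
}
```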
4 changes: 4 additions & 0 deletions cdc/pkg/config/kvclient.go
@@ -13,6 +13,8 @@

package config

import "time"

// KVClientConfig represents config for kv client
type KVClientConfig struct {
// how many workers will be used for a single region worker
@@ -21,4 +23,6 @@ type KVClientConfig struct {
WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"`
// region incremental scan limit for one table in a single store
RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"`
// the safe interval to move backward resolved ts
ResolvedTsSafeInterval time.Duration `toml:"resolved-ts-safe-interval" json:"resolved-ts-safe-interval"`
}
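Taken together with the `server_test.go` expectations above, the struct's defaults are 8/0/40 plus the new 3 s interval; since `GetSafeResolvedTs` rewinds every resolved ts by that interval, raising it trades checkpoint lag for safety. A hypothetical mirror of the struct with those defaults (the real type lives in `cdc/pkg/config`):

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical mirror of KVClientConfig, populated with the defaults
// asserted in server_test.go above.
type KVClientConfig struct {
	WorkerConcurrent       int
	WorkerPoolSize         int
	RegionScanLimit        int
	ResolvedTsSafeInterval time.Duration
}

func main() {
	cfg := KVClientConfig{
		WorkerConcurrent:       8,
		WorkerPoolSize:         0,
		RegionScanLimit:        40,
		ResolvedTsSafeInterval: 3 * time.Second,
	}
	// Every resolved ts the kv client emits now trails TiKV by at least
	// this interval, so a larger value means a safer but laggier checkpoint.
	fmt.Println(cfg.ResolvedTsSafeInterval) // 3s
}
```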