diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index 69d8a55ee0..b059d1f9e3 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -47,13 +47,10 @@ import org.slf4j.LoggerFactory; import org.tikv.common.key.Key; import org.tikv.common.region.TiRegion; -import org.tikv.kvproto.Coprocessor; -import org.tikv.kvproto.Errorpb; +import org.tikv.kvproto.*; import org.tikv.kvproto.Errorpb.EpochNotMatch; import org.tikv.kvproto.Errorpb.Error; -import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Kvrpcpb.Context; -import org.tikv.kvproto.TikvGrpc; public class KVMockServer extends TikvGrpc.TikvImplBase { @@ -61,6 +58,11 @@ public class KVMockServer extends TikvGrpc.TikvImplBase { private int port; private Server server; private TiRegion region; + private TiRegion firstRegion; + private Iterable subregions; + private Metapb.Peer newLeader; + private Errorpb.StoreNotMatch storeNotMatch; + private boolean tempError = false; private State state = State.Normal; private final TreeMap dataMap = new TreeMap<>(); private final Map> regionErrMap = new HashMap<>(); @@ -97,6 +99,26 @@ public void setRegion(TiRegion region) { this.region = region; } + public void setNewLeader(Metapb.Peer leader) { + newLeader = leader; + } + + public void setSubregions(Iterable subregions) { + this.subregions = subregions; + } + + public void setTempError(boolean tempError) { + this.tempError = tempError; + } + + public void setStoreNotMatch(int request, int actual) { + this.storeNotMatch = + Errorpb.StoreNotMatch.newBuilder() + .setRequestStoreId(request) + .setActualStoreId(actual) + .build(); + } + public void put(ByteString key, ByteString value) { dataMap.put(toRawKey(key), value); } @@ -117,23 +139,79 @@ public void putError(String key, Supplier builder) { regionErrMap.put(toRawKey(key.getBytes(StandardCharsets.UTF_8)), builder); } + private Supplier takeError(Key key, boolean tempError) { + if (tempError) { + return regionErrMap.remove(key); + } else { + return regionErrMap.get(key); + } + } + public void clearAllMap() { dataMap.clear(); regionErrMap.clear(); } + public void reset() { + clearAllMap(); + state = State.Normal; + storeNotMatch = null; + newLeader = null; + subregions = null; + region = firstRegion; + tempError = false; + } + + private boolean tryBuilderRegionError(Key key, T builder) throws Exception { + Supplier errProvider = takeError(key, tempError); + if (errProvider != null) { + Error.Builder eb = errProvider.get(); + if (eb != null) { + builder + .getClass() + .getMethod("setRegionError", new Class[]{Error.class}) + .invoke(builder, eb.build()); + } + return true; + } + return false; + } + private Errorpb.Error verifyContext(Context context) throws Exception { - if (context.getRegionId() != region.getId() || !context.getPeer().equals(region.getLeader())) { - throw new Exception("context doesn't match"); + if (context.getRegionId() != region.getId()) { + String errMsg = + String.format( + "client context mismatch: server: %s; client: %s", + region.toString(), context.toString()); + throw new Exception(errMsg); } + logger.warn("local region: " + region.toString()); + logger.warn("client context: " + context); + Errorpb.Error.Builder errBuilder = Errorpb.Error.newBuilder(); + if (storeNotMatch != null) { + return errBuilder.setStoreNotMatch(storeNotMatch).build(); + } + if (!context.getPeer().equals(region.getLeader())) { + String warnMsg = + String.format("this store %d is not leader, new leader: %s", port, newLeader.toString()); + logger.warn(warnMsg); + return errBuilder + .setNotLeader( + Errorpb.NotLeader.newBuilder() + .setRegionId(context.getRegionId()) + .setLeader(newLeader) + .build()) + .build(); + } if (!context.getRegionEpoch().equals(region.getRegionEpoch())) { return errBuilder - .setEpochNotMatch(EpochNotMatch.newBuilder().addCurrentRegions(region.getMeta()).build()) + .setEpochNotMatch(EpochNotMatch.newBuilder().addAllCurrentRegions(subregions).build()) .build(); } + return null; } @@ -157,13 +235,7 @@ public void rawGet( return; } - Supplier errProvider = regionErrMap.get(key); - if (errProvider != null) { - Error.Builder eb = errProvider.get(); - if (eb != null) { - builder.setRegionError(eb.build()); - } - } else { + if (!tryBuilderRegionError(key, builder)) { ByteString value = dataMap.get(key); if (value == null) { value = ByteString.EMPTY; @@ -413,6 +485,7 @@ public int start(TiRegion region) throws IOException { } private static class HealCheck extends HealthImplBase { + @Override public void check( HealthCheckRequest request, StreamObserver responseObserver) { @@ -425,6 +498,7 @@ public void check( public void start(TiRegion region, int port) throws IOException { this.port = port; this.region = region; + this.firstRegion = region; logger.info("start mock server on port: " + port); server = @@ -434,7 +508,12 @@ public void start(TiRegion region, int port) throws IOException { public void stop() { if (server != null) { - server.shutdown(); + server.shutdownNow(); + try { + server.awaitTermination(); + } catch (Exception ignore) { + + } } } } diff --git a/src/test/java/org/tikv/common/MockServerTest.java b/src/test/java/org/tikv/common/MockServerTest.java index 02cab4c46f..790d7608dd 100644 --- a/src/test/java/org/tikv/common/MockServerTest.java +++ b/src/test/java/org/tikv/common/MockServerTest.java @@ -65,9 +65,12 @@ public void setup() throws IOException { s.stream().map(TiStore::new).collect(Collectors.toList())); leader.addGetRegionListener( request -> Pdpb.GetRegionResponse.newBuilder().setRegion(r).build()); - for (Metapb.Store store : s) { - leader.addGetStoreListener( - (request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build()); + for (PDMockServer pd : pdServers) { + pd.addGetStoreListener( + request -> + Pdpb.GetStoreResponse.newBuilder() + .setStore(s.get((int) (request.getStoreId() - 1))) + .build()); } server = new KVMockServer(); port = server.start(region); diff --git a/src/test/java/org/tikv/common/MockThreeStoresTest.java b/src/test/java/org/tikv/common/MockThreeStoresTest.java index a88ecec8eb..74b8131b94 100644 --- a/src/test/java/org/tikv/common/MockThreeStoresTest.java +++ b/src/test/java/org/tikv/common/MockThreeStoresTest.java @@ -35,6 +35,7 @@ public class MockThreeStoresTest extends PDMockServerTest { protected TiRegion region; + protected TiRegion firstRegion; protected List servers = new ArrayList<>(); protected List stores; @@ -108,6 +109,13 @@ public void setup() throws IOException { region.getPeers(0), region.getPeersList(), stores.stream().map(TiStore::new).collect(Collectors.toList())); + this.firstRegion = + new TiRegion( + session.getConf(), + region, + region.getPeers(0), + region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); for (int port : ports) { KVMockServer server = new KVMockServer(); server.start(this.region, port); diff --git a/src/test/java/org/tikv/common/PDMockServerTest.java b/src/test/java/org/tikv/common/PDMockServerTest.java index edd3a19f44..bf8665d114 100644 --- a/src/test/java/org/tikv/common/PDMockServerTest.java +++ b/src/test/java/org/tikv/common/PDMockServerTest.java @@ -87,7 +87,8 @@ void setup(String addr) throws IOException { TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + ports[0]); conf.setKvMode("RAW"); conf.setWarmUpEnable(false); - conf.setTimeout(2000); + conf.setTimeout(20000); + conf.setRawKVReadTimeoutInMS(2000); conf.setEnableGrpcForward(true); session = TiSession.create(conf); diff --git a/src/test/java/org/tikv/common/RegionErrorTest.java b/src/test/java/org/tikv/common/RegionErrorTest.java index 5341cbff60..cf8ad54a5e 100644 --- a/src/test/java/org/tikv/common/RegionErrorTest.java +++ b/src/test/java/org/tikv/common/RegionErrorTest.java @@ -17,18 +17,32 @@ package org.tikv.common; +import static org.junit.Assert.fail; + +import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.tikv.common.KVMockServer.State; +import org.tikv.common.apiversion.RequestKeyCodec; import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiStore; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.Errorpb; +import org.tikv.kvproto.Errorpb.ServerIsBusy; +import org.tikv.kvproto.Errorpb.StaleCommand; import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.Metapb.Peer; +import org.tikv.kvproto.Pdpb; import org.tikv.raw.RawKVClient; public class RegionErrorTest extends MockThreeStoresTest { + @Before public void init() throws Exception { upgradeToV2Cluster(); @@ -40,6 +54,7 @@ private RawKVClient createClient() { @Test public void testOnEpochNotMatch() { + initCluster(); try (RawKVClient client = createClient()) { // Construct a key that is less than the prefix of RAW API v2; ByteString key = ByteString.copyFromUtf8("key-test-epoch-not-match"); @@ -71,6 +86,7 @@ public void testOnEpochNotMatch() { // Update the region of each server for (KVMockServer server : servers) { server.setRegion(newRegion); + server.setSubregions(ImmutableList.of(newRegion.getMeta())); } // Forbid the client get region from PD leader. @@ -81,4 +97,443 @@ public void testOnEpochNotMatch() { Assert.assertEquals(Optional.of(value), client.get(key)); } } + + @Test + public void testOlderEpochNotMatch() { + initCluster(); + try (RawKVClient client = createClient()) { + // Construct a key that is less than the prefix of RAW API v2; + ByteString key = ByteString.copyFromUtf8("key-test-epoch-not-match"); + ByteString value = ByteString.copyFromUtf8("value"); + + ByteString requestKey = client.getSession().getPDClient().getCodec().encodeKey(key); + put(requestKey, value); + + Assert.assertEquals(Optional.of(value), client.get(key)); + + Metapb.Region newMeta = + Metapb.Region.newBuilder() + .mergeFrom(this.region.getMeta()) + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(1).setVersion(1)) + .setStartKey(PDClientV2MockTest.encode(requestKey)) + .setEndKey(PDClientV2MockTest.encode(requestKey.concat(ByteString.copyFromUtf8("0")))) + .build(); + + // Increase the region epoch for the cluster, + // this will cause the cluster return an EpochNotMatch region error. + TiRegion newRegion = + new TiRegion( + this.region.getConf(), + newMeta, + this.region.getLeader(), + this.region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + + // Update the region of each server + for (KVMockServer server : servers) { + server.setRegion(newRegion); + server.setSubregions(ImmutableList.of(newRegion.getMeta())); + } + + // Forbid the client get region from PD leader. + leader.addGetRegionListener(request -> null); + + // The get should success since the region cache + // will be updated the currentRegions of `EpochNotMatch` error. + try { + client.get(key); + fail( + "This request should fail since the local region epoch is newer than the remote region epoch."); + } catch (Exception ignored) { + } + } + } + + @Test + public void testDecodeEpochNotMatch() { + initCluster(); + try (RawKVClient client = createClient()) { + // Construct a key that is less than the prefix of RAW API v2; + ByteString key = ByteString.copyFromUtf8("key-test-decode-not-match"); + ByteString value = ByteString.copyFromUtf8("value"); + + RequestKeyCodec c = client.getSession().getPDClient().getCodec(); + ByteString requestKey = c.encodeKey(key); + put(requestKey, value); + + Assert.assertEquals(Optional.of(value), client.get(key)); + + Metapb.Region newMeta = + Metapb.Region.newBuilder() + .mergeFrom(this.region.getMeta()) + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(2).setVersion(3)) + .setStartKey(PDClientV2MockTest.encode(requestKey)) + .setEndKey(PDClientV2MockTest.encode(requestKey.concat(ByteString.copyFromUtf8("0")))) + .build(); + + // Increase the region epoch for the cluster, + // this will cause the cluster return an EpochNotMatch region error. + TiRegion newRegion = + new TiRegion( + this.region.getConf(), + newMeta, + this.region.getLeader(), + this.region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + + // ["", minKeyspaceKey) + Metapb.Region newRegionA = + Metapb.Region.newBuilder() + .mergeFrom(this.region.getMeta()) + .setId(10) + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(2).setVersion(3)) + .setStartKey(ByteString.EMPTY) + .setEndKey(PDClientV2MockTest.encode(c.encodeKey(ByteString.EMPTY))) + .build(); + // [minKeyspaceKey, maxKeyspaceKey) + Pair range = c.encodeRange(ByteString.EMPTY, ByteString.EMPTY); + Metapb.Region newRegionB = + Metapb.Region.newBuilder() + .mergeFrom(this.region.getMeta()) + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(2).setVersion(3)) + .setStartKey(PDClientV2MockTest.encode(range.first)) + .setEndKey(PDClientV2MockTest.encode(range.second)) + .build(); + // [maxKeyspaceKey, "") + Metapb.Region newRegionC = + Metapb.Region.newBuilder() + .mergeFrom(this.region.getMeta()) + .setId(14) + .setRegionEpoch(Metapb.RegionEpoch.newBuilder().setConfVer(2).setVersion(3)) + .setStartKey(PDClientV2MockTest.encode(range.second)) + .setEndKey(ByteString.EMPTY) + .build(); + List subregions = + new ArrayList() { + { + add(newRegionA); + add(newRegionB); + add(newRegionC); + } + }; + + // Update the region of each server + for (KVMockServer server : servers) { + server.setRegion(newRegion); + server.setSubregions(subregions); + } + + // Forbid the client get region from PD leader. + leader.addGetRegionListener( + request -> { + return Pdpb.GetRegionResponse.newBuilder() + .setRegion(newRegionA) + .setLeader(newRegionA.getPeers(0)) + .build(); + }); + + // The get should success since the region cache + // will be updated the currentRegions of `EpochNotMatch` error. + Assert.assertEquals(Optional.of(value), client.get(key)); + } + } + + @Test + public void testEmptyEpochNotMatch() { + initCluster(); + try (RawKVClient client = createClient()) { + // Construct a key that is less than the prefix of RAW API v2; + ByteString key = ByteString.copyFromUtf8("key-test-empty-epoch-not-match"); + ByteString value = ByteString.copyFromUtf8("value"); + + RequestKeyCodec c = client.getSession().getPDClient().getCodec(); + ByteString requestKey = c.encodeKey(key); + put(requestKey, value); + + Assert.assertEquals(Optional.of(value), client.get(key)); + + servers + .get(0) + .putError( + requestKey.toStringUtf8(), + () -> + Errorpb.Error.newBuilder() + .setEpochNotMatch(Errorpb.EpochNotMatch.getDefaultInstance())); + + TiRegion newRegion = + new TiRegion( + this.region.getConf(), + region.getMeta(), + Metapb.Peer.newBuilder().setId(2).setStoreId(2).build(), + this.region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + + leader.addGetRegionListener( + request -> { + return Pdpb.GetRegionResponse.newBuilder() + .setRegion(newRegion.getMeta()) + .setLeader(newRegion.getLeader()) + .build(); + }); + + for (int i = 1; i < servers.size(); i++) { + servers.get(i).setRegion(newRegion); + } + + Assert.assertEquals(Optional.of(value), client.get(key)); + } + } + + @Test + public void testTiKVFailed() { + initCluster(); + try (RawKVClient client = createClient()) { + // Construct a key that is less than the prefix of RAW API v2; + ByteString key = ByteString.copyFromUtf8("key-test-tikv-failed"); + ByteString value = ByteString.copyFromUtf8("value"); + + RequestKeyCodec c = client.getSession().getPDClient().getCodec(); + ByteString requestKey = c.encodeKey(key); + put(requestKey, value); + + Assert.assertEquals(Optional.of(value), client.get(key)); + for (KVMockServer server : servers) { + server.setState(State.Fail); + } + try { + Assert.assertEquals(Optional.of(value), client.get(key)); + fail(); + } catch (Exception ignored) { + } + } + } + + @Test + public void testNotLeader() { + initCluster(); + try (RawKVClient client = createClient()) { + // Construct a key that is less than the prefix of RAW API v2; + ByteString key = ByteString.copyFromUtf8("key-test-not-leader"); + ByteString value = ByteString.copyFromUtf8("value"); + + RequestKeyCodec c = client.getSession().getPDClient().getCodec(); + ByteString requestKey = c.encodeKey(key); + put(requestKey, value); + + Assert.assertEquals(Optional.of(value), client.get(key)); + + // Forbid the client get region from PD leader. + leader.addGetRegionListener(request -> null); + + // Increase the region epoch for the cluster, + // this will cause the cluster return an EpochNotMatch region error. + TiRegion newRegion = + new TiRegion( + this.region.getConf(), + region.getMeta(), + Metapb.Peer.newBuilder().setId(2).setStoreId(2).build(), + this.region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + + for (KVMockServer server : servers) { + server.setRegion(newRegion); + server.setNewLeader(newRegion.getLeader()); + } + + // The get should success since the leader is in the region error resp + Assert.assertEquals(Optional.of(value), client.get(key)); + + newRegion = + new TiRegion( + this.region.getConf(), + region.getMeta(), + Metapb.Peer.newBuilder().setId(3).setStoreId(3).build(), + this.region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + + for (KVMockServer server : servers) { + server.setRegion(newRegion); + server.setNewLeader(Metapb.Peer.getDefaultInstance()); + } + + TiRegion finalNewRegion = newRegion; + leader.addGetRegionListener( + request -> + Pdpb.GetRegionResponse.newBuilder() + .setRegion(finalNewRegion.getMeta()) + .setLeader(Metapb.Peer.newBuilder().setId(3).setStoreId(3).build()) + .build()); + + Assert.assertEquals(Optional.of(value), client.get(key)); + } + } + + @Test + public void testStoreNotMatch() { + initCluster(); + try (RawKVClient client = createClient()) { + // Construct a key that is less than the prefix of RAW API v2; + ByteString key = ByteString.copyFromUtf8("key-test-store-not-match"); + ByteString value = ByteString.copyFromUtf8("value"); + + RequestKeyCodec c = client.getSession().getPDClient().getCodec(); + ByteString requestKey = c.encodeKey(key); + put(requestKey, value); + + Assert.assertEquals(Optional.of(value), client.get(key)); + + TiRegion newRegion = + new TiRegion( + this.region.getConf(), + region.getMeta(), + Metapb.Peer.newBuilder().setId(3).setStoreId(3).build(), + this.region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + servers.get(0).setStoreNotMatch(1, 2); + for (KVMockServer server : servers) { + server.setRegion(newRegion); + } + leader.addGetRegionListener( + request -> + Pdpb.GetRegionResponse.newBuilder() + .setRegion(region.getMeta()) + .setLeader(Peer.newBuilder().setStoreId(2).setId(2).build()) + .build()); + + Assert.assertEquals(Optional.of(value), client.get(key)); + } + } + + public void testRetryableRegionError(String caseName, Errorpb.Error.Builder eb) { + initCluster(); + // Set the first server to be temporarily unavailable in the first try. + servers.get(0).setTempError(true); + try (RawKVClient client = createClient()) { + // Construct a key that is less than the prefix of RAW API v2; + ByteString key = ByteString.copyFromUtf8("key-test-" + caseName); + ByteString value = ByteString.copyFromUtf8("value"); + + RequestKeyCodec c = client.getSession().getPDClient().getCodec(); + ByteString requestKey = c.encodeKey(key); + put(requestKey, value); + + // First try should success, and refresh region cache. + Assert.assertEquals(Optional.of(value), client.get(key)); + servers.get(0).putError(requestKey.toStringUtf8(), () -> eb); + Assert.assertEquals(Optional.of(value), client.get(key)); + } + } + + public void testUnretryableRegionError(String caseName, Errorpb.Error.Builder eb) { + initCluster(); + try (RawKVClient client = createClient()) { + // Construct a key that is less than the prefix of RAW API v2; + ByteString key = ByteString.copyFromUtf8("key-test-" + caseName); + ByteString value = ByteString.copyFromUtf8("value"); + + RequestKeyCodec c = client.getSession().getPDClient().getCodec(); + ByteString requestKey = c.encodeKey(key); + put(requestKey, value); + + // First try should success, and refresh region cache. + Assert.assertEquals(Optional.of(value), client.get(key)); + + servers.get(0).putError(requestKey.toStringUtf8(), () -> eb); + // The request should fail since the region cache is empty and pd is not available. + try { + client.get(key); + fail(); + } catch (Exception ignore) { + } + + // Forbid the client get region from PD leader. + leader.addGetRegionListener(request -> null); + try { + client.get(key); + fail(); + } catch (Exception ignore) { + } + + leader.addGetRegionListener( + request -> + Pdpb.GetRegionResponse.newBuilder() + .setRegion(region.getMeta()) + .setLeader(Peer.newBuilder().setStoreId(2).setId(2).build()) + .build()); + + TiRegion newRegion = + new TiRegion( + this.region.getConf(), + region.getMeta(), + Metapb.Peer.newBuilder().setId(2).setStoreId(2).build(), + this.region.getPeersList(), + stores.stream().map(TiStore::new).collect(Collectors.toList())); + for (KVMockServer server : servers) { + server.setRegion(newRegion); + } + + Assert.assertEquals(Optional.of(value), client.get(key)); + } + } + + @Test + public void testKeyNotInRegion() { + testUnretryableRegionError( + "key-not-in-region", + Errorpb.Error.newBuilder().setKeyNotInRegion(Errorpb.KeyNotInRegion.getDefaultInstance())); + } + + @Test + public void testRaftEntryTooLarge() { + testUnretryableRegionError( + "raft-entry-too-large", + Errorpb.Error.newBuilder() + .setRaftEntryTooLarge(Errorpb.RaftEntryTooLarge.getDefaultInstance())); + } + + @Test + public void testServerIsBusy() { + testRetryableRegionError( + "server-is-busy", + Errorpb.Error.newBuilder().setServerIsBusy(ServerIsBusy.getDefaultInstance())); + } + + @Test + public void testStaleCommand() { + testRetryableRegionError( + "stale-command", + Errorpb.Error.newBuilder().setStaleCommand(StaleCommand.getDefaultInstance())); + } + + @Test + public void testTsoBatchUsedUp() { + testRetryableRegionError( + "tso-batch-used-up", Errorpb.Error.newBuilder().setMessage("TsoBatchUsedUp")); + } + + @Test + public void testRaftProposalDropped() { + testRetryableRegionError( + "raft-proposal-dropped", Errorpb.Error.newBuilder().setMessage("Raft ProposalDropped")); + } + + @Test + public void testUnknownRegionError() { + testRetryableRegionError( + "unknown-region-error", Errorpb.Error.newBuilder().setMessage("Unknown Region Error")); + } + + private void initCluster() { + for (KVMockServer server : servers) { + server.reset(); + } + leader.addGetRegionListener( + request -> + Pdpb.GetRegionResponse.newBuilder() + .setRegion(firstRegion.getMeta()) + .setLeader(firstRegion.getLeader()) + .build()); + // Reset cluster regions; + session.getRegionManager().invalidateAll(); + } }