Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce region error handler unit test #771

Open
wants to merge 3 commits into
base: release-3.3.1.14
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 94 additions & 15 deletions src/test/java/org/tikv/common/KVMockServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,22 @@
import org.slf4j.LoggerFactory;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.*;
import org.tikv.kvproto.Errorpb.EpochNotMatch;
import org.tikv.kvproto.Errorpb.Error;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Kvrpcpb.Context;
import org.tikv.kvproto.TikvGrpc;

public class KVMockServer extends TikvGrpc.TikvImplBase {

private static final Logger logger = LoggerFactory.getLogger(KVMockServer.class);
private int port;
private Server server;
private TiRegion region;
private TiRegion firstRegion;
private Iterable<Metapb.Region> subregions;
private Metapb.Peer newLeader;
private Errorpb.StoreNotMatch storeNotMatch;
private boolean tempError = false;
private State state = State.Normal;
private final TreeMap<Key, ByteString> dataMap = new TreeMap<>();
private final Map<Key, Supplier<Errorpb.Error.Builder>> regionErrMap = new HashMap<>();
Expand Down Expand Up @@ -97,6 +99,26 @@ public void setRegion(TiRegion region) {
this.region = region;
}

public void setNewLeader(Metapb.Peer leader) {
newLeader = leader;
}

public void setSubregions(Iterable<Metapb.Region> subregions) {
this.subregions = subregions;
}

public void setTempError(boolean tempError) {
this.tempError = tempError;
}

public void setStoreNotMatch(int request, int actual) {
this.storeNotMatch =
Errorpb.StoreNotMatch.newBuilder()
.setRequestStoreId(request)
.setActualStoreId(actual)
.build();
}

public void put(ByteString key, ByteString value) {
dataMap.put(toRawKey(key), value);
}
Expand All @@ -117,23 +139,79 @@ public void putError(String key, Supplier<Errorpb.Error.Builder> builder) {
regionErrMap.put(toRawKey(key.getBytes(StandardCharsets.UTF_8)), builder);
}

private Supplier<Errorpb.Error.Builder> takeError(Key key, boolean tempError) {
if (tempError) {
return regionErrMap.remove(key);
} else {
return regionErrMap.get(key);
}
}

public void clearAllMap() {
dataMap.clear();
regionErrMap.clear();
}

public void reset() {
clearAllMap();
state = State.Normal;
storeNotMatch = null;
newLeader = null;
subregions = null;
region = firstRegion;
tempError = false;
}

private <T> boolean tryBuilderRegionError(Key key, T builder) throws Exception {
Supplier<Errorpb.Error.Builder> errProvider = takeError(key, tempError);
if (errProvider != null) {
Error.Builder eb = errProvider.get();
if (eb != null) {
builder
.getClass()
.getMethod("setRegionError", new Class<?>[]{Error.class})
.invoke(builder, eb.build());
}
return true;
}
return false;
}

private Errorpb.Error verifyContext(Context context) throws Exception {
if (context.getRegionId() != region.getId() || !context.getPeer().equals(region.getLeader())) {
throw new Exception("context doesn't match");
if (context.getRegionId() != region.getId()) {
String errMsg =
String.format(
"client context mismatch: server: %s; client: %s",
region.toString(), context.toString());
throw new Exception(errMsg);
}

logger.warn("local region: " + region.toString());
logger.warn("client context: " + context);

Errorpb.Error.Builder errBuilder = Errorpb.Error.newBuilder();
if (storeNotMatch != null) {
return errBuilder.setStoreNotMatch(storeNotMatch).build();
}
if (!context.getPeer().equals(region.getLeader())) {
String warnMsg =
String.format("this store %d is not leader, new leader: %s", port, newLeader.toString());
logger.warn(warnMsg);
return errBuilder
.setNotLeader(
Errorpb.NotLeader.newBuilder()
.setRegionId(context.getRegionId())
.setLeader(newLeader)
.build())
.build();
}

if (!context.getRegionEpoch().equals(region.getRegionEpoch())) {
return errBuilder
.setEpochNotMatch(EpochNotMatch.newBuilder().addCurrentRegions(region.getMeta()).build())
.setEpochNotMatch(EpochNotMatch.newBuilder().addAllCurrentRegions(subregions).build())
.build();
}

return null;
}

Expand All @@ -157,13 +235,7 @@ public void rawGet(
return;
}

Supplier<Errorpb.Error.Builder> errProvider = regionErrMap.get(key);
if (errProvider != null) {
Error.Builder eb = errProvider.get();
if (eb != null) {
builder.setRegionError(eb.build());
}
} else {
if (!tryBuilderRegionError(key, builder)) {
ByteString value = dataMap.get(key);
if (value == null) {
value = ByteString.EMPTY;
Expand Down Expand Up @@ -413,6 +485,7 @@ public int start(TiRegion region) throws IOException {
}

private static class HealCheck extends HealthImplBase {

@Override
public void check(
HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
Expand All @@ -425,6 +498,7 @@ public void check(
public void start(TiRegion region, int port) throws IOException {
this.port = port;
this.region = region;
this.firstRegion = region;

logger.info("start mock server on port: " + port);
server =
Expand All @@ -434,7 +508,12 @@ public void start(TiRegion region, int port) throws IOException {

public void stop() {
if (server != null) {
server.shutdown();
server.shutdownNow();
try {
server.awaitTermination();
} catch (Exception ignore) {

}
}
}
}
9 changes: 6 additions & 3 deletions src/test/java/org/tikv/common/MockServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@ public void setup() throws IOException {
s.stream().map(TiStore::new).collect(Collectors.toList()));
leader.addGetRegionListener(
request -> Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
for (Metapb.Store store : s) {
leader.addGetStoreListener(
(request) -> Pdpb.GetStoreResponse.newBuilder().setStore(store).build());
for (PDMockServer pd : pdServers) {
pd.addGetStoreListener(
request ->
Pdpb.GetStoreResponse.newBuilder()
.setStore(s.get((int) (request.getStoreId() - 1)))
.build());
}
server = new KVMockServer();
port = server.start(region);
Expand Down
8 changes: 8 additions & 0 deletions src/test/java/org/tikv/common/MockThreeStoresTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
public class MockThreeStoresTest extends PDMockServerTest {

protected TiRegion region;
protected TiRegion firstRegion;
protected List<KVMockServer> servers = new ArrayList<>();
protected List<Metapb.Store> stores;

Expand Down Expand Up @@ -108,6 +109,13 @@ public void setup() throws IOException {
region.getPeers(0),
region.getPeersList(),
stores.stream().map(TiStore::new).collect(Collectors.toList()));
this.firstRegion =
new TiRegion(
session.getConf(),
region,
region.getPeers(0),
region.getPeersList(),
stores.stream().map(TiStore::new).collect(Collectors.toList()));
for (int port : ports) {
KVMockServer server = new KVMockServer();
server.start(this.region, port);
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/org/tikv/common/PDMockServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ void setup(String addr) throws IOException {
TiConfiguration conf = TiConfiguration.createDefault(addr + ":" + ports[0]);
conf.setKvMode("RAW");
conf.setWarmUpEnable(false);
conf.setTimeout(2000);
conf.setTimeout(20000);
conf.setRawKVReadTimeoutInMS(2000);
conf.setEnableGrpcForward(true);

session = TiSession.create(conf);
Expand Down
Loading
Loading