Skip to content

Commit

Permalink
[fix][dingo-exec] Interrupt with infinite regionSplit exception
Browse files Browse the repository at this point in the history
  • Loading branch information
guojn1 committed Nov 4, 2024
1 parent df6ab2f commit 5a5e0c8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.dingodb.common.meta.SchemaState;
import io.dingodb.meta.DdlService;
import io.dingodb.meta.entity.Column;
import io.dingodb.meta.entity.IndexTable;
import io.dingodb.meta.entity.InfoSchema;
import io.dingodb.meta.entity.Table;

Expand Down Expand Up @@ -95,7 +96,11 @@ public Object[] getIndexCol(Table index, String columnName, int seqIndex, boolea
Object[] val = new Object[16];
val[0] = tableName;
val[1] = unique ? "1" : "0";
val[2] = index.getName();
if (index instanceof IndexTable) {
val[2] = index.getName();
} else {
val[2] = "PRIMARY";
}
val[3] = seqIndex;
val[4] = columnName;
val[5] = 'A';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,21 @@ private static CompletableFuture<Boolean> push(
.exceptionally(ex -> {
if (ex != null) {
if (ex.getCause() instanceof RegionSplitException) {
int retry;
if (param.getSplitRetry().containsKey(distribution.getId())) {
int retryCnt = param.getSplitRetry().get(distribution.getId());
retry = retryCnt + 1;
} else {
retry = 1;
}
if (retry > 50) {
MetaService.root().invalidateDistribution(param.getTd().getTableId());
}
if (retry > 100) {
LogUtils.error(log, ex.getMessage(), ex);
throw new RuntimeException("The number of split retries exceeds the maximum limit");
}
param.getSplitRetry().put(distribution.getId(), retry);
NavigableMap<ByteArrayUtils.ComparableByteArray, RangeDistribution> tmpDistribution =
MetaService.root().getRangeDistribution(param.getTd().getTableId());
DistributionSourceParam copyParam = param.copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.dingodb.common.CommonId;
import io.dingodb.common.partition.RangeDistribution;
import io.dingodb.common.util.ByteArrayUtils;
import io.dingodb.common.util.Optional;
Expand All @@ -30,7 +31,9 @@
import lombok.Getter;
import lombok.Setter;

import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;

@Getter
@JsonTypeName("distributionSource")
Expand Down Expand Up @@ -62,6 +65,8 @@ public class DistributionSourceParam extends SourceParam {
private int keepOrder;
@JsonProperty("concurrencyLevel")
private final int concurrencyLevel;
@Setter
private Map<CommonId, Integer> splitRetry = new ConcurrentHashMap<>();

public DistributionSourceParam(
Table td,
Expand Down

0 comments on commit 5a5e0c8

Please sign in to comment.