[fix][dingo-exec] Interrupt with infinite regionSplit exception
guojn1 committed Nov 5, 2024
1 parent df6ab2f commit 9dbcd93
Showing 5 changed files with 55 additions and 51 deletions.
@@ -21,6 +21,7 @@
import io.dingodb.common.meta.SchemaState;
import io.dingodb.meta.DdlService;
import io.dingodb.meta.entity.Column;
import io.dingodb.meta.entity.IndexTable;
import io.dingodb.meta.entity.InfoSchema;
import io.dingodb.meta.entity.Table;

@@ -95,7 +96,11 @@ public Object[] getIndexCol(Table index, String columnName, int seqIndex, boolean unique)
Object[] val = new Object[16];
val[0] = tableName;
val[1] = unique ? "1" : "0";
val[2] = index.getName();
if (index instanceof IndexTable) {
val[2] = index.getName();
} else {
val[2] = "PRIMARY";
}
val[3] = seqIndex;
val[4] = columnName;
val[5] = 'A';
@@ -17,9 +17,11 @@
package io.dingodb.exec.operator;

import io.dingodb.common.concurrent.Executors;
import io.dingodb.common.config.DingoConfiguration;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.partition.RangeDistribution;
import io.dingodb.common.util.ByteArrayUtils;
import io.dingodb.common.util.Optional;
import io.dingodb.common.util.RangeUtils;
import io.dingodb.common.util.Utils;
import io.dingodb.exec.dag.Vertex;
@@ -157,13 +159,30 @@ private static CompletableFuture<Boolean> push(
copyContext.setDistribution(distribution);
return vertex.getSoleEdge().transformToNext(copyContext, null);
};
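// Max split retries is read from the "retry" config key, defaulting to 30 when unset.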
Integer maxRetry = Optional.mapOrGet(DingoConfiguration.instance()
.find("retry", int.class), __ -> __, () -> 30);
return CompletableFuture.supplyAsync(
supplier, Executors.executor(
"operator-" + vertex.getTask().getJobId() + "-"
+ vertex.getTask().getId() + "-" + vertex.getId() + "-" + distribution.getId()))
.exceptionally(ex -> {
if (ex != null) {
if (ex.getCause() instanceof RegionSplitException) {
int retry;
if (param.getSplitRetry().containsKey(distribution.getId())) {
int retryCnt = param.getSplitRetry().get(distribution.getId());
retry = retryCnt + 1;
} else {
retry = 1;
}
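// More than 10 failed pushes: drop the cached distribution so the fresh ranges fetched below take effect.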
if (retry > 10) {
MetaService.root().invalidateDistribution(param.getTd().getTableId());
}
if (retry > maxRetry) {
LogUtils.error(log, ex.getMessage(), ex);
throw new RuntimeException("The number of split retries exceeds the maximum limit");
}
param.getSplitRetry().put(distribution.getId(), retry);
NavigableMap<ByteArrayUtils.ComparableByteArray, RangeDistribution> tmpDistribution =
MetaService.root().getRangeDistribution(param.getTd().getTableId());
DistributionSourceParam copyParam = param.copy(
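For context, a minimal self-contained sketch of the retry-capping pattern this hunk introduces. The class and method names here are hypothetical; the RegionSplitException trigger, the threshold of 10 for invalidating the cached distribution, the configurable cap (default 30), and the final RuntimeException message are taken from the diff above.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the bookkeeping the commit keeps on
// DistributionSourceParam and drives from the exceptionally() callback.
public final class SplitRetryGuard {
    // After this many failed pushes the cached range distribution is dropped.
    private static final int INVALIDATE_THRESHOLD = 10;
    private final Map<String, Integer> splitRetry = new ConcurrentHashMap<>();
    private final int maxRetry; // from the "retry" config key, default 30

    public SplitRetryGuard(int maxRetry) {
        this.maxRetry = maxRetry;
    }

    // Records one failed push for a region and enforces the cap.
    public void onRegionSplit(String regionId, Runnable invalidateDistribution) {
        int retry = splitRetry.merge(regionId, 1, Integer::sum); // atomic increment
        if (retry > INVALIDATE_THRESHOLD) {
            invalidateDistribution.run(); // cached route info is likely stale
        }
        if (retry > maxRetry) {
            throw new RuntimeException("The number of split retries exceeds the maximum limit");
        }
    }
}

The committed code increments via containsKey/get/put on the shared map; merge() above is a compact equivalent that also keeps the increment atomic when callbacks for several distributions race.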
@@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.dingodb.common.CommonId;
import io.dingodb.common.partition.RangeDistribution;
import io.dingodb.common.util.ByteArrayUtils;
import io.dingodb.common.util.Optional;
@@ -30,7 +31,9 @@
import lombok.Getter;
import lombok.Setter;

import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;

@Getter
@JsonTypeName("distributionSource")
@@ -62,6 +65,8 @@ public class DistributionSourceParam extends SourceParam {
private int keepOrder;
@JsonProperty("concurrencyLevel")
private final int concurrencyLevel;
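// Per-region retry counters for RegionSplitException pushes; a ConcurrentHashMap because pushes for different distributions complete on separate executor threads.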
@Setter
private Map<CommonId, Integer> splitRetry = new ConcurrentHashMap<>();

public DistributionSourceParam(
Table td,
@@ -42,8 +42,12 @@
@Slf4j
public final class JobTableUtil {
private static final String updateDDLJobSQL = "update mysql.dingo_ddl_job set job_meta = '%s' where job_id = %d";
private static final String getJobSQL = "select job_meta, processing, job_id from mysql.dingo_ddl_job where job_id in (select min(job_id) from mysql.dingo_ddl_job group by schema_ids, table_ids, processing) and %s reorg %s order by processing desc, job_id";
private static final String getJobsSQL = "select job_meta, processing, job_id, table_ids from mysql.dingo_ddl_job where job_id in (select min(job_id) from mysql.dingo_ddl_job group by schema_ids, table_ids) %s order by processing desc, job_id";
private static final String getJobSQL = "select job_meta, processing, job_id from mysql.dingo_ddl_job where "
+ "job_id in (select min(job_id) from mysql.dingo_ddl_job group by schema_ids, table_ids, processing) "
+ "and %s reorg %s order by processing desc, job_id";
private static final String getJobsSQL = "select job_meta, processing, job_id, table_ids from mysql.dingo_ddl_job"
+ " where job_id in (select min(job_id) from mysql.dingo_ddl_job group by schema_ids, table_ids) %s "
+ "order by processing desc, job_id";
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final int general = 0;
private static final int reorg = 1;
@@ -78,9 +82,18 @@ public static String deleteDDLJob(Session session, DdlJob job) {

public static String addHistoryDDLJob2Table(Session session, DdlJob job, boolean updateRawArgs) {
String time = DateTimeUtils.dateFormat(new Date(System.currentTimeMillis()), "yyyy-MM-dd HH:mm:ss");
String sql = "insert into mysql.dingo_ddl_history(job_id, job_meta, schema_name, table_name, schema_ids, table_ids, create_time) values (%d, %s, %s, %s, %s, %s, %s)";
sql = String.format(sql, job.getId(), Utils.quoteForSql(""), Utils.quoteForSql(job.getSchemaName()), Utils.quoteForSql(job.getTableName()), Utils.quoteForSql(job.getSchemaId()), Utils.quoteForSql(job.getTableId()), Utils.quoteForSql(time));
session.executeUpdate(sql);
String sql = "insert into mysql.dingo_ddl_history(job_id, job_meta, schema_name, table_name, schema_ids, "
+ "table_ids, create_time) values (%d, %s, %s, %s, %s, %s, %s)";
try {
byte[] meta = job.encode(updateRawArgs);
String jobMeta = new String(meta);
sql = String.format(sql, job.getId(), Utils.quoteForSql(jobMeta), Utils.quoteForSql(job.getSchemaName()),
Utils.quoteForSql(job.getTableName()), Utils.quoteForSql(job.getSchemaId()),
Utils.quoteForSql(job.getTableId()), Utils.quoteForSql(time));
session.executeUpdate(sql);
} catch (Exception e) {
LogUtils.error(log, e.getMessage(), e);
}
return null;
}

@@ -104,39 +117,17 @@ public static void cleanMDLInfo(long jobId) {
DingoMetrics.timer("delMdlKeyEtcd").update(sub, TimeUnit.MILLISECONDS);
}

public static Pair<DdlJob, String> getGenerateJob(Session session) {
try {
return getJob(session, general, job1 -> {
if (job1.getActionType() == ActionType.ActionDropSchema) {
String sql = "select job_id from mysql.dingo_ddl_job where schema_ids = %s and processing limit 1";
sql = String.format(sql, Utils.quoteForSql(job1.getSchemaId()));
return checkJobIsRunnable(session, sql);
}
String sql = "select job_id from mysql.dingo_ddl_job t1, (select table_ids from mysql.dingo_ddl_job where job_id = %d) t2 where " +
" processing and t2.table_ids = t1.table_ids";
sql = String.format(sql, job1.getId());
return checkJobIsRunnable(session, sql);
});
} catch (Exception e) {
LogUtils.error(log, e.getMessage(), e);
return Pair.of(null, e.getMessage());
}
}

public static Pair<List<DdlJob>, String> getGenerateJobs(Session session) {
try {
return getJobs(session, general, job1 -> {
Session session1 = SessionUtil.INSTANCE.getSession();
try {
if (job1.getActionType() == ActionType.ActionDropSchema) {
String sql = "select job_id from mysql.dingo_ddl_job where schema_ids = %s and processing limit 1";
String sql = "select job_id from mysql.dingo_ddl_job where schema_ids = %s " +
"and processing limit 1";
sql = String.format(sql, Utils.quoteForSql(job1.getSchemaId()));
return checkJobIsRunnable(session1, sql);
}
// String sql = "select job_id from mysql.dingo_ddl_job t1, (select table_ids from mysql.dingo_ddl_job where job_id = %d) t2 where " +
// " processing and t2.table_ids = t1.table_ids";
// sql = String.format(sql, job1.getId());
// return checkJobIsRunnable(session1, sql);
return Pair.of(false, null);
} finally {
SessionUtil.INSTANCE.closeSession(session1);
@@ -205,7 +196,9 @@ public static Pair<List<DdlJob>, String> getJobs(
return Pair.of(ddlJobList, null);
}

public static Pair<DdlJob, String> getJob(Session session, int jobType, Function<DdlJob, Pair<Boolean, String>> filter) {
public static Pair<DdlJob, String> getJob(
Session session, int jobType, Function<DdlJob, Pair<Boolean, String>> filter
) {
String not = "not";
if (jobType == 1) {
not = "";
@@ -285,25 +278,6 @@ public static String markJobProcessing(Session session, String sql, int retry) {
}
}

public static Pair<DdlJob, String> getReorgJob(Session session) {
try {
Timer.Context timeCtx = DingoMetrics.getTimeContext("reorgJob");
Pair<DdlJob, String> res = getJob(session, reorg, job1 -> {
String sql = "select job_id from mysql.dingo_ddl_job where "
+ "(schema_ids = %s and type = %d and processing) "
+ " or (table_ids = %s and processing) "
+ " limit 1";
sql = String.format(sql, Utils.quoteForSql(job1.getSchemaId()), job1.getActionType().getCode(), Utils.quoteForSql(job1.getTableId()));
return checkJobIsRunnable(session, sql);
});
timeCtx.stop();
return res;
} catch (Exception e) {
LogUtils.error(log, e.getMessage(), e);
return Pair.of(null, e.getMessage());
}
}

public static String removeDDLReorgHandle(Session session, long jobId, MetaElement[] elements) {
if (elements.length == 0) {
return null;
@@ -98,7 +98,8 @@ public static void insertDDLJobs2Table(DdlJob job, boolean updateRawArgs) {
sqlBuilder.append(
String.format(
format, job.getId(), job.mayNeedReorg(), Utils.quoteForSql(job.job2SchemaIDs()),
Utils.quoteForSql(job.job2TableIDs()), Utils.quoteForSql(jobMeta), job.getActionType().getCode(), !job.notStarted()
Utils.quoteForSql(job.job2TableIDs()), Utils.quoteForSql(jobMeta), job.getActionType().getCode(),
!job.notStarted()
)
);
String sql = sqlBuilder.toString();
