diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/executor/ShowIndexFromTableExecutor.java b/dingo-calcite/src/main/java/io/dingodb/calcite/executor/ShowIndexFromTableExecutor.java
index 51d91a46db..9bd6cee81e 100644
--- a/dingo-calcite/src/main/java/io/dingodb/calcite/executor/ShowIndexFromTableExecutor.java
+++ b/dingo-calcite/src/main/java/io/dingodb/calcite/executor/ShowIndexFromTableExecutor.java
@@ -21,6 +21,7 @@
 import io.dingodb.common.meta.SchemaState;
 import io.dingodb.meta.DdlService;
 import io.dingodb.meta.entity.Column;
+import io.dingodb.meta.entity.IndexTable;
 import io.dingodb.meta.entity.InfoSchema;
 import io.dingodb.meta.entity.Table;
 
@@ -95,7 +96,11 @@ public Object[] getIndexCol(Table index, String columnName, int seqIndex, boolea
         Object[] val = new Object[16];
         val[0] = tableName;
         val[1] = unique ? "1" : "0";
-        val[2] = index.getName();
+        if (index instanceof IndexTable) {
+            val[2] = index.getName();
+        } else {
+            val[2] = "PRIMARY";
+        }
         val[3] = seqIndex;
         val[4] = columnName;
         val[5] = 'A';
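
A note on the change above: MySQL's SHOW INDEX reports the Key_name column as PRIMARY for the primary key and as the index's own name otherwise, which is the convention the new instanceof branch reproduces. A minimal sketch of that rule, using stand-in Table/IndexTable classes rather than the real io.dingodb.meta.entity types:

    // Stand-ins for io.dingodb.meta.entity.Table and IndexTable; only the
    // naming rule from the hunk above is reproduced here.
    class Table {
        private final String name;
        Table(String name) { this.name = name; }
        String getName() { return name; }
    }

    class IndexTable extends Table {
        IndexTable(String name) { super(name); }
    }

    public class KeyNameSketch {
        // A secondary index reports its own name; anything else stands for
        // the implicit primary key and reports "PRIMARY".
        static String keyName(Table index) {
            return (index instanceof IndexTable) ? index.getName() : "PRIMARY";
        }

        public static void main(String[] args) {
            System.out.println(keyName(new IndexTable("idx_user_name"))); // idx_user_name
            System.out.println(keyName(new Table("users")));              // PRIMARY
        }
    }

In the executor itself the non-IndexTable case is the base Table carrying the implicit primary key, which is why the else branch can hard-code "PRIMARY".
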
diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/NewCalcDistributionOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/NewCalcDistributionOperator.java
index 4c8d12f2e4..efc69caf64 100644
--- a/dingo-exec/src/main/java/io/dingodb/exec/operator/NewCalcDistributionOperator.java
+++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/NewCalcDistributionOperator.java
@@ -164,6 +164,21 @@ private static CompletableFuture push(
             .exceptionally(ex -> {
                 if (ex != null) {
                     if (ex.getCause() instanceof RegionSplitException) {
+                        int retry;
+                        if (param.getSplitRetry().containsKey(distribution.getId())) {
+                            int retryCnt = param.getSplitRetry().get(distribution.getId());
+                            retry = retryCnt + 1;
+                        } else {
+                            retry = 1;
+                        }
+                        if (retry > 50) {
+                            MetaService.root().invalidateDistribution(param.getTd().getTableId());
+                        }
+                        if (retry > 100) {
+                            LogUtils.error(log, ex.getMessage(), ex);
+                            throw new RuntimeException("The number of split retries exceeds the maximum limit");
+                        }
+                        param.getSplitRetry().put(distribution.getId(), retry);
                         NavigableMap<ByteArrayUtils.ComparableByteArray, RangeDistribution> tmpDistribution =
                             MetaService.root().getRangeDistribution(param.getTd().getTableId());
                         DistributionSourceParam copyParam = param.copy(
diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DistributionSourceParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DistributionSourceParam.java
index 16029a62ed..31c3d88309 100644
--- a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DistributionSourceParam.java
+++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/DistributionSourceParam.java
@@ -19,6 +19,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.dingodb.common.CommonId;
 import io.dingodb.common.partition.RangeDistribution;
 import io.dingodb.common.util.ByteArrayUtils;
 import io.dingodb.common.util.Optional;
@@ -30,7 +31,9 @@
 import lombok.Getter;
 import lombok.Setter;
 
+import java.util.Map;
 import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Getter
 @JsonTypeName("distributionSource")
@@ -62,6 +65,8 @@ public class DistributionSourceParam extends SourceParam {
     private int keepOrder;
     @JsonProperty("concurrencyLevel")
     private final int concurrencyLevel;
+    @Setter
+    private Map<CommonId, Integer> splitRetry = new ConcurrentHashMap<>();
 
     public DistributionSourceParam(
         Table td,
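
The two files above add per-region retry bookkeeping for RegionSplitException: a ConcurrentHashMap keyed by distribution id counts retries, the cached range distribution is invalidated once a counter passes 50, and the operation fails outright past 100. A condensed, self-contained sketch of the same pattern (RetryTracker and the constant names are illustrative, not part of the patch; only the thresholds and the exception message are taken from it):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of the per-key retry pattern introduced above.
    final class RetryTracker<K> {
        private static final int INVALIDATE_AFTER = 50; // drop cached metadata and re-fetch
        private static final int FAIL_AFTER = 100;      // hard limit, give up

        private final Map<K, Integer> retries = new ConcurrentHashMap<>();

        /** Records one failure for the key and applies both thresholds. */
        void onSplit(K key, Runnable invalidateCache) {
            // merge() folds the patch's containsKey/get/put sequence into a
            // single atomic update on the ConcurrentHashMap.
            int retry = retries.merge(key, 1, Integer::sum);
            if (retry > INVALIDATE_AFTER) {
                invalidateCache.run();
            }
            if (retry > FAIL_AFTER) {
                throw new RuntimeException("The number of split retries exceeds the maximum limit");
            }
        }
    }

One nuance: in the patch the counter is written back only after the limit checks, and the containsKey/get/put sequence is not atomic, so two threads sharing one DistributionSourceParam can read the same count and both store retry + 1, undercounting. The merge-based variant above avoids that race.
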
diff --git a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/JobTableUtil.java b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/JobTableUtil.java
index e5dbb24025..af39f9f078 100644
--- a/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/JobTableUtil.java
+++ b/dingo-executor/src/main/java/io/dingodb/server/executor/ddl/JobTableUtil.java
@@ -42,8 +42,12 @@
 @Slf4j
 public final class JobTableUtil {
     private static final String updateDDLJobSQL = "update mysql.dingo_ddl_job set job_meta = '%s' where job_id = %d";
-    private static final String getJobSQL = "select job_meta, processing, job_id from mysql.dingo_ddl_job where job_id in (select min(job_id) from mysql.dingo_ddl_job group by schema_ids, table_ids, processing) and %s reorg %s order by processing desc, job_id";
-    private static final String getJobsSQL = "select job_meta, processing, job_id, table_ids from mysql.dingo_ddl_job where job_id in (select min(job_id) from mysql.dingo_ddl_job group by schema_ids, table_ids) %s order by processing desc, job_id";
+    private static final String getJobSQL = "select job_meta, processing, job_id from mysql.dingo_ddl_job where "
+        + "job_id in (select min(job_id) from mysql.dingo_ddl_job group by schema_ids, table_ids, processing) "
+        + "and %s reorg %s order by processing desc, job_id";
+    private static final String getJobsSQL = "select job_meta, processing, job_id, table_ids from mysql.dingo_ddl_job"
+        + " where job_id in (select min(job_id) from mysql.dingo_ddl_job group by schema_ids, table_ids) %s "
+        + "order by processing desc, job_id";
     private static final ObjectMapper objectMapper = new ObjectMapper();
     private static final int general = 0;
     private static final int reorg = 1;
@@ -78,9 +82,18 @@ public static String deleteDDLJob(Session session, DdlJob job) {
 
     public static String addHistoryDDLJob2Table(Session session, DdlJob job, boolean updateRawArgs) {
         String time = DateTimeUtils.dateFormat(new Date(System.currentTimeMillis()), "yyyy-MM-dd HH:mm:ss");
-        String sql = "insert into mysql.dingo_ddl_history(job_id, job_meta, schema_name, table_name, schema_ids, table_ids, create_time) values (%d, %s, %s, %s, %s, %s, %s)";
-        sql = String.format(sql, job.getId(), Utils.quoteForSql(""), Utils.quoteForSql(job.getSchemaName()), Utils.quoteForSql(job.getTableName()), Utils.quoteForSql(job.getSchemaId()), Utils.quoteForSql(job.getTableId()), Utils.quoteForSql(time));
-        session.executeUpdate(sql);
+        String sql = "insert into mysql.dingo_ddl_history(job_id, job_meta, schema_name, table_name, schema_ids, "
+            + "table_ids, create_time) values (%d, %s, %s, %s, %s, %s, %s)";
+        try {
+            byte[] meta = job.encode(updateRawArgs);
+            String jobMeta = new String(meta);
+            sql = String.format(sql, job.getId(), Utils.quoteForSql(jobMeta), Utils.quoteForSql(job.getSchemaName()),
+                Utils.quoteForSql(job.getTableName()), Utils.quoteForSql(job.getSchemaId()),
+                Utils.quoteForSql(job.getTableId()), Utils.quoteForSql(time));
+            session.executeUpdate(sql);
+        } catch (Exception e) {
+            LogUtils.error(log, e.getMessage(), e);
+        }
         return null;
     }
 
@@ -104,39 +117,17 @@ public static void cleanMDLInfo(long jobId) {
         DingoMetrics.timer("delMdlKeyEtcd").update(sub, TimeUnit.MILLISECONDS);
     }
 
-    public static Pair<DdlJob, String> getGenerateJob(Session session) {
-        try {
-            return getJob(session, general, job1 -> {
-                if (job1.getActionType() == ActionType.ActionDropSchema) {
-                    String sql = "select job_id from mysql.dingo_ddl_job where schema_ids = %s and processing limit 1";
-                    sql = String.format(sql, Utils.quoteForSql(job1.getSchemaId()));
-                    return checkJobIsRunnable(session, sql);
-                }
-                String sql = "select job_id from mysql.dingo_ddl_job t1, (select table_ids from mysql.dingo_ddl_job where job_id = %d) t2 where " +
-                    " processing and t2.table_ids = t1.table_ids";
-                sql = String.format(sql, job1.getId());
-                return checkJobIsRunnable(session, sql);
-            });
-        } catch (Exception e) {
-            LogUtils.error(log, e.getMessage(), e);
-            return Pair.of(null, e.getMessage());
-        }
-    }
-
     public static Pair<List<DdlJob>, String> getGenerateJobs(Session session) {
         try {
             return getJobs(session, general, job1 -> {
                 Session session1 = SessionUtil.INSTANCE.getSession();
                 try {
                     if (job1.getActionType() == ActionType.ActionDropSchema) {
-                        String sql = "select job_id from mysql.dingo_ddl_job where schema_ids = %s and processing limit 1";
+                        String sql = "select job_id from mysql.dingo_ddl_job where schema_ids = %s "
+                            + "and processing limit 1";
                         sql = String.format(sql, Utils.quoteForSql(job1.getSchemaId()));
                         return checkJobIsRunnable(session1, sql);
                     }
-//                    String sql = "select job_id from mysql.dingo_ddl_job t1, (select table_ids from mysql.dingo_ddl_job where job_id = %d) t2 where " +
-//                        " processing and t2.table_ids = t1.table_ids";
-//                    sql = String.format(sql, job1.getId());
-//                    return checkJobIsRunnable(session1, sql);
                     return Pair.of(false, null);
                 } finally {
                     SessionUtil.INSTANCE.closeSession(session1);
@@ -205,7 +196,9 @@ public static Pair<List<DdlJob>, String> getJobs(
         return Pair.of(ddlJobList, null);
     }
 
-    public static Pair<DdlJob, String> getJob(Session session, int jobType, Function<DdlJob, Pair<Boolean, String>> filter) {
+    public static Pair<DdlJob, String> getJob(
+        Session session, int jobType, Function<DdlJob, Pair<Boolean, String>> filter
+    ) {
         String not = "not";
         if (jobType == 1) {
             not = "";
@@ -285,25 +278,6 @@ public static String markJobProcessing(Session session, String sql, int retry) {
         }
     }
 
-    public static Pair<DdlJob, String> getReorgJob(Session session) {
-        try {
-            Timer.Context timeCtx = DingoMetrics.getTimeContext("reorgJob");
-            Pair<DdlJob, String> res = getJob(session, reorg, job1 -> {
-                String sql = "select job_id from mysql.dingo_ddl_job where "
-                    + "(schema_ids = %s and type = %d and processing) "
-                    + " or (table_ids = %s and processing) "
-                    + " limit 1";
-                sql = String.format(sql, Utils.quoteForSql(job1.getSchemaId()), job1.getActionType().getCode(), Utils.quoteForSql(job1.getTableId()));
-                return checkJobIsRunnable(session, sql);
-            });
-            timeCtx.stop();
-            return res;
-        } catch (Exception e) {
-            LogUtils.error(log, e.getMessage(), e);
-            return Pair.of(null, e.getMessage());
-        }
-    }
-
     public static String removeDDLReorgHandle(Session session, long jobId, MetaElement[] elements) {
         if (elements.length == 0) {
             return null;
diff --git a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/ddl/DdlHandler.java b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/ddl/DdlHandler.java
index 787ef6462a..bbca9e87ae 100644
--- a/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/ddl/DdlHandler.java
+++ b/dingo-store-proxy/src/main/java/io/dingodb/store/proxy/ddl/DdlHandler.java
@@ -98,7 +98,8 @@ public static void insertDDLJobs2Table(DdlJob job, boolean updateRawArgs) {
         sqlBuilder.append(
             String.format(
                 format, job.getId(), job.mayNeedReorg(), Utils.quoteForSql(job.job2SchemaIDs()),
-                Utils.quoteForSql(job.job2TableIDs()), Utils.quoteForSql(jobMeta), job.getActionType().getCode(), !job.notStarted()
+                Utils.quoteForSql(job.job2TableIDs()), Utils.quoteForSql(jobMeta), job.getActionType().getCode(),
+                !job.notStarted()
             )
         );
         String sql = sqlBuilder.toString();
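
On the JobTableUtil change: addHistoryDDLJob2Table previously wrote an empty job_meta into mysql.dingo_ddl_history; after the patch it stores the encoded job, and every dynamic value still passes through Utils.quoteForSql before being spliced into the statement. A self-contained sketch of that flow, with buildHistorySql and quote as hypothetical stand-ins for the DdlJob.encode/Utils.quoteForSql plumbing (the column list is abbreviated, and the real quoteForSql may do more than double single quotes):

    import java.nio.charset.StandardCharsets;

    // Hedged sketch of the history-row insert above; only the overall flow
    // (encode the job, quote every value, format the statement) mirrors the patch.
    public class HistorySqlSketch {
        static String buildHistorySql(long jobId, byte[] jobMeta, String schemaName, String tableName, String time) {
            String sql = "insert into mysql.dingo_ddl_history(job_id, job_meta, schema_name, "
                + "table_name, create_time) values (%d, %s, %s, %s, %s)";
            // The serialized job travels as text, as in new String(meta) in the patch.
            String meta = new String(jobMeta, StandardCharsets.UTF_8);
            return String.format(sql, jobId, quote(meta), quote(schemaName), quote(tableName), quote(time));
        }

        // Minimal single-quote doubling, standing in for Utils.quoteForSql.
        static String quote(Object v) {
            return v == null ? "null" : "'" + v.toString().replace("'", "''") + "'";
        }

        public static void main(String[] args) {
            byte[] meta = "{\"type\":\"create table\"}".getBytes(StandardCharsets.UTF_8);
            System.out.println(buildHistorySql(42L, meta, "test", "t1", "2024-01-01 00:00:00"));
        }
    }
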