Skip to content

Commit

Permalink
add unique flag
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Jul 23, 2024
1 parent bbe995b commit 3f45f76
Show file tree
Hide file tree
Showing 26 changed files with 70 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@ public class PredecessorInstance extends ToJsonString implements Serializable {
private static final long serialVersionUID = 422243686633743869L;

private long instanceId;
private int sequence;
private String curNode;
private RunState runState;
private List<PredecessorTask> tasks;

public static PredecessorInstance of(SchedWorkflow workflow, List<SchedTask> tasks) {
PredecessorInstance instance = new PredecessorInstance();
instance.setInstanceId(workflow.getInstanceId());
instance.setSequence(workflow.getSequence());
instance.setCurNode(workflow.getCurNode());
instance.setRunState(RunState.of(workflow.getRunState()));
instance.setTasks(Collects.convert(tasks, PredecessorInstance::convert));
Expand Down
18 changes: 13 additions & 5 deletions disjob-core/src/main/java/cn/ponfee/disjob/core/enums/RunType.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,34 @@ public enum RunType implements IntValueEnum<RunType> {
/**
* 调度计划
*/
SCHEDULE(1, "调度计划"),
SCHEDULE(1, true, "调度计划"),

/**
* 任务依赖
*/
DEPEND(2, "任务依赖"),
DEPEND(2, false, "任务依赖"),

/**
* 失败重试
*/
RETRY(3, "失败重试"),
RETRY(3, false, "失败重试"),

/**
* 手动触发
*/
MANUAL(4, "手动触发"),
MANUAL(4, true, "手动触发"),

;

public static final long UNIQUE_FLAG = 0L;

private final int value;
private final boolean uniqueFlag;
private final String desc;

RunType(int value, String desc) {
RunType(int value, boolean uniqueFlag, String desc) {
this.value = value;
this.uniqueFlag = uniqueFlag;
this.desc = desc;
}

Expand All @@ -70,6 +74,10 @@ public String desc() {
return desc;
}

public boolean isUniqueFlag() {
return uniqueFlag;
}

public static RunType of(Integer value) {
return Objects.requireNonNull(Const.MAPPING.get(value), () -> "Invalid run type value: " + value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,9 @@ public class SchedDepend extends BaseEntity {
*/
private Long childJobId;

/**
* 序号(从1开始)
*/
private Integer sequence;

public SchedDepend(Long parentJobId, Long childJobId, Integer sequence) {
public SchedDepend(Long parentJobId, Long childJobId) {
this.parentJobId = parentJobId;
this.childJobId = childJobId;
this.sequence = sequence;
}

public static List<Long> parseTriggerValue(String triggerValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public class SchedInstance extends BaseEntity {
*/
private Integer runType;

/**
* 唯一标识(保证trigger_time唯一):0-SCHEDULE/MANUAL;{instance_id}-其它场景;
*/
private Long uniqueFlag;

/**
* 运行状态:10-待运行;20-运行中;30-已暂停;40-已完成;50-已取消;
*
Expand Down Expand Up @@ -184,6 +189,11 @@ public InstanceAttach parseAttach() {
return Jsons.fromJson(attach, InstanceAttach.class);
}

public SchedInstance fillUniqueFlag() {
this.uniqueFlag = (RunType.of(runType).isUniqueFlag() && !isWorkflowNode()) ? RunType.UNIQUE_FLAG : instanceId;
return this;
}

public void markTerminated(RunState runState, Date runEndTime) {
Assert.state(runState.isTerminal(), () -> "Invalid terminal run state: " + instanceId + ", " + runState);
Assert.state(runEndTime != null, () -> "Run end time cannot be null: " + instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ public class SchedWorkflow extends BaseEntity {
*/
private String preNode;

/**
* 序号(从1开始)
*/
private Integer sequence;

/**
* 运行状态:10-待运行;20-运行中;30-已暂停;40-已完成;50-已取消;
*
Expand All @@ -68,11 +63,10 @@ public class SchedWorkflow extends BaseEntity {
*/
private Long instanceId;

public SchedWorkflow(Long wnstanceId, String curNode, String preNode, int sequence) {
public SchedWorkflow(Long wnstanceId, String curNode, String preNode) {
this.wnstanceId = wnstanceId;
this.curNode = curNode;
this.preNode = preNode;
this.sequence = sequence;
this.runState = RunState.WAITING.value();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public abstract class ServerRegistry<R extends Server, D extends Server> impleme
*/
protected final TripState state = TripState.createStarted();

protected ServerRegistry(String namespace, char separator) {
protected ServerRegistry(AbstractRegistryProperties config, char separator) {
this.separator = separator;

String prefix = prune(namespace, separator);
String prefix = prune(config.getNamespace(), separator);

this.registryRole = ServerRole.of(GenericUtils.getActualTypeArgument(getClass(), 0));
this.registryRootPath = prefix + registryRole.key();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public abstract class ConsulServerRegistry<R extends Server, D extends Server> e
private final ConsulSubscriberThread consulSubscriberThread;

protected ConsulServerRegistry(ConsulRegistryProperties config) {
super(config.getNamespace(), ':');
super(config, ':');

this.client = new ConsulClient(config.getHost(), config.getPort());
this.token = StringUtils.isBlank(config.getToken()) ? null : config.getToken().trim();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public abstract class DatabaseServerRegistry<R extends Server, D extends Server>
private final LoopThread discoverHeartbeatThread;

protected DatabaseServerRegistry(DatabaseRegistryProperties config, JdbcTemplateWrapper wrapper) {
super(config.getNamespace(), ':');
super(config, ':');
this.namespace = config.getNamespace().trim();
this.jdbcTemplateWrapper = wrapper;
this.sessionTimeoutMs = config.getSessionTimeoutMs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public abstract class EtcdServerRegistry<R extends Server, D extends Server> ext

protected EtcdServerRegistry(EtcdRegistryProperties config) {
// etcd separator must be '/'
super(config.getNamespace(), '/');
super(config, '/');
this.ttl = config.getSessionTimeoutMs() / 2000;

CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class NacosServerRegistry<R extends Server, D extends Server> ex
private final EventListener eventListener;

protected NacosServerRegistry(NacosRegistryProperties config) {
super(config.getNamespace(), ':');
super(config, ':');
this.groupName = StringUtils.isBlank(config.getNamespace()) ? Constants.DEFAULT_GROUP : config.getNamespace().trim();

CountDownLatch latch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public abstract class RedisServerRegistry<R extends Server, D extends Server> ex

protected RedisServerRegistry(StringRedisTemplate stringRedisTemplate,
RedisRegistryProperties config) {
super(config.getNamespace(), ':');
super(config, ':');
this.registryChannel = registryRootPath + separator + CHANNEL;
this.discoveryChannel = discoveryRootPath + separator + CHANNEL;
this.stringRedisTemplate = stringRedisTemplate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class ZookeeperServerRegistry<R extends Server, D extends Server
private final String zkRegistryRootPath;

protected ZookeeperServerRegistry(ZookeeperRegistryProperties config) {
super(config.getNamespace(), '/');
super(config, '/');
// zookeeper parent path must start with "/"
this.zkRegistryRootPath = separator + registryRootPath;
String zkDiscoveryRootPath = separator + discoveryRootPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,7 @@ private void parseTriggerConfig(SchedJob job) {
// 校验是否有循环依赖 以及 依赖层级是否太深
checkCircularDepends(jobId, new HashSet<>(parentJobIds));

List<SchedDepend> list = new ArrayList<>(parentJobIds.size());
for (int i = 0; i < parentJobIds.size(); i++) {
list.add(new SchedDepend(parentJobIds.get(i), jobId, i + 1));
}
List<SchedDepend> list = Collects.convert(parentJobIds, pid -> new SchedDepend(pid, jobId));
Collects.batchProcess(list, dependMapper::batchInsert, JobConstants.PROCESS_BATCH_SIZE);
job.setTriggerValue(Joiner.on(Str.COMMA).join(parentJobIds));
job.setNextTriggerTime(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ private Tuple2<RunState, Date> obtainRunState(List<SchedTask> tasks) {
}

private void createInstance(TriggerInstance tInstance) {
instanceMapper.insert(tInstance.getInstance());
instanceMapper.insert(tInstance.getInstance().fillUniqueFlag());

if (tInstance instanceof GeneralInstanceCreator.GeneralInstance) {
GeneralInstanceCreator.GeneralInstance creator = (GeneralInstanceCreator.GeneralInstance) tInstance;
Expand All @@ -559,7 +559,7 @@ private void createInstance(TriggerInstance tInstance) {
WorkflowInstanceCreator.WorkflowInstance creator = (WorkflowInstanceCreator.WorkflowInstance) tInstance;
Collects.batchProcess(creator.getWorkflows(), workflowMapper::batchInsert, PROCESS_BATCH_SIZE);
for (Tuple2<SchedInstance, List<SchedTask>> sub : creator.getNodeInstances()) {
instanceMapper.insert(sub.a);
instanceMapper.insert(sub.a.fillUniqueFlag());
Collects.batchProcess(sub.b, taskMapper::batchInsert, PROCESS_BATCH_SIZE);
}
} else {
Expand Down Expand Up @@ -756,10 +756,9 @@ private void createWorkflowNode(SchedInstance leadInstance, WorkflowGraph graph,
try {
long nextInstanceId = generateId();
List<SchedTask> tasks = splitJob(SplitJobParam.from(job, target.getName()), nextInstanceId);
long triggerTime = leadInstance.getTriggerTime() + workflow.getSequence();
RunType runType = RunType.of(leadInstance.getRunType());
SchedWorkflow predecessor = predecessors.stream().max(Comparator.comparing(BaseEntity::getUpdatedAt)).orElse(null);
SchedInstance nextInstance = SchedInstance.create(nextInstanceId, job.getJobId(), runType, triggerTime, 0);
SchedInstance nextInstance = SchedInstance.create(nextInstanceId, job.getJobId(), runType, System.currentTimeMillis(), 0);
nextInstance.setRnstanceId(wnstanceId);
nextInstance.setPnstanceId(predecessor == null ? null : getRetryOriginalInstanceId(instanceMapper.get(predecessor.getInstanceId())));
nextInstance.setWnstanceId(wnstanceId);
Expand All @@ -769,7 +768,7 @@ private void createWorkflowNode(SchedInstance leadInstance, WorkflowGraph graph,
Assert.isTrue(row > 0, () -> "Start workflow node failed: " + workflow);

// save to db
instanceMapper.insert(nextInstance);
instanceMapper.insert(nextInstance.fillUniqueFlag());
Collects.batchProcess(tasks, taskMapper::batchInsert, PROCESS_BATCH_SIZE);
TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(job, nextInstance, tasks));
} catch (Throwable t) {
Expand Down Expand Up @@ -895,7 +894,6 @@ private void retryJob(SchedInstance prev, LazyLoader<SchedJob> lazyJob) {

// 1、build sched instance
retriedCount++;
// DAG任务可能会有triggerTime冲突问题,worker端会重试:Duplicate entry '1003164910267351006-1721480078288-3' for key 'uk_jobid_triggertime_runtype'
long triggerTime = schedJob.computeRetryTriggerTime(retriedCount);
SchedInstance retryInstance = SchedInstance.create(retryInstanceId, schedJob.getJobId(), RunType.RETRY, triggerTime, retriedCount);
retryInstance.setRnstanceId(prev.obtainRnstanceId());
Expand Down Expand Up @@ -930,7 +928,7 @@ private void retryJob(SchedInstance prev, LazyLoader<SchedJob> lazyJob) {

// 3、save to db
Assert.notEmpty(tasks, "Insert list of task cannot be empty.");
instanceMapper.insert(retryInstance);
instanceMapper.insert(retryInstance.fillUniqueFlag());
Collects.batchProcess(tasks, taskMapper::batchInsert, PROCESS_BATCH_SIZE);

TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(schedJob, retryInstance, tasks));
Expand Down Expand Up @@ -965,15 +963,9 @@ private void dependJob(SchedInstance parentInstance) {
Objects.requireNonNull(transactionTemplate.getTransactionManager()),
() -> {
TriggerInstanceCreator creator = TriggerInstanceCreator.of(childJob.getJobType(), this);
// ### Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1003164910267351007-1683124620000-2' for key 'uk_jobid_triggertime_runtype'
// 加sequence解决唯一索引问题:UNIQUE KEY `uk_jobid_triggertime_runtype` (`job_id`, `trigger_time`, `run_type`)
//
// 极端情况还是会存在唯一索引值冲突:比如依赖的任务多于1000个,但这种情况可以限制所依赖父任务的个数来解决,暂不考虑
// parent1(trigger_time=1000, sequence=1001),parent2(trigger_time=2000, sequence=1)
long triggerTime = (parentInstance.getTriggerTime() / 1000) * 1000 + depend.getSequence();
TriggerInstance tInstance = creator.create(childJob, RunType.DEPEND, triggerTime);
TriggerInstance tInstance = creator.create(childJob, RunType.DEPEND, System.currentTimeMillis());
tInstance.getInstance().setRnstanceId(parentInstance.obtainRnstanceId());
tInstance.getInstance().setPnstanceId(parentInstance.getInstanceId());
tInstance.getInstance().setPnstanceId(getRetryOriginalInstanceId(parentInstance));
createInstance(tInstance);
return () -> creator.dispatch(childJob, tInstance);
},
Expand Down Expand Up @@ -1049,7 +1041,6 @@ private List<PredecessorInstance> findPredecessorInstances(long instanceId, long
tasks.sort(Comparator.comparing(SchedTask::getTaskNo));
return PredecessorInstance.of(e, tasks);
})
.sorted(Comparator.comparing(PredecessorInstance::getSequence))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public Long getInstanceJobId(long instanceId) {
return instanceMapper.getJobId(instanceId);
}

public SchedInstance getInstance(long jobId, long triggerTime, int runType) {
return instanceMapper.getByJobIdAndTriggerTimeAndRunType(jobId, triggerTime, runType);
public SchedInstance getInstance(long jobId, long triggerTime, int runType, long uniqueFlag) {
return instanceMapper.getByUniqueKey(jobId, triggerTime, runType, uniqueFlag);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class SupervisorProperties extends ToJsonString implements Serializable {

public void check() {
Assert.isTrue(maximumSplitTaskSize > 0, "Maximum split task size must be greater than 0.");
Assert.isTrue(maximumJobDependsDepth > 0, "Maximum job depends depth must be greater than 0.");
Assert.isTrue(0 < maximumJobDependsDepth && maximumJobDependsDepth < 100, "Maximum job depends depth must be range [1, 99].");
Assert.isTrue(0 < maximumJobRetryCount && maximumJobRetryCount < 10, "Maximum job retry count must be range [1, 9].");
Assert.isTrue(scanTriggeringJobPeriodMs >= 1000, "Scan triggering job period ms cannot less than 1000.");
Assert.isTrue(scanWaitingInstancePeriodMs >= 15000, "Scan waiting instance period ms cannot less than 15000.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ int updateNextScanTime(@Param("instanceId") long instanceId,

int deleteByWnstanceId(long wnstanceId);

SchedInstance getByJobIdAndTriggerTimeAndRunType(@Param("jobId") long jobId,
@Param("triggerTime") long triggerTime,
@Param("runType") int runType);
SchedInstance getByUniqueKey(@Param("jobId") long jobId,
@Param("triggerTime") long triggerTime,
@Param("runType") int runType,
@Param("uniqueFlag") long uniqueFlag);

// -------------------------------------------------query for page

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@
<sql id="Table_Name">sched_depend</sql>

<sql id="Base_Column_List">
parent_job_id, child_job_id, `sequence`
parent_job_id, child_job_id
</sql>

<insert id="batchInsert" parameterType="collection" keyColumn="id" keyProperty="id" useGeneratedKeys="true">
INSERT INTO <include refid="Table_Name" /> (
parent_job_id,
child_job_id,
`sequence`
child_job_id
) VALUES
<foreach collection="collection" item="item" separator=",">
(
#{item.parentJobId,jdbcType=BIGINT},
#{item.childJobId,jdbcType=BIGINT},
#{item.sequence,jdbcType=INTEGER}
#{item.childJobId,jdbcType=BIGINT}
)
</foreach>
</insert>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@

<update id="softDelete" parameterType="string">
UPDATE <include refid="Table_Name" />
SET is_deleted = NULL,
SET is_deleted = id,
updated_by = #{updatedBy,jdbcType=VARCHAR},
version = version+1
WHERE `group` = #{group,jdbcType=VARCHAR}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
job_id,
trigger_time,
run_type,
unique_flag,
run_state,
run_start_time,
retried_count,
Expand All @@ -31,6 +32,7 @@
#{jobId,jdbcType=BIGINT},
#{triggerTime,jdbcType=BIGINT},
#{runType,jdbcType=TINYINT},
#{uniqueFlag,jdbcType=BIGINT},
#{runState,jdbcType=TINYINT},
#{runStartTime,jdbcType=TIMESTAMP},
#{retriedCount,jdbcType=TINYINT},
Expand All @@ -50,12 +52,13 @@
WHERE instance_id = #{instanceId,jdbcType=BIGINT}
</select>

<select id="getByJobIdAndTriggerTimeAndRunType" resultType="cn.ponfee.disjob.core.model.SchedInstance">
<select id="getByUniqueKey" resultType="cn.ponfee.disjob.core.model.SchedInstance">
SELECT <include refid="Base_Column_List"/>
FROM <include refid="Table_Name" />
WHERE job_id = #{jobId,jdbcType=BIGINT}
AND trigger_time = #{triggerTime,jdbcType=BIGINT}
AND run_type = #{runType,jdbcType=TINYINT}
AND unique_flag = #{uniqueFlag,jdbcType=BIGINT}
</select>

<select id="getWnstanceId" parameterType="_long" resultType="long">
Expand Down
Loading

0 comments on commit 3f45f76

Please sign in to comment.