Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CAS修复时间轮并发问题 #3528

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;

/**
* @author xuxueli 2019-05-21
Expand All @@ -33,7 +33,7 @@ public static JobScheduleHelper getInstance(){
private Thread ringThread;
private volatile boolean scheduleThreadToStop = false;
private volatile boolean ringThreadToStop = false;
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
private static final AtomicReferenceArray<List<Integer>> ringData = new AtomicReferenceArray<List<Integer>>(new List[60]);

public void start(){

Expand Down Expand Up @@ -238,7 +238,7 @@ public void run() {
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
List<Integer> tmpData = ringData.getAndSet((nowSecond + 60 - i) % 60, null); // CAS读取并置空,put时就获取不到即将被消费的list,避免并发操作同一个list
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
Expand Down Expand Up @@ -285,12 +285,12 @@ private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exce

private void pushTimeRing(int ringSecond, int jobId){
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
List<Integer> ringItemData = ringData.getAndSet(ringSecond, null);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
ringData.set(ringSecond, ringItemData);

logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
}
Expand All @@ -316,13 +316,11 @@ public void toStop(){

// if has ring data
boolean hasRingData = false;
if (!ringData.isEmpty()) {
for (int second : ringData.keySet()) {
List<Integer> tmpData = ringData.get(second);
if (tmpData!=null && tmpData.size()>0) {
hasRingData = true;
break;
}
for (int second = 0; second < 60; second++) {
List<Integer> tmpData = ringData.get(second);
if (tmpData != null && tmpData.size() > 0) {
hasRingData = true;
break;
}
}
if (hasRingData) {
Expand All @@ -340,7 +338,7 @@ public void toStop(){
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
if (ringThread.getState() != Thread.State.TERMINATED){
if (ringThread.getState() != Thread.State.TERMINATED) {
// interrupt and wait
ringThread.interrupt();
try {
Expand All @@ -361,7 +359,7 @@ public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) thro
Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
return nextValidTime;
} else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) {
return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf()) * 1000);
}
return null;
}
Expand Down