Skip to content

Commit

Permalink
branch-3.0: [Fix](job)Fix CAS competition failure leading to message …
Browse files Browse the repository at this point in the history
…publishing failure. #45018 (#45029)

Cherry-picked from #45018

Co-authored-by: Calvin Kirs <[email protected]>
  • Loading branch information
github-actions[bot] and CalvinKirs authored Dec 8, 2024
1 parent fcd2ee7 commit bdd5e95
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,16 @@ public void start() {
public boolean publishEvent(Object... args) {
try {
RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
return ringBuffer.tryPublishEvent(eventTranslator, args);
// Check if the RingBuffer has enough capacity to reserve 10 slots for tasks
// If there is insufficient capacity (less than 10 slots available)
// log a warning and drop the current task
if (!ringBuffer.hasAvailableCapacity(10)) {
LOG.warn("ring buffer has no available capacity,task will be dropped,"
+ "please check the task queue size.");
return false;
}
ringBuffer.publishEvent(eventTranslator, args);
return true;
} catch (Exception e) {
LOG.warn("Failed to publish event", e);
// Handle the exception, e.g., retry or alert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobE
for (AbstractTask task : tasks) {
if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
job.getJobConfig())) {
throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId()));
String errorMsg = job.formatMsgWhenExecuteQueueFull(task.getTaskId());
task.onFail(errorMsg);
throw new JobException(errorMsg);

}
log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),
Expand Down

0 comments on commit bdd5e95

Please sign in to comment.