Skip to content

Commit

Permalink
dead code clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Dec 4, 2024
1 parent 47393fe commit 6d5c81c
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ public <R> Publisher<R> executeOnlyOnLeader(JobRunContext context, Callable<R> s
return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(context.getStatus().getName())));
}
eventPublisher.publishEvent(JobExecutorEvent.leaderOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, context.getStatus(), executorId.id()));
context.skipped();
return Mono.empty();
}).flux();
}
Expand All @@ -120,7 +119,6 @@ public <R> Publisher<R> executeConcurrently(JobRunContext context, int maxConcur
LOGGER.trace("Skipping execution of the job {} as the concurrency level {} is already reached", context.getStatus().getName(), maxConcurrency);
}
eventPublisher.publishEvent(JobExecutorEvent.concurrent(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, context.getStatus(), maxConcurrency, executorId.id()));
context.skipped();
return decreaseCurrentExecutionCount(context.getStatus().getName(), commands).flatMap(decreased -> Mono.empty());
}

Expand All @@ -137,7 +135,6 @@ public <R> Publisher<R> executeOnlyOnFollower(JobRunContext context, Callable<R>
return readMasterHostname(context.getStatus().getName(), commands).flatMap(h -> {
if (!"".equals(h) && h.equals(executorId.id())) {
eventPublisher.publishEvent(JobExecutorEvent.followerOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, context.getStatus(), executorId.id()));
context.skipped();
return Mono.empty();
}
context.executed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,5 @@ default long getDurationMillis() {

@Nullable Throwable getException();

DefaultJobRunStatus copy(String idSuffix);
JobRunStatus copy(String idSuffix);
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public Optional<Object> getMessage() {
return Optional.ofNullable(message).map(QueueMessage::getMessage);
}

public Optional<String> getMessageId() {
return Optional.ofNullable(message).map(QueueMessage::getId);
}

@Override
public String toString() {
return "JobExecutionStartedEvent{name='%s', id='%s', message=%s}".formatted(name, id, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,18 @@ public class DefaultJobRunContext implements JobRunContext {
private Consumer<JobRunStatus> onFinished = s -> { };
private BiConsumer<JobRunStatus, Object> onResult = (aStatus, r) -> { };
private Consumer<JobRunStatus> onExecuted = s -> { };
private Consumer<JobRunStatus> onSkipped = s -> { };

public DefaultJobRunContext(JobRunStatus status) {
this.status = status;
}

private DefaultJobRunContext(JobRunStatus status, BiConsumer<JobRunStatus, QueueMessage<?>> onMessage, BiConsumer<JobRunStatus, Throwable> onError, Consumer<JobRunStatus> onFinished, BiConsumer<JobRunStatus, Object> onResult, Consumer<JobRunStatus> onExecuted, Consumer<JobRunStatus> onSkipped) {
private DefaultJobRunContext(JobRunStatus status, BiConsumer<JobRunStatus, QueueMessage<?>> onMessage, BiConsumer<JobRunStatus, Throwable> onError, Consumer<JobRunStatus> onFinished, BiConsumer<JobRunStatus, Object> onResult, Consumer<JobRunStatus> onExecuted) {
this.status = status;
this.onMessage = onMessage;
this.onError = onError;
this.onFinished = onFinished;
this.onResult = onResult;
this.onExecuted = onExecuted;
this.onSkipped = onSkipped;
}

@Override
Expand Down Expand Up @@ -79,12 +77,6 @@ public JobRunContext onExecuted(Consumer<JobRunStatus> onExecuted) {
return this;
}

@Override
public JobRunContext onSkipped(Consumer<JobRunStatus> onSkipped) {
this.onSkipped = this.onSkipped.andThen(onSkipped);
return this;
}

@Override
public void message(@Nullable QueueMessage<?> event) {
onMessage.accept(status, event);
Expand All @@ -110,19 +102,14 @@ public void executed() {
onExecuted.accept(status);
}

@Override
public void skipped() {
onSkipped.accept(status);
}

@Override
public JobRunStatus getStatus() {
return status;
}

@Override
public JobRunContext createChildContext(String isSuffix) {
return new DefaultJobRunContext(status.copy(isSuffix), onMessage, onError, onFinished, onResult, onExecuted, onSkipped);
return new DefaultJobRunContext(status.copy(isSuffix), onMessage, onError, onFinished, onResult, onExecuted);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,6 @@ static JobRunContext create(JobRunStatus status) {
JobRunContext onExecuted(Consumer<JobRunStatus> onExecuted);


/**
* Registers a listener for the skipped event. The listener is called when the job execution is skipped because
* there are restrictions that prevents the execution e.g. concurrency limit or leader/follower restrictions.
*
* @param onSkipped the listener to be called when the job execution is skipped
* @return this context
*/
JobRunContext onSkipped(Consumer<JobRunStatus> onSkipped);

/**
* Signals new incoming message.
* @param event the message or <code>null</code> if the job is not a consumer
Expand Down Expand Up @@ -119,11 +110,6 @@ static JobRunContext create(JobRunStatus status) {
*/
void executed();

/**
* Signals the job is skipped.
*/
void skipped();

/**
* Returns the status of the job.
* @return the status of the job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ private <R> Publisher<R> doExecuteConcurrently(JobExecutorEvent.Type type, JobR
if (increasedCount > concurrency) {
counts.get(status.getName()).decrementAndGet();
eventPublisher.publishEvent(new JobExecutorEvent(EXECUTOR_TYPE, type, JobExecutorEvent.Execution.SKIP, status, concurrency, executorId.id()));
context.skipped();
return null;
}

Expand Down

0 comments on commit 6d5c81c

Please sign in to comment.