Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor schedule manager unittests #601

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -107,16 +106,15 @@ public void testRunningJobWithFailingRunPolicy()
@Test
public void testRunningTwoTasksStoppedAfterFirstByPolicy() throws LockException
{
ShortRunningMultipleTasks job = new ShortRunningMultipleTasks(ScheduledJob.Priority.LOW, 2, () -> {
TestJob job = new TestJob(ScheduledJob.Priority.LOW, 2, () -> {
when(myRunPolicy.validate(any(ScheduledJob.class))).thenReturn(1L);
});
myScheduler.schedule(job);

when(myLockFactory.tryLock(any(), anyString(), anyInt(), anyMap())).thenReturn(new DummyLock());

myScheduler.run();

assertThat(job.getNumRuns()).isEqualTo(1);
assertThat(job.getTaskRuns()).isEqualTo(1);
assertThat(myScheduler.getQueueSize()).isEqualTo(1);
verify(myLockFactory).tryLock(any(), anyString(), anyInt(), anyMap());
}
Expand Down Expand Up @@ -152,27 +150,19 @@ public void testRunningOneJobWithThrowingLock() throws LockException
@Test (timeout = 2000L)
public void testRunningTwoJobsInParallelShouldFail() throws InterruptedException
{
LongRunningJob job = new LongRunningJob(ScheduledJob.Priority.HIGH);
LongRunningJob job2 = new LongRunningJob(ScheduledJob.Priority.LOW);
CountDownLatch job1Latch = new CountDownLatch(1);
TestJob job = new TestJob(ScheduledJob.Priority.HIGH, job1Latch);
CountDownLatch job2Latch = new CountDownLatch(1);
TestJob job2 = new TestJob(ScheduledJob.Priority.LOW, job2Latch);
myScheduler.schedule(job);
myScheduler.schedule(job2);

final CountDownLatch cdl = new CountDownLatch(1);

new Thread()
{

@Override
public void run()
{
myScheduler.run();
cdl.countDown();
}
}.start();

myScheduler.run();

cdl.await();
new Thread(() -> myScheduler.run()).start();
new Thread(() -> myScheduler.run()).start();
waitForJobStarted(job);
job1Latch.countDown();
job2Latch.countDown();
waitForJobFinished(job);

assertThat(job.hasRun()).isTrue();
assertThat(job2.hasRun()).isFalse();
Expand All @@ -192,6 +182,7 @@ public void testTwoJobsRejected()
myScheduler.run();

assertThat(job.hasRun()).isFalse();
assertThat(job2.hasRun()).isFalse();
assertThat(myScheduler.getQueueSize()).isEqualTo(2);
verify(myRunPolicy, times(2)).validate(any(ScheduledJob.class));
}
Expand All @@ -209,14 +200,15 @@ public void testTwoJobsThrowingLock() throws LockException
myScheduler.run();

assertThat(job.hasRun()).isFalse();
assertThat(job2.hasRun()).isFalse();
assertThat(myScheduler.getQueueSize()).isEqualTo(2);
verify(myLockFactory, times(2)).tryLock(any(), anyString(), anyInt(), anyMap());
}

@Test
public void testThreeTasksOneThrowing() throws LockException
{
ShortRunningMultipleTasks job = new ShortRunningMultipleTasks(ScheduledJob.Priority.LOW, 3);
TestJob job = new TestJob(ScheduledJob.Priority.LOW, 3);
myScheduler.schedule(job);

when(myLockFactory.tryLock(any(), anyString(), anyInt(), anyMap()))
Expand All @@ -226,124 +218,92 @@ public void testThreeTasksOneThrowing() throws LockException

myScheduler.run();

assertThat(job.getNumRuns()).isEqualTo(2);
assertThat(job.getTaskRuns()).isEqualTo(2);
assertThat(myScheduler.getQueueSize()).isEqualTo(1);
verify(myLockFactory, times(3)).tryLock(any(), anyString(), anyInt(), anyMap());
}

@Test (timeout = 2000L)
public void testRemoveLongRunningJob() throws InterruptedException
public void testDescheduleRunningJob() throws InterruptedException
{
LongRunningJob job = new LongRunningJob(ScheduledJob.Priority.HIGH);
CountDownLatch jobCdl = new CountDownLatch(1);
TestJob job = new TestJob(ScheduledJob.Priority.HIGH, jobCdl);
myScheduler.schedule(job);

final CountDownLatch cdl = new CountDownLatch(1);

new Thread()
{
@Override
public void run()
{
myScheduler.run();
cdl.countDown();
}
}.start();

while(!job.hasStarted())
{
Thread.sleep(10);
}
new Thread(() -> myScheduler.run()).start();

waitForJobStarted(job);
myScheduler.deschedule(job);

cdl.await();
jobCdl.countDown();
waitForJobFinished(job);

assertThat(job.hasRun()).isTrue();
assertThat(myScheduler.getQueueSize()).isEqualTo(0);
}




private class LongRunningJob extends ScheduledJob
private void waitForJobStarted(TestJob job) throws InterruptedException
{
private volatile boolean hasRun = false;
private volatile boolean hasStarted = false;

public LongRunningJob(Priority priority)
while(!job.hasStarted())
{
super(new ConfigurationBuilder().withPriority(priority).withRunInterval(1, TimeUnit.SECONDS).build());
Thread.sleep(10);
}
}

public boolean hasStarted()
private void waitForJobFinished(TestJob job) throws InterruptedException
{
while(!job.hasRun())
{
return hasStarted;
Thread.sleep(10);
}
}

public boolean hasRun()
private class TestJob extends ScheduledJob
{
private volatile CountDownLatch countDownLatch;
private volatile boolean hasRun = false;
private volatile boolean hasStarted = false;
private final AtomicInteger taskRuns = new AtomicInteger();
private final int numTasks;
private final Runnable onCompletion;

public TestJob(Priority priority, CountDownLatch cdl)
{
return hasRun;
this(priority, cdl, 1, () -> {});
}

@Override
public Iterator<ScheduledTask> iterator()
public TestJob(Priority priority, int numTasks)
{
return Arrays.<ScheduledTask> asList(new LongRunningTask()).iterator();
this(priority, numTasks, () -> {});
}

@Override
public String toString()
public TestJob(Priority priority, int numTasks, Runnable onCompletion)
{
return "LongRunningJob " + getPriority();
super(new ConfigurationBuilder().withPriority(priority).withRunInterval(1, TimeUnit.SECONDS).build());
this.numTasks = numTasks;
this.onCompletion = onCompletion;
}

public class LongRunningTask extends ScheduledTask
public TestJob(Priority priority, CountDownLatch cdl, int numTasks, Runnable onCompletion)
{
@Override
public boolean execute()
{
hasStarted = true;
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
// Intentionally left empty
}
hasRun = true;
return true;
}

@Override
public void cleanup()
{
// NOOP
}
super(new ConfigurationBuilder().withPriority(priority).withRunInterval(1, TimeUnit.SECONDS).build());
this.numTasks = numTasks;
this.onCompletion = onCompletion;
countDownLatch = cdl;
}
}

private class ShortRunningMultipleTasks extends ScheduledJob
{
private final AtomicInteger numRuns = new AtomicInteger();
private final int numTasks;
private final Runnable onCompletion;

public ShortRunningMultipleTasks(Priority priority, int numTasks)
public int getTaskRuns()
{
this(priority, numTasks, () -> {});
return taskRuns.get();
}

public ShortRunningMultipleTasks(Priority priority, int numTasks, Runnable onCompletion)
public boolean hasStarted()
{
super(new ConfigurationBuilder().withPriority(priority).withRunInterval(1, TimeUnit.SECONDS).build());
this.numTasks = numTasks;
this.onCompletion = onCompletion;
return hasStarted;
}

public int getNumRuns()
public boolean hasRun()
{
return numRuns.get();
return hasRun;
}

@Override
Expand Down Expand Up @@ -371,8 +331,21 @@ public ShortRunningTask(Runnable onCompletion)
@Override
public boolean execute()
{
hasStarted = true;
try
{
if (countDownLatch != null)
{
countDownLatch.await();
}
}
catch (InterruptedException e)
{
// Intentionally left empty
}
onCompletion.run();
numRuns.incrementAndGet();
taskRuns.incrementAndGet();
hasRun = true;
return true;
}
}
Expand Down