Skip to content

Commit

Permalink
adding a workflow to maintain information about external schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
gs-gunjan committed Aug 10, 2023
1 parent ee24f3e commit f035894
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ boolean registerMetrics(PrometheusMetricsHandler metricsHandler)
@Named("refresh-all-versions")
boolean initVersions(SchedulesFactory schedulesFactory, ArtifactsRefreshService artifactsRefreshService, ArtifactRepositoryProviderConfiguration configuration)
{
schedulesFactory.registerSingleInstance(ParentEvent.REFRESH_ALL_VERSION_ARTIFACTS_SCHEDULE.name(), configuration.getVersionsUpdateIntervalInMillis(), configuration.getVersionsUpdateIntervalInMillis(),() -> artifactsRefreshService.refreshAllVersionsForAllProjects(false,false,false, ParentEvent.REFRESH_ALL_VERSION_ARTIFACTS_SCHEDULE.name()));
schedulesFactory.registerExternalTriggerSchedule(ParentEvent.REFRESH_ALL_VERSION_ARTIFACTS_SCHEDULE.name(), configuration.getVersionsUpdateIntervalInMillis(), true, () -> artifactsRefreshService.refreshAllVersionsForAllProjects(false,false,false, ParentEvent.REFRESH_ALL_VERSION_ARTIFACTS_SCHEDULE.name()));
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.finos.legend.depot.core.authorisation.api.AuthorisationProvider;
import org.finos.legend.depot.core.authorisation.resources.BaseAuthorisedResource;
import org.finos.legend.depot.schedules.services.SchedulesFactory;
Expand Down Expand Up @@ -97,12 +98,13 @@ public List<ScheduleInstance> getSchedulerInstances()
@Path("/schedules/{scheduleName}")
@ApiOperation(ResourceLoggingAndTracing.TRIGGER_SCHEDULE)
@Produces(MediaType.APPLICATION_JSON)
public Response forceScheduler(@PathParam("scheduleName") String scheduleName)
public Response forceScheduler(@PathParam("scheduleName") String scheduleName,
@QueryParam("forceRun") @DefaultValue("false") @ApiParam("Whether to run the schedule if disabled") boolean forceRun)
{
return handle(ResourceLoggingAndTracing.TRIGGER_SCHEDULE, () ->
{
validateUser();
schedulesFactory.trigger(scheduleName);
schedulesFactory.trigger(scheduleName, forceRun);
return Response.noContent().build();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public void run()
}
}

public void registerExternalTriggerSchedule(String name, long intervalInMilliseconds, boolean isSingleInstance, Supplier<Object> function)
{
createScheduleInfo(name, intervalInMilliseconds, isSingleInstance, function);
}

public void registerSingleInstance(String name, long delayStartInMilliseconds, long intervalInMilliseconds, Supplier<Object> function)
{
register(name, delayStartInMilliseconds, intervalInMilliseconds, true, function);
Expand All @@ -84,17 +89,22 @@ public void register(String name, long delayStartInMilliseconds, long intervalIn

private void register(String name, long delayStartInMilliseconds, long intervalInMilliseconds, boolean singleInstance, Supplier<Object> function)
{
ScheduleInfo info = schedulesStore.get(name).orElse(new ScheduleInfo(name));
info.frequency = intervalInMilliseconds;
info.singleInstance = singleInstance;
functions.put(name,function);
schedulesStore.createOrUpdate(info);
createScheduleInfo(name, intervalInMilliseconds, singleInstance, function);

TimerTask timerTask = createTimerTask(name);
tasksRegistry.put(name,timerTask);
timer.scheduleAtFixedRate(timerTask, delayStartInMilliseconds, intervalInMilliseconds);
}

private void createScheduleInfo(String name, long intervalInMilliseconds, boolean singleInstance, Supplier<Object> function)
{
ScheduleInfo info = schedulesStore.get(name).orElse(new ScheduleInfo(name));
info.frequency = intervalInMilliseconds;
info.singleInstance = singleInstance;
functions.put(name, function);
schedulesStore.createOrUpdate(info);
}

private TimerTask createTimerTask(String name)
{
return new TimerTask()
Expand Down Expand Up @@ -122,7 +132,7 @@ public void run()
LOGGER.info("Skipping {} execution", name);
return;
}
execute(schedule);
execute(schedule, true);
}
};
}
Expand Down Expand Up @@ -159,10 +169,10 @@ public void deRegisterAll()
});
}

public void trigger(String scheduleName)
public void trigger(String scheduleName, boolean forceRun)
{
Optional<ScheduleInfo> scheduleInfo = schedulesStore.get(scheduleName);
scheduleInfo.ifPresent(schedule -> execute(schedule));
scheduleInfo.ifPresent(schedule -> execute(schedule, forceRun));
}

void run(String scheduleName)
Expand All @@ -184,21 +194,29 @@ public void toggleDisableAll(boolean toggle)
schedulesStore.getAll().forEach(info -> toggleDisable(info.name, toggle));
}

private void execute(ScheduleInfo schedule)
private void execute(ScheduleInfo schedule, boolean forceRun)
{
try
{
if (functions.containsKey(schedule.name))
if (forceRun || !schedule.isDisabled())
{
this.instancesStore.insert(new ScheduleInstance(schedule.name,toDate(LocalDateTime.now().plusSeconds(schedule.frequency / 1000L))));
LOGGER.info("Starting schedule {} ", schedule.name);
Object result = functions.get(schedule.name).get();
LOGGER.info("Schedule {} result {}", schedule.name, result);
if (functions.containsKey(schedule.name))
{
this.instancesStore.insert(new ScheduleInstance(schedule.name,toDate(LocalDateTime.now().plusSeconds(schedule.frequency / 1000L))));
LOGGER.info("Starting schedule {} ", schedule.name);
Object result = functions.get(schedule.name).get();
LOGGER.info("Schedule {} result {}", schedule.name, result);
}
else
{
LOGGER.warn("No function to execute {}", schedule.name);
}
}
else
{
LOGGER.warn("No function to execute {}", schedule.name);
LOGGER.warn("Schedule {} is disabled and force run flag is false", schedule.name);
}

}
catch (Exception e)
{
Expand Down

0 comments on commit f035894

Please sign in to comment.