Skip to content

Commit

Permalink
refact: take actions for review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jayanta2018 committed May 1, 2024
1 parent 184f4c3 commit bfae2a9
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 101 deletions.
4 changes: 4 additions & 0 deletions .aws/task-definition-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@
"name": "APPLICATION_CRON_REVALCURRENTPLACEMENTJOB",
"valueFrom": "application_cron_revalcurrentplacementjob-sync-${environment}"
},
{
"name": "APPLICATION_CRON_POSTFUNDINGSYNCJOB",
"valueFrom": "application_cron_postfundingjob-sync-${environment}"
},
{
"name": "APPLICATION_JOBS_PERSONELASTICSEARCHJOB_PAGESIZE",
"valueFrom": "application_jobs_personelasticsearchjob_pagesize-sync-${environment}"
Expand Down
7 changes: 0 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,4 @@
</dependencies>
</profile>
</profiles>
<distributionManagement>
<repository>
<id>codeartifact</id>
<name>CodeArtifact</name>
<url>https://hee-430723991443.d.codeartifact.eu-west-1.amazonaws.com/maven/Health-Education-England/</url>
</repository>
</distributionManagement>
</project>
12 changes: 3 additions & 9 deletions src/main/java/uk/nhs/tis/sync/api/JobResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void setRevalCurrentPlacementSyncJob(
this.revalCurrentPlacementSyncJob = revalCurrentPlacementSyncJob;
}

@Autowired(required = false)
@Autowired
public void setPostFundingSyncJob(PostFundingSyncJob postFundingSyncJob) {
this.postFundingSyncJob = postFundingSyncJob;
}
Expand Down Expand Up @@ -111,9 +111,7 @@ public ResponseEntity<Map<String, Boolean>> getStatus() {
if (revalCurrentPlacementSyncJob != null) {
statusMap.put("revalCurrentPlacementJob", revalCurrentPlacementSyncJob.isCurrentlyRunning());
}
if (postFundingSyncJob != null) {
statusMap.put("postFundingSyncJob", postFundingSyncJob.isCurrentlyRunning());
}
statusMap.put("postFundingSyncJob", postFundingSyncJob.isCurrentlyRunning());
return ResponseEntity.ok().body(statusMap);
}

Expand Down Expand Up @@ -190,11 +188,7 @@ public ResponseEntity<String> runJob(@PathVariable String name,
}
break;
case "postFundingSyncJob":
if (postFundingSyncJob != null) {
status = ensureRunning(postFundingSyncJob, params);
} else {
return ResponseEntity.badRequest().body(JOB_NOT_FOUND);
}
status = ensureRunning(postFundingSyncJob, params);
break;
default:
return ResponseEntity.badRequest().body(JOB_NOT_FOUND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void setRevalCurrentPmSyncJob(RevalCurrentPmSyncJob revalCurrentPmSyncJob
this.revalCurrentPmSyncJob = revalCurrentPmSyncJob;
}

@Autowired(required = false)
@Autowired
public void setPostFundingSyncJob(PostFundingSyncJob postFundingSyncJob) {
this.postFundingSyncJob = postFundingSyncJob;
}
Expand Down Expand Up @@ -113,12 +113,10 @@ public void runJobs() {
Thread.sleep(SLEEP_DURATION);
} while (revalCurrentPmSyncJob.isCurrentlyRunning());
}
if (postFundingSyncJob != null) {
postFundingSyncJob.postFundingSyncJob();
do {
Thread.sleep(SLEEP_DURATION);
} while (postFundingSyncJob.isCurrentlyRunning());
}
postFundingSyncJob.postFundingSyncJob();
do {
Thread.sleep(SLEEP_DURATION);
} while (postFundingSyncJob.isCurrentlyRunning());
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
Expand Down
68 changes: 37 additions & 31 deletions src/main/java/uk/nhs/tis/sync/job/PostFundingSyncJob.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package uk.nhs.tis.sync.job;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.transformuk.hee.tis.tcs.api.enumeration.Status;
import com.transformuk.hee.tis.tcs.service.model.Post;
import com.transformuk.hee.tis.tcs.service.model.PostFunding;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import javax.persistence.EntityManager;
import net.javacrumbs.shedlock.core.SchedulerLock;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,30 +25,30 @@
@Component
@ManagedResource(objectName = "sync.mbean:name=PostFundingSyncJob",
description = "Job for updating funding status for posts")
public class PostFundingSyncJob extends PersonDateChangeCaptureSyncJobTemplate<Post> {
public class PostFundingSyncJob extends PostFundingSyncJobTemplate<Post> {

private static final Logger LOG = LoggerFactory.getLogger(PostFundingSyncJob.class);

private static final String BASE_QUERY = "SELECT DISTINCT p.id FROM Post p"
+ " JOIN ( SELECT postId FROM PostFunding"
+ " WHERE startDate IS NOT NULL AND (endDate = ':endDate' OR endDate IS NULL)"
+ " GROUP BY postId"
+ " HAVING COUNT(id) > 0)"
+ " pf ON p.id = pf.postId ORDER BY p.id LIMIT :pageSize";

private final ObjectMapper objectMapper;

public PostFundingSyncJob(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
private static final String BASE_QUERY = "SELECT DISTINCT p.id FROM Post p "
+ " JOIN ( "
+ " SELECT postId "
+ " FROM PostFunding "
+ " WHERE postId > :lastPostId "
+ " AND startDate IS NOT NULL "
+ " AND (endDate = ':endDate' OR endDate IS NULL) "
+ " GROUP BY postId "
+ " ) pf ON p.id = pf.postId "
+ " ORDER BY p.id LIMIT :pageSize ";

@Override
public void run(String params) {
postFundingSyncJob();
}

@Scheduled(cron = "${application.cron.postFundingSyncJob}")
@ManagedOperation(description = "update post funding status")
@SchedulerLock(name = "postFundingScheduledTask", lockAtLeastFor = FIFTEEN_MIN,
lockAtMostFor = FIFTEEN_MIN)
@ManagedOperation(description = "Update post funding status")
public void postFundingSyncJob() {
super.runSyncJob(null);
}
Expand All @@ -64,21 +63,28 @@ protected String buildQueryForDate(LocalDate dateOfChange) {
protected int convertData(Set<Post> entitiesToSave, List<Long> entityData,
EntityManager entityManager) {
int entities = entityData.size();
entityData.stream()
.map(id -> entityManager.find(Post.class, id))
.filter(Objects::nonNull)
.forEach(post -> {
// check if the post has multiple post fundings
Set<PostFunding> postFundings = post.getFundings();
if (postFundings.size() > 1) {
// if the post has multiple post fundings, do nothing
entitiesToSave.add(post);
} else if (postFundings.size() == 1) {
// if the post has a single post funding, set its status to "INACTIVE"
post.setFundingStatus(Status.INACTIVE);
entitiesToSave.add(post);
}
});
for (Long id : entityData) {
Post post = entityManager.find(Post.class, id);
if (post != null) {
// Explicitly load PostFunding entities without triggering further lazy loading
List<PostFunding> fundings = entityManager.createQuery(
"SELECT pf FROM PostFunding pf WHERE pf.post.id = :postId",
PostFunding.class)
.setParameter("postId", post.getId())
.getResultList();

if (fundings.size() > 1) {
// If there are multiple fundings, save the current post
entitiesToSave.add(post);
} else if (fundings.size() == 1) {
// If there's a single funding, update status to inactive and save
PostFunding funding = fundings.get(0);
LOG.info("Funding for the post {} is {} ", post.toString(), funding.toString());
post.setFundingStatus(Status.INACTIVE);
entitiesToSave.add(post);
}
}
}
return entities - entitiesToSave.size();
}

Expand Down
172 changes: 172 additions & 0 deletions src/main/java/uk/nhs/tis/sync/job/PostFundingSyncJobTemplate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package uk.nhs.tis.sync.job;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import java.math.BigInteger;
import java.time.LocalDate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.jmx.export.annotation.ManagedOperation;
import uk.nhs.tis.sync.event.JobExecutionEvent;

public abstract class PostFundingSyncJobTemplate<T> implements RunnableJob {

protected static final int DEFAULT_PAGE_SIZE = 5000;
protected static final int FIFTEEN_MIN = 15 * 60 * 1000;
protected static final String FULL_SYNC_DATE_STR = "ANY";
protected static final String NO_DATE_OVERRIDE = "NONE";
private static final Logger LOG = LoggerFactory.getLogger(PostFundingSyncJobTemplate.class);
protected String dateOfChangeOverride;
@Autowired
protected EntityManagerFactory entityManagerFactory;
@Autowired(required = false)
protected ApplicationEventPublisher applicationEventPublisher;
private Stopwatch mainStopWatch;

protected abstract String buildQueryForDate(LocalDate dateOfChange);

protected abstract int convertData(Set<T> entitiesToSave, List<Long> entityData,
EntityManager entityManager);

protected abstract void handleData(Set<T> dataToSave, EntityManager entityManager);

protected String getJobName() {
return this.getClass().getSimpleName();
}

protected List<Long> collectData(long lastPostId, String queryString,
EntityManager entityManager) {
Query query =
entityManager.createNativeQuery(queryString).setParameter("lastPostId", lastPostId);

List<BigInteger> resultList = query.getResultList();
return resultList.stream().filter(Objects::nonNull)
.map(result -> Long.parseLong(result.toString()))
.collect(Collectors.toList());
}

@ManagedOperation(description = "Is the sync job currently running")
public boolean isCurrentlyRunning() {
return mainStopWatch != null;
}

@ManagedOperation(description = "The current elapsed time of the current sync job")
public String elapsedTime() {
return mainStopWatch != null ? mainStopWatch.toString() : "0s";
}

protected void runSyncJob(String dateOption) {
if (mainStopWatch != null) {
LOG.info("Sync job [{}] already running, exiting this execution", getJobName());
return;
}
CompletableFuture.runAsync(() -> doDataSync(dateOption))
.exceptionally(t -> {
publishJobexecutionEvent(
new JobExecutionEvent(this, getFailureMessage(Optional.ofNullable(getJobName()), t)));
LOG.error("Job run ended due an Exception", t);
return null;
});
}

private void doDataSync(String dateOption) {
LocalDate dateOfChange = magicallyGetDateOfChanges(dateOption);
String queryString = buildQueryForDate(dateOfChange);
int skipped = 0;
int totalRecords = 0;
long lastEntityId = 0;
boolean hasMoreResults = true;
Set<T> dataToSave = Sets.newHashSet();
LOG.debug("Job will run with query:[{}]", queryString);

publishJobexecutionEvent(new JobExecutionEvent(this, "Sync [" + getJobName() + "] started."));
LOG.info("Sync [{}] started", getJobName());
mainStopWatch = Stopwatch.createStarted();
Stopwatch stopwatch = Stopwatch.createStarted();

EntityManager entityManager = null;
EntityTransaction transaction = null;

try {
while (hasMoreResults) {
entityManager = this.entityManagerFactory.createEntityManager();
transaction = entityManager.getTransaction();
transaction.begin();
List<Long> collectedData =
collectData(lastEntityId, queryString, entityManager);
hasMoreResults = !collectedData.isEmpty();
LOG.info("Time taken to read chunk : [{}]", stopwatch);
if (CollectionUtils.isNotEmpty(collectedData)) {
lastEntityId = collectedData.get(collectedData.size() - 1);
totalRecords += collectedData.size();
skipped += convertData(dataToSave, collectedData, entityManager);
}
stopwatch.reset().start();
handleData(dataToSave, entityManager);
LOG.debug("Collected {} records and attempted to process {}.", collectedData.size(),
dataToSave.size());
dataToSave.clear();
transaction.commit();
entityManager.close();
LOG.info("Time taken to save/handle chunk : [{}]", stopwatch);
stopwatch.reset().start();
}
LOG.info("Sync job [{}] finished. Total time taken {} for processing [{}] records",
getJobName(), mainStopWatch.stop(), totalRecords);
LOG.info("Skipped records {}", skipped);
mainStopWatch = null;
publishJobexecutionEvent(
new JobExecutionEvent(this, getSuccessMessage(Optional.ofNullable(getJobName()))));
} finally {
mainStopWatch = null;
if (transaction != null && transaction.isActive()) {
transaction.rollback();
}
if (entityManager != null && entityManager.isOpen()) {
entityManager.close();
}
}
}

private LocalDate magicallyGetDateOfChanges(String dateToUse) {
if (StringUtils.equalsIgnoreCase(dateToUse, NO_DATE_OVERRIDE)) {
dateToUse = this.dateOfChangeOverride;
}
if (StringUtils.isEmpty(dateToUse)) {
return LocalDate.now();
}
if (FULL_SYNC_DATE_STR.equalsIgnoreCase(dateToUse)) {
return null;
}
return LocalDate.parse(dateToUse);
}

protected String getSuccessMessage(Optional<String> jobName) {
return "Sync [" + jobName.orElse(getJobName()) + "] completed successfully.";
}

protected String getFailureMessage(Optional<String> jobName, Throwable e) {
return "<!channel> Sync [" + jobName.orElse(getJobName()) + "] failed with exception ["
+ e.getMessage() + "].";
}

protected void publishJobexecutionEvent(JobExecutionEvent event) {
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(event);
}
}
}
10 changes: 0 additions & 10 deletions src/test/java/uk/nhs/tis/sync/api/JobResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,6 @@ void shouldReturnErrorWhenRevalCurrentPlacementSyncIsTriggeredButNotAvailable()
.andExpect(jsonPath("$.error").value("Job not found"));
}

@Test
void shouldReturnErrorWhenPostFundingSyncJobIsTriggeredButNotAvailable() throws Exception {
jobResource.setPostFundingSyncJob(null);

mockMvc.perform(put("/api/job/postFundingSyncJob")
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isBadRequest())
.andExpect(jsonPath("$.error").value("Job not found"));
}

@DisplayName("run personRecordStatusJob with correct date argument")
@ParameterizedTest(name = "Should return 'Just started' status when personRecordStatusJob is triggered with \"{0}\".")
@ValueSource(strings = {
Expand Down
Loading

0 comments on commit bfae2a9

Please sign in to comment.