diff --git a/.aws/task-definition-template.json b/.aws/task-definition-template.json index 3aa0c0b7..a06380ab 100644 --- a/.aws/task-definition-template.json +++ b/.aws/task-definition-template.json @@ -177,6 +177,10 @@ "name": "APPLICATION_CRON_REVALCURRENTPLACEMENTJOB", "valueFrom": "application_cron_revalcurrentplacementjob-sync-${environment}" }, + { + "name": "APPLICATION_CRON_POSTFUNDINGSYNCJOB", + "valueFrom": "application_cron_postfundingjob-sync-${environment}" + }, { "name": "APPLICATION_JOBS_PERSONELASTICSEARCHJOB_PAGESIZE", "valueFrom": "application_jobs_personelasticsearchjob_pagesize-sync-${environment}" diff --git a/.github/workflows/ci-cd-workflow.yml b/.github/workflows/ci-cd-workflow.yml index 0802062c..77584e21 100644 --- a/.github/workflows/ci-cd-workflow.yml +++ b/.github/workflows/ci-cd-workflow.yml @@ -24,7 +24,7 @@ jobs: with: cache: maven distribution: temurin - java-version: 11 + java-version: 17 - name: Configure AWS credentials uses: aws-actions/configure-aws-credentials@v2 @@ -102,7 +102,7 @@ jobs: with: cache: maven distribution: temurin - java-version: 11 + java-version: 17 - name: Configure AWS credentials uses: aws-actions/configure-aws-credentials@v2 diff --git a/pom.xml b/pom.xml index b701d78c..4d185867 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ uk.nhs.tis sync - 1.21.0 + 1.22.0 jar sync Separate Microservice for synchronisation @@ -48,12 +48,12 @@ uk.nhs.tis tcs-persistence - 2.23.1 + 2.32.0 com.transformuk.hee.tis tcs-client - 6.1.0 + 6.2.2 com.transformuk.hee @@ -235,7 +235,7 @@ org.projectlombok lombok - 1.18.16 + 1.18.20 org.mapstruct diff --git a/src/main/java/uk/nhs/tis/sync/api/JobResource.java b/src/main/java/uk/nhs/tis/sync/api/JobResource.java index ac2b18e4..3bdd9f68 100644 --- a/src/main/java/uk/nhs/tis/sync/api/JobResource.java +++ b/src/main/java/uk/nhs/tis/sync/api/JobResource.java @@ -20,6 +20,7 @@ import uk.nhs.tis.sync.job.PersonPlacementTrainingBodyTrustJob; import uk.nhs.tis.sync.job.PersonRecordStatusJob; import uk.nhs.tis.sync.job.PostEmployingBodyTrustJob; +import uk.nhs.tis.sync.job.PostFundingSyncJob; import uk.nhs.tis.sync.job.PostTrainingBodyTrustJob; import uk.nhs.tis.sync.job.RunnableJob; import uk.nhs.tis.sync.job.person.PersonElasticSearchSyncJob; @@ -47,6 +48,7 @@ public class JobResource { private final PersonRecordStatusJob personRecordStatusJob; private RevalCurrentPmSyncJob revalCurrentPmSyncJob; private RevalCurrentPlacementSyncJob revalCurrentPlacementSyncJob; + private PostFundingSyncJob postFundingSyncJob; @Autowired private JobRunningListener jobRunningListener; @Value("${spring.profiles.active:}") @@ -79,11 +81,16 @@ public void setRevalCurrentPlacementSyncJob( this.revalCurrentPlacementSyncJob = revalCurrentPlacementSyncJob; } + @Autowired + public void setPostFundingSyncJob(PostFundingSyncJob postFundingSyncJob) { + this.postFundingSyncJob = postFundingSyncJob; + } + /** * GET /jobs/status : Get all the status of 8 jobs. * * @return map of the status for most jobs. eg. {"personPlacementEmployingBodyTrustJob", "true"}, - * which means personPlacementEmployingBodyTrustJob is currently running. + * which means personPlacementEmployingBodyTrustJob is currently running. */ @GetMapping("/jobs/status") @PreAuthorize("hasPermission('tis:sync::jobs:', 'View')") @@ -104,6 +111,7 @@ public ResponseEntity> getStatus() { if (revalCurrentPlacementSyncJob != null) { statusMap.put("revalCurrentPlacementJob", revalCurrentPlacementSyncJob.isCurrentlyRunning()); } + statusMap.put("postFundingSyncJob", postFundingSyncJob.isCurrentlyRunning()); return ResponseEntity.ok().body(statusMap); } @@ -128,7 +136,7 @@ public ResponseEntity runJobsSequentially() { * * @param name the name of the job to run * @return status of the requested job : "Already running" - the job has been running before - * triggering it "Just started" - the job has been started by this request + * triggering it "Just started" - the job has been started by this request */ @PutMapping("/job/{name}") @PreAuthorize("hasPermission('tis:sync::jobs:', 'Update')") @@ -179,6 +187,9 @@ public ResponseEntity runJob(@PathVariable String name, return ResponseEntity.badRequest().body(JOB_NOT_FOUND); } break; + case "postFundingSyncJob": + status = ensureRunning(postFundingSyncJob, params); + break; default: return ResponseEntity.badRequest().body(JOB_NOT_FOUND); } diff --git a/src/main/java/uk/nhs/tis/sync/event/listener/JobRunningListener.java b/src/main/java/uk/nhs/tis/sync/event/listener/JobRunningListener.java index 35bca9f4..0de5c67e 100644 --- a/src/main/java/uk/nhs/tis/sync/event/listener/JobRunningListener.java +++ b/src/main/java/uk/nhs/tis/sync/event/listener/JobRunningListener.java @@ -16,6 +16,7 @@ import uk.nhs.tis.sync.job.PersonPlacementTrainingBodyTrustJob; import uk.nhs.tis.sync.job.PersonRecordStatusJob; import uk.nhs.tis.sync.job.PostEmployingBodyTrustJob; +import uk.nhs.tis.sync.job.PostFundingSyncJob; import uk.nhs.tis.sync.job.PostTrainingBodyTrustJob; import uk.nhs.tis.sync.job.person.PersonElasticSearchSyncJob; import uk.nhs.tis.sync.job.reval.RevalCurrentPmSyncJob; @@ -50,6 +51,8 @@ public class JobRunningListener implements ApplicationListener :lastPostId " + + " AND startDate IS NOT NULL " + + " AND (endDate = ':endDate' OR endDate IS NULL) " + + " GROUP BY postId " + + " ORDER BY postId LIMIT :pageSize "; + + @Autowired(required = false) + protected ApplicationEventPublisher applicationEventPublisher; + private Stopwatch mainStopWatch; + + @Scheduled(cron = "${application.cron.postFundingSyncJob}") + @SchedulerLock(name = "postFundingSyncJobTask", lockAtLeastFor = FIFTEEN_MIN, lockAtMostFor = FIFTEEN_MIN) + @ManagedOperation(description = "Run sync of modifying the post funding status") + public void postFundingSyncJob() { + runSyncJob(); + } + + @Override + @ManagedOperation(description = "Is the Post funding sync job currently running") + public boolean isCurrentlyRunning() { + return mainStopWatch != null; + } + + @Override + public void run(@Nullable String params) { + postFundingSyncJob(); + } + + protected void runSyncJob() { + if (isCurrentlyRunning()) { + LOG.info("Sync job [{}] already running, exiting this execution", JOB_NAME); + return; + } + CompletableFuture.runAsync(() -> { + doDataSync(); + }); + } + + private void doDataSync() { + String queryString = buildQueryForDate(); + int skipped = 0; + int totalRecords = 0; + long lastEntityId = 0; + boolean hasMoreResults = true; + Set dataToSave = Sets.newHashSet(); + LOG.debug("Job will run with query:[{}]", queryString); + + publishJobexecutionEvent(new JobExecutionEvent(this, "Sync [" + getJobName() + "] started.")); + LOG.info("Sync [{}] started", getJobName()); + mainStopWatch = Stopwatch.createStarted(); + Stopwatch stopwatch = Stopwatch.createStarted(); + + EntityManager entityManager = null; + EntityTransaction transaction = null; + + try { + while (hasMoreResults) { + entityManager = this.entityManagerFactory.createEntityManager(); + transaction = entityManager.getTransaction(); + transaction.begin(); + List collectedData = + collectDataFromTheLastPostId(lastEntityId, queryString, entityManager); + hasMoreResults = !collectedData.isEmpty(); + LOG.info("Time taken to read chunk : [{}]", stopwatch); + if (CollectionUtils.isNotEmpty(collectedData)) { + lastEntityId = collectedData.get(collectedData.size() - 1); + totalRecords += collectedData.size(); + skipped += convertData(dataToSave, collectedData, entityManager); + } + stopwatch.reset().start(); + handleData(dataToSave, entityManager); + LOG.debug("Collected {} records and attempted to process {}.", collectedData.size(), + dataToSave.size()); + dataToSave.clear(); + transaction.commit(); + entityManager.close(); + LOG.info("Time taken to save/handle chunk : [{}]", stopwatch); + stopwatch.reset().start(); + } + LOG.info("Sync job [{}] finished. Total time taken {} for processing [{}] records", + getJobName(), mainStopWatch.stop(), totalRecords); + LOG.info("Skipped records {}", skipped); + mainStopWatch = null; + publishJobexecutionEvent( + new JobExecutionEvent(this, getSuccessMessage(Optional.ofNullable(getJobName())))); + } finally { + mainStopWatch = null; + if (transaction != null && transaction.isActive()) { + transaction.rollback(); + } + if (entityManager != null && entityManager.isOpen()) { + entityManager.close(); + } + } + } + + private String getSuccessMessage(Optional jobName) { + return "Sync [" + jobName.orElse(getJobName()) + "] completed successfully."; + } + + private void publishJobexecutionEvent(JobExecutionEvent event) { + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(event); + } + } + + protected String buildQueryForDate() { + LocalDate today = LocalDate.now(); + String endDate = today.minusDays(1).format(DateTimeFormatter.ISO_LOCAL_DATE); + return BASE_QUERY.replace(":endDate", endDate).replace(":pageSize", "" + DEFAULT_PAGE_SIZE); + } + + protected String getJobName() { + return this.getClass().getSimpleName(); + } + + protected int convertData(Set entitiesToSave, List entityData, + EntityManager entityManager) { + int entities = entityData.size(); + for (Long id : entityData) { + Post post = entityManager.find(Post.class, id); + + if (post != null) { + Set fundings = post.getFundings(); + + if (fundings.size() > 1) { + entitiesToSave.add(post); + } else if (fundings.size() == 1) { + post.setFundingStatus(Status.INACTIVE); + entitiesToSave.add(post); + } + } + } + return entities - entitiesToSave.size(); + } + + protected void handleData(Set dataToSave, EntityManager entityManager) { + if (CollectionUtils.isNotEmpty(dataToSave)) { + dataToSave.forEach(entityManager::persist); + entityManager.flush(); + } + } + + protected List collectDataFromTheLastPostId(long lastPostId, String queryString, + EntityManager entityManager) { + Query query = + entityManager.createNativeQuery(queryString).setParameter("lastPostId", lastPostId); + + List resultList = query.getResultList(); + return resultList.stream().filter(Objects::nonNull) + .map(result -> Long.parseLong(result.toString())) + .collect(Collectors.toList()); + } +} diff --git a/src/main/resources/config/application.yml b/src/main/resources/config/application.yml index bc03a39e..b7b50708 100644 --- a/src/main/resources/config/application.yml +++ b/src/main/resources/config/application.yml @@ -207,6 +207,7 @@ application: revalCurrentPmJob: ${APPLICATION_CRON_REVALCURRENTPMJOB:0 39 1 * * *} revalCurrentPlacementJob: ${APPLICATION_CRON_REVALCURRENTPLACEMENTJOB:0 49 1 * * *} recordResendingJob: ${APPLICATION_CRON_RECORDRESENDINGJOB} + postFundingSyncJob: ${APPLICATION_CRON_POSTFUNDINGSYNCJOB:0 0 3 * * *} jobs: runOnStartup: earliest: ${APPLICATION_JOBS_RUNONSTARTUP_EARLIEST:00:00} diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html index 908499fd..1c7527aa 100644 --- a/src/main/resources/static/index.html +++ b/src/main/resources/static/index.html @@ -44,6 +44,11 @@

Person Record Status Sync

+
+

Post Funding Sync Job

+ +

+