Skip to content

Commit

Permalink
Collect common data in job queue
Browse files Browse the repository at this point in the history
  • Loading branch information
vpaturet committed Sep 25, 2024
1 parent 7ac6200 commit f6b502a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
package no.entur.antu.routes.validation;

import static no.entur.antu.Constants.FILE_HANDLE;
import static no.entur.antu.Constants.NETEX_COMMON_FILE_NAME;
import static no.entur.antu.Constants.NETEX_FILE_NAME;
import static no.entur.antu.Constants.VALIDATION_REPORT_ID_HEADER;
import static no.entur.antu.routes.memorystore.MemoryStoreRoute.MEMORY_STORE_FILE_NAME;
import static no.entur.antu.routes.validation.ValidateFilesRouteBuilder.PROP_NETEX_FILE_CONTENT;

import no.entur.antu.commondata.CommonDataRepository;
import no.entur.antu.routes.BaseRouteBuilder;
Expand All @@ -48,17 +47,19 @@ public void configure() throws Exception {
super.configure();

from("direct:storeCommonData")
.filter(header(NETEX_FILE_NAME).startsWith("_"))
.log(
LoggingLevel.INFO,
correlation() + "Parsing NeTEx file ${header." + FILE_HANDLE + "}"
correlation() +
"Extracting common data from NeTEx file ${header." +
NETEX_FILE_NAME +
"}"
)
.setProperty(PROP_STOP_WATCH, StopWatch::new)
.doTry()
.setHeader(MEMORY_STORE_FILE_NAME, header(NETEX_COMMON_FILE_NAME))
.to("direct:downloadSingleNetexFileFromMemoryStore")
.process(exchange ->
commonDataRepository.loadCommonDataCache(
exchange.getIn().getBody(byte[].class),
exchange.getProperty(PROP_NETEX_FILE_CONTENT, byte[].class),
exchange.getIn().getHeader(VALIDATION_REPORT_ID_HEADER, String.class)
)
)
Expand All @@ -77,7 +78,7 @@ public void configure() throws Exception {
.log(
LoggingLevel.INFO,
correlation() +
"Parsed NeTEx file ${header." +
"Extracted common data from NeTEx file ${header." +
NETEX_FILE_NAME +
"} in " +
"${exchangeProperty." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static no.entur.antu.Constants.FILENAME_DELIMITER;
import static no.entur.antu.Constants.FILE_HANDLE;
import static no.entur.antu.Constants.JOB_TYPE_VALIDATE;
import static no.entur.antu.Constants.NETEX_COMMON_FILE_NAME;
import static no.entur.antu.Constants.NETEX_FILE_NAME;
import static no.entur.antu.Constants.TEMPORARY_FILE_NAME;
import static no.entur.antu.Constants.VALIDATION_DATASET_FILE_HANDLE_HEADER;
Expand Down Expand Up @@ -79,7 +78,6 @@ public void configure() throws Exception {
})
.choice()
.when(header(DATASET_NB_COMMON_FILES).isGreaterThan(0))
.to("direct:parseAndStoreCommonData")
.to("direct:createCommonFilesValidationJobs")
.otherwise()
// skip the common file barrier and go directly to the line file job creation step
Expand Down Expand Up @@ -155,22 +153,6 @@ public void configure() throws Exception {
.setBody(constant(""))
.routeId("upload-single-netex-files");

from("direct:parseAndStoreCommonData")
.log(LoggingLevel.DEBUG, correlation() + "Start parsing common files")
.split(exchangeProperty(PROP_ALL_NETEX_FILE_NAMES))
.filter(body().startsWith("_"))
.setHeader(
Constants.DATASET_NB_NETEX_FILES,
exchangeProperty(Exchange.SPLIT_SIZE)
)
.setHeader(NETEX_COMMON_FILE_NAME, body())
.setHeader(FILE_HANDLE, simple(Constants.GCS_BUCKET_FILE_NAME))
.log(LoggingLevel.TRACE, correlation() + "All NeTEx Files: ${body}")
.to("direct:storeCommonData")
//end split
.end()
.routeId("create-parse-and-store-common-data-jobs");

from("direct:createCommonFilesValidationJobs")
.log(
LoggingLevel.DEBUG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
@Component
public class ValidateFilesRouteBuilder extends BaseRouteBuilder {

private static final String PROP_NETEX_FILE_CONTENT = "NETEX_FILE_CONTENT";
public static final String PROP_NETEX_FILE_CONTENT = "NETEX_FILE_CONTENT";
private static final String PROP_ALL_NETEX_FILE_NAMES =
"ALL_NETEX_FILE_NAMES";
private static final String PROP_STOP_WATCH = "PROP_STOP_WATCH";
Expand All @@ -70,6 +70,7 @@ public void configure() throws Exception {
.to("direct:downloadSingleNetexFileFromMemoryStore")
.setProperty(PROP_NETEX_FILE_CONTENT, body())
.to("direct:runNetexValidators")
.to("direct:storeCommonData")
// Duplicated PubSub messages are detected when trying to download the NeTEx file:
// it does not exist anymore after the report is generated and all temporary files are deleted
.doCatch(AntuMemoryStoreFileNotFoundException.class)
Expand Down

0 comments on commit f6b502a

Please sign in to comment.