Skip to content

Commit

Permalink
Collect common data in job queue
Browse files Browse the repository at this point in the history
  • Loading branch information
vpaturet committed Sep 27, 2024
1 parent 5eeff8d commit bdf2a5f
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 28 deletions.
2 changes: 2 additions & 0 deletions src/main/java/no/entur/antu/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ public final class Constants {
"TimetableSweden";
public static final String VALIDATION_PROFILE_STOP = "Stop";

public static final String PROP_NETEX_FILE_CONTENT = "NETEX_FILE_CONTENT";

public static final String CAMEL_ALL_HTTP_HEADERS = "CamelHttp*";
public static final String VALIDATION_REPORT_PREFIX = "/validation-report-";
public static final String VALIDATION_REPORT_SUFFIX = ".json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
package no.entur.antu.routes.validation;

import static no.entur.antu.Constants.FILE_HANDLE;
import static no.entur.antu.Constants.NETEX_COMMON_FILE_NAME;
import static no.entur.antu.Constants.NETEX_FILE_NAME;
import static no.entur.antu.Constants.PROP_NETEX_FILE_CONTENT;
import static no.entur.antu.Constants.VALIDATION_REPORT_ID_HEADER;
import static no.entur.antu.routes.memorystore.MemoryStoreRoute.MEMORY_STORE_FILE_NAME;

import no.entur.antu.routes.BaseRouteBuilder;
import org.apache.camel.LoggingLevel;
Expand All @@ -48,17 +47,19 @@ public void configure() throws Exception {
super.configure();

from("direct:storeCommonData")
.filter(header(NETEX_FILE_NAME).startsWith("_"))
.log(
LoggingLevel.INFO,
correlation() + "Parsing NeTEx file ${header." + FILE_HANDLE + "}"
correlation() +
"Extracting common data from NeTEx file ${header." +
NETEX_FILE_NAME +
"}"
)
.setProperty(PROP_STOP_WATCH, StopWatch::new)
.doTry()
.setHeader(MEMORY_STORE_FILE_NAME, header(NETEX_COMMON_FILE_NAME))
.to("direct:downloadSingleNetexFileFromMemoryStore")
.process(exchange ->
commonDataRepository.loadCommonDataCache(
exchange.getIn().getBody(byte[].class),
exchange.getProperty(PROP_NETEX_FILE_CONTENT, byte[].class),
exchange.getIn().getHeader(VALIDATION_REPORT_ID_HEADER, String.class)
)
)
Expand All @@ -77,7 +78,7 @@ public void configure() throws Exception {
.log(
LoggingLevel.INFO,
correlation() +
"Parsed NeTEx file ${header." +
"Extracted common data from NeTEx file ${header." +
NETEX_FILE_NAME +
"} in " +
"${exchangeProperty." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static no.entur.antu.Constants.FILENAME_DELIMITER;
import static no.entur.antu.Constants.FILE_HANDLE;
import static no.entur.antu.Constants.JOB_TYPE_VALIDATE;
import static no.entur.antu.Constants.NETEX_COMMON_FILE_NAME;
import static no.entur.antu.Constants.NETEX_FILE_NAME;
import static no.entur.antu.Constants.TEMPORARY_FILE_NAME;
import static no.entur.antu.Constants.VALIDATION_DATASET_FILE_HANDLE_HEADER;
Expand Down Expand Up @@ -79,7 +78,6 @@ public void configure() throws Exception {
})
.choice()
.when(header(DATASET_NB_COMMON_FILES).isGreaterThan(0))
.to("direct:parseAndStoreCommonData")
.to("direct:createCommonFilesValidationJobs")
.otherwise()
// skip the common file barrier and go directly to the line file job creation step
Expand Down Expand Up @@ -155,22 +153,6 @@ public void configure() throws Exception {
.setBody(constant(""))
.routeId("upload-single-netex-files");

from("direct:parseAndStoreCommonData")
.log(LoggingLevel.DEBUG, correlation() + "Start parsing common files")
.split(exchangeProperty(PROP_ALL_NETEX_FILE_NAMES))
.filter(body().startsWith("_"))
.setHeader(
Constants.DATASET_NB_NETEX_FILES,
exchangeProperty(Exchange.SPLIT_SIZE)
)
.setHeader(NETEX_COMMON_FILE_NAME, body())
.setHeader(FILE_HANDLE, simple(Constants.GCS_BUCKET_FILE_NAME))
.log(LoggingLevel.TRACE, correlation() + "All NeTEx Files: ${body}")
.to("direct:storeCommonData")
//end split
.end()
.routeId("create-parse-and-store-common-data-jobs");

from("direct:createCommonFilesValidationJobs")
.log(
LoggingLevel.DEBUG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static no.entur.antu.Constants.VALIDATION_REPORT_ID_HEADER;
import static no.entur.antu.routes.memorystore.MemoryStoreRoute.MEMORY_STORE_FILE_NAME;

import no.entur.antu.Constants;
import no.entur.antu.exception.AntuException;
import no.entur.antu.exception.RetryableAntuException;
import no.entur.antu.memorystore.AntuMemoryStoreFileNotFoundException;
Expand All @@ -46,7 +47,6 @@
@Component
public class ValidateFilesRouteBuilder extends BaseRouteBuilder {

private static final String PROP_NETEX_FILE_CONTENT = "NETEX_FILE_CONTENT";
private static final String PROP_ALL_NETEX_FILE_NAMES =
"ALL_NETEX_FILE_NAMES";
private static final String PROP_STOP_WATCH = "PROP_STOP_WATCH";
Expand All @@ -68,8 +68,9 @@ public void configure() throws Exception {
.doTry()
.setHeader(MEMORY_STORE_FILE_NAME, header(NETEX_FILE_NAME))
.to("direct:downloadSingleNetexFileFromMemoryStore")
.setProperty(PROP_NETEX_FILE_CONTENT, body())
.setProperty(Constants.PROP_NETEX_FILE_CONTENT, body())
.to("direct:runNetexValidators")
.to("direct:storeCommonData")
// Duplicated PubSub messages are detected when trying to download the NeTEx file:
// it does not exist anymore after the report is generated and all temporary files are deleted
.doCatch(AntuMemoryStoreFileNotFoundException.class)
Expand Down Expand Up @@ -147,7 +148,7 @@ public void configure() throws Exception {
"},${header." +
NETEX_FILE_NAME +
"},${exchangeProperty." +
PROP_NETEX_FILE_CONTENT +
Constants.PROP_NETEX_FILE_CONTENT +
"},${exchangeProperty." +
PROP_NETEX_VALIDATION_CALLBACK +
"})"
Expand Down

0 comments on commit bdf2a5f

Please sign in to comment.