Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect common data in job queue #554

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/no/entur/antu/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public final class Constants {
public static final String VALIDATION_CLIENT_MARDUK = "Marduk";
public static final String VALIDATION_CLIENT_KAKKA = "Kakka";

public static final String PROP_NETEX_FILE_CONTENT = "NETEX_FILE_CONTENT";

public static final String CAMEL_ALL_HTTP_HEADERS = "CamelHttp*";
public static final String VALIDATION_REPORT_PREFIX = "/validation-report-";
public static final String VALIDATION_REPORT_SUFFIX = ".json";
Expand Down
9 changes: 0 additions & 9 deletions src/main/java/no/entur/antu/config/NetexDataConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.List;
import java.util.Map;
import no.entur.antu.netexdata.DefaultNetexDataRepository;
import no.entur.antu.netexdata.NetexDataResource;
import org.entur.netex.validation.validator.jaxb.NetexDataRepository;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -20,16 +19,9 @@
@Configuration
public class NetexDataConfig {

@Bean
@Profile("!test")
NetexDataResource netexDataResource() {
return new NetexDataResource();
}

@Bean
@Profile("!test")
NetexDataRepository netexDataRepository(
NetexDataResource netexDataResource,
RedissonClient redissonClient,
@Qualifier(
SCHEDULED_STOP_POINT_AND_QUAY_ID_CACHE
Expand All @@ -46,7 +38,6 @@ NetexDataRepository netexDataRepository(
) Map<String, List<String>> serviceJourneyInterchangeInfoCache
) {
return new DefaultNetexDataRepository(
netexDataResource,
redissonClient,
scheduledStopPointAndQuayIdCache,
serviceLinksAndFromToScheduledStopPointIdCache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public class DefaultNetexDataRepository implements NetexDataRepository {
DefaultNetexDataRepository.class
);

private final NetexDataResource netexDataResource;
private final RedissonClient redissonClient;
private final Map<String, Map<String, String>> scheduledStopPointAndQuayIdCache;
private final Map<String, Map<String, String>> serviceLinksAndFromToScheduledStopPointIdCache;
Expand All @@ -32,15 +31,13 @@ public class DefaultNetexDataRepository implements NetexDataRepository {
private final Map<String, List<String>> serviceJourneyInterchangeInfoCache;

public DefaultNetexDataRepository(
NetexDataResource netexDataResource,
RedissonClient redissonClient,
Map<String, Map<String, String>> scheduledStopPointAndQuayIdCache,
Map<String, Map<String, String>> serviceLinksAndFromToScheduledStopPointIdCache,
Map<String, List<String>> lineInfoCache,
Map<String, Map<String, List<String>>> serviceJourneyStopsCache,
Map<String, List<String>> serviceJourneyInterchangeInfoCache
) {
this.netexDataResource = netexDataResource;
this.redissonClient = redissonClient;
this.scheduledStopPointAndQuayIdCache = scheduledStopPointAndQuayIdCache;
this.serviceLinksAndFromToScheduledStopPointIdCache =
Expand Down Expand Up @@ -145,6 +142,7 @@ public void fillNetexDataCache(
byte[] fileContent,
String validationReportId
) {
NetexDataResource netexDataResource = new NetexDataResource();
netexDataResource.loadNetexData(fileContent);
// Merging with the existing map, for handing the case where there are
// multiple common files in the dataset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
package no.entur.antu.routes.validation;

import static no.entur.antu.Constants.FILE_HANDLE;
import static no.entur.antu.Constants.NETEX_COMMON_FILE_NAME;
import static no.entur.antu.Constants.NETEX_FILE_NAME;
import static no.entur.antu.Constants.PROP_NETEX_FILE_CONTENT;
import static no.entur.antu.Constants.VALIDATION_REPORT_ID_HEADER;
import static no.entur.antu.routes.memorystore.MemoryStoreRoute.MEMORY_STORE_FILE_NAME;

import no.entur.antu.routes.BaseRouteBuilder;
import org.apache.camel.LoggingLevel;
Expand All @@ -48,17 +47,19 @@ public void configure() throws Exception {
super.configure();

from("direct:storeCommonData")
.filter(header(NETEX_FILE_NAME).startsWith("_"))
.log(
LoggingLevel.INFO,
correlation() + "Parsing NeTEx file ${header." + FILE_HANDLE + "}"
correlation() +
"Extracting common data from NeTEx file ${header." +
NETEX_FILE_NAME +
"}"
)
.setProperty(PROP_STOP_WATCH, StopWatch::new)
.doTry()
.setHeader(MEMORY_STORE_FILE_NAME, header(NETEX_COMMON_FILE_NAME))
.to("direct:downloadSingleNetexFileFromMemoryStore")
.process(exchange ->
netexDataRepository.fillNetexDataCache(
exchange.getIn().getBody(byte[].class),
exchange.getProperty(PROP_NETEX_FILE_CONTENT, byte[].class),
exchange.getIn().getHeader(VALIDATION_REPORT_ID_HEADER, String.class)
)
)
Expand All @@ -77,7 +78,7 @@ public void configure() throws Exception {
.log(
LoggingLevel.INFO,
correlation() +
"Parsed NeTEx file ${header." +
"Extracted common data from NeTEx file ${header." +
NETEX_FILE_NAME +
"} in " +
"${exchangeProperty." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static no.entur.antu.Constants.FILENAME_DELIMITER;
import static no.entur.antu.Constants.FILE_HANDLE;
import static no.entur.antu.Constants.JOB_TYPE_VALIDATE;
import static no.entur.antu.Constants.NETEX_COMMON_FILE_NAME;
import static no.entur.antu.Constants.NETEX_FILE_NAME;
import static no.entur.antu.Constants.TEMPORARY_FILE_NAME;
import static no.entur.antu.Constants.VALIDATION_DATASET_FILE_HANDLE_HEADER;
Expand Down Expand Up @@ -79,7 +78,6 @@ public void configure() throws Exception {
})
.choice()
.when(header(DATASET_NB_COMMON_FILES).isGreaterThan(0))
.to("direct:parseAndStoreCommonData")
.to("direct:createCommonFilesValidationJobs")
.otherwise()
// skip the common file barrier and go directly to the line file job creation step
Expand Down Expand Up @@ -155,22 +153,6 @@ public void configure() throws Exception {
.setBody(constant(""))
.routeId("upload-single-netex-files");

from("direct:parseAndStoreCommonData")
.log(LoggingLevel.DEBUG, correlation() + "Start parsing common files")
.split(exchangeProperty(PROP_ALL_NETEX_FILE_NAMES))
.filter(body().startsWith("_"))
.setHeader(
Constants.DATASET_NB_NETEX_FILES,
exchangeProperty(Exchange.SPLIT_SIZE)
)
.setHeader(NETEX_COMMON_FILE_NAME, body())
.setHeader(FILE_HANDLE, simple(Constants.GCS_BUCKET_FILE_NAME))
.log(LoggingLevel.TRACE, correlation() + "All NeTEx Files: ${body}")
.to("direct:storeCommonData")
//end split
.end()
.routeId("create-parse-and-store-common-data-jobs");

from("direct:createCommonFilesValidationJobs")
.log(
LoggingLevel.DEBUG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static no.entur.antu.Constants.VALIDATION_REPORT_ID_HEADER;
import static no.entur.antu.routes.memorystore.MemoryStoreRoute.MEMORY_STORE_FILE_NAME;

import no.entur.antu.Constants;
import no.entur.antu.exception.AntuException;
import no.entur.antu.exception.RetryableAntuException;
import no.entur.antu.memorystore.AntuMemoryStoreFileNotFoundException;
Expand All @@ -46,7 +47,6 @@
@Component
public class ValidateFilesRouteBuilder extends BaseRouteBuilder {

private static final String PROP_NETEX_FILE_CONTENT = "NETEX_FILE_CONTENT";
private static final String PROP_ALL_NETEX_FILE_NAMES =
"ALL_NETEX_FILE_NAMES";
private static final String PROP_STOP_WATCH = "PROP_STOP_WATCH";
Expand All @@ -68,7 +68,10 @@ public void configure() throws Exception {
.doTry()
.setHeader(MEMORY_STORE_FILE_NAME, header(NETEX_FILE_NAME))
.to("direct:downloadSingleNetexFileFromMemoryStore")
.setProperty(PROP_NETEX_FILE_CONTENT, body())
.setProperty(Constants.PROP_NETEX_FILE_CONTENT, body())
// TODO we should not parse NeTEx data before the file is validated against the XSD
// and the XPath validators are run.
.to("direct:storeCommonData")
.to("direct:runNetexValidators")
// Duplicated PubSub messages are detected when trying to download the NeTEx file:
// it does not exist anymore after the report is generated and all temporary files are deleted
Expand Down Expand Up @@ -147,7 +150,7 @@ public void configure() throws Exception {
"},${header." +
NETEX_FILE_NAME +
"},${exchangeProperty." +
PROP_NETEX_FILE_CONTENT +
Constants.PROP_NETEX_FILE_CONTENT +
"},${exchangeProperty." +
PROP_NETEX_VALIDATION_CALLBACK +
"})"
Expand Down