From cd7e72451c6ca82298d3474ebe72ce00e345b8f7 Mon Sep 17 00:00:00 2001 From: Simon Dalvai Date: Tue, 24 Oct 2023 16:38:47 +0200 Subject: [PATCH] meteorology-eurac: add message type to MsgRoute --- .../collector/lib/ingress/mq/MsgRoute.java | 52 ++++++++++--------- .../collector/meteorology/eurac/Route.java | 4 +- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/collectors/lib/ingress-mq/src/main/java/com/opendatahub/collector/lib/ingress/mq/MsgRoute.java b/collectors/lib/ingress-mq/src/main/java/com/opendatahub/collector/lib/ingress/mq/MsgRoute.java index 7e78f0c..8de69b2 100644 --- a/collectors/lib/ingress-mq/src/main/java/com/opendatahub/collector/lib/ingress/mq/MsgRoute.java +++ b/collectors/lib/ingress-mq/src/main/java/com/opendatahub/collector/lib/ingress/mq/MsgRoute.java @@ -12,8 +12,8 @@ public class MsgRoute extends RouteBuilder { public MsgRoute(String from) { this.from = from; } - - public String getRouteUri(){ + + public String getRouteUri() { return "direct:" + from; } @@ -22,32 +22,36 @@ public MsgRoute() { @Value("${ingress.provider}") String provider; - + @Autowired RabbitMQConnection rabbitMQConfig; - + @Override public void configure() throws Exception { from("direct:" + from) - .routeId("to-odh-ingress-route") - .process(exchange -> WrapperProcessor.process(exchange, provider)) - // .to("file:bar?doneFileName=done") - .choice() - // forward to fastline - .when(header("fastline").isEqualTo(true)) - // we handle the request as invalid and forward the encapsulated payload to - // whatever mechanism we want to use to store malformed data - .to(rabbitMQConfig.getRabbitMQFastlineConnectionString()) - .end() - .choice() - // if the payload is not a valid json - .when(header("failed").isEqualTo(true)) - // we handle the request as invalid and forward the encapsulated payload to - // whatever mechanism we want to use to store malformed data - .to(rabbitMQConfig.getRabbitMQIngressDeadletterTo()) - .otherwise() - // otherwise we forward the encapsulated message to the - // internal queue waiting to be written in rawDataTable - .to(rabbitMQConfig.getRabbitMQIngressTo()); + .routeId("to-odh-ingress-route") + .choice() + .when(header("message_type").isNull()) + .process(exchange -> WrapperProcessor.process(exchange, provider)) + .when(header("message_type").isNotNull()) + .process(exchange -> WrapperProcessor.process(exchange, provider + "_" + header("message_type"))) + // .to("file:bar?doneFileName=done") + .choice() + // forward to fastline + .when(header("fastline").isEqualTo(true)) + // we handle the request as invalid and forward the encapsulated payload to + // whatever mechanism we want to use to store malformed data + .to(rabbitMQConfig.getRabbitMQFastlineConnectionString()) + .end() + .choice() + // if the payload is not a valid json + .when(header("failed").isEqualTo(true)) + // we handle the request as invalid and forward the encapsulated payload to + // whatever mechanism we want to use to store malformed data + .to(rabbitMQConfig.getRabbitMQIngressDeadletterTo()) + .otherwise() + // otherwise we forward the encapsulated message to the + // internal queue waiting to be written in rawDataTable + .to(rabbitMQConfig.getRabbitMQIngressTo()); } } diff --git a/collectors/meteorology-eurac/src/main/java/com/opendatahub/collector/meteorology/eurac/Route.java b/collectors/meteorology-eurac/src/main/java/com/opendatahub/collector/meteorology/eurac/Route.java index 39ebdd8..1c2c8da 100644 --- a/collectors/meteorology-eurac/src/main/java/com/opendatahub/collector/meteorology/eurac/Route.java +++ b/collectors/meteorology-eurac/src/main/java/com/opendatahub/collector/meteorology/eurac/Route.java @@ -18,22 +18,22 @@ public void configure() { from("cron:stations?schedule={{env:CRON_STATIONS}}") .routeId("meteorology.eurac.stations") .to(stationsUrl) + .removeHeaders("*") .process(e -> { log.info("Stations..."); e.getMessage().setHeader("message_type", "stations"); }) - .removeHeaders("*") .to("direct:mq"); // monthly from("cron:monthly?schedule={{env:CRON_MONTHLY}}") .routeId("meteorology.eurac.monthly") .to(monthlyUrl) + .removeHeaders("*") .process(e -> { log.info("Monthly..."); e.getMessage().setHeader("message_type", "monthly"); }) - .removeHeaders("*") .to("direct:mq"); // // daily