Skip to content

Commit

Permalink
meteorology-eurac: add message type to MsgRoute
Browse files Browse the repository at this point in the history
  • Loading branch information
dulvui committed Oct 24, 2023
1 parent 25a5f21 commit cd7e724
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ public class MsgRoute extends RouteBuilder {
public MsgRoute(String from) {
this.from = from;
}
public String getRouteUri(){

public String getRouteUri() {
return "direct:" + from;
}

Expand All @@ -22,32 +22,36 @@ public MsgRoute() {

@Value("${ingress.provider}")
String provider;

@Autowired
RabbitMQConnection rabbitMQConfig;

@Override
public void configure() throws Exception {
from("direct:" + from)
.routeId("to-odh-ingress-route")
.process(exchange -> WrapperProcessor.process(exchange, provider))
// .to("file:bar?doneFileName=done")
.choice()
// forward to fastline
.when(header("fastline").isEqualTo(true))
// we handle the request as invalid and forward the encapsulated payload to
// whatever mechanism we want to use to store malformed data
.to(rabbitMQConfig.getRabbitMQFastlineConnectionString())
.end()
.choice()
// if the payload is not a valid json
.when(header("failed").isEqualTo(true))
// we handle the request as invalid and forward the encapsulated payload to
// whatever mechanism we want to use to store malformed data
.to(rabbitMQConfig.getRabbitMQIngressDeadletterTo())
.otherwise()
// otherwise we forward the encapsulated message to the
// internal queue waiting to be written in rawDataTable
.to(rabbitMQConfig.getRabbitMQIngressTo());
.routeId("to-odh-ingress-route")
.choice()
.when(header("message_type").isNull())
.process(exchange -> WrapperProcessor.process(exchange, provider))
.when(header("message_type").isNotNull())
.process(exchange -> WrapperProcessor.process(exchange, provider + "_" + header("message_type")))
// .to("file:bar?doneFileName=done")
.choice()
// forward to fastline
.when(header("fastline").isEqualTo(true))
// we handle the request as invalid and forward the encapsulated payload to
// whatever mechanism we want to use to store malformed data
.to(rabbitMQConfig.getRabbitMQFastlineConnectionString())
.end()
.choice()
// if the payload is not a valid json
.when(header("failed").isEqualTo(true))
// we handle the request as invalid and forward the encapsulated payload to
// whatever mechanism we want to use to store malformed data
.to(rabbitMQConfig.getRabbitMQIngressDeadletterTo())
.otherwise()
// otherwise we forward the encapsulated message to the
// internal queue waiting to be written in rawDataTable
.to(rabbitMQConfig.getRabbitMQIngressTo());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ public void configure() {
from("cron:stations?schedule={{env:CRON_STATIONS}}")
.routeId("meteorology.eurac.stations")
.to(stationsUrl)
.removeHeaders("*")
.process(e -> {
log.info("Stations...");
e.getMessage().setHeader("message_type", "stations");
})
.removeHeaders("*")
.to("direct:mq");

// monthly
from("cron:monthly?schedule={{env:CRON_MONTHLY}}")
.routeId("meteorology.eurac.monthly")
.to(monthlyUrl)
.removeHeaders("*")
.process(e -> {
log.info("Monthly...");
e.getMessage().setHeader("message_type", "monthly");
})
.removeHeaders("*")
.to("direct:mq");

// // daily
Expand Down

0 comments on commit cd7e724

Please sign in to comment.