Skip to content

Commit

Permalink
feat: publish sfera events (#216) (#464)
Browse files Browse the repository at this point in the history
  • Loading branch information
mghilardelli authored Dec 16, 2024
1 parent 25d357d commit b72d505
Show file tree
Hide file tree
Showing 15 changed files with 412 additions and 44 deletions.
3 changes: 2 additions & 1 deletion sfera-mock/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ see journeys [src/main/resources/static_sfera_resources](src/main/resources/stat
To create a new scenario some resources need to be added
1. add a new directory named `<train number>_<optional comment>` in `static_sfera_resources`
2. add a journey profile named `SFERA_JP_<train number>` to the directory
2. add corresponding segment profiles named `SFERA_SP_<train number>_<sp id>` to the directory
3. add corresponding segment profiles named `SFERA_SP_<train number>_<sp id>` to the directory
4. for events add `SFERA_Event_<train number>_<time after registration in ms>`

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package ch.sbb.sferamock.messages.services;

import ch.sbb.sferamock.adapters.sfera.model.v0201.G2BEventPayload;
import ch.sbb.sferamock.messages.common.XmlHelper;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.stereotype.Service;

@Service
public class EventRepository implements ApplicationRunner {

private static final String XML_RESOURCES_CLASSPATH = "classpath:static_sfera_resources/*/SFERA_Event_*.xml";
private static final String XML_REGEX = "/([a-zA-Z0-9]+)_\\w+/SFERA_Event_([a-zA-Z0-9]+)_\\d+\\.xml";
private static final String OFFSET_XML_REGEX = "SFERA_Event_[a-zA-Z0-9]+_(\\d+)\\.xml";
private final XmlHelper xmlHelper;

Map<String, List<Event>> events = new HashMap<>();

public EventRepository(XmlHelper xmlHelper) {
this.xmlHelper = xmlHelper;
}

@Override
public void run(ApplicationArguments args) throws Exception {
importEvents();
}

private void importEvents() throws IOException {
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
var resources = resolver.getResources(XML_RESOURCES_CLASSPATH);
for (var resource : resources) {
File file = resource.getFile();
var operationalNumber = extractOperationalNumber(file.getPath());
var offsetMs = extractOffsetMs(file.getName());
try (InputStream in = new FileInputStream(file)) {
String xmlPayload = new String(in.readAllBytes());
var eventPayload = xmlHelper.xmlToObject(xmlPayload);

List<Event> trainEvents = events.get(operationalNumber);
if (trainEvents == null) {
trainEvents = new ArrayList<>();
}
trainEvents.add(new Event(offsetMs, (G2BEventPayload) eventPayload));
events.put(operationalNumber, trainEvents);
}
}
}

private static String extractOperationalNumber(String filename) {
Pattern pattern = Pattern.compile(XML_REGEX);
Matcher matcher = pattern.matcher(filename);
if (matcher.find()) {
String directoryOperationalNumber = matcher.group(1);
String fileOperationalNumber = matcher.group(2);
if (directoryOperationalNumber != null && directoryOperationalNumber.equals(fileOperationalNumber)) {
return directoryOperationalNumber;
}
}
throw new RuntimeException("Operational number extraction in Event repository failed for file: " + filename);
}

private static int extractOffsetMs(String filename) {
Pattern pattern = Pattern.compile(OFFSET_XML_REGEX);
Matcher matcher = pattern.matcher(filename);
if (matcher.find()) {
return Integer.parseInt(matcher.group(1));
}
throw new RuntimeException("Offset extraction in Event repository failed for file: " + filename);
}

public record Event(int offsetMs, G2BEventPayload payload) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package ch.sbb.sferamock.messages.services;

import ch.sbb.sferamock.adapters.sfera.model.v0201.G2BEventPayload;
import ch.sbb.sferamock.messages.model.ClientId;
import ch.sbb.sferamock.messages.model.RequestContext;
import ch.sbb.sferamock.messages.services.EventRepository.Event;
import ch.sbb.sferamock.messages.sfera.EventPublisher;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;

@Service
public class EventService {

private final EventRepository eventRepository;
private final ThreadPoolTaskScheduler taskScheduler;
private final EventPublisher eventPublisher;
private final Map<ClientId, List<ScheduledFuture<?>>> scheduledTasks = new ConcurrentHashMap<>();

public EventService(EventRepository eventRepository, EventPublisher eventPublisher) {
this.eventRepository = eventRepository;
this.eventPublisher = eventPublisher;
this.taskScheduler = new ThreadPoolTaskScheduler();
this.taskScheduler.initialize();
}

public void registerActiveTrain(RequestContext requestContext) {
if (hasActiveFutures(requestContext.clientId())) {
return;
}
List<Event> events = this.eventRepository.events.get(requestContext.tid().operationalNumber());
if (events == null) {
return;
}
List<ScheduledFuture<?>> futures = new ArrayList<>();
for (Event event : events) {
ScheduledFuture<?> future = taskScheduler.schedule(() -> processEvent(event.payload(), requestContext), Instant.now().plusMillis(event.offsetMs()));
futures.add(future);
}
scheduledTasks.put(requestContext.clientId(), futures);
}

public void deregisterActiveTrain(ClientId clientId) {
List<ScheduledFuture<?>> futures = scheduledTasks.remove(clientId);
if (futures != null) {
for (ScheduledFuture<?> future : futures) {
future.cancel(false);
}
}
}

private void processEvent(G2BEventPayload eventPayload, RequestContext requestContext) {
if (eventPayload.getRelatedTrainInformation() != null) {
eventPublisher.publishRelatedTrainInformation(eventPayload, requestContext);
} else if (eventPayload.getJourneyProfile() != null) {
eventPublisher.publishJourneyProfile(eventPayload, requestContext);
} else {
eventPublisher.publishEventPayload(eventPayload, requestContext);
}
}

private boolean hasActiveFutures(ClientId clientId) {
List<ScheduledFuture<?>> scheduledFutures = scheduledTasks.get(clientId);
return scheduledFutures != null && scheduledFutures.stream().anyMatch(scheduledFuture -> !scheduledFuture.isDone() && !scheduledFuture.isCancelled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ public class RegistrationService {
private final Map<TrainIdentification, Set<ClientId>> activeTrains = new ConcurrentHashMap<>();
private final Map<ClientId, Registration> registrationMap = new ConcurrentHashMap<>();

private final EventService eventService;

public RegistrationService(EventService eventService) {
this.eventService = eventService;
}

private static Set<ClientId> existingSetWith(Set<ClientId> clientIdentifiers, ClientId newClientId) {
clientIdentifiers.add(newClientId);
return clientIdentifiers;
Expand Down Expand Up @@ -46,10 +52,12 @@ private void registerClientIdAndTrain(RequestContext requestContext, OperationMo
? newSetWith(clientId)
: existingSetWith(clientIdentifiers, clientId));
}
eventService.registerActiveTrain(requestContext);
}

public void deregisterClient(ClientId clientId) {
deregisterTrainIfLastClient(clientId);
eventService.deregisterActiveTrain(clientId);
registrationMap.remove(clientId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ public void processSegmentProfileRequest(List<SegmentIdentification> segmentIden
publishSegmentProfile(segmentProfiles, correlationId, requestContext);
}

public void processSessionTermination(RequestContext requestContext) {
if (!registrationService.isRegistered(requestContext.clientId())) {
publishErrorMessageUnregisteredClient(requestContext);
return;
}
registrationService.deregisterClient(requestContext.clientId());
publishOk(requestContext);
}

public void processTrainCharacteristicsRequest(List<TrainCharacteristicsIdentification> trainCharacteristicsIdentifications, RequestContext requestContext) {
if (!registrationService.isRegistered(requestContext.clientId())) {
publishErrorMessageUnregisteredClient(requestContext);
Expand Down Expand Up @@ -157,6 +166,10 @@ private void publishTrainCharacteristicsResponse(List<TrainCharacteristics> trai
}
}

private void publishOk(RequestContext requestContext) {
replyPublisher.publishOkMessage(requestContext);
}

private void publishErrorMessageUnregisteredClient(RequestContext requestContext) {
log.warn("Received a request from an unregistered client id {}", requestContext.clientId());
replyPublisher.publishErrorMessage(SferaErrorCodes.COULD_NOT_PROCESS_DATA, requestContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package ch.sbb.sferamock.messages.sfera;

import static org.springframework.util.MimeTypeUtils.APPLICATION_XML;

import ch.sbb.sferamock.adapters.sfera.model.v0201.G2BEventPayload;
import ch.sbb.sferamock.adapters.sfera.model.v0201.SFERAG2BEventMessage;
import ch.sbb.sferamock.messages.common.XmlHelper;
import ch.sbb.sferamock.messages.model.RequestContext;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class EventPublisher {

private static final Logger log = LoggerFactory.getLogger(EventPublisher.class);
private static final String SOLACE_BINDER = "solace";
private final XmlHelper xmlHelper;
private final StreamBridge streamBridge;
private final SferaMessageCreator sferaMessageCreator;
@Value("${spring.cloud.stream.bindings.publishEvent-out-0.destination}")
private String publishDestination;

public EventPublisher(XmlHelper xmlHelper, StreamBridge streamBridge, SferaMessageCreator sferaMessageCreator) {
this.xmlHelper = xmlHelper;
this.streamBridge = streamBridge;
this.sferaMessageCreator = sferaMessageCreator;
}

public void publishRelatedTrainInformation(G2BEventPayload eventPayload, RequestContext requestContext) {
var header = sferaMessageCreator.createMessageHeader(UUID.randomUUID(), requestContext.tid());
var event = sferaMessageCreator.createRelatedTrainInformation(eventPayload, header);
publishEvent(event, requestContext);
}

public void publishJourneyProfile(G2BEventPayload eventPayload, RequestContext requestContext) {
var header = sferaMessageCreator.createMessageHeader(UUID.randomUUID(), requestContext.tid());
var event = sferaMessageCreator.createJourneyProfileEventMessage(eventPayload, header);
publishEvent(event, requestContext);
}

public void publishEventPayload(G2BEventPayload eventPayload, RequestContext requestContext) {
var header = sferaMessageCreator.createMessageHeader(UUID.randomUUID(), requestContext.tid());
var event = sferaMessageCreator.createEventMessage(eventPayload, header);
publishEvent(event, requestContext);
}

private void publishEvent(SFERAG2BEventMessage eventMessage, RequestContext requestContext) {
String topic = SferaTopicHelper.getG2BEventTopic(publishDestination, requestContext);
log.info("Publishing Event Message: {} to topic {}", xmlHelper.toString(eventMessage), topic);
streamBridge.send(topic, SOLACE_BINDER, MessageBuilder
.withPayload(eventMessage)
.build(),
APPLICATION_XML);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,48 +64,56 @@ public void processIncomingMessage(Message<byte[]> message) {
return;
}
switch (payload) {
case SFERAB2GRequestMessage request -> {
var requestContext = RequestContext.fromTopic(topic, Optional.of(UUID.fromString(request.getMessageHeader().getMessageID())));
log.info("Received SFERAB2GRequestMessage {} on topic {}", xmlHelper.toString(request), topic);
var validationMessage = messageHeaderValidator.validate(request.getMessageHeader(), requestContext.tid().companyCode().value());
case SFERAB2GRequestMessage request -> processB2GRequest(request, topic);
case SFERAB2GEventMessage event -> {
var requestContext = RequestContext.fromTopic(topic, Optional.of(UUID.fromString(event.getMessageHeader().getMessageID())));
log.info("Received SFERAB2GEventMessage: {} on topic {}", xmlHelper.toString(event), topic);
var validationMessage = messageHeaderValidator.validate(event.getMessageHeader(), requestContext.tid().companyCode().value());
if (validationMessage.isPresent()) {
log.warn("Reject Message with error in message header");
replyPublisher.publishErrorMessage(validationMessage.get(), requestContext);
return;
}
if (request.getHandshakeRequest() != null) {
var handshakeRequest = request.getHandshakeRequest();
sferaApplicationService.processHandshakeRequest(
SferaToInternalConverters.convertOperationModes(handshakeRequest.getDASOperatingModesSupported()),
nullSafeBoolean(handshakeRequest.isStatusReportsEnabled()),
requestContext);
} else {
B2GRequest b2GRequest = request.getB2GRequest();
if (b2GRequest != null && b2GRequest.getJPRequest() != null && b2GRequest.getJPRequest().size() == 1) {
processJourneyProfileRequest(request.getB2GRequest().getJPRequest().get(0), requestContext);
return;
}
if (b2GRequest != null && b2GRequest.getSPRequest() != null && !b2GRequest.getSPRequest().isEmpty()) {
processSegmentProfileRequest(b2GRequest.getSPRequest(), requestContext);
return;
}
if (b2GRequest != null && b2GRequest.getTCRequest() != null && !b2GRequest.getTCRequest().isEmpty()) {
processTrainCharacteristicsRequest(b2GRequest.getTCRequest(), requestContext);
return;
}

log.warn("A B2G Request that is not a handshake should currently have exactly one jp or sp request. Request is ignored.");
if (event.getSessionTermination() != null) {
sferaApplicationService.processSessionTermination(requestContext);
}

}
case SFERAB2GEventMessage event -> {
log.info("Received SFERAB2GEventMessage: {} on topic {}",
xmlHelper.toString(event), topic);
case SFERAB2GReplyMessage reply -> log.info("Received SFERAB2GReplyMessage: {} on topic {}", xmlHelper.toString(reply), topic);
default -> log.error("Unknown xml message type received: {} xml string \"{}\"", payload.getClass(), xmlString);
}
}

private void processB2GRequest(SFERAB2GRequestMessage request, String topic) {
var requestContext = RequestContext.fromTopic(topic, Optional.of(UUID.fromString(request.getMessageHeader().getMessageID())));
log.info("Received SFERAB2GRequestMessage {} on topic {}", xmlHelper.toString(request), topic);
var validationMessage = messageHeaderValidator.validate(request.getMessageHeader(), requestContext.tid().companyCode().value());
if (validationMessage.isPresent()) {
log.warn("Reject Message with error in message header");
replyPublisher.publishErrorMessage(validationMessage.get(), requestContext);
return;
}
if (request.getHandshakeRequest() != null) {
var handshakeRequest = request.getHandshakeRequest();
sferaApplicationService.processHandshakeRequest(
SferaToInternalConverters.convertOperationModes(handshakeRequest.getDASOperatingModesSupported()),
nullSafeBoolean(handshakeRequest.isStatusReportsEnabled()),
requestContext);
} else {
B2GRequest b2GRequest = request.getB2GRequest();
if (b2GRequest != null && b2GRequest.getJPRequest() != null && b2GRequest.getJPRequest().size() == 1) {
processJourneyProfileRequest(request.getB2GRequest().getJPRequest().get(0), requestContext);
return;
}
case SFERAB2GReplyMessage reply -> {
log.info("Received SFERAB2GReplyMessage: {} on topic {}",
xmlHelper.toString(reply), topic);
if (b2GRequest != null && b2GRequest.getSPRequest() != null && !b2GRequest.getSPRequest().isEmpty()) {
processSegmentProfileRequest(b2GRequest.getSPRequest(), requestContext);
return;
}
default -> log.error("Unknown xml message type received: {} xml string \"{}\"", payload.getClass(), xmlString);
if (b2GRequest != null && b2GRequest.getTCRequest() != null && !b2GRequest.getTCRequest().isEmpty()) {
processTrainCharacteristicsRequest(b2GRequest.getTCRequest(), requestContext);
return;
}
log.warn("A B2G Request that is not a handshake should currently have exactly one jp or sp request. Request is ignored.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public void publishHandshakeReject(HandshakeRejectReason rejectReason, RequestCo
publishReplyMessage(reply, requestContext);
}

public void publishOkMessage(RequestContext requestContext) {
var header = sferaMessageCreator.createMessageHeader(UUID.randomUUID(), requestContext.tid(), requestContext.incomingMessageId());
var reply = sferaMessageCreator.createOkMessage(header);
publishReplyMessage(reply, requestContext);
}

public void publishErrorMessage(SferaErrorCodes code, RequestContext requestContext) {
var replyMessageHeader = sferaMessageCreator.createOutgoingMessageHeader(UUID.randomUUID(),
requestContext.incomingMessageId(),
Expand Down
Loading

0 comments on commit b72d505

Please sign in to comment.