Skip to content

Commit

Permalink
schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc Gorzala committed Dec 27, 2023
1 parent d6a360f commit befbdc9
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 21 deletions.
6 changes: 5 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-kafka</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.cloudevents.core.builder.CloudEventBuilder;
import jakarta.transaction.Transactional;
import lombok.RequiredArgsConstructor;
import net.dancier.dancer.eventlog.ScheduleMessagePort;
import net.dancier.dancer.eventlog.service.EventlogService;
import net.dancier.dancer.messaging.ScheduleMessageAdapter;
import org.slf4j.Logger;
Expand All @@ -31,27 +32,26 @@ public class ApplicationEventListener {

private final EventCreator eventCreator;

private final ScheduleMessageAdapter scheduleMessageAdapter;
private final ScheduleMessagePort scheduleMessagePort;

private final ObjectMapper objectMapper;

@EventListener
@Transactional
public void handle(ProfileUpdatedEvent profileUpdatedEvent) {
log.info("Got a Profile Change");
log.info("Got a Profile Change: {}", profileUpdatedEvent);
eventlogService.appendNew(
eventCreator.createEventlog(
"profile-updated",
profileUpdatedEvent.getDancer()));
try {
CloudEvent cloudEvent = CloudEventBuilder
.v1()
.withId(UUID.randomUUID().toString())
.withSource(BACKEND_SOURCE)
.withType("profile-updated")
.withTime(OffsetDateTime.now())
.withData(objectMapper.writeValueAsBytes(profileUpdatedEvent)).build();
scheduleMessageAdapter.schedule(cloudEvent, profileUpdatedEvent.getDancer().getId().toString());
String tmp = objectMapper.writeValueAsString(profileUpdatedEvent);
System.out.println(tmp);
scheduleMessagePort.schedule(
profileUpdatedEvent,
profileUpdatedEvent.getDancer().getId().toString(),
BACKEND_SOURCE,
"profile-updated");
} catch (JsonProcessingException jpe) {
log.error("Unable to generate Cloud-Event for: " + profileUpdatedEvent, jpe);
throw new ApplicationContextException("Unable to create Json", jpe);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package net.dancier.dancer.eventlog;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.cloudevents.CloudEvent;

import java.net.URI;

public interface ScheduleMessagePort {
void schedule(CloudEvent cloudEvent, String key);
void schedule(Object object,
String key,
URI source,
String type) throws JsonProcessingException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import net.dancier.dancer.core.exception.ApplicationException;
import net.dancier.dancer.eventlog.model.Eventlog;
import net.dancier.dancer.eventlog.repository.EventlogDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.sql.SQLException;
Expand All @@ -19,15 +21,24 @@
@RequiredArgsConstructor
public class EventlogService {

private final Logger log = LoggerFactory.getLogger(EventlogService.class);

private final EventlogDAO eventlogDAO;

private final static Set<String> DEFAULT_AUTHENTICATED = Set.of();
private final static Set<String> DEFAULT_AUTHENTICATED = Set.of("ROLE_USER", "ROLE_ADMIN");
private final static Set<String> AT_LEAST_HUMAN = Set.of("ROLE_HUMAN", "ROLE_USER", "ROLE_ADMIN");
private final static Set<String> NO_SPECIAL_ROLE_NEEDED = Set.of();
private final static Set<EventlogConfig> allowedEvents = Set.of(
EventlogConfig.of("profile-updated", DEFAULT_AUTHENTICATED)
EventlogConfig.of("app_instance_id_created", NO_SPECIAL_ROLE_NEEDED),
EventlogConfig.of("navigated_to_page", NO_SPECIAL_ROLE_NEEDED),
EventlogConfig.of("human_session_created", AT_LEAST_HUMAN),
EventlogConfig.of("contact_message_sent", AT_LEAST_HUMAN),
EventlogConfig.of("profile_updated", DEFAULT_AUTHENTICATED) // will not go over the eventlog stuff in the future...
);

public void appendNew(Eventlog eventlog) {

validateTopic(eventlog);
authorize(eventlog);
try {
eventlog.setId(UUID.randomUUID());
eventlog.setCreated(Instant.now());
Expand All @@ -38,10 +49,12 @@ public void appendNew(Eventlog eventlog) {
}

private void validateTopic(Eventlog eventlog) {

String topic = eventlog.getTopic();
log.info("Validating Topic: {}", eventlog.getTopic());
}
private void authorize(Eventlog eventlog) {
String topic = eventlog.getTopic();
log.info("Authorizing eventlog request: {}", topic);
}

@AllArgsConstructor(access = AccessLevel.PRIVATE)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
package net.dancier.dancer.messaging;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.PojoCloudEventData;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.PojoCloudEventDataMapper;
import lombok.RequiredArgsConstructor;
import net.dancier.dancer.eventlog.ScheduleMessagePort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;

import static io.cloudevents.core.CloudEventUtils.mapData;

@RequiredArgsConstructor
@Component
public class ScheduleMessageAdapter implements ScheduleMessagePort {
Expand All @@ -21,14 +32,19 @@ public class ScheduleMessageAdapter implements ScheduleMessagePort {
private final ObjectMapper objectMapper;

@Override
public void schedule(CloudEvent cloudEvent, String key) {
log.info("sending object: " + cloudEvent);
public void schedule(Object object,
String key,
URI source,
String type) throws JsonProcessingException {
log.info("sending object: " + object);
log.info("with key:" + key);

OutboxJpaEntity outboxJpaEntity = new OutboxJpaEntity();
outboxJpaEntity.setData(objectMapper.convertValue(cloudEvent, JsonNode.class));
outboxJpaEntity.setType(cloudEvent.getType());
outboxJpaEntity.setData(objectMapper.convertValue(object, JsonNode.class));
outboxJpaEntity.setType(type);
outboxJpaEntity.setSource(source.toString());
outboxJpaEntity.setKey(key);
outboxJpaEntity.setCreatedAt(cloudEvent.getTime());
outboxJpaEntity.setCreatedAt(OffsetDateTime.now());
outboxJpaEntity.setStatus(OutboxJpaEntity.STATUS.NEW);
outboxJpaRepository.save(outboxJpaEntity);
}
Expand Down

0 comments on commit befbdc9

Please sign in to comment.