diff --git a/src/main/java/net/dancier/chatdancer/adapter/in/web/controller/ChatMessageController.java b/src/main/java/net/dancier/chatdancer/adapter/in/web/controller/ChatMessageController.java index 3df6c17..4321682 100644 --- a/src/main/java/net/dancier/chatdancer/adapter/in/web/controller/ChatMessageController.java +++ b/src/main/java/net/dancier/chatdancer/adapter/in/web/controller/ChatMessageController.java @@ -43,7 +43,7 @@ public ResponseEntity postChatMessage(@PathVariable UUID chatId, @Validated @Req postChatMessageRequestDto.getText(), new Message.AuthorId(postChatMessageRequestDto.getAuthorId()), new Chat.ChatId((chatId))); - createChatMessageUseCase.post(createChatMessageCommand); + createChatMessageUseCase.create(createChatMessageCommand); return ResponseEntity.status(HttpStatus.CREATED).build(); } } diff --git a/src/main/java/net/dancier/chatdancer/adapter/out/messaging/OutboxJpaEntity.java b/src/main/java/net/dancier/chatdancer/adapter/out/messaging/OutboxJpaEntity.java index cdde21f..f9bac87 100644 --- a/src/main/java/net/dancier/chatdancer/adapter/out/messaging/OutboxJpaEntity.java +++ b/src/main/java/net/dancier/chatdancer/adapter/out/messaging/OutboxJpaEntity.java @@ -19,15 +19,14 @@ public class OutboxJpaEntity { @GeneratedValue private UUID id; - private String topic; + private String type; - @JdbcTypeCode(SqlTypes.JSON) - private String metaData; + private String source; private String key; @JdbcTypeCode(SqlTypes.JSON) - private String payload; + private String data; private OffsetDateTime createdAt; diff --git a/src/main/java/net/dancier/chatdancer/adapter/out/messaging/ScheduleMessagesAdapter.java b/src/main/java/net/dancier/chatdancer/adapter/out/messaging/ScheduleMessagesAdapter.java index 6c1cb14..8348654 100644 --- a/src/main/java/net/dancier/chatdancer/adapter/out/messaging/ScheduleMessagesAdapter.java +++ b/src/main/java/net/dancier/chatdancer/adapter/out/messaging/ScheduleMessagesAdapter.java @@ -15,6 +15,7 @@ @Component public class ScheduleMessagesAdapter implements SendChatCreatedEventPort { private static final Logger log = LoggerFactory.getLogger(ScheduleMessagesAdapter.class); + private static final String CHAT_CREATED_SOURCE = "http://chat-dancer.dancier.net/chat-created"; private final OutboxJpaRepository outboxJpaRepository; @@ -22,17 +23,18 @@ public class ScheduleMessagesAdapter implements SendChatCreatedEventPort { @Override public void send(SendChatCreatedEventDto sendChatCreatedEventDto) throws JsonProcessingException { - log.info("scheduling the sending of the message in database: " + sendChatCreatedEventDto); - String payload = objectMapper.writeValueAsString(sendChatCreatedEventDto); + log.info("Scheduling Business Event for: " + sendChatCreatedEventDto); + String data = objectMapper.writeValueAsString(sendChatCreatedEventDto); OutboxJpaEntity outboxJpaEntity = new OutboxJpaEntity(); - outboxJpaEntity.setMetaData(payload); if (sendChatCreatedEventDto.getChatId() != null) { outboxJpaEntity.setKey(sendChatCreatedEventDto.getChatId().toString()); } - outboxJpaEntity.setPayload(payload); - outboxJpaEntity.setTopic("sccd"); + outboxJpaEntity.setData(data); + outboxJpaEntity.setType(TopicNames.CHAT_CREATED); outboxJpaEntity.setCreatedAt(OffsetDateTime.now()); outboxJpaEntity.setStatus(OutboxJpaEntity.STATUS.NEW); + outboxJpaEntity.setKey(sendChatCreatedEventDto.getChatId().toString()); + outboxJpaEntity.setSource(CHAT_CREATED_SOURCE); outboxJpaRepository.save(outboxJpaEntity); } diff --git a/src/main/java/net/dancier/chatdancer/adapter/out/messaging/SendOutboxService.java b/src/main/java/net/dancier/chatdancer/adapter/out/messaging/SendOutboxService.java index e22a165..80f2c41 100644 --- a/src/main/java/net/dancier/chatdancer/adapter/out/messaging/SendOutboxService.java +++ b/src/main/java/net/dancier/chatdancer/adapter/out/messaging/SendOutboxService.java @@ -12,7 +12,6 @@ import java.io.UnsupportedEncodingException; import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.UUID; @RequiredArgsConstructor @@ -24,15 +23,14 @@ public class SendOutboxService { private final ObjectMapper objectMapper; - public void send(OutboxJpaEntity entity) throws UnsupportedEncodingException, JsonProcessingException { + public void send(OutboxJpaEntity entity) throws JsonProcessingException { CloudEvent cloudEvent = CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) - .withSource(URI.create("https://chat-dancer.dancier.net")) - .withType("bla") - .withData(objectMapper.writeValueAsBytes(entity.getPayload())) - .withExtension("foo", "bar").build(); - log.info("This is the cloud event: " + cloudEvent); - kafkaTemplate.send(entity.getTopic(),entity.getKey(), cloudEvent); + .withSource(URI.create(entity.getSource())) + .withType(entity.getType()) + .withData(objectMapper.writeValueAsBytes(entity.getData())) + .build(); + kafkaTemplate.send(entity.getType(),entity.getKey(), cloudEvent); } } diff --git a/src/main/java/net/dancier/chatdancer/adapter/out/persistence/ChatPersistenceAdapter.java b/src/main/java/net/dancier/chatdancer/adapter/out/persistence/ChatPersistenceAdapter.java index 643aa72..e83b01b 100644 --- a/src/main/java/net/dancier/chatdancer/adapter/out/persistence/ChatPersistenceAdapter.java +++ b/src/main/java/net/dancier/chatdancer/adapter/out/persistence/ChatPersistenceAdapter.java @@ -4,7 +4,7 @@ import net.dancier.chatdancer.application.domain.model.Chat; import net.dancier.chatdancer.application.port.in.ChatsByParticipantQuery; import net.dancier.chatdancer.application.port.out.ChatsByParticipantPort; -import net.dancier.chatdancer.application.port.out.LoadChatPort; +import net.dancier.chatdancer.application.port.out.GetChatPort; import net.dancier.chatdancer.application.port.out.UpdateChatPort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,7 +15,7 @@ @Component @AllArgsConstructor -public class ChatPersistenceAdapter implements LoadChatPort, UpdateChatPort, ChatsByParticipantPort { +public class ChatPersistenceAdapter implements GetChatPort, UpdateChatPort, ChatsByParticipantPort { public static final Logger log = LoggerFactory.getLogger(ChatPersistenceAdapter.class); @@ -24,7 +24,7 @@ public class ChatPersistenceAdapter implements LoadChatPort, UpdateChatPort, Cha private final ChatMapper chatMapper; @Override - public Chat loadChat(Chat.ChatId chatId) { + public Chat get(Chat.ChatId chatId) { ChatJpaEntity jpaChatEntity = chatJpaRepository.findById(chatId.getId()).orElseThrow(); log.info("got: " + jpaChatEntity); return chatMapper.fromJpaChatEntityToChat(jpaChatEntity); diff --git a/src/main/java/net/dancier/chatdancer/application/domain/service/CreateChatMessageService.java b/src/main/java/net/dancier/chatdancer/application/domain/service/CreateChatMessageService.java index 99046b8..220d5f7 100644 --- a/src/main/java/net/dancier/chatdancer/application/domain/service/CreateChatMessageService.java +++ b/src/main/java/net/dancier/chatdancer/application/domain/service/CreateChatMessageService.java @@ -5,7 +5,7 @@ import net.dancier.chatdancer.application.domain.model.Message; import net.dancier.chatdancer.application.port.in.CreateChatMessageCommand; import net.dancier.chatdancer.application.port.in.CreateChatMessageUseCase; -import net.dancier.chatdancer.application.port.out.LoadChatPort; +import net.dancier.chatdancer.application.port.out.GetChatPort; import net.dancier.chatdancer.application.port.out.UpdateChatPort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,15 +17,15 @@ public class CreateChatMessageService implements CreateChatMessageUseCase { Logger log = LoggerFactory.getLogger(CreateChatMessageService.class); - private final LoadChatPort loadChatPort; + private final GetChatPort getChatPort; private final UpdateChatPort updateChatPort; @Override - public void post(CreateChatMessageCommand command) { + public void create(CreateChatMessageCommand command) { log.info("Posting: " + command); - Chat chat = loadChatPort.loadChat(command.chatId()); + Chat chat = getChatPort.get(command.chatId()); Message message = Message.withoutId(command.text(),command.authorId()); chat.addMessage(message); updateChatPort.updateChat(chat); diff --git a/src/main/java/net/dancier/chatdancer/application/domain/service/CreateChatService.java b/src/main/java/net/dancier/chatdancer/application/domain/service/CreateChatService.java index 1fd66da..e2e0200 100644 --- a/src/main/java/net/dancier/chatdancer/application/domain/service/CreateChatService.java +++ b/src/main/java/net/dancier/chatdancer/application/domain/service/CreateChatService.java @@ -4,6 +4,7 @@ import jakarta.transaction.Transactional; import lombok.RequiredArgsConstructor; import net.dancier.chatdancer.application.domain.model.Chat; +import net.dancier.chatdancer.application.exception.ApplicationException; import net.dancier.chatdancer.application.port.in.CreateChatCommand; import net.dancier.chatdancer.application.port.in.CreateChatUseCase; import net.dancier.chatdancer.application.port.out.SendChatCreatedEventDto; @@ -28,19 +29,22 @@ public class CreateChatService implements CreateChatUseCase { @Override @Transactional public Chat.ChatId createChat(CreateChatCommand createChatCommand) { - System.out.println("Creating Chat"); Chat chat = Chat.withoutId(LocalDateTime.now()); - createChatCommand.participants().forEach(p -> chat.addParticipant(p)); + createChatCommand.participants() + .forEach( + participantId -> chat.addParticipant(participantId) + ); Chat.ChatId chatId = updateChatPort.updateChat(chat); SendChatCreatedEventDto sendChatCreatedEventDto = new SendChatCreatedEventDto(); - + sendChatCreatedEventDto.setChatId(chatId.getId()); sendChatCreatedEventDto.setParticipantIds(createChatCommand.participants()); try { sendChatCreatedEventPort.send(sendChatCreatedEventDto); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); + } catch (JsonProcessingException jpe) { + log.error("Permanent Error: " + jpe); + throw new ApplicationException("Unable to serialize..", jpe); } return chatId; } diff --git a/src/main/java/net/dancier/chatdancer/application/domain/service/GetChatService.java b/src/main/java/net/dancier/chatdancer/application/domain/service/GetChatService.java index e17e4f9..bccd27c 100644 --- a/src/main/java/net/dancier/chatdancer/application/domain/service/GetChatService.java +++ b/src/main/java/net/dancier/chatdancer/application/domain/service/GetChatService.java @@ -3,17 +3,17 @@ import lombok.RequiredArgsConstructor; import net.dancier.chatdancer.application.domain.model.Chat; import net.dancier.chatdancer.application.port.in.GetChatUseCase; -import net.dancier.chatdancer.application.port.out.LoadChatPort; +import net.dancier.chatdancer.application.port.out.GetChatPort; import org.springframework.stereotype.Component; @RequiredArgsConstructor @Component public class GetChatService implements GetChatUseCase { - private final LoadChatPort loadChatPort; + private final GetChatPort getChatPort; @Override public Chat get(Chat.ChatId id) { - return loadChatPort.loadChat(id); + return getChatPort.get(id); } } diff --git a/src/main/java/net/dancier/chatdancer/application/domain/service/GetMessagesByChatService.java b/src/main/java/net/dancier/chatdancer/application/domain/service/GetMessagesByChatService.java index 66e5672..9c66927 100644 --- a/src/main/java/net/dancier/chatdancer/application/domain/service/GetMessagesByChatService.java +++ b/src/main/java/net/dancier/chatdancer/application/domain/service/GetMessagesByChatService.java @@ -4,7 +4,7 @@ import net.dancier.chatdancer.application.domain.model.Chat; import net.dancier.chatdancer.application.domain.model.Message; import net.dancier.chatdancer.application.port.in.GetMessagesByChatUseCase; -import net.dancier.chatdancer.application.port.out.LoadChatPort; +import net.dancier.chatdancer.application.port.out.GetChatPort; import org.springframework.stereotype.Component; import java.util.List; @@ -13,10 +13,10 @@ @Component public class GetMessagesByChatService implements GetMessagesByChatUseCase { - private final LoadChatPort loadChatPort; + private final GetChatPort getChatPort; @Override public List byChatId(Chat.ChatId chatId) { - return loadChatPort.loadChat(chatId).getMessages(); + return getChatPort.get(chatId).getMessages(); } } diff --git a/src/main/java/net/dancier/chatdancer/application/exception/ApplicationException.java b/src/main/java/net/dancier/chatdancer/application/exception/ApplicationException.java new file mode 100644 index 0000000..40739fd --- /dev/null +++ b/src/main/java/net/dancier/chatdancer/application/exception/ApplicationException.java @@ -0,0 +1,15 @@ +package net.dancier.chatdancer.application.exception; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) +public class ApplicationException extends RuntimeException{ + public ApplicationException(String message) { + super(message); + } + + public ApplicationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/net/dancier/chatdancer/application/port/in/CreateChatMessageUseCase.java b/src/main/java/net/dancier/chatdancer/application/port/in/CreateChatMessageUseCase.java index 08b0e0e..bf2f61a 100644 --- a/src/main/java/net/dancier/chatdancer/application/port/in/CreateChatMessageUseCase.java +++ b/src/main/java/net/dancier/chatdancer/application/port/in/CreateChatMessageUseCase.java @@ -2,6 +2,6 @@ public interface CreateChatMessageUseCase { - void post(CreateChatMessageCommand command); + void create(CreateChatMessageCommand command); } diff --git a/src/main/java/net/dancier/chatdancer/application/port/out/LoadChatPort.java b/src/main/java/net/dancier/chatdancer/application/port/out/GetChatPort.java similarity index 62% rename from src/main/java/net/dancier/chatdancer/application/port/out/LoadChatPort.java rename to src/main/java/net/dancier/chatdancer/application/port/out/GetChatPort.java index 94be43f..7309b05 100644 --- a/src/main/java/net/dancier/chatdancer/application/port/out/LoadChatPort.java +++ b/src/main/java/net/dancier/chatdancer/application/port/out/GetChatPort.java @@ -2,8 +2,8 @@ import net.dancier.chatdancer.application.domain.model.Chat; -public interface LoadChatPort { +public interface GetChatPort { - Chat loadChat(Chat.ChatId chatId); + Chat get(Chat.ChatId chatId); } diff --git a/src/main/java/net/dancier/chatdancer/application/port/out/PostChatMessagePort.java b/src/main/java/net/dancier/chatdancer/application/port/out/PostChatMessagePort.java deleted file mode 100644 index 29230c4..0000000 --- a/src/main/java/net/dancier/chatdancer/application/port/out/PostChatMessagePort.java +++ /dev/null @@ -1,9 +0,0 @@ -package net.dancier.chatdancer.application.port.out; - -import net.dancier.chatdancer.application.domain.model.Message; - -public interface PostChatMessagePort { - - Message.MessageId postChatMessage(Message message); - -} diff --git a/src/main/resources/liquibase-changeLog.xml b/src/main/resources/liquibase-changeLog.xml index b1db489..0961180 100644 --- a/src/main/resources/liquibase-changeLog.xml +++ b/src/main/resources/liquibase-changeLog.xml @@ -57,4 +57,24 @@ ALTER COLUMN author_id TYPE VARCHAR(256); + + + ALTER TABLE outbox + RENAME payload TO data; + ALTER TABLE outbox + DROP COLUMN meta_data; + ALTER TABLE outbox + ADD COLUMN source VARCHAR(256); + ALTER TABLE outbox + ADD COLUMN type VARCHAR(256); + + + + + ALTER TABLE outbox + DROP COLUMN type; + ALTER TABLE outbox + RENAME topic TO type; + + \ No newline at end of file