Skip to content

Commit

Permalink
Merge pull request #210 from amosproj/sprint-12
Browse files Browse the repository at this point in the history
Sprint 12
  • Loading branch information
iheziqi authored Jul 11, 2023
2 parents 20f3d54 + f19bebe commit 3f16e66
Show file tree
Hide file tree
Showing 66 changed files with 1,072 additions and 2,324 deletions.
21 changes: 17 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,33 @@ We want to achieve this by structuring our UI according to the topology of Apach
* Node.js Version **20.0.0** or higher
* Docker Desktop

## Backend and Pulsar instance
## Startup

First, build the application JAR from the `backend` directory with:

`./mvnw package -DskipTests`

Then, start Docker Desktop and create the pulsar setup from the root-directory with:
### Start Frontend, Backend and Pulsar instance without demodata

Start Docker Desktop and create the pulsar setup from the root-directory with:

```bash
echo BACKEND_IP=localhost >> .env
docker-compose --profile backend --profile frontend up --build -d
```

### Start Frontend, Backend and Pulsar instance with demodata

Start Docker Desktop and create the pulsar setup from the root-directory with:

```bash
echo BACKEND_IP=localhost >> .env
docker-compose --profile demodata --profile backend --profile frontend up --build -d
docker-compose --profile backend --profile frontend --profile demodata -f docker-compose.yml -f docker-compose-setup.yml up --build -d
```

Notes:
* The `docker-compose.yml` includes the pulsar, backend and frontend services.
* The `docker-compose-setup.yml` includes services for the local or AWS demodata setup. `-f` selects the files used for the startup.
* `--build` is needed for the first startup, or when the demodata docker images are changed
* `-d` runs the container in the background, so u can close the terminal without terminating the app itself.
* `--profile demodata` is needed when you want to create the demo topology and start the demo producers & consumers, that will continuously send and receive messages
Expand Down Expand Up @@ -64,5 +77,5 @@ so you need to pass it to the `docker-compose` via `-e` flag.

```bash
echo BACKEND_IP=${EC2_IP_ADDRESS} >> .env
docker-compose --profile demodata-aws --profile backend --profile frontend up --build -d
docker-compose --profile backend --profile frontend --profile demodata-aws -f docker-compose.yml -f docker-compose-setup.yml up --build -d
```
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package de.amos.apachepulsarui;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
Expand All @@ -17,6 +18,9 @@
@EnableCaching
public class ApachePulsarUiApplication {

@Value("${frontend.url}")
private String allowedOrigin;

public static void main(String[] args) {
SpringApplication.run(ApachePulsarUiApplication.class, args);
}
Expand All @@ -26,7 +30,7 @@ public WebMvcConfigurer configurer() {
return new WebMvcConfigurer() {
@Override
public void addCorsMappings(CorsRegistry corsRegistry) {
corsRegistry.addMapping("/**").allowedOrigins("*");
corsRegistry.addMapping("/**").allowedOrigins(allowedOrigin);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Set;

@RestController
@RequestMapping("/messages")
Expand All @@ -30,8 +31,10 @@ public class MessageController {
public ResponseEntity<MessagesDto> getMessages(@RequestParam String topic,
@RequestParam(required = false, defaultValue = "10") Integer numMessages,
@RequestParam(required = false, defaultValue = "") List<String> producers,
@RequestParam(required = false, defaultValue = "") List<String> subscriptions) {
List<MessageDto> messageDtos = messageService.getLatestMessagesFiltered(topic, numMessages, producers, subscriptions);
@RequestParam(required = false, defaultValue = "") List<String> subscriptions)
{
Set<MessageDto> messageDtos = messageService.getLatestMessagesFiltered(topic, numMessages, producers, subscriptions);
return new ResponseEntity<>(new MessagesDto(messageDtos), HttpStatus.OK);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.List;
import java.util.Set;


@Data
@AllArgsConstructor
public class MessagesDto {

private List<MessageDto> messages;
private Set<MessageDto> messages;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Service
@Slf4j
@RequiredArgsConstructor
public class MessageService {
private final PulsarAdmin pulsarAdmin;

public List<MessageDto> getLatestMessagesFiltered(String topic, Integer numMessages, List<String> producers, List<String> subscriptions) {
List<MessageDto> messageDtos = getLatestMessagesOfTopic(topic, numMessages);
public Set<MessageDto> getLatestMessagesFiltered(String topic, Integer numMessages, List<String> producers, List<String> subscriptions) {
Set<MessageDto> messageDtos = getLatestMessagesOfTopic(topic, numMessages);
if (!producers.isEmpty()) {
messageDtos = filterByProducers(messageDtos, producers);
}
Expand All @@ -39,12 +43,14 @@ public List<MessageDto> getLatestMessagesFiltered(String topic, Integer numMessa
return messageDtos;
}

private List<MessageDto> filterBySubscription(List<MessageDto> messageDtos, Integer numMessages, String topic, List<String> subscriptions) {
private Set<MessageDto> filterBySubscription(Set<MessageDto> messageDtos, Integer numMessages, String topic, List<String> subscriptions) {
List<String> messageIds = subscriptions.stream()
.flatMap(s -> peekMessageIds(topic, s, numMessages).stream())
.toList();

return messageDtos.stream().filter(m -> messageIds.contains(m.getMessageId())).toList();
return messageDtos.stream()
.filter(m -> messageIds.contains(m.getMessageId()))
.collect(Collectors.toCollection(LinkedHashSet::new));
}

private List<String> peekMessageIds(String topic, String subscription, Integer numMessages) {
Expand All @@ -60,14 +66,13 @@ private List<String> peekMessageIds(String topic, String subscription, Integer n
}


private List<MessageDto> filterByProducers(List<MessageDto> messageDtos, List<String> producers) {
private Set<MessageDto> filterByProducers(Set<MessageDto> messageDtos, List<String> producers) {
return messageDtos.stream()
.filter(m -> producers.contains(m.getProducer()))
.toList();

.collect(Collectors.toCollection(LinkedHashSet::new));
}

private List<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessages) {
private Set<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessages) {
var schema = getSchemaIfExists(topic);
try {
var messages = new ArrayList<Message<byte[]>>();
Expand All @@ -83,7 +88,10 @@ private List<MessageDto> getLatestMessagesOfTopic(String topic, Integer numMessa
}
return messages.stream()
.map(message -> MessageDto.fromExistingMessage(message, schema))
.toList();
// latest message first in set
.sorted(Comparator.comparing(MessageDto::getPublishTime, Comparator.reverseOrder()))
// linked to keep the order!
.collect(Collectors.toCollection(LinkedHashSet::new));
} catch (PulsarAdminException e) {
throw new PulsarApiException(
"Could not examine the amount of '%d' messages for topic '%s'".formatted(numMessages, topic),
Expand Down
1 change: 1 addition & 0 deletions backend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ server.port=8081
pulsar.consumer.url = pulsar://localhost:6650
pulsar.admin.url = http://localhost:8080
server.servlet.context-path=/api
frontend.url = http://localhost:8082
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static java.util.Collections.emptyList;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -37,7 +38,7 @@ public class MessageControllerTest {

@Test
void getMessages_returnsMessages() throws Exception {
List<MessageDto> messageDtos = List.of(
Set<MessageDto> messageDtos = Set.of(
aMessage("persistent://public/default/spaceships", "Nebuchadnezzar"),
aMessage("persistent://public/default/spaceships", "Serenity")
);
Expand All @@ -54,7 +55,7 @@ void getMessages_returnsMessages() throws Exception {

@Test
void getMessages_withoutNumMessages_returns10Messages() throws Exception {
var messageDtos = new ArrayList<MessageDto>();
HashSet<MessageDto> messageDtos = new HashSet<>();
for (int i = 0; i < 10; i++) {
messageDtos.add(aMessage("persistent://public/default/test", "Test" + i));
}
Expand All @@ -70,7 +71,7 @@ void getMessages_withoutNumMessages_returns10Messages() throws Exception {

@Test
void getMessages_withProducer_returns10Messages() throws Exception {
var messageDtos = new ArrayList<MessageDto>();
HashSet<MessageDto> messageDtos = new HashSet<>();
for (int i = 0; i < 10; i++) {
messageDtos.add(aMessage("persistent://public/default/test", "Test" + i));
}
Expand All @@ -86,7 +87,7 @@ void getMessages_withProducer_returns10Messages() throws Exception {

@Test
void getMessages_withSubscription_returns10Messages() throws Exception {
var messageDtos = new ArrayList<MessageDto>();
HashSet<MessageDto> messageDtos = new HashSet<>();
for (int i = 0; i < 10; i++) {
messageDtos.add(aMessage("persistent://public/default/test", "Test" + i));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void getNumberOfLatestMessagesFromTopic_returnsMessages() throws Exception {
}
var messages = messageService.getLatestMessagesFiltered(TOPICNAME, 1, emptyList(), emptyList());

MessageDto messageReceived = messages.get(0);
MessageDto messageReceived = messages.iterator().next();
assertThat(messageReceived.getMessageId()).isNotEmpty(); // generated
assertThat(messageReceived.getTopic()).isEqualTo(messageToSend.getTopic());
assertThat(messageReceived.getPayload()).isEqualTo(messageToSend.getPayload());
Expand All @@ -91,7 +91,7 @@ void getNumberOfLatestMessagesFromTopicFilteredByProducer_returnsMessages() thro
}
var messages = messageService.getLatestMessagesFiltered(TOPICNAME, 1, List.of(producerName), emptyList());

MessageDto messageReceived = messages.get(0);
MessageDto messageReceived = messages.iterator().next();
assertThat(messageReceived.getMessageId()).isNotEmpty(); // generated
assertThat(messageReceived.getTopic()).isEqualTo(messageToSend.getTopic());
assertThat(messageReceived.getPayload()).isEqualTo(messageToSend.getPayload());
Expand Down Expand Up @@ -155,7 +155,7 @@ void getNumberOfLatestMessagesFromTopicFilteredBySubscription_returnsMessages()
}
var messages = messageService.getLatestMessagesFiltered(TOPICNAME, 1, emptyList(), List.of(subscriptionName));

MessageDto messageReceived = messages.get(0);
MessageDto messageReceived = messages.iterator().next();
assertThat(messageReceived.getMessageId()).isNotEmpty(); // generated
assertThat(messageReceived.getTopic()).isEqualTo(messageToSend.getTopic());
assertThat(messageReceived.getPayload()).isEqualTo(messageToSend.getPayload());
Expand All @@ -179,7 +179,7 @@ void getNumberOfLatestMessagesFromTopic_forMessageWithSchema_returnsSchema() thr
}
var messages = messageService.getLatestMessagesFiltered(TOPICNAME, 1, emptyList(), emptyList());

MessageDto messageReceived = messages.get(0);
MessageDto messageReceived = messages.iterator().next();
assertThat(messageReceived.getSchema()).isEqualTo(schema.getSchemaInfo().getSchemaDefinition());
}

Expand Down
49 changes: 49 additions & 0 deletions docker-compose-setup.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
version: '3'
services:

backend:
depends_on:
setup-topology:
condition: service_completed_successfully

setup-topology:
image: pulsarui/setuptopology
profiles:
- demodata
build:
context: demodata/setup-topology
dockerfile: Dockerfile
environment:
- 'PULSAR_ADMIN_URL=http://pulsar:8080'
- 'USE_AWS=false'
depends_on:
pulsar:
condition: service_healthy

setup-topology-aws:
image: pulsarui/setuptopology
profiles:
- demodata-aws
build:
context: demodata/setup-topology
dockerfile: Dockerfile
environment:
- 'PULSAR_ADMIN_URL=http://pulsar:8080'
- 'USE_AWS=true'
depends_on:
pulsar:
condition: service_healthy

demo-producer-consumer:
image: pulsarui/demoproducerconsumer
profiles:
- demodata
build:
context: demodata/demo-producer-consumer
dockerfile: Dockerfile
environment:
- 'PULSAR_URL=pulsar://pulsar:6650'
- 'PUBLISH_INTERVAL_SECONDS=30'
depends_on:
setup-topology:
condition: service_completed_successfully
45 changes: 2 additions & 43 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
environment:
- "PULSAR_CONSUMER_URL=pulsar://pulsar:6650"
- "PULSAR_ADMIN_URL=http://pulsar:8080"
- "FRONTEND_URL=http://${BACKEND_IP}:8082"
depends_on:
pulsar:
condition: service_healthy
Expand Down Expand Up @@ -48,46 +49,4 @@ services:
test: ["CMD", "bin/pulsar-admin", "brokers", "healthcheck"]
interval: 5s
timeout: 10s
retries: 10

setup-topology:
image: pulsarui/setuptopology
profiles:
- demodata
build:
context: demodata/setup-topology
dockerfile: Dockerfile
environment:
- 'PULSAR_ADMIN_URL=http://pulsar:8080'
- 'USE_AWS=false'
depends_on:
pulsar:
condition: service_healthy

setup-topology-aws:
image: pulsarui/setuptopology
profiles:
- demodata-aws
build:
context: demodata/setup-topology
dockerfile: Dockerfile
environment:
- 'PULSAR_ADMIN_URL=http://pulsar:8080'
- 'USE_AWS=true'
depends_on:
pulsar:
condition: service_healthy

demo-producer-consumer:
image: pulsarui/demoproducerconsumer
profiles:
- demodata
build:
context: demodata/demo-producer-consumer
dockerfile: Dockerfile
environment:
- 'PULSAR_URL=pulsar://pulsar:6650'
- 'PUBLISH_INTERVAL_SECONDS=30'
depends_on:
setup-topology:
condition: service_completed_successfully
retries: 10
Loading

0 comments on commit 3f16e66

Please sign in to comment.