diff --git a/docs/modules/ROOT/attachments/examples.json b/docs/modules/ROOT/attachments/examples.json index d2e7c049..436a87d2 100644 --- a/docs/modules/ROOT/attachments/examples.json +++ b/docs/modules/ROOT/attachments/examples.json @@ -98,5 +98,10 @@ "title": "Tokenize a CSV file", "description": "Shows how to define a Camel route in XML for tokenizing a CSV a file.", "link": "https://github.com/apache/camel-quarkus-examples/tree/main/file-split-log-xml" + }, + { + "title": "Vertx-Websocket Chat", + "description": "Shows how to configure a WebSocket server and interact with connected peers.", + "link": "https://github.com/apache/camel-quarkus-examples/tree/main/vertx-websocket-chat" } ] \ No newline at end of file diff --git a/vertx-websocket-chat/README.adoc b/vertx-websocket-chat/README.adoc new file mode 100644 index 00000000..34ce8dee --- /dev/null +++ b/vertx-websocket-chat/README.adoc @@ -0,0 +1,56 @@ += Vertx-Websocket Chat: A Camel Quarkus example +:cq-example-description: An example that shows how to configure a WebSocket server and interact with connected peers. + +{cq-description} + +TIP: Check the https://camel.apache.org/camel-quarkus/latest/first-steps.html[Camel Quarkus User guide] for prerequisites +and other general information. + +== Start in the Development mode + +You can run your application in dev mode that enables live coding using: + +[source,shell] +---- +$ mvn compile quarkus:dev +---- + +The above command compiles the project, starts the application and lets the Quarkus tooling watch for changes in your workspace. Any modifications in your project will automatically take effect in the running application.You can find the application running http://localhost:8080 + +TIP: Please refer to the Development mode section of +https://camel.apache.org/camel-quarkus/latest/first-steps.html#_development_mode[Camel Quarkus User guide] for more details. + +As we run the example in Quarkus Dev Mode, you can connect to the websocket by providing a username.Once you connect, you can see a chat room where you can send/receive messages. +You can write your message in the message section and you will be able to send the message either to a specific peer or to all of the users conncected by clicking the dropdown of the send button.You can click on the peer of your choice from the list to send the message only to that speicific peer.On clicking Send All,you can send your message to all of the users connected locally.You'll also be able to see the conversation on your window. + +=== Package and run the application + +Once you are done with developing you may want to package and run the application. + +TIP: Find more details about the JVM mode and Native mode in the Package and run section of +https://camel.apache.org/camel-quarkus/latest/first-steps.html#_package_and_run_the_application[Camel Quarkus User guide] + +==== JVM mode + +[source,shell] +---- +$ mvn clean package +$ java -jar target/quarkus-app/quarkus-run.jar +---- + +==== Native mode + +IMPORTANT: Native mode requires having GraalVM and other tools installed. Please check the Prerequisites section +of https://camel.apache.org/camel-quarkus/latest/first-steps.html#_prerequisites[Camel Quarkus User guide]. + +To prepare a native executable using GraalVM, run the following command: + +[source,shell] +---- +$ mvn clean package -Pnative +$ ./target/*-runner +---- + +== Feedback + +Please report bugs and propose improvements via https://github.com/apache/camel-quarkus/issues[GitHub issues of Camel Quarkus] project. \ No newline at end of file diff --git a/vertx-websocket-chat/eclipse-formatter-config.xml b/vertx-websocket-chat/eclipse-formatter-config.xml new file mode 100644 index 00000000..2248b2b8 --- /dev/null +++ b/vertx-websocket-chat/eclipse-formatter-config.xml @@ -0,0 +1,276 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/vertx-websocket-chat/pom.xml b/vertx-websocket-chat/pom.xml new file mode 100644 index 00000000..320da7dc --- /dev/null +++ b/vertx-websocket-chat/pom.xml @@ -0,0 +1,287 @@ + + + + 4.0.0 + + camel-quarkus-examples-vertx-websocket-chat + org.apache.camel.quarkus.examples + 3.10.0-SNAPSHOT + + Camel Quarkus :: Examples :: Vertx-Websocket chat + Camel Quarkus Example :: Implementing Websocket + + + 3.9.0 + 3.10.0-SNAPSHOT + + io.quarkus + quarkus-bom + org.apache.camel.quarkus + camel-quarkus-bom + + UTF-8 + UTF-8 + 17 + 17 + ${maven.compiler.target} + ${maven.compiler.source} + + 2.23.0 + 1.9.0 + 4.2 + 3.11.0 + 3.3.0 + 3.3.1 + 3.1.2 + + + + + + + ${quarkus.platform.group-id} + ${quarkus.platform.artifact-id} + ${quarkus.platform.version} + pom + import + + + ${camel-quarkus.platform.group-id} + ${camel-quarkus.platform.artifact-id} + ${camel-quarkus.platform.version} + pom + import + + + + + + + org.apache.camel.quarkus + camel-quarkus-bean + + + org.apache.camel.quarkus + camel-quarkus-jackson + + + org.apache.camel.quarkus + camel-quarkus-vertx-websocket + + + org.apache.camel.quarkus + camel-quarkus-direct + + + org.apache.camel.quarkus + camel-quarkus-rest + + + io.quarkus + quarkus-arc + + + + + io.quarkus + quarkus-junit5 + test + + + + + + + + + net.revelc.code.formatter + formatter-maven-plugin + ${formatter-maven-plugin.version} + + ${maven.multiModuleProjectDirectory}/eclipse-formatter-config.xml + LF + + + + + net.revelc.code + impsort-maven-plugin + ${impsort-maven-plugin.version} + + java.,javax.,org.w3c.,org.xml.,junit. + true + true + java.,javax.,org.w3c.,org.xml.,junit. + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven-compiler-plugin.version} + + true + true + + -Xlint:unchecked + + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + false + + org.jboss.logmanager.LogManager + + + + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + ${quarkus.platform.version} + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${maven-surefire-plugin.version} + + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + + + com.mycila + license-maven-plugin + ${license-maven-plugin.version} + + true +
${maven.multiModuleProjectDirectory}/header.txt
+ + **/*.adoc + **/*.txt + **/LICENSE.txt + **/LICENSE + **/NOTICE.txt + **/NOTICE + **/README + **/pom.xml.versionsBackup + + + SLASHSTAR_STYLE + CAMEL_PROPERTIES_STYLE + SLASHSTAR_STYLE + + + ${maven.multiModuleProjectDirectory}/license-properties-headerdefinition.xml + +
+
+
+
+ + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + + + build + + build + + + + + + + net.revelc.code.formatter + formatter-maven-plugin + + + format + + format + + process-sources + + + + + + net.revelc.code + impsort-maven-plugin + + + sort-imports + + sort + + process-sources + + + + +
+ + + + native + + + native + + + + native + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + + integration-test + verify + + + + ${quarkus.package.type} + + + + + + + + + + +
\ No newline at end of file diff --git a/vertx-websocket-chat/src/main/java/org/acme/websocket/ChatMessage.java b/vertx-websocket-chat/src/main/java/org/acme/websocket/ChatMessage.java new file mode 100644 index 00000000..58d264d3 --- /dev/null +++ b/vertx-websocket-chat/src/main/java/org/acme/websocket/ChatMessage.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.websocket; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection(fields = false) +public class ChatMessage { + private String messageContent; + private String recipientName; + + public String getMessageContent() { + return messageContent; + } + + public void setMessageContent(String messageContent) { + this.messageContent = messageContent; + } + + public String getRecipientName() { + return recipientName; + } + + public void setRecipientName(String recipientName) { + this.recipientName = recipientName; + } +} diff --git a/vertx-websocket-chat/src/main/java/org/acme/websocket/SessionManager.java b/vertx-websocket-chat/src/main/java/org/acme/websocket/SessionManager.java new file mode 100644 index 00000000..85f03766 --- /dev/null +++ b/vertx-websocket-chat/src/main/java/org/acme/websocket/SessionManager.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.websocket; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.quarkus.runtime.annotations.RegisterForReflection; +import jakarta.inject.Singleton; + +@Singleton +@RegisterForReflection +public class SessionManager { + private final Map SESSIONS = new ConcurrentHashMap<>(); + + public void startSession(String userName, String connectionKey) { + SESSIONS.put(userName.toLowerCase(), connectionKey); + } + + public void endSession(String userName) { + SESSIONS.remove(userName.toLowerCase()); + } + + public String getConnectionKey(String userName) { + return SESSIONS.get(userName.toLowerCase()); + } + + public boolean isSessionExists(String userName) { + return SESSIONS.containsKey(userName.toLowerCase()); + } + + public String[] getAllConnectedUsers() { + String[] connectedUsers = new String[SESSIONS.size()]; + int index = 0; + for (String userName : SESSIONS.keySet()) { + connectedUsers[index++] = userName; + } + return connectedUsers; + } +} diff --git a/vertx-websocket-chat/src/main/java/org/acme/websocket/WebSocketRoutes.java b/vertx-websocket-chat/src/main/java/org/acme/websocket/WebSocketRoutes.java new file mode 100644 index 00000000..b9bb127b --- /dev/null +++ b/vertx-websocket-chat/src/main/java/org/acme/websocket/WebSocketRoutes.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.websocket; + +import io.vertx.core.http.ServerWebSocket; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.camel.Message; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.vertx.websocket.VertxWebsocketConstants; +import org.apache.camel.model.rest.RestBindingMode; +import org.jboss.logging.Logger; + +@ApplicationScoped +public class WebSocketRoutes extends RouteBuilder { + static final Logger LOG = Logger.getLogger(WebSocketRoutes.class); + + @Inject + SessionManager sessionManager; + + @Override + public void configure() throws Exception { + from("vertx-websocket:/chat/{userName}?fireWebSocketConnectionEvents=true") + .choice() + + // Capture OPEN events to track newly connected peers + .when(simple("${header.CamelVertxWebsocket.event} == 'OPEN'")) + .process(exchange -> { + Message message = exchange.getMessage(); + String userName = message.getHeader("userName", String.class); + if (!sessionManager.isSessionExists(userName)) { + String connectionKey = message.getHeader(VertxWebsocketConstants.CONNECTION_KEY, String.class); + sessionManager.startSession(userName, connectionKey); + LOG.infof("Session started for user: %s", userName); + message.setBody("<<<<< " + userName + ": joined the chat"); + } else { + // Reject connections for a user name that is already taken + ServerWebSocket webSocket = message.getBody(ServerWebSocket.class); + // RFC 6455 status codes: https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1 + webSocket.close((short) 1000, "SESSION_ALREADY_EXISTS"); + LOG.warnf("Rejected connection for user: %s. User session already exists", userName); + } + }) + .to("vertx-websocket:/chat/{userName}?sendToAll=true") + .endChoice() + + // Capture MESSAGE events and broadcast them to all connected peers or specified peer + .when(simple("${header.CamelVertxWebsocket.event} == 'MESSAGE'")) + .choice() + .when(body().contains("recipientName")) + .unmarshal().json(ChatMessage.class) + .process(exchange -> { + Message message = exchange.getMessage(); + ChatMessage chatMessage = message.getBody(ChatMessage.class); + + String recipientConnectionKey = sessionManager.getConnectionKey(chatMessage.getRecipientName()); + exchange.getMessage().setHeader(VertxWebsocketConstants.CONNECTION_KEY, recipientConnectionKey); + exchange.getMessage().setBody(chatMessage); + }) + .setBody().simple("<<<<< ${header.userName}: ${body.messageContent}") + .to("vertx-websocket:/chat/{userName}") + .otherwise() + .log("New message from user ${header.userName}: ${body}") + .setBody().simple("<<<<< ${header.userName}: ${body}") + .to("vertx-websocket:/chat/{userName}?sendToAll=true") + .endChoice() + + // Capture CLOSE events to track peers disconnecting + .when(simple("${header.CamelVertxWebsocket.event} == 'CLOSE'")) + .process(exchange -> { + Message message = exchange.getMessage(); + String userName = message.getHeader("userName", String.class); + String connectionKey = message.getHeader(VertxWebsocketConstants.CONNECTION_KEY, String.class); + String userConnectionKey = sessionManager.getConnectionKey(userName); + if (!connectionKey.equals(userConnectionKey)) { + // WebSocket was closed due to a username that is already active. No need for further processing + message.setBody(null); + return; + } + + if (sessionManager.isSessionExists(userName)) { + LOG.infof("Session ended for user: %s", userName); + sessionManager.endSession(userName); + } + }) + .setBody().simple("<<<<< ${header.userName} left the chat") + .to("vertx-websocket:/chat/{userName}?sendToAll=true") + .endChoice(); + + //Displays list of connected users in the UI + restConfiguration().bindingMode(RestBindingMode.json); + + rest("/peers") + .get() + .to("direct:getConnectedUsers"); + + from("direct:getConnectedUsers") + .setBody(method(sessionManager, "getAllConnectedUsers")); + + } +} diff --git a/vertx-websocket-chat/src/main/resources/META-INF/resources/index.html b/vertx-websocket-chat/src/main/resources/META-INF/resources/index.html new file mode 100644 index 00000000..549b3595 --- /dev/null +++ b/vertx-websocket-chat/src/main/resources/META-INF/resources/index.html @@ -0,0 +1,201 @@ + + + + + + + Vert.x WebSocket Chat Demo + + + + +
+ +
+ +
+
+

Camel Quarkus Vert.x WebSocket Chat

+
+ +
+
+
+ + +
+
+ +
+
+
+ +
+
+
+ +
+
+
+
+ + +
+ +
+
+
+ + + + diff --git a/vertx-websocket-chat/src/main/resources/application.properties b/vertx-websocket-chat/src/main/resources/application.properties new file mode 100644 index 00000000..0a48b4f7 --- /dev/null +++ b/vertx-websocket-chat/src/main/resources/application.properties @@ -0,0 +1,20 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +# +# Quarkus +# +quarkus.banner.enabled = false diff --git a/vertx-websocket-chat/src/test/java/org/acme/websocket/WebSocketRoutesIT.java b/vertx-websocket-chat/src/test/java/org/acme/websocket/WebSocketRoutesIT.java new file mode 100644 index 00000000..84a61b17 --- /dev/null +++ b/vertx-websocket-chat/src/test/java/org/acme/websocket/WebSocketRoutesIT.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.websocket; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class WebSocketRoutesIT extends WebSocketRoutesTest { +} diff --git a/vertx-websocket-chat/src/test/java/org/acme/websocket/WebSocketRoutesTest.java b/vertx-websocket-chat/src/test/java/org/acme/websocket/WebSocketRoutesTest.java new file mode 100644 index 00000000..efd630d3 --- /dev/null +++ b/vertx-websocket-chat/src/test/java/org/acme/websocket/WebSocketRoutesTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.websocket; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.test.junit.QuarkusTest; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.WebSocket; +import org.junit.jupiter.api.Test; + +@QuarkusTest +public class WebSocketRoutesTest { + @TestHTTPResource("/chat/bob") + URI userBob; + + @TestHTTPResource("/chat/amy") + URI userAmy; + + @Test + void chatTest() throws InterruptedException { + CountDownLatch connectLatch = new CountDownLatch(2); + CountDownLatch messageLatch = new CountDownLatch(2); + + Vertx vertx = Vertx.vertx(); + HttpClient client = vertx.createHttpClient(); + try { + AtomicReference bobWebSocketAtomicReference = new AtomicReference<>(); + client.webSocket(userBob.getPort(), userBob.getHost(), userBob.getPath()).onSuccess(webSocket -> { + bobWebSocketAtomicReference.set(webSocket); + connectLatch.countDown(); + }); + + AtomicReference amyWebSocketAtomicReference = new AtomicReference<>(); + client.webSocket(userAmy.getPort(), userAmy.getHost(), userAmy.getPath()).onSuccess(webSocket -> { + amyWebSocketAtomicReference.set(webSocket); + connectLatch.countDown(); + }); + + connectLatch.await(5, TimeUnit.SECONDS); + + WebSocket bobWebSocket = bobWebSocketAtomicReference.get(); + bobWebSocket.handler(message -> { + if (message.toString().toLowerCase().contains("hi bob")) { + messageLatch.countDown(); + } + }); + + WebSocket amyWebSocket = amyWebSocketAtomicReference.get(); + amyWebSocket.handler(message -> { + if (message.toString().toLowerCase().contains("hi amy")) { + messageLatch.countDown(); + } + }); + + bobWebSocket.write(Buffer.buffer("Hi Amy")); + amyWebSocket.write(Buffer.buffer("Hi Bob")); + + messageLatch.await(5, TimeUnit.SECONDS); + } finally { + if (client != null) { + client.close(); + } + + if (vertx != null) { + vertx.close(); + } + } + } +}