Skip to content

Commit

Permalink
vertx 5
Browse files Browse the repository at this point in the history
  • Loading branch information
zuisong committed Dec 25, 2024
1 parent 6a1ed7a commit b1b6a61
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 114 deletions.
47 changes: 20 additions & 27 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<kotlin.version>2.0.21</kotlin.version>
<vertx.version>4.5.11</vertx.version>
<kotlin.version>2.1.0</kotlin.version>
<vertx.version>5.0.0.CR3</vertx.version>
<jackson-bom.version>2.18.2</jackson-bom.version>
<java.version>21</java.version>
<log4j.version>2.24.1</log4j.version>
Expand All @@ -22,17 +22,18 @@
<main.class>cn.mmooo.MainKt</main.class>
<exec.mainClass>${main.class}</exec.mainClass>
<kotlin.compiler.jvmTarget>${java.version}</kotlin.compiler.jvmTarget>
<logback.version>1.5.10</logback.version>
<logback.version>1.5.12</logback.version>
<slf4j.version>2.0.16</slf4j.version>
<netty.version>4.1.114.Final</netty.version>
<rainbowgum.version>0.7.1</rainbowgum.version>
<netty.version>4.2.0.RC1</netty.version>
<rainbowgum.version>0.8.0</rainbowgum.version>
<penna.version>0.8.1</penna.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.12</version>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down Expand Up @@ -95,14 +96,23 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-stomp</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk-platform-logging</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-compression</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
<build>
<finalName>app</finalName>
Expand Down Expand Up @@ -245,23 +255,6 @@
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>central</id>
<url>${mvn.repo.url}</url>
</repository>
<repository>
<id>maven_central</id>
<name>Maven Central</name>
<url>https://repo.maven.apache.org/maven2/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>central</id>
<url>${mvn.repo.url}</url>
</pluginRepository>
</pluginRepositories>
<profiles>
<profile>
<id>native</id>
Expand Down
26 changes: 14 additions & 12 deletions src/main/java/cn/mmooo/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,26 @@ package cn.mmooo
import cn.mmooo.verticle.StompBridgeVerticle
import io.vertx.core.DeploymentOptions
import io.vertx.core.Vertx
import io.vertx.core.VertxBuilder
import io.vertx.core.VertxOptions

import java.util.function.Supplier


fun main() {
//System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager");
val logger = System.getLogger("main")
val logger = System.getLogger("main")

System.setProperty("hazelcast.logging.type", "jul")
logger.log(System.Logger.Level.INFO, "info")
// System.setProperty("hazelcast.logging.type", "jul")

val vertx = Vertx.vertx(VertxOptions())
val vertx = Vertx.builder().with(VertxOptions()).build()

vertx.runOnContext {
vertx.deployVerticle({ StompBridgeVerticle() }, DeploymentOptions().also {
it.threadingModel = io.vertx.core.ThreadingModel.EVENT_LOOP
it.workerPoolSize = 10
it.instances = 8
})
logger.log(System.Logger.Level.INFO, "application started")
}
vertx.runOnContext {
vertx.deployVerticle(Supplier { StompBridgeVerticle() }, DeploymentOptions().also {
it.threadingModel = io.vertx.core.ThreadingModel.EVENT_LOOP
it.workerPoolSize = 10
it.instances = 8
})
logger.log(System.Logger.Level.INFO, "application started")
}
}
94 changes: 94 additions & 0 deletions src/main/java/cn/mmooo/WS.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package cn.mmooo

import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.*
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.nio.NioIoHandler
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.*
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import org.slf4j.LoggerFactory


/**
* socket服务端
*/
class NettyServerNew {

companion object {
private val logger = LoggerFactory.getLogger(NettyServerNew::class.java)
const val PORT = 9998
}


fun run() {
val bossGroup: EventLoopGroup = MultiThreadIoEventLoopGroup(NioIoHandler.newFactory())
val workerGroup: EventLoopGroup = MultiThreadIoEventLoopGroup(NioIoHandler.newFactory())
try {
val b = ServerBootstrap()
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel::class.java)
.childHandler(object : ChannelInitializer<SocketChannel>() {
@Throws(Exception::class)
override fun initChannel(channel: SocketChannel) {
//获取责任链对象
val pipeline = channel.pipeline().addLast("logger", LoggingHandler(LogLevel.INFO)) // 调试器
.addLast(HttpServerCodec()) // HTTP 协议解析,用于握手阶段
.addLast(HttpObjectAggregator(65536)) // HTTP 协议解析,用于握手阶段
.addLast(WebSocketServerCompressionHandler()) // WebSocket 数据压缩扩展
.addLast(WebSocketServerProtocolHandler("/", null, true)) // WebSocket 握手、控制帧处理

pipeline.addLast(MyWebSocketServerHandler())
}
})

val f = b.bind(PORT).sync()
logger.info("websocket service has been started at $PORT")
f.channel().closeFuture().sync()
} catch (e: Exception) {
logger.error("websocket service failed to start", e)
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully()
workerGroup.shutdownGracefully()
}
}


@Sharable
class MyWebSocketServerHandler : SimpleChannelInboundHandler<WebSocketFrame?>() {
@Throws(java.lang.Exception::class)
override fun channelRead0(ctx: ChannelHandlerContext, frame: WebSocketFrame?) {
when (frame) {
is TextWebSocketFrame -> { // 此处仅处理 Text Frame
logger.info("TextWebSocketFrame $frame")
val request = frame.text()
ctx.channel().writeAndFlush(TextWebSocketFrame("收到: $request"))
}

is PingWebSocketFrame -> {
logger.info("PING")
ctx.channel().writeAndFlush(PongWebSocketFrame())
}

is BinaryWebSocketFrame -> {
logger.info("BinaryWebSocketFrame ${frame}")
}

is ContinuationWebSocketFrame -> {
logger.info("ContinuationWebSocketFrame ${frame}")
}
}
}
}


}

fun main() {
NettyServerNew().run()
}
146 changes: 73 additions & 73 deletions src/main/java/cn/mmooo/verticle/StompBridgeVerticle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,81 +16,81 @@ import java.lang.System.Logger.Level.*

class StompBridgeVerticle : AbstractVerticle() {

private val logger = System.getLogger(StompBridgeVerticle::class.java.simpleName)

override fun start() {
val port = System.getenv()["PUSH_PORT"]?.toIntOrNull() ?: 13000

val server = createStompServer(vertx)

val router = Router.router(vertx)
router.route()

.handler(LoggerHandler.create())
.handler(ResponseTimeHandler.create())
.handler(TimeoutHandler.create(10 * 1000))
.handler(CorsHandler.create().allowCredentials(true))
.handler(BodyHandler.create())
.handler(StaticHandler.create())
.failureHandler { rc ->
logger.log(ERROR, "", rc.failure())
rc.response()
.setStatusCode(500)
.end(rc.failure().message)
}

router.get("/health").handler { ctx ->
ctx.response().end("ok")
}
router.get("/").handler { ctx ->
ctx.redirect("/stomp-test.html")
}
private val logger = System.getLogger(StompBridgeVerticle::class.java.simpleName)

override fun start() {
val port = System.getenv()["PUSH_PORT"]?.toIntOrNull() ?: 13000

val server = createStompServer(vertx)

val router = Router.router(vertx)
router.route()

.handler(LoggerHandler.create())
.handler(ResponseTimeHandler.create())
.handler(TimeoutHandler.create(10 * 1000))
.handler(CorsHandler.create().allowCredentials(true))
.handler(BodyHandler.create())
.handler(StaticHandler.create())
.failureHandler { rc ->
logger.log(ERROR, "", rc.failure())
rc.response()
.setStatusCode(500)
.end(rc.failure().message)
}

router.get("/health").handler { ctx ->
ctx.response().end("ok")
}
router.get("/").handler { ctx ->
ctx.redirect("/stomp-test.html")
}

router.post("/push")
.handler { rc: RoutingContext ->
val body = rc.body().asString(Charsets.UTF_8.displayName())
val topic = rc.queryParam("topic").first()
vertx.eventBus().publish(topic, body)
logger.log(INFO, "OK")
rc.response().end("ok")
}


vertx
.createHttpServer(
HttpServerOptions().also {
it.webSocketSubProtocols = listOf("v10.stomp", "v11.stomp", "v12.stomp",
"v13.stomp"
router.post("/push")
.handler { rc: RoutingContext ->
val body = rc.body().asString(Charsets.UTF_8.displayName())
val topic = rc.queryParam("topic").first()
vertx.eventBus().publish(topic, body)
logger.log(INFO, "OK")
rc.response().end("ok")
}


vertx
.createHttpServer(
HttpServerOptions().also {
it.webSocketSubProtocols = listOf(
"v10.stomp", "v11.stomp", "v12.stomp",
)
it.isHttp2ClearTextEnabled = true
}
)
it.isHttp2ClearTextEnabled = true
}
.webSocketHandler(server.webSocketHandler())
.requestHandler(router)
.listen(port)
.onSuccess {
logger.log(INFO, "websocket server listen at {0}", port)
}
}

)
.webSocketHandler(server.webSocketHandler())
.requestHandler(router)
.listen(port)
.onSuccess {
logger.log(INFO, "websocket server listen at {0}", port)
}
}


private fun createStompServer(vertx: Vertx): StompServer {

return StompServer.create(
vertx,
StompServerOptions()
.setPort(-1)
.setWebsocketBridge(true)
.setWebsocketPath("/stomp")
).handler(
StompServerHandler.create(vertx).bridge(
BridgeOptions()
// 禁止网页向 eventbus 发消息
.addInboundPermitted(PermittedOptions().setAddress("NO_PERMISSION"))
.addOutboundPermitted(PermittedOptions())
)
)
}

private fun createStompServer(vertx: Vertx): StompServer {

return StompServer.create(
vertx,
StompServerOptions()
.setPort(-1)
.setWebsocketBridge(true)
.setWebsocketPath("/stomp")

).handler(
StompServerHandler.create(vertx).bridge(
BridgeOptions()
// 禁止网页向 eventbus 发消息
.addInboundPermitted(PermittedOptions().setAddress("NO_PERMISSION"))
.addOutboundPermitted(PermittedOptions())
)
)
}

}
Loading

0 comments on commit b1b6a61

Please sign in to comment.