Skip to content

Commit

Permalink
Fixed several network issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Martomate committed Aug 1, 2024
1 parent 5005d05 commit bfdf2ac
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 132 deletions.
6 changes: 3 additions & 3 deletions client/src/main/scala/hexacraft/client/ClientWorld.scala
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ class ClientWorld(val worldInfo: WorldInfo) extends BlockRepository with BlocksI
for e <- entities do {
if addEntity(e).isEmpty then {
entitiesToSpawnLater += e
println(s"Client: not ready to spawn entity ${e.id}")
// println(s"Client: not ready to spawn entity ${e.id}")
} else {
println(s"Client: finally spawned entity ${e.id}")
// println(s"Client: finally spawned entity ${e.id}")
}
}
}
Expand Down Expand Up @@ -294,7 +294,7 @@ class ClientWorld(val worldInfo: WorldInfo) extends BlockRepository with BlocksI
println(e)
}
case _ =>
println(s"Received entity event for an unknown entity (id: $id, event: $event)")
// println(s"Received entity event for an unknown entity (id: $id, event: $event)")
}
}
}
Expand Down
78 changes: 6 additions & 72 deletions client/src/main/scala/hexacraft/client/GameClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import hexacraft.world.coord.{BlockCoords, BlockRelWorld, ChunkRelWorld, CoordUt
import com.martomate.nbt.Nbt
import org.joml.{Matrix4f, Vector2f, Vector3d, Vector3f}
import org.zeromq.*
import zmq.ZError

import java.util.UUID
import scala.collection.mutable
Expand Down Expand Up @@ -119,8 +118,6 @@ object GameClient {
client.setUniforms(initialWindowSize.logicalAspectRatio)
client.setUseMouse(true)

Thread(() => socket.runMonitoring()).start()

Result.Ok((client, rx))
}

Expand Down Expand Up @@ -495,11 +492,6 @@ class GameClient(

def tick(ctx: TickContext): Unit = {
try {
if socket.isDisconnected then {
logout()
return
}

val Seq(playerNbt, worldEventsNbtPacket, worldLoadingEventsNbt) =
socket.sendMultiplePacketsAndWait(
Seq(NetworkPacket.GetPlayerState, NetworkPacket.GetEvents, NetworkPacket.GetWorldLoadingEvents(5))
Expand Down Expand Up @@ -819,9 +811,6 @@ class GameClient(

class GameClientSocket(serverIp: String, serverPort: Int) {
private val context = ZContext()
context.setUncaughtExceptionHandler((thread, exc) => println(s"Uncaught exception: $exc"))
context.setNotificationExceptionHandler((thread, exc) => println(s"Notification: $exc"))

private val socket = context.createSocket(SocketType.DEALER)

private val clientId = (new Random().nextInt(1000000) + 1000000).toString
Expand All @@ -834,50 +823,6 @@ class GameClientSocket(serverIp: String, serverPort: Int) {
socket.setHeartbeatTimeout(1000)
socket.connect(s"tcp://$serverIp:$serverPort")

private var monitoringThread: Thread = null.asInstanceOf[Thread]

private var _disconnected: Boolean = false
def isDisconnected: Boolean = _disconnected

def runMonitoring(): Unit = {
if monitoringThread != null then {
throw new Exception("May only run monitoring once")
}
monitoringThread = Thread.currentThread()

val monitor = context.synchronized {
if context.isClosed then {
return
}

val monitor = ZMonitor(context, socket)
monitor.add(ZMonitor.Event.ALL)
monitor.verbose(false)
monitor.start()
monitor
}

while !context.isClosed do {
try {
val event = monitor.nextEvent(100)
if event != null then {
if event.`type` == ZMonitor.Event.DISCONNECTED then {
_disconnected = true
}
}
} catch {
case e: ZMQException =>
e.getErrorCode match {
case ZError.EINTR => // noop
case _ => throw e
}
case e => throw e
}
}

monitor.close()
}

def sendPacket(packet: NetworkPacket): Unit = this.synchronized {
val message = packet.serialize()

Expand All @@ -887,8 +832,8 @@ class GameClientSocket(serverIp: String, serverPort: Int) {
}
}

private def queryRaw(message: Array[Byte]): Array[Byte] = this.synchronized {
if !socket.send(message) then {
def sendPacketAndWait(packet: NetworkPacket): Nbt = this.synchronized {
if !socket.send(packet.serialize()) then {
val err = socket.errno()
throw new ZMQException("Could not send message", err)
}
Expand All @@ -899,28 +844,23 @@ class GameClientSocket(serverIp: String, serverPort: Int) {
throw new ZMQException("Could not receive message", err)
}

response
}

def sendPacketAndWait(packet: NetworkPacket): Nbt = {
val response = queryRaw(packet.serialize())
val (_, tag) = Nbt.fromBinary(response)
tag
}

def sendMultiplePacketsAndWait(packets: Seq[NetworkPacket]): Seq[Nbt] = {
def sendMultiplePacketsAndWait(packets: Seq[NetworkPacket]): Seq[Nbt] = this.synchronized {
for p <- packets do {
if !socket.send(p.serialize()) then {
val err = socket.errno()
throw new ZMQException("Could not send message", err)
}
}

for _ <- packets.indices yield {
for i <- packets.indices yield {
val response = socket.recv(0)
if response == null then {
val err = socket.errno()
throw new ZMQException("Could not receive message", err)
throw new ZMQException(s"Could not receive message ${i + 1}", err)
}

val (_, tag) = Nbt.fromBinary(response)
Expand All @@ -929,12 +869,6 @@ class GameClientSocket(serverIp: String, serverPort: Int) {
}

def close(): Unit = {
context.synchronized {
context.close()
}

if monitoringThread != null then {
monitoringThread.interrupt()
}
context.close()
}
}
2 changes: 1 addition & 1 deletion game/src/main/scala/hexacraft/game/NetworkPacket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ enum NetworkPacket {
object NetworkPacket {
def deserialize(bytes: Array[Byte]): NetworkPacket = {
val (packetName, packetDataTag) = Nbt.fromBinary(bytes)
val root = packetDataTag.asInstanceOf[Nbt.MapTag]
val root = packetDataTag.asMap.get

packetName match {
case "login" =>
Expand Down
2 changes: 0 additions & 2 deletions main/src/test/scala/hexacraft/main/GameSceneTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ class GameSceneTest extends FunSuite {

// All newly loaded shader programs should be released after the game is unloaded
assertEquals(shadersRemoved.sorted, shadersAdded.sorted)

gameScene.unload()
}

test("GameScene emits QuitGame event when quit-button is pressed in pause menu") {
Expand Down
1 change: 1 addition & 0 deletions main/src/test/scala/hexacraft/main/MainRouterTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class MainRouterTest extends FunSuite {
test("Game routes to GameScene".ignore) {
val scene = performSingleRoute(SceneRoute.Game(saveDirPath.toFile, WorldSettings.none, true, false, null))
assert(scene.isInstanceOf[GameScene])
scene.unload()
}

test("Game with Escape key and click on Back to menu routes to MainMenu".ignore) {
Expand Down
100 changes: 46 additions & 54 deletions server/src/main/scala/hexacraft/server/GameServer.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hexacraft.server

import hexacraft.game.{GameKeyboard, NetworkPacket, PlayerInputHandler, PlayerPhysicsHandler}
import hexacraft.server.TcpServer.Error
import hexacraft.util.{Result, SeqUtils}
import hexacraft.world.*
import hexacraft.world.block.{Block, BlockState}
Expand All @@ -10,8 +11,7 @@ import hexacraft.world.entity.*

import com.martomate.nbt.Nbt
import org.joml.Vector2f
import org.zeromq.{SocketType, ZContext, ZMQ, ZMQException}
import zmq.ZError
import org.zeromq.ZMQException

import java.util.UUID
import scala.collection.mutable
Expand All @@ -20,11 +20,11 @@ object GameServer {
def create(isOnline: Boolean, port: Int, worldProvider: WorldProvider): GameServer = {
val world = new ServerWorld(worldProvider, worldProvider.getWorldInfo)

val server = new GameServer(isOnline, port, worldProvider, world)(using world.size)
val tcpServer = TcpServer
.start(port)
.unwrapWith(m => new IllegalStateException(s"Could not start server: $m"))

Thread(() => server.run()).start()

server
new GameServer(isOnline, tcpServer, worldProvider, world)(using world.size)
}
}

Expand All @@ -35,15 +35,22 @@ case class PlayerData(player: Player, entity: Entity, camera: Camera) {
val entityEventsWaitingToBeSent: mutable.ArrayBuffer[(UUID, EntityEvent)] = mutable.ArrayBuffer.empty
}

class GameServer(isOnline: Boolean, port: Int, worldProvider: WorldProvider, world: ServerWorld)(using CylinderSize) {
private var serverThread: Thread = null.asInstanceOf[Thread]
class GameServer(
isOnline: Boolean,
server: TcpServer,
worldProvider: WorldProvider,
world: ServerWorld
)(using CylinderSize) {

private val players: mutable.LongMap[PlayerData] = mutable.LongMap.empty

private val collisionDetector: CollisionDetector = new CollisionDetector(world)
private val playerInputHandler: PlayerInputHandler = new PlayerInputHandler
private val playerPhysicsHandler: PlayerPhysicsHandler = new PlayerPhysicsHandler(collisionDetector)

private val serverThread: Thread = Thread(() => this.run())
serverThread.start()

private def savePlayers(): Unit = {
for d <- players.values do {
val p = d.player
Expand Down Expand Up @@ -262,61 +269,46 @@ class GameServer(isOnline: Boolean, port: Int, worldProvider: WorldProvider, wor
}

def run(): Unit = {
if serverThread != null then {
throw new RuntimeException("You may only start the server once")
}
serverThread = Thread.currentThread()
val messagesToSend: mutable.ArrayBuffer[(Long, Nbt)] = mutable.ArrayBuffer.empty

try {
val context = ZContext()
while server.running do {
try {
val serverSocket = context.createSocket(SocketType.ROUTER)
serverSocket.setHeartbeatIvl(1000)
serverSocket.setHeartbeatTtl(3000)
serverSocket.setHeartbeatTimeout(3000)

if !serverSocket.bind(s"tcp://*:$port") then {
throw new IllegalStateException("Server could not be bound")
server.receive() match {
case Result.Ok((clientId, packet)) =>
handlePacket(clientId, packet) match {
case Some(res) =>
messagesToSend += clientId -> res
case None =>
}
case Result.Err(error) =>
error match {
case Error.InvalidPacket(message) =>
// Ignore the invalid packet, since it's up to the client to send correct data
println(s"Received invalid packet: $message")
}
}
println(s"Running server on port $port")

while !Thread.currentThread().isInterrupted do {
val identity = serverSocket.recv(0)
if identity.isEmpty then {
throw new Exception("Received an empty identity frame")
}
val clientId = String(identity).toLong
val bytes = serverSocket.recv(0)
if bytes == null then {
throw new ZMQException(serverSocket.errno())
}

val packet = NetworkPacket.deserialize(bytes)
handlePacket(clientId, packet, serverSocket) match {
case Some(res) =>
serverSocket.sendMore(identity)
serverSocket.send(res.toBinary())
case None =>
if server.running then {
for (clientId, data) <- messagesToSend do {
server.send(clientId, data) match {
case Result.Ok(_) =>
case Result.Err(Error.InvalidPacket(message)) =>
// This is a bug in the server, not invalid input, so it's best to shut down
throw new RuntimeException(s"Tried to send invalid packet: $message")
}
}
messagesToSend.clear()
}
} finally context.close()
} catch {
case e: ZMQException =>
e.getErrorCode match {
case ZError.EINTR => // noop
case _ => throw e
}
case e =>
throw e
} catch {
case _: InterruptedException =>
}
}

println(s"Stopped server")
}

private val chunksLoadedPerPlayer: mutable.HashMap[UUID, ChunkLoadingPrioritizer] = mutable.HashMap.empty
private val chunksLoadCount = mutable.LongMap.empty[Int]

private def handlePacket(clientId: Long, packet: NetworkPacket, socket: ZMQ.Socket): Option[Nbt.MapTag] = {
private def handlePacket(clientId: Long, packet: NetworkPacket): Option[Nbt.MapTag] = {
import NetworkPacket.*

packet match {
Expand Down Expand Up @@ -574,8 +566,8 @@ class GameServer(isOnline: Boolean, port: Int, worldProvider: WorldProvider, wor
}

private def stop(): Unit = {
if serverThread != null then {
serverThread.interrupt()
}
server.stop()
serverThread.interrupt()
serverThread.join()
}
}
Loading

0 comments on commit bfdf2ac

Please sign in to comment.