diff --git a/io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala b/io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala index 50f8934d6a..19fd4effc2 100644 --- a/io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala +++ b/io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala @@ -29,7 +29,13 @@ import cats.syntax.all._ import com.comcast.ip4s._ +import scala.concurrent.duration._ + class UdpSuite extends Fs2Suite with UdpSuitePlatform { + def sendAndReceive(socket: DatagramSocket[IO], toSend: Datagram): IO[Datagram] = + socket + .write(toSend) >> socket.read.timeoutTo(1.second, IO.defer(sendAndReceive(socket, toSend))) + group("udp") { test("echo one") { val msg = Chunk.array("Hello, world!".getBytes) @@ -38,15 +44,11 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform { .flatMap { serverSocket => Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort => val serverAddress = SocketAddress(ip"127.0.0.1", serverPort) - val server = serverSocket.reads - .evalMap(packet => serverSocket.write(packet)) - .drain - val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket => - Stream(Datagram(serverAddress, msg)) - .through(clientSocket.writes) - .drain ++ Stream.eval(clientSocket.read) + val server = serverSocket.reads.foreach(packet => serverSocket.write(packet)) + val client = Stream.resource(Network[IO].openDatagramSocket()).evalMap { clientSocket => + sendAndReceive(clientSocket, Datagram(serverAddress, msg)) } - server.mergeHaltBoth(client) + client.concurrently(server) } } .compile @@ -69,21 +71,17 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform { .flatMap { serverSocket => Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort => val serverAddress = SocketAddress(ip"127.0.0.1", serverPort) - val server = serverSocket.reads - .evalMap(packet => serverSocket.write(packet)) - .drain + val server = serverSocket.reads.foreach(packet => serverSocket.write(packet)) val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket => Stream .emits(msgs.map(msg => Datagram(serverAddress, msg))) - .flatMap { msg => - Stream.exec(clientSocket.write(msg)) ++ Stream.eval(clientSocket.read) - } + .evalMap(msg => sendAndReceive(clientSocket, msg)) } val clients = Stream .constant(client) .take(numClients.toLong) .parJoin(numParallelClients) - server.mergeHaltBoth(clients) + clients.concurrently(server) } } .compile @@ -110,15 +108,13 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform { .exec( v4Interfaces.traverse_(interface => serverSocket.join(groupJoin, interface)) ) ++ - serverSocket.reads - .evalMap(packet => serverSocket.write(packet)) - .drain + serverSocket.reads.foreach(packet => serverSocket.write(packet)) val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket => Stream(Datagram(SocketAddress(group.address, serverPort), msg)) .through(clientSocket.writes) .drain ++ Stream.eval(clientSocket.read) } - server.mergeHaltBoth(client) + client.concurrently(server) } } .compile