From b7bd4c5f491509a5ffa8da651068d6b53a480376 Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Sat, 4 Apr 2020 15:47:59 +0200 Subject: [PATCH 1/8] Add support for activemq --- build.sbt | 1 + .../main/scala/jms4s/jms/JmsConnection.scala | 4 +- .../src/main/scala/jms4s/jms/JmsSession.scala | 12 ++--- docker-compose.yml | 11 ++++- .../src/test/scala/jms4s/Jms4sBaseSpec.scala | 9 ++-- tests/src/test/scala/jms4s/activeMQ.scala | 44 +++++++++++++++++++ 6 files changed, 67 insertions(+), 14 deletions(-) create mode 100644 tests/src/test/scala/jms4s/activeMQ.scala diff --git a/build.sbt b/build.sbt index e6042621..8ef0d7e3 100644 --- a/build.sbt +++ b/build.sbt @@ -35,6 +35,7 @@ lazy val tests = project .settings(commonSettings: _*) .enablePlugins(NoPublishPlugin) .settings(libraryDependencies += "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jSlf4jImplV % Runtime) + .settings(libraryDependencies += "org.apache.activemq" % "activemq-all" % "5.15.12") .settings(parallelExecution in Test := false) .dependsOn(core, ibmMQ) diff --git a/core/src/main/scala/jms4s/jms/JmsConnection.scala b/core/src/main/scala/jms4s/jms/JmsConnection.scala index bbbceb1e..7aa6bc7e 100644 --- a/core/src/main/scala/jms4s/jms/JmsConnection.scala +++ b/core/src/main/scala/jms4s/jms/JmsConnection.scala @@ -13,10 +13,10 @@ class JmsConnection[F[_]: Sync: Logger] private[jms4s] ( def createSession(sessionType: SessionType): Resource[F, JmsSession[F]] = for { - session <- Resource.fromAutoCloseable( + session <- Resource.make( Logger[F].info(s"Opening QueueSession for $wrapped.") *> Sync[F].delay(wrapped.createSession(sessionType.rawTransacted, sessionType.rawAcknowledgeMode)) - ) + )(x => Sync[F].delay(x.close())) _ <- Resource.liftF(Logger[F].info(s"Opened QueueSession $session for $wrapped.")) } yield new JmsSession(session, blocker) } diff --git a/core/src/main/scala/jms4s/jms/JmsSession.scala b/core/src/main/scala/jms4s/jms/JmsSession.scala index 3b45769a..b41f2218 100644 --- a/core/src/main/scala/jms4s/jms/JmsSession.scala +++ b/core/src/main/scala/jms4s/jms/JmsSession.scala @@ -28,28 +28,28 @@ class JmsSession[F[_]: Sync: Logger] private[jms4s] ( jmsDestination: JmsDestination )(implicit CS: ContextShift[F], C: Concurrent[F]): Resource[F, JmsMessageConsumer[F]] = for { - consumer <- Resource.fromAutoCloseable( + consumer <- Resource.make( Logger[F].info(s"Opening MessageConsumer for ${jmsDestination.wrapped}, session: $wrapped...") *> Sync[F].delay(wrapped.createConsumer(jmsDestination.wrapped)) - ) + )(c => Sync[F].delay(c.close())) _ <- Resource.liftF(Logger[F].info(s"Opened MessageConsumer for ${jmsDestination.wrapped}, session: $wrapped.")) } yield new JmsMessageConsumer[F](consumer) def createProducer(jmsDestination: JmsDestination): Resource[F, JmsMessageProducer[F]] = for { - producer <- Resource.fromAutoCloseable( + producer <- Resource.make( Logger[F].info(s"Opening MessageProducer for ${jmsDestination.wrapped}, session: $wrapped...") *> Sync[F].delay(wrapped.createProducer(jmsDestination.wrapped)) - ) + )(c => Sync[F].delay(c.close())) _ <- Resource.liftF(Logger[F].info(s"Opened MessageProducer for ${jmsDestination.wrapped}, session: $wrapped.")) } yield new JmsMessageProducer(producer) val createUnidentifiedProducer: Resource[F, JmsUnidentifiedMessageProducer[F]] = for { - producer <- Resource.fromAutoCloseable( + producer <- Resource.make( Logger[F].info(s"Opening unidentified MessageProducer, session: $wrapped...") *> Sync[F].delay(wrapped.createProducer(null)) - ) + )(c => Sync[F].delay(c.close())) _ <- Resource.liftF(Logger[F].info(s"Opened unidentified MessageProducer, session: $wrapped.")) } yield new JmsUnidentifiedMessageProducer(producer) diff --git a/docker-compose.yml b/docker-compose.yml index c53cb375..579dfe07 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -ibmMQ: +ibmmq: restart: always image: ibmcom/mq # https://github.com/ibm-messaging/mq-docker ports: @@ -25,4 +25,11 @@ ibmMQ: # DEV.LISTENER.TCP - Listening on Port 1414. # Topic -# DEV.BASE.TOPIC - With a topic string of dev/ \ No newline at end of file +# DEV.BASE.TOPIC - With a topic string of dev/ + +activemq: + restart: always + image: rmohr/activemq + ports: + - "8161:8161" + - "61616:61616" diff --git a/tests/src/test/scala/jms4s/Jms4sBaseSpec.scala b/tests/src/test/scala/jms4s/Jms4sBaseSpec.scala index 006db4d0..7bc955e7 100644 --- a/tests/src/test/scala/jms4s/Jms4sBaseSpec.scala +++ b/tests/src/test/scala/jms4s/Jms4sBaseSpec.scala @@ -7,7 +7,7 @@ import cats.implicits._ import io.chrisdavenport.log4cats.SelfAwareStructuredLogger import io.chrisdavenport.log4cats.slf4j.Slf4jLogger import jms4s.config._ -import jms4s.ibmmq.ibmMQ.makeConnection +//import jms4s.ibmmq.ibmMQ.makeConnection import jms4s.jms.JmsMessage.JmsTextMessage import jms4s.jms.{ JmsConnection, JmsMessageConsumer, MessageFactory } @@ -20,7 +20,7 @@ trait Jms4sBaseSpec { val connectionRes: Resource[IO, JmsConnection[IO]] = blockerRes.flatMap( blocker => - makeConnection[IO]( + activeMQ.makeConnection[IO]( Config( qm = QueueManager("QM1"), endpoints = NonEmptyList.one(Endpoint("localhost", 1414)), @@ -31,7 +31,8 @@ trait Jms4sBaseSpec { // password = None, channel = Channel("DEV.ADMIN.SVRCONN"), username = Some(Username("admin")), - password = Some(Password("passw0rd")), +// password = Some(Password("passw0rd")), + password = Some(Password("admin")), clientId = "jms-specs" ), blocker @@ -41,7 +42,7 @@ trait Jms4sBaseSpec { val nMessages: Int = 50 val bodies: IndexedSeq[String] = (0 until nMessages).map(i => s"$i") val poolSize: Int = 4 - val timeout: FiniteDuration = 2.seconds + val timeout: FiniteDuration = 10.seconds val delay: FiniteDuration = 500.millis val topicName: TopicName = TopicName("DEV.BASE.TOPIC") val topicName2: TopicName = TopicName("DEV.BASE.TOPIC.1") diff --git a/tests/src/test/scala/jms4s/activeMQ.scala b/tests/src/test/scala/jms4s/activeMQ.scala new file mode 100644 index 00000000..930ca0e2 --- /dev/null +++ b/tests/src/test/scala/jms4s/activeMQ.scala @@ -0,0 +1,44 @@ +package jms4s + +import cats.data.NonEmptyList +import cats.effect.{ Blocker, Resource, Sync } +import cats.implicits._ +import io.chrisdavenport.log4cats.Logger +import javax.jms.Connection +import jms4s.config.{ Config, Endpoint } +import jms4s.jms.JmsConnection +import org.apache.activemq.ActiveMQConnectionFactory + +object activeMQ { + + def makeConnection[F[_]: Sync: Logger](config: Config, blocker: Blocker): Resource[F, JmsConnection[F]] = + for { + connection <- Resource.make( + Logger[F].info(s"Opening QueueConnection to MQ at ${hosts(config.endpoints)}...") >> + Sync[F].delay { + val queueConnectionFactory: ActiveMQConnectionFactory = + new ActiveMQConnectionFactory("tcp://localhost:61616") +// queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT) +// queueConnectionFactory.set(config.qm.value) +// queueConnectionFactory.setConnectionNameList(hosts(config.endpoints)) +// queueConnectionFactory.setChannel(config.channel.value) + queueConnectionFactory.setClientID(config.clientId) + + val connection: Connection = config.username.map { (username) => + queueConnectionFactory.createConnection( + username.value, + config.password.map(_.value).getOrElse("") + ) + }.getOrElse(queueConnectionFactory.createConnection) + + connection.start() + connection + } + )(c => Sync[F].delay(c.close())) + _ <- Resource.liftF(Logger[F].info(s"Opened QueueConnection $connection.")) + } yield new JmsConnection[F](connection, blocker) + + private def hosts(endpoints: NonEmptyList[Endpoint]): String = + endpoints.map(e => s"${e.host}(${e.port})").toList.mkString(",") + +} From 7c75b71f200695d23ed83fa2b2f0269a671093c8 Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Sun, 5 Apr 2020 15:16:00 +0200 Subject: [PATCH 2/8] Add proper module for activemq --- .../src/main}/scala/jms4s/activeMQ.scala | 29 ++++++---- build.sbt | 18 ++++-- .../main/scala/jms4s/jms/JmsConnection.scala | 11 +++- .../src/main/scala/jms4s/jms/JmsSession.scala | 56 ++++++++++++++++--- ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala | 27 +++++---- .../test/scala/jms4s/IbmMQJmsClientSpec.scala | 32 +++++++++++ .../src/test/scala/jms4s/Jms4sBaseSpec.scala | 29 +--------- .../src/test/scala/jms4s/JmsClientSpec.scala | 2 +- tests/src/test/scala/jms4s/jms/JmsSpec.scala | 29 +++++++++- 9 files changed, 165 insertions(+), 68 deletions(-) rename {tests/src/test => active-mq/src/main}/scala/jms4s/activeMQ.scala (51%) create mode 100644 tests/src/test/scala/jms4s/IbmMQJmsClientSpec.scala diff --git a/tests/src/test/scala/jms4s/activeMQ.scala b/active-mq/src/main/scala/jms4s/activeMQ.scala similarity index 51% rename from tests/src/test/scala/jms4s/activeMQ.scala rename to active-mq/src/main/scala/jms4s/activeMQ.scala index 930ca0e2..1fe16bda 100644 --- a/tests/src/test/scala/jms4s/activeMQ.scala +++ b/active-mq/src/main/scala/jms4s/activeMQ.scala @@ -2,40 +2,45 @@ package jms4s import cats.data.NonEmptyList import cats.effect.{ Blocker, Resource, Sync } -import cats.implicits._ import io.chrisdavenport.log4cats.Logger import javax.jms.Connection import jms4s.config.{ Config, Endpoint } import jms4s.jms.JmsConnection import org.apache.activemq.ActiveMQConnectionFactory +import cats.implicits._ object activeMQ { def makeConnection[F[_]: Sync: Logger](config: Config, blocker: Blocker): Resource[F, JmsConnection[F]] = for { connection <- Resource.make( - Logger[F].info(s"Opening QueueConnection to MQ at ${hosts(config.endpoints)}...") >> + Logger[F].info(s"Opening Connection to MQ at ${hosts(config.endpoints)}...") *> Sync[F].delay { - val queueConnectionFactory: ActiveMQConnectionFactory = + val connectionFactory: ActiveMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616") -// queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT) -// queueConnectionFactory.set(config.qm.value) -// queueConnectionFactory.setConnectionNameList(hosts(config.endpoints)) -// queueConnectionFactory.setChannel(config.channel.value) - queueConnectionFactory.setClientID(config.clientId) +// connectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT) +// connectionFactory.set(config.qm.value) +// connectionFactory.setConnectionNameList(hosts(config.endpoints)) +// connectionFactory.setChannel(config.channel.value) + connectionFactory.setClientID(config.clientId) val connection: Connection = config.username.map { (username) => - queueConnectionFactory.createConnection( + connectionFactory.createConnection( username.value, config.password.map(_.value).getOrElse("") ) - }.getOrElse(queueConnectionFactory.createConnection) + }.getOrElse(connectionFactory.createConnection) connection.start() connection } - )(c => Sync[F].delay(c.close())) - _ <- Resource.liftF(Logger[F].info(s"Opened QueueConnection $connection.")) + )( + c => + Logger[F].info(s"Closing Connection $c to MQ at ${hosts(config.endpoints)}...") *> + Sync[F].delay(c.close()) *> + Logger[F].info(s"Closed Connection $c to MQ at ${hosts(config.endpoints)}.") + ) + _ <- Resource.liftF(Logger[F].info(s"Opened connection $connection.")) } yield new JmsConnection[F](connection, blocker) private def hosts(endpoints: NonEmptyList[Endpoint]): String = diff --git a/build.sbt b/build.sbt index 8ef0d7e3..3dfd18ec 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,7 @@ val catsV = "2.0.0" val jmsV = "2.0.1" val ibmMQV = "9.1.4.0" +val activeMQV = "5.15.12" val catsEffectV = "2.0.0" val catsEffectScalaTestV = "0.4.0" val fs2V = "2.0.0" @@ -14,7 +15,7 @@ val betterMonadicForV = "0.3.1" lazy val jms4s = project .in(file(".")) .enablePlugins(NoPublishPlugin) - .aggregate(core, ibmMQ, tests, examples, site) + .aggregate(core, ibmMQ, activeMQ, tests, examples, site) lazy val core = project .in(file("core")) @@ -30,20 +31,27 @@ lazy val ibmMQ = project .settings(parallelExecution in Test := false) .dependsOn(core) +lazy val activeMQ = project + .in(file("active-mq")) + .settings(commonSettings) + .settings(name := "jms4s-active-mq") + .settings(libraryDependencies += "org.apache.activemq" % "activemq-all" % activeMQV) + .settings(parallelExecution in Test := false) + .dependsOn(core) + lazy val tests = project .in(file("tests")) .settings(commonSettings: _*) .enablePlugins(NoPublishPlugin) .settings(libraryDependencies += "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jSlf4jImplV % Runtime) - .settings(libraryDependencies += "org.apache.activemq" % "activemq-all" % "5.15.12") .settings(parallelExecution in Test := false) - .dependsOn(core, ibmMQ) + .dependsOn(ibmMQ, activeMQ) lazy val examples = project .in(file("examples")) .settings(commonSettings: _*) .enablePlugins(NoPublishPlugin) - .dependsOn(core, ibmMQ) + .dependsOn(ibmMQ, activeMQ) lazy val site = project .in(file("site")) @@ -51,7 +59,7 @@ lazy val site = project .enablePlugins(MdocPlugin) .enablePlugins(NoPublishPlugin) .settings(commonSettings) - .dependsOn(core) + .dependsOn(core, ibmMQ, activeMQ) .settings { import microsites._ Seq( diff --git a/core/src/main/scala/jms4s/jms/JmsConnection.scala b/core/src/main/scala/jms4s/jms/JmsConnection.scala index 7aa6bc7e..0660f51b 100644 --- a/core/src/main/scala/jms4s/jms/JmsConnection.scala +++ b/core/src/main/scala/jms4s/jms/JmsConnection.scala @@ -14,9 +14,14 @@ class JmsConnection[F[_]: Sync: Logger] private[jms4s] ( def createSession(sessionType: SessionType): Resource[F, JmsSession[F]] = for { session <- Resource.make( - Logger[F].info(s"Opening QueueSession for $wrapped.") *> + Logger[F].info(s"Opening Session for Connection $wrapped.") *> Sync[F].delay(wrapped.createSession(sessionType.rawTransacted, sessionType.rawAcknowledgeMode)) - )(x => Sync[F].delay(x.close())) - _ <- Resource.liftF(Logger[F].info(s"Opened QueueSession $session for $wrapped.")) + )( + s => + Logger[F].info(s"Closing Session $s for Connection $wrapped...") *> + Sync[F].delay(s.close()) *> + Logger[F].info(s"Closed Session $s for Connection $wrapped.") + ) + _ <- Resource.liftF(Logger[F].info(s"Opened Session $session for Connection $wrapped.")) } yield new JmsSession(session, blocker) } diff --git a/core/src/main/scala/jms4s/jms/JmsSession.scala b/core/src/main/scala/jms4s/jms/JmsSession.scala index b41f2218..485b0105 100644 --- a/core/src/main/scala/jms4s/jms/JmsSession.scala +++ b/core/src/main/scala/jms4s/jms/JmsSession.scala @@ -29,28 +29,66 @@ class JmsSession[F[_]: Sync: Logger] private[jms4s] ( )(implicit CS: ContextShift[F], C: Concurrent[F]): Resource[F, JmsMessageConsumer[F]] = for { consumer <- Resource.make( - Logger[F].info(s"Opening MessageConsumer for ${jmsDestination.wrapped}, session: $wrapped...") *> + Logger[F].info( + s"Opening MessageConsumer for Destination ${jmsDestination.wrapped}, Session: $wrapped..." + ) *> Sync[F].delay(wrapped.createConsumer(jmsDestination.wrapped)) - )(c => Sync[F].delay(c.close())) - _ <- Resource.liftF(Logger[F].info(s"Opened MessageConsumer for ${jmsDestination.wrapped}, session: $wrapped.")) + )( + c => + Logger[F].info( + s"Closing MessageConsumer $c for Destination ${jmsDestination.wrapped}, Session: $wrapped..." + ) *> + Sync[F].delay(c.close()) *> + Logger[F].info( + s"Closed MessageConsumer $c for Destination ${jmsDestination.wrapped}, Session: $wrapped." + ) + ) + _ <- Resource.liftF( + Logger[F].info( + s"Opened MessageConsumer $consumer for Destination ${jmsDestination.wrapped}, Session: $wrapped." + ) + ) } yield new JmsMessageConsumer[F](consumer) def createProducer(jmsDestination: JmsDestination): Resource[F, JmsMessageProducer[F]] = for { producer <- Resource.make( - Logger[F].info(s"Opening MessageProducer for ${jmsDestination.wrapped}, session: $wrapped...") *> + Logger[F].info( + s"Opening MessageProducer for Destination ${jmsDestination.wrapped}, Session: $wrapped..." + ) *> Sync[F].delay(wrapped.createProducer(jmsDestination.wrapped)) - )(c => Sync[F].delay(c.close())) - _ <- Resource.liftF(Logger[F].info(s"Opened MessageProducer for ${jmsDestination.wrapped}, session: $wrapped.")) + )( + c => + Logger[F].info( + s"Closing MessageProducer $c for Destination ${jmsDestination.wrapped}, Session: $wrapped..." + ) *> + Sync[F].delay(c.close()) *> + Logger[F].info( + s"Closed MessageProducer $c for Destination ${jmsDestination.wrapped}, Session: $wrapped." + ) + ) + _ <- Resource.liftF( + Logger[F] + .info(s"Opened MessageProducer $producer for Destination ${jmsDestination.wrapped}, Session: $wrapped.") + ) } yield new JmsMessageProducer(producer) val createUnidentifiedProducer: Resource[F, JmsUnidentifiedMessageProducer[F]] = for { producer <- Resource.make( - Logger[F].info(s"Opening unidentified MessageProducer, session: $wrapped...") *> + Logger[F].info(s"Opening unidentified MessageProducer, Session: $wrapped...") *> Sync[F].delay(wrapped.createProducer(null)) - )(c => Sync[F].delay(c.close())) - _ <- Resource.liftF(Logger[F].info(s"Opened unidentified MessageProducer, session: $wrapped.")) + )( + c => + Logger[F].info( + s"Closing unidentified MessageProducer $c, Session: $wrapped..." + ) *> + Sync[F].delay(c.close()) *> + Logger[F].info( + s"Closed unidentified MessageProducer $c, Session: $wrapped." + ) + ) + _ <- Resource.liftF(Logger[F].info(s"Opened unidentified MessageProducer $producer, Session: $wrapped.")) } yield new JmsUnidentifiedMessageProducer(producer) val createMessage: F[JmsMessage[F]] = diff --git a/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala b/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala index c8c5d290..3ec325c7 100644 --- a/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala +++ b/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala @@ -13,28 +13,33 @@ object ibmMQ { def makeConnection[F[_]: Sync: Logger](config: Config, blocker: Blocker): Resource[F, JmsConnection[F]] = for { - connection <- Resource.fromAutoCloseable( - Logger[F].info(s"Opening QueueConnection to MQ at ${hosts(config.endpoints)}...") >> + connection <- Resource.make( + Logger[F].info(s"Opening Connection to MQ at ${hosts(config.endpoints)}...") >> Sync[F].delay { - val queueConnectionFactory: MQConnectionFactory = new MQConnectionFactory() - queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT) - queueConnectionFactory.setQueueManager(config.qm.value) - queueConnectionFactory.setConnectionNameList(hosts(config.endpoints)) - queueConnectionFactory.setChannel(config.channel.value) - queueConnectionFactory.setClientID(config.clientId) + val connectionFactory: MQConnectionFactory = new MQConnectionFactory() + connectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT) + connectionFactory.setQueueManager(config.qm.value) + connectionFactory.setConnectionNameList(hosts(config.endpoints)) + connectionFactory.setChannel(config.channel.value) + connectionFactory.setClientID(config.clientId) val connection = config.username.map { (username) => - queueConnectionFactory.createConnection( + connectionFactory.createConnection( username.value, config.password.map(_.value).getOrElse("") ) - }.getOrElse(queueConnectionFactory.createConnection) + }.getOrElse(connectionFactory.createConnection) connection.start() connection } + )( + c => + Logger[F].info(s"Closing Connection $c at ${hosts(config.endpoints)}...") *> + Sync[F].delay(c.close()) *> + Logger[F].info(s"Closed Connection $c.") ) - _ <- Resource.liftF(Logger[F].info(s"Opened QueueConnection $connection.")) + _ <- Resource.liftF(Logger[F].info(s"Opened Connection $connection at ${hosts(config.endpoints)}.")) } yield new JmsConnection[F](connection, blocker) private def hosts(endpoints: NonEmptyList[Endpoint]): String = diff --git a/tests/src/test/scala/jms4s/IbmMQJmsClientSpec.scala b/tests/src/test/scala/jms4s/IbmMQJmsClientSpec.scala new file mode 100644 index 00000000..9a4cf051 --- /dev/null +++ b/tests/src/test/scala/jms4s/IbmMQJmsClientSpec.scala @@ -0,0 +1,32 @@ +package jms4s +import cats.data.NonEmptyList +import cats.effect.{ Blocker, IO, Resource } +import jms4s.config._ +import jms4s.ibmmq.ibmMQ +import jms4s.jms.JmsConnection + +class IbmMQJmsClientSpec extends JmsClientSpec { + override def connectionRes: Resource[IO, JmsConnection[IO]] = + Blocker + .apply[IO] + .flatMap( + blocker => + ibmMQ.makeConnection[IO]( + Config( + qm = QueueManager("QM1"), + endpoints = NonEmptyList.one(Endpoint("localhost", 1414)), + // the current docker image seems to be misconfigured, so I need to use admin channel/auth in order to test topic + // but maybe it's just me not understanding something properly.. as usual + // channel = Channel("DEV.APP.SVRCONN"), + // username = Some(Username("app")), + // password = None, + channel = Channel("DEV.ADMIN.SVRCONN"), + username = Some(Username("admin")), + password = Some(Password("passw0rd")), +// password = Some(Password("admin")), + clientId = "jms-specs" + ), + blocker + ) + ) +} diff --git a/tests/src/test/scala/jms4s/Jms4sBaseSpec.scala b/tests/src/test/scala/jms4s/Jms4sBaseSpec.scala index 7bc955e7..75a85c00 100644 --- a/tests/src/test/scala/jms4s/Jms4sBaseSpec.scala +++ b/tests/src/test/scala/jms4s/Jms4sBaseSpec.scala @@ -1,13 +1,11 @@ package jms4s -import cats.data.NonEmptyList import cats.effect.concurrent.Ref -import cats.effect.{ Blocker, IO, Resource } +import cats.effect.{ IO, Resource } import cats.implicits._ import io.chrisdavenport.log4cats.SelfAwareStructuredLogger import io.chrisdavenport.log4cats.slf4j.Slf4jLogger import jms4s.config._ -//import jms4s.ibmmq.ibmMQ.makeConnection import jms4s.jms.JmsMessage.JmsTextMessage import jms4s.jms.{ JmsConnection, JmsMessageConsumer, MessageFactory } @@ -16,33 +14,12 @@ import scala.concurrent.duration.{ FiniteDuration, _ } trait Jms4sBaseSpec { implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO] - val blockerRes: Resource[IO, Blocker] = Blocker.apply - - val connectionRes: Resource[IO, JmsConnection[IO]] = blockerRes.flatMap( - blocker => - activeMQ.makeConnection[IO]( - Config( - qm = QueueManager("QM1"), - endpoints = NonEmptyList.one(Endpoint("localhost", 1414)), - // the current docker image seems to be misconfigured, so I need to use admin channel/auth in order to test topic - // but maybe it's just me not understanding something properly.. as usual - // channel = Channel("DEV.APP.SVRCONN"), - // username = Some(Username("app")), - // password = None, - channel = Channel("DEV.ADMIN.SVRCONN"), - username = Some(Username("admin")), -// password = Some(Password("passw0rd")), - password = Some(Password("admin")), - clientId = "jms-specs" - ), - blocker - ) - ) + def connectionRes: Resource[IO, JmsConnection[IO]] val nMessages: Int = 50 val bodies: IndexedSeq[String] = (0 until nMessages).map(i => s"$i") val poolSize: Int = 4 - val timeout: FiniteDuration = 10.seconds + val timeout: FiniteDuration = 2.seconds val delay: FiniteDuration = 500.millis val topicName: TopicName = TopicName("DEV.BASE.TOPIC") val topicName2: TopicName = TopicName("DEV.BASE.TOPIC.1") diff --git a/tests/src/test/scala/jms4s/JmsClientSpec.scala b/tests/src/test/scala/jms4s/JmsClientSpec.scala index 8eed5746..c941bd0c 100644 --- a/tests/src/test/scala/jms4s/JmsClientSpec.scala +++ b/tests/src/test/scala/jms4s/JmsClientSpec.scala @@ -14,7 +14,7 @@ import jms4s.jms.JmsMessage import jms4s.model.SessionType import org.scalatest.freespec.AsyncFreeSpec -class JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { +trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { private val jmsClient = new JmsClient[IO] "High level api" - { diff --git a/tests/src/test/scala/jms4s/jms/JmsSpec.scala b/tests/src/test/scala/jms4s/jms/JmsSpec.scala index 4003890e..cc80c857 100644 --- a/tests/src/test/scala/jms4s/jms/JmsSpec.scala +++ b/tests/src/test/scala/jms4s/jms/JmsSpec.scala @@ -1,9 +1,12 @@ package jms4s.jms +import cats.data.NonEmptyList import cats.effect.testing.scalatest.AsyncIOSpec -import cats.effect.{ IO, Resource } +import cats.effect.{ Blocker, IO, Resource } import cats.implicits._ import jms4s.Jms4sBaseSpec +import jms4s.config._ +import jms4s.ibmmq.ibmMQ import jms4s.model.SessionType import org.scalatest.freespec.AsyncFreeSpec @@ -65,4 +68,28 @@ class JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { } } } + + override def connectionRes: Resource[IO, JmsConnection[IO]] = + Blocker + .apply[IO] + .flatMap( + blocker => + ibmMQ.makeConnection[IO]( + Config( + qm = QueueManager("QM1"), + endpoints = NonEmptyList.one(Endpoint("localhost", 1414)), + // the current docker image seems to be misconfigured, so I need to use admin channel/auth in order to test topic + // but maybe it's just me not understanding something properly.. as usual + // channel = Channel("DEV.APP.SVRCONN"), + // username = Some(Username("app")), + // password = None, + channel = Channel("DEV.ADMIN.SVRCONN"), + username = Some(Username("admin")), + password = Some(Password("passw0rd")), + // password = Some(Password("admin")), + clientId = "jms-specs" + ), + blocker + ) + ) } From 8607612470dca473fda7b31024b952930736215c Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Sun, 5 Apr 2020 16:23:35 +0200 Subject: [PATCH 3/8] Switch to activemq-artemis --- .../main/scala/jms4s/activemq}/activeMQ.scala | 25 ++++++----- build.sbt | 18 ++++---- core/src/main/scala/jms4s/config/config.scala | 20 --------- docker-compose.yml | 7 +++- ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala | 15 ++++++- .../jms4s/ActiveMQArtemisJmsClientSpec.scala | 5 +++ .../test/scala/jms4s/IbmMQJmsClientSpec.scala | 33 ++------------- .../src/test/scala/jms4s/JmsClientSpec.scala | 42 +++++++------------ .../jms4s/{ => basespec}/Jms4sBaseSpec.scala | 9 ++-- .../providers/ActiveMQArtemisBaseSpec.scala | 26 ++++++++++++ .../basespec/providers/IbmMQBaseSpec.scala | 33 +++++++++++++++ .../jms4s/jms/ActiveMQArtemisJmsSpec.scala | 5 +++ .../test/scala/jms4s/jms/IbmMQJmsSpec.scala | 5 +++ tests/src/test/scala/jms4s/jms/JmsSpec.scala | 39 ++++------------- 14 files changed, 148 insertions(+), 134 deletions(-) rename {active-mq/src/main/scala/jms4s => active-mq-artemis/src/main/scala/jms4s/activemq}/activeMQ.scala (74%) create mode 100644 tests/src/test/scala/jms4s/ActiveMQArtemisJmsClientSpec.scala rename tests/src/test/scala/jms4s/{ => basespec}/Jms4sBaseSpec.scala (92%) create mode 100644 tests/src/test/scala/jms4s/basespec/providers/ActiveMQArtemisBaseSpec.scala create mode 100644 tests/src/test/scala/jms4s/basespec/providers/IbmMQBaseSpec.scala create mode 100644 tests/src/test/scala/jms4s/jms/ActiveMQArtemisJmsSpec.scala create mode 100644 tests/src/test/scala/jms4s/jms/IbmMQJmsSpec.scala diff --git a/active-mq/src/main/scala/jms4s/activeMQ.scala b/active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala similarity index 74% rename from active-mq/src/main/scala/jms4s/activeMQ.scala rename to active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala index 1fe16bda..7f2291d6 100644 --- a/active-mq/src/main/scala/jms4s/activeMQ.scala +++ b/active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala @@ -1,27 +1,32 @@ -package jms4s +package jms4s.activemq import cats.data.NonEmptyList import cats.effect.{ Blocker, Resource, Sync } +import cats.implicits._ import io.chrisdavenport.log4cats.Logger import javax.jms.Connection -import jms4s.config.{ Config, Endpoint } import jms4s.jms.JmsConnection -import org.apache.activemq.ActiveMQConnectionFactory -import cats.implicits._ +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory object activeMQ { + case class Config( + endpoints: NonEmptyList[Endpoint], + username: Option[Username] = None, + password: Option[Password] = None, + clientId: String + ) + case class Username(value: String) extends AnyVal + case class Password(value: String) extends AnyVal + case class Endpoint(host: String, port: Int) + def makeConnection[F[_]: Sync: Logger](config: Config, blocker: Blocker): Resource[F, JmsConnection[F]] = for { connection <- Resource.make( Logger[F].info(s"Opening Connection to MQ at ${hosts(config.endpoints)}...") *> Sync[F].delay { val connectionFactory: ActiveMQConnectionFactory = - new ActiveMQConnectionFactory("tcp://localhost:61616") -// connectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT) -// connectionFactory.set(config.qm.value) -// connectionFactory.setConnectionNameList(hosts(config.endpoints)) -// connectionFactory.setChannel(config.channel.value) + new ActiveMQConnectionFactory(hosts(config.endpoints)) connectionFactory.setClientID(config.clientId) val connection: Connection = config.username.map { (username) => @@ -44,6 +49,6 @@ object activeMQ { } yield new JmsConnection[F](connection, blocker) private def hosts(endpoints: NonEmptyList[Endpoint]): String = - endpoints.map(e => s"${e.host}(${e.port})").toList.mkString(",") + endpoints.map(e => s"tcp://${e.host}:${e.port}").toList.mkString(",") } diff --git a/build.sbt b/build.sbt index 3dfd18ec..9a5971d6 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ val catsV = "2.0.0" val jmsV = "2.0.1" val ibmMQV = "9.1.4.0" -val activeMQV = "5.15.12" +val activeMQV = "2.11.0" val catsEffectV = "2.0.0" val catsEffectScalaTestV = "0.4.0" val fs2V = "2.0.0" @@ -15,7 +15,7 @@ val betterMonadicForV = "0.3.1" lazy val jms4s = project .in(file(".")) .enablePlugins(NoPublishPlugin) - .aggregate(core, ibmMQ, activeMQ, tests, examples, site) + .aggregate(core, ibmMQ, activeMQArtemis, tests, examples, site) lazy val core = project .in(file("core")) @@ -31,11 +31,11 @@ lazy val ibmMQ = project .settings(parallelExecution in Test := false) .dependsOn(core) -lazy val activeMQ = project - .in(file("active-mq")) +lazy val activeMQArtemis = project + .in(file("active-mq-artemis")) .settings(commonSettings) - .settings(name := "jms4s-active-mq") - .settings(libraryDependencies += "org.apache.activemq" % "activemq-all" % activeMQV) + .settings(name := "jms4s-active-mq-artemis") + .settings(libraryDependencies += "org.apache.activemq" % "artemis-jms-client-all" % activeMQV) .settings(parallelExecution in Test := false) .dependsOn(core) @@ -45,13 +45,13 @@ lazy val tests = project .enablePlugins(NoPublishPlugin) .settings(libraryDependencies += "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jSlf4jImplV % Runtime) .settings(parallelExecution in Test := false) - .dependsOn(ibmMQ, activeMQ) + .dependsOn(ibmMQ, activeMQArtemis) lazy val examples = project .in(file("examples")) .settings(commonSettings: _*) .enablePlugins(NoPublishPlugin) - .dependsOn(ibmMQ, activeMQ) + .dependsOn(ibmMQ, activeMQArtemis) lazy val site = project .in(file("site")) @@ -59,7 +59,7 @@ lazy val site = project .enablePlugins(MdocPlugin) .enablePlugins(NoPublishPlugin) .settings(commonSettings) - .dependsOn(core, ibmMQ, activeMQ) + .dependsOn(core, ibmMQ, activeMQArtemis) .settings { import microsites._ Seq( diff --git a/core/src/main/scala/jms4s/config/config.scala b/core/src/main/scala/jms4s/config/config.scala index e8b7f2c4..90f00e55 100644 --- a/core/src/main/scala/jms4s/config/config.scala +++ b/core/src/main/scala/jms4s/config/config.scala @@ -1,24 +1,8 @@ package jms4s.config import cats.Order -import cats.data.NonEmptyList import cats.implicits._ -case class Config( - qm: QueueManager, - endpoints: NonEmptyList[Endpoint], - channel: Channel, - username: Option[Username] = None, - password: Option[Password] = None, - clientId: String -) - -case class Username(value: String) extends AnyVal - -case class Password(value: String) extends AnyVal - -case class Endpoint(host: String, port: Int) - sealed trait DestinationName extends Product with Serializable case class QueueName(value: String) extends DestinationName case class TopicName(value: String) extends DestinationName @@ -28,7 +12,3 @@ object DestinationName { case (x, y) => Order[String].compare(x.toString, y.toString) } } - -case class QueueManager(value: String) extends AnyVal - -case class Channel(value: String) extends AnyVal diff --git a/docker-compose.yml b/docker-compose.yml index 579dfe07..3f340f53 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -29,7 +29,10 @@ ibmmq: activemq: restart: always - image: rmohr/activemq + image: vromero/activemq-artemis # https://github.com/vromero/activemq-artemis-docker/blob/master/README.md ports: - - "8161:8161" + - "8161:8161" # http://localhost:8161/console - "61616:61616" + environment: + - ARTEMIS_USERNAME=admin + - ARTEMIS_PASSWORD=passw0rd diff --git a/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala b/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala index 3ec325c7..dd5ccb67 100644 --- a/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala +++ b/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala @@ -6,11 +6,24 @@ import cats.implicits._ import com.ibm.mq.jms.MQConnectionFactory import com.ibm.msg.client.wmq.common.CommonConstants import io.chrisdavenport.log4cats.Logger -import jms4s.config.{ Config, Endpoint } import jms4s.jms.JmsConnection object ibmMQ { + case class Config( + qm: QueueManager, + endpoints: NonEmptyList[Endpoint], + channel: Channel, + username: Option[Username] = None, + password: Option[Password] = None, + clientId: String + ) + case class Username(value: String) extends AnyVal + case class Password(value: String) extends AnyVal + case class Endpoint(host: String, port: Int) + case class QueueManager(value: String) extends AnyVal + case class Channel(value: String) extends AnyVal + def makeConnection[F[_]: Sync: Logger](config: Config, blocker: Blocker): Resource[F, JmsConnection[F]] = for { connection <- Resource.make( diff --git a/tests/src/test/scala/jms4s/ActiveMQArtemisJmsClientSpec.scala b/tests/src/test/scala/jms4s/ActiveMQArtemisJmsClientSpec.scala new file mode 100644 index 00000000..5d7b9d12 --- /dev/null +++ b/tests/src/test/scala/jms4s/ActiveMQArtemisJmsClientSpec.scala @@ -0,0 +1,5 @@ +package jms4s + +import jms4s.basespec.providers.ActiveMQArtemisBaseSpec + +class ActiveMQArtemisJmsClientSpec extends JmsClientSpec with ActiveMQArtemisBaseSpec diff --git a/tests/src/test/scala/jms4s/IbmMQJmsClientSpec.scala b/tests/src/test/scala/jms4s/IbmMQJmsClientSpec.scala index 9a4cf051..eccddce6 100644 --- a/tests/src/test/scala/jms4s/IbmMQJmsClientSpec.scala +++ b/tests/src/test/scala/jms4s/IbmMQJmsClientSpec.scala @@ -1,32 +1,5 @@ package jms4s -import cats.data.NonEmptyList -import cats.effect.{ Blocker, IO, Resource } -import jms4s.config._ -import jms4s.ibmmq.ibmMQ -import jms4s.jms.JmsConnection -class IbmMQJmsClientSpec extends JmsClientSpec { - override def connectionRes: Resource[IO, JmsConnection[IO]] = - Blocker - .apply[IO] - .flatMap( - blocker => - ibmMQ.makeConnection[IO]( - Config( - qm = QueueManager("QM1"), - endpoints = NonEmptyList.one(Endpoint("localhost", 1414)), - // the current docker image seems to be misconfigured, so I need to use admin channel/auth in order to test topic - // but maybe it's just me not understanding something properly.. as usual - // channel = Channel("DEV.APP.SVRCONN"), - // username = Some(Username("app")), - // password = None, - channel = Channel("DEV.ADMIN.SVRCONN"), - username = Some(Username("admin")), - password = Some(Password("passw0rd")), -// password = Some(Password("admin")), - clientId = "jms-specs" - ), - blocker - ) - ) -} +import jms4s.basespec.providers.IbmMQBaseSpec + +class IbmMQJmsClientSpec extends JmsClientSpec with IbmMQBaseSpec diff --git a/tests/src/test/scala/jms4s/JmsClientSpec.scala b/tests/src/test/scala/jms4s/JmsClientSpec.scala index c941bd0c..bf23ddd7 100644 --- a/tests/src/test/scala/jms4s/JmsClientSpec.scala +++ b/tests/src/test/scala/jms4s/JmsClientSpec.scala @@ -10,6 +10,7 @@ import cats.implicits._ import jms4s.JmsAcknowledgerConsumer.AckAction import jms4s.JmsAutoAcknowledgerConsumer.AutoAckAction import jms4s.JmsTransactedConsumer.TransactionAction +import jms4s.basespec.Jms4sBaseSpec import jms4s.jms.JmsMessage import jms4s.model.SessionType import org.scalatest.freespec.AsyncFreeSpec @@ -24,7 +25,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { session <- connection.createSession(SessionType.AutoAcknowledge) queue <- Resource.liftF(session.createQueue(inputQueueName)) producer <- session.createProducer(queue) - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) consumer <- jmsClient.createTransactedConsumer(connection, inputQueueName, poolSize) } yield (consumer, producer, bodies.toSet, messages) @@ -59,8 +60,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { inputProducer <- session.createProducer(inputQueue) outputConsumer1 <- session.createConsumer(outputQueue1) outputConsumer2 <- session.createConsumer(outputQueue2) - bodies = (0 until nMessages).map(i => s"$i") - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) consumer <- jmsClient.createTransactedConsumerToProducers( connection, inputQueueName, @@ -102,7 +102,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { session <- connection.createSession(SessionType.AutoAcknowledge) queue <- Resource.liftF(session.createQueue(inputQueueName)) producer <- session.createProducer(queue) - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) consumer <- jmsClient.createAcknowledgerConsumer(connection, inputQueueName, poolSize) } yield (consumer, producer, bodies.toSet, messages) @@ -137,8 +137,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { inputProducer <- session.createProducer(inputQueue) outputConsumer1 <- session.createConsumer(outputQueue1) outputConsumer2 <- session.createConsumer(outputQueue2) - bodies = (0 until nMessages).map(i => s"$i") - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) consumer <- jmsClient.createAcknowledgerToProducers( connection, inputQueueName, @@ -180,7 +179,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { session <- connection.createSession(SessionType.AutoAcknowledge) queue <- Resource.liftF(session.createQueue(inputQueueName)) producer <- session.createProducer(queue) - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) consumer <- jmsClient.createAutoAcknowledgerConsumer(connection, inputQueueName, poolSize) } yield (consumer, producer, bodies.toSet, messages) @@ -215,8 +214,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { inputProducer <- session.createProducer(inputQueue) outputConsumer1 <- session.createConsumer(outputQueue1) outputConsumer2 <- session.createConsumer(outputQueue2) - bodies = (0 until nMessages).map(i => s"$i") - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) consumer <- jmsClient.createAutoAcknowledgerToProducers( connection, inputQueueName, @@ -260,8 +258,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { session <- connection.createSession(SessionType.AutoAcknowledge) outputQueue <- Resource.liftF(session.createQueue(outputQueueName1)) outputConsumer <- session.createConsumer(outputQueue) - bodies = (0 until nMessages).map(i => s"$i") - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer( connection, outputQueueName1, @@ -289,8 +286,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { session <- connection.createSession(SessionType.AutoAcknowledge) outputQueue <- Resource.liftF(session.createQueue(outputQueueName1)) outputConsumer <- session.createConsumer(outputQueue) - bodies = (0 until nMessages).map(i => s"$i") - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer( connection, outputQueueName1, @@ -318,8 +314,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { session <- connection.createSession(SessionType.AutoAcknowledge) outputTopic <- Resource.liftF(session.createTopic(topicName)) outputConsumer <- session.createConsumer(outputTopic) - bodies = (0 until nMessages).map(i => s"$i") - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer( connection, topicName, @@ -347,8 +342,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { session <- connection.createSession(SessionType.AutoAcknowledge) outputTopic <- Resource.liftF(session.createTopic(topicName)) outputConsumer <- session.createConsumer(outputTopic) - bodies = (0 until nMessages).map(i => s"$i") - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer( connection, topicName, @@ -378,8 +372,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) outputConsumer <- session.createConsumer(outputQueue) outputConsumer2 <- session.createConsumer(outputQueue2) - bodies = (0 until nMessages).map(i => s"$i") - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer( connection, poolSize @@ -414,8 +407,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { outputConsumer <- session.createConsumer(topic) topic2 <- Resource.liftF(session.createTopic(topicName2)) outputConsumer2 <- session.createConsumer(topic2) - bodies = (0 until nMessages).map(i => s"$i") - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer( connection, poolSize @@ -457,16 +449,14 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { res.use { case (producer, outputConsumer, message) => for { - producerTimestamp <- Timer[IO].clock.realTime(TimeUnit.MILLISECONDS) - _ <- producer.sendWithDelay( - messageWithDelayFactory((message, (outputQueueName1, Some(delay)))) - ) + producerTimestamp <- Timer[IO].clock.realTime(TimeUnit.MILLISECONDS) + _ <- producer.sendWithDelay(messageWithDelayFactory((message, (outputQueueName1, Some(delay))))) _ <- logger.info(s"Pushed message with body: ${body}.") _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...") receivedMessage: JmsMessage.JmsTextMessage[IO] <- receiveMessage(outputConsumer).timeout(timeout) actualBody <- receivedMessage.getText jmsDeliveryTime <- receivedMessage.getJMSDeliveryTime - actualDelay <- IO(jmsDeliveryTime - producerTimestamp) + actualDelay = jmsDeliveryTime - producerTimestamp } yield assert(actualDelay >= delay.toMillis && actualBody == body) } } diff --git a/tests/src/test/scala/jms4s/Jms4sBaseSpec.scala b/tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala similarity index 92% rename from tests/src/test/scala/jms4s/Jms4sBaseSpec.scala rename to tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala index 75a85c00..d9a2de1a 100644 --- a/tests/src/test/scala/jms4s/Jms4sBaseSpec.scala +++ b/tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala @@ -1,11 +1,12 @@ -package jms4s +package jms4s.basespec +import cats.data.NonEmptyList import cats.effect.concurrent.Ref import cats.effect.{ IO, Resource } import cats.implicits._ import io.chrisdavenport.log4cats.SelfAwareStructuredLogger import io.chrisdavenport.log4cats.slf4j.Slf4jLogger -import jms4s.config._ +import jms4s.config.{ DestinationName, QueueName, TopicName } import jms4s.jms.JmsMessage.JmsTextMessage import jms4s.jms.{ JmsConnection, JmsMessageConsumer, MessageFactory } @@ -17,10 +18,10 @@ trait Jms4sBaseSpec { def connectionRes: Resource[IO, JmsConnection[IO]] val nMessages: Int = 50 - val bodies: IndexedSeq[String] = (0 until nMessages).map(i => s"$i") + val bodies: List[String] = (0 until nMessages).map(i => s"$i").toList val poolSize: Int = 4 val timeout: FiniteDuration = 2.seconds - val delay: FiniteDuration = 500.millis + val delay: FiniteDuration = 200.millis val topicName: TopicName = TopicName("DEV.BASE.TOPIC") val topicName2: TopicName = TopicName("DEV.BASE.TOPIC.1") val inputQueueName: QueueName = QueueName("DEV.QUEUE.1") diff --git a/tests/src/test/scala/jms4s/basespec/providers/ActiveMQArtemisBaseSpec.scala b/tests/src/test/scala/jms4s/basespec/providers/ActiveMQArtemisBaseSpec.scala new file mode 100644 index 00000000..9e556bf8 --- /dev/null +++ b/tests/src/test/scala/jms4s/basespec/providers/ActiveMQArtemisBaseSpec.scala @@ -0,0 +1,26 @@ +package jms4s.basespec.providers + +import cats.data.NonEmptyList +import cats.effect.{ Blocker, IO, Resource } +import jms4s.activemq.activeMQ +import jms4s.activemq.activeMQ.{ Config, Endpoint, Password, Username } +import jms4s.basespec.Jms4sBaseSpec +import jms4s.jms.JmsConnection + +trait ActiveMQArtemisBaseSpec extends Jms4sBaseSpec { + override def connectionRes: Resource[IO, JmsConnection[IO]] = + Blocker + .apply[IO] + .flatMap( + blocker => + activeMQ.makeConnection[IO]( + Config( + endpoints = NonEmptyList.one(Endpoint("localhost", 61616)), + username = Some(Username("admin")), + password = Some(Password("passw0rd")), + clientId = "jms-specs" + ), + blocker + ) + ) +} diff --git a/tests/src/test/scala/jms4s/basespec/providers/IbmMQBaseSpec.scala b/tests/src/test/scala/jms4s/basespec/providers/IbmMQBaseSpec.scala new file mode 100644 index 00000000..4dc4d67b --- /dev/null +++ b/tests/src/test/scala/jms4s/basespec/providers/IbmMQBaseSpec.scala @@ -0,0 +1,33 @@ +package jms4s.basespec.providers + +import cats.data.NonEmptyList +import cats.effect.{ Blocker, IO, Resource } +import jms4s.basespec.Jms4sBaseSpec +import jms4s.ibmmq.ibmMQ +import jms4s.ibmmq.ibmMQ._ +import jms4s.jms.JmsConnection + +trait IbmMQBaseSpec extends Jms4sBaseSpec { + override def connectionRes: Resource[IO, JmsConnection[IO]] = + Blocker + .apply[IO] + .flatMap( + blocker => + ibmMQ.makeConnection[IO]( + Config( + qm = QueueManager("QM1"), + endpoints = NonEmptyList.one(Endpoint("localhost", 1414)), + // the current docker image seems to be misconfigured, so I need to use admin channel/auth in order to test topic + // but maybe it's just me not understanding something properly.. as usual + // channel = Channel("DEV.APP.SVRCONN"), + // username = Some(Username("app")), + // password = None, + channel = Channel("DEV.ADMIN.SVRCONN"), + username = Some(Username("admin")), + password = Some(Password("passw0rd")), + clientId = "jms-specs" + ), + blocker + ) + ) +} diff --git a/tests/src/test/scala/jms4s/jms/ActiveMQArtemisJmsSpec.scala b/tests/src/test/scala/jms4s/jms/ActiveMQArtemisJmsSpec.scala new file mode 100644 index 00000000..f74730b1 --- /dev/null +++ b/tests/src/test/scala/jms4s/jms/ActiveMQArtemisJmsSpec.scala @@ -0,0 +1,5 @@ +package jms4s.jms + +import jms4s.basespec.providers.ActiveMQArtemisBaseSpec + +class ActiveMQArtemisJmsSpec extends JmsSpec with ActiveMQArtemisBaseSpec diff --git a/tests/src/test/scala/jms4s/jms/IbmMQJmsSpec.scala b/tests/src/test/scala/jms4s/jms/IbmMQJmsSpec.scala new file mode 100644 index 00000000..532711e5 --- /dev/null +++ b/tests/src/test/scala/jms4s/jms/IbmMQJmsSpec.scala @@ -0,0 +1,5 @@ +package jms4s.jms + +import jms4s.basespec.providers.IbmMQBaseSpec + +class IbmMQJmsSpec extends JmsSpec with IbmMQBaseSpec diff --git a/tests/src/test/scala/jms4s/jms/JmsSpec.scala b/tests/src/test/scala/jms4s/jms/JmsSpec.scala index cc80c857..70de00c0 100644 --- a/tests/src/test/scala/jms4s/jms/JmsSpec.scala +++ b/tests/src/test/scala/jms4s/jms/JmsSpec.scala @@ -1,18 +1,17 @@ package jms4s.jms -import cats.data.NonEmptyList +import java.util.concurrent.TimeUnit + import cats.effect.testing.scalatest.AsyncIOSpec -import cats.effect.{ Blocker, IO, Resource } +import cats.effect.{ IO, Resource, Timer } import cats.implicits._ -import jms4s.Jms4sBaseSpec -import jms4s.config._ -import jms4s.ibmmq.ibmMQ +import jms4s.basespec.Jms4sBaseSpec import jms4s.model.SessionType import org.scalatest.freespec.AsyncFreeSpec import scala.concurrent.duration._ -class JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { +trait JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { val expectedBody = "body" "Basic jms ops" - { @@ -48,13 +47,13 @@ class JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { case (consumer, producer, msg) => for { _ <- producer.setDeliveryDelay(delay) - producerTimestamp <- IO(System.currentTimeMillis()) + producerTimestamp <- Timer[IO].clock.realTime(TimeUnit.MILLISECONDS) _ <- producer.send(msg) msg <- consumer.receiveJmsMessage tm <- msg.asJmsTextMessage body <- tm.getText jmsDeliveryTime <- tm.getJMSDeliveryTime - producerDelay <- IO(jmsDeliveryTime - producerTimestamp) + producerDelay = jmsDeliveryTime - producerTimestamp } yield assert(producerDelay >= delay.toMillis && body == expectedBody) } } @@ -68,28 +67,4 @@ class JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { } } } - - override def connectionRes: Resource[IO, JmsConnection[IO]] = - Blocker - .apply[IO] - .flatMap( - blocker => - ibmMQ.makeConnection[IO]( - Config( - qm = QueueManager("QM1"), - endpoints = NonEmptyList.one(Endpoint("localhost", 1414)), - // the current docker image seems to be misconfigured, so I need to use admin channel/auth in order to test topic - // but maybe it's just me not understanding something properly.. as usual - // channel = Channel("DEV.APP.SVRCONN"), - // username = Some(Username("app")), - // password = None, - channel = Channel("DEV.ADMIN.SVRCONN"), - username = Some(Username("admin")), - password = Some(Password("passw0rd")), - // password = Some(Password("admin")), - clientId = "jms-specs" - ), - blocker - ) - ) } From f99b05f2ec4b0f4796f5bbef2c86dc84f0420071 Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Tue, 7 Apr 2020 10:03:33 +0200 Subject: [PATCH 4/8] Fix 'Invalid concurrent session usage.' warning --- .../src/test/scala/jms4s/JmsClientSpec.scala | 119 +++++++----------- tests/src/test/scala/jms4s/jms/JmsSpec.scala | 8 +- 2 files changed, 52 insertions(+), 75 deletions(-) diff --git a/tests/src/test/scala/jms4s/JmsClientSpec.scala b/tests/src/test/scala/jms4s/JmsClientSpec.scala index bf23ddd7..46a0b931 100644 --- a/tests/src/test/scala/jms4s/JmsClientSpec.scala +++ b/tests/src/test/scala/jms4s/JmsClientSpec.scala @@ -11,8 +11,8 @@ import jms4s.JmsAcknowledgerConsumer.AckAction import jms4s.JmsAutoAcknowledgerConsumer.AutoAckAction import jms4s.JmsTransactedConsumer.TransactionAction import jms4s.basespec.Jms4sBaseSpec -import jms4s.jms.JmsMessage import jms4s.model.SessionType +import jms4s.model.SessionType.AutoAcknowledge import org.scalatest.freespec.AsyncFreeSpec trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { @@ -22,7 +22,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { s"publish $nMessages messages and then consume them concurrently with local transactions" in { val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) queue <- Resource.liftF(session.createQueue(inputQueueName)) producer <- session.createProducer(queue) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) @@ -42,7 +42,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { _ <- received.update(_ + body) } yield TransactionAction.commit }.start - _ <- logger.info(s"Consumer started.\nCollecting messages from the queue...") + _ <- logger.info(s"Consumer started. Collecting messages from the queue...") receivedMessages <- (received.get.iterateUntil(_.eqv(bodies)).timeout(timeout) >> received.get) .guarantee(consumerFiber.cancel) } yield assert(receivedMessages == bodies) @@ -53,13 +53,13 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) inputQueue <- Resource.liftF(session.createQueue(inputQueueName)) outputQueue1 <- Resource.liftF(session.createQueue(outputQueueName1)) outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) - inputProducer <- session.createProducer(inputQueue) - outputConsumer1 <- session.createConsumer(outputQueue1) - outputConsumer2 <- session.createConsumer(outputQueue2) + inputProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(inputQueue)) + outputConsumer1 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue1)) + outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue2)) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) consumer <- jmsClient.createTransactedConsumerToProducers( connection, @@ -83,7 +83,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { TransactionAction.send[IO](messageFactory(tm, outputQueueName1)) else TransactionAction.send[IO](messageFactory(tm, outputQueueName2)) }.start - _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queues...") + _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queues...") received1 <- Ref.of[IO, Set[String]](Set()) received2 <- Ref.of[IO, Set[String]](Set()) receivedMessages <- (( @@ -99,7 +99,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) queue <- Resource.liftF(session.createQueue(inputQueueName)) producer <- session.createProducer(queue) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) @@ -119,7 +119,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { _ <- received.update(_ + body) } yield AckAction.ack }.start - _ <- logger.info(s"Consumer started.\nCollecting messages from the queue...") + _ <- logger.info(s"Consumer started. Collecting messages from the queue...") receivedMessages <- (received.get.iterateUntil(_.eqv(bodies)).timeout(timeout) >> received.get) .guarantee(consumerFiber.cancel) } yield assert(receivedMessages == bodies) @@ -130,14 +130,15 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) inputQueue <- Resource.liftF(session.createQueue(inputQueueName)) outputQueue1 <- Resource.liftF(session.createQueue(outputQueueName1)) outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) - inputProducer <- session.createProducer(inputQueue) - outputConsumer1 <- session.createConsumer(outputQueue1) - outputConsumer2 <- session.createConsumer(outputQueue2) - messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) + inputProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(inputQueue)) + outputConsumer1 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue1)) + outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue2)) + bodies = (0 until nMessages).map(i => s"$i") + messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) consumer <- jmsClient.createAcknowledgerToProducers( connection, inputQueueName, @@ -160,7 +161,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { AckAction.send[IO](messageFactory(tm, outputQueueName1)) else AckAction.send[IO](messageFactory(tm, outputQueueName2)) }.start - _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queues...") + _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queues...") received1 <- Ref.of[IO, Set[String]](Set()) received2 <- Ref.of[IO, Set[String]](Set()) receivedMessages <- (( @@ -176,7 +177,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) queue <- Resource.liftF(session.createQueue(inputQueueName)) producer <- session.createProducer(queue) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) @@ -196,7 +197,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { _ <- received.update(_ + body) } yield AutoAckAction.noOp }.start - _ <- logger.info(s"Consumer started.\nCollecting messages from the queue...") + _ <- logger.info(s"Consumer started. Collecting messages from the queue...") receivedMessages <- (received.get.iterateUntil(_.eqv(bodies)).timeout(timeout) >> received.get) .guarantee(consumerFiber.cancel) } yield assert(receivedMessages == bodies) @@ -207,13 +208,13 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) inputQueue <- Resource.liftF(session.createQueue(inputQueueName)) outputQueue1 <- Resource.liftF(session.createQueue(outputQueueName1)) outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) - inputProducer <- session.createProducer(inputQueue) - outputConsumer1 <- session.createConsumer(outputQueue1) - outputConsumer2 <- session.createConsumer(outputQueue2) + inputProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(inputQueue)) + outputConsumer1 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue1)) + outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue2)) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) consumer <- jmsClient.createAutoAcknowledgerToProducers( connection, @@ -237,7 +238,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { AutoAckAction.send[IO](messageFactory(tm, outputQueueName1)) else AutoAckAction.send[IO](messageFactory(tm, outputQueueName2)) }.start - _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queues...") + _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queues...") received1 <- Ref.of[IO, Set[String]](Set()) received2 <- Ref.of[IO, Set[String]](Set()) receivedMessages <- (( @@ -259,11 +260,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { outputQueue <- Resource.liftF(session.createQueue(outputQueueName1)) outputConsumer <- session.createConsumer(outputQueue) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - producer <- jmsClient.createProducer( - connection, - outputQueueName1, - poolSize - ) + producer <- jmsClient.createProducer(connection, outputQueueName1, poolSize) } yield (producer, outputConsumer, bodies.toSet, messages) res.use { @@ -287,11 +284,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { outputQueue <- Resource.liftF(session.createQueue(outputQueueName1)) outputConsumer <- session.createConsumer(outputQueue) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - producer <- jmsClient.createProducer( - connection, - outputQueueName1, - poolSize - ) + producer <- jmsClient.createProducer(connection, outputQueueName1, poolSize) } yield (producer, outputConsumer, bodies.toSet, messages) res.use { @@ -315,11 +308,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { outputTopic <- Resource.liftF(session.createTopic(topicName)) outputConsumer <- session.createConsumer(outputTopic) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - producer <- jmsClient.createProducer( - connection, - topicName, - poolSize - ) + producer <- jmsClient.createProducer(connection, topicName, poolSize) } yield (producer, outputConsumer, bodies.toSet, messages) res.use { @@ -343,11 +332,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { outputTopic <- Resource.liftF(session.createTopic(topicName)) outputConsumer <- session.createConsumer(outputTopic) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - producer <- jmsClient.createProducer( - connection, - topicName, - poolSize - ) + producer <- jmsClient.createProducer(connection, topicName, poolSize) } yield (producer, outputConsumer, bodies.toSet, messages) res.use { @@ -373,20 +358,18 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { outputConsumer <- session.createConsumer(outputQueue) outputConsumer2 <- session.createConsumer(outputQueue2) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - producer <- jmsClient.createProducer( - connection, - poolSize - ) + producer <- jmsClient.createProducer(connection, poolSize) } yield (producer, outputConsumer, outputConsumer2, bodies.toSet, messages) res.use { case (producer, outputConsumer, outputConsumer2, bodies, messages) => for { - _ <- messages.parTraverse_(msg => { - producer.send(messageFactory(msg, outputQueueName1)) *> producer.send( - messageFactory(msg, outputQueueName2) - ) - }) + _ <- messages.parTraverse_( + msg => + producer.send(messageFactory(msg, outputQueueName1)) *> producer.send( + messageFactory(msg, outputQueueName2) + ) + ) _ <- logger.info(s"Pushed ${messages.size} messages.") _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...") firstBatch <- Ref.of[IO, Set[String]](Set()) @@ -408,18 +391,15 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { topic2 <- Resource.liftF(session.createTopic(topicName2)) outputConsumer2 <- session.createConsumer(topic2) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - producer <- jmsClient.createProducer( - connection, - poolSize - ) + producer <- jmsClient.createProducer(connection, poolSize) } yield (producer, outputConsumer, outputConsumer2, bodies.toSet, messages) res.use { case (producer, outputConsumer, outputConsumer2, bodies, messages) => for { - _ <- messages.parTraverse_(msg => { - producer.send(messageFactory(msg, topicName)) *> producer.send(messageFactory(msg, topicName2)) - }) + _ <- messages.parTraverse_( + msg => producer.send(messageFactory(msg, topicName)) *> producer.send(messageFactory(msg, topicName2)) + ) _ <- logger.info(s"Pushed ${messages.size} messages.") _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...") firstBatch <- Ref.of[IO, Set[String]](Set()) @@ -440,23 +420,20 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { outputQueue <- Resource.liftF(session.createQueue(outputQueueName1)) outputConsumer <- session.createConsumer(outputQueue) message <- Resource.liftF(session.createTextMessage(body)) - producer <- jmsClient.createProducer( - connection, - poolSize - ) + producer <- jmsClient.createProducer(connection, poolSize) } yield (producer, outputConsumer, message) res.use { case (producer, outputConsumer, message) => for { - producerTimestamp <- Timer[IO].clock.realTime(TimeUnit.MILLISECONDS) - _ <- producer.sendWithDelay(messageWithDelayFactory((message, (outputQueueName1, Some(delay))))) - _ <- logger.info(s"Pushed message with body: ${body}.") - _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...") - receivedMessage: JmsMessage.JmsTextMessage[IO] <- receiveMessage(outputConsumer).timeout(timeout) - actualBody <- receivedMessage.getText - jmsDeliveryTime <- receivedMessage.getJMSDeliveryTime - actualDelay = jmsDeliveryTime - producerTimestamp + producerTimestamp <- Timer[IO].clock.realTime(TimeUnit.MILLISECONDS) + _ <- producer.sendWithDelay(messageWithDelayFactory((message, (outputQueueName1, Some(delay))))) + _ <- logger.info(s"Pushed message with body: ${body}.") + _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...") + receivedMessage <- receiveMessage(outputConsumer).timeout(timeout) + actualBody <- receivedMessage.getText + jmsDeliveryTime <- receivedMessage.getJMSDeliveryTime + actualDelay = jmsDeliveryTime - producerTimestamp } yield assert(actualDelay >= delay.toMillis && actualBody == body) } } diff --git a/tests/src/test/scala/jms4s/jms/JmsSpec.scala b/tests/src/test/scala/jms4s/jms/JmsSpec.scala index 70de00c0..36d330a3 100644 --- a/tests/src/test/scala/jms4s/jms/JmsSpec.scala +++ b/tests/src/test/scala/jms4s/jms/JmsSpec.scala @@ -19,8 +19,8 @@ trait JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { connection <- connectionRes session <- connection.createSession(SessionType.AutoAcknowledge) queue <- Resource.liftF(session.createQueue(inputQueueName)) - queueConsumer <- session.createConsumer(queue) - queueProducer <- session.createProducer(queue) + queueConsumer <- connection.createSession(SessionType.AutoAcknowledge).flatMap(_.createConsumer(queue)) + queueProducer <- connection.createSession(SessionType.AutoAcknowledge).flatMap(_.createProducer(queue)) msg <- Resource.liftF(session.createTextMessage(expectedBody)) } yield (queueConsumer, queueProducer, msg) @@ -28,8 +28,8 @@ trait JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { connection <- connectionRes session <- connection.createSession(SessionType.AutoAcknowledge) topic <- Resource.liftF(session.createTopic(topicName)) - topicConsumer <- session.createConsumer(topic) - topicProducer <- session.createProducer(topic) + topicConsumer <- connection.createSession(SessionType.AutoAcknowledge).flatMap(_.createConsumer(topic)) + topicProducer <- connection.createSession(SessionType.AutoAcknowledge).flatMap(_.createProducer(topic)) msg <- Resource.liftF(session.createTextMessage(expectedBody)) } yield (topicConsumer, topicProducer, msg) From 06ede9f0782f0ce43861c8cddd4393254d39ce08 Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Tue, 7 Apr 2020 13:06:57 +0200 Subject: [PATCH 5/8] Cleanup --- .../scala/jms4s/JmsAcknowledgerConsumer.scala | 13 +- .../main/scala/jms4s/JmsPooledProducer.scala | 22 +- .../scala/jms4s/JmsTransactedConsumer.scala | 15 +- .../jms4s/JmsUnidentifiedPooledProducer.scala | 7 +- .../src/test/scala/jms4s/JmsClientSpec.scala | 500 +++++++++--------- .../scala/jms4s/basespec/Jms4sBaseSpec.scala | 1 + tests/src/test/scala/jms4s/jms/JmsSpec.scala | 95 ++-- 7 files changed, 314 insertions(+), 339 deletions(-) diff --git a/core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala b/core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala index 994a6451..4dde7677 100644 --- a/core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala +++ b/core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala @@ -10,7 +10,6 @@ import jms4s.JmsAcknowledgerConsumer.AckAction import jms4s.JmsAcknowledgerConsumer.JmsAcknowledgerConsumerPool.JmsResource import jms4s.config.DestinationName import jms4s.jms.{ JmsConnection, JmsMessage, JmsMessageConsumer, MessageFactory } -import jms4s.model.SessionType import jms4s.model.SessionType.ClientAcknowledge import scala.concurrent.duration.FiniteDuration @@ -49,7 +48,7 @@ object JmsAcknowledgerConsumer { for { inputDestination <- Resource.liftF( connection - .createSession(SessionType.ClientAcknowledge) + .createSession(ClientAcknowledge) .use(_.createDestination(inputDestinationName)) ) outputDestinations <- Resource.liftF( @@ -57,7 +56,7 @@ object JmsAcknowledgerConsumer { .traverse( outputDestinationName => connection - .createSession(SessionType.ClientAcknowledge) + .createSession(ClientAcknowledge) .use(_.createDestination(outputDestinationName)) .map(jmsDestination => (outputDestinationName, jmsDestination)) ) @@ -65,7 +64,7 @@ object JmsAcknowledgerConsumer { queue <- Resource.liftF(Queue.bounded[F, JmsResource[F]](concurrencyLevel)) _ <- (0 until concurrencyLevel).toList.traverse_ { _ => for { - session <- connection.createSession(SessionType.ClientAcknowledge) + session <- connection.createSession(ClientAcknowledge) consumer <- session.createConsumer(inputDestination) producers <- outputDestinations.traverse { case (outputDestinationName, outputDestination) => @@ -89,18 +88,18 @@ object JmsAcknowledgerConsumer { for { inputDestination <- Resource.liftF( connection - .createSession(SessionType.ClientAcknowledge) + .createSession(ClientAcknowledge) .use(_.createDestination(inputDestinationName)) ) outputDestination <- Resource.liftF( connection - .createSession(SessionType.ClientAcknowledge) + .createSession(ClientAcknowledge) .use(_.createDestination(outputDestinationName)) ) pool <- Resource.liftF(Queue.bounded[F, JmsResource[F]](concurrencyLevel)) _ <- (0 until concurrencyLevel).toList.traverse_ { _ => for { - session <- connection.createSession(SessionType.ClientAcknowledge) + session <- connection.createSession(ClientAcknowledge) consumer <- session.createConsumer(inputDestination) jmsProducer <- session.createProducer(outputDestination) producer = Map(outputDestinationName -> new JmsProducer(jmsProducer)) diff --git a/core/src/main/scala/jms4s/JmsPooledProducer.scala b/core/src/main/scala/jms4s/JmsPooledProducer.scala index ee5167c4..d4c1ce40 100644 --- a/core/src/main/scala/jms4s/JmsPooledProducer.scala +++ b/core/src/main/scala/jms4s/JmsPooledProducer.scala @@ -5,8 +5,9 @@ import cats.effect.{ Concurrent, ContextShift, Resource, Sync } import fs2.concurrent.Queue import jms4s.config.DestinationName import jms4s.jms._ -import jms4s.model.SessionType import cats.implicits._ +import jms4s.model.SessionType.AutoAcknowledge + import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._ @@ -41,7 +42,7 @@ object JmsPooledProducer { ) _ <- (0 until concurrencyLevel).toList.traverse_ { _ => for { - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) destination <- Resource.liftF(session.createDestination(queue)) producer <- session.createProducer(destination) _ <- Resource.liftF(pool.enqueue1(JmsResource(session, producer, new MessageFactory(session)))) @@ -54,14 +55,8 @@ object JmsPooledProducer { for { resources <- pool.dequeue1 messages <- f(resources.messageFactory) - _ <- messages.traverse_( - message => - for { - _ <- resources.producer - .send(message) - } yield () - ) - _ <- pool.enqueue1(resources) + _ <- messages.traverse_(message => resources.producer.send(message)) + _ <- pool.enqueue1(resources) } yield () override def sendNWithDelay( @@ -104,11 +99,8 @@ object JmsPooledProducer { for { resources <- pool.dequeue1 message <- f(resources.messageFactory) - _ <- for { - _ <- resources.producer - .send(message) - } yield () - _ <- pool.enqueue1(resources) + _ <- resources.producer.send(message) + _ <- pool.enqueue1(resources) } yield () } diff --git a/core/src/main/scala/jms4s/JmsTransactedConsumer.scala b/core/src/main/scala/jms4s/JmsTransactedConsumer.scala index f1cb012b..e01a9605 100644 --- a/core/src/main/scala/jms4s/JmsTransactedConsumer.scala +++ b/core/src/main/scala/jms4s/JmsTransactedConsumer.scala @@ -10,7 +10,6 @@ import jms4s.JmsTransactedConsumer.JmsTransactedConsumerPool.{ JmsResource, Rece import jms4s.JmsTransactedConsumer.TransactionAction import jms4s.config.DestinationName import jms4s.jms._ -import jms4s.model.SessionType import jms4s.model.SessionType.Transacted import scala.concurrent.duration.FiniteDuration @@ -31,7 +30,7 @@ object JmsTransactedConsumer { pool <- Resource.liftF(Queue.bounded[F, JmsResource[F]](concurrencyLevel)) _ <- (0 until concurrencyLevel).toList.traverse_ { _ => for { - session <- connection.createSession(SessionType.Transacted) + session <- connection.createSession(Transacted) consumer <- session.createConsumer(input) _ <- Resource.liftF( pool.enqueue1(JmsResource(session, consumer, Map.empty, new MessageFactory[F](session))) @@ -49,7 +48,7 @@ object JmsTransactedConsumer { for { inputDestination <- Resource.liftF( connection - .createSession(SessionType.Transacted) + .createSession(Transacted) .use(_.createDestination(inputDestinationName)) ) outputDestinations <- Resource.liftF( @@ -57,7 +56,7 @@ object JmsTransactedConsumer { .traverse( outputDestinationName => connection - .createSession(SessionType.Transacted) + .createSession(Transacted) .use(_.createDestination(outputDestinationName)) .map(jmsDestination => (outputDestinationName, jmsDestination)) ) @@ -67,7 +66,7 @@ object JmsTransactedConsumer { ) _ <- (0 until concurrencyLevel).toList.traverse_ { _ => for { - session <- connection.createSession(SessionType.Transacted) + session <- connection.createSession(Transacted) consumer <- session.createConsumer(inputDestination) producers <- outputDestinations.traverse { case (outputDestinationName, outputDestination) => @@ -91,18 +90,18 @@ object JmsTransactedConsumer { for { inputDestination <- Resource.liftF( connection - .createSession(SessionType.Transacted) + .createSession(Transacted) .use(_.createDestination(inputDestinationName)) ) outputDestination <- Resource.liftF( connection - .createSession(SessionType.Transacted) + .createSession(Transacted) .use(_.createDestination(outputDestinationName)) ) pool <- Resource.liftF(Queue.bounded[F, JmsResource[F]](concurrencyLevel)) _ <- (0 until concurrencyLevel).toList.traverse_ { _ => for { - session <- connection.createSession(SessionType.Transacted) + session <- connection.createSession(Transacted) consumer <- session.createConsumer(inputDestination) jmsProducer <- session.createProducer(outputDestination) producer = Map(outputDestinationName -> new JmsProducer(jmsProducer)) diff --git a/core/src/main/scala/jms4s/JmsUnidentifiedPooledProducer.scala b/core/src/main/scala/jms4s/JmsUnidentifiedPooledProducer.scala index 64511aa2..fd12ec73 100644 --- a/core/src/main/scala/jms4s/JmsUnidentifiedPooledProducer.scala +++ b/core/src/main/scala/jms4s/JmsUnidentifiedPooledProducer.scala @@ -6,10 +6,9 @@ import cats.implicits._ import fs2.concurrent.Queue import jms4s.config.DestinationName import jms4s.jms._ -import jms4s.model.SessionType +import jms4s.model.SessionType.AutoAcknowledge -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.duration._ +import scala.concurrent.duration.{ FiniteDuration, _ } trait JmsUnidentifiedPooledProducer[F[_]] { @@ -41,7 +40,7 @@ object JmsUnidentifiedPooledProducer { ) _ <- (0 until concurrencyLevel).toList.traverse_ { _ => for { - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) producer <- session.createUnidentifiedProducer _ <- Resource.liftF(pool.enqueue1(JmsResource(session, producer, new MessageFactory(session)))) } yield () diff --git a/tests/src/test/scala/jms4s/JmsClientSpec.scala b/tests/src/test/scala/jms4s/JmsClientSpec.scala index 46a0b931..55ae1c56 100644 --- a/tests/src/test/scala/jms4s/JmsClientSpec.scala +++ b/tests/src/test/scala/jms4s/JmsClientSpec.scala @@ -5,260 +5,257 @@ import java.util.concurrent.TimeUnit import cats.data.NonEmptyList import cats.effect.concurrent.Ref import cats.effect.testing.scalatest.AsyncIOSpec -import cats.effect.{ IO, Resource, Sync, Timer } +import cats.effect.{ IO, Resource, Timer } +import jms4s.basespec.Jms4sBaseSpec +import jms4s.model.SessionType.AutoAcknowledge +import org.scalatest.freespec.AsyncFreeSpec import cats.implicits._ import jms4s.JmsAcknowledgerConsumer.AckAction import jms4s.JmsAutoAcknowledgerConsumer.AutoAckAction import jms4s.JmsTransactedConsumer.TransactionAction -import jms4s.basespec.Jms4sBaseSpec -import jms4s.model.SessionType -import jms4s.model.SessionType.AutoAcknowledge -import org.scalatest.freespec.AsyncFreeSpec trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { private val jmsClient = new JmsClient[IO] - "High level api" - { - s"publish $nMessages messages and then consume them concurrently with local transactions" in { - val res = for { - connection <- connectionRes - session <- connection.createSession(AutoAcknowledge) - queue <- Resource.liftF(session.createQueue(inputQueueName)) - producer <- session.createProducer(queue) - messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - consumer <- jmsClient.createTransactedConsumer(connection, inputQueueName, poolSize) - } yield (consumer, producer, bodies.toSet, messages) - - res.use { - case (consumer, producer, bodies, messages) => - for { - _ <- messages.traverse_(msg => producer.send(msg)) - _ <- logger.info(s"Pushed ${messages.size} messages.") - received <- Ref.of[IO, Set[String]](Set()) - consumerFiber <- consumer.handle { message => - for { - tm <- message.asJmsTextMessage - body <- tm.getText - _ <- received.update(_ + body) - } yield TransactionAction.commit - }.start - _ <- logger.info(s"Consumer started. Collecting messages from the queue...") - receivedMessages <- (received.get.iterateUntil(_.eqv(bodies)).timeout(timeout) >> received.get) - .guarantee(consumerFiber.cancel) - } yield assert(receivedMessages == bodies) - } + s"publish $nMessages messages and then consume them concurrently with local transactions" in { + val res = for { + connection <- connectionRes + session <- connection.createSession(AutoAcknowledge) + queue <- Resource.liftF(session.createQueue(inputQueueName)) + producer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(queue)) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) + consumer <- jmsClient.createTransactedConsumer(connection, inputQueueName, poolSize) + } yield (consumer, producer, bodies.toSet, messages) + + res.use { + case (consumer, producer, bodies, messages) => + for { + _ <- messages.traverse_(msg => producer.send(msg)) + _ <- logger.info(s"Pushed ${messages.size} messages.") + received <- Ref.of[IO, Set[String]](Set()) + consumerFiber <- consumer.handle { message => + for { + tm <- message.asJmsTextMessage + body <- tm.getText + _ <- received.update(_ + body) + } yield TransactionAction.commit + }.start + _ <- logger.info(s"Consumer started. Collecting messages from the queue...") + receivedMessages <- (received.get.iterateUntil(_.eqv(bodies)).timeout(timeout) >> received.get) + .guarantee(consumerFiber.cancel) + } yield assert(receivedMessages == bodies) } + } - s"publish $nMessages messages, consume them concurrently with local transactions and then republishing to other queues" in { - - val res = for { - connection <- connectionRes - session <- connection.createSession(AutoAcknowledge) - inputQueue <- Resource.liftF(session.createQueue(inputQueueName)) - outputQueue1 <- Resource.liftF(session.createQueue(outputQueueName1)) - outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) - inputProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(inputQueue)) - outputConsumer1 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue1)) - outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue2)) - messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - consumer <- jmsClient.createTransactedConsumerToProducers( - connection, - inputQueueName, - NonEmptyList.of(outputQueueName1, outputQueueName2), - poolSize - ) - } yield (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies.toSet, messages) - - res.use { - case (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies, messages) => - for { - _ <- messages.traverse_(msg => inputProducer.send(msg)) - _ <- logger.info(s"Pushed ${messages.size} messages.") - consumerToProducerFiber <- consumer.handle { message => - for { - tm <- message.asJmsTextMessage - text <- tm.getText - } yield - if (text.toInt % 2 == 0) - TransactionAction.send[IO](messageFactory(tm, outputQueueName1)) - else TransactionAction.send[IO](messageFactory(tm, outputQueueName2)) - }.start - _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queues...") - received1 <- Ref.of[IO, Set[String]](Set()) - received2 <- Ref.of[IO, Set[String]](Set()) - receivedMessages <- (( - receiveUntil(outputConsumer1, received1, nMessages / 2), - receiveUntil(outputConsumer2, received2, nMessages / 2) - ).parTupled.timeout(timeout) >> (received1.get, received2.get).mapN(_ ++ _)) - .guarantee(consumerToProducerFiber.cancel) - } yield assert(receivedMessages == bodies) - } + s"publish $nMessages messages, consume them concurrently with local transactions and then republishing to other queues" in { + + val res = for { + connection <- connectionRes + session <- connection.createSession(AutoAcknowledge) + inputQueue <- Resource.liftF(session.createQueue(inputQueueName)) + outputQueue1 <- Resource.liftF(session.createQueue(outputQueueName1)) + outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) + inputProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(inputQueue)) + outputConsumer1 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue1)) + outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue2)) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) + consumer <- jmsClient.createTransactedConsumerToProducers( + connection, + inputQueueName, + NonEmptyList.of(outputQueueName1, outputQueueName2), + poolSize + ) + } yield (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies.toSet, messages) + + res.use { + case (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies, messages) => + for { + _ <- messages.traverse_(msg => inputProducer.send(msg)) + _ <- logger.info(s"Pushed ${messages.size} messages.") + consumerToProducerFiber <- consumer.handle { message => + for { + tm <- message.asJmsTextMessage + text <- tm.getText + } yield + if (text.toInt % 2 == 0) + TransactionAction.send[IO](messageFactory(tm, outputQueueName1)) + else + TransactionAction.send[IO](messageFactory(tm, outputQueueName2)) + }.start + _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queues...") + received1 <- Ref.of[IO, Set[String]](Set()) + received2 <- Ref.of[IO, Set[String]](Set()) + receivedMessages <- (( + receiveUntil(outputConsumer1, received1, nMessages / 2), + receiveUntil(outputConsumer2, received2, nMessages / 2) + ).parTupled.timeout(timeout) >> (received1.get, received2.get).mapN(_ ++ _)) + .guarantee(consumerToProducerFiber.cancel) + } yield assert(receivedMessages == bodies) } + } - s"publish $nMessages messages and then consume them concurrently with acknowledge" in { - - val res = for { - connection <- connectionRes - session <- connection.createSession(AutoAcknowledge) - queue <- Resource.liftF(session.createQueue(inputQueueName)) - producer <- session.createProducer(queue) - messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - consumer <- jmsClient.createAcknowledgerConsumer(connection, inputQueueName, poolSize) - } yield (consumer, producer, bodies.toSet, messages) - - res.use { - case (consumer, producer, bodies, messages) => - for { - _ <- messages.traverse_(msg => producer.send(msg)) - _ <- logger.info(s"Pushed ${messages.size} messages.") - received <- Ref.of[IO, Set[String]](Set()) - consumerFiber <- consumer.handle { message => - for { - tm <- message.asJmsTextMessage - body <- tm.getText - _ <- received.update(_ + body) - } yield AckAction.ack - }.start - _ <- logger.info(s"Consumer started. Collecting messages from the queue...") - receivedMessages <- (received.get.iterateUntil(_.eqv(bodies)).timeout(timeout) >> received.get) - .guarantee(consumerFiber.cancel) - } yield assert(receivedMessages == bodies) - } + s"publish $nMessages messages and then consume them concurrently with acknowledge" in { + + val res = for { + connection <- connectionRes + session <- connection.createSession(AutoAcknowledge) + queue <- Resource.liftF(session.createQueue(inputQueueName)) + producer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(queue)) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) + consumer <- jmsClient.createAcknowledgerConsumer(connection, inputQueueName, poolSize) + } yield (consumer, producer, bodies.toSet, messages) + + res.use { + case (consumer, producer, bodies, messages) => + for { + _ <- messages.traverse_(msg => producer.send(msg)) + _ <- logger.info(s"Pushed ${messages.size} messages.") + received <- Ref.of[IO, Set[String]](Set()) + consumerFiber <- consumer.handle { message => + for { + tm <- message.asJmsTextMessage + body <- tm.getText + _ <- received.update(_ + body) + } yield AckAction.ack + }.start + _ <- logger.info(s"Consumer started. Collecting messages from the queue...") + receivedMessages <- (received.get.iterateUntil(_.eqv(bodies)).timeout(timeout) >> received.get) + .guarantee(consumerFiber.cancel) + } yield assert(receivedMessages == bodies) } + } - s"publish $nMessages messages, consume them concurrently and then republishing to other queues, with acknowledge" in { - - val res = for { - connection <- connectionRes - session <- connection.createSession(AutoAcknowledge) - inputQueue <- Resource.liftF(session.createQueue(inputQueueName)) - outputQueue1 <- Resource.liftF(session.createQueue(outputQueueName1)) - outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) - inputProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(inputQueue)) - outputConsumer1 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue1)) - outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue2)) - bodies = (0 until nMessages).map(i => s"$i") - messages <- Resource.liftF(bodies.toList.traverse(i => session.createTextMessage(i))) - consumer <- jmsClient.createAcknowledgerToProducers( - connection, - inputQueueName, - NonEmptyList.of(outputQueueName1, outputQueueName2), - poolSize - ) - } yield (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies.toSet, messages) - - res.use { - case (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies, messages) => - for { - _ <- messages.traverse_(msg => inputProducer.send(msg)) - _ <- logger.info(s"Pushed ${messages.size} messages.") - consumerToProducerFiber <- consumer.handle { message => - for { - tm <- message.asJmsTextMessage - text <- tm.getText - } yield - if (text.toInt % 2 == 0) - AckAction.send[IO](messageFactory(tm, outputQueueName1)) - else AckAction.send[IO](messageFactory(tm, outputQueueName2)) - }.start - _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queues...") - received1 <- Ref.of[IO, Set[String]](Set()) - received2 <- Ref.of[IO, Set[String]](Set()) - receivedMessages <- (( - receiveUntil(outputConsumer1, received1, nMessages / 2), - receiveUntil(outputConsumer2, received2, nMessages / 2) - ).parTupled.timeout(timeout) >> (received1.get, received2.get).mapN(_ ++ _)) - .guarantee(consumerToProducerFiber.cancel) - } yield assert(receivedMessages == bodies) - } + s"publish $nMessages messages, consume them concurrently and then republishing to other queues, with acknowledge" in { + + val res = for { + connection <- connectionRes + session <- connection.createSession(AutoAcknowledge) + inputQueue <- Resource.liftF(session.createQueue(inputQueueName)) + outputQueue1 <- Resource.liftF(session.createQueue(outputQueueName1)) + outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) + inputProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(inputQueue)) + outputConsumer1 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue1)) + outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue2)) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) + consumer <- jmsClient.createAcknowledgerToProducers( + connection, + inputQueueName, + NonEmptyList.of(outputQueueName1, outputQueueName2), + poolSize + ) + } yield (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies.toSet, messages) + + res.use { + case (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies, messages) => + for { + _ <- messages.traverse_(msg => inputProducer.send(msg)) + _ <- logger.info(s"Pushed ${messages.size} messages.") + consumerToProducerFiber <- consumer.handle { message => + for { + tm <- message.asJmsTextMessage + text <- tm.getText + } yield + if (text.toInt % 2 == 0) + AckAction.send[IO](messageFactory(tm, outputQueueName1)) + else + AckAction.send[IO](messageFactory(tm, outputQueueName2)) + }.start + _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queues...") + received1 <- Ref.of[IO, Set[String]](Set()) + received2 <- Ref.of[IO, Set[String]](Set()) + receivedMessages <- (( + receiveUntil(outputConsumer1, received1, nMessages / 2), + receiveUntil(outputConsumer2, received2, nMessages / 2) + ).parTupled.timeout(timeout) >> (received1.get, received2.get).mapN(_ ++ _)) + .guarantee(consumerToProducerFiber.cancel) + } yield assert(receivedMessages == bodies) } + } + + s"publish $nMessages messages and then consume them concurrently with auto-acknowledge" in { - s"publish $nMessages messages and then consume them concurrently with auto-acknowledge" in { - - val res = for { - connection <- connectionRes - session <- connection.createSession(AutoAcknowledge) - queue <- Resource.liftF(session.createQueue(inputQueueName)) - producer <- session.createProducer(queue) - messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - consumer <- jmsClient.createAutoAcknowledgerConsumer(connection, inputQueueName, poolSize) - } yield (consumer, producer, bodies.toSet, messages) - - res.use { - case (consumer, producer, bodies, messages) => - for { - _ <- messages.traverse_(msg => producer.send(msg)) - _ <- logger.info(s"Pushed ${messages.size} messages.") - received <- Ref.of[IO, Set[String]](Set()) - consumerFiber <- consumer.handle { message => - for { - tm <- message.asJmsTextMessage - body <- tm.getText - _ <- received.update(_ + body) - } yield AutoAckAction.noOp - }.start - _ <- logger.info(s"Consumer started. Collecting messages from the queue...") - receivedMessages <- (received.get.iterateUntil(_.eqv(bodies)).timeout(timeout) >> received.get) - .guarantee(consumerFiber.cancel) - } yield assert(receivedMessages == bodies) - } + val res = for { + connection <- connectionRes + session <- connection.createSession(AutoAcknowledge) + queue <- Resource.liftF(session.createQueue(inputQueueName)) + producer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(queue)) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) + consumer <- jmsClient.createAutoAcknowledgerConsumer(connection, inputQueueName, poolSize) + } yield (consumer, producer, bodies.toSet, messages) + + res.use { + case (consumer, producer, bodies, messages) => + for { + _ <- messages.traverse_(msg => producer.send(msg)) + _ <- logger.info(s"Pushed ${messages.size} messages.") + received <- Ref.of[IO, Set[String]](Set()) + consumerFiber <- consumer.handle { message => + for { + tm <- message.asJmsTextMessage + body <- tm.getText + _ <- received.update(_ + body) + } yield AutoAckAction.noOp + }.start + _ <- logger.info(s"Consumer started. Collecting messages from the queue...") + receivedMessages <- (received.get.iterateUntil(_.eqv(bodies)).timeout(timeout) >> received.get) + .guarantee(consumerFiber.cancel) + } yield assert(receivedMessages == bodies) } + } + + s"publish $nMessages messages, consume them concurrently and then republishing to other queues, with auto-acknowledge" in { + + val res = for { + connection <- connectionRes + session <- connection.createSession(AutoAcknowledge) + inputQueue <- Resource.liftF(session.createQueue(inputQueueName)) + outputQueue1 <- Resource.liftF(session.createQueue(outputQueueName1)) + outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) + inputProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(inputQueue)) + outputConsumer1 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue1)) + outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue2)) + messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) + consumer <- jmsClient.createAutoAcknowledgerToProducers( + connection, + inputQueueName, + NonEmptyList.of(outputQueueName1, outputQueueName2), + poolSize + ) + } yield (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies.toSet, messages) - s"publish $nMessages messages, consume them concurrently and then republishing to other queues, with auto-acknowledge" in { - - val res = for { - connection <- connectionRes - session <- connection.createSession(AutoAcknowledge) - inputQueue <- Resource.liftF(session.createQueue(inputQueueName)) - outputQueue1 <- Resource.liftF(session.createQueue(outputQueueName1)) - outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) - inputProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(inputQueue)) - outputConsumer1 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue1)) - outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue2)) - messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) - consumer <- jmsClient.createAutoAcknowledgerToProducers( - connection, - inputQueueName, - NonEmptyList.of(outputQueueName1, outputQueueName2), - poolSize - ) - } yield (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies.toSet, messages) - - res.use { - case (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies, messages) => - for { - _ <- messages.traverse_(msg => inputProducer.send(msg)) - _ <- logger.info(s"Pushed ${messages.size} messages.") - consumerToProducerFiber <- consumer.handle { message => - for { - tm <- message.asJmsTextMessage - text <- tm.getText - } yield - if (text.toInt % 2 == 0) - AutoAckAction.send[IO](messageFactory(tm, outputQueueName1)) - else AutoAckAction.send[IO](messageFactory(tm, outputQueueName2)) - }.start - _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queues...") - received1 <- Ref.of[IO, Set[String]](Set()) - received2 <- Ref.of[IO, Set[String]](Set()) - receivedMessages <- (( - receiveUntil(outputConsumer1, received1, nMessages / 2), - receiveUntil(outputConsumer2, received2, nMessages / 2) - ).parTupled.timeout(timeout) >> (received1.get, received2.get).mapN(_ ++ _)) - .guarantee(consumerToProducerFiber.cancel) - } yield assert(receivedMessages == bodies) - } + res.use { + case (consumer, inputProducer, outputConsumer1, outputConsumer2, bodies, messages) => + for { + _ <- messages.traverse_(msg => inputProducer.send(msg)) + _ <- logger.info(s"Pushed ${messages.size} messages.") + consumerToProducerFiber <- consumer.handle { message => + for { + tm <- message.asJmsTextMessage + text <- tm.getText + } yield + if (text.toInt % 2 == 0) + AutoAckAction.send[IO](messageFactory(tm, outputQueueName1)) + else AutoAckAction.send[IO](messageFactory(tm, outputQueueName2)) + }.start + _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queues...") + received1 <- Ref.of[IO, Set[String]](Set()) + received2 <- Ref.of[IO, Set[String]](Set()) + receivedMessages <- (( + receiveUntil(outputConsumer1, received1, nMessages / 2), + receiveUntil(outputConsumer2, received2, nMessages / 2) + ).parTupled.timeout(timeout) >> (received1.get, received2.get).mapN(_ ++ _)) + .guarantee(consumerToProducerFiber.cancel) + } yield assert(receivedMessages == bodies) } } s"send $nMessages messages in a Queue with pooled producer and consume them" in { - val jmsClient = new JmsClient[IO] val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) outputQueue <- Resource.liftF(session.createQueue(outputQueueName1)) - outputConsumer <- session.createConsumer(outputQueue) + outputConsumer <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue)) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer(connection, outputQueueName1, poolSize) } yield (producer, outputConsumer, bodies.toSet, messages) @@ -276,13 +273,12 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { } s"sendN $nMessages messages in a Queue with pooled producer and consume them" in { - val jmsClient = new JmsClient[IO] val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) outputQueue <- Resource.liftF(session.createQueue(outputQueueName1)) - outputConsumer <- session.createConsumer(outputQueue) + outputConsumer <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue)) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer(connection, outputQueueName1, poolSize) } yield (producer, outputConsumer, bodies.toSet, messages) @@ -300,13 +296,12 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { } s"send $nMessages messages in a Topic with pooled producer and consume them" in { - val jmsClient = new JmsClient[IO] val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) outputTopic <- Resource.liftF(session.createTopic(topicName)) - outputConsumer <- session.createConsumer(outputTopic) + outputConsumer <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputTopic)) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer(connection, topicName, poolSize) } yield (producer, outputConsumer, bodies.toSet, messages) @@ -324,13 +319,12 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { } s"sendN $nMessages messages in a Topic with pooled producer and consume them" in { - val jmsClient = new JmsClient[IO] val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) outputTopic <- Resource.liftF(session.createTopic(topicName)) - outputConsumer <- session.createConsumer(outputTopic) + outputConsumer <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputTopic)) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer(connection, topicName, poolSize) } yield (producer, outputConsumer, bodies.toSet, messages) @@ -338,7 +332,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { res.use { case (producer, outputConsumer, bodies, messages) => for { - _ <- messages.toNel.fold(Sync[IO].unit)(ms => producer.sendN(messageFactory(ms))) + _ <- messages.toNel.fold(IO.unit)(ms => producer.sendN(messageFactory(ms))) _ <- logger.info(s"Pushed ${messages.size} messages.") _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...") received <- Ref.of[IO, Set[String]](Set()) @@ -348,15 +342,14 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { } s"send $nMessages messages in two Queues with unidentified pooled producer and consume them" in { - val jmsClient = new JmsClient[IO] val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) outputQueue <- Resource.liftF(session.createQueue(outputQueueName1)) outputQueue2 <- Resource.liftF(session.createQueue(outputQueueName2)) - outputConsumer <- session.createConsumer(outputQueue) - outputConsumer2 <- session.createConsumer(outputQueue2) + outputConsumer <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue)) + outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue2)) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer(connection, poolSize) } yield (producer, outputConsumer, outputConsumer2, bodies.toSet, messages) @@ -371,7 +364,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { ) ) _ <- logger.info(s"Pushed ${messages.size} messages.") - _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...") + _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queue...") firstBatch <- Ref.of[IO, Set[String]](Set()) firstBatchMessages <- receiveUntil(outputConsumer, firstBatch, nMessages).timeout(timeout) >> firstBatch.get secondBatch <- Ref.of[IO, Set[String]](Set()) @@ -381,15 +374,14 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { } s"send $nMessages messages in two Topics with unidentified pooled producer and consume them" in { - val jmsClient = new JmsClient[IO] val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) topic <- Resource.liftF(session.createTopic(topicName)) - outputConsumer <- session.createConsumer(topic) + outputConsumer <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(topic)) topic2 <- Resource.liftF(session.createTopic(topicName2)) - outputConsumer2 <- session.createConsumer(topic2) + outputConsumer2 <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(topic2)) messages <- Resource.liftF(bodies.traverse(i => session.createTextMessage(i))) producer <- jmsClient.createProducer(connection, poolSize) } yield (producer, outputConsumer, outputConsumer2, bodies.toSet, messages) @@ -401,7 +393,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { msg => producer.send(messageFactory(msg, topicName)) *> producer.send(messageFactory(msg, topicName2)) ) _ <- logger.info(s"Pushed ${messages.size} messages.") - _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...") + _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queue...") firstBatch <- Ref.of[IO, Set[String]](Set()) firstBatchMessages <- receiveUntil(outputConsumer, firstBatch, nMessages).timeout(timeout) >> firstBatch.get secondBatch <- Ref.of[IO, Set[String]](Set()) @@ -411,14 +403,11 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { } s"sendN $nMessages messages with delay in a Queue with pooled producer and consume them" in { - val jmsClient = new JmsClient[IO] - - val body = "body" val res = for { connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) + session <- connection.createSession(AutoAcknowledge) outputQueue <- Resource.liftF(session.createQueue(outputQueueName1)) - outputConsumer <- session.createConsumer(outputQueue) + outputConsumer <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(outputQueue)) message <- Resource.liftF(session.createTextMessage(body)) producer <- jmsClient.createProducer(connection, poolSize) } yield (producer, outputConsumer, message) @@ -428,8 +417,8 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { for { producerTimestamp <- Timer[IO].clock.realTime(TimeUnit.MILLISECONDS) _ <- producer.sendWithDelay(messageWithDelayFactory((message, (outputQueueName1, Some(delay))))) - _ <- logger.info(s"Pushed message with body: ${body}.") - _ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...") + _ <- logger.info(s"Pushed message with body: $body.") + _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queue...") receivedMessage <- receiveMessage(outputConsumer).timeout(timeout) actualBody <- receivedMessage.getText jmsDeliveryTime <- receivedMessage.getJMSDeliveryTime @@ -437,5 +426,4 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { } yield assert(actualDelay >= delay.toMillis && actualBody == body) } } - } diff --git a/tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala b/tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala index d9a2de1a..dccd84de 100644 --- a/tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala +++ b/tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala @@ -17,6 +17,7 @@ trait Jms4sBaseSpec { def connectionRes: Resource[IO, JmsConnection[IO]] + val body = "body" val nMessages: Int = 50 val bodies: List[String] = (0 until nMessages).map(i => s"$i").toList val poolSize: Int = 4 diff --git a/tests/src/test/scala/jms4s/jms/JmsSpec.scala b/tests/src/test/scala/jms4s/jms/JmsSpec.scala index 36d330a3..bf3782d3 100644 --- a/tests/src/test/scala/jms4s/jms/JmsSpec.scala +++ b/tests/src/test/scala/jms4s/jms/JmsSpec.scala @@ -6,65 +6,62 @@ import cats.effect.testing.scalatest.AsyncIOSpec import cats.effect.{ IO, Resource, Timer } import cats.implicits._ import jms4s.basespec.Jms4sBaseSpec -import jms4s.model.SessionType +import jms4s.model.SessionType.AutoAcknowledge import org.scalatest.freespec.AsyncFreeSpec import scala.concurrent.duration._ trait JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { - val expectedBody = "body" - "Basic jms ops" - { - val queueRes = for { - connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) - queue <- Resource.liftF(session.createQueue(inputQueueName)) - queueConsumer <- connection.createSession(SessionType.AutoAcknowledge).flatMap(_.createConsumer(queue)) - queueProducer <- connection.createSession(SessionType.AutoAcknowledge).flatMap(_.createProducer(queue)) - msg <- Resource.liftF(session.createTextMessage(expectedBody)) - } yield (queueConsumer, queueProducer, msg) + val queueRes = for { + connection <- connectionRes + session <- connection.createSession(AutoAcknowledge) + queue <- Resource.liftF(session.createQueue(inputQueueName)) + queueConsumer <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(queue)) + queueProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(queue)) + msg <- Resource.liftF(session.createTextMessage(body)) + } yield (queueConsumer, queueProducer, msg) - val topicRes = for { - connection <- connectionRes - session <- connection.createSession(SessionType.AutoAcknowledge) - topic <- Resource.liftF(session.createTopic(topicName)) - topicConsumer <- connection.createSession(SessionType.AutoAcknowledge).flatMap(_.createConsumer(topic)) - topicProducer <- connection.createSession(SessionType.AutoAcknowledge).flatMap(_.createProducer(topic)) - msg <- Resource.liftF(session.createTextMessage(expectedBody)) - } yield (topicConsumer, topicProducer, msg) + val topicRes = for { + connection <- connectionRes + session <- connection.createSession(AutoAcknowledge) + topic <- Resource.liftF(session.createTopic(topicName)) + topicConsumer <- connection.createSession(AutoAcknowledge).flatMap(_.createConsumer(topic)) + topicProducer <- connection.createSession(AutoAcknowledge).flatMap(_.createProducer(topic)) + msg <- Resource.liftF(session.createTextMessage(body)) + } yield (topicConsumer, topicProducer, msg) - "publish to a queue and then receive" in { - queueRes.use { - case (queueConsumer, queueProducer, msg) => - for { - _ <- queueProducer.send(msg) - text <- receiveBodyAsTextOrFail(queueConsumer) - } yield assert(text == expectedBody) - } + "publish to a queue and then receive" in { + queueRes.use { + case (queueConsumer, queueProducer, msg) => + for { + _ <- queueProducer.send(msg) + text <- receiveBodyAsTextOrFail(queueConsumer) + } yield assert(text == body) } - "publish and then receive with a delay" in { - queueRes.use { - case (consumer, producer, msg) => - for { - _ <- producer.setDeliveryDelay(delay) - producerTimestamp <- Timer[IO].clock.realTime(TimeUnit.MILLISECONDS) - _ <- producer.send(msg) - msg <- consumer.receiveJmsMessage - tm <- msg.asJmsTextMessage - body <- tm.getText - jmsDeliveryTime <- tm.getJMSDeliveryTime - producerDelay = jmsDeliveryTime - producerTimestamp - } yield assert(producerDelay >= delay.toMillis && body == expectedBody) - } + } + "publish and then receive with a delay" in { + queueRes.use { + case (consumer, producer, msg) => + for { + _ <- producer.setDeliveryDelay(delay) + producerTimestamp <- Timer[IO].clock.realTime(TimeUnit.MILLISECONDS) + _ <- producer.send(msg) + msg <- consumer.receiveJmsMessage + tm <- msg.asJmsTextMessage + body <- tm.getText + jmsDeliveryTime <- tm.getJMSDeliveryTime + producerDelay = jmsDeliveryTime - producerTimestamp + } yield assert(producerDelay >= delay.toMillis && body == body) } - "publish to a topic and then receive" in { - topicRes.use { - case (topicConsumer, topicProducer, msg) => - for { - _ <- (IO.delay(10.millis) >> topicProducer.send(msg)).start - rec <- receiveBodyAsTextOrFail(topicConsumer) - } yield assert(rec == expectedBody) - } + } + "publish to a topic and then receive" in { + topicRes.use { + case (topicConsumer, topicProducer, msg) => + for { + _ <- (IO.delay(10.millis) >> topicProducer.send(msg)).start + rec <- receiveBodyAsTextOrFail(topicConsumer) + } yield assert(rec == body) } } } From c9197947702eb080294975d7b59ca02fcfe44809 Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Tue, 7 Apr 2020 14:24:23 +0200 Subject: [PATCH 6/8] Minors --- .../main/scala/jms4s/activemq/activeMQ.scala | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala b/active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala index 7f2291d6..1551776f 100644 --- a/active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala +++ b/active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala @@ -25,16 +25,13 @@ object activeMQ { connection <- Resource.make( Logger[F].info(s"Opening Connection to MQ at ${hosts(config.endpoints)}...") *> Sync[F].delay { - val connectionFactory: ActiveMQConnectionFactory = - new ActiveMQConnectionFactory(hosts(config.endpoints)) - connectionFactory.setClientID(config.clientId) - - val connection: Connection = config.username.map { (username) => - connectionFactory.createConnection( - username.value, - config.password.map(_.value).getOrElse("") - ) - }.getOrElse(connectionFactory.createConnection) + val factory = new ActiveMQConnectionFactory(hosts(config.endpoints)) + factory.setClientID(config.clientId) + + val connection = config.username.fold(factory.createConnection)( + username => + factory.createConnection(username.value, config.password.map(_.value).getOrElse("")) + ) connection.start() connection From b725a415e9d14a6273bde20b1a4943d598310c84 Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Tue, 7 Apr 2020 14:34:41 +0200 Subject: [PATCH 7/8] Remove String obsession --- .../src/main/scala/jms4s/activemq/activeMQ.scala | 6 +++--- ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala | 5 +++-- .../jms4s/basespec/providers/ActiveMQArtemisBaseSpec.scala | 4 ++-- .../test/scala/jms4s/basespec/providers/IbmMQBaseSpec.scala | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala b/active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala index 1551776f..d01783a5 100644 --- a/active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala +++ b/active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala @@ -4,7 +4,6 @@ import cats.data.NonEmptyList import cats.effect.{ Blocker, Resource, Sync } import cats.implicits._ import io.chrisdavenport.log4cats.Logger -import javax.jms.Connection import jms4s.jms.JmsConnection import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory @@ -14,11 +13,12 @@ object activeMQ { endpoints: NonEmptyList[Endpoint], username: Option[Username] = None, password: Option[Password] = None, - clientId: String + clientId: ClientId ) case class Username(value: String) extends AnyVal case class Password(value: String) extends AnyVal case class Endpoint(host: String, port: Int) + case class ClientId(value: String) extends AnyVal def makeConnection[F[_]: Sync: Logger](config: Config, blocker: Blocker): Resource[F, JmsConnection[F]] = for { @@ -26,7 +26,7 @@ object activeMQ { Logger[F].info(s"Opening Connection to MQ at ${hosts(config.endpoints)}...") *> Sync[F].delay { val factory = new ActiveMQConnectionFactory(hosts(config.endpoints)) - factory.setClientID(config.clientId) + factory.setClientID(config.clientId.value) val connection = config.username.fold(factory.createConnection)( username => diff --git a/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala b/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala index dd5ccb67..6459bb5b 100644 --- a/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala +++ b/ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala @@ -16,13 +16,14 @@ object ibmMQ { channel: Channel, username: Option[Username] = None, password: Option[Password] = None, - clientId: String + clientId: ClientId ) case class Username(value: String) extends AnyVal case class Password(value: String) extends AnyVal case class Endpoint(host: String, port: Int) case class QueueManager(value: String) extends AnyVal case class Channel(value: String) extends AnyVal + case class ClientId(value: String) extends AnyVal def makeConnection[F[_]: Sync: Logger](config: Config, blocker: Blocker): Resource[F, JmsConnection[F]] = for { @@ -34,7 +35,7 @@ object ibmMQ { connectionFactory.setQueueManager(config.qm.value) connectionFactory.setConnectionNameList(hosts(config.endpoints)) connectionFactory.setChannel(config.channel.value) - connectionFactory.setClientID(config.clientId) + connectionFactory.setClientID(config.clientId.value) val connection = config.username.map { (username) => connectionFactory.createConnection( diff --git a/tests/src/test/scala/jms4s/basespec/providers/ActiveMQArtemisBaseSpec.scala b/tests/src/test/scala/jms4s/basespec/providers/ActiveMQArtemisBaseSpec.scala index 9e556bf8..0585c29e 100644 --- a/tests/src/test/scala/jms4s/basespec/providers/ActiveMQArtemisBaseSpec.scala +++ b/tests/src/test/scala/jms4s/basespec/providers/ActiveMQArtemisBaseSpec.scala @@ -3,7 +3,7 @@ package jms4s.basespec.providers import cats.data.NonEmptyList import cats.effect.{ Blocker, IO, Resource } import jms4s.activemq.activeMQ -import jms4s.activemq.activeMQ.{ Config, Endpoint, Password, Username } +import jms4s.activemq.activeMQ.{ ClientId, Config, Endpoint, Password, Username } import jms4s.basespec.Jms4sBaseSpec import jms4s.jms.JmsConnection @@ -18,7 +18,7 @@ trait ActiveMQArtemisBaseSpec extends Jms4sBaseSpec { endpoints = NonEmptyList.one(Endpoint("localhost", 61616)), username = Some(Username("admin")), password = Some(Password("passw0rd")), - clientId = "jms-specs" + clientId = ClientId("jms-specs") ), blocker ) diff --git a/tests/src/test/scala/jms4s/basespec/providers/IbmMQBaseSpec.scala b/tests/src/test/scala/jms4s/basespec/providers/IbmMQBaseSpec.scala index 4dc4d67b..6538eafd 100644 --- a/tests/src/test/scala/jms4s/basespec/providers/IbmMQBaseSpec.scala +++ b/tests/src/test/scala/jms4s/basespec/providers/IbmMQBaseSpec.scala @@ -25,7 +25,7 @@ trait IbmMQBaseSpec extends Jms4sBaseSpec { channel = Channel("DEV.ADMIN.SVRCONN"), username = Some(Username("admin")), password = Some(Password("passw0rd")), - clientId = "jms-specs" + clientId = ClientId("jms-specs") ), blocker ) From e4d9648fc586cc078181a02f861960f2d138031e Mon Sep 17 00:00:00 2001 From: Alessandro Zoffoli Date: Tue, 7 Apr 2020 14:50:32 +0200 Subject: [PATCH 8/8] CI is slow --- tests/src/test/scala/jms4s/JmsClientSpec.scala | 4 ++-- tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala | 4 ++-- tests/src/test/scala/jms4s/jms/JmsSpec.scala | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/src/test/scala/jms4s/JmsClientSpec.scala b/tests/src/test/scala/jms4s/JmsClientSpec.scala index 55ae1c56..18ff23e3 100644 --- a/tests/src/test/scala/jms4s/JmsClientSpec.scala +++ b/tests/src/test/scala/jms4s/JmsClientSpec.scala @@ -421,8 +421,8 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { _ <- logger.info(s"Consumer to Producer started. Collecting messages from output queue...") receivedMessage <- receiveMessage(outputConsumer).timeout(timeout) actualBody <- receivedMessage.getText - jmsDeliveryTime <- receivedMessage.getJMSDeliveryTime - actualDelay = jmsDeliveryTime - producerTimestamp + deliveryTime <- Timer[IO].clock.realTime(TimeUnit.MILLISECONDS) + actualDelay = deliveryTime - producerTimestamp } yield assert(actualDelay >= delay.toMillis && actualBody == body) } } diff --git a/tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala b/tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala index dccd84de..f9d7642f 100644 --- a/tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala +++ b/tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala @@ -21,8 +21,8 @@ trait Jms4sBaseSpec { val nMessages: Int = 50 val bodies: List[String] = (0 until nMessages).map(i => s"$i").toList val poolSize: Int = 4 - val timeout: FiniteDuration = 2.seconds - val delay: FiniteDuration = 200.millis + val timeout: FiniteDuration = 4.seconds // CI is slow... + val delay: FiniteDuration = 100.millis val topicName: TopicName = TopicName("DEV.BASE.TOPIC") val topicName2: TopicName = TopicName("DEV.BASE.TOPIC.1") val inputQueueName: QueueName = QueueName("DEV.QUEUE.1") diff --git a/tests/src/test/scala/jms4s/jms/JmsSpec.scala b/tests/src/test/scala/jms4s/jms/JmsSpec.scala index bf3782d3..95626cf6 100644 --- a/tests/src/test/scala/jms4s/jms/JmsSpec.scala +++ b/tests/src/test/scala/jms4s/jms/JmsSpec.scala @@ -50,9 +50,9 @@ trait JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec { msg <- consumer.receiveJmsMessage tm <- msg.asJmsTextMessage body <- tm.getText - jmsDeliveryTime <- tm.getJMSDeliveryTime - producerDelay = jmsDeliveryTime - producerTimestamp - } yield assert(producerDelay >= delay.toMillis && body == body) + deliveryTime <- Timer[IO].clock.realTime(TimeUnit.MILLISECONDS) + actualDelay = deliveryTime - producerTimestamp + } yield assert(actualDelay >= delay.toMillis && body == body) } } "publish to a topic and then receive" in {