Skip to content

Commit

Permalink
Merge pull request #5 from fp-in-bo/activemq
Browse files Browse the repository at this point in the history
Add support for activemq-artemis
  • Loading branch information
AL333Z authored Apr 7, 2020
2 parents 7b0157f + e4d9648 commit a6408aa
Show file tree
Hide file tree
Showing 20 changed files with 595 additions and 481 deletions.
51 changes: 51 additions & 0 deletions active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package jms4s.activemq

import cats.data.NonEmptyList
import cats.effect.{ Blocker, Resource, Sync }
import cats.implicits._
import io.chrisdavenport.log4cats.Logger
import jms4s.jms.JmsConnection
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory

object activeMQ {

case class Config(
endpoints: NonEmptyList[Endpoint],
username: Option[Username] = None,
password: Option[Password] = None,
clientId: ClientId
)
case class Username(value: String) extends AnyVal
case class Password(value: String) extends AnyVal
case class Endpoint(host: String, port: Int)
case class ClientId(value: String) extends AnyVal

def makeConnection[F[_]: Sync: Logger](config: Config, blocker: Blocker): Resource[F, JmsConnection[F]] =
for {
connection <- Resource.make(
Logger[F].info(s"Opening Connection to MQ at ${hosts(config.endpoints)}...") *>
Sync[F].delay {
val factory = new ActiveMQConnectionFactory(hosts(config.endpoints))
factory.setClientID(config.clientId.value)

val connection = config.username.fold(factory.createConnection)(
username =>
factory.createConnection(username.value, config.password.map(_.value).getOrElse(""))
)

connection.start()
connection
}
)(
c =>
Logger[F].info(s"Closing Connection $c to MQ at ${hosts(config.endpoints)}...") *>
Sync[F].delay(c.close()) *>
Logger[F].info(s"Closed Connection $c to MQ at ${hosts(config.endpoints)}.")
)
_ <- Resource.liftF(Logger[F].info(s"Opened connection $connection."))
} yield new JmsConnection[F](connection, blocker)

private def hosts(endpoints: NonEmptyList[Endpoint]): String =
endpoints.map(e => s"tcp://${e.host}:${e.port}").toList.mkString(",")

}
17 changes: 13 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
val catsV = "2.0.0"
val jmsV = "2.0.1"
val ibmMQV = "9.1.4.0"
val activeMQV = "2.11.0"
val catsEffectV = "2.0.0"
val catsEffectScalaTestV = "0.4.0"
val fs2V = "2.0.0"
Expand All @@ -14,7 +15,7 @@ val betterMonadicForV = "0.3.1"
lazy val jms4s = project
.in(file("."))
.enablePlugins(NoPublishPlugin)
.aggregate(core, ibmMQ, tests, examples, site)
.aggregate(core, ibmMQ, activeMQArtemis, tests, examples, site)

lazy val core = project
.in(file("core"))
Expand All @@ -30,27 +31,35 @@ lazy val ibmMQ = project
.settings(parallelExecution in Test := false)
.dependsOn(core)

lazy val activeMQArtemis = project
.in(file("active-mq-artemis"))
.settings(commonSettings)
.settings(name := "jms4s-active-mq-artemis")
.settings(libraryDependencies += "org.apache.activemq" % "artemis-jms-client-all" % activeMQV)
.settings(parallelExecution in Test := false)
.dependsOn(core)

lazy val tests = project
.in(file("tests"))
.settings(commonSettings: _*)
.enablePlugins(NoPublishPlugin)
.settings(libraryDependencies += "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4jSlf4jImplV % Runtime)
.settings(parallelExecution in Test := false)
.dependsOn(core, ibmMQ)
.dependsOn(ibmMQ, activeMQArtemis)

lazy val examples = project
.in(file("examples"))
.settings(commonSettings: _*)
.enablePlugins(NoPublishPlugin)
.dependsOn(core, ibmMQ)
.dependsOn(ibmMQ, activeMQArtemis)

lazy val site = project
.in(file("site"))
.enablePlugins(MicrositesPlugin)
.enablePlugins(MdocPlugin)
.enablePlugins(NoPublishPlugin)
.settings(commonSettings)
.dependsOn(core)
.dependsOn(core, ibmMQ, activeMQArtemis)
.settings {
import microsites._
Seq(
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import jms4s.JmsAcknowledgerConsumer.AckAction
import jms4s.JmsAcknowledgerConsumer.JmsAcknowledgerConsumerPool.JmsResource
import jms4s.config.DestinationName
import jms4s.jms.{ JmsConnection, JmsMessage, JmsMessageConsumer, MessageFactory }
import jms4s.model.SessionType
import jms4s.model.SessionType.ClientAcknowledge

import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -49,23 +48,23 @@ object JmsAcknowledgerConsumer {
for {
inputDestination <- Resource.liftF(
connection
.createSession(SessionType.ClientAcknowledge)
.createSession(ClientAcknowledge)
.use(_.createDestination(inputDestinationName))
)
outputDestinations <- Resource.liftF(
outputDestinationNames
.traverse(
outputDestinationName =>
connection
.createSession(SessionType.ClientAcknowledge)
.createSession(ClientAcknowledge)
.use(_.createDestination(outputDestinationName))
.map(jmsDestination => (outputDestinationName, jmsDestination))
)
)
queue <- Resource.liftF(Queue.bounded[F, JmsResource[F]](concurrencyLevel))
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
session <- connection.createSession(SessionType.ClientAcknowledge)
session <- connection.createSession(ClientAcknowledge)
consumer <- session.createConsumer(inputDestination)
producers <- outputDestinations.traverse {
case (outputDestinationName, outputDestination) =>
Expand All @@ -89,18 +88,18 @@ object JmsAcknowledgerConsumer {
for {
inputDestination <- Resource.liftF(
connection
.createSession(SessionType.ClientAcknowledge)
.createSession(ClientAcknowledge)
.use(_.createDestination(inputDestinationName))
)
outputDestination <- Resource.liftF(
connection
.createSession(SessionType.ClientAcknowledge)
.createSession(ClientAcknowledge)
.use(_.createDestination(outputDestinationName))
)
pool <- Resource.liftF(Queue.bounded[F, JmsResource[F]](concurrencyLevel))
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
session <- connection.createSession(SessionType.ClientAcknowledge)
session <- connection.createSession(ClientAcknowledge)
consumer <- session.createConsumer(inputDestination)
jmsProducer <- session.createProducer(outputDestination)
producer = Map(outputDestinationName -> new JmsProducer(jmsProducer))
Expand Down
22 changes: 7 additions & 15 deletions core/src/main/scala/jms4s/JmsPooledProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import cats.effect.{ Concurrent, ContextShift, Resource, Sync }
import fs2.concurrent.Queue
import jms4s.config.DestinationName
import jms4s.jms._
import jms4s.model.SessionType
import cats.implicits._
import jms4s.model.SessionType.AutoAcknowledge

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

Expand Down Expand Up @@ -41,7 +42,7 @@ object JmsPooledProducer {
)
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
session <- connection.createSession(SessionType.AutoAcknowledge)
session <- connection.createSession(AutoAcknowledge)
destination <- Resource.liftF(session.createDestination(queue))
producer <- session.createProducer(destination)
_ <- Resource.liftF(pool.enqueue1(JmsResource(session, producer, new MessageFactory(session))))
Expand All @@ -54,14 +55,8 @@ object JmsPooledProducer {
for {
resources <- pool.dequeue1
messages <- f(resources.messageFactory)
_ <- messages.traverse_(
message =>
for {
_ <- resources.producer
.send(message)
} yield ()
)
_ <- pool.enqueue1(resources)
_ <- messages.traverse_(message => resources.producer.send(message))
_ <- pool.enqueue1(resources)
} yield ()

override def sendNWithDelay(
Expand Down Expand Up @@ -104,11 +99,8 @@ object JmsPooledProducer {
for {
resources <- pool.dequeue1
message <- f(resources.messageFactory)
_ <- for {
_ <- resources.producer
.send(message)
} yield ()
_ <- pool.enqueue1(resources)
_ <- resources.producer.send(message)
_ <- pool.enqueue1(resources)
} yield ()
}

Expand Down
15 changes: 7 additions & 8 deletions core/src/main/scala/jms4s/JmsTransactedConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import jms4s.JmsTransactedConsumer.JmsTransactedConsumerPool.{ JmsResource, Rece
import jms4s.JmsTransactedConsumer.TransactionAction
import jms4s.config.DestinationName
import jms4s.jms._
import jms4s.model.SessionType
import jms4s.model.SessionType.Transacted

import scala.concurrent.duration.FiniteDuration
Expand All @@ -31,7 +30,7 @@ object JmsTransactedConsumer {
pool <- Resource.liftF(Queue.bounded[F, JmsResource[F]](concurrencyLevel))
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
session <- connection.createSession(SessionType.Transacted)
session <- connection.createSession(Transacted)
consumer <- session.createConsumer(input)
_ <- Resource.liftF(
pool.enqueue1(JmsResource(session, consumer, Map.empty, new MessageFactory[F](session)))
Expand All @@ -49,15 +48,15 @@ object JmsTransactedConsumer {
for {
inputDestination <- Resource.liftF(
connection
.createSession(SessionType.Transacted)
.createSession(Transacted)
.use(_.createDestination(inputDestinationName))
)
outputDestinations <- Resource.liftF(
outputDestinationNames
.traverse(
outputDestinationName =>
connection
.createSession(SessionType.Transacted)
.createSession(Transacted)
.use(_.createDestination(outputDestinationName))
.map(jmsDestination => (outputDestinationName, jmsDestination))
)
Expand All @@ -67,7 +66,7 @@ object JmsTransactedConsumer {
)
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
session <- connection.createSession(SessionType.Transacted)
session <- connection.createSession(Transacted)
consumer <- session.createConsumer(inputDestination)
producers <- outputDestinations.traverse {
case (outputDestinationName, outputDestination) =>
Expand All @@ -91,18 +90,18 @@ object JmsTransactedConsumer {
for {
inputDestination <- Resource.liftF(
connection
.createSession(SessionType.Transacted)
.createSession(Transacted)
.use(_.createDestination(inputDestinationName))
)
outputDestination <- Resource.liftF(
connection
.createSession(SessionType.Transacted)
.createSession(Transacted)
.use(_.createDestination(outputDestinationName))
)
pool <- Resource.liftF(Queue.bounded[F, JmsResource[F]](concurrencyLevel))
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
session <- connection.createSession(SessionType.Transacted)
session <- connection.createSession(Transacted)
consumer <- session.createConsumer(inputDestination)
jmsProducer <- session.createProducer(outputDestination)
producer = Map(outputDestinationName -> new JmsProducer(jmsProducer))
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/scala/jms4s/JmsUnidentifiedPooledProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import cats.implicits._
import fs2.concurrent.Queue
import jms4s.config.DestinationName
import jms4s.jms._
import jms4s.model.SessionType
import jms4s.model.SessionType.AutoAcknowledge

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.concurrent.duration.{ FiniteDuration, _ }

trait JmsUnidentifiedPooledProducer[F[_]] {

Expand Down Expand Up @@ -41,7 +40,7 @@ object JmsUnidentifiedPooledProducer {
)
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
session <- connection.createSession(SessionType.AutoAcknowledge)
session <- connection.createSession(AutoAcknowledge)
producer <- session.createUnidentifiedProducer
_ <- Resource.liftF(pool.enqueue1(JmsResource(session, producer, new MessageFactory(session))))
} yield ()
Expand Down
20 changes: 0 additions & 20 deletions core/src/main/scala/jms4s/config/config.scala
Original file line number Diff line number Diff line change
@@ -1,24 +1,8 @@
package jms4s.config

import cats.Order
import cats.data.NonEmptyList
import cats.implicits._

case class Config(
qm: QueueManager,
endpoints: NonEmptyList[Endpoint],
channel: Channel,
username: Option[Username] = None,
password: Option[Password] = None,
clientId: String
)

case class Username(value: String) extends AnyVal

case class Password(value: String) extends AnyVal

case class Endpoint(host: String, port: Int)

sealed trait DestinationName extends Product with Serializable
case class QueueName(value: String) extends DestinationName
case class TopicName(value: String) extends DestinationName
Expand All @@ -28,7 +12,3 @@ object DestinationName {
case (x, y) => Order[String].compare(x.toString, y.toString)
}
}

case class QueueManager(value: String) extends AnyVal

case class Channel(value: String) extends AnyVal
11 changes: 8 additions & 3 deletions core/src/main/scala/jms4s/jms/JmsConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ class JmsConnection[F[_]: Sync: Logger] private[jms4s] (

def createSession(sessionType: SessionType): Resource[F, JmsSession[F]] =
for {
session <- Resource.fromAutoCloseable(
Logger[F].info(s"Opening QueueSession for $wrapped.") *>
session <- Resource.make(
Logger[F].info(s"Opening Session for Connection $wrapped.") *>
Sync[F].delay(wrapped.createSession(sessionType.rawTransacted, sessionType.rawAcknowledgeMode))
)(
s =>
Logger[F].info(s"Closing Session $s for Connection $wrapped...") *>
Sync[F].delay(s.close()) *>
Logger[F].info(s"Closed Session $s for Connection $wrapped.")
)
_ <- Resource.liftF(Logger[F].info(s"Opened QueueSession $session for $wrapped."))
_ <- Resource.liftF(Logger[F].info(s"Opened Session $session for Connection $wrapped."))
} yield new JmsSession(session, blocker)
}
Loading

0 comments on commit a6408aa

Please sign in to comment.