Skip to content

Commit

Permalink
Merge pull request #6 from fp-in-bo/producers
Browse files Browse the repository at this point in the history
Add Producers
  • Loading branch information
AL333Z authored Apr 7, 2020
2 parents 9028da0 + b758981 commit 7b0157f
Show file tree
Hide file tree
Showing 7 changed files with 556 additions and 22 deletions.
33 changes: 12 additions & 21 deletions core/src/main/scala/jms4s/JmsClient.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package jms4s

import cats.data._
import cats.effect.{ Concurrent, ContextShift, Resource, Sync }
import cats.implicits._
import cats.effect.{ Concurrent, ContextShift, Resource }
import jms4s.config.DestinationName
import jms4s.jms._

import scala.concurrent.duration.{ FiniteDuration, _ }

class JmsClient[F[_]: ContextShift: Concurrent] {

def createTransactedConsumer(
Expand Down Expand Up @@ -78,24 +75,18 @@ class JmsClient[F[_]: ContextShift: Concurrent] {
concurrencyLevel: Int
): Resource[F, JmsAutoAcknowledgerConsumer[F]] =
JmsAutoAcknowledgerConsumer.make(connection, inputDestinationName, outputDestinationName, concurrencyLevel)
}

class JmsProducer[F[_]: Sync: ContextShift] private[jms4s] (private[jms4s] val producer: JmsMessageProducer[F]) {

def publish(message: JmsMessage[F]): F[Unit] =
producer.send(message)

def publish(message: JmsMessage[F], delay: FiniteDuration): F[Unit] =
producer.setDeliveryDelay(delay) >> producer.send(message) >> producer.setDeliveryDelay(0.millis)
}

class JmsUnidentifiedProducer[F[_]: Sync: ContextShift] private[jms4s] (
private[jms4s] val producer: JmsUnidentifiedMessageProducer[F]
) {
def createProducer(
connection: JmsConnection[F],
destinationName: DestinationName,
concurrencyLevel: Int
): Resource[F, JmsPooledProducer[F]] =
JmsPooledProducer.make(connection, destinationName, concurrencyLevel)

def publish(destination: JmsDestination, message: JmsMessage[F]): F[Unit] =
producer.send(destination, message)
def createProducer(
connection: JmsConnection[F],
concurrencyLevel: Int
): Resource[F, JmsUnidentifiedPooledProducer[F]] =
JmsUnidentifiedPooledProducer.make(connection, concurrencyLevel)

def publish(destination: JmsDestination, message: JmsMessage[F], delay: FiniteDuration): F[Unit] =
producer.setDeliveryDelay(delay) >> producer.send(destination, message) >> producer.setDeliveryDelay(0.millis)
}
121 changes: 121 additions & 0 deletions core/src/main/scala/jms4s/JmsPooledProducer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package jms4s

import cats.data.NonEmptyList
import cats.effect.{ Concurrent, ContextShift, Resource, Sync }
import fs2.concurrent.Queue
import jms4s.config.DestinationName
import jms4s.jms._
import jms4s.model.SessionType
import cats.implicits._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

trait JmsPooledProducer[F[_]] {

def sendN(
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F])]]
): F[Unit]

def sendNWithDelay(
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], Option[FiniteDuration])]]
): F[Unit]

def sendWithDelay(
messageFactory: MessageFactory[F] => F[(JmsMessage[F], Option[FiniteDuration])]
): F[Unit]

def send(messageFactory: MessageFactory[F] => F[JmsMessage[F]]): F[Unit]

}

object JmsPooledProducer {

private[jms4s] def make[F[_]: ContextShift: Concurrent](
connection: JmsConnection[F],
queue: DestinationName,
concurrencyLevel: Int
): Resource[F, JmsPooledProducer[F]] =
for {
pool <- Resource.liftF(
Queue.bounded[F, JmsResource[F]](concurrencyLevel)
)
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
session <- connection.createSession(SessionType.AutoAcknowledge)
destination <- Resource.liftF(session.createDestination(queue))
producer <- session.createProducer(destination)
_ <- Resource.liftF(pool.enqueue1(JmsResource(session, producer, new MessageFactory(session))))
} yield ()
}
} yield new JmsPooledProducer[F] {
override def sendN(
f: MessageFactory[F] => F[NonEmptyList[JmsMessage[F]]]
): F[Unit] =
for {
resources <- pool.dequeue1
messages <- f(resources.messageFactory)
_ <- messages.traverse_(
message =>
for {
_ <- resources.producer
.send(message)
} yield ()
)
_ <- pool.enqueue1(resources)
} yield ()

override def sendNWithDelay(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], Option[FiniteDuration])]]
): F[Unit] =
for {
resources <- pool.dequeue1
messagesDelays <- f(resources.messageFactory)
_ <- messagesDelays.traverse_(
messageWithDelay =>
for {
_ <- messageWithDelay._2 match {
case Some(delay) => resources.producer.setDeliveryDelay(delay)
case None => Sync[F].unit
}
_ <- resources.producer.send(messageWithDelay._1)
_ <- resources.producer.setDeliveryDelay(0.millis)
} yield ()
)
_ <- pool.enqueue1(resources)
} yield ()

override def sendWithDelay(
f: MessageFactory[F] => F[(JmsMessage[F], Option[FiniteDuration])]
): F[Unit] =
for {
resources <- pool.dequeue1
messagesWithDelay <- f(resources.messageFactory)
_ <- messagesWithDelay._2 match {
case Some(delay) => resources.producer.setDeliveryDelay(delay)
case None => Sync[F].unit
}
_ <- resources.producer.send(messagesWithDelay._1)
_ <- resources.producer.setDeliveryDelay(0.millis)
_ <- pool.enqueue1(resources)

} yield ()

override def send(f: MessageFactory[F] => F[JmsMessage[F]]): F[Unit] =
for {
resources <- pool.dequeue1
message <- f(resources.messageFactory)
_ <- for {
_ <- resources.producer
.send(message)
} yield ()
_ <- pool.enqueue1(resources)
} yield ()
}

case class JmsResource[F[_]] private[jms4s] (
session: JmsSession[F],
producer: JmsMessageProducer[F],
messageFactory: MessageFactory[F]
)

}
16 changes: 16 additions & 0 deletions core/src/main/scala/jms4s/JmsProducer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package jms4s

import cats.effect.{ ContextShift, Sync }
import cats.implicits._
import jms4s.jms.{ JmsMessage, JmsMessageProducer }

import scala.concurrent.duration.{ FiniteDuration, _ }

class JmsProducer[F[_]: Sync: ContextShift] private[jms4s] (private[jms4s] val producer: JmsMessageProducer[F]) {

def publish(message: JmsMessage[F]): F[Unit] =
producer.send(message)

def publish(message: JmsMessage[F], delay: FiniteDuration): F[Unit] =
producer.setDeliveryDelay(delay) >> producer.send(message) >> producer.setDeliveryDelay(0.millis)
}
124 changes: 124 additions & 0 deletions core/src/main/scala/jms4s/JmsUnidentifiedPooledProducer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package jms4s

import cats.data.NonEmptyList
import cats.effect.{ Concurrent, ContextShift, Resource, Sync }
import cats.implicits._
import fs2.concurrent.Queue
import jms4s.config.DestinationName
import jms4s.jms._
import jms4s.model.SessionType

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

trait JmsUnidentifiedPooledProducer[F[_]] {

def sendN(
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
): F[Unit]

def sendNWithDelay(
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]]
): F[Unit]

def sendWithDelay(
messageFactory: MessageFactory[F] => F[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]
): F[Unit]

def send(messageFactory: MessageFactory[F] => F[(JmsMessage[F], DestinationName)]): F[Unit]

}

object JmsUnidentifiedPooledProducer {

private[jms4s] def make[F[_]: ContextShift: Concurrent](
connection: JmsConnection[F],
concurrencyLevel: Int
): Resource[F, JmsUnidentifiedPooledProducer[F]] =
for {
pool <- Resource.liftF(
Queue.bounded[F, JmsResource[F]](concurrencyLevel)
)
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
session <- connection.createSession(SessionType.AutoAcknowledge)
producer <- session.createUnidentifiedProducer
_ <- Resource.liftF(pool.enqueue1(JmsResource(session, producer, new MessageFactory(session))))
} yield ()
}
} yield new JmsUnidentifiedPooledProducer[F] {
override def sendN(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
): F[Unit] =
for {
resources <- pool.dequeue1
messagesWithDestinations <- f(resources.messageFactory)
_ <- messagesWithDestinations.traverse_(
messageWithDestination =>
for {
jmsDestination <- resources.session.createDestination(messageWithDestination._2)
_ <- resources.producer
.send(jmsDestination, messageWithDestination._1)
} yield ()
)
_ <- pool.enqueue1(resources)
} yield ()

override def sendNWithDelay(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]]
): F[Unit] =
for {
resources <- pool.dequeue1
messagesWithDestinationsAndDelayes <- f(resources.messageFactory)
_ <- messagesWithDestinationsAndDelayes.traverse_(
messageWithDestinationAndDelay =>
for {
jmsDestination <- resources.session.createDestination(messageWithDestinationAndDelay._2._1)
_ <- messageWithDestinationAndDelay._2._2 match {
case Some(delay) => resources.producer.setDeliveryDelay(delay)
case None => Sync[F].unit
}
_ <- resources.producer.send(jmsDestination, messageWithDestinationAndDelay._1)
_ <- resources.producer.setDeliveryDelay(0.millis)
} yield ()
)
_ <- pool.enqueue1(resources)
} yield ()

override def sendWithDelay(
f: MessageFactory[F] => F[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]
): F[Unit] =
for {
resources <- pool.dequeue1
messagesWithDestinationAndDelay <- f(resources.messageFactory)
jmsDestination <- resources.session.createDestination(messagesWithDestinationAndDelay._2._1)
_ <- messagesWithDestinationAndDelay._2._2 match {
case Some(a) => resources.producer.setDeliveryDelay(a)
case None => Sync[F].unit
}
_ <- resources.producer.send(jmsDestination, messagesWithDestinationAndDelay._1)
_ <- resources.producer.setDeliveryDelay(0.millis)
_ <- pool.enqueue1(resources)

} yield ()

override def send(f: MessageFactory[F] => F[(JmsMessage[F], DestinationName)]): F[Unit] =
for {
resources <- pool.dequeue1
messageWithDestination <- f(resources.messageFactory)
_ <- for {
jmsDestination <- resources.session.createDestination(messageWithDestination._2)
_ <- resources.producer
.send(jmsDestination, messageWithDestination._1)
} yield ()
_ <- pool.enqueue1(resources)
} yield ()
}

case class JmsResource[F[_]] private[jms4s] (
session: JmsSession[F],
producer: JmsUnidentifiedMessageProducer[F],
messageFactory: MessageFactory[F]
)

}
18 changes: 18 additions & 0 deletions core/src/main/scala/jms4s/JmsUnidentifiedProducer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package jms4s

import cats.effect.{ ContextShift, Sync }
import jms4s.jms.{ JmsDestination, JmsMessage, JmsUnidentifiedMessageProducer }
import cats.implicits._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

class JmsUnidentifiedProducer[F[_]: Sync: ContextShift] private[jms4s] (
private[jms4s] val producer: JmsUnidentifiedMessageProducer[F]
) {

def publish(destination: JmsDestination, message: JmsMessage[F]): F[Unit] =
producer.send(destination, message)

def publish(destination: JmsDestination, message: JmsMessage[F], delay: FiniteDuration): F[Unit] =
producer.setDeliveryDelay(delay) >> producer.send(destination, message) >> producer.setDeliveryDelay(0.millis)
}
41 changes: 41 additions & 0 deletions tests/src/test/scala/jms4s/Jms4sBaseSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ trait Jms4sBaseSpec {
val timeout: FiniteDuration = 2.seconds
val delay: FiniteDuration = 500.millis
val topicName: TopicName = TopicName("DEV.BASE.TOPIC")
val topicName2: TopicName = TopicName("DEV.BASE.TOPIC.1")
val inputQueueName: QueueName = QueueName("DEV.QUEUE.1")
val outputQueueName1: QueueName = QueueName("DEV.QUEUE.2")
val outputQueueName2: QueueName = QueueName("DEV.QUEUE.3")
Expand All @@ -53,6 +54,10 @@ trait Jms4sBaseSpec {
.flatMap(_.asJmsTextMessage)
.flatMap(_.getText)

def receiveMessage(consumer: JmsMessageConsumer[IO]): IO[JmsTextMessage[IO]] =
consumer.receiveJmsMessage
.flatMap(_.asJmsTextMessage)

def receiveUntil(
consumer: JmsMessageConsumer[IO],
received: Ref[IO, Set[String]],
Expand All @@ -72,4 +77,40 @@ trait Jms4sBaseSpec {
.map(message => (message, destinationName))
}
}

def messageFactory(
message: JmsTextMessage[IO]
): MessageFactory[IO] => IO[JmsTextMessage[IO]] = { mFactory: MessageFactory[IO] =>
message.getText.flatMap { text =>
mFactory
.makeTextMessage(text)
.map(message => (message))
}
}

def messageWithDelayFactory(
message: (JmsTextMessage[IO], (DestinationName, Option[FiniteDuration]))
): MessageFactory[IO] => IO[(JmsTextMessage[IO], (DestinationName, Option[FiniteDuration]))] = {
mFactory: MessageFactory[IO] =>
message._1.getText.flatMap { text =>
mFactory
.makeTextMessage(text)
.map(m => (m, (message._2._1, message._2._2)))
}
}

def messageFactory(
messages: NonEmptyList[JmsTextMessage[IO]]
): MessageFactory[IO] => IO[NonEmptyList[JmsTextMessage[IO]]] = { mFactory: MessageFactory[IO] =>
messages
.map(
message =>
message.getText.flatMap { text =>
mFactory
.makeTextMessage(text)
.map(message => (message))
}
)
.sequence
}
}
Loading

0 comments on commit 7b0157f

Please sign in to comment.