Skip to content

Commit

Permalink
Manually update scalafmt style version
Browse files Browse the repository at this point in the history
  • Loading branch information
AL333Z committed Apr 13, 2020
1 parent 3be3526 commit 4ff9916
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 124 deletions.
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "2.0.1"
version = "2.4.2"

maxColumn = 120
align = most
Expand Down
13 changes: 6 additions & 7 deletions active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ object activeMQ {
val factory = new ActiveMQConnectionFactory(hosts(config.endpoints))
factory.setClientID(config.clientId.value)

config.username.fold(factory.createContext())(
username => factory.createContext(username.value, config.password.map(_.value).getOrElse(""))
config.username.fold(factory.createContext())(username =>
factory.createContext(username.value, config.password.map(_.value).getOrElse(""))
)
}
)(
c =>
Logger[F].info(s"Closing context $c to MQ at ${hosts(config.endpoints)}...") *>
blocker.delay(c.close()) *>
Logger[F].info(s"Closed context $c to MQ at ${hosts(config.endpoints)}.")
)(c =>
Logger[F].info(s"Closing context $c to MQ at ${hosts(config.endpoints)}...") *>
blocker.delay(c.close()) *>
Logger[F].info(s"Closed context $c to MQ at ${hosts(config.endpoints)}.")
)
_ <- Resource.liftF(Logger[F].info(s"Opened context $context."))
} yield new JmsContext[F](context, blocker)
Expand Down
28 changes: 14 additions & 14 deletions core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,15 @@ object JmsAcknowledgerConsumer {
ifSend = send =>
send
.createMessages(messageFactory)
.flatMap(
toSend =>
toSend.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(
ifEmpty = context.send(name, message)
)(
f = d => context.send(name, message, d)
)
} *> blocker.delay(message.wrapped.acknowledge())
.flatMap(toSend =>
toSend.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(
ifEmpty = context.send(name, message)
)(
f = d => context.send(name, message, d)
)
} *> blocker.delay(message.wrapped.acknowledge())
)
)
_ <- pool.enqueue1((context, consumer))
Expand Down Expand Up @@ -95,6 +94,7 @@ object JmsAcknowledgerConsumer {
case class Send[F[_]](
createMessages: MessageFactory[F] => F[ToSend[F]]
) extends AckAction[F] {

override def fold(ifAck: => F[Unit], ifNoAck: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] =
ifSend(this)
}
Expand All @@ -110,8 +110,8 @@ object JmsAcknowledgerConsumer {
def sendN[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
): Send[F] =
Send[F](
mf => messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
Send[F](mf =>
messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
)

def sendNWithDelay[F[_]: Functor](
Expand All @@ -125,8 +125,8 @@ object JmsAcknowledgerConsumer {
Send[F](mf => messageFactory(mf).map(x => ToSend[F](NonEmptyList.one(x))))

def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage[F], DestinationName)]): Send[F] =
Send[F](
mf => messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
Send[F](mf =>
messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
)
}
}
29 changes: 15 additions & 14 deletions core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,15 @@ object JmsAutoAcknowledgerConsumer {
ifSend = send =>
send
.createMessages(messageFactory)
.flatMap(
toSend =>
toSend.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(
ifEmpty = context.send(name, message)
)(
f = d => context.send(name, message, d)
)
}
.flatMap(toSend =>
toSend.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(
ifEmpty = context.send(name, message)
)(
f = d => context.send(name, message, d)
)
}
)
)
_ <- pool.enqueue1((context, consumer))
Expand All @@ -80,13 +79,15 @@ object JmsAutoAcknowledgerConsumer {
}

object AutoAckAction {

private[jms4s] case class NoOp[F[_]]() extends AutoAckAction[F] {
override def fold(ifNoOp: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] = ifNoOp
}

case class Send[F[_]](
createMessages: MessageFactory[F] => F[ToSend[F]]
) extends AutoAckAction[F] {

override def fold(ifNoOp: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] =
ifSend(this)
}
Expand All @@ -100,8 +101,8 @@ object JmsAutoAcknowledgerConsumer {
def sendN[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
): Send[F] =
Send[F](
mf => messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
Send[F](mf =>
messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
)

def sendNWithDelay[F[_]: Functor](
Expand All @@ -115,8 +116,8 @@ object JmsAutoAcknowledgerConsumer {
Send[F](mf => messageFactory(mf).map(x => ToSend[F](NonEmptyList.one(x))))

def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage[F], DestinationName)]): Send[F] =
Send[F](
mf => messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
Send[F](mf =>
messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
)
}
}
1 change: 1 addition & 0 deletions core/src/main/scala/jms4s/JmsProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ object JmsProducer {
}
mf = MessageFactory[F](context)
} yield new JmsProducer[F] {

override def sendN(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
): F[Unit] =
Expand Down
40 changes: 19 additions & 21 deletions core/src/main/scala/jms4s/JmsTransactedConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ object JmsTransactedConsumer {
for {
pool <- Resource.liftF(Queue.bounded[F, (JmsContext[F], JmsMessageConsumer[F])](concurrencyLevel))
_ <- (0 until concurrencyLevel).toList
.traverse_(
_ =>
for {
c <- context.createContext(SessionType.Transacted)
consumer <- c.createJmsConsumer(inputDestinationName)
_ <- Resource.liftF(pool.enqueue1((c, consumer)))
} yield ()
.traverse_(_ =>
for {
c <- context.createContext(SessionType.Transacted)
consumer <- c.createJmsConsumer(inputDestinationName)
_ <- Resource.liftF(pool.enqueue1((c, consumer)))
} yield ()
)
} yield build(new JmsTransactedConsumerPool[F](pool), concurrencyLevel, MessageFactory[F](context))

Expand All @@ -57,16 +56,14 @@ object JmsTransactedConsumer {
ifSend = send => {
send
.createMessages(messageFactory)
.flatMap(
toSend =>
toSend.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(
received.context.send(name, message)
)(
d => received.context.send(name, message, d)
) *> pool.commit(received.context, received.consumer)
}
.flatMap(toSend =>
toSend.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(
received.context.send(name, message)
)(d => received.context.send(name, message, d)) *> pool
.commit(received.context, received.consumer)
}
)
}
)
Expand Down Expand Up @@ -122,6 +119,7 @@ object JmsTransactedConsumer {
case class Send[F[_]](
createMessages: MessageFactory[F] => F[ToSend[F]]
) extends TransactionAction[F] {

override def fold(ifCommit: => F[Unit], ifRollback: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] =
ifSend(this)
}
Expand All @@ -137,8 +135,8 @@ object JmsTransactedConsumer {
def sendN[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
): Send[F] =
Send[F](
mf => messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
Send[F](mf =>
messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
)

def sendNWithDelay[F[_]: Functor](
Expand All @@ -152,8 +150,8 @@ object JmsTransactedConsumer {
Send[F](mf => messageFactory(mf).map(x => ToSend[F](NonEmptyList.one(x))))

def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage[F], DestinationName)]): Send[F] =
Send[F](
mf => messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
Send[F](mf =>
messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
)
}
}
1 change: 1 addition & 0 deletions core/src/main/scala/jms4s/config/config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ case class QueueName(value: String) extends DestinationName
case class TopicName(value: String) extends DestinationName

object DestinationName {

implicit val orderingDestinationName: Order[DestinationName] = Order.from[DestinationName] {
case (x, y) => Order[String].compare(x.toString, y.toString)
}
Expand Down
14 changes: 6 additions & 8 deletions core/src/main/scala/jms4s/jms/JmsContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ class JmsContext[F[_]: Sync: Logger: ContextShift: Concurrent](
_ <- Logger[F].info(s"Context $ctx successfully created")
} yield ctx
}
)(
context =>
Logger[F].info(s"Releasing context $context") *>
blocker.delay(context.close())
)(context =>
Logger[F].info(s"Releasing context $context") *>
blocker.delay(context.close())
)
.map(context => new JmsContext(context, blocker))

Expand All @@ -52,10 +51,9 @@ class JmsContext[F[_]: Sync: Logger: ContextShift: Concurrent](
consumer <- Resource.make(
Logger[F].info(s"Creating consumer for destination $destinationName") *>
blocker.delay(context.createConsumer(destination.wrapped))
)(
consumer =>
Logger[F].info(s"Closing consumer for destination $destinationName") *>
blocker.delay(consumer.close())
)(consumer =>
Logger[F].info(s"Closing consumer for destination $destinationName") *>
blocker.delay(consumer.close())
)
} yield new JmsMessageConsumer[F](consumer)

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/jms4s/jms/JmsMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class JmsMessage[F[_]: Sync] private[jms4s] (private[jms4s] val wrapped: Message
}

object MessageOps {

implicit def showMessage: Show[Message] = Show.show[Message] { message =>
def getStringContent: Try[String] = message match {
case message: TextMessage => Try(message.getText)
Expand Down
9 changes: 4 additions & 5 deletions ibm-mq/src/main/scala/jms4s/ibmmq/ibmMQ.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ object ibmMQ {
)
}.getOrElse(connectionFactory.createContext())
}
)(
c =>
Logger[F].info(s"Closing Context $c at ${hosts(config.endpoints)}...") *>
blocker.delay(c.close()) *>
Logger[F].info(s"Closed Context $c.")
)(c =>
Logger[F].info(s"Closing Context $c at ${hosts(config.endpoints)}...") *>
blocker.delay(c.close()) *>
Logger[F].info(s"Closed Context $c.")
)
_ <- Resource.liftF(Logger[F].info(s"Opened Context $context at ${hosts(config.endpoints)}."))
} yield new JmsContext[F](context, blocker)
Expand Down
13 changes: 6 additions & 7 deletions tests/src/test/scala/jms4s/JmsClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,10 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec {
res.use {
case (producer, consumer1, consumer2, bodies, messages) =>
for {
_ <- messages.parTraverse_(
msg =>
producer.send(messageFactory(msg, outputQueueName1)) *> producer.send(
messageFactory(msg, outputQueueName2)
)
_ <- messages.parTraverse_(msg =>
producer.send(messageFactory(msg, outputQueueName1)) *> producer.send(
messageFactory(msg, outputQueueName2)
)
)
_ <- logger.info(s"Pushed ${messages.size} messages.")
_ <- logger.info(s"Consumer to Producer started. Collecting messages from output queue...")
Expand All @@ -358,8 +357,8 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec {
res.use {
case (producer, consumer1, consumer2, bodies, messages) =>
for {
_ <- messages.parTraverse_(
msg => producer.send(messageFactory(msg, topicName1)) *> producer.send(messageFactory(msg, topicName2))
_ <- messages.parTraverse_(msg =>
producer.send(messageFactory(msg, topicName1)) *> producer.send(messageFactory(msg, topicName2))
)
_ <- logger.info(s"Pushed ${messages.size} messages.")
_ <- logger.info(s"Consumer to Producer started. Collecting messages from output queue...")
Expand Down
31 changes: 13 additions & 18 deletions tests/src/test/scala/jms4s/basespec/Jms4sBaseSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,12 @@ trait Jms4sBaseSpec {
messages: NonEmptyList[JmsTextMessage[IO]]
): MessageFactory[IO] => IO[NonEmptyList[JmsTextMessage[IO]]] = { mFactory: MessageFactory[IO] =>
messages
.map(
message =>
message.getText.flatMap { text =>
mFactory
.makeTextMessage(text)
.map(message => (message))
}
.map(message =>
message.getText.flatMap { text =>
mFactory
.makeTextMessage(text)
.map(message => (message))
}
)
.sequence
}
Expand All @@ -100,15 +99,11 @@ trait Jms4sBaseSpec {
destinationName: DestinationName
): MessageFactory[IO] => IO[NonEmptyList[(JmsTextMessage[IO], DestinationName)]] =
(mFactory: MessageFactory[IO]) =>
messages
.map(
message => {
message.getText.flatMap { text =>
mFactory
.makeTextMessage(text)
.map(message => (message))
}.map(message => (message, destinationName))
}
)
.sequence
messages.map { message =>
message.getText.flatMap { text =>
mFactory
.makeTextMessage(text)
.map(message => (message))
}.map(message => (message, destinationName))
}.sequence
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ trait ActiveMQArtemisBaseSpec extends Jms4sBaseSpec {
override def contextRes(implicit cs: ContextShift[IO]): Resource[IO, JmsContext[IO]] =
Blocker
.apply[IO]
.flatMap(
blocker =>
activeMQ.makeContext[IO](
Config(
endpoints = NonEmptyList.one(Endpoint("localhost", 61616)),
username = Some(Username("admin")),
password = Some(Password("passw0rd")),
clientId = ClientId("jms-specs")
),
blocker
)
.flatMap(blocker =>
activeMQ.makeContext[IO](
Config(
endpoints = NonEmptyList.one(Endpoint("localhost", 61616)),
username = Some(Username("admin")),
password = Some(Password("passw0rd")),
clientId = ClientId("jms-specs")
),
blocker
)
)

}
Loading

0 comments on commit 4ff9916

Please sign in to comment.