diff --git a/build.gradle.kts b/build.gradle.kts index 4ce3a37..6b3519a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,12 +4,12 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile plugins { `java-library` `maven-publish` - kotlin("jvm") version "1.9.20" - id("org.jetbrains.dokka") version "1.9.10" + kotlin("jvm") version "1.9.23" + id("org.jetbrains.dokka") version "1.9.20" } group = "org.veupathdb.lib" -version = "1.3.0" +version = "2.0.0" repositories { mavenCentral() diff --git a/makefile b/makefile index 414b9d3..f496a47 100644 --- a/makefile +++ b/makefile @@ -4,10 +4,14 @@ nothing: .PHONY: end-to-end end-to-end: - @docker-compose -f test/docker-compose.yml build \ + @docker compose -f test/docker-compose.yml build \ --build-arg=GITHUB_USERNAME=$(shell grep 'gpr.user' ~/.gradle/gradle.properties | cut -d= -f2) \ --build-arg=GITHUB_TOKEN=$(shell grep 'gpr.key' ~/.gradle/gradle.properties | cut -d= -f2) - @docker-compose -f test/docker-compose.yml up | grep --color=always -v rabbit_1 + @docker compose -f test/docker-compose.yml up + +.PHONY: kill-tests +kill-tests: + @docker compose -f test/docker-compose.yml down --rmi local -v .PHONY: docs docs: diff --git a/rabbitmq.conf b/rabbitmq.conf new file mode 100644 index 0000000..3b81b25 --- /dev/null +++ b/rabbitmq.conf @@ -0,0 +1,2 @@ +# 5 second timeout message ack timeout for error recovery testing. +consumer_timeout = 5000 \ No newline at end of file diff --git a/readme.adoc b/readme.adoc index 74c1db0..505c661 100644 --- a/readme.adoc +++ b/readme.adoc @@ -10,7 +10,7 @@ Client/server library for utilizing RabbitMQ as a job queue. .Gradle [source, kotlin] ---- - implementation("org.veupathdb.lib:rabbit-job-queue:1.0.0") + implementation("org.veupathdb.lib:rabbit-job-queue:2.0.0") ---- === Worker / Client diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueDispatcher.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueDispatcher.kt similarity index 82% rename from src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueDispatcher.kt rename to src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueDispatcher.kt index 4084131..8757492 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueDispatcher.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueDispatcher.kt @@ -3,6 +3,7 @@ package org.veupathdb.lib.rabbit.jobs import com.rabbitmq.client.CancelCallback import com.rabbitmq.client.DeliverCallback import org.slf4j.LoggerFactory +import org.veupathdb.lib.rabbit.jobs.config.QueueConfig import org.veupathdb.lib.rabbit.jobs.fn.ErrorHandler import org.veupathdb.lib.rabbit.jobs.fn.SuccessHandler import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification @@ -11,11 +12,13 @@ import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification import org.veupathdb.lib.rabbit.jobs.pools.ErrorHandlers import org.veupathdb.lib.rabbit.jobs.pools.SuccessHandlers import org.veupathdb.lib.rabbit.jobs.serialization.Json +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors /** * Job dispatcher. */ -class QueueDispatcher : QueueWrapper { +class JobQueueDispatcher : QueueWrapper { private val Log = LoggerFactory.getLogger(javaClass) @@ -23,9 +26,17 @@ class QueueDispatcher : QueueWrapper { private val successHandlers = SuccessHandlers() - constructor(config: QueueConfig): super(config) + private val workers: ExecutorService - constructor(action: QueueConfig.() -> Unit): super(action) + constructor(config: QueueConfig): super(config) { + workers = Executors.newFixedThreadPool(config.executor.workers) + initCallbacks() + } + + constructor(action: QueueConfig.() -> Unit): super(action) { + workers = Executors.newFixedThreadPool(config.executor.workers) + initCallbacks() + } /** * Registers a callback to be executed on job success notification. @@ -57,7 +68,7 @@ class QueueDispatcher : QueueWrapper { withDispatchQueue { publish(dispatchQueueName, job) } } - override fun initCallbacks() { + private fun initCallbacks() { withErrorQueue { basicConsume( errorQueueName, diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWorker.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueExecutor.kt similarity index 51% rename from src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWorker.kt rename to src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueExecutor.kt index 7a9195f..acc8af7 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWorker.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueExecutor.kt @@ -1,30 +1,45 @@ package org.veupathdb.lib.rabbit.jobs -import com.rabbitmq.client.CancelCallback -import com.rabbitmq.client.DeliverCallback import org.slf4j.LoggerFactory +import org.veupathdb.lib.rabbit.jobs.config.QueueConfig import org.veupathdb.lib.rabbit.jobs.fn.JobHandler import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification -import org.veupathdb.lib.rabbit.jobs.model.JobDispatch import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification +import org.veupathdb.lib.rabbit.jobs.pools.exec.ExecutorPoolConfig import org.veupathdb.lib.rabbit.jobs.pools.JobHandlers -import org.veupathdb.lib.rabbit.jobs.serialization.Json +import org.veupathdb.lib.rabbit.jobs.pools.exec.JobQueueExecutorPool /** - * Job executor end of the job queue. + * Job execution end of the job queue. */ -class QueueWorker : QueueWrapper { +class JobQueueExecutor : QueueWrapper { private val Log = LoggerFactory.getLogger(javaClass) private val handlers = JobHandlers() + private val executorPool: JobQueueExecutorPool + /** * Instantiates a new QueueWorker based on the given configuration. * * @param config Configuration for the RabbitMQ connections. */ - constructor(config: QueueConfig): super(config) + constructor(config: QueueConfig): super(config) { + executorPool = JobQueueExecutorPool(ExecutorPoolConfig( + channelProvider = ::dispatchQueue, + queueName = config.jobQueueName, + handlers = handlers, + poolSize = config.executor.workers, + maxJobTime = config.executor.maxJobExecutionTime, + threadFactory = null, + failureChecker = config.executor.getOrCreateFailureEnforcer(), + shutdownCB = ::abort, + timeoutCB = config.executor.jobTimeoutCallback, + )) + + executorPool.start() + } /** * Instantiates a new QueueWorker using the given action to configure the @@ -32,7 +47,21 @@ class QueueWorker : QueueWrapper { * * @param action Action used to configure the RabbitMQ connections. */ - constructor(action: QueueConfig.() -> Unit): super(action) + constructor(action: QueueConfig.() -> Unit): super(action) { + executorPool = JobQueueExecutorPool(ExecutorPoolConfig( + channelProvider = ::dispatchQueue, + queueName = config.jobQueueName, + handlers = handlers, + poolSize = config.executor.workers, + maxJobTime = config.executor.maxJobExecutionTime, + threadFactory = null, + failureChecker = config.executor.getOrCreateFailureEnforcer(), + shutdownCB = ::abort, + timeoutCB = config.executor.jobTimeoutCallback, + )) + + executorPool.start() + } /** * Registers a callback to be executed when a new job is submitted to the @@ -67,27 +96,12 @@ class QueueWorker : QueueWrapper { withSuccessQueue { publish(successQueueName, msg) } } - /** - * Initializes the job queue callback. - */ - override fun initCallbacks() { - withDispatchQueue { - basicConsume( - dispatchQueueName, - false, - DeliverCallback { _, msg -> - Log.debug("handling job message {}", msg.envelope.deliveryTag) - workers.execute { - try { - handlers.execute(JobDispatch.fromJson(Json.from(msg.body))) - } finally { - Log.debug("acknowledging job message {}", msg.envelope.deliveryTag) - basicAck(msg.envelope.deliveryTag, false) - } - } - }, - CancelCallback { } - ) - } + fun shutdown(blocking: Boolean = true) { + executorPool.stop(blocking) + } + + private fun abort() { + Log.info("closing connection to RabbitMQ") + connection.abort() } } \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueConfig.kt deleted file mode 100644 index 2a58306..0000000 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueConfig.kt +++ /dev/null @@ -1,149 +0,0 @@ -package org.veupathdb.lib.rabbit.jobs - -/** - * RabbitMQ Queue Configuration - */ -class QueueConfig { - /** - * RabbitMQ Connection Hostname - */ - var hostname = "rabbit" - - /** - * RabbitMQ Authentication Username - */ - var username = "guest" - - /** - * RabbitMQ Authentication Password - */ - var password = "guest" - - /** - * RabbitMQ Connection Port - */ - var hostPort = 5672 - - /** - * RabbitMQ Connection Timeout - */ - var timeout = 5_000 - - /** - * Job Dispatch Queue Name - */ - var jobQueueName = "jobs" - - /** - * Job Failure Notification Queue Name - */ - var errorQueueName = "errors" - - /** - * Job Success Notification Queue Name - */ - var successQueueName = "successes" - - /** - * Callback Worker Count - * - * Number of worker threads used to handle incoming messages. - */ - var workers = 5 - - /** - * Configures the RabbitMQ hostname. - * - * @param host RabbitMQ hostname. - * - * @return This configuration. - */ - fun hostname(host: String): QueueConfig { - hostname = host - return this - } - - /** - * Configures the RabbitMQ authentication username. - * - * @param user RabbitMQ username. - * - * @return This configuration. - */ - fun username(user: String): QueueConfig { - username = user - return this - } - - /** - * Configures the RabbitMQ authentication password. - * - * @param pass RabbitMQ password. - * - * @return This configuration. - */ - fun password(pass: String): QueueConfig { - password = pass - return this - } - - /** - * Configures the RabbitMQ host port. - * - * @param port RabbitMQ port. - * - * @return This configuration. - */ - fun hostPort(port: Int): QueueConfig { - hostPort = port - return this - } - - /** - * Configures the RabbitMQ connection timeout. - * - * @param time Connection timeout. - * - * @return This configuration. - */ - fun timeout(time: Int): QueueConfig { - timeout = time - return this - } - - /** - * Configures the name of the job dispatch queue. - * - * @param name Job dispatch queue name. - * - * @return This configuration. - */ - fun jobQueueName(name: String): QueueConfig { - jobQueueName = name - return this - } - - /** - * Configures the name of the job error notification queue. - * - * @param name Error notification queue name. - * - * @return This configuration. - */ - fun errorQueueName(name: String): QueueConfig { - errorQueueName = name - return this - } - - /** - * Configures the name of the job success notification queue. - * - * @param name Success notification queue name. - * - * @return This configuration. - */ - fun successQueueName(name: String): QueueConfig { - successQueueName = name - return this - } -} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt index 75e5d4e..1f8bee8 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt @@ -3,10 +3,9 @@ package org.veupathdb.lib.rabbit.jobs import com.rabbitmq.client.Channel import com.rabbitmq.client.Connection import com.rabbitmq.client.ConnectionFactory +import org.veupathdb.lib.rabbit.jobs.config.QueueConfig import org.veupathdb.lib.rabbit.jobs.serialization.JsonSerializable import java.nio.charset.StandardCharsets -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors /** * Base implementation of a queue worker or dispatcher. @@ -17,22 +16,12 @@ sealed class QueueWrapper { */ private val factory = ConnectionFactory() - protected val workers: ExecutorService + protected val config: QueueConfig /** * Open RabbitMQ Connection - * - * Connection will be refreshed if it has closed since last access. */ - protected var connection: Connection - get() { - if (!field.isOpen) { - field = factory.newConnection() - } - - return field - } - private set + protected val connection: Connection /** * Job Dispatch Queue Name @@ -55,16 +44,14 @@ sealed class QueueWrapper { * @param config Queue configuration used to configure this [QueueWrapper]. */ constructor(config: QueueConfig) { + this.config = config configure(config) + connection = factory.newConnection() - workers = Executors.newFixedThreadPool(config.workers) dispatchQueueName = config.jobQueueName errorQueueName = config.errorQueueName successQueueName = config.successQueueName - - @Suppress("LeakingThis") - initCallbacks() } /** @@ -74,18 +61,14 @@ sealed class QueueWrapper { * configure this [QueueWrapper]. */ constructor(action: QueueConfig.() -> Unit) { - val tmp = QueueConfig() - tmp.action() - configure(tmp) + config = QueueConfig() + config.action() + configure(config) connection = factory.newConnection() - workers = Executors.newFixedThreadPool(tmp.workers) - dispatchQueueName = tmp.jobQueueName - errorQueueName = tmp.errorQueueName - successQueueName = tmp.successQueueName - - @Suppress("LeakingThis") - initCallbacks() + dispatchQueueName = config.jobQueueName + errorQueueName = config.errorQueueName + successQueueName = config.successQueueName } /** @@ -107,14 +90,18 @@ sealed class QueueWrapper { * Initializes the job dispatch queue if it is not already initialized. */ protected open fun Channel.initDispatchQueue() { - queueDeclare(dispatchQueueName, true, false, false, emptyMap()) + queueDeclare( + /* queue = */ dispatchQueueName, + /* durable = */ true, + /* exclusive = */ false, + /* autoDelete = */ false, + /* arguments = */ mapOf( + "x-consumer-timeout" to (config.executor.maxJobExecutionTime * 1.5).inWholeMilliseconds + ), + ) } - /** - * Internal inline channel usage. - */ - protected inline fun withChannel(action: Channel.() -> Unit) = - with(connection.createChannel()) { action() } + protected fun dispatchQueue(): Channel = connection.createChannel().also { it.initDispatchQueue() } /** * Executes the given action against the job dispatch queue in a thread safe @@ -122,11 +109,9 @@ sealed class QueueWrapper { * * @param action Action to execute against the job dispatch queue. */ - protected inline fun withDispatchQueue(action: Channel.() -> Unit) = - withChannel { - initDispatchQueue() - action() - } + protected inline fun withDispatchQueue(action: Channel.() -> Unit) = dispatchQueue().let(action) + + protected fun errorQueue(): Channel = connection.createChannel().also { it.initErrorQueue() } /** * Executes the give action against the error notification queue in a thread @@ -134,11 +119,9 @@ sealed class QueueWrapper { * * @param action Action to execute against the error notification queue. */ - protected inline fun withErrorQueue(action: Channel.() -> Unit) = - withChannel { - initErrorQueue() - action() - } + protected inline fun withErrorQueue(action: Channel.() -> Unit) = errorQueue().let(action) + + protected fun successQueue(): Channel = connection.createChannel().also { it.initSuccessQueue() } /** * Executes the given action against the success notification queue in a @@ -146,11 +129,7 @@ sealed class QueueWrapper { * * @param action Action to execute */ - protected inline fun withSuccessQueue(action: Channel.() -> Unit) = - withChannel { - initSuccessQueue() - action() - } + protected inline fun withSuccessQueue(action: Channel.() -> Unit) = successQueue().let(action) /** * Publishes a message to the given [Channel]. @@ -167,11 +146,6 @@ sealed class QueueWrapper { ) } - /** - * Initialize queue callbacks. - */ - protected abstract fun initCallbacks() - /** * Configures the RabbitMQ [ConnectionFactory] based on the settings in the * given [QueueConfig]. @@ -179,10 +153,10 @@ sealed class QueueWrapper { * @param config Caller initialized RabbitMQ configuration properties. */ private fun configure(config: QueueConfig) { - factory.host = config.hostname - factory.username = config.username - factory.password = config.password - factory.port = config.hostPort - factory.connectionTimeout = config.timeout + factory.host = config.connection.hostname + factory.username = config.connection.username + factory.password = config.connection.password + factory.port = config.connection.hostPort + factory.connectionTimeout = config.connection.timeout.inWholeMilliseconds.toInt() } } diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/AnyFailureEnforcer.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/AnyFailureEnforcer.kt new file mode 100644 index 0000000..0149ed1 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/AnyFailureEnforcer.kt @@ -0,0 +1,9 @@ +package org.veupathdb.lib.rabbit.jobs.config + +internal class AnyFailureEnforcer(policies: Array) : ExecutorFailureEnforcer { + private val enforcers = policies.map(ExecutorFailurePolicy::newEnforcer) + + override fun markFailure() = enforcers.forEach(ExecutorFailureEnforcer::markFailure) + override fun shouldHalt() = enforcers.any(ExecutorFailureEnforcer::shouldHalt) + override fun reason() = enforcers.first(ExecutorFailureEnforcer::shouldHalt).reason() +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ConnectionConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ConnectionConfig.kt new file mode 100644 index 0000000..774eaa3 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ConnectionConfig.kt @@ -0,0 +1,47 @@ +package org.veupathdb.lib.rabbit.jobs.config + +import com.rabbitmq.client.ConnectionFactory +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +class ConnectionConfig { + /** + * RabbitMQ Connection Hostname + */ + var hostname: String = "rabbit" + set(value) { + if (hostname.isBlank()) + throw IllegalArgumentException("RabbitMQ hostname cannot be blank") + field = value + } + + /** + * RabbitMQ Authentication Username + */ + var username: String = "guest" + + /** + * RabbitMQ Authentication Password + */ + var password: String = "guest" + + /** + * RabbitMQ Connection Port + */ + var hostPort: Int = 5672 + set(value) { + if (value !in 1 .. 65535) + throw IllegalArgumentException("invalid port number $value") + field = value + } + + /** + * RabbitMQ Connection Timeout + */ + var timeout: Duration = 5.seconds + + /** + * Connection Factory + */ + var connectionFactory: ConnectionFactory = ConnectionFactory() +} diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt new file mode 100644 index 0000000..6e962fa --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt @@ -0,0 +1,42 @@ +package org.veupathdb.lib.rabbit.jobs.config + +import org.veupathdb.lib.rabbit.jobs.model.JobDispatch +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes +import kotlin.time.Duration.Companion.seconds + +class ExecutorConfig { + /** + * Configures the number of job execution worker threads. + */ + var workers: Int = 5 + set(value) { + if (value < 1) + throw IllegalArgumentException("cannot set worker count to a value less than 1") + + field = value + } + + /** + * Max allowed execution time for a single job. + * + * If a job exceeds this time it will be forceably terminated. + */ + var maxJobExecutionTime: Duration = 15.minutes + + /** + * Callback that will be executed for every job that is killed for exceeding + * the configured [maxJobExecutionTime] value. + */ + var jobTimeoutCallback: (JobDispatch) -> Unit = {} + + /** + * Failure policy defining the circumstances in which the target + * [JobQueueExecutor][org.veupathdb.lib.rabbit.jobs.JobQueueExecutor] should + * shut down without attempting recovery. + */ + var failurePolicy: ExecutorFailurePolicy? = null + + internal fun getOrCreateFailureEnforcer() = + failurePolicy?.newEnforcer() ?: WindowedFailureEnforcer((workers * 2).toUInt(), 2.seconds) +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailureEnforcer.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailureEnforcer.kt new file mode 100644 index 0000000..eb25374 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailureEnforcer.kt @@ -0,0 +1,34 @@ +package org.veupathdb.lib.rabbit.jobs.config + +/** + * Executor Failure State Shutdown Enforcer + * + * Tracks failures and determines when an executor should be considered as + * "failed" and shut down. + * + * @author Elizabeth Paige Harper [foxcapades.io@gmail.com] + * @since 2.0.0 + */ +interface ExecutorFailureEnforcer { + /** + * Called when a channel or consumer is killed due to an unhandled exception. + */ + fun markFailure() + + /** + * Called to test whether the containing executor should be halted according + * to the rules defined in this enforcer. + * + * @return `true` if the executor should be halted, otherwise `false`. + */ + fun shouldHalt(): Boolean + + /** + * Returns the failure reason for this enforcer. + * + * This method will only be called after [shouldHalt] has returned `true`. + * + * @return A message describing the reason the executor is being halted. + */ + fun reason(): String +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt new file mode 100644 index 0000000..edc12c0 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt @@ -0,0 +1,65 @@ +package org.veupathdb.lib.rabbit.jobs.config + +import kotlin.time.Duration + +/** + * Defines a policy that itself defines the state or point at which an executor + * should be considered "failed" and shut down. + * + * This type is effectively a factory for [ExecutorFailureEnforcer] instances + * which will individually be used to enforce the rules of the policy. + * + * @author Elizabeth Paige Harper [foxcapades.io@gmail.com] + * @since 2.0.0 + */ +fun interface ExecutorFailurePolicy { + /** + * Creates a new [ExecutorFailureEnforcer] instance. + */ + fun newEnforcer(): ExecutorFailureEnforcer + + companion object { + /** + * Defines a new failure policy that considers an executor as failed when + * channels or consumers are unexpectedly killed more than [max] times + * within the time window defined by [within]. + * + * This may be used to catch instances where consumers are spinning up and + * failing immediately, possibly indicating a persistent issue that will not + * be resolved automatically. + * + * @param max Max permissible number of times consumers may be unexpectedly + * killed within the defined time window. + * + * @param within Defines the time window within which at most [max] + * unexpected consumer deaths may occur. + * + * @return A new [ExecutorFailurePolicy] instance. + */ + fun maxFailuresWithin(max: Int, within: Duration) = ExecutorFailurePolicy { WindowedFailureEnforcer(max.toUInt(), within) } + + /** + * Defines a new failure policy that considers an executor as failed when + * channels or consumers are unexpectedly killed more than [max] times total + * within the lifespan of the executor. + * + * @param max Max permissible number of times consumers may be unexpectedly + * killed. + * + * @return A new [ExecutorFailurePolicy] instance. + */ + fun maxTotalFailures(max: Int) = ExecutorFailurePolicy { MaxFailureEnforcer(max.toUInt()) } + + /** + * Defines a new failure policy that wraps other policies and considers an + * executor as failed when any of the sub policies consider the executor as + * failed. + * + * @param others Policies to apply. + * + * @return A new [ExecutorFailurePolicy] instance. + */ + fun ofAny(vararg others: ExecutorFailurePolicy) = ExecutorFailurePolicy { AnyFailureEnforcer(others) } + } +} + diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/MaxFailureEnforcer.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/MaxFailureEnforcer.kt new file mode 100644 index 0000000..c62ce2a --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/MaxFailureEnforcer.kt @@ -0,0 +1,9 @@ +package org.veupathdb.lib.rabbit.jobs.config + +internal class MaxFailureEnforcer(private val maxFailures: UInt) : ExecutorFailureEnforcer { + private var failureCount = 0u + + override fun markFailure() { failureCount++ } + override fun shouldHalt() = failureCount > maxFailures + override fun reason() = "process exceeded $maxFailures total failures" +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt new file mode 100644 index 0000000..f2663ba --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt @@ -0,0 +1,82 @@ +package org.veupathdb.lib.rabbit.jobs.config + +/** + * RabbitMQ Queue Configuration + */ +class QueueConfig { + + /** + * RabbitMQ connection configuration. + */ + var connection: ConnectionConfig = ConnectionConfig() + + /** + * RabbitMQ connection configuration. + * + * @param connectionConfig New connection configuration object. + * + * @return This configuration. + */ + fun connection(connectionConfig: ConnectionConfig) = also { it.connection = connectionConfig } + + /** + * Executes the given function on the current [connection] value. + * + * @param fn Function to execute on the current [connection] value. + * + * @return This configuration. + */ + inline fun connection(fn: ConnectionConfig.() -> Unit) = also { it.connection.fn() } + + + /** + * Job Dispatch Queue Name + */ + var jobQueueName = "jobs" + + /** + * Configures the name of the job dispatch queue. + * + * @param name Job dispatch queue name. + * + * @return This configuration. + */ + fun jobQueueName(name: String) = apply { jobQueueName = name } + + + /** + * Job Failure Notification Queue Name + */ + var errorQueueName = "errors" + + /** + * Configures the name of the job error notification queue. + * + * @param name Error notification queue name. + * + * @return This configuration. + */ + fun errorQueueName(name: String) = apply { errorQueueName = name } + + + /** + * Job Success Notification Queue Name + */ + var successQueueName = "successes" + + /** + * Configures the name of the job success notification queue. + * + * @param name Success notification queue name. + * + * @return This configuration. + */ + fun successQueueName(name: String) = apply { successQueueName = name } + + + var executor: ExecutorConfig = ExecutorConfig() + + fun executor(executorConfig: ExecutorConfig) = also { it.executor = executorConfig } + + inline fun executor(fn: ExecutorConfig.() -> Unit) = also { it.executor.fn() } +} diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/WindowedFailureEnforcer.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/WindowedFailureEnforcer.kt new file mode 100644 index 0000000..bf271a3 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/WindowedFailureEnforcer.kt @@ -0,0 +1,15 @@ +package org.veupathdb.lib.rabbit.jobs.config + +import org.veupathdb.lib.rabbit.jobs.utils.ScrollingCounter +import kotlin.time.Duration + +internal class WindowedFailureEnforcer( + private val maxFailures: UInt, + private val within: Duration, +) : ExecutorFailureEnforcer { + private val counter = ScrollingCounter(within) + + override fun markFailure() = counter.inc() + override fun shouldHalt() = counter > maxFailures + override fun reason() = "process exceeded $maxFailures within $within" +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ChannelProvider.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ChannelProvider.kt new file mode 100644 index 0000000..c48084c --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ChannelProvider.kt @@ -0,0 +1,6 @@ +package org.veupathdb.lib.rabbit.jobs.pools.exec + +import com.rabbitmq.client.Channel + +internal typealias ChannelProvider = () -> Channel + diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt new file mode 100644 index 0000000..052fdc9 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt @@ -0,0 +1,110 @@ +package org.veupathdb.lib.rabbit.jobs.pools.exec + +import com.rabbitmq.client.Channel +import com.rabbitmq.client.ConsumerShutdownSignalCallback +import org.slf4j.LoggerFactory +import org.veupathdb.lib.rabbit.jobs.model.JobDispatch +import org.veupathdb.lib.rabbit.jobs.pools.JobHandlers +import org.veupathdb.lib.rabbit.jobs.serialization.Json +import java.util.concurrent.* +import kotlin.time.Duration + +/** + * Single 'worker' wrapping a RabbitMQ consumer client whose purpose is to parse + * incoming job config messages and fire jobs onto an external thread pool for + * each message. + * + * Each executor runs a single job at a time and awaits the result of that job + * before acknowledging the job config message and moving on to the next + * available message. + * + * Additionally, via the [next] and [prev] values, `Executor` instances act as + * nodes in a linked list, allowing dead nodes to be pruned from the executor + * pool efficiently. + * + * @author Elizabeth Paige Harper [foxcapades.io@gmail.com] + * @since 2.0.0 + */ +internal class Executor( + /** + * Identifier string for this executor. + * + * This value is used for logging. + */ + val id: String, + + /** + * Channel this executor should subscribe to. + */ + private val channel: Channel, + + /** + * Job execution handlers. + */ + private val handlers: JobHandlers, + + /** + * Max allowed job execution time. + */ + private val jobTimeout: Duration, + + /** + * Callback used to submit jobs to a worker pool. + */ + private val submitJobFn: (Runnable) -> Future<*>, + + private val timeoutFn: (JobDispatch) -> Unit, +) { + private val log = LoggerFactory.getLogger(javaClass) + + var prev: Executor? = null + + var next: Executor? = null + + fun init(queue: String, shutdown: ConsumerShutdownSignalCallback) { + channel.basicConsume( + queue, + false, + { _, msg -> + if (!channel.isOpen) { + log.error("consumer '{}' cannot execute job for message {}, channel is closed!", id, msg.envelope.deliveryTag) + return@basicConsume + } + + log.debug("consumer '{}' executing job for message {}", id, msg.envelope.deliveryTag) + + val dispatch = JobDispatch.fromJson(Json.from(msg.body)) + val future = submitJobFn { handlers.execute(dispatch) } + + // Wait for {jobTimeout} at most before killing the job and + // acknowledging the message + try { + future.get(jobTimeout.inWholeSeconds, TimeUnit.SECONDS) + log.debug("acknowledging job message {}", msg.envelope.deliveryTag) + channel.basicAck(msg.envelope.deliveryTag, false) + } catch (e: TimeoutException) { + log.warn("consumer '{}' killing job for message {} for taking longer than {}", id, msg.envelope.deliveryTag, jobTimeout) + swallow { future.cancel(true) } + swallow { channel.basicAck(msg.envelope.deliveryTag, false) } + swallow { timeoutFn(dispatch) } + } + }, + { }, + shutdown, + ) + } + + fun stop() { + log.debug("closing channel for consumer {}", id) + try { channel.close() } catch (e: Throwable) { /* do nothing */ } + } + + private fun swallow(log: Boolean = true, fn: () -> Unit) { + try { + fn() + } catch (e: Throwable) { + if (log) + this.log.warn("caught exception:", e) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt new file mode 100644 index 0000000..71dcd2a --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt @@ -0,0 +1,69 @@ +package org.veupathdb.lib.rabbit.jobs.pools.exec + +import org.veupathdb.lib.rabbit.jobs.config.ExecutorFailureEnforcer +import org.veupathdb.lib.rabbit.jobs.model.JobDispatch +import org.veupathdb.lib.rabbit.jobs.pools.JobHandlers +import java.util.concurrent.ThreadFactory +import kotlin.time.Duration + +/** + * Configuration values for a [JobQueueExecutorPool] instance. + * + * @author Elizabeth Paige Harper [foxcapades.io@gmail.com] + * @since 2.0.0 + */ +internal data class ExecutorPoolConfig( + /** + * Function used to get new RabbitMQ `Channel` instances to bind newly created + * [Executor]s to. + */ + val channelProvider: ChannelProvider, + + /** + * Name of the queue [Executor]s in the target pool should subscribe to. + */ + val queueName: String, + + /** + * Job execution handlers. + */ + val handlers: JobHandlers, + + /** + * Number of [Executor]s that should be kept in the target pool. + */ + val poolSize: Int, + + /** + * Max allowed job execution time. + * + * Jobs will be killed if they exceed this time limit. + */ + val maxJobTime: Duration, + + /** + * Thread factory to use in the [JobQueueExecutorPool]'s internal thread pool. + * + * If this value is set to null, a default thread factory will be used. + */ + val threadFactory: ThreadFactory?, + + /** + * Executor failure checker. + * + * Used to determine when the target executor pool has reached a 'failed' + * state. + */ + val failureChecker: ExecutorFailureEnforcer, + + /** + * Shutdown callback. Used to trigger a connection close on critical worker + * pool failure. + */ + val shutdownCB: () -> Unit, + + /** + * On job timeout callback. + */ + val timeoutCB: (JobDispatch) -> Unit, +) \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt new file mode 100644 index 0000000..86745ff --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt @@ -0,0 +1,232 @@ +package org.veupathdb.lib.rabbit.jobs.pools.exec + +import com.rabbitmq.client.Consumer +import com.rabbitmq.client.ConsumerShutdownSignalCallback +import org.slf4j.LoggerFactory +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.time.Duration +import kotlin.time.Duration.Companion.days + +/** + * Job Queue Executor Pool + * + * Contains a pool of job execution workers subscribed to a RabbitMQ queue + * defined by the given configuration params. + * + * Each executor in the pool operates with its own [Consumer] instance on its + * own channel provided by the given [ChannelProvider]. + * + * If an executor's consumer dies, the executor will be replaced. + * + * ```kotlin + * // Create a new executor pool + * val executor = JobQueueExecutorPool(config) + * + * // Start the executor pool's workers + * executor.start() + * + * // Gracefully stop the executor pool + * executor.stop() + * + * // Immediately stop the executor pool (interrupting any running jobs) + * executor.stopNow() + * ``` + * + * @author Elizabeth Paige Harper [foxcapades.io@gmail.com] + * @since 2.0.0 + */ +internal class JobQueueExecutorPool(private val config: ExecutorPoolConfig) { + private val logger = LoggerFactory.getLogger(javaClass) + + private val closing = AtomicBoolean(false) + private val name = "${config.queueName}-consumer-pool" + private val threads = Executors.newFixedThreadPool(config.poolSize) + + private var head: Executor? = null + private var tail: Executor? = null + private var counter = 0u + + init { + if (config.poolSize !in 1..64) + throw IllegalArgumentException("invalid size value ${config.poolSize}, must be in the range [1, 64]") + } + + /** + * Starts the executor pool's workers. + * + * Until this method is called, the worker pool will not receive or react to + * any job queue messages. + */ + fun start() { + for (i in 0 ..< config.poolSize) + add(createNew().also { it.init(config.queueName, onConsumerShutdown(it)) }) + } + + /** + * Attempts to gracefully shut down the executor pool. + * + * If [block] is `true`, this method will block the current thread until the + * [killAfter] duration has been reached, at which point the executor pool + * will request a force-shutdown of the underlying thread pool, aborting any + * jobs still in progress at that point. If [blockAfterKill] is also `true` + * at this point, the method will continue to block after the force-shutdown + * request until the last job has ended. + * + * If [block] is `false`, this method will not block. It will request the + * underlying thread pool shut down, then return immediately. In this case + * the values of [killAfter] and [blockAfterKill] are ignored. + * + * @param block Whether this method should block the current thread until the + * underlying thread pool has gracefully shut down. + * + * @param killAfter Max duration to wait for a graceful shutdown before + * attempting to abort remaining in-progress job executions. + * + * @param blockAfterKill Whether this method should continue to block after + * the [killAfter] duration has passed. + */ + fun stop( + block: Boolean = false, + killAfter: Duration = 10_000.days, + blockAfterKill: Boolean = true, + ): StopCode { + stopExecutors() + threads.shutdown() + + if (!block) + return if (threads.isTerminated) StopCode.Graceful else StopCode.Unknown + + if (threads.awaitTermination(killAfter.inWholeMilliseconds, TimeUnit.MILLISECONDS)) + return StopCode.Graceful + + threads.shutdownNow() + silently(config.shutdownCB) + if (blockAfterKill) + threads.awaitTermination(243256, TimeUnit.DAYS) + + return StopCode.Forced + } + + @JvmInline value class StopCode private constructor(private val value: Int) { + companion object { + val Unknown = StopCode(0) + val Graceful = StopCode(1) + val Forced = StopCode(2) + } + } + + /** + * Shuts down the executor pool by force, aborting any currently running jobs. + * + * This method may optionally block until the shutdown has completed. + * + * @param block Whether this method should block until the shutdown has + * completed. + */ + fun stopNow(block: Boolean = true): StopCode { + stopExecutors() + threads.shutdownNow() + silently(config.shutdownCB) + + if (block) + threads.awaitTermination(243256, TimeUnit.DAYS) // basically just wait indefinitely + + return StopCode.Forced + } + + private fun stopExecutors() { + synchronized(this) { + if (closing.get()) + return + + closing.set(true) + + var next = head + head = null + tail = null + + while (next != null) { + val tn = next.next + + next.prev = null + next.next = null + next.stop() + + next = tn + } + } + } + + private fun createNew(): Executor { + counter++ + val name = "$name-$counter" + + logger.debug("creating new consumer: {}", name) + + return Executor( + id = name, + channel = config.channelProvider(), + handlers = config.handlers, + jobTimeout = config.maxJobTime, + submitJobFn = threads::submit, + timeoutFn = config.timeoutCB, + ) + } + + private fun onConsumerShutdown(ex: Executor): ConsumerShutdownSignalCallback { + return ConsumerShutdownSignalCallback { _, sig -> + if (sig.isInitiatedByApplication) { + logger.debug("received consumer {} shutdown signal", ex.id) + } else { + logger.warn("caught unexpected shutdown on consumer {}", ex.id) + config.failureChecker.markFailure() + if (config.failureChecker.shouldHalt()) { + val reason = config.failureChecker.reason() + logger.error("shutting down job queue executor: {}", reason) + + stopNow() + return@ConsumerShutdownSignalCallback + } + } + + remove(ex) + ex.stop() + + if (!closing.get()) + add(createNew().also { it.init(config.queueName, onConsumerShutdown(it)) }) + } + } + + private fun add(ex: Executor) { + if (head == null) { + head = ex + tail = ex + } else { + tail!!.next = ex + tail = ex + } + } + + private fun remove(ex: Executor) { + if (head === ex) { + head = ex.next + head?.prev = null + } else if (tail === ex) { + tail = ex.prev + tail?.next = null + } else { + ex.next?.prev = ex.prev + ex.prev?.next = ex.next + } + } + + private fun silently(fn: () -> Unit) { + try { + fn() + } catch (e: Throwable) { + // do nothing + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/utils/ScrollingCounter.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/utils/ScrollingCounter.kt new file mode 100644 index 0000000..189dce3 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/utils/ScrollingCounter.kt @@ -0,0 +1,34 @@ +package org.veupathdb.lib.rabbit.jobs.utils + +import java.util.LinkedList +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +class ScrollingCounter(windowDuration: Duration = 5.seconds) { + private val entries = LinkedList() + private val maxDelta = windowDuration.inWholeMilliseconds + + fun inc() { entries.offer(System.currentTimeMillis()) } + + operator fun compareTo(value: Int) = compareTo(value.toUInt()) + operator fun compareTo(value: UInt) = count().compareTo(value) + + fun count(): UInt { + val cutoff = System.currentTimeMillis() - maxDelta + + while (entries.isNotEmpty() && entries.first < cutoff) + entries.pop() + + return entries.size.toUInt() + } + + override fun equals(other: Any?) = + when (other) { + is Int -> count() == other + is UInt -> count() == other + is ScrollingCounter -> other === this || count() == other.count() + else -> false + } + + override fun hashCode() = entries.hashCode() * 31 + maxDelta.hashCode() +} \ No newline at end of file diff --git a/test/client/.dockerignore b/test/client/.dockerignore new file mode 100644 index 0000000..bb88a77 --- /dev/null +++ b/test/client/.dockerignore @@ -0,0 +1,2 @@ +/test/server/ +/test/docker-compose.yml \ No newline at end of file diff --git a/test/client/build.gradle.kts b/test/client/build.gradle.kts index 02bdf3b..7c25afc 100644 --- a/test/client/build.gradle.kts +++ b/test/client/build.gradle.kts @@ -27,12 +27,16 @@ dependencies { implementation(kotlin("stdlib-jdk8")) implementation(rootProject) implementation("org.veupathdb.lib:hash-id:1.0.2") - implementation("com.fasterxml.jackson.core:jackson-databind:2.13.0") + + implementation("com.fasterxml.jackson.core:jackson-databind:2.17.1") + implementation("org.apache.logging.log4j:log4j-slf4j-impl:2.23.1") + implementation("org.apache.logging.log4j:log4j-api:2.23.1") + implementation("org.apache.logging.log4j:log4j-core:2.23.1") } kotlin { jvmToolchain { - (this as JavaToolchainSpec).languageVersion.set(JavaLanguageVersion.of(17)) + languageVersion.set(JavaLanguageVersion.of(17)) } } diff --git a/test/client/src/main/kotlin/main.kt b/test/client/src/main/kotlin/main.kt index 5c57a61..7e98d59 100644 --- a/test/client/src/main/kotlin/main.kt +++ b/test/client/src/main/kotlin/main.kt @@ -1,22 +1,28 @@ @file:JvmName("Main") -import com.fasterxml.jackson.databind.node.TextNode -import org.veupathdb.lib.hash_id.HashID -import org.veupathdb.lib.rabbit.jobs.QueueWorker -import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification +import org.veupathdb.lib.rabbit.jobs.JobQueueExecutor +import org.veupathdb.lib.rabbit.jobs.config.ExecutorFailurePolicy import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification +import kotlin.time.Duration.Companion.minutes fun main() { println("Sleeping for 10 seconds...") Thread.sleep(10_000) - val conFac = QueueWorker {} + val conFac = JobQueueExecutor { + executor { + failurePolicy = ExecutorFailurePolicy.maxTotalFailures(1) + workers = 1 + maxJobExecutionTime = 35.minutes + jobTimeoutCallback = { println(it.body) } + } + } conFac.onJob { - print("Server: ") - println(it) - } + println("Server said: $it") - conFac.sendSuccess(SuccessNotification(HashID("0102030405060708090A0B0C0D0E0F10"))) - conFac.sendError(ErrorNotification(HashID("0102030405060708090A0B0C0D0E0F10"), 123, 0,"butts", TextNode("body"))) + Thread.sleep(40.minutes.inWholeMilliseconds) + + conFac.sendSuccess(SuccessNotification(it.jobID)) + } } \ No newline at end of file diff --git a/test/client/src/main/resources/log4j2.xml b/test/client/src/main/resources/log4j2.xml new file mode 100644 index 0000000..473e169 --- /dev/null +++ b/test/client/src/main/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 22efe5d..532e474 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -10,6 +10,6 @@ services: context: ../ dockerfile: test/server/Dockerfile rabbit: - image: rabbitmq:3.9.11-management-alpine + image: rabbitmq:3.13.3-management-alpine ports: - - "15672:15672" \ No newline at end of file + - "15672:15672" diff --git a/test/server/.dockerignore b/test/server/.dockerignore new file mode 100644 index 0000000..d3849d4 --- /dev/null +++ b/test/server/.dockerignore @@ -0,0 +1,2 @@ +/test/client/ +/test/docker-compose.yml diff --git a/test/server/build.gradle.kts b/test/server/build.gradle.kts index 653f9ff..5fc85eb 100644 --- a/test/server/build.gradle.kts +++ b/test/server/build.gradle.kts @@ -27,12 +27,15 @@ dependencies { implementation(kotlin("stdlib-jdk8")) implementation(rootProject) implementation("org.veupathdb.lib:hash-id:1.1.0") - implementation("com.fasterxml.jackson.core:jackson-databind:2.13.3") + implementation("com.fasterxml.jackson.core:jackson-databind:2.17.1") + implementation("org.apache.logging.log4j:log4j-slf4j-impl:2.23.1") + implementation("org.apache.logging.log4j:log4j-api:2.23.1") + implementation("org.apache.logging.log4j:log4j-core:2.23.1") } kotlin { jvmToolchain { - (this as JavaToolchainSpec).languageVersion.set(JavaLanguageVersion.of(17)) + languageVersion.set(JavaLanguageVersion.of(17)) } } diff --git a/test/server/src/main/kotlin/main.kt b/test/server/src/main/kotlin/main.kt index e305dcc..09cc0ee 100644 --- a/test/server/src/main/kotlin/main.kt +++ b/test/server/src/main/kotlin/main.kt @@ -2,26 +2,28 @@ import com.fasterxml.jackson.databind.node.TextNode import org.veupathdb.lib.hash_id.HashID -import org.veupathdb.lib.rabbit.jobs.QueueDispatcher +import org.veupathdb.lib.rabbit.jobs.JobQueueDispatcher import org.veupathdb.lib.rabbit.jobs.model.JobDispatch fun main() { println("Sleeping for 10 seconds...") Thread.sleep(10_000) - val conFac = QueueDispatcher {} + val conFac = JobQueueDispatcher {} conFac.onSuccess { - println("Client Success: $it") + println("Success from client: $it") } conFac.onError { - print("Client Error: $it") + print("Error from client: $it") } - conFac.dispatch(JobDispatch( - HashID("01020304050607080102030405060708"), - TextNode("foo"), - "something", - )) + for (i in 1 .. 15) { + conFac.dispatch(JobDispatch( + HashID("01020304050607080102030405060701"), + TextNode("foo $i"), + "something", + )) + } } \ No newline at end of file diff --git a/test/server/src/main/resources/log4j2.xml b/test/server/src/main/resources/log4j2.xml new file mode 100644 index 0000000..473e169 --- /dev/null +++ b/test/server/src/main/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file