diff --git a/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/FlowMapperServiceIntegrationTest.kt b/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/FlowMapperServiceIntegrationTest.kt index ec18763be0b..d175c1ee67c 100644 --- a/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/FlowMapperServiceIntegrationTest.kt +++ b/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/FlowMapperServiceIntegrationTest.kt @@ -520,6 +520,10 @@ class FlowMapperServiceIntegrationTest { producer { close.timeout = 6000 } + mediator { + poolSize = 1 + minPoolRecordCount = 20 + } pollTimeout = 100 } """ diff --git a/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt b/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt index 20061b92814..5372b3a79dc 100644 --- a/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-mapper-service/src/integrationTest/kotlin/net/corda/session/mapper/service/integration/TestFlowEventMediatorFactoryImpl.kt @@ -79,6 +79,7 @@ class TestFlowEventMediatorFactoryImpl @Activate constructor( .threads(1) .threadName("flow-event-mediator") .stateManager(stateManagerFactory.create(stateManagerConfig)) + .minGroupSize(20) .build() private fun createMessageRouterFactory() = MessageRouterFactory { clientFinder -> diff --git a/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt b/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt index 73348a47204..ba54e321d24 100644 --- a/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt +++ b/components/flow/flow-mapper-service/src/main/kotlin/net/corda/session/mapper/messaging/mediator/FlowMapperEventMediatorFactoryImpl.kt @@ -23,7 +23,8 @@ import net.corda.schema.Schemas.Flow.FLOW_MAPPER_START import net.corda.schema.Schemas.Flow.FLOW_SESSION import net.corda.schema.Schemas.Flow.FLOW_START import net.corda.schema.Schemas.P2P.P2P_OUT_TOPIC -import net.corda.schema.configuration.FlowConfig +import net.corda.schema.configuration.MessagingConfig.Subscription.PROCESSING_MIN_POOL_RECORD_COUNT +import net.corda.schema.configuration.MessagingConfig.Subscription.PROCESSING_THREAD_POOL_SIZE import net.corda.session.mapper.service.executor.FlowMapperMessageProcessor import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component @@ -51,7 +52,6 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor( stateManager: StateManager, ) = eventMediatorFactory.create( createEventMediatorConfig( - flowConfig, messagingConfig, FlowMapperMessageProcessor(flowMapperEventExecutorFactory, flowConfig), stateManager, @@ -59,7 +59,6 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor( ) private fun createEventMediatorConfig( - flowConfig: SmartConfig, messagingConfig: SmartConfig, messageProcessor: StateAndEventProcessor, stateManager: StateManager, @@ -84,9 +83,10 @@ class FlowMapperEventMediatorFactoryImpl @Activate constructor( ) .messageProcessor(messageProcessor) .messageRouterFactory(createMessageRouterFactory()) - .threads(flowConfig.getInt(FlowConfig.PROCESSING_THREAD_POOL_SIZE)) + .threads(messagingConfig.getInt(PROCESSING_THREAD_POOL_SIZE)) .threadName("flow-mapper-event-mediator") .stateManager(stateManager) + .minGroupSize(messagingConfig.getInt(PROCESSING_MIN_POOL_RECORD_COUNT)) .build() private fun createMessageRouterFactory() = MessageRouterFactory { clientFinder -> diff --git a/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/messaging/mediator/FlowMapperEventMediatorFactoryImplTest.kt b/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/messaging/mediator/FlowMapperEventMediatorFactoryImplTest.kt index da208559925..02e92cb47f9 100644 --- a/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/messaging/mediator/FlowMapperEventMediatorFactoryImplTest.kt +++ b/components/flow/flow-mapper-service/src/test/kotlin/net/corda/session/mapper/service/messaging/mediator/FlowMapperEventMediatorFactoryImplTest.kt @@ -8,7 +8,7 @@ import net.corda.messaging.api.mediator.config.EventMediatorConfig import net.corda.messaging.api.mediator.factory.MediatorConsumerFactoryFactory import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory -import net.corda.schema.configuration.FlowConfig +import net.corda.schema.configuration.MessagingConfig import net.corda.session.mapper.messaging.mediator.FlowMapperEventMediatorFactory import net.corda.session.mapper.messaging.mediator.FlowMapperEventMediatorFactoryImpl import org.junit.jupiter.api.Assertions.assertNotNull @@ -24,13 +24,13 @@ class FlowMapperEventMediatorFactoryImplTest { private val mediatorConsumerFactoryFactory = mock() private val messagingClientFactoryFactory = mock() private val multiSourceEventMediatorFactory = mock() - private val flowConfig = mock() + private val config = mock() @BeforeEach fun beforeEach() { `when`(multiSourceEventMediatorFactory.create(any>())) .thenReturn(mock()) - `when`(flowConfig.getInt(FlowConfig.PROCESSING_THREAD_POOL_SIZE)).thenReturn(10) + `when`(config.getInt(MessagingConfig.Subscription.PROCESSING_THREAD_POOL_SIZE)).thenReturn(10) flowMapperEventMediatorFactory = FlowMapperEventMediatorFactoryImpl( flowMapperEventExecutorFactory, @@ -42,7 +42,7 @@ class FlowMapperEventMediatorFactoryImplTest { @Test fun `successfully creates event mediator`() { - val mediator = flowMapperEventMediatorFactory.create(flowConfig, mock(), mock()) + val mediator = flowMapperEventMediatorFactory.create(mock(), config, mock()) assertNotNull(mediator) } diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 98a573c6a03..61889cfcf0e 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -13,7 +13,6 @@ import net.corda.data.uniqueness.UniquenessCheckRequestAvro import net.corda.flow.pipeline.factory.FlowEventProcessorFactory import net.corda.ledger.utxo.verification.TransactionVerificationRequest import net.corda.libs.configuration.SmartConfig -import net.corda.libs.configuration.helper.getConfig import net.corda.libs.platform.PlatformInfoProvider import net.corda.libs.statemanager.api.StateManager import net.corda.messaging.api.constants.WorkerRPCPaths.CRYPTO_PATH @@ -43,8 +42,8 @@ import net.corda.schema.configuration.BootConfig.PERSISTENCE_WORKER_REST_ENDPOIN import net.corda.schema.configuration.BootConfig.TOKEN_SELECTION_WORKER_REST_ENDPOINT import net.corda.schema.configuration.BootConfig.UNIQUENESS_WORKER_REST_ENDPOINT import net.corda.schema.configuration.BootConfig.VERIFICATION_WORKER_REST_ENDPOINT -import net.corda.schema.configuration.ConfigKeys -import net.corda.schema.configuration.FlowConfig +import net.corda.schema.configuration.MessagingConfig.Subscription.PROCESSING_MIN_POOL_RECORD_COUNT +import net.corda.schema.configuration.MessagingConfig.Subscription.PROCESSING_THREAD_POOL_SIZE import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference @@ -79,7 +78,6 @@ class FlowEventMediatorFactoryImpl @Activate constructor( stateManager: StateManager, ) = eventMediatorFactory.create( createEventMediatorConfig( - configs, messagingConfig, flowEventProcessorFactory.create(configs), stateManager, @@ -87,7 +85,6 @@ class FlowEventMediatorFactoryImpl @Activate constructor( ) private fun createEventMediatorConfig( - configs: Map, messagingConfig: SmartConfig, messageProcessor: StateAndEventProcessor, stateManager: StateManager, @@ -115,9 +112,10 @@ class FlowEventMediatorFactoryImpl @Activate constructor( ) .messageProcessor(messageProcessor) .messageRouterFactory(createMessageRouterFactory(messagingConfig)) - .threads(configs.getConfig(ConfigKeys.FLOW_CONFIG).getInt(FlowConfig.PROCESSING_THREAD_POOL_SIZE)) + .threads(messagingConfig.getInt(PROCESSING_THREAD_POOL_SIZE)) .threadName("flow-event-mediator") .stateManager(stateManager) + .minGroupSize(messagingConfig.getInt(PROCESSING_MIN_POOL_RECORD_COUNT)) .build() private fun createMessageRouterFactory(messagingConfig: SmartConfig) = MessageRouterFactory { clientFinder -> diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt index e7bb8f815a2..5681e06e48b 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt @@ -32,7 +32,7 @@ import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import net.corda.schema.configuration.ConfigKeys -import net.corda.schema.configuration.FlowConfig +import net.corda.schema.configuration.MessagingConfig import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.BeforeEach @@ -51,7 +51,7 @@ class FlowEventMediatorFactoryImplTest { private val multiSourceEventMediatorFactory = mock() private val cordaAvroSerializationFactory = mock() private val platformInfoProvider = mock() - private val flowConfig = mock() + private val config = mock() val captor = argumentCaptor>() @@ -63,7 +63,7 @@ class FlowEventMediatorFactoryImplTest { `when`(multiSourceEventMediatorFactory.create(captor.capture())) .thenReturn(mock()) - `when`(flowConfig.getInt(FlowConfig.PROCESSING_THREAD_POOL_SIZE)).thenReturn(10) + `when`(config.getInt(MessagingConfig.Subscription.PROCESSING_THREAD_POOL_SIZE)).thenReturn(10) flowEventMediatorFactory = FlowEventMediatorFactoryImpl( flowEventProcessorFactory, @@ -83,7 +83,7 @@ class FlowEventMediatorFactoryImplTest { @Test fun `successfully creates event mediator with expected routes`() { val mediator = flowEventMediatorFactory.create( - mapOf(ConfigKeys.FLOW_CONFIG to flowConfig), + mapOf(ConfigKeys.MESSAGING_CONFIG to config), mock(), mock(), ) diff --git a/gradle.properties b/gradle.properties index 5865db801a4..b8e72441077 100644 --- a/gradle.properties +++ b/gradle.properties @@ -44,7 +44,7 @@ commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.0.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.0.16-beta+ +cordaApiVersion=5.2.0.17-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/GroupAllocator.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/GroupAllocator.kt new file mode 100644 index 00000000000..b42cb07051b --- /dev/null +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/GroupAllocator.kt @@ -0,0 +1,51 @@ +package net.corda.messaging.mediator + +import net.corda.messaging.api.mediator.config.EventMediatorConfig +import net.corda.messaging.api.records.Record +import kotlin.math.ceil +import kotlin.math.min + +/** + * Helper class to use in the mediator to divide polled records into groups for processing. + */ +class GroupAllocator { + + /** + * Allocate events into groups based on their keys, a configured minimum group size and thread count. + * This allows for more efficient multi-threaded processing. + * The threshold record count to establish a new group is [config.minGroupSize]. + * If the number of groups exceeds the number of threads then the group count is set to the number of [config.threads] + * Records of the same key are always placed into the same group regardless of group size and count. + * @param events Events to allocate to groups + * @param config Mediator config + * @return Records allocated to groups. + */ + fun allocateGroups( + events: List>, + config: EventMediatorConfig + ): List>>> { + val groups = setUpGroups(config, events) + val buckets = events + .groupBy { it.key }.toList() + .sortedByDescending { it.second.size } + + buckets.forEach { (key, records) -> + val leastFilledGroup = groups.minByOrNull { it.values.flatten().size } + leastFilledGroup?.put(key, records) + } + + return groups.filter { it.values.isNotEmpty() } + } + + private fun setUpGroups( + config: EventMediatorConfig, + events: List> + ): MutableList>>> { + val numGroups = min( + ceil(events.size.toDouble() / config.minGroupSize).toInt(), + config.threads + ) + + return MutableList(numGroups) { mutableMapOf() } + } +} \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt index 9ee3826b9af..e4a138b0323 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImpl.kt @@ -55,6 +55,7 @@ class MultiSourceEventMediatorImpl( private val taskManagerHelper = TaskManagerHelper( taskManager, stateManagerHelper, metrics ) + private val groupAllocator = GroupAllocator() private val uniqueId = UUID.randomUUID().toString() private val lifecycleCoordinatorName = LifecycleCoordinatorName( "MultiSourceEventMediator--${config.name}", uniqueId @@ -164,7 +165,7 @@ class MultiSourceEventMediatorImpl( val messages = consumer.poll(pollTimeout) val startTimestamp = System.nanoTime() if (messages.isNotEmpty()) { - var groups = allocateGroups(messages.map { it.toRecord() }) + var groups = groupAllocator.allocateGroups(messages.map { it.toRecord() }, config) var states = stateManager.get(messages.map { it.key.toString() }.distinct()) while (groups.isNotEmpty()) { @@ -230,7 +231,7 @@ class MultiSourceEventMediatorImpl( states = failedToCreate + failedToDelete + failedToUpdateOptimisticLockFailure groups = if (states.isNotEmpty()) { - allocateGroups(flowEvents.filterKeys { states.containsKey(it) }.values.flatten()) + groupAllocator.allocateGroups(flowEvents.filterKeys { states.containsKey(it) }.values.flatten(), config) } else { listOf() } @@ -319,23 +320,4 @@ class MultiSourceEventMediatorImpl( } } } - - private fun allocateGroups(events: List>): List>>> { - val groups = mutableListOf>>>() - val groupCountBasedOnEvents = (events.size / 20).coerceAtLeast(1) - val groupsCount = if (groupCountBasedOnEvents < config.threads) groupCountBasedOnEvents else config.threads - for (i in 0 until groupsCount) { - groups.add(mutableMapOf()) - } - val buckets = events.groupBy { it.key } - val bucketSizes = buckets.keys.sortedByDescending { buckets[it]?.size } - for (i in buckets.size - 1 downTo 0 step 1) { - val group = groups.minBy { it.values.flatten().size } - val key = bucketSizes[i] - val records = buckets[key]!! - group[key] = records - } - - return groups - } } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/GroupAllocatorTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/GroupAllocatorTest.kt new file mode 100644 index 00000000000..3b4a5535687 --- /dev/null +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/GroupAllocatorTest.kt @@ -0,0 +1,129 @@ +package net.corda.messaging.mediator + +import net.corda.libs.configuration.SmartConfigImpl +import net.corda.libs.statemanager.api.StateManager +import net.corda.messaging.api.mediator.config.EventMediatorConfig +import net.corda.messaging.api.mediator.factory.MessageRouterFactory +import net.corda.messaging.api.processor.StateAndEventProcessor +import net.corda.messaging.api.records.Record +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.mockito.kotlin.mock + +class GroupAllocatorTest { + + private val groupAllocator = GroupAllocator() + + @Test + fun `allocate groups with records of 1 key below min group size`() { + val config = buildTestConfig(2, 20) + val records = getIntRecords(listOf(19)) + + val result = groupAllocator.allocateGroups(records, config) + + assertGroupsSize(result, mapOf(0 to 19)) + } + + @Test + fun `allocate groups with records of 2 keys below min group size`() { + val config = buildTestConfig(2, 20) + val records = getIntRecords(listOf(15, 4)) + + val result = groupAllocator.allocateGroups(records, config) + + assertGroupsSize(result, mapOf(0 to 19)) + } + + @Test + fun `allocate groups with records of 1 key above min group size`() { + val config = buildTestConfig(2, 20) + val records = getIntRecords(listOf(60)) + + val result = groupAllocator.allocateGroups(records, config) + + assertGroupsSize(result, mapOf(0 to 60)) + } + + @Test + fun `allocate groups with records of 2 keys above min group size`() { + val config = buildTestConfig(4, 20) + val records = getIntRecords(listOf(35, 25)) + + val result = groupAllocator.allocateGroups(records, config) + + assertGroupsSize(result, mapOf(0 to 35, 1 to 25)) + } + + @Test + fun `allocate small groups of records with 6 keys`() { + val config = buildTestConfig(4, 20) + val records = getIntRecords(listOf(5, 8, 7, 5, 8, 6)) + + val result = groupAllocator.allocateGroups(records, config) + + assertGroupsSize(result, mapOf(0 to 20, 1 to 19)) + } + + @Test + fun `allocate large groups of records with 6 keys`() { + val config = buildTestConfig(8, 20) + val records = getIntRecords(listOf(15, 18, 17, 15, 18, 15)) + + val result = groupAllocator.allocateGroups(records, config) + + assertGroupsSize(result, mapOf(0 to 18, 1 to 18, 2 to 17, 3 to 30, 4 to 15)) + } + + @Test + fun `allocate large groups of records with 6 keys but fewer threads than groups`() { + val config = buildTestConfig(4, 20) + val records = getIntRecords(listOf(15, 18, 17, 15, 18, 15)) + + val result = groupAllocator.allocateGroups(records, config) + + assertGroupsSize(result, mapOf(0 to 18, 1 to 18, 2 to 32, 3 to 30)) + } + + @Test + fun `allocate large groups of records with 6 keys with less threads than groups`() { + val config = buildTestConfig(2, 20) + val records = getIntRecords(listOf(15, 18, 17, 15, 18, 15)) + + val result = groupAllocator.allocateGroups(records, config) + + assertGroupsSize(result, mapOf(0 to 50, 1 to 48)) + } + + private fun assertGroupsSize(groups: List>>>, groupSize: Map ) { + assertEquals(groupSize.size, groups.size) + + groupSize.map { + assertEquals(groupSize[it.key], groups[it.key].values.flatten().size) + } + } + + private fun buildTestConfig(threadCount: Int, minGroupSize: Int): EventMediatorConfig { + return EventMediatorConfig( + "", + SmartConfigImpl.empty(), + emptyList(), + emptyList(), + mock>(), + mock(), + threadCount, + "", + mock(), + minGroupSize + ) + } + + private fun getIntRecords( recordCountByKey: List): List> { + val records = mutableListOf>() + for (i in recordCountByKey.indices) { + for (j in 1..recordCountByKey[i]) { + records.add(Record(null, i, j)) + } + } + return records + } +} diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt index aecaf5831c4..56a5cb5dd6f 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/MultiSourceEventMediatorImplTest.kt @@ -123,6 +123,7 @@ class MultiSourceEventMediatorImplTest { .threads(1) .threadName("mediator-thread") .stateManager(stateManager) + .minGroupSize(20) .build() mediator = MultiSourceEventMediatorImpl( diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt index 26eaa487508..53bd45c6fa5 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt @@ -25,6 +25,8 @@ import java.time.Duration * @property threads Number of threads used by task manager. * @property threadName Name (prefix) for task manager threads. * @property stateManager State manager. + * @property minGroupSize Minimum size for group of records passed to task manager for processing in a single thread. Does not block if + * group size is not met by polled record count. */ data class EventMediatorConfig( val name: String, @@ -36,6 +38,7 @@ data class EventMediatorConfig( val threads: Int, val threadName: String, val stateManager: StateManager, + val minGroupSize: Int, ) { /** * Timeout for polling consumers. diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt index 83a4e0c3f4c..a6e502a6b5c 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt @@ -26,6 +26,7 @@ class EventMediatorConfigBuilder { private var threads: Int? = null private var threadName: String? = null private var stateManager: StateManager? = null + private var minGroupSize: Int? = null /** Sets name for [MultiSourceEventMediator]. */ fun name(name: String) = @@ -59,6 +60,16 @@ class EventMediatorConfigBuilder { fun threadName(threadName: String) = apply { this.threadName = threadName } + /** + * Sets the minimum size for group of records passed to task manager for processing in a single thread. Does not block if + * group size is not met by polled record count. + * If the number of resulting groups is evaluated to be more than the number of threads then the number of [threads] is used to + * calculate [minGroupSize] instead. + */ + fun minGroupSize(minGroupSize: Int) = + apply { this.minGroupSize = minGroupSize } + + /** Sets state manager. */ fun stateManager(stateManager: StateManager) = apply { this.stateManager = stateManager } @@ -77,6 +88,7 @@ class EventMediatorConfigBuilder { threads = checkNotNull(threads) { "Number of threads not set" }, threadName = checkNotNull(threadName) { "Thread name not set" }, stateManager = checkNotNull(stateManager) { "State manager not set" }, + minGroupSize = checkNotNull(minGroupSize) { "Min group size not set" }, ) } } \ No newline at end of file