diff --git a/config/resource_handler/README.md b/config/resource_handler/README.md
new file mode 100644
index 0000000..a168f39
--- /dev/null
+++ b/config/resource_handler/README.md
@@ -0,0 +1,83 @@
+# Resource handler
+
+This module handles resource allocations, including granular per-process allocations, base allocation modifications, and retry setup. It makes use of the [retry module](../retry) for retry setup.
+
+The resource handler requires a JSON file containing allocation information, in the following format:
+
+```JSON
+{
+    "allocation_group": {
+        "process1": {
+            "cpus": {
+                "min": 1,
+                "fraction": 0.5,
+                "max": 1
+            },
+            "memory": {
+                "min": "150 MB",
+                "fraction": 0.23,
+                "max": "1 GB"
+            },
+            "retry_strategy": {
+                "memory": {
+                    "strategy": "exponential",
+                    "operand": 2
+                }
+            }
+        },
+        "process2": {
+            "cpus": {
+                "min": 2,
+                "fraction": 0.5,
+                "max": 2
+            },
+            "memory": {
+                "min": "1500 MB",
+                "fraction": 0.23,
+                "max": "10 GB"
+            }
+        }
+    },
+    "default": {
+        "process1": {
+            "cpus": {
+                "min": 1,
+                "fraction": 0.5,
+                "max": 1
+            },
+            "memory": {
+                "min": "150 MB",
+                "fraction": 0.23,
+                "max": "1 GB"
+            }
+        },
+        "process2": {
+            "cpus": {
+                "min": 2,
+                "fraction": 0.5,
+                "max": 2
+            },
+            "memory": {
+                "min": "1500 MB",
+                "fraction": 0.23,
+                "max": "10 GB"
+            }
+        }
+    }
+}
+```
+
+A resource group labeled `default` is required in the JSON. Additionally, groups for F-series and M-series allocations can be provided and will be detected automatically; for example, an F16 node will be recognized and the corresponding group labeled `f16` will be loaded automatically if it is defined in the resources JSON. A specific group can also be selected through configuration with the parameter `resource_allocation_profile_tag`.
+
+## Example
+An example of integrating the handler and setting up allocations:
+```Nextflow
+methods {
+    ...
+    setup = {
+        ...
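+        // handle_resources() loads the allocation JSON, selects the `default`, auto-detected (e.g. `f16`), or user-configured profile, and applies per-process allocations and retry settings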
+        resource_handler.handle_resources("${projectDir}/config/resources.json")
+    }
+}
+```
\ No newline at end of file
diff --git a/config/resource_handler/resource_handler.config b/config/resource_handler/resource_handler.config
new file mode 100644
index 0000000..6feb936
--- /dev/null
+++ b/config/resource_handler/resource_handler.config
@@ -0,0 +1,303 @@
+import nextflow.util.SysHelper
+import groovy.json.JsonSlurper
+
+includeConfig "../retry/retry.config"
+
+class SystemResources {
+    private Map resource_limits = [
+        'cpus': [
+            'type': java.lang.Integer,
+            'min': 1,
+            'max': SysHelper.getAvailCpus()
+        ],
+        'memory': [
+            'type': nextflow.util.MemoryUnit,
+            'min': 1.MB,
+            'max': SysHelper.getAvailMemory()
+        ],
+        'time': [
+            'type': nextflow.util.Duration,
+            'min': 1.s,
+            'max': 1000.d
+        ]
+    ]
+    private String resource_profile = null
+
+    SystemResources(Map params) {
+        // Search for config-defined resource limits
+        this.resource_limits.each { resource, resource_info ->
+            ['min', 'max'].each { limit_end ->
+                try {
+                    if (params.containsKey("${limit_end}_${resource}" as String)) {
+                        this.resource_limits[resource][limit_end] = params["${limit_end}_${resource}" as String].asType(this.resource_limits[resource]['type'])
+                    }
+                } catch (all) {
+                    // Do nothing, let default value defined above take effect
+                }
+            }
+        }
+
+        this.identify_resource_profile()
+    }
+
+    Map get_limits(String type) {
+        return [
+            ("min_${type}" as String): this.resource_limits[type]['min'],
+            ("max_${type}" as String): this.resource_limits[type]['max']
+        ]
+    }
+
+    Object check_limits(Object obj, String type) {
+        return SystemResources.check_limits(obj, type, this.resource_limits[type]['min'], this.resource_limits[type]['max'])
+    }
+
+    static Object check_limits(Object obj, String type, Object min, Object max) {
+        if (obj.compareTo(max) > 0) {
+            return max
+        } else if (obj.compareTo(min) < 0) {
+            return min
+        } else {
+            return obj
+        }
+    }
+
+    private void identify_resource_profile() {
+        // Identify if available resources match F- or M-series nodes
+        def cpus = this.resource_limits.cpus.max
+        def memory = this.resource_limits.memory.max.toGiga()
+
+        if (memory >= (cpus * 2 * 0.9 - 1) && (memory <= (cpus * 2))) {
+            this.resource_profile = "f${cpus}"
+        }
+
+        if (memory >= (cpus * 16 * 0.9 - 1) && (memory <= (cpus * 16))) {
+            this.resource_profile = "m${cpus}"
+        }
+    }
+
+    Object resolve_resource_allocation(Map allocation, String type) {
+        def min_raw = allocation['min'].asType(this.resource_limits[type]['type'])
+        def max_raw = allocation['max'].asType(this.resource_limits[type]['type'])
+
+        def min_allocation = this.check_limits(min_raw, type)
+        def max_allocation = this.check_limits(max_raw, type)
+
+        def requested_allocation = (allocation.fraction * (this.resource_limits[type]['max'])).asType(this.resource_limits[type]['type'])
+
+        return SystemResources.check_limits(requested_allocation, type, min_allocation, max_allocation)
+    }
+
+    String get_resource_profile_tag() {
+        return this.resource_profile
+    }
+}
+
+class PipelineAllocation {
+    private SystemResources system_resources
+    private File resource_json
+    private String allocation_profile
+    private Map processed_resources = [:]
+    private Map retry_configurations = [:]
+    private Map raw_process_resources = [:]
+
+    PipelineAllocation(Object resource_json, Object params) {
+        this.system_resources = new SystemResources(params)
+        this.resource_json = new File(resource_json)
+
+        def json_slurper = new JsonSlurper()
+
+        // TO-DO: Dump original for logging, keeping in class for now
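+        // Expected layout (see README): top-level profile tags (e.g. "default", "f16") map process names to resources with `min`/`fraction`/`max` and an optional `retry_strategy`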
+        this.raw_process_resources = json_slurper.parse(this.resource_json)
+
+        assert this.raw_process_resources instanceof Map
+        // TO-DO: Validate JSON is in expected format
+
+        this.select_allocation_profile(params)
+
+        // Separate the retry strategies from the base allocations
+        this.processed_resources.each { process, allocations ->
+            if (allocations.containsKey("retry_strategy")) {
+                def current_retry_strategy = allocations["retry_strategy"]
+                // Convert memory string to MemoryUnit for proper retry setup
+                if (current_retry_strategy.containsKey("memory")) {
+                    if (current_retry_strategy.memory.strategy == "add") {
+                        current_retry_strategy.memory.operand = current_retry_strategy.memory.operand as nextflow.util.MemoryUnit
+                    }
+                }
+                this.retry_configurations[process] = current_retry_strategy
+                allocations.remove("retry_strategy")
+            }
+        }
+
+        // Resolve each allocation: take the requested fraction of the system maximum and clamp it to the allocation's min/max
+        this.processed_resources.each { process, allocations ->
+            for (resource_type in ["cpus", "memory", "time"]) {
+                if (allocations.containsKey(resource_type)) {
+                    allocations[resource_type] = this.system_resources.resolve_resource_allocation(allocations[resource_type], resource_type)
+                }
+            }
+        }
+
+        // TO-DO: Singularize processes that may be separated with `|`
+    }
+
+    private void load_resource_profile(String profile_tag) {
+        this.processed_resources = this.raw_process_resources[profile_tag]
+
+        if (!this.processed_resources) {
+            throw new Exception(" ### ERROR ### Failed to find requested resource profile: `${profile_tag}`")
+        }
+    }
+
+    private void apply_custom_allocations(Map custom_allocations) {
+        custom_allocations.each { process, custom_allocation ->
+            custom_allocation.each { resource_type, allocation ->
+                this.processed_resources[process][resource_type] = this.system_resources.check_limits(allocation, resource_type)
+            }
+        }
+    }
+
+    private void select_allocation_profile(Map params) {
+        String profile_tag = null
+
+        // Try for user-given profile
+        if (params.containsKey('resource_allocation_profile_tag') && params.resource_allocation_profile_tag) {
+            profile_tag = params.resource_allocation_profile_tag
+            this.load_resource_profile(profile_tag)
+            return
+        }
+
+        // Try loading detected tag based on system resources
+        profile_tag = this.system_resources.get_resource_profile_tag()
+
+        if (profile_tag) {
+            try {
+                this.load_resource_profile(profile_tag)
+                return
+            } catch (all) {
+                throw new Exception(" ### ERROR ### Failed to load requested profile: ${profile_tag}. Please check and provide a valid tag.")
+            }
+        }
+
+        // Resort to loading `default` profile
+        this.load_resource_profile('default')
+    }
+
+    // TO-DO: functionality to dump original loaded JSON to file for logging
+
+    void update_base_allocation(String resource, String process, Object multiplier) {
+        if (this.processed_resources.containsKey(process) && this.processed_resources[process].containsKey(resource)) {
+            this.processed_resources[process][resource] = this.system_resources.check_limits(this.processed_resources[process][resource] * multiplier, resource)
+        } else {
+            System.out.println(" ### WARNING ### No base value found for resource `${resource}` for process `${process}`. Update will be skipped.")
+        }
+    }
+
+    // Apply base resource updates
+    void apply_base_updates(Map resource_updates) {
+        resource_updates.each { resource, updates ->
+            updates.each { processes, multiplier ->
+                List processes_to_update = (processes instanceof String || processes instanceof GString) ? [processes] : processes
+
+                if (processes_to_update == []) {
+                    processes_to_update = this.processed_resources.keySet() as List
+                }
+
+                processes_to_update.each { process ->
+                    this.update_base_allocation(resource, process, multiplier)
+                }
+            }
+        }
+    }
+
+    Map get_base_resource_allocations() {
+        return this.processed_resources
+    }
+
+    Map get_retry_configuration() {
+        return this.retry_configurations
+    }
+
+    void print_resources() {
+        System.out.println(this.processed_resources)
+    }
+
+    SystemResources get_system_resources() {
+        return this.system_resources
+    }
+}
+
+resource_handler {
+    set_retry = { String proc_key, String proc_name, String type ->
+        if (process[proc_key]?[type] && \
+            params.retry_information?[proc_name]?[type] && \
+            params.retry_information?[proc_name]?[type]?.operand && \
+            params.retry_information?[proc_name]?[type]?.strategy) {
+            process[proc_key][type] = { retry.retry_updater(params.base_allocations[task.process.split(':')[-1]][type], \
+                params.retry_information[task.process.split(':')[-1]][type].strategy, \
+                params.retry_information[task.process.split(':')[-1]][type].operand, \
+                task.attempt, \
+                type) }
+        }
+    }
+
+    set_resource_limit_params = { SystemResources system_resources ->
+        ["cpus", "memory", "time"].each { resource_type ->
+            system_resources.get_limits(resource_type).each { resource_limit_key, resource_limit ->
+                params[resource_limit_key] = resource_limit
+            }
+        }
+    }
+
+    setup_retry = { Map resources_to_allocate, Map retry_configuration ->
+        params.retry_information = [:]
+
+        for (proc_allocation in resources_to_allocate) {
+            def proc_key = "withName:${proc_allocation.key}" as String
+            def allocation = proc_allocation.value
+
+            def retry_strategy = retry_configuration.getOrDefault(proc_allocation.key, null)
+            if (retry_strategy) {
+                params.retry_information[proc_allocation.key] = retry_strategy
+            }
+
+            for (resource_allocation in allocation) {
+                process[proc_key]["${resource_allocation.key}"] = resource_allocation.value
+                if (retry_strategy && retry_strategy.containsKey(resource_allocation.key)) {
+                    resource_handler.set_retry(proc_key, proc_allocation.key, 'cpus')
+                    resource_handler.set_retry(proc_key, proc_allocation.key, 'memory')
+                }
+            }
+        }
+    }
+
+    handle_resources = { Object resource_file, Map customized_allocations=[:], Map current_params=params ->
+        // Load base.config by default for all pipelines
+        includeConfig "${projectDir}/config/base.config"
+
+        def allocation_handler = new PipelineAllocation(resource_file, current_params)
+        def system_resources = allocation_handler.get_system_resources()
+
+        // Set params for the limits of each resource
+        resource_handler.set_resource_limit_params(system_resources)
+
+        // Apply custom allocations if given
+        if (customized_allocations) {
+            allocation_handler.apply_custom_allocations(customized_allocations)
+        }
+
+        // Apply base resource updates if given
+        if (params.containsKey('base_resource_update') && params.base_resource_update) {
+            allocation_handler.apply_base_updates(params.base_resource_update)
+        }
+
+        // Set base allocations and retry
+        Map resources_to_allocate = allocation_handler.get_base_resource_allocations()
+        Map retry_configuration = allocation_handler.get_retry_configuration()
+
+        params.base_allocations = resources_to_allocate
+
+        resource_handler.setup_retry(resources_to_allocate, retry_configuration)
+    }
+}
diff --git a/tests/Dockerfile b/tests/Dockerfile
index 98fa51c..d046586 100644
--- a/tests/Dockerfile
+++ b/tests/Dockerfile
@@ -19,7 +19,7 @@ ENV NXF_LAUNCHER=/.nextflow/tmp/launcher/nextflow-one_${NEXTFLOW_VERSION}/buildk
 RUN sed \
     -i \
     -e 's/"nextflow.cli.Launcher"/"groovy.ui.GroovyMain"/' \
-    -e 's|"-classpath" "|"-classpath" "/bljars/junit-4.13.2.jar:/bljars/hamcrest-core-1.3.jar:/bljars/groovy-test-3.0.19.jar:/bljars/system-rules-1.19.0.jar:$BL_CLASSPATH:|' \
+    -e "s|\"-classpath\" \"|\"-classpath\" \"$(find /bljars/ -not -name 'groovy-3*' -type f -printf "%p:"):|" \
     ${NXF_LAUNCHER}/classpath-${NEXTFLOW_MD5}
 
 COPY validator /usr/local/validator
diff --git a/tests/ResourceTests.groovy b/tests/ResourceTests.groovy
new file mode 100644
index 0000000..59b0ba8
--- /dev/null
+++ b/tests/ResourceTests.groovy
@@ -0,0 +1,230 @@
+import java.nio.file.Path
+import java.nio.file.Paths
+
+import static groovy.test.GroovyAssert.shouldFail
+import groovy.json.JsonOutput
+import groovy.util.ConfigObject
+
+import nextflow.util.ConfigHelper
+import nextflow.util.MemoryUnit
+import nextflow.util.SysHelper
+import org.junit.Test
+
+import validator.bl.NextflowConfigTests
+
+class ResourceTests extends NextflowConfigTests {
+    protected Path get_projectDir() {
+        // Return the path to the "resource_test/" subfolder
+        return Paths.get(
+            getClass().protectionDomain.codeSource.location.path
+        ).getParent().resolve("resource_test")
+    }
+
+    protected int override_cpus = 10
+    protected MemoryUnit override_memory = 12.GB
+
+    @Override
+    protected def generate_config_text(configobj) {
+        return """
+        import nextflow.util.SysHelper
+        import nextflow.util.MemoryUnit
+        import static org.mockito.Mockito.*
+        import org.mockito.MockedStatic
+
+        // Mock out the SysHelper::getAvailCpus() and
+        // SysHelper::getAvailMemory() methods
+
+        try (MockedStatic dummyhelper = mockStatic(
+                SysHelper.class,
+                CALLS_REAL_METHODS)) {
+            dummyhelper
+                .when(SysHelper::getAvailCpus)
+                .thenReturn(${override_cpus});
+            dummyhelper
+                .when(SysHelper::getAvailMemory)
+                .thenReturn(new MemoryUnit(${override_memory.getBytes()}));
+
+            includeConfig "\${projectDir}/../../config/resource_handler/resource_handler.config"
+
+            ${ConfigHelper.toCanonicalString(configobj)}
+            resource_handler.handle_resources(params.resource_file)
+        }
+        """
+    }
+
+    protected Map get_baseline_resource_allocations() {
+        // These are modified from the resource allocation README
+        return [
+            default: [
+                process1: [
+                    cpus: [ min: 1, fraction: 0.51, max: 100 ],
+                    memory: [ min: "1 MB", fraction: 0.5, max: "100 GB" ]
+                ],
+                process2: [
+                    cpus: [ min: 1, fraction: 0.75, max: 100 ],
+                    memory: [ min: "1 MB", fraction: 0.25, max: "100 GB" ]
+                ],
+                process3: [
+                    cpus: [ min: 1, fraction: 0.75, max: 2 ],
+                    memory: [ min: "1 MB", fraction: 0.5, max: "12 MB" ]
+                ]
+            ],
+            custom_profile: [
+                process1: [
+                    cpus: [ min: 12, fraction: 0.25, max: 100 ],
+                    memory: [ min: "5 GB", fraction: 1.0, max: "100 GB" ]
+                ],
+                process2: [
+                    cpus: [ min: 12, fraction: 0.25, max: 20 ],
+                    memory: [ min: "230 MB", fraction: 0.25, max: "250 MB" ]
+                ],
+                process3: [
+                    cpus: [ min: 7, fraction: 0.75, max: 1000 ],
+                    memory: [ min: "12 GB", fraction: 0.6, max: "120 GB" ]
+                ]
+            ]
+        ]
+    }
+
+    protected String write_resource_json(Map resources) {
+        File tempfile = testFolder.newFile("resources.json")
+        tempfile.write(JsonOutput.prettyPrint(JsonOutput.toJson(resources)))
+        return tempfile.toString()
+    }
+
+    protected def set_common_parameters(Map resources) {
+        File tempfile = testFolder.newFile("resources.json")
+        tempfile.write(JsonOutput.prettyPrint(JsonOutput.toJson(resources)))
+
+        inconfig.params.resource_file = tempfile.toString()
+        expected.params.resource_file = tempfile.toString()
+
+        // These parameters are scraped from the current system
+        expected.params.min_cpus = 1
+        expected.params.max_cpus = override_cpus
+
+        expected.params.min_memory = 1.MB
+        expected.params.max_memory = override_memory
+
+        expected.params.min_time = 1.s
+        expected.params.max_time = 1000.d
+
+        // Re-implement the logic from the resource handler to predict the values
+        expected.params.base_allocations = [:]
+
+        def profile = resources.get(
+            inconfig.params.containsKey("resource_allocation_profile_tag")
+                ? inconfig.params.resource_allocation_profile_tag
+                : "default"
+        )
+
+        profile.each { process_name, process_info ->
+            def allocations = [:]
+
+            allocations.cpus = Math.min(
+                Math.min(
+                    Math.max(
+                        (override_cpus * process_info.cpus.fraction).asType(Integer),
+                        process_info.cpus.min
+                    ),
+                    process_info.cpus.max
+                ),
+                override_cpus
+            )
+
+            allocations.memory = MemoryUnit.of(Math.min(
+                Math.min(
+                    Math.max(
+                        (override_memory.getBytes() * process_info.memory.fraction).asType(long),
+                        MemoryUnit.of(process_info.memory.min).getBytes()
+                    ),
+                    MemoryUnit.of(process_info.memory.max).getBytes()
+                ),
+                override_memory.getBytes()
+            ))
+
+            expected.params.base_allocations[process_name] = allocations
+        }
+
+        expected.params.retry_information = [:]
+    }
+
+    // A helper method to compare that the values of any common keys between
+    // the two maps are equal.
+    def compare_common_keys(Map left, Map right) {
+        left.keySet().intersect(right.keySet()).each { key ->
+            if (left[key] instanceof Map) {
+                assert right[key] instanceof Map
+                compare_common_keys(left[key], right[key])
+            } else {
+                assert left[key] == right[key]
+            }
+        }
+    }
+
+    @Test
+    void test_defaults() {
+        def resources = get_baseline_resource_allocations()
+        set_common_parameters(resources)
+
+        // Sanity check - should get 50% of the memory in the default profile
+        assert expected.params.base_allocations.process1.memory == 0.5 * override_memory
+
+        compare()
+    }
+
+    @Test
+    void test_custom_profile() {
+        def resources = get_baseline_resource_allocations()
+        inconfig.params.resource_allocation_profile_tag = "custom_profile"
+        expected.params.resource_allocation_profile_tag = "custom_profile"
+        set_common_parameters(resources)
+
+        // Sanity check - should get 100% of the memory in the custom profile
+        assert expected.params.base_allocations.process1.memory == override_memory
+
+        compare()
+    }
+
+    @Test
+    void test_modified_parameters() {
+        def resources = get_baseline_resource_allocations()
+
+        def meta_expected = new ConfigObject()
+
+        // Tweak the system on which this is being evaluated
+        override_cpus = 1000
+        override_memory *= 2
+
+        // Sanity check - process1 should get 50% of the memory
+        meta_expected.process1.memory = 0.5 * override_memory
+
+        // If we pin the min and max CPUs, that should determine exactly the number we get
+        resources.default.process1.cpus.min = 34
+        resources.default.process1.cpus.max = 34
+        meta_expected.process1.cpus = 34
+
+        // If the max CPUs are limiting, that determines the CPU limit
+        resources.default.process2.cpus.min = 1
+        resources.default.process2.cpus.max = 72
+        meta_expected.process2.cpus = 72
+
+        set_common_parameters(resources)
+
+        compare_common_keys(expected.params.base_allocations, meta_expected)
+
+        // Sanity-check the compare_common_keys function
+        // Extraneous keys don't cause problems
+        meta_expected.process7 = [:]
+        meta_expected.process2.fakekey = 12
+        compare_common_keys(expected.params.base_allocations, meta_expected)
+
+        // Mismatched keys _must_ cause problems
+        meta_expected.process2.cpus = 71
+        shouldFail {
+            compare_common_keys(expected.params.base_allocations, meta_expected)
+        }
+
+        compare()
+    }
+}
diff --git a/tests/config/pom.xml b/tests/config/pom.xml
index 91aba18..5509548 100644
--- a/tests/config/pom.xml
+++ b/tests/config/pom.xml
@@ -15,5 +15,10 @@
             <artifactId>system-rules</artifactId>
             <version>1.19.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>5.10.0</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/tests/resource_test/config/base.config b/tests/resource_test/config/base.config
new file mode 100644
index 0000000..e69de29
diff --git a/tests/suite.groovy b/tests/suite.groovy
index bf7244a..f085fc8 100644
--- a/tests/suite.groovy
+++ b/tests/suite.groovy
@@ -5,7 +5,8 @@ result = JUnitCore.runClasses \
     ExampleTests, \
     SetEnvTests, \
     AlignMethodsTests, \
-    BamParserTests
+    BamParserTests, \
+    ResourceTests
 
 String message = "Ran: " + result.getRunCount() + ", Ignored: " + result.getIgnoreCount() + ", Failed: " + result.getFailureCount()
 if (result.wasSuccessful()) {