Skip to content

Commit

Permalink
Merge pull request #73 from uclahs-cds/yashpatel-generalized-resource…
Browse files Browse the repository at this point in the history
…-handler

Resource handler
  • Loading branch information
yashpatel6 authored Dec 14, 2024
2 parents da0541d + a9b012a commit 07c99b8
Show file tree
Hide file tree
Showing 7 changed files with 622 additions and 2 deletions.
82 changes: 82 additions & 0 deletions config/resource_handler/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Resource handler

This module handles resource allocations including granular per-process allocations, base allocation modifications, and retry setup. It makes use of the [retry module](../retry) module for retry setup.

The resource handler requires a JSON file containing allocation information, in the following format:

```JSON
{
"allocation_group": {
"process1": {
"cpus": {
"min": 1,
"fraction": 0.5,
"max": 1
},
"memory": {
"min": "150 MB",
"fraction": 0.23,
"max": "1 GB"
},
"retry_strategy": {
"memory": {
"strategy": "exponential",
"operand": 2
}
}
},
"process2": {
"cpus": {
"min": 2,
"fraction": 0.5,
"max": 2
},
"memory": {
"min": "1500 MB",
"fraction": 0.23,
"max": "10 GB"
}
}
},
"default": {
"process1": {
"cpus": {
"min": 1,
"fraction": 0.5,
"max": 1
},
"memory": {
"min": "150 MB",
"fraction": 0.23,
"max": "1 GB"
}
},
"process2": {
"cpus": {
"min": 2,
"fraction": 0.5,
"max": 2
},
"memory": {
"min": "1500 MB",
"fraction": 0.23,
"max": "10 GB"
}
}
}
}
```

A resource group with the label `default` is required in the JSON. Additionally, groups labeled as F-series and M-series allocations can be provided and will be automatically detected. For example, an F16 node will be recognized and the corresponding group label `f16` will automatically be loaded if defined in the resources JSON. A specific group can also be selected through configuration with the parameter `resource_allocation_profile_tag`.

## Example
An example of integrating the handler and setting up allocations:
```Nextflow
methods {
...
setup = {
...
resource_handler.handle_resources("${projectDir}/config/resources.json")
}
}
```
302 changes: 302 additions & 0 deletions config/resource_handler/resource_handler.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
import nextflow.util.SysHelper
import groovy.json.JsonSlurper

includeConfig "../retry/retry.config"

class SystemResources {
private Map resource_limits = [
'cpus': [
'type': java.lang.Integer,
'min': 1,
'max': SysHelper.getAvailCpus()
],
'memory': [
'type': nextflow.util.MemoryUnit,
'min': 1.MB,
'max': SysHelper.getAvailMemory()
],
'time': [
'type': nextflow.util.Duration,
'min': 1.s,
'max': 1000.d
]
]
private String resource_profile = null

SystemResources(Map params) {
// Search for config-defined resource limits
this.resource_limits.each { resource, resource_info ->
['min', 'max'].each { limit_end ->
try {
if (params.containsKey("${limit_end}_${resource}" as String)) {
this.resource_limits[resource][limit_end] = params["${limit_end}_${resource}" as String].asType(this.resource_limits[resource]['type'])
}
} catch (all) {
// Do nothing, let default value defined above take effect
}
}
}

this.identify_resource_profile()
}

Map get_limits(String type) {
return [
("min_${type}" as String): this.resource_limits[type]['min'],
("max_${type}" as String): this.resource_limits[type]['max']
]
}

Object check_limits(Object obj, String type) {
return SystemResources.check_limits(obj, type, this.resource_limits[type]['min'], this.resource_limits[type]['max'])
}

static Object check_limits(Object obj, String type, Object min, Object max) {
if (obj.compareTo(max) == 1) {
return max
} else if (obj.compareTo(min) == -1) {
return min
} else {
return obj
}
}

private void identify_resource_profile() {
// Identify if available resources match F- or M-series nodes
def cpus = this.resource_limits.cpus.max
def memory = this.resource_limits.memory.max.toGiga()

if (memory >= (cpus * 2 * 0.9 - 1) && (memory <= (cpus * 2))) {
this.resource_profile = "f${cpus}"
}

if (memory >= (cpus * 16 * 0.9 - 1) && (memory <= (cpus * 16))) {
this.resource_profile = "m${cpus}"
}
}

Object resolve_resource_allocation(Map allocation, String type) {
def min_raw = allocation['min'].asType(this.resource_limits[type]['type'])
def max_raw = allocation['max'].asType(this.resource_limits[type]['type'])

def min_allocation = this.check_limits(min_raw, type)
def max_allocation = this.check_limits(max_raw, type)

def requested_allocation = (allocation.fraction * (this.resource_limits[type]['max'])).asType(this.resource_limits[type]['type'])

return SystemResources.check_limits(requested_allocation, type, min_allocation, max_allocation)
}

String get_resource_profile_tag() {
return this.resource_profile
}
}

class PipelineAllocation {
private SystemResources system_resources
private File resource_json
private String allocation_profile
private Map processed_resources = [:]
private Map retry_configurations = [:]
private Map raw_process_resources = [:]

PipelineAllocation(Object resource_json, Object params) {
this.system_resources = new SystemResources(params)
this.resource_json = new File(resource_json)

def json_slurper = new JsonSlurper()

// TO-DO: Dump original for logging, keeping in class for now
this.raw_process_resources = json_slurper.parse(this.resource_json)

assert this.raw_process_resources instanceof Map
// TO-DO: Validate JSON is in expected format

this.select_allocation_profile(params)

// Separate the retry strategies from the base allocations
this.processed_resources.each { process, allocations ->
if (allocations.containsKey("retry_strategy")) {
def current_retry_strategy = allocations["retry_strategy"]
// Convert memory string to MemoryUnit for proper retry setup
if (current_retry_strategy.containsKey("memory")) {
if (current_retry_strategy.memory.strategy == "add") {
current_retry_strategy.memory.operand = current_retry_strategy.memory.operand as nextflow.util.MemoryUnit
}
}
this.retry_configurations[process] = current_retry_strategy
allocations.remove("retry_strategy")
}
}

// Convert string memory units to memory unit
this.processed_resources.each { process, allocations ->
for (resource_type in ["cpus", "memory", "time"]) {
if (allocations.containsKey(resource_type)) {
allocations[resource_type] = this.system_resources.resolve_resource_allocation(allocations[resource_type], resource_type)
}
}
}

// TO-DO: Singularize processes that may be separated with `|`
}

private void load_resource_profile(String profile_tag) {
this.processed_resources = this.raw_process_resources[profile_tag]

if (!this.processed_resources) {
throw new Exception(" ### ERROR ### Failed to find requested resource profile: `${profile_tag}`")
}
}

private void apply_custom_allocations(Map custom_allocations) {
custom_allocations.each { process, custom_allocation ->
custom_allocation.each { resource_type, allocation ->
this.processed_resources[process][resource_type] = this.system_resources.check_limits(allocation, resource_type)
}
}
}

private void select_allocation_profile(Map params) {
String profile_tag = null

// Try for user-given profile
if (params.containsKey('resource_allocation_profile_tag') && params.resource_allocation_profile_tag) {
profile_tag = params.resource_allocation_profile_tag
this.load_resource_profile(profile_tag)
return
}

// Try loading detected tag based on system resources
profile_tag = this.system_resources.get_resource_profile_tag()

if (profile_tag) {
try {
this.load_resource_profile(profile_tag)
return
} catch (all) {
throw new Exception(" ### ERROR ### Failed to load requested profile: ${profile_tag}. Please check and provide a valid tag.")
}
}

// Resort to loading `default` profile
this.load_resource_profile('default')
}

// TO-DO: functionality to dump original loaded JSON to file for logging

void update_base_allocation(String resource, String process, Object multiplier) {
if (this.processed_resources.containsKey(process) && this.processed_resources[process].containsKey(resource)) {
this.processed_resources[process][resource] = this.system_resources.check_limits(this.processed_resources[process][resource] * multiplier, resource)
} else {
System.out.println(" ### WARNING ### No base value found for resource `${resource}` for process `${process}`. Update will be skipped.")
}
}

// Apply base resource updates
void apply_base_updates(Map resource_updates) {
resource_updates.each { resource, updates ->
updates.each { processes, multiplier ->
List processes_to_update = (processes instanceof String || processes instanceof GString) ? [processes] : processes

if (processes_to_update == []) {
processes_to_update = this.processed_resources.keySet() as List
}

processes_to_update.each { process ->
this.update_base_allocation(resource, process, multiplier)
}
}
}
}

Map get_base_resource_allocations() {
return this.processed_resources
}

Map get_retry_configuration() {
return this.retry_configurations
}

void print_resources() {
System.out.println(this.processed_resources)
}

SystemResources get_system_resources() {
return this.system_resources
}
}

resource_handler {
set_retry = { String proc_key, String proc_name, String type ->
if (process[proc_key]?[type] && \
params.retry_information?[proc_name]?[type] && \
params.retry_information?[proc_name]?[type]?.operand && \
params.retry_information?[proc_name]?[type]?.strategy) {
process[proc_key][type] = { retry.retry_updater(params.base_allocations[task.process.split(':')[-1]][type], \
params.retry_information[task.process.split(':')[-1]][type].strategy, \
params.retry_information[task.process.split(':')[-1]][type].operand, \
task.attempt, \
type) }
}
}

set_resource_limit_params = { SystemResources system_resources ->
["cpus", "memory", "time"].each { resource_type ->
system_resources.get_limits(resource_type).each { resource_limit_key, resource_limit ->
params[resource_limit_key] = resource_limit
}
}
}

setup_retry = { Map resources_to_allocate, Map retry_configuration ->
params.retry_information = [:]

for (proc_allocation in resources_to_allocate) {
def proc_key = "withName:${proc_allocation.key}" as String
def allocation = proc_allocation.value

def retry_strategy = retry_configuration.getOrDefault(proc_allocation.key, null)
if (retry_strategy) {
params.retry_information[proc_allocation.key] = retry_strategy
}

for (resource_allocation in allocation) {
process[proc_key]["${resource_allocation.key}"] = resource_allocation.value
if (retry_strategy && retry_strategy.containsKey(resource_allocation.key)) {
resource_handler.set_retry(proc_key, proc_allocation.key, 'cpus')
resource_handler.set_retry(proc_key, proc_allocation.key, 'memory')
}
}
}
}

handle_resources = { Object resource_file, Map customized_allocations=[:], Map current_params=params ->
// Load base.config by default for all pipelines
includeConfig "${projectDir}/config/base.config"

def allocation_handler = new PipelineAllocation(resource_file, current_params)
def system_resources = allocation_handler.get_system_resources()

// Set params for limits for each resources
resource_handler.set_resource_limit_params(system_resources)

// Apply custom allocations if given
if (customized_allocations) {
allocation_handler.apply_custom_allocations(customized_allocations)
}

// Apply base resource updates if given
if (params.containsKey('base_resource_update') && params.base_resource_update) {
allocation_handler.apply_base_updates(params.base_resource_update)
}

// Set base allocations and retry
Map resources_to_allocate = allocation_handler.get_base_resource_allocations()
Map retry_configuration = allocation_handler.get_retry_configuration()

params.base_allocations = resources_to_allocate

resource_handler.setup_retry(resources_to_allocate, retry_configuration)
}
}
2 changes: 1 addition & 1 deletion tests/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ENV NXF_LAUNCHER=/.nextflow/tmp/launcher/nextflow-one_${NEXTFLOW_VERSION}/buildk
RUN sed \
-i \
-e 's/"nextflow.cli.Launcher"/"groovy.ui.GroovyMain"/' \
-e 's|"-classpath" "|"-classpath" "/bljars/junit-4.13.2.jar:/bljars/hamcrest-core-1.3.jar:/bljars/groovy-test-3.0.19.jar:/bljars/system-rules-1.19.0.jar:$BL_CLASSPATH:|' \
-e "s|\"-classpath\" \"|\"-classpath\" \"$(find /bljars/ -not -name 'groovy-3*' -type f -printf "%p:"):|" \
${NXF_LAUNCHER}/classpath-${NEXTFLOW_MD5}

COPY validator /usr/local/validator
Expand Down
Loading

0 comments on commit 07c99b8

Please sign in to comment.