diff --git a/multi_node_scaling_OpenCoarrays.sl b/multi_node_scaling_OpenCoarrays.sl new file mode 100644 index 0000000..3f18a64 --- /dev/null +++ b/multi_node_scaling_OpenCoarrays.sl @@ -0,0 +1,14 @@ +#!/bin/bash +#SBATCH --nodes=16 +#SBATCH --time=12:00:00 +#SBATCH --account=nstaff +#SBATCH --qos=regular +#SBATCH --constraint=cpu +#SBATCH --cpus-per-task=1 + +ml OpenCoarrays +for i in {1..3}; do + for n in {1..16}; do + fpm run --compiler gfortran-workaround.sh --flag "-O3 -fcoarray=lib" --example lu_decomp --runner "srun --nodes=${n} --ntasks=${n} --cpu-bind=cores" -- example/lu_decomp/100x100.dat + done +done \ No newline at end of file diff --git a/multi_node_scaling_cray.sl b/multi_node_scaling_cray.sl new file mode 100644 index 0000000..05f5e20 --- /dev/null +++ b/multi_node_scaling_cray.sl @@ -0,0 +1,14 @@ +#!/bin/bash +#SBATCH --nodes=16 +#SBATCH --time=02:00:00 +#SBATCH --account=nstaff +#SBATCH --qos=regular +#SBATCH --constraint=cpu +#SBATCH --cpus-per-task=1 + +ml PrgEnv-cray +for i in {1..3}; do + for n in {1..16}; do + fpm run --compiler crayftn-workaround.sh --flag "-O3" --example lu_decomp --runner "srun --nodes=${n} --ntasks=${n} --cpu-bind=cores" -- example/lu_decomp/100x100.dat + done +done \ No newline at end of file diff --git a/single_image_timing_OpenCoarrays.sl b/single_image_timing_OpenCoarrays.sl new file mode 100644 index 0000000..989515c --- /dev/null +++ b/single_image_timing_OpenCoarrays.sl @@ -0,0 +1,15 @@ +#!/bin/bash +#SBATCH --nodes=1 +#SBATCH --ntasks=1 +#SBATCH --time=02:00:00 +#SBATCH --account=nstaff +#SBATCH --qos=regular +#SBATCH --constraint=cpu +#SBATCH --cpus-per-task=1 + +ml OpenCoarrays +for i in {1..3}; do + for n in {1..100}; do + fpm run --compiler gfortran-workaround.sh --flag "-O3 -fcoarray=lib" --example lu_decomp --runner srun -- example/lu_decomp/${n}x${n}.dat + done +done \ No newline at end of file diff --git a/single_image_timing_cray.sl b/single_image_timing_cray.sl new file mode 100644 index 0000000..2373a57 --- /dev/null +++ b/single_image_timing_cray.sl @@ -0,0 +1,15 @@ +#!/bin/bash +#SBATCH --nodes=1 +#SBATCH --ntasks=1 +#SBATCH --time=02:00:00 +#SBATCH --account=nstaff +#SBATCH --qos=regular +#SBATCH --constraint=cpu +#SBATCH --cpus-per-task=1 + +ml PrgEnv-cray +for i in {1..3}; do + for n in {1..100}; do + fpm run --compiler crayftn-workaround.sh --flag "-O3" --example lu_decomp --runner srun -- example/lu_decomp/${n}x${n}.dat + done +done \ No newline at end of file diff --git a/single_node_scaling_OpenCoarrays.sl b/single_node_scaling_OpenCoarrays.sl new file mode 100644 index 0000000..e77189b --- /dev/null +++ b/single_node_scaling_OpenCoarrays.sl @@ -0,0 +1,14 @@ +#!/bin/bash +#SBATCH --nodes=1 +#SBATCH --time=24:00:00 +#SBATCH --account=nstaff +#SBATCH --qos=regular +#SBATCH --constraint=cpu +#SBATCH --cpus-per-task=1 + +ml OpenCoarrays +for i in {1..3}; do + for n in {1..16}; do + fpm run --compiler gfortran-workaround.sh --flag "-O3 -fcoarray=lib" --example lu_decomp --runner "srun --ntasks=${n} --cpu-bind=cores" -- example/lu_decomp/100x100.dat + done +done \ No newline at end of file diff --git a/single_node_scaling_cray.sl b/single_node_scaling_cray.sl new file mode 100644 index 0000000..ddc2633 --- /dev/null +++ b/single_node_scaling_cray.sl @@ -0,0 +1,14 @@ +#!/bin/bash +#SBATCH --nodes=1 +#SBATCH --time=02:00:00 +#SBATCH --account=nstaff +#SBATCH --qos=regular +#SBATCH --constraint=cpu +#SBATCH --cpus-per-task=1 + +ml PrgEnv-cray +for i in {1..3}; do + for n in {1..16}; do + fpm run --compiler crayftn-workaround.sh --flag "-O3" --example lu_decomp --runner "srun --ntasks=${n} --cpu-bind=cores" -- example/lu_decomp/100x100.dat + done +done \ No newline at end of file diff --git a/src/runner_m.f90 b/src/runner_m.f90 index 571286a..1aac573 100644 --- a/src/runner_m.f90 +++ b/src/runner_m.f90 @@ -1,276 +1,126 @@ module runner_m !! Compute-image/Scheduler-image abstraction use dag_m, only: dag_t - use iso_fortran_env, only: event_type + use iso_fortran_env, only: event_type, atomic_int_kind use payload_m, only: payload_t implicit none private public :: run - - type :: payload_list_t - type(payload_t), allocatable :: payloads(:) - end type - - type(payload_list_t), allocatable :: mailbox[:] - !! storage for communicating inputs/outputs between tasks - - logical, allocatable :: mailbox_entry_can_be_freed(:)[:] - !! used by the scheduler image to tell the worker images when they can release old data. - - type(event_type), allocatable :: ready_for_next_task(:)[:] - type(event_type) task_assigned[*] - - integer task_identifier[*] - !! The ID of the task currently assigned to this image. - - integer, allocatable :: task_assignment_history(:)[:] - !! Records which image did which task. - !! Index: task number. Value: image number. - - logical, allocatable :: task_done(:) - - - integer, parameter :: scheduler_image = 1 - integer, parameter :: no_task_assigned = -1 - integer, parameter :: NO_TASK_READY = -1 - integer, parameter :: ALL_TASKS_DONE = -2 - integer, parameter :: NO_IMAGE_READY = -1 - contains - subroutine run(dag) - implicit none type(dag_t), intent(in) :: dag - logical :: tasks_left - - if (num_images() == 1) then - call run_single_image(dag) - else - task_identifier = no_task_assigned - associate(n_tasks => size(dag%vertices), n_imgs => num_images()) - allocate(ready_for_next_task(n_imgs)[*]) - allocate(mailbox[*]) - allocate(mailbox%payloads(n_tasks)) - sync all - allocate(mailbox_entry_can_be_freed(n_tasks)[*]) - mailbox_entry_can_be_freed(n_tasks) = .false. - allocate(task_assignment_history(n_tasks)[*]) - task_assignment_history = NO_IMAGE_READY - if (this_image() == scheduler_image) then - allocate(task_done(n_tasks)) - task_done = .false. + integer(atomic_int_kind), parameter :: UNCLAIMED = 0_atomic_int_kind + integer, parameter :: NONE_AVAILABLE = 0 + integer :: n_tasks, n_imgs, num_dependencies + integer(atomic_int_kind) :: me + integer(atomic_int_kind), allocatable :: taken_on(:)[:] !! Records who executed a given task + integer(atomic_int_kind), allocatable :: num_tasks_completed[:] !! keep count of the tasks completed + type(event_type), allocatable :: task_completed(:)[:] !! To notify the other images when task has been completed + type(payload_t), allocatable :: mailbox(:)[:] !! storage for communicating inputs/outputs between tasks + integer, allocatable :: dependencies(:) + + call initialize + do while (not_all_tasks_completed()) + associate(next_task => find_next_task()) + if (next_task /= NONE_AVAILABLE) then + if (claim_task(next_task)) then + call execute_task(next_task) + call notify_task_completed(next_task) + end if end if end associate - - tasks_left = .true. - - do while (tasks_left) - if (this_image() == scheduler_image) then - tasks_left = assign_task(dag) - else - tasks_left = do_work(dag) - end if - end do - end if - end subroutine - - function do_work(dag) result(tasks_left) - type(dag_t), intent(in) :: dag - logical :: tasks_left - - event post(ready_for_next_task(this_image())[scheduler_image]) - event wait(task_assigned) - - !! It's probably better to introduce this only after some more testing -- HS - !free_unneeded_memory: do concurrent(integer :: l = 0:size(mailbox)) - ! if(mailbox_entry_can_be_freed(i)) then - ! deallocate(mailbox(i)%payload_) - ! mailbox_entry_can_be_freed(i) = .false. - ! end if - !end do free_unneeded_memory - - if (task_identifier == ALL_TASKS_DONE) then - tasks_left = .false. - else - do_assigned_task: associate(my_task => dag%vertices(task_identifier)%task) - block - integer, allocatable :: upstream_task_nums(:) - integer, allocatable :: upstream_task_imagenums(:) - integer :: i - type(payload_t), allocatable :: arguments(:) - - ! figure out which images have our input data - upstream_task_nums = dag%dependencies_for(task_identifier) - upstream_task_imagenums = & - [(task_assignment_history(upstream_task_nums(i))[scheduler_image], i = 1, size(upstream_task_nums))] - - arguments = [ ( payload_t(mailbox[upstream_task_imagenums(i)]%payloads(upstream_task_nums(i))%payload_), & - i = 1, size(upstream_task_nums) ) ] - - ! execute task, store result - mailbox%payloads(task_identifier) = & - my_task%execute(arguments) - - end block - tasks_left = .true. - end associate do_assigned_task - end if - end function - - function find_next_image() result(next_image) - integer :: next_image, i, ev_count, task_just_completed - - next_image = NO_IMAGE_READY - do i = 1, size(ready_for_next_task) - if (i == scheduler_image) cycle ! no need to check the scheduler image - - call event_query (ready_for_next_task(i), ev_count) - if (ev_count > 0) then - next_image = i - task_just_completed = (task_identifier[i]) - if (task_just_completed /= no_task_assigned) & - task_done(task_just_completed) = .true. - end if end do - end function - - function assign_task(dag) result(tasks_left) - type(dag_t), intent(in) :: dag - logical :: tasks_left - integer, allocatable, dimension(:) :: upstream_tasks, upstream_task_images - integer :: i - - associate(next_image => find_next_image()) - if (next_image /= NO_IMAGE_READY) then - associate(next_task => find_next_task(dag)) - if (next_task == NO_TASK_READY) then - tasks_left = .true. - else if (next_task == ALL_TASKS_DONE) then - call assign_completed_to_images() - tasks_left = .false. - else - event wait (ready_for_next_task(next_image)) - task_assignment_history(next_task) = next_image - - ! check which task the image just finished, that's task A - ! for each task B upstream of A, walk through that task's downstream dependencies - ! if they're all completed, the output data from B can be freed. - i = task_identifier[next_image] - if (i /= NO_TASK_READY) then - upstream_tasks = dag%dependencies_for(i) - upstream_task_images = task_assignment_history(upstream_tasks) - do i = 1, size(upstream_tasks) - if (all(task_done(dag%depends_on(upstream_tasks(i))))) then - mailbox_entry_can_be_freed(upstream_tasks(i))[upstream_task_images(i)] = .true. - end if - end do - end if - - - ! tell the image that it can proceed with the next task - task_identifier[next_image] = next_task - event post (task_assigned[next_image]) - tasks_left = .true. - end if - end associate - else - tasks_left = .true. + contains + subroutine initialize() + integer :: i + n_tasks = size(dag%vertices) + n_imgs = num_images() + me = this_image() + allocate( & + taken_on(n_tasks)[*], & + task_completed(n_tasks)[*], & + mailbox(n_tasks)[*], & + num_tasks_completed[*]) + if (me == 1) then + do i = 1, n_tasks + call atomic_define(taken_on(i), UNCLAIMED) + end do + call atomic_define(num_tasks_completed, 0) end if - end associate - end function - - subroutine assign_completed_to_images() - integer :: i, task_just_completed - - do i = 1, size(ready_for_next_task) - if (i == scheduler_image) cycle ! don't wait on the scheduler image - - event wait (ready_for_next_task(i)) - task_just_completed = task_identifier[i] - if (task_just_completed /= no_task_assigned) & - task_done(task_just_completed) = .true. - task_identifier[i] = ALL_TASKS_DONE - event post (task_assigned[i]) - end do - end subroutine - - pure function find_next_task ( dag ) Result ( next_task_to_run ) -!! find_next_task: search through the dag to find the next task where its -!! dependencies are complete -!! -!! possible outputs for next_task_to_run -!! - a positive integer signals the next task to run -!! - 'ALL_TASKS_DONE' signals all tasks are done -!! - 'NO_TASK_READY' signals that no tasks are ready to run -!! - implicit none + sync all + end subroutine - type(dag_t), intent(in) :: dag - integer :: next_task_to_run + function not_all_tasks_completed() + logical :: not_all_tasks_completed - integer :: task, depends - integer, allocatable, dimension(:) :: dependencies - logical :: done, all_done + integer :: tasks_completed - all_done = .true. - next_task_to_run = NO_TASK_READY + call atomic_ref(tasks_completed, num_tasks_completed[1]) + not_all_tasks_completed = tasks_completed < n_tasks + end function - do task = 1, size(task_done) - if ( task_done(task) .or. task_assignment_history(task) /= NO_IMAGE_READY ) then - cycle - else - all_done = .false. - dependencies = dag%dependencies_for ( task ) - done = .true. - do depends = 1, size(dependencies) - done = done .and. task_done(dependencies(depends)) - end do - if ( done ) then - next_task_to_run = task - exit - else - cycle - end if - end if - end do + function find_next_task() + integer :: find_next_task + + integer :: task, dependency, dependency_completed, taken_by + + task_loop: & + do task = 1, n_tasks + call atomic_ref(taken_by, taken_on(task)[1]) + if (taken_by == UNCLAIMED) then + dependencies = dag%dependencies_for(task) + num_dependencies = size(dependencies) + do dependency = 1, num_dependencies + call event_query(task_completed(dependencies(dependency)), dependency_completed) + if (dependency_completed == 0) cycle task_loop + end do + find_next_task = task + return + end if + end do task_loop + find_next_task = NONE_AVAILABLE + end function + function claim_task(task) + integer, intent(in) :: task + logical :: claim_task - if ( all_done ) then - next_task_to_run = ALL_TASKS_DONE - end if + integer(atomic_int_kind) :: old - end function find_next_task + call atomic_cas(taken_on(task)[1], old, UNCLAIMED, me) + claim_task = old == UNCLAIMED + end function - subroutine run_single_image(dag) - type(dag_t), intent(in) :: dag + subroutine execute_task(task) + integer, intent(in) :: task - type(payload_t), allocatable :: results(:) - logical, allocatable :: done(:) + integer :: dependency + type(payload_t), allocatable :: arguments(:) - associate(num_tasks => size(dag%vertices)) - allocate(results(num_tasks), done(num_tasks)) - done = .false. - do while (.not. all(done)) - associate(next_task => find_next_task()) - results(next_task) = dag%vertices(next_task)%task%execute(results(dag%dependencies_for(next_task))) - done(next_task) = .true. - end associate + allocate(arguments(num_dependencies)) + do dependency = 1, num_dependencies + event wait (task_completed(dependencies(dependency))) end do - end associate - contains - pure function find_next_task() result(next_task) - integer :: next_task + do concurrent (dependency = 1 : num_dependencies) + arguments(dependency) = mailbox(dependencies(dependency))[taken_on(dependencies(dependency))[1]] + end do + mailbox(task) = dag%vertices(task)%task%execute(arguments) + end subroutine - integer :: task + subroutine notify_task_completed(task) + integer, intent(in) :: task - do task = 1, size(dag%vertices) - if (.not.done(task) .and. all(done(dag%dependencies_for(task)))) then - next_task = task - return - end if + integer :: img, num_downstream, n + + num_downstream = size(dag%depends_on(task)) + do img = 1, n_imgs + do n = 1, num_downstream + event post (task_completed(task)[img]) + end do end do - end function + call atomic_add(num_tasks_completed[1], 1) + end subroutine end subroutine end module