Skip to content

Commit

Permalink
Merge branch 'develop/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
tingeman committed Dec 12, 2021
2 parents a06d6a5 + 9d84ef2 commit fd2ec29
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 51 deletions.
33 changes: 28 additions & 5 deletions source/IO/OUT/OUT_all_tagged.m
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,9 @@
if ~(exist([result_path run_name])==7)
mkdir([result_path run_name])
end
if isempty(out_tag) || all(isnan(out_tag))
save([result_path run_name '/' run_name '_' datestr(t,'yyyymmdd') '.mat'], 'out')
else
save([result_path run_name '/' run_name '_' out_tag '_' datestr(t,'yyyymmdd') '.mat'], 'out')
end

filename = get_filename(out, t, result_path, run_name);
save(filename, 'out')

% Clear the out structure
out.STRATIGRAPHY=[];
Expand All @@ -180,6 +178,31 @@

xls_out = {'OUT','index',NaN,NaN;'OUT_all',1,NaN,NaN;'output_timestep',0.250000000000000,'[days]',NaN;'save_date','01.09.','provide in format dd.mm.',NaN;'save_interval',1,'[y]','if left empty, the entire output will be written out at the end';'OUT_END',NaN,NaN,NaN};
end

function filename = get_filename(out, t, result_path, run_name)
% GET_FILENAME  Compose and return the name of the output .mat file.
%
%   filename = get_filename(out, t, result_path, run_name)
%
%   out          object/struct whose PARA.tag holds an optional tag that is
%                inserted into the file name (skipped when empty or NaN)
%   t            numeric date, either a 3-element vector that is passed as
%                datetime(t(1), t(2), t(3)), or a scalar that is passed
%                straight to datestr (presumably a serial date number --
%                confirm against callers)
%   result_path  leading part of the path; it is concatenated directly with
%                run_name, so it must already end with a file separator
%   run_name     used both as sub-folder name and as file name prefix
%
%   Returns
%       [result_path run_name '/' run_name '_' <tag '_'> yyyymmdd '.mat']
%
%   Raises an error when t is not numeric or has a length other than 1 or 3.

    out_tag = out.PARA.tag;

    if ~isnumeric(t) || ~any(length(t) == [1 3])
        % Build a readable description of the offending value. Concatenating
        % a numeric t onto the message would turn the numbers into character
        % codes and hide the actual value from the user.
        if isnumeric(t) && ismatrix(t)
            shown = mat2str(t);
        elseif ischar(t)
            shown = t;
        else
            shown = ['value of class ' class(t)];
        end
        error('Unknown date format: %s', shown)
    end

    if length(t) == 3
        out_date = datestr(datetime(t(1), t(2), t(3)), 'yyyymmdd');
    else
        out_date = datestr(t, 'yyyymmdd');
    end

    % The tag is optional: an empty or all-NaN tag is left out of the name.
    if isempty(out_tag) || all(isnan(out_tag))
        filename = [result_path run_name '/' run_name '_' out_date '.mat'];
    else
        filename = [result_path run_name '/' run_name '_' out_tag '_' out_date '.mat'];
    end

end

end
end
131 changes: 89 additions & 42 deletions source/IO/RUN_INFO/RUN_3D_PARALLEL.m
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
run_info.PARA.number_of_cores = [];
run_info.PARA.number_of_tiles = []; %3;
run_info.PARA.param_file_number = []; %[1;2;3];
run_info.PARA.run_mode = 'parallel'; % or 'sequential'

run_info.PARA.connected = [];
run_info.PARA.contact_length = [];
Expand All @@ -43,64 +44,109 @@
% end

function run_info = finalize_init(run_info)
% FINALIZE_INIT  Fill in the default run mode when none was provided.
%
%   run_info = finalize_init(run_info)
%
%   run_info.PARA.run_mode is set to 'parallel' when it is empty or NaN;
%   any other value is left untouched.

    run_mode = run_info.PARA.run_mode;

    % isnan() on a char vector such as 'parallel' returns one logical per
    % character, and '||' only accepts scalar operands. The NaN test is
    % therefore restricted to numeric values and reduced with all().
    mode_is_nan = isnumeric(run_mode) && all(isnan(run_mode(:)));

    if isempty(run_mode) || mode_is_nan
        run_info.PARA.run_mode = 'parallel';
    end
end



function [run_info, tile] = run_model(run_info)
function [run_info, tile] = run_model(run_info, run_flag)
%this could first open spmd and assign run_number depending on
%worker, then do another round of pprovider
%it could also do a loop over different tile representing
%different sections of the run, e.g. initial inial init, spin-up, actual run

%
% The run_info and tile instances returned are not the ones
% actually used in the runs, it is the template instances
% that were passed to the function.
%
% run_flag is a flag to indicate whether the model should be
% run or only initialized. Setting it to false is not very
% meaningful in this class, since initialization will be lost
% due to the parallelization. But in sequential mode, the last
% tile will be returned in initialized state.

%for tile_id = 1:run_info.PARA.number_of_tiles
% exceptions.(['TILE_', num2str(tile_id)]) = struct();
%end
if ~exist('run_flag', 'var')
% if run_flag is not passed, default to true
run_flag = true;
end

err_out = cell(run_info.PARA.number_of_tiles);

this_pool = gcp('nocreate');
if isempty(this_pool)
this_pool = parpool([1 run_info.PARA.number_of_cores]);
disp(['Using new parpool with ' num2str(this_pool.NumWorkers) ' workers.'])
else
disp(['Using existing parpool with ' num2str(this_pool.NumWorkers) ' workers.'])
end


parfor tile_id = 1:run_info.PARA.number_of_tiles
% make copy of template run_info, to modify in this process
this_run_info = copy(run_info);

this_run_info.PARA.worker_number = tile_id; % assign id
tStart = datetime(datestr(now));

if strcmpi(run_info.PARA.run_mode, 'parallel')
this_pool = gcp('nocreate');
if isempty(this_pool)
this_pool = parpool([1 run_info.PARA.number_of_cores]);
disp(['Using new parpool with ' num2str(this_pool.NumWorkers) ' workers.'])
else
disp(['Using existing parpool with ' num2str(this_pool.NumWorkers) ' workers.'])
end


% prepare error collection in case of exceptions
err_out{tile_id}.tile_id = tile_id;
err_out{tile_id}.OK = true;

try
% initialize and run tile instance
[out_run_info, out_tile] = kernel_run_model(this_run_info);
catch ME
% catch and store error for saving after pool completes
error_timestamp = now;

parfor tile_id = 1:run_info.PARA.number_of_tiles
% make copy of template run_info, to modify in this process
this_run_info = copy(run_info);
this_run_info.PARA.worker_number = tile_id; % assign id

% prepare error collection in case of exceptions
err_out{tile_id}.tile_id = tile_id;
err_out{tile_id}.OK = false;
err_out{tile_id}.run_info = this_run_info;
err_out{tile_id}.timestamp = error_timestamp;
err_out{tile_id}.MException = ME;

% we cannot use save inside parfor, so we have to store
% the information and save it after.
err_out{tile_id}.OK = true;

try
% initialize and run tile instance
[out_run_info, out_tile] = kernel_run_model(this_run_info, run_flag);
catch ME
% catch and store error for saving after pool completes
error_timestamp = now;

err_out{tile_id}.tile_id = tile_id;
err_out{tile_id}.OK = false;
err_out{tile_id}.run_info = this_run_info;
err_out{tile_id}.timestamp = error_timestamp;
err_out{tile_id}.MException = ME;

% we cannot use save inside parfor, so we have to store
% the information and save it after.
end
end
tile = run_info.TILE;

elseif strcmpi(run_info.PARA.run_mode, 'sequential')
for tile_id = 1:run_info.PARA.number_of_tiles
this_run_info = copy(run_info);
this_run_info.PARA.worker_number = tile_id; % assign id

% prepare error collection in case of exceptions
err_out{tile_id}.tile_id = tile_id;
err_out{tile_id}.OK = true;

try
% initialize and run tile instance
[out_run_info, out_tile] = kernel_run_model(this_run_info, run_flag);
catch ME
% catch and store error for saving after pool completes
error_timestamp = now;

err_out{tile_id}.tile_id = tile_id;
err_out{tile_id}.OK = false;
err_out{tile_id}.run_info = this_run_info;
err_out{tile_id}.timestamp = error_timestamp;
err_out{tile_id}.MException = ME;
end
end

run_info = out_run_info;
tile = out_tile;

end

deltatime = datetime(datestr(now))-tStart;
fprintf('Elapsed time: ');
disp(deltatime);

% now save any error logs
for tid = 1:length(run_info.PARA.number_of_tiles)
if ~err_out{tid}.OK
Expand All @@ -113,13 +159,12 @@
info_out = err_out{tid};
save(error_log_file, '-struct', 'info_out');
end
end
end

tile = run_info.TILE;
end


function [run_info, tile] = kernel_run_model(run_info)
function [run_info, tile] = kernel_run_model(run_info, run_flag)
% This is the actual normal run_model method. It is extracted
% in separate method to more easily enclose its execution in
% a try-catch block in the new run_model method.
Expand All @@ -141,7 +186,9 @@

tile = finalize_init(tile);

tile = run_model(tile); %time integration
if run_flag
tile = run_model(tile); %time integration
end
end


Expand Down
4 changes: 0 additions & 4 deletions source/IO/TILE/TILE_1D_standard.m
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@
%=========================================================================
%TIME INTEGRATION
%=========================================================================
tic
while tile.t < tile.FORCING.PARA.end_time

%interpolate forcing data to time t
Expand Down Expand Up @@ -194,9 +193,6 @@
% console window.
fprintf('\n\n')

% print elapsed time since tic
toc
fprintf('\n')
end


Expand Down

0 comments on commit fd2ec29

Please sign in to comment.