
Commit d16fd6a
fixed (or hacked anyway) a sorting bug that caused problems with large sacct queries. Basically, you can get lots of dupes when you break up a sacct query (for jobs that span the sub-query boundaries). Modified the index column to be an index of the whole list, rather than of the sub-queries.
markyoder committed Apr 22, 2022
1 parent 47c99c4 commit d16fd6a
Showing 3 changed files with 126 additions and 27 deletions.
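
For context, the shape of the fix described in the commit message is roughly the following (a minimal sketch with illustrative names, not the committed code; it assumes each sub-query returns a dict with 'headers' and 'data' keys, as get_and_process_sacct_data(as_dict=True) does in hpc_lib.py):

import numpy
import pandas

def combine_subqueries(chunks):
    # chunks: per-window sacct results, each a dict {'headers': [...], 'data': [[...], ...]}
    headers = chunks[0]['headers']
    rows = []
    for ch in chunks:
        rows += ch['data']
    # build one record array from all windows...
    recs = pandas.DataFrame(rows, columns=headers).to_records()
    # ...and re-index over the whole set, so the 'index' column is globally unique
    # and later sorts/de-dupes never tie on duplicate per-window indices.
    recs['index'] = numpy.arange(len(recs))
    return recs
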
19 changes: 8 additions & 11 deletions get_usage_sherlock_normal.sh
@@ -3,23 +3,20 @@
#SBATCH -o sacct_sherlock_out_normal.out
#SBATCH -e sacct_sherlock_out_normal.err
#SBATCH -p normal
#SBATCH --mem-per-cpu=4g
#
# can we pass "\t" correctly? maybe for now, just use the HPC standard "|" as a delimiter...
STARTTIME="2019-11-01"
ENDTIME="2020-11-30"
FORMAT="User,Group,GID,JobID,Jobname,partition,state,Submit,time,Eligible,start,end,elapsed,MaxRss,MaxVMSize,nnodes,ncpus,SystemCPU,UserCPU,TotalCPU,Suspended"
FORMAT="User,Group,GID,Jobname,JobID,JobIDRaw,partition,state,time,ncpus,nnodes,Submit,Eligible,start,end,elapsed,SystemCPU,UserCPU,TotalCPU,NTasks,CPUTimeRaw,Suspended,ReqTRES,AllocTRES"
#FORMAT="ALL"
PARTITION="normal"
#
#srun sacct -a -p --delimiter="|" --${STARTTIME} --format=User,Group,GID,JobID,Jobname,partition,state,Submit,time,Eligible,start,end,elapsed,MaxRss,MaxVMSize,nnodes,ncpus,SystemCPU,UserCPU,TotalCPU,Suspended

srun sacct --allusers --partition=$PARTITION -p --delimiter="|" --starttime=2019-08-16 --endtime=2019-09-15 --format=${FORMAT}
srun sacct --allusers --partition=$PARTITION -p --delimiter="|" --starttime=2019-09-16 --endtime=2019-10-15 --format=${FORMAT}
srun sacct --allusers --partition=$PARTITION -p --delimiter="|" --starttime=2019-10-16 --endtime=2019-11-15 --format=${FORMAT}
srun sacct --allusers --partition=$PARTITION -p --delimiter="|" --starttime=2019-11-16 --endtime=2019-12-15 --format=${FORMAT}
srun sacct --allusers --partition=$PARTITION -p --delimiter="|" --starttime=2019-12-16 --endtime=2020-01-15 --format=${FORMAT}
srun sacct --allusers --partition=$PARTITION -p --delimiter="|" --starttime=2020-01-16 --endtime=2020-02-15 --format=${FORMAT}

#srun sacct --allusers --partition=hns -p --delimiter="|" --starttime=${STARTTIME} --endtime=${ENDTIME} --format=${FORMAT}
sacct --partition=normal --delimiter="|" -p --allusers --starttime=2022-03-01T00:00:00 --endtime=2022-04-01T00:00:00 --format=${FORMAT}

sacct --noheader --partition=normal --delimiter="|" -p --allusers --starttime=2022-02-01T00:00:00 --endtime=2022-03-01T00:00:00 --format=${FORMAT}
sacct --noheader --partition=normal --delimiter="|" -p --allusers --starttime=2022-01-01T00:00:00 --endtime=2022-02-01T00:00:00 --format=${FORMAT}
sacct --noheader --partition=normal --delimiter="|" -p --allusers --starttime=2021-12-01T00:00:00 --endtime=2022-01-01T00:00:00 --format=${FORMAT}
sacct --noheader --partition=normal --delimiter="|" -p --allusers --starttime=2021-11-01T00:00:00 --endtime=2021-12-01T00:00:00 --format=${FORMAT}
sacct --noheader --partition=normal --delimiter="|" -p --allusers --starttime=2021-10-01T00:00:00 --endtime=2021-11-01T00:00:00 --format=${FORMAT}

110 changes: 94 additions & 16 deletions hpc_lib.py
@@ -1336,9 +1336,9 @@ class SACCT_data_direct(SACCT_data_handler):
format_list_default = ['User', 'Group', 'GID', 'Jobname', 'JobID', 'JobIDRaw', 'partition', 'state', 'time', 'ncpus',
'nnodes', 'Submit', 'Eligible', 'start', 'end', 'elapsed', 'SystemCPU', 'UserCPU',
'TotalCPU', 'NTasks', 'CPUTimeRaw', 'Suspended', 'ReqTRES', 'AllocTRES']
def __init__(self, group=None, partition=None, delim='|', start_date=None, end_date=None, more_options=[], delta_t_days=30,
def __init__(self, group=None, partition=None, delim='|', start_date=None, end_date=None, more_options=[], delta_t_days=10,
format_list=None, n_cpu=None, types_dict=None, verbose=0, chunk_size=1000,
h5out_file=None, keep_raw_data=False, n_points_usage=1000,
h5out_file=None, keep_raw_data=False, raw_output_file=None, n_points_usage=1000,
**kwargs):
#
# TODO: this is a mess. I think it needs to be a bit nominally sloppier, then cleaner -- so maybe read the first block,
@@ -1387,7 +1387,6 @@ def __init__(self, group=None, partition=None, delim='|', start_date=None, end_d
options_str += f' --{op}={vl} '
#
#

######################
# process start_/end_date
if end_date is None or end_date == '':
@@ -1404,17 +1403,32 @@ def __init__(self, group=None, partition=None, delim='|', start_date=None, end_d
#
start_end_times = [(start_date, min(start_date + dtm.timedelta(days=delta_t_days), end_date))]
while start_end_times[-1][1] < end_date:
start_end_times += [[start_end_times[-1][1], min(start_end_times[-1][1] + dtm.timedelta(days=30), end_date) ]]
start_end_times += [[start_end_times[-1][1], min(start_end_times[-1][1] + dtm.timedelta(days=delta_t_days), end_date) ]]
#
self.start_end_times = start_end_times
##########################
#
self.__dict__.update({ky:val for ky,val in locals().items() if not ky in ('self', '__class__')})
#
print('*** DEBUG: Now execute load_sacct_data()')
print('*** DEBUG: Now execute load_sacct_data(); options_str={}'.format(options_str))
self.data = self.load_sacct_data(options_str=options_str, format_string=format_string)
#
if not raw_output_file is None:
if raw_output_file.endswith('h5'):
# TODO: Not totally positive this works...
dta_to_h5 = array_to_hdf5_dataset(input_array=self.data, dataset_name='raw_data', output_fname=raw_output_file, h5_mode='w', verbose=0)
else:
with open('{}.csv'.format(os.path.splitext(raw_output_file)[0]), 'w' ) as fout:
#fout.write('\t'.join(self.headers))
fout.write('{}\n'.format('\t'.join(self.data.dtype.names)) )
for rw in self.data:
#fout.write('\t'.join(rw[:-1].split(delim)))
#fout.write('{}\n'.format(rw))
fout.write('{}\n'.format('\t'.join(list(rw))))
#
#
print('*** DEBUG: load_sacct_data() executed. Compute calc_jobs_summary()')
print('*** DEBUG: data stuff: ', len(self.data), self.data.dtype)
#self.jobs_summary=self.calc_jobs_summary(data)
#
super(SACCT_data_direct, self).__init__(delim=delim, start_date=start_date, end_date=end_date, h5out_file=h5out_file, keep_raw_data=keep_raw_data,n_points_usage=n_points_usage, n_cpu=n_cpu, types_dict=types_dict, verbose=verbose, chunk_size=chunk_size, **kwargs)
@@ -1425,21 +1439,36 @@ def __init__(self, group=None, partition=None, delim='|', start_date=None, end_d
def load_data(self, *args, **kwargs):
pass
#
def load_sacct_data(self, options_str='', format_string='', n_cpu=None, start_end_times=None, max_rows=None, verbose=False):
def load_sacct_data(self, options_str='', format_string='', n_cpu=None, start_end_times=None, max_rows=None, verbose=None):
#
n_cpu = n_cpu or self.n_cpu
n_cpu = n_cpu or 4
#
# might want to force n_cpu>1...
#n_cpu = numpy.min(n_cpu,2)
#
#, raw_data_out_file=None
#raw_data_out_file = raw_data_out or self.raw_data_out_file
#
verbose = verbose or self.verbose
verbose = verbose or False
#
start_end_times = start_end_times or self.start_end_times
sacct_str_template = self.sacct_str_template
#
# for reference, stash a copy of the ideal sacct str (ie, with full date range)
self.sacct_str = sacct_str_template.format(options_str, datetime_to_SLURM_datestring(start_end_times[0][0]),
datetime_to_SLURM_datestring(start_end_times[-1][-1]), format_string )
if verbose:
print(f'** DEBUG: sacct_str:: {self.sacct_str}')
#
if n_cpu == 1:
sacct_out = None
#sacct_out = None
data = None
for k, (start, stop) in enumerate(start_end_times):
if verbose:
print('** DEBUG: processing SACCT data, n_cpu=1, start: {}, end: {}'.format(start, stop))
#
#sacct_str = 'srun sacct {} {} -p --allusers --starttime={} --endtime={} --format={} '.format( ('--noheader' if k>0 else ''),
# options_str, datetime_to_SLURM_datestring(start),
# datetime_to_SLURM_datestring(stop), format_string )
@@ -1449,12 +1478,27 @@ def load_sacct_data(self, options_str='', format_string='', n_cpu=None, start_en
if verbose: print('** [{}]: {}\n'.format(k, sacct_str))
#
#sacct_out += subprocess.run(sacct_str.split(), stdout=subprocess.PIPE).stdout.decode()
if sacct_out is None:
sacct_out = self.get_and_process_sacct_data(sacct_str)
# FIXME:
# TODO: this is not appending correctly. Maybe return as lists and construct structured array here?
#if sacct_out is None:
if data is None:
#sacct_out = self.get_and_process_sacct_data(sacct_str)
XX = self.get_and_process_sacct_data(sacct_str, with_headers=True, as_dict=True)
data = XX['data']
headers = XX['headers']
else:
# will appending work, or do we need to handle the headers, etc. in a more sophisticated way? This is still
# inefficient; there is a bit of copying, etc. that should be optimized later.
sacct_out = numpy.append(sacct_out, self.get_and_process_sacct_data(sacct_str) )
# TODO: No. Appending does not work; something does not promote correctly. Data needs to be returned as a list or
# collected and copied into the array, like in the MPP model.
#
#sacct_out = numpy.append(sacct_out, self.get_and_process_sacct_data(sacct_str) )
data += self.get_and_process_sacct_data(sacct_str, with_headers=False)
#
#
sacct_out = pandas.DataFrame(data, columns=headers).to_records()
#
del data
else:
# TODO: consolidate spp,mpp methods...
with mpp.Pool(n_cpu) as P:
@@ -1468,7 +1512,7 @@ def load_sacct_data(self, options_str='', format_string='', n_cpu=None, start_en
if not max_rows is None:
kw_prams['max_rows'] = int(numpy.ceil(max_rows/n_cpu))
#
if verbose: print('*** DEBUG: len: ', len(sacct_str))
if verbose: print('*** DEBUG: sacct_str: ', sacct_str )
R += [P.apply_async(self.get_and_process_sacct_data, kwds=kw_prams )]
#
# is this the right syntax?
@@ -1481,13 +1525,20 @@ def load_sacct_data(self, options_str='', format_string='', n_cpu=None, start_en
k0=0
for k,R in enumerate(Rs):
n = len(R)
#
if verbose: print('*** DEBUG: len(R): ', len(R) )
#
for col in R.dtype.names:
sacct_out[col][k0:k0+n][:]=R[col]
#
k0+=n
#
#

#if not raw_data_out_file is None:
# 'index' column will be unique only to each process, so reset to the whole set. We use it to make sorting
# unambiguous. Note that breaking up the sacct query creates lots of duplicate records. When we also have
# duplicate 'index', we get NoneType sorting errors.
sacct_out['index'] = numpy.arange(len(sacct_out))
#
return sacct_out
#
@@ -1501,12 +1552,20 @@ def get_and_process_sacct_data(self, sacct_str='', max_rows=None, delim=None, wi
# with_headers==True && as_dict==True: returns dict {'headers':headers, 'data':data}
# with_headers==True && as_dict==False: default config, returns a recordarray()
# with_headers==False: returns just the data as a list.
#
# TODO: large queries can fail with "results too big" from sacct. Sadly, we can't really anticipate when that will happen.
# It's worth seeing if this is memory limited or an actual SACCT restriction.
# Possibly (though this is ambitious), add better error handling to split a query into smaller queries until they succeed,
# or just default to shorter activity intervals.
'''
#
if delim is None:
delim = self.delim
if delim is None:
delim = '|'
if as_dict:
# as_dict requires headers; returns {'headers': headers, 'data':data}
with_headers=True
#
# NOTE: instead of sacct_str.split(), we can just use the shell=True option.
sacct_output = subprocess.run(sacct_str.split(), stdout=subprocess.PIPE).stdout.decode().split('\n')
@@ -1523,8 +1582,11 @@ def get_and_process_sacct_data(self, sacct_str='', max_rows=None, delim=None, wi
#
data = [self.process_row(rw.replace('\"', ''), headers=headers, RH=RH) for k,rw in enumerate(sacct_output[1:]) if (max_rows is None or k<max_rows) and len(rw)>1 ]
#
# NOTE: added this (very inefficient) step because I thought it was failing to sort on the sort fields. It was really failing to sort on an empty set, so I think
# we can skip this.
# exclude any rows with no submit date or jobid. I hate to nest this sort of logic here, but we seem to get these from time to time. I don't see how they can be valid.
data = [rw for rw in data if not (rw[RH['JobID']] is None or rw[RH['JobID']]=='' or rw[RH['Submit']] is None or rw[RH['Submit']])=='' ]
# that said... I think this was supposed to fix something that is not actually happening, so we can probably omit this step.
#data = [rw for rw in data if not (rw[RH['JobID']] is None or rw[RH['JobID']]=='' or rw[RH['Submit']] is None or rw[RH['Submit']])=='' ]
#
# pandas.DataFrame(data, columns=active_headers).to_records()
#
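
A usage sketch for the return modes listed in the docstring above (assuming an SACCT_data_direct instance named sacct; the query string and column choices are illustrative):

cmd = 'sacct --allusers --partition=normal -p --delimiter=| --starttime=2022-03-01 --endtime=2022-03-11 --format=User,JobID,Submit'
#
# as_dict=True: {'headers': [...], 'data': [[...], ...]} -- what load_sacct_data() now
# collects per window before building one record array for the whole date range.
chunk = sacct.get_and_process_sacct_data(cmd, with_headers=True, as_dict=True)
headers, rows = chunk['headers'], chunk['data']
#
# default configuration: returns a numpy recordarray directly
recs = sacct.get_and_process_sacct_data(cmd)
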
@@ -1683,6 +1745,9 @@ def add_fig_slide(self, fig_title='Figure', width='.8', fig_path=''):
# newer versions of latex. I also sometimes just replace "_" -> "-". Handling this bluntly here may be a problem, since shouldn't we be able to use a formula
# as a title??? So we should probably be rigorous and confirm that "_" is (not) wrapped in "$", or handle it on the input side... For now, let's handle the input.
# Anyway, we need this to be smart enough to handle corrected input (aka, modify "_" but not "\_").
#
rel_path = os.path.relpath(fig_path, self.output_path)
#
self.project_tex['slide_{}'.format(len(self.project_tex))] = self.tex_templates['figslide'].format(fig_title=fig_title, width=width, fig_path=fig_path)

#
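
One possible way to handle the "_" escaping discussed in the TODO above (an assumption, not part of this commit): escape underscores only when they are not already escaped, and leave math-mode handling to the caller:

import re

def tex_escape_underscores(s):
    # negative lookbehind: only replace '_' that is not already preceded by a backslash
    return re.sub(r'(?<!\\)_', r'\\_', s)

# tex_escape_underscores(r'my_fig\_title')  ->  r'my\_fig\_title'
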
@@ -2551,10 +2616,23 @@ def calc_jobs_summary(data=None, verbose=0, n_cpu=None, step_size=1000):
if n_cpu > 1:
# this should sort in place, which could cause problems downstream, so let's use an index:
print('*** DEBUG: data[JobID]: {}, data[Submit]: {}'.format( (None in data['JobID']), (None in data['Submit']) ))
ix_s = numpy.argsort(data, axis=0, order=['JobID', 'Submit'], kind='stable')
#
working_data = data[ix_s]
working_data.sort(axis=0, order=['JobID', 'Submit'], kind='stable')
# Getting some "can't sort None type" errors:
print('*** DEBUG: type(data), len(data): ', type(data), len(data) )
#print('*** DEBUG: nans in JobID?: ', numpy.isnan(data['JobID']).any() )
print('*** DEBUG: nans in Submit?: ', numpy.isnan(data['Submit']).any() )
print('*** DEBUG: Nones in JobID: ', numpy.sum([s is None for s in data['JobID']]))
#
# NOTE: sorts by the named order=[] fields, then by the remaining fields in the order they appear in the dtype, to break any ties. When we have LOTS of
# records (particularly with job arrays, maybe?), this can trigger attempts to sort on None, so maybe we can
# specify a different sort kind:
#sort_kind='stable'
sort_kind='quicksort'
#ix_s = numpy.argsort(data, axis=0, order=['JobID', 'Submit', 'index'], kind=sort_kind)
#
#working_data = data[ix_s]
working_data = data[:]
working_data.sort(axis=0, order=['JobID', 'Submit'], kind=sort_kind)
#
#ks = numpy.append(numpy.arange(0, len(data), step_size), [len(data)+1] )
ks = numpy.array([*numpy.arange(0, len(data), step_size), len(data)+1])
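
The sorting side of the change, sketched outside the class (illustrative, not the committed code): with a globally unique 'index' column, ties on (JobID, Submit) are broken by the remaining dtype fields rather than falling through to fields that may hold None:

import numpy

def sorted_jobs(data):
    # numpy.sort returns a sorted copy; fields named in order= are compared first,
    # and any remaining fields (including the unique 'index') break ties in dtype order.
    return numpy.sort(data, axis=0, order=['JobID', 'Submit'], kind='quicksort')
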
24 changes: 24 additions & 0 deletions sacct_reports_dev_batcher.sh
@@ -0,0 +1,24 @@
#!/bin/bash
#
#SBATCH --job-name=sacct_reports_dev
#SBATCH --partition=serc
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=4
#SBATCH --mem-per-cpu=8g
#SBATCH --output=sacct_report_batcher_%j.out
#SBATCH --error=sacct_report_batcher_%j.out
#SBATCH --time=1:00:00
#
module purge
module load anaconda-cees-beta/
module load system texlive/
#
CMD="python SACCT_reports_dev.py"
#
echo "*** DEBUG: Execute script: ${CMD}"
#
$CMD
#
echo "*** DEBUG: Script (hopefully) executed."
#
