# functions, classes, and other helper bits for HPC_analytics notebooks and other libs.
#
import os
import sys
import math
import numpy
import numpy.lib.recfunctions as nrf
import scipy
import scipy.constants
#import matplotlib
import matplotlib.dates as mpd
#import pylab as plt
import datetime as dtm
import pytz
import multiprocessing as mpp
import pickle
import json
import h5py
#
import subprocess
import shlex
import dataclasses
#
#import numba
import pandas
#
# 2023-12-11 yoder:
# TODO: add class sinfo_nodes() (or something like it...), based on this query:
# sinfo --Node --partition=serc --Format=nodelist,nodeaddr,available,cpus,cpusstate,memory,gres,gresused:35,statecomplete,time
# We can use this to do a deeper drill-down to estimate available resources.
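#
# A minimal sketch of what such a class might wrap (hypothetical helper; the --Format field list is
#  taken from the query above, but the whitespace-split parsing is an assumption, not a tested implementation):
def fetch_sinfo_nodes(partition='serc'):
    # run the sinfo query above; return a list of {column: value} dicts, one per node.
    fmt = 'nodelist,nodeaddr,available,cpus,cpusstate,memory,gres,gresused:35,statecomplete,time'
    sinfo_out = subprocess.run(shlex.split('sinfo --Node --partition={} --Format={}'.format(partition, fmt)),
                               capture_output=True, text=True).stdout
    rows = [ln.split() for ln in sinfo_out.splitlines() if ln.strip()]
    # first row is the header row; zip each node row up with it.
    return [dict(zip(rows[0], rw)) for rw in rows[1:]]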
#
# TODO: so... do we keep our plotting routines separate, or do we just make sure we use... uhh? (double check this)
# matplotlib.use('Agg')
#  set the backend first (before importing pylab), so this works on a headless HPC node.
import pylab as plt
#
day_2_sec=24.*3600.
GB=1024.**3.
#
# NOTE: matplotlib.dates changed the standard reference epoch. This is mainly to mitigate rounding errors in modern
#  dates. You can use set_/get_epoch() to read and modify the epoch, but there are rules about doing this
#  before figures are plotted, etc. Its typical effect is cosmetic, so differencing dates, elapsed times, etc.
#  will not change (much). Looks like the best way to handle this is to augment the numerical dates.
old_mpd_epoch = '0000-12-31T00:00:00'
new_mpd_epoch = '1970-01-01T00:00:00'
dt_mpd_epoch = 719163.0
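# e.g. (epoch arithmetic, for a modern date):
#   mpd.datestr2num('2021-06-01') -> 18779.0 under the new (1970-01-01) epoch;
#   18779.0 + dt_mpd_epoch -> 737942.0, the same date under the old (0000-12-31) epoch.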
#
# TODO: move group_ids, and local-specific things like that to site-specific modules or data files.
#
# GIT NOTE: Mazama is gone; no need for mazama_groups_ids
#
# TODO: LOTS more serc_user ids!
# Generally, I think we don't use these any longer. We get these natively from the data.
#serc_user_ids = ['biondo', 'beroza', 'sklemp', 'harrisgp', 'gorelick', 'edunham', 'sagraham', 'omramom', 'aditis2', 'oneillm', 'jcaers', 'mukerji', 'glucia', 'tchelepi', 'lou', 'segall', 'horne', 'leift']
# These are used for some reporting code, that should be moved
user_exclusions = ['myoder96', 'dennis']
group_exclusions = ['modules']
#
def str2date(dt_str, verbose=0):
try:
return mpd.num2date(mpd.datestr2num(dt_str))
except:
if verbose:
print('error date conversion: ', dt_str)
#raise Exception('date conversion exception')
#
return None
#
def str2date_num(dt_str, verbose=0):
try:
return mpd.datestr2num(dt_str)
except:
if verbose:
print('error in date-number conversion: ', dt_str)
return None
#
def simple_date_string(dtm, delim='-'):
# TODO: replace str() call with a proper .encode(), maybe .encode('utf-8') ?
return delim.join([str(x) for x in [dtm.year, dtm.month, dtm.day]])
def datetime_to_SLURM_datestring(dtm, delim='-'):
zf = lambda x:str(x).zfill(2)
#
return '{}T{}:{}:{}'.format(delim.join([str(x) for x in [dtm.year, zf(dtm.month), zf(dtm.day)]]), zf(dtm.hour), zf(dtm.minute), zf(dtm.second))
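# e.g., datetime_to_SLURM_datestring(dtm.datetime(2022, 3, 5, 7, 9, 0)) -> '2022-03-05T07:09:00'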
#
def elapsed_time_2_day(tm_in, verbose=0):
#
    if tm_in is None or tm_in in ( 'Partition_Limit', 'UNLIMITED' ):
        #if tm_in.lower() in ( 'partition_limit', 'unlimited' ):
        return 0.
#
return elapsed_time_2_sec(tm_in=tm_in, verbose=verbose)/(day_2_sec)
#
def elapsed_time_2_sec(tm_in, verbose=0):
#
# TODO: really??? why not just post the PL value?
if tm_in in ( 'Partition_Limit', 'UNLIMITED', 'unlimited', 'infinite' ) or tm_in is None:
#if tm_in.lower() in ( 'partition_limit', 'unlimited' ):
return 0.
#
days, tm = ([0,0] + list(tm_in.split('-')))[-2:]
#
    if tm in (0, ''):
        return 0.
days=float(days)
#
if verbose:
try:
h,m,s = numpy.append(numpy.zeros(3), numpy.array(tm.split(':')).astype(float))[-3:]
except:
print('*** AHHH!!! error! ', tm, tm_in)
raise Exception("broke on elapsed time.")
else:
h,m,s = numpy.append(numpy.zeros(3), numpy.array(tm.split(':')).astype(float))[-3:]
#
return numpy.dot([day_2_sec, 3600., 60., 1.], [float(x) for x in (days, h, m, s)])
#return float(days)*day_2_sec + float(h)*3600. + float(m)*60. + float(s)
#
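# Examples (hand-checked; SLURM elapsed strings are [D-]HH:MM:SS or MM:SS, and sentinels map to 0):
#   elapsed_time_2_sec('1-02:03:04') -> 93784.0
#   elapsed_time_2_sec('15:00')      -> 900.0
#   elapsed_time_2_sec('UNLIMITED')  -> 0.0
#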
# TODO: can we vectorize properly? might have to expand/split() first, then do the math parts.
def elapsed_time_2_sec_v(tm_in, verbose=0):
    '''
    # element-wise (array) version of elapsed_time_2_sec().
    # sentinel values ('Partition_Limit', 'UNLIMITED', etc.) and None map to 0.
    '''
    #
    tm_in = numpy.atleast_1d(tm_in)
    #
    return numpy.array([elapsed_time_2_sec(x, verbose=verbose) for x in tm_in])
#
kmg_vals = {'k':1E3, 'm':1E6, 'g':1E9, 't':1E12}
def kmg_to_num(x):
'''
# convert something like 25k to 25000
# for now, assume xxxU format.
'''
#
#if isinstance(x, (float, int)):
# return x
try:
return float(x)
except:
pass
#
if x is None or x=='':
return None
#
try:
return float(x[:-1]) * kmg_vals[x[-1].lower()]
except:
return None
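# e.g., kmg_to_num('25K') -> 25000.0; kmg_to_num('4G') -> 4e9; plain numerals pass through: kmg_to_num('1.5') -> 1.5
#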
def space_replacer(s, space_to='_', space_ord=32):
    # NOTE: default is to replace space with '_', but can be abstracted. Let's also handle bytearrays...
    # NOTE: we could put in a hack for bytes arrays, but we should be getting the encoding from the input.
    #  for now, let's not do this. bytes vs string can be handled by the calling function. If we handle it here, we
    #  restrict it to a sort of hacked handling.
#if isinstance(s,bytes):
# return s.replace(chr(space_ord).encode(), space_to.encode())
#
#if not isinstance(s,str):
# s = str(s)
return str(s).replace(chr(space_ord), space_to)
#
#
dtm_handler_default = str2date_num
# TODO: write this as a class, inherit from dict, class(dict):
default_SLURM_types_dict = {'User':str, 'JobID':str, 'JobName':str, 'Partition':str, 'State':str, 'JobID_parent':str,
'Timelimit':elapsed_time_2_day,
'Start':dtm_handler_default, 'End':dtm_handler_default, 'Submit':dtm_handler_default,
'Eligible':dtm_handler_default,
'Elapsed':elapsed_time_2_day, 'MaxRSS':kmg_to_num, 'AveRSS':kmg_to_num, 'ReqMem':kmg_to_num,
'MaxVMSize':kmg_to_num,'AveVMSize':kmg_to_num, 'NNodes':int, 'NCPUS':int,
'MinCPU':str, 'SystemCPU':elapsed_time_2_day, 'UserCPU':elapsed_time_2_day, 'TotalCPU':elapsed_time_2_day,
'NTasks':int,'MaxDiskWrite':kmg_to_num, 'AveDiskWrite':kmg_to_num,
'MaxDiskRead':kmg_to_num, 'AveDiskRead':kmg_to_num
}
#
# yoder, 2022-08-11: updating default_SLURM_types_dict, mostly to facilitate SQUEUE calls. separating it out
# (rather than just directly add in the new cols) for posterity. also, adding .lower() and .upper() duplicate
# entries to simplify searching.
default_SLURM_types_dict.update({ky:int for ky in ['NODES', 'CPUS', 'TASKS', 'numnodes', 'numtasks', 'numcpus', 'gpus', 'GPUS', 'MEMORY']})
default_SLURM_types_dict.update({'TIMELEFT':elapsed_time_2_day, 'TIMEUSED':elapsed_time_2_day})
dst_ul = {ky.lower():val for ky,val in default_SLURM_types_dict.items()}
dst_ul.update( {ky.upper():val for ky,val in default_SLURM_types_dict.items()} )
default_SLURM_types_dict.update(dst_ul)
del dst_ul
# TODO: this addition to default_SLURM_types_dict separated for diagnostic/dev purposes. Integrate it directly into the main def.
default_SLURM_types_dict.update({'Account':str})
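#
# The types dict acts as a per-column cast when parsing delimited sacct output, e.g. (a sketch that mirrors
#  SACCT_data_handler.process_row() below):
#   vals = [None if v=='' else default_SLURM_types_dict.get(c, str)(v)
#           for c, v in zip(headers, row.split('|'))]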
#
def running_mean(X, n=10):
return (numpy.cumsum(X)[n:] - numpy.cumsum(X)[:-n])/n
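# e.g., running_mean(numpy.arange(10), n=3) -> array([2., 3., 4., 5., 6., 7., 8.])
#  (note: as written, the window starting at X[0] is dropped, so the output has len(X)-n elements.)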
#
class SACCT_data_handler(object):
#
# TODO: write GRES (gpu, etc. ) handler functions.
    # TODO: add real-time options? to get sacct data in real time; we can still stash it into a file if we need to, but
    #  more likely a variation on load_sacct_data() that executes the sacct queries, instead of loading a file.
dtm_handler_default = str2date_num
#
default_types_dict = default_SLURM_types_dict
#
time_units_labels={'hour':'hours', 'hours':'hours', 'hr':'hours', 'hrs':'hours', 'min':'minutes','minute':'minutes', 'minutes':'minutes', 'sec':'seconds', 'secs':'seconds', 'second':'seconds', 'seconds':'seconds'}
#
time_units_vals={'days':1., 'hours':24., 'minutes':24.*60., 'seconds':24.*3600.}
#
    # add a dict to capture input(like) parameters. These can then be easily added to hdf5 (and similar) storage
# containers.
attributes={}
#
#
def __init__(self, data_file_name=None, delim='|', max_rows=None, types_dict=None, chunk_size=1000, n_cpu=None,
n_points_usage=1000, verbose=0, keep_raw_data=False, h5out_file=None, qs_default=[.25,.5,.75,.9], **kwargs):
'''
# handler object for sacct data.
#
        #@keep_raw_data: we compute a jobs_summary[] table, which probably needs to be an HDF5 object, as per memory
        #  requirements. But for starters, let's optionally dump the raw data. We shouldn't actually need it for
        #  anything we're doing right now. If it comes to it, maybe we dump it as an HDF5 or actually build a DB of some sort.
'''
# TODO: reorganize a bit. Consolidate load_data() functionality into... a function, which we can rewrite
# for each sub-class. Then we can either similarly offload the calc_summaries() type work to a separate
# function or leave them in __init__() where they probably belong.
#
        n_cpu = int(n_cpu or 1)
#
if types_dict is None:
#
            # handle all of the dates the same way. datetimes or numbers? numbers are easier for most
            #  purposes. Also, they don't morph unpredictably when transferred from one container to another.
            #  Dates are always a pain...
#
types_dict=self.default_types_dict
#
self.__dict__.update({key:val for key,val in locals().items() if not key in ['self', '__class__']})
self.attributes.update({key:val for key,val in locals().items() if not key in ['self', '__class__']})
#
self.load_data()
#
        if h5out_file is None:
            # load_data() may have derived an output name from the input file; otherwise fall back to a generic default.
            h5out_file = self.h5out_file or 'sacct_output.h5'
        self.h5out_file = h5out_file
#
self.headers = self.data.dtype.names
# row-headers index:
self.RH = {h:k for k,h in enumerate(self.headers)}
self.calc_summaries()
#
self.attributes['h5out_file'] = h5out_file
self.attributes['RH'] = self.RH
#
# NOTE: matplotlib.dates changed its default reference epoch, for numerical date conversion, from
# 0000-12-31 to 1970-1-1. switching the epoch can be tricky; different versions of mpd can give
# weird years. let's handle it here, assuming modern dates:
# dt_epoch = 0.
# yr_test = mpd.num2date(SACCT_obj.jobs_summary['Start'][0]).year
# if yr_test > 3000:
# dt_epoch = -dt_mpd_epoch
        # if yr_test < 1000:
        #     dt_epoch = dt_mpd_epoch
# what does this do? I think we can get rid of it...
#dt_mod_epoch = self.compute_mpd_epoch
#
self.dt_mpd_epoch = self.compute_mpd_epoch_dt()
#
def __len__(self, *args, **kwargs):
return len(self.jobs_summary)
def __getitem__(self, *args, **kwargs):
return self.jobs_summary.__getitem__(*args, **kwargs)
def __setitem__(self, *args, **kwargs):
return self.jobs_summary.__setitem__(*args, **kwargs)
#
@property
def dtype(self):
if 'jobs_summary' in self.__dict__.keys():
return self.jobs_summary.dtype
else:
return None
#
def get_NGPUs(self, jobs_summary=None, alloc_tres=None):
if alloc_tres is None:
if jobs_summary is None:
jobs_summary = self.jobs_summary
alloc_tres = jobs_summary['AllocTRES'].astype(str)
#
else:
alloc_tres = numpy.array(alloc_tres).astype(str)
#
#return numpy.array([float(s.split('gpu=')[1].split(',')[0]) if 'gpu=' in s else 0.
# for s in jobs_summary['AllocTRES'].astype(str)])
#
# NOTE: if this blows up, it might be because I made a couple minor type-handling changes on 25 July 2022 (yoder):
return numpy.array([int(s.split('gpu=')[1].split(',')[0]) if 'gpu=' in s else 0
for s in alloc_tres]).astype(int)
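    #
    # e.g. (illustrative TRES strings):
    #   get_NGPUs(alloc_tres=['cpu=4,mem=8G,gres/gpu=2', 'cpu=1,mem=4G']) -> array([2, 0])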
#
def compute_mpd_epoch_dt(self, test_col='Submit', test_index=0, yr_upper=3000, yr_lower=1000):
#
# catch zero-length data exception:
if len(self) == 0:
return 0.
#print('*** DEBUG: ', len(self.jobs_summary), len(self[test_col]))
#print('*** DEBUG: ', self[test_col][0:10])
test_date = None
while test_date is None:
x = self[test_col][test_index]
            if not isinstance(x, (float, int, dtm.datetime)):
test_index += 1
continue
test_date = self.jobs_summary[test_col][test_index]
if test_date is None:
test_date = dtm.datetime.now(pytz.timezone('UTC'))
#
#
if isinstance(test_date, dtm.datetime):
test_date = mpd.date2num(test_date)
# dt_epoch = 0.
# yr_test = mpd.num2date(self.jobs_summary[test_col][test_index]).year
# if yr_test > yr_upper:
# dt_epoch = -dt_mpd_epoch
# if yr_test < yr_lower:
# dt_epoch = dt_mod_epoch
#
#return dt_epoch
return compute_mpd_epoch_dt(test_date, yr_upper=yr_upper, yr_lower=yr_lower)
#
def calc_summaries(self, n_cpu=None, chunk_size=None, t_min=None, t_max=None):
#
n_cpu = n_cpu or self.n_cpu
n_points_usage = self.n_points_usage
chunk_size = chunk_size or self.chunk_size
#
        # TODO: add start_date, end_date to parent class. Also then change all refs of t_min/_max to start_date,
        #  end_date to simplify the syntax.
        # add t_min, t_max (start/end dates). This is a little bit clunky and bass-ackwards, since it is actually
        #  facilitating compatibility with child subclasses, but in a way that is harmless. Also, might be a good idea to
# handle t_min, t_max in general. These are the min/max times in the timeseries. By default, they are inferred from
# the data, but because SACCT yields all jobs in any state during the start/end times, you can actually get start/end
# times outside your query window.
if t_min is None and 'start_date' in self.__dict__.keys():
t_min = self.start_date
#
if t_max is None and 'end_date' in self.__dict__.keys():
t_max = self.end_date
#
#
# t_min, t_max need to be numeric. most likely, we'll be converting
# from a datetime type, but this should be better handled.
# TODO: handle data type better...
if not (isinstance(t_min, float) or isinstance(t_min, int)):
t_min = mpd.date2num(t_min)
if not (isinstance(t_max, float) or isinstance(t_max, int)):
t_max = mpd.date2num(t_max)
#
self.jobs_summary = self.calc_jobs_summary()
if not self.keep_raw_data:
del self.data
if len(self.jobs_summary)==0:
return None
#
self.cpu_usage = self.active_jobs_cpu(n_cpu=n_cpu, t_min=t_min, t_max=t_max, mpp_chunksize=min(int(len(self.jobs_summary)/n_cpu), chunk_size) )
self.weekly_hours = self.get_cpu_hours(bin_size=7, n_points=n_points_usage, t_min=t_min, t_max=t_max, n_cpu=n_cpu)
self.daily_hours = self.get_cpu_hours(bin_size=1, n_points=n_points_usage, t_min=t_min, t_max=t_max, n_cpu=n_cpu)
#
return len(self.jobs_summary)
#
def load_data(self, data_file_name=None):
#
data_file_name = (data_file_name or self.data_file_name)
#
# especially for dev, allow to pass the recarray object itself...
if isinstance(data_file_name, str):
            # if, on the off-chance, we are given an hdf5 input, assume it is properly configured and will just work:
            if os.path.splitext(data_file_name)[1] == '.h5':
                with h5py.File(data_file_name, 'r') as fin:
                    self.data = fin['data'][:]
            else:
                self.data = self.load_sacct_data()
            #
            # If we are given a data file name as an input, and there is no hdf5 output name, create one from the data file input.
            #  otherwise, we'll use something more generic (see below).
            if self.h5out_file is None:
                self.h5out_file = '{}.h5'.format(os.path.splitext(data_file_name)[0])
else:
self.data = data_file_name
#self.data = self.data_df.values.to_list()
#
#
def write_hdf5(self, h5out_file=None, append=False, meta_data=None):
h5out_file = h5out_file or self.h5out_file
if h5out_file is None:
h5out_file = 'sacct_writehdf5_output.h5'
#
        if not append:
            # over-write (truncate) this file; datasets are then appended ('a') one at a time below.
            with h5py.File(h5out_file, 'w') as fout:
                pass
#
if 'data' in self.__dict__.keys():
array_to_hdf5_dataset(input_array=self.data, dataset_name='data', output_fname=h5out_file, h5_mode='a')
#
        # TODO: consider that we really don't need the computed data sets any longer. Computing cpu_usage, etc. is just not that hard to do. Not hurting anything,
        #  but not necessary either.
array_to_hdf5_dataset(input_array=self.jobs_summary, dataset_name='jobs_summary', output_fname=h5out_file, h5_mode='a')
array_to_hdf5_dataset(input_array=self.cpu_usage, dataset_name='cpu_usage', output_fname=h5out_file, h5_mode='a')
array_to_hdf5_dataset(input_array=self.weekly_hours, dataset_name='weekly_hours', output_fname=h5out_file, h5_mode='a')
array_to_hdf5_dataset(input_array=self.daily_hours, dataset_name='daily_hours', output_fname=h5out_file, h5_mode='a')
#
        # TODO: also write metadata/attributes:
        #  you'd think the best thing to do would be to make a dataset for inputs, but then we have to structure
        #  that dataset. I think we can just store variables.
with h5py.File(h5out_file, 'a') as fout:
#for ky in ('start_date', 'end_date'):
# fout.attrs.create(ky, simple_date_string(self.__dict__[ky]))
#
for ky, f in [('partition', str), ('group',str), ('start_date', simple_date_string), ('end_date', simple_date_string)]:
#fout.attrs.create(ky, f(self.__dict__.get(ky,None)))
if ky in self.__dict__.keys():
fout.attrs.create(ky, f(self.__dict__[ky]))
return None
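    #
    # Typical round trip (a sketch; 'sacct_dump.out' is a hypothetical '|'-delimited sacct dump):
    #   obj = SACCT_data_handler(data_file_name='sacct_dump.out', n_cpu=4)
    #   obj.write_hdf5('sacct_writehdf5_output.h5')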
#
def calc_jobs_summary(self, data=None, verbose=None, n_cpu=None, step_size=100000):
'''
        # compute jobs summary from (raw) data
        # @n_cpu: number of computational elements (CPUs or processors)
        # @step_size: number of rows per thread(-like chunk). We'll use a Pool() (probably -- or something) to do the computation. Use this
        #  variable to break up the job into n>n_cpu jobs, so we don't get a "too big to pickle" error. We'll also need to put in a little
        #  intelligence to ensure the steps are not too small, all distinct job_ids are represented, etc.
'''
#
if data is None: data = self.data
if verbose is None: verbose=self.verbose
#
# I think the "nonelike" syntax is actually favorable here:
n_cpu = (n_cpu or self.n_cpu)
n_cpu = (n_cpu or 1)
n_cpu = int(n_cpu)
#
if verbose:
print('*** calc_jobs_summary: with prams: len(data)={}, verbose={}, n_cpu={}, step_size={}'.format(len(data), verbose, n_cpu, step_size))
#
# NOTE: moved this out of Class to facilitate more memory efficient MPP.
return calc_jobs_summary(data=data, verbose=verbose, n_cpu=n_cpu, step_size=step_size)
#
    # GIT NOTE: deleted the deprecated function calc_jobs_summary_depricated(self,...), which is the in-class (logic included...) version of
    #  calc_jobs_summary()
# commit d16fd6a941c769fbce2da54e067686a1ca0b6c2f
#
#@numba.jit
def load_sacct_data(self, data_file_name=None, delim=None, verbose=1, max_rows=None, chunk_size=None, n_cpu=None):
# TODO: this should probably be a class of its own. Note that it depends on a couple of class-scope functions in
# ways that are not really class hierarchically compatible.
if data_file_name is None:
data_file_name = self.data_file_name
if delim is None:
delim = self.delim
max_rows = max_rows or self.max_rows
chunk_size = chunk_size or self.chunk_size
chunk_size = chunk_size or 100000
#
n_cpu = n_cpu or self.n_cpu
# mpp.cpu_count() is dangerous for HPC environments. So set to 1? Maybe hedge and guess 2 or 4 (we can hyperthread a bit)?
#n_cpu = n_cpu or mpp.cpu_count()
n_cpu = n_cpu or 1
#
if verbose:
            print('** load_sacct_data(), max_rows={}'.format(max_rows))
#
with open(data_file_name, 'r') as fin:
headers_rw = fin.readline()
if verbose:
print('*** headers_rw: ', headers_rw)
#
headers = headers_rw[:-1].split(delim)[:-1] + ['JobID_parent']
self.headers = headers
#
# make a row-handler dictionary, until we have proper indices.
RH = {h:k for k,h in enumerate(headers)}
self.RH = RH
#
if verbose:
print('*** load data:: headers: ', headers)
active_headers=headers
# TODO: it would be a nice performance boost to only work through a subset of the headers...
#active_headers = [cl for cl in headers if cl in types_dict.keys()]
#
# TODO: reevaluate readlines() vs for rw in...
#
# eventually, we might need to batch this.
if n_cpu > 1:
# TODO: use a context manager syntax instead of open(), close(), etc.
P = mpp.Pool(n_cpu)
#self.headers = headers
#
# TODO: how do we make this work with mpp and max_rows limit? This could be less of a problem on newer
# HPC systems.
# see also: https://stackoverflow.com/questions/16542261/python-multiprocessing-pool-with-map-async
# for the use of P.map(), using a context manager and functools.partial()
# TODO: use an out-of-class variation of process_row() (or maybe the whole function), or modify the in-class so MPP does not need
# to pickle over the whole object, which breaks for large data sets. tentatively, use apply_async() and just pass
# headers, types_dict, and RH.
# def process_sacct_row(rw, delim='\t', headers=None, types_dict={}, RH={})
# TODO: to enforce maxrows, why not just use fin.read() (and a few more small changes)?
# Could be that we're just running out of memory on Mazama, or we're trying to benefit from parallel file
# read.
if max_rows is None:
results = P.map_async(self.process_row, fin, chunksize=chunk_size)
else:
# This might run into memory problems, but we'll take our chances for now.
#X = [rw for k,rw in enumerate(fin) if k<max_rows]
# it's a bit faster to chunk these reads together (but probably not a major factor)
# Note that using readlines() will usually give > max_rows (one iteration's overage)
data_subset = []
while len(data_subset)<max_rows:
data_subset += fin.readlines(chunk_size)
#
results = P.map_async(self.process_row, data_subset[0:max_rows], chunksize=chunk_size)
#
P.close()
P.join()
data = results.get()
#
del results
del P
else:
#
# TODO: consider chunking this to improve speed (by making better use of cache?).
data = [self.process_row(rw) for k,rw in enumerate(fin) if (max_rows is None or k<max_rows) ]
#
#
# del all_the_data, rw, k, cvol, vl
#
if verbose:
print('** load_sacct_data::len: ', len(data))
self.raw_data_len = len(data)
#
# TODO: write a to_records() handler, so we don't need to use stupid PANDAS
return pandas.DataFrame(data, columns=active_headers).to_records()
#
#@numba.jit
def process_row(self, rw, headers=None, RH=None, delim=None):
# use this with MPP processing:
# ... but TODO: it looks like this is 1) inefficient and 2) breaks with large data inputs because I think it pickles the entire
# class object... so we need to move the MPP object out of class.
#
# get rid of spaces. Values like State="CANCELED by 12345" seem to generate errors under some
# circumstances?
rw = rw.replace(chr(32), '_')
#
headers = (headers or self.headers)
RH = (RH or self.RH)
if delim is None:
delim = self.delim
# use this for MPP processing:
rws = rw.split(delim)
#print('*** DEBUG lens: ', len(rws), len(headers))
#return [None if vl=='' else self.types_dict.get(col,str)(vl)
# for k,(col,vl) in enumerate(zip(self.headers, rw.split(self.delim)[:-1]))]
# NOTE: last entry in the row is JobID_parent.
#
# DEBUG:
# print('*** rw: ', rw)
# print('*** hdrs: ', headers)
# print('*** RH: ', RH)
#
try:
return [None if vl=='' else self.types_dict.get(col,str)(vl)
for k,(col,vl) in enumerate(zip(headers, rws[:-1]))] + [rws[RH['JobID']].split('.')[0]]
except:
print('*** EXCEPTION! with row: ', rw)
print('*** HEADERS: ', headers)
print('*** RH: ', RH)
raise Exception()
#
    # GIT Note: Deleting the deprecated get_cpu_hours_2() function here. It can be recovered from commits earlier than
    #  31 Aug 2020.
    #
    # GIT Note: Deleting deprecated get_cpu_hours_depricated(). It can be restored from the current commit on 21 Sept 2020:
# commit 44d9c5285324859d55c7878b6d0f13bbe6576be1 (HEAD -> parallel_summary, origin/parallel_summary)
# Author: Mark Yoder <[email protected]>
#Date: Mon Sep 21 17:29:08 2020 -0700
def get_cpu_hours(self, n_points=10000, bin_size=7., IX=None, t_min=None, t_max=None, jobs_summary=None, verbose=False,
n_cpu=None, step_size=1000):
'''
        # TODO: this in-class wrapper still needs to be tested, but since it's just a pass-through, it should work.
        #
        # a third or 4th shot at this, to get the parallelization right... again. Consider making it more procedural so we can
        #  move the main code out of Class scope for dev.
        # parallelize by splitting up jobs_summary and computing each subset over the full time-series, which we can compute,
        #  not pass. Since we're passing jobs_summary, maybe we don't pass the index IX any longer??? Maybe we maintain that for
        #  legacy.
'''
n_cpu = n_cpu or self.n_cpu
n_cpu = n_cpu or 1
if jobs_summary is None:
jobs_summary = self.jobs_summary
if not IX is None:
jobs_summary = jobs_summary[IX]
#
return get_cpu_hours(n_points=n_points, bin_size=bin_size, t_min=t_min, t_max=t_max, jobs_summary=jobs_summary, verbose=verbose, n_cpu=n_cpu, step_size=step_size)
#
# GIT NOTE: deleting get_cpu_hours_depricated()
# Current commit: commit d0872dbf00fd493fa4937d4b9030f9b5a927e21d
#
def active_jobs_cpu(self, n_points=5000, ix=None, bin_size=None, t_min=None, t_max=None, t_now=None, n_cpu=None, jobs_summary=None, verbose=None, mpp_chunksize=None, nan_to=0., NCPUs=None):
'''
##
# Use out-of-class procedural function to improve parallelization (or at least development of parallel code).
# this class function will just be a wrapper and will retain some of the default variable handlers.
#
# @n_points: number of points in returned time series.
# @bin_size: size of bins. This will override n_points
# @t_min: start time (aka, bin phase).
# @ix: an index, aka user=my_user
'''
#
if verbose is None:
verbose = self.verbose
#
mpp_chunksize = mpp_chunksize or self.chunk_size
n_cpu = n_cpu or self.n_cpu
#
if (not ix is None) and len(ix)==0:
#return numpy.array([(0.),(0.),(0.)], dtype=[('time', '>f8'),
#return numpy.array([], dtype=[('time', '>f8'),('N_jobs', '>f8'),('N_cpu', '>f8')])
return null_return()
#
if jobs_summary is None:
jobs_summary=self.jobs_summary
if not ix is None:
jobs_summary=jobs_summary[ix]
#
return active_jobs_cpu(n_points=n_points, bin_size=bin_size, t_min=t_min, t_max=t_max, t_now=t_now, n_cpu=n_cpu, jobs_summary=jobs_summary,
verbose=verbose, mpp_chunksize=mpp_chunksize, nan_to=nan_to, NCPUs=NCPUs)
#
    # GIT Note (commit d16fd6a941c769fbce2da54e067686a1ca0b6c2f): Removing active_jobs_cpu_DEPRICATED(self,...).
    #  functional code moved out of class to facilitate development (and MPP maybe??)
    #
    # NOTE: some trial and error figuring out how to offload process_ajc_row() to facilitate parallelization.
    #  Ultimately, MPP gains were significant but costly (did not generally parallelize well), and a PAIN to maintain.
    #  On Sherlock, we seem to have sufficient performance to process SPP, so some of this can maybe be simplified?
    # GIT note (commit d16fd6a941c769fbce2da54e067686a1ca0b6c2f): removing process_ajc_row_2(), as per deprecation.
#@numba.jit
#def process_ajc_row(self,j, t_start, t_end, t, jobs_summary):
def process_ajc_row(self, t):
'''
# Active Jobs CPU worker function...
'''
        # DEPRECATION: moving this outside the class definition to better facilitate MPP and development.
#print('*** DEBUG: t:: {}'.format(t) )
ix_t = numpy.where(numpy.logical_and(self.t_start<=t, self.t_end>t))[0]
#output[['N_jobs', 'N_cpu']][j] = len(ix_t), numpy.sum(jobs_summary['NCPUS'][ix_t])
return len(ix_t), numpy.sum(self.ajc_js['NCPUS'][ix_t])
#
def get_wait_stats(self, qs=None):
# TODO: revise this to use a structured array, not a recarray.
dtype=[('ncpus', '>i8'), ('mean', '>f8'), ('median', '>f8'), ('stdev', '>f8'), ('min', '>f8'), ('max', '>f8')]
if not qs is None:
for k,q in enumerate(qs):
dtype += [(f'q{k+1}', '>f8')]
wait_stats = numpy.core.records.fromarrays(numpy.zeros((len(self.jobs_summary), len(dtype))).T,
dtype=dtype)
wait_stats.qs=numpy.array(qs)
#
# NOTE: jobs that have not started will be blank? None? NaN? Let's see if we can't just pick up NaNs on the flipside.
delta_ts = self.jobs_summary['Start'] - self.jobs_summary['Submit']
ix_started = numpy.invert(numpy.isnan(delta_ts))
delta_ts = delta_ts[ix_started]
#
for j,k in enumerate(sorted(numpy.unique(self.jobs_summary['NCPUS']) ) ):
#
x_prime = delta_ts[numpy.logical_and(self.jobs_summary['NCPUS'][ix_started]==k, delta_ts>=0.)]
#wait_stats[k-1]=[[k, numpy.mean(x_prime), numpy.median(x_prime), numpy.std(x_prime),
# numpy.min(x_prime), numpy.max(x_prime)]]
#
#wait_stats[k-1][0] = k
wait_stats[j][0] = k
if len(x_prime)==0:
continue
for l,f in zip(range(1, 6), [numpy.mean, numpy.median, numpy.std, numpy.min, numpy.max]):
#
wait_stats[j][l]=f(x_prime)
#
            # I NEVER remember the right syntax to set subsets of recarrays or structured arrays...
#quantiles = numpy.quantiles(x_prime, qs)
#print('*** DEBUG qs: {}, nanlen: {}'.format(qs,numpy.sum(numpy.isnan(x_prime))))
#
            if not qs is None:
                for k, (q_s, q_v) in enumerate(zip(qs, numpy.quantile(x_prime, qs))):
                    wait_stats[f'q{k+1}'][j] = q_v
#
return wait_stats
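    #
    # Example usage (a sketch; 'sacct_obj' is a hypothetical SACCT_data_handler instance):
    #   ws = sacct_obj.get_wait_stats(qs=[.5, .9])
    #   ws['ncpus'], ws['median'], ws['q2']   # wait times are in days (Start - Submit); q1, q2 are the .5, .9 quantiles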
#
def submit_wait_distribution(self, n_cpu=1):
# TODO: add input index?
#
        n_cpu = numpy.atleast_1d(n_cpu)
#
X = self.jobs_summary['Start'] - self.jobs_summary['Submit']
#
        return numpy.array([n_cpu, [X[self.jobs_summary['NCPUS']==n] for n in n_cpu]]).T
#
def get_wait_times_per_ncpu(self, n_cpu):
# get wait-times for n_cpu cpus. to be used independently or as an MPP worker function.
#
ix = self.jobs_summary['NCPUS']==n_cpu
return self.jobs_summary['Start'][ix] - self.jobs_summary['Submit'][ix]
#
def get_active_cpus_layer_cake(self, jobs_summary=None, layer_field='Partition', layers=None, n_points=5000, bin_size=None, t_min=None, t_max=None, t_now=None, n_cpu=None, verbose=False, mpp_chunksize=10000, nan_to=0., NCPUs=None):
# (n_points=5000, bin_size=None, t_min=None, t_max=None, t_now=None, n_cpu=None, jobs_summary=None, verbose=None, mpp_chunksize=10000, nan_to=0.
if jobs_summary is None:
jobs_summary = self.jobs_summary
        if NCPUs is None or (isinstance(NCPUs, str) and NCPUs==''):
            NCPUs = jobs_summary['NCPUS']
#
if t_now is None:
t_now = mpd.date2num(dtm.datetime.now() )
if t_min is None:
t_min = numpy.nanmin([jobs_summary['Start'], jobs_summary['End']])
#t_min = numpy.nanmin( [t_now, t_min] )
#
if t_max is None:
t_max = numpy.nanmax([jobs_summary['Start'], jobs_summary['End']])
if verbose:
print(f'*** DEBUG t_now: {t_now}, t_max: {t_max}')
t_max = numpy.nanmin([t_now, t_max])
#
if layers is None:
layers = [ky.decode() if hasattr(ky,'decode') else ky for ky in list(set(jobs_summary[layer_field]))]
layers = {ky:{} for ky in layers}
        if verbose:
            print('*** ', layers)
#
dtype_cpuh = [(s, '>f8') for s in ['time'] + list(layers.keys())]
dtype_jobs = dtype_cpuh
#
for j, ky in enumerate(layers.keys()):
# TODO: better to handle b'' vs '' via ix.astype(str), or similar approach.
ix = jobs_summary[layer_field] == (ky.encode() if hasattr(jobs_summary[layer_field][0], 'decode') else ky)
XX = self.active_jobs_cpu(n_points=n_points, bin_size=bin_size, t_min=t_min, t_max=t_max, t_now=t_now, n_cpu=n_cpu,
jobs_summary=jobs_summary[ix], verbose=verbose, mpp_chunksize=mpp_chunksize, nan_to=nan_to,
NCPUs=NCPUs[ix])
#
if verbose:
print('*** DEBUG: {}:: {}'.format(j, len(XX['N_cpu'])))
#print(f'*** DEBUG: {k}:: {len(XX['N_cpu'])}')
#
# create output_ arrays for j==0 (or equivalently output_cpus and output_jobs do not exist...)
if j==0:
output_cpus = numpy.empty( (len(XX['N_cpu']), ), dtype=dtype_cpuh )
output_jobs = numpy.empty( (len(XX['N_jobs']), ), dtype=dtype_jobs )
#
output_cpus[ky] = XX['N_cpu'][:]
output_jobs[ky] = XX['N_jobs'][:]
#
output_cpus['time'] = XX['time']
output_jobs['time'] = XX['time']
#
return {'N_cpu': output_cpus, 'N_jobs': output_jobs}
#
def get_cpu_hours_layer_cake(self, jobs_summary=None, layer_field='Partition', layers=None, bin_size=7, n_points=5000,
t_min=None, t_max=None, t_now=None, verbose=0):
'''
# revised version of gchlc with a more intuitive output.
#
'''
#
if jobs_summary is None:
jobs_summary = self.jobs_summary
#
# t_min, t_max: These constraints need to be handled up-front, from the whole data set. Subsets will likely return
# different t_min/t_max.
if t_now is None:
t_now = mpd.date2num(dtm.datetime.now())
if t_min is None:
t_min = numpy.nanmin([jobs_summary['Start'], jobs_summary['End']])
#t_min = numpy.nanmin([t_min, t_now])
#
if t_max is None:
t_max = numpy.nanmax([jobs_summary['Start'], jobs_summary['End']])
#
        # if the data set contains unfinished jobs, set t_now to now. This probably should be revisited... Maybe it makes more sense
        #  to use max(t_end) as the default t_now... Another point for this: it's a lot easier to set t_max=now() than
        #  max(in_data_set). Doing this can truncate active jobs, but will avoid artifacts.
#if None in jobs_summary['End']:
# t_max = numpy.nanmax([t_max, t_now])
#
if layers is None:
layers = [ky.decode() if hasattr(ky,'decode') else ky for ky in list(set(jobs_summary[layer_field]))]
layers = {ky:{} for ky in layers}
if verbose:
print('*** ', layers)
#
dtype_cpuh = [(s, '>f8') for s in ['time'] + list(layers.keys())]
dtype_jobs = dtype_cpuh
output_cpuh = numpy.empty( (n_points, ), dtype=dtype_cpuh )
output_jobs = numpy.empty( (n_points, ), dtype=dtype_jobs )
output_elapsed = {s:0 for s in layers}
#
for j, ky in enumerate(layers.keys()):
if verbose:
print(f'*** ky: {ky} // {ky.encode()}')
#
            # NOTE: should not need the en/decode() logic any more; that should be fixed in the HDF5 subclass, but it won't hurt...
ix = jobs_summary[layer_field] == (ky.encode() if hasattr(jobs_summary[layer_field][0], 'decode') else ky)
XX = self.get_cpu_hours(bin_size=bin_size, n_points=n_points, t_min=t_min, t_max=t_max,
jobs_summary=jobs_summary[ix])
output_cpuh[ky] = XX['cpu_hours'][:]
output_jobs[ky] = XX['N_jobs'][:]
output_elapsed[ky] = 24.*numpy.sum(jobs_summary['Elapsed'][ix]*jobs_summary['NCPUS'][ix])
#
output_cpuh['time'] = XX['time']
output_jobs['time'] = XX['time']
#
return {'cpu_hours': output_cpuh, 'jobs':output_jobs, 'elapsed':output_elapsed}
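    #
    # Example usage (a sketch; 'serc' is an illustrative partition/layer name):
    #   lc = sacct_obj.get_cpu_hours_layer_cake(layer_field='Partition', bin_size=7)
    #   t, cpuh = lc['cpu_hours']['time'], lc['cpu_hours']['serc']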
# REMOVED:
#def get_cpu_hours_layer_cake_depricated(self, jobs_summary=None, layer_field='Partition', layers=None, bin_size=7, n_points=5000,
# t_min=None, t_max=None, verbose=0):
#
####
# GPU Layer Cake Functions
#####################
def report_activegpus_layercake_and_CDFs(self, group_by='Group', agpu_layer_cake=None, jobs_summary=None, ave_len_days=5., qs=[.5, .75, .9], n_points=5000, bin_size=None, fg=None, ax1=None, ax2=None, ax3=None, ax4=None):
        '''
        # Inputs:
        # @group_by: column name for grouping
        # @agpu_layer_cake: optional, precomputed output of get_active_gpus_layer_cake()
        # @bin_size: bin size of the timeseries.
        # @fg: (optional) figure for plots. Should have 4 subplots. If not provided, a default will be created.
        # @ax1,2,3,4: Optional. If not provided, will be created and/or drawn from fg.
        # @ave_len_days: smoothing/averaging interval, in days.
        # @qs: list/array-like of quantile thresholds.
        '''
qs = numpy.array(qs)
#bin_size = bin_size or .1
if fg is None:
fg = plt.figure(figsize=(20,8))
for k in range(1,3):
fg.add_subplot(1,2,k)
#
# NOTE: this might break if (fg is None), even if ax1,2,3,4 are provided)
#ax1, ax2, ax3, ax4 = fg.get_axes()
ax1 = ax1 or fg.get_axes()[0]
ax3 = ax3 or fg.get_axes()[1]
#
ax1.grid()
ax3.grid()
#
ax1.set_title('Active GPUs', size=16)
ax3.set_title('Active GPUs, CDF', size=16)
#
if agpu_layer_cake is None:
agpu_layer_cake = self.get_active_gpus_layer_cake(layer_field=group_by, n_points=n_points, bin_size=bin_size, jobs_summary=jobs_summary)
#
#cpus = acpu_layer_cake['N_cpu']
#jobs = acpu_layer_cake['N_jobs']
T = agpu_layer_cake['N_gpu']['time']
#
lc_ngpu = plot_layer_cake(data=agpu_layer_cake['N_gpu'], ax=ax1)
#lc_njobs = plot_layer_cake(data=acpu_layer_cake['N_jobs'], ax=ax2)
#
# get an averaging length (number of TS elements) of about ave_len_days
ave_len = int(numpy.ceil(ave_len_days*len(T)/(T[-1] - T[0])))
z_gpu = ax1.get_lines()[-1].get_ydata()
#z_jobs = ax2.get_lines()[-1].get_ydata()
z_gpus_smooth = running_mean(z_gpu, ave_len)
#z_jobs_smooth = running_mean(z_jobs, ave_len)
#
ax1.plot(T[-len(z_gpus_smooth):], z_gpus_smooth, ls='-', marker='', lw=2, label=f'{ave_len_days} days-ave')
#ax2.plot(T[-len(z_jobs_smooth):], z_jobs_smooth, ls='-', marker='', lw=2, label=f'{ave_len_days} days-ave')
#
qs_gpus = numpy.quantile(z_gpu, qs)
#qs_jobs = numpy.quantile(z_jobs, qs)
#
hh_gpus = ax3.hist(z_gpu, bins=100, cumulative=True, density=True, histtype='step', lw=3.)
for x,y in zip(qs_gpus, qs):
#ax3.plot([0., qs_cpus[-1], qs_cpus[-1]], [qs[-1], qs[-1], 0.], ls='--', color='r', lw=2. )
ax3.plot([0., x, x], [y, y, 0.], ls='--', lw=2., label=f'{y*100.}th %: {x:.0f} gpus' )
#
#hh_jobs = ax4.hist(z_jobs, bins=100, cumulative=True, density=True, histtype='step', lw=3.)
#for x,y in zip(qs_jobs, qs):
#ax3.plot([0., qs_cpus[-1], qs_cpus[-1]], [qs[-1], qs[-1], 0.], ls='--', color='r', lw=2. )
#ax4.plot([0., x, x], [y, y, 0.], ls='--', lw=3., label=f'{y*100.}th %: {x:.0f} jobs' )
#
for ax in (ax1, ax3):
ax.legend(loc=0)
#
return fg
def get_active_gpus_layer_cake(self, jobs_summary=None, layer_field='Partition', layers=None, n_points=5000, bin_size=None, t_min=None, t_max=None, t_now=None, n_cpu=None, verbose=False, mpp_chunksize=10000, nan_to=0., NGPUs=None):
# (n_points=5000, bin_size=None, t_min=None, t_max=None, t_now=None, n_cpu=None, jobs_summary=None, verbose=None, mpp_chunksize=10000, nan_to=0.
if jobs_summary is None:
jobs_summary = self.jobs_summary
        if NGPUs is None or (isinstance(NGPUs, str) and NGPUs==''):
            NGPUs = jobs_summary['NGPUs']
#
if t_now is None:
t_now = mpd.date2num(dtm.datetime.now() )
if t_min is None:
t_min = numpy.nanmin([jobs_summary['Start'], jobs_summary['End']])
#t_min = numpy.nanmin( [t_now, t_min] )
#
if t_max is None:
t_max = numpy.nanmax([jobs_summary['Start'], jobs_summary['End']])
            if verbose:
                print(f'*** DEBUG t_now: {t_now}, t_max: {t_max}')
t_max = numpy.nanmin([t_now, t_max])
#
if layers is None:
layers = [ky.decode() if hasattr(ky,'decode') else ky for ky in list(set(jobs_summary[layer_field]))]
layers = {ky:{} for ky in layers}
if verbose:
print('*** ', layers)
#
dtype_gpuh = [(s, '>f8') for s in ['time'] + list(layers.keys())]
dtype_jobs = dtype_gpuh
#
for j, ky in enumerate(layers.keys()):
# TODO: better to handle b'' vs '' via ix.astype(str), or similar approach.
ix = jobs_summary[layer_field] == (ky.encode() if hasattr(jobs_summary[layer_field][0], 'decode') else ky)
XX = self.active_jobs_gpu(n_points=n_points, bin_size=bin_size, t_min=t_min, t_max=t_max, t_now=t_now, n_cpu=n_cpu,
jobs_summary=jobs_summary[ix], verbose=verbose, mpp_chunksize=mpp_chunksize, nan_to=nan_to,
NGPUs=NGPUs[ix])
#
if verbose:
print('*** DEBUG: {}:: {}'.format(j, len(XX['N_gpu'])))
#print(f'*** DEBUG: {k}:: {len(XX['N_cpu'])}')
#
# create output_ arrays for j==0 (or equivalently output_cpus and output_jobs do not exist...)
if j==0:
output_gpus = numpy.empty( (len(XX['N_gpu']), ), dtype=dtype_gpuh )
output_jobs = numpy.empty( (len(XX['N_jobs']), ), dtype=dtype_jobs )
#
output_gpus[ky] = XX['N_gpu'][:]
output_jobs[ky] = XX['N_jobs'][:]
#
output_gpus['time'] = XX['time']
output_jobs['time'] = XX['time']
#
return {'N_gpu': output_gpus, 'N_jobs': output_jobs}
def active_jobs_gpu(self,n_points=5000, bin_size=None, t_min=None, t_max=None, t_now=None, n_cpu=None, jobs_summary=None, verbose=None, mpp_chunksize=10000, nan_to=0., NGPUs=None):
'''
        # TODO: Facilitate GPU computation...
        #  We could just add a GPUs column to this, but I think a more elegant solution is to add NCPUS
        #  as an optional parameter. Default behavior will be to get it from jobs_summary (current behavior);
        #  alternatively, an array like NGPUs can be provided.
#
# Since this is now procedural, let's phase out ix?. the class-resident version can keep it if so desired.
        # NOTE on parallelization: There is quite a bit of pre-processing front-matter, so it makes sense to separate the SPP and MPP, as opposed
        #  to doing a recursive callback. Basically, we put in a fair bit of work to construct our working data; it would be expensive to do it again,
        #  messy to integrate it into the call signature, etc. In other words, if you're adding a lot of inputs and input handling to accommodate MPP,
        #  it's probably better to just outsource it. See MPP section below.
#
# TODO: assess parallel strategy; maybe adjust chunksize to avoid too-big-to-parallel problems.
# @n_points: number of points in returned time series.
# @bin_size: size of bins. This will override n_points
# @t_min: start time (aka, bin phase).
# @ix: an index, aka user=my_user
# NOTE: see @mpp_chunksize below: speed performance was a big problem on Mazama, probably because it was IO limited? Not
# seeing the same problem(s) on Sherlock, so some of these more desperate discussions of performance optimization can probably
# be ignored.
        # @mpp_chunksize: batch size for MPP, in this case probably an apply_async(). Note that this has a significant effect on speed and parallelization
        #  performance, and optimal performance is probably related to maximal use of cache. For example, for moderate size data sets, the SPP speed might be ~20 sec,
        #  and the 2-cpu, chunk_size=20000 might be 2 sec, then 1.5, 1.2 for 3, 4 cores. Toying with the chunk_size parameter suggests that cache size is the thing. It is
        #  probably worth circling back to see if map_async() can be made to work, since it probably has algorithms to estimate optimal chunk_size.
# @nan_to={val} set nan values to {val}. If None, skip this step. Default is 0.
# @NCPUs=None : vector of NCPU values. Default will be to use jobs_summary['NCPUS'], or we can provide -- for