
Bug fixes to the numpy reader (#646)
* Bug fixes to the numpy reader

* Added extra_cols-based indicator handling; these indicators are used for rulelets only and are written to disk only (see the sketch below)
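
A minimal sketch of the split this change introduces, mirroring the is_extra test added in _json_to_numpy; the standalone helper and the sample events are illustrative only, not part of the smdebug API:

    def is_extra(event):
        # Disk-only (rulelet) indicators: Algorithm-dimension events and
        # MemoryUsedPercent are written to the numpy store but skipped when
        # filling the in-memory array behind the returned data.
        return event["Dimension"] == "Algorithm" or event["Name"] == "MemoryUsedPercent"

    assert is_extra({"Dimension": "Algorithm", "Name": "IOPS"})
    assert not is_extra({"Dimension": "CPUUtilization", "Name": "cpu0"})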
mariumof authored Feb 6, 2023
1 parent d4a8d53 commit 639fc6f
Showing 2 changed files with 66 additions and 43 deletions.
2 changes: 1 addition & 1 deletion smdebug/_version.py
@@ -1 +1 @@
__version__ = "1.0.26"
__version__ = "1.0.27"
107 changes: 65 additions & 42 deletions smdebug/profiler/system_metrics_reader.py
@@ -5,6 +5,7 @@
import os
import io
import sys
import math
import pickle
import pandas as pd
import numpy as np
@@ -75,7 +76,7 @@ def _get_event_files_in_the_range(
available timestamp
"""

if self._startAfter_prefix is not "":
if self._startAfter_prefix != "":
if end_time_microseconds >= get_utctimestamp_us_since_epoch_from_system_profiler_file(
self._startAfter_prefix
):
@@ -202,13 +203,14 @@ class S3NumpySystemMetricsReader(S3SystemMetricsReader):
fill_val = 101

def __init__(self, s3_trial_path, use_in_memory_cache,
col_names, col_dict,
col_names, extra_col_names, col_dict,
np_store, store_chunk_size, store_time_file_prefix,
group_size, n_nodes, frequency,
accumulate_forward = False, accumulate_minutes = 20,
n_process=None, logger = None):
super().__init__(s3_trial_path, use_in_memory_cache)
self.col_names = col_names # list of names of indicators
self.extra_col_names = extra_col_names # list of extra indicators that, unlike those in col_names, are not kept in memory
self.col_dict = col_dict # mapping from name to position. Possibly will decide not needed
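# Illustrative contents only; indicator names are hypothetical, and only the key
# format of col_dict (Name + separator + Dimension) is taken from _json_to_numpy:
#   col_dict = {"cpu0" + sep + "CPUUtilization": 0,
#               "gpu0" + sep + "GPUUtilization": 1, ...}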
self.np_store = np_store
self.np_chunk_size = store_chunk_size
@@ -273,7 +275,9 @@ def init_accumulators(self):

num_rows = self.accu_mins * 60 * 10 # Max, at highest frequency
num_rows = num_rows + num_rows//10 # Buffer
num_cols = len(self.col_names)
# extra_col_names are treated separately because, unlike the indicators
# corresponding to col_names, they are stored on disk only
num_cols = len(self.col_names) + len(self.extra_col_names)
self.accu_n_rows = num_rows
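# Worked example: with the default accumulate_minutes = 20 this is
# 20*60*10 = 12000 samples plus a 10% buffer, so accu_n_rows = 13200.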

for i in range (0, self.n_nodes):
@@ -299,9 +303,10 @@ def init_accumulators(self):
self.logger.info("NumpyS3Reader: ALLOCATED ACCU MEM for node {}".format(i))

def _json_to_numpy(node_ind, start_time, end_time, freq_delta,
num_rows, num_cols, np_chunk_size, event_data_list,
num_rows, num_cols, num_extra_cols,
np_chunk_size, event_data_list,
shared_mem_id, col_dict,
accu_val_mem_id, accu_time_mem_id,
accu_val_mem_id, accu_time_mem_id,
accu_cnt_mem_id, accu_num_rows,
np_store, tprefix, queue, logger):

@@ -329,25 +334,26 @@ def _json_to_numpy(node_ind, start_time, end_time, freq_delta,

accu_shm_count = shared_memory.SharedMemory(name=accu_cnt_mem_id)
accu_counts =\
np.ndarray((num_cols,), dtype=np.int32, buffer=accu_shm_count.buf)
np.ndarray((num_cols+num_extra_cols,),
dtype=np.int32, buffer=accu_shm_count.buf)

accu_shm_val = shared_memory.SharedMemory(name=accu_val_mem_id)
accu_vals = np.ndarray((accu_num_rows, num_cols),
accu_vals = np.ndarray((accu_num_rows, num_cols+num_extra_cols),
dtype=np.int32, buffer=accu_shm_val.buf)

accu_shm_time = shared_memory.SharedMemory(name=accu_time_mem_id)
accu_times = np.ndarray((accu_num_rows, num_cols),
accu_times = np.ndarray((accu_num_rows, num_cols+num_extra_cols),
dtype=np.int64, buffer=accu_shm_time.buf)
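# Reading of the shared-memory views above (an interpretation from the shapes,
# not stated in the code): each of the num_cols+num_extra_cols columns can
# accumulate up to accu_num_rows (value, timestamp) pairs in accu_vals and
# accu_times, while accu_counts[col] tracks how many rows of that column are
# filled so far.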

n_extra = np_chunk_size//100
for i in range (0, num_chunks):
np_val_chunks[i] =\
np.full((np_chunk_size+n_extra, num_cols),
np.full((np_chunk_size+n_extra, num_cols+num_extra_cols),
-1, dtype = np.int32)
np_time_chunks[i] =\
np.full((np_chunk_size+n_extra, num_cols),
np.full((np_chunk_size+n_extra, num_cols+num_extra_cols),
-1, dtype = np.int64)
np_ragged_sizes[i] = np.zeros((num_cols,), dtype = np.int32)
np_ragged_sizes[i] = np.zeros((num_cols+num_extra_cols,), dtype = np.int32)

separator = S3NumpySystemMetricsReader.separator

@@ -359,23 +365,24 @@ def _json_to_numpy(node_ind, start_time, end_time, freq_delta,

n_zeros = 0
n_nonzeros = 0
network_used = []
cpu_memory_used = []
for event_data in event_data_list:
event_string = event_data.decode("utf-8")
event_items = event_string.split("\n")
event_items.remove("")
for item in event_items:
event = json.loads(item) #ojson better
if event['Dimension'] == "Algorithm":
network_used.append(int(event['Value']))
continue
if event['Name'] == "MemoryUsedPercent":
cpu_memory_used.append( float(event['Value']))
continue
if event['Name'].startswith("cpu") == False and\
if event['Dimension'] != "Algorithm" and\
event['Name'] != "MemoryUsedPercent" and\
event['Name'].startswith("cpu") == False and\
event['Name'].startswith("gpu") == False:
continue

if event['Name'] == "MemoryUsedPercent":
# Rounding up is more informative when very little RAM is in use
event['Value'] = math.ceil(event['Value'])
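# e.g. math.ceil maps a reading of 0.3 to 1, so low-but-nonzero memory use
# stays visible once values land in the int32 chunk arrays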

is_extra = (event['Dimension'] == "Algorithm" or\
event['Name'] == "MemoryUsedPercent")
col_ind = col_dict[event['Name']+separator+event['Dimension']]
cur_time = int(event['Timestamp']*1000000) #fast

@@ -389,11 +396,12 @@ def _json_to_numpy(node_ind, start_time, end_time, freq_delta,
chunk_ind = row_ind//np_chunk_size
chunk_row_ind = row_ind - chunk_ind*np_chunk_size

np_arr[row_ind,col_ind] = event['Value']
if event['Value'] == 0:
n_zeros += 1
else:
n_nonzeros += 1
if is_extra is False:
np_arr[row_ind,col_ind] = event['Value']
if event['Value'] == 0:
n_zeros += 1
else:
n_nonzeros += 1

try:
if accu_val_mem_id is not None and accu_counts[col_ind] < accu_num_rows:
@@ -435,7 +443,7 @@ def _json_to_numpy(node_ind, start_time, end_time, freq_delta,
max_entries_in_chunk = 0
min_time_in_chunk = max_time # This will yield a filename

for col_ind in range (0, num_cols):
for col_ind in range (0, num_cols+num_extra_cols):
n_entries = np_ragged_sizes[chunk_ind][col_ind]
max_entries_in_chunk = max(max_entries_in_chunk, n_entries)

@@ -458,8 +466,7 @@ def _json_to_numpy(node_ind, start_time, end_time, freq_delta,
continue

if max_entries_in_chunk > 0:

shp = (np_chunk_size+n_extra, num_cols)
shp = (np_chunk_size+n_extra, num_cols+num_extra_cols)
S3NumpySystemMetricsReader.store_vals_times(
node_ind, min_time_in_chunk, np_store, tprefix,
shp, np_val_chunks[chunk_ind], np_time_chunks[chunk_ind])
@@ -469,12 +476,7 @@ def _json_to_numpy(node_ind, start_time, end_time, freq_delta,
#print("RAGGED min_time_in_chunk type: {}".format(type(min_time_in_chunk.item())))
#print("RAGGED np_ragged_sized type: {}".format(type(np_ragged_sizes[chunk_ind])))

network_used = np.array(network_used)
cpu_memory_used = np.array(cpu_memory_used)
S3NumpySystemMetricsReader.store_vals(node_ind, min_time, np_store, (len(network_used),), network_used, val_type="Network")
S3NumpySystemMetricsReader.store_vals(node_ind, min_time, np_store, (len(cpu_memory_used),), cpu_memory_used, val_type="CPUmemory", dtype=float)

logger.info("S3NumpyReader _json_to_numpy FINISHED for node {}".format(node_ind))
logger.info("S3NumpyReader _json_to_numpy FINISHED for node {} min_row {}, max_row {}, min_time {}, max_time {}".format(node_ind, min_row, max_row, min_time, max_time))
queue.put((node_ind, min_row, max_row, min_time, max_time, jagged_metadata))

def get_events(
@@ -542,6 +544,8 @@ def get_events(

num_rows = (end_time-start_time)//(self.frequency*1000)
num_cols = len(self.col_names)
# Unlike the col_names based indicators, these are not stored in memory:
num_extra_cols = len(self.extra_col_names)
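# Worked example (assuming timestamps are in microseconds and self.frequency is
# the sampling interval in ms): a 10-minute window at frequency=100 gives
# (600*10**6)//(100*1000) = 6000 rows.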
np_chunk_size = self.np_chunk_size
self.logger.info("NumpyS3Reader: untrimmed DF shape ({},{})".format(num_rows,len(self.col_names)))
np_arr = np.full((num_rows, num_cols), np.nan)
@@ -581,7 +585,7 @@ def get_events(
for i in range (0, n_nodes):
tasks[i] = Process(target=S3NumpySystemMetricsReader._json_to_numpy,
args=(i, start_time, end_time, freq_delta,
num_rows, num_cols, np_chunk_size,
num_rows, num_cols, num_extra_cols, np_chunk_size,
event_data_lists[i],
shared_mem_ids[i], copy.deepcopy(self.col_dict),
self.accu_val_mem_ids[i] if forward else None,
@@ -632,6 +636,16 @@ def get_events(

# Could multiprocess the backfill as well. Not a bottleneck for now
post_process = True
"""
Logic for fl below.
We want to backfill, but not across stretches where the user paused profiling.
The user may also have switched profiling intervals, the maximum being
60000 ms while ours may be 100 ms. So we deem it an "interruption"
if we do not see a signal within
fudge_factor * "max interval" / "our interval" samples,
i.e. 3 * 60000 / self.frequency; at a 100 ms interval the worst case is 3 minutes.
"""
fl = int(max(5, 3*60000/self.frequency))
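# Worked examples: frequency=100   -> fl = max(5, 3*60000/100)   = 1800 samples (3 min)
#                  frequency=60000 -> fl = max(5, 3*60000/60000) = 5 samples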
for i in range (0, n_nodes):
shm = shared_memory.SharedMemory(name=shared_mem_ids[i])
arr_from_sh = np.ndarray(num_rows*num_cols,
@@ -646,8 +660,14 @@ def get_events(
self.logger.info("S3NumpyReader: {} nans out of {}".format(mask.sum(), np_arr.size))
st_loc = time.perf_counter_ns()
temp_df = pd.DataFrame(np_arr)
temp_df.fillna(method='ffill', axis=0, inplace=True)
temp_df.fillna(method='bfill', axis=0, inplace=True)

# Fill at most fl missing values; a longer run of NaNs means there was a gap
temp_df.fillna(method='ffill', axis=0, limit = fl, inplace=True)
temp_df.fillna(method='bfill', axis=0, limit = fl, inplace=True)
# If anything is left to fill, we had a profiling gap. Zero it
temp_df.fillna(0, axis=0, inplace=True)
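# Illustration (assuming fl == 5): a column slice like [7, NaN, NaN, 9] is
# forward/back filled to [7, 7, 7, 9], while a run longer than fl samples,
# e.g. a paused profiler, is left by ffill/bfill and zeroed by the fillna(0) above.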

fill_val = S3NumpySystemMetricsReader.fill_val
temp_df.fillna(fill_val, axis=0, inplace=True)
np_arr = temp_df.values
@@ -696,7 +716,8 @@ def collect_accu_metadata(self, start_time: int, end_time: int, forward: bool):

self.logger.info ("S3NumpyReader: writing accumulated data")

num_cols = len(self.col_names)
# both col_names based and extra_col_names based indicators go on disk:
num_cols = len(self.col_names) + len(self.extra_col_names)
num_rows = self.accu_n_rows
np_store = self.np_store
tprefix = self.tprefix
@@ -807,11 +828,10 @@ def store_vals_times(node_ind, min_time_in_chunk, np_store, tprefix,
S3NumpySystemMetricsReader.dump_to_disk(np_store, node_name, time_filename, np_time, shp, dtype=np.int64)

@staticmethod
def store_vals(node_ind, min_time, np_store, shp, np_data, val_type="", dtype=np.int64):
def store_vals(node_ind, np_store, shp, np_data, val_type="", dtype=np.int32):
node_name = S3NumpySystemMetricsReader.node_name_from_index(node_ind)
separator = S3NumpySystemMetricsReader.separator
filename = val_type + separator + str(min_time) + separator + str(node_ind+1) + \
".npy"
filename = val_type + separator + str(node_ind+1) + ".npy"
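# e.g. (illustrative; the actual separator is class-defined) val_type="Network",
# node_ind=0 now yields "Network<sep>1.npy"; the min_time component of the
# name was dropped in this commit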
if np_store.startswith("s3://"):
S3NumpySystemMetricsReader.dump_to_s3(np_store, node_name, filename, np_data)
else:
@@ -829,9 +849,12 @@ def dump_to_s3(s3_storage_loc, node_name, filename, np_data):
s3_client.upload_fileobj(data_stream, bucket, filepath)

@staticmethod
def dump_to_disk(disk_storage_loc, node_name, filename, np_data, shp, dtype=np.int64):
def dump_to_disk(disk_storage_loc, node_name, filename, np_data, shp, dtype=np.int32):
directory = os.path.join(disk_storage_loc, node_name)
filepath = os.path.join(disk_storage_loc, node_name, filename)

if not os.path.exists(directory):
os.makedirs(directory)
fp_numpy = np.memmap(filepath,
dtype=dtype, offset=0, mode='w+', shape = shp)

Expand Down
