Skip to content

Commit

Permalink
multiprocessor is added
Browse files Browse the repository at this point in the history
  • Loading branch information
ARSadri committed Sep 1, 2023
1 parent 1d1b96d commit 78b2b4d
Show file tree
Hide file tree
Showing 10 changed files with 604 additions and 31 deletions.
9 changes: 7 additions & 2 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,12 @@ History
* more tests are added.
* moved multichannel_to_frame to utils

0.9.1 (2023-09-01)
0.9.1 (2023-08-25)
-----------------
* bug removed from plt_utils numbers_as_images_4D.
* bug removed from printprogress when number of steps is very low.
* bug removed from printprogress when number of steps is very small.

0.10.0 (2023-09-01)
-----------------
* I added multiprocessor to lognflow
* bug fixed in logviewer
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,30 @@ The ```printprogress``` makes a pretty nice progress bar.

```

There is also a conviniant way to use multiprocessing in Python. You wish to
provide a function and some shared inputs and ask to run the function over
those inputs using multiprcessing. The ```multiprocessor``` is for you. The
following is a masked median of verctors::

```python
def some_func(idx, shared_inputs):
data, mask, statistics_func = shared_inputs
_data = data[idx]
_mask = mask[idx]
vector_to_analyze = _data[_mask==1]
to_return = statistics_func(vector_to_analyze)
return(np_array([to_return]))

data_shape = (1000, 1000000)
data = np.random.randn(*data_shape)
mask = (2*np.random.rand(*data_shape)).astype('int')
statistics_func = np.median

shared_inputs = (data, mask, op_type)
medians = multiprocessor(some_function, shared_inputs).start()

```

## Introduction

In this package we use a folder on the HDD to generate files and folders in typical
Expand Down
21 changes: 21 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,27 @@ The printprogress makes a pretty nice progress bar::
for _ in range(N):
# do_something()
pbar()
There is also a conviniant way to use multiprocessing in Python. You wish to
provide a function and some shared inputs and ask to run the function over
those inputs using multiprcessing. The multiprocessor is for you. The
following is a masked median of verctors::

def some_func(idx, shared_inputs):
data, mask, statistics_func = shared_inputs
_data = data[idx]
_mask = mask[idx]
vector_to_analyze = _data[_mask==1]
to_return = statistics_func(vector_to_analyze)
return(np_array([to_return]))
data_shape = (1000, 1000000)
data = np.random.randn(*data_shape)
mask = (2*np.random.rand(*data_shape)).astype('int')
statistics_func = np.median
shared_inputs = (data, mask, op_type)
medians = multiprocessor(some_function, shared_inputs).start()

In this package we use a folder on the HDD to generate files and folders in typical
formats such as numpy npy and npz, png, ... to log. A log viewer is also availble
Expand Down
5 changes: 3 additions & 2 deletions lognflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

__author__ = 'Alireza Sadri'
__email__ = '[email protected]'
__version__ = '0.9.1'
__version__ = '0.10.0'

from .lognflow import lognflow
from .logviewer import logviewer
from .printprogress import printprogress
from .plt_utils import plt_colorbar
from .utils import (
select_directory, select_file, repr_raw, replace_all,
text_to_object, multichannel_to_frame)
text_to_object, multichannel_to_frame)
from .multiprocessor import multiprocessor
74 changes: 49 additions & 25 deletions lognflow/lognflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,21 @@
before storing into the directory using log_var(name, var).
"""

from pathlib import Path as pathlib_Path
import time
from itertools import product as itertools_product
from dataclasses import dataclass
import numpy as np
from sys import platform as sys_platform
from os import system as os_system

import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib import animation
from .utils import (
repr_raw, replace_all, select_directory, multichannel_to_frame)
from .plt_utils import plt_colorbar, plt_hist
from pathlib import Path as pathlib_Path
from itertools import product as itertools_product
from dataclasses import dataclass
from sys import platform as sys_platform
from os import system as os_system

from .logviewer import logviewer
from .utils import (
repr_raw, replace_all, select_directory, multichannel_to_frame)
from .plt_utils import plt_colorbar, plt_hist
from .logviewer import logviewer

@dataclass
class varinlog:
Expand Down Expand Up @@ -193,11 +191,18 @@ def __init__(self,
self.log_flush_period = log_flush_period

self.log_dir_str = str(self.log_dir.absolute())
self.enabled = True

def disable(self):
self.enabled = False

def enable(self):
self.enabled = True

def name_from_file(self, fpath):
"""
Given a fpath inside the logger log_dir, what would be its
equivalent parameter_name?
Given an fpath inside the logger log_dir,
what would be its equivalent parameter_name?
"""
fpath_str = str(fpath.absolute())
log_dir_str = None
Expand All @@ -210,11 +215,11 @@ def name_from_file(self, fpath):
fpath_split = fpath_name.split('.')
return '.'.join(fpath_split[:-1])

def copy(self, parameter_name, source, suffix = None,
def copy(self, parameter_name, source, suffix = None,
time_tag: bool = None):
""" copy into a new file
Given a parameter_name, the second argument will be copied into
the first. We will try syntaxes os_system('cp') and copy for
the first. We will try syntaxes os_system('cp') and 'copy' for
Windows.
:param parameter_name: str
Expand All @@ -227,6 +232,8 @@ def copy(self, parameter_name, source, suffix = None,
obtain a list of files matching the source and copy them into
their new location.
"""
if not self.enabled: return

try:
if source.is_file():
flist = [source]
Expand All @@ -245,12 +252,10 @@ def copy(self, parameter_name, source, suffix = None,
new_param_name = param_name
if suffix is None:
suffix = fpath.suffix
fpath_dest = self._get_fpath(param_dir, new_param_name,
suffix, time_tag)
fpath_dest = self._get_fpath(
param_dir, new_param_name, suffix, time_tag)

if sys_platform == "linux" or sys_platform == "linux2":
os_system(f'cp {fpath} {fpath_dest}')
elif sys_platform == "darwin":
if sys_platform in ["linux", "linux2", "darwin"]:
os_system(f'cp {fpath} {fpath_dest}')
elif sys_platform == "win32":
os_system(f'copy {fpath} {fpath_dest}')
Expand Down Expand Up @@ -279,6 +284,8 @@ def rename(self, new_name:str, append: bool = False):
:type append: bool
"""
if not self.enabled: return

self.flush_all()
if(append):
log_dir_name = ''
Expand Down Expand Up @@ -444,6 +451,7 @@ def log_text_flush(self, log_name = None, flush = False, suffix = None):
default: False
:type flush: bool
"""
if not self.enabled: return
log_name = self.log_name if (log_name is None) else log_name

param_dir, param_name, suffix = self._param_dir_name_suffix(
Expand Down Expand Up @@ -514,6 +522,7 @@ def log_text(self,
:param suffix: str
suffix is the extension of the file name.
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag
log_flush_period = self.log_flush_period \
if (log_flush_period is None) else log_flush_period
Expand Down Expand Up @@ -603,6 +612,7 @@ def log_var(self, parameter_name: str, parameter_value,
log_size_limit in bytes, default: 1e+7.
"""
if not self.enabled: return
try:
_ = parameter_value.shape
except:
Expand Down Expand Up @@ -671,6 +681,7 @@ def log_var_flush(self, parameter_name: str, suffix: str = None):
parameter_name can be just a name e.g. myvar, or could be a
path like name such as myscript/myvar.
"""
if not self.enabled: return
param_dir, param_name, suffix = self._param_dir_name_suffix(
parameter_name, suffix)
if(suffix is None):
Expand Down Expand Up @@ -710,6 +721,7 @@ def get_var(self, parameter_name: str, suffix: str = None) -> tuple:
tuple of two nd.arrays
"""
if not self.enabled: return
param_dir, param_name, suffix = self._param_dir_name_suffix(
parameter_name, suffix)
if(suffix is None):
Expand Down Expand Up @@ -753,6 +765,7 @@ def log_single(self, parameter_name: str,
Wheather if the time stamp is in the file name or not.
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

param_dir, param_name, suffix = self._param_dir_name_suffix(
Expand Down Expand Up @@ -805,6 +818,7 @@ def log_plt(self,
Wheather if the time stamp is in the file name or not.
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

param_dir, param_name, image_format = \
Expand Down Expand Up @@ -848,7 +862,7 @@ def log_multichannel_by_subplots(self,
Wheather if the time stamp is in the file name or not.
"""

if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

n_r, n_c, n_ch = parameter_value.shape
Expand Down Expand Up @@ -900,6 +914,7 @@ def log_plot(self, parameter_name: str,
Wheather if the time stamp is in the file name or not.
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

try:
Expand Down Expand Up @@ -969,6 +984,7 @@ def log_hist(self, parameter_name: str,
Wheather if the time stamp is in the file name or not.
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

plt_hist(parameter_value_list,
Expand Down Expand Up @@ -998,6 +1014,7 @@ def log_scatter3(self, parameter_name: str,
Wheather if the time stamp is in the file name or not.
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

fig = plt.figure()
Expand Down Expand Up @@ -1029,6 +1046,7 @@ def log_surface(self, parameter_name: str,
Wheather if the time stamp is in the file name or not.
rest of the parameters (**kwargs) will be passed to plot_surface()
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

from mpl_toolkits.mplot3d import Axes3D
Expand Down Expand Up @@ -1063,8 +1081,8 @@ def log_hexbin(self, parameter_name: str, parameter_value,
grid size is the number of bins in 2D
:param time_tag: bool
Wheather if the time stamp is in the file name or not.
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

try:
Expand Down Expand Up @@ -1102,8 +1120,8 @@ def log_imshow(self, parameter_name: str, parameter_value,
* (n, m, 3, ch)
:param time_tag: bool
Wheather if the time stamp is in the file name or not.
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

parameter_value = parameter_value.squeeze()
Expand Down Expand Up @@ -1137,7 +1155,7 @@ def log_imshow(self, parameter_name: str, parameter_value,
fig, ax = plt.subplots()
im = ax.imshow(parameter_value, cmap = cmap, **kwargs)
if(colorbar):
plt.colorbar(im)
plt_colorbar(im)
if(remove_axis_ticks):
plt.setp(ax, xticks=[], yticks=[])
else:
Expand Down Expand Up @@ -1207,7 +1225,8 @@ def prepare_stack_of_images(self,
:param borders: float
borders between tiles will be filled with this variable
default: np.nan
"""
"""
if not self.enabled: return
list_of_stacks = list(list_of_stacks)
for cnt, stack in enumerate(list_of_stacks):
stack = self._handle_images_stack(stack, borders = borders)
Expand Down Expand Up @@ -1267,6 +1286,7 @@ def log_canvas(self,
Wheather if the time stamp is in the file name or not.
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

try:
Expand Down Expand Up @@ -1391,6 +1411,7 @@ def log_confusion_matrix(self,
model_selection/plot_confusion_matrix.html
"""
if not self.enabled: return
accuracy = np.trace(cm) / np.sum(cm).astype('float')
misclass = 1 - accuracy

Expand Down Expand Up @@ -1447,6 +1468,7 @@ def log_animation(self, parameter_name: str, stack,
:param time_tag: bool
Wheather if the time stamp is in the file name or not.
"""
if not self.enabled: return
time_tag = self.time_tag if (time_tag is None) else time_tag

param_dir, param_name, suffix = self._param_dir_name_suffix(
Expand All @@ -1468,6 +1490,7 @@ def log_animation(self, parameter_name: str, stack,
return fpath

def flush_all(self):
if not self.enabled: return
for log_name in list(self._loggers_dict):
self.log_text_flush(log_name, flush = True)
for parameter_name in list(self._vars_dict):
Expand Down Expand Up @@ -1500,6 +1523,7 @@ def __call__(self, *args, **kwargs):
The text (str(...)) will be passed to the main log text file.
"""
self.log_text(None, *args, **kwargs)
self.flush_all()

def __del__(self):
try:
Expand Down
Loading

0 comments on commit 78b2b4d

Please sign in to comment.