Skip to content

Commit

Permalink
Fix to enable correct iteration over non-lists like numpy arrays; non-iterables now require partial
Browse files Browse the repository at this point in the history
  • Loading branch information
swansonk14 committed Apr 9, 2020
1 parent ce6d84c commit eb122f7
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 139 deletions.
2 changes: 1 addition & 1 deletion LICENSE.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2019 Kyle Swanson
Copyright (c) 2020 Kyle Swanson

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,25 @@ for result in iterator:

### Arguments

All `p_tqdm` functions accept any number of lists (of the same length) as input, as long as the number of lists matches the number of arguments of the function. Additionally, if any non-list variable is passed as an input to a `p_tqdm` function, the variable will be passed to all calls of the function. See the example below.
All `p_tqdm` functions accept any number of iterables as input, as long as the number of iterables matches the number of arguments of the function.

To repeat a non-iterable argument along with the iterables, use Python's [partial](https://docs.python.org/3/library/functools.html#functools.partial) from the [functools](https://docs.python.org/3/library/functools.html) library. See the example below.

```python
from functools import partial

l1 = ['1', '2', '3']
l2 = ['a', 'b', 'c']

def add(a, b, c):
def add(a, b, c=''):
return a + b + c

added = p_map(add, l1, l2, '!')
added = p_map(partial(add, c='!'), l1, l2)
# added == ['1a!', '2b!', '3c!']
```

### CPUs

All the parallel `p_tqdm` functions can be passed the keyword `num_cpus` to indicate how many CPUs to use. The default is all CPUs. `num_cpus` can either be an integer to indicate the exact number of CPUs to use or a float to indicate the proportion of CPUs to use.

Note that the parallel Pool objects used by `p_tqdm` are automatically closed when the map finishes processing.
131 changes: 40 additions & 91 deletions p_tqdm/p_tqdm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,171 +8,120 @@
t_imap: Returns an iterator for a sequential map.
"""

from typing import Any, Callable, Generator
from collections import Sized
from typing import Any, Callable, Generator, Iterable, List

from pathos.helpers import cpu_count
from pathos.multiprocessing import ProcessPool as Pool
from tqdm.auto import tqdm


def _parallel(ordered: bool, function: Callable, *iterables: Iterable, **kwargs: Any) -> Generator:
    """Returns a generator for a parallel map with a progress bar.

    Arguments:
        ordered(bool): True for an ordered map, false for an unordered map.
        function(Callable): The function to apply to each element of the given Iterables.
        iterables(Tuple[Iterable]): One or more Iterables containing the data to be mapped.
        num_cpus(int or float): Keyword argument, removed from kwargs before they reach tqdm.
            If an int, uses that many cpus.
            If a float, uses that proportion of cpus (never fewer than one).
            If None (the default), uses all available cpus.
        total(int): Optional keyword argument giving the length of the progress bar.
            If omitted, the length of the shortest sized Iterable is used, or None
            (unknown length) when none of the Iterables has a length.
        kwargs: All remaining keyword arguments are passed through to tqdm.

    Returns:
        A generator which will apply the function to each element of the given Iterables
        in parallel with a progress bar, yielding results in input order when ordered
        is True and in whatever order the pool's uimap produces them otherwise.
    """

    # Imported here from collections.abc because the `collections.Sized`
    # alias was removed in Python 3.10.
    from collections.abc import Sized

    # Extract the options that are ours rather than tqdm's
    num_cpus = kwargs.pop('num_cpus', None)
    total = kwargs.pop('total', None)

    # Determine num_cpus
    if num_cpus is None:
        num_cpus = cpu_count()
    elif isinstance(num_cpus, float):
        # A small proportion must not round down to zero workers
        num_cpus = max(1, int(round(num_cpus * cpu_count())))

    # Determine length of tqdm (equal to length of shortest sized iterable).
    # Generators and other unsized inputs are skipped; if nothing is sized,
    # leave the total unknown instead of calling min() on an empty sequence.
    if total is None:
        lengths = [len(iterable) for iterable in iterables if isinstance(iterable, Sized)]
        total = min(lengths) if lengths else None

    # Create parallel generator
    map_type = 'imap' if ordered else 'uimap'
    pool = Pool(num_cpus)
    map_func = getattr(pool, map_type)

    for item in tqdm(map_func(function, *iterables), total=total, **kwargs):
        yield item

    pool.clear()


def p_map(function: Callable, *iterables: Iterable, **kwargs: Any) -> List[Any]:
    """Runs a parallel, order-preserving map with a progress bar and collects the results.

    Arguments:
        function(Callable): The function to apply to each element of the given Iterables.
        iterables(Tuple[Iterable]): One or more Iterables containing the data to be mapped.
        kwargs: Forwarded unchanged to _parallel.

    Returns:
        A list holding every result produced by the ordered parallel map.
    """

    # ordered=True selects the order-preserving map
    return list(_parallel(True, function, *iterables, **kwargs))


def p_imap(function: Callable, *iterables: Iterable, **kwargs: Any) -> Generator:
    """Builds a generator for a parallel, order-preserving map with a progress bar.

    Arguments:
        function(Callable): The function to apply to each element of the given Iterables.
        iterables(Tuple[Iterable]): One or more Iterables containing the data to be mapped.
        kwargs: Forwarded unchanged to _parallel.

    Returns:
        The generator created by _parallel; results are produced as it is iterated.
    """

    # ordered=True selects the order-preserving map
    return _parallel(True, function, *iterables, **kwargs)


def p_umap(function: Callable, *iterables: Iterable, **kwargs: Any) -> List[Any]:
    """Runs a parallel unordered map with a progress bar and collects the results.

    Arguments:
        function(Callable): The function to apply to each element of the given Iterables.
        iterables(Tuple[Iterable]): One or more Iterables containing the data to be mapped.
        kwargs: Forwarded unchanged to _parallel.

    Returns:
        A list holding every result produced by the unordered parallel map.
    """

    # ordered=False selects the unordered map
    return list(_parallel(False, function, *iterables, **kwargs))


def p_uimap(function: Callable, *iterables: Iterable, **kwargs: Any) -> Generator:
    """Builds a generator for a parallel unordered map with a progress bar.

    Arguments:
        function(Callable): The function to apply to each element of the given Iterables.
        iterables(Tuple[Iterable]): One or more Iterables containing the data to be mapped.
        kwargs: Forwarded unchanged to _parallel.

    Returns:
        The generator created by _parallel; results are produced as it is iterated.
    """

    # ordered=False selects the unordered map
    return _parallel(False, function, *iterables, **kwargs)


def _sequential(function: Callable, *iterables: Iterable, **kwargs: Any) -> Generator:
    """Returns a generator for a sequential map with a progress bar.

    Arguments:
        function(Callable): The function to apply to each element of the given Iterables.
        iterables(Tuple[Iterable]): One or more Iterables containing the data to be mapped.
        total(int): Optional keyword argument giving the length of the progress bar.
            If omitted, the length of the shortest sized Iterable is used, or None
            (unknown length) when none of the Iterables has a length.
        kwargs: All remaining keyword arguments are passed through to tqdm.

    Returns:
        A generator which will apply the function to each element of the given Iterables
        sequentially in order with a progress bar.
    """

    # Imported here from collections.abc because the `collections.Sized`
    # alias was removed in Python 3.10.
    from collections.abc import Sized

    # Determine length of tqdm (equal to length of shortest sized iterable).
    # Generators and other unsized inputs are skipped; if nothing is sized,
    # leave the total unknown instead of calling min() on an empty sequence.
    total = kwargs.pop('total', None)
    if total is None:
        lengths = [len(iterable) for iterable in iterables if isinstance(iterable, Sized)]
        total = min(lengths) if lengths else None

    # Create sequential generator
    yield from tqdm(map(function, *iterables), total=total, **kwargs)


def t_map(function: Callable, *iterables: Iterable, **kwargs: Any) -> List[Any]:
    """Runs a sequential map with a progress bar and collects the results.

    Arguments:
        function(Callable): The function to apply to each element of the given Iterables.
        iterables(Tuple[Iterable]): One or more Iterables containing the data to be mapped.
        kwargs: Forwarded unchanged to _sequential.

    Returns:
        A list holding every result produced by the sequential map.
    """

    return list(_sequential(function, *iterables, **kwargs))


def t_imap(function: Callable, *iterables: Iterable, **kwargs: Any) -> Generator:
    """Builds a generator for a sequential map with a progress bar.

    Arguments:
        function(Callable): The function to apply to each element of the given Iterables.
        iterables(Tuple[Iterable]): One or more Iterables containing the data to be mapped.
        kwargs: Forwarded unchanged to _sequential.

    Returns:
        The generator created by _sequential; results are produced as it is iterated.
    """

    return _sequential(function, *iterables, **kwargs)
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

setup(
name='p_tqdm',
version='1.3.2',
version='1.3.3',
author='Kyle Swanson',
author_email='[email protected]',
description='Parallel processing with progress bars',
long_description=long_description,
long_description_content_type='text/markdown',
url='https://github.com/swansonk14/p_tqdm',
download_url='https://github.com/swansonk14/p_tqdm/v_1.3.2.tar.gz',
download_url='https://github.com/swansonk14/p_tqdm/v_1.3.3.tar.gz',
license='MIT',
packages=find_packages(),
install_requires=[
Expand Down
61 changes: 19 additions & 42 deletions tests/tests.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
from functools import partial

from p_tqdm import p_map, p_imap, p_umap, p_uimap, t_map, t_imap

Expand All @@ -11,8 +12,8 @@ def add_2(a, b):
return a + b


def add_3(a, b, c):
return a + b + c
def add_3(a, b, c=0):
return a + 2 * b + 3 * c


class Test_p_map(unittest.TestCase):
Expand Down Expand Up @@ -51,11 +52,11 @@ def test_two_lists_and_one_single(self):
array_1 = [1, 2, 3]
array_2 = [10, 11, 12]
single = 5
result = self.func(add_3, array_1, single, array_2)
result = self.func(partial(add_3, single), array_1, array_2)
if self.generator:
result = list(result)

correct_array = [16, 18, 20]
correct_array = [37, 42, 47]
if self.ordered:
self.assertEqual(correct_array, result)
else:
Expand All @@ -65,63 +66,39 @@ def test_one_list_and_two_singles(self):
array = [1, 2, 3]
single_1 = 5
single_2 = -2
result = self.func(add_3, single_1, array, single_2)
result = self.func(partial(add_3, single_1, c=single_2), array)
if self.generator:
result = list(result)

correct_array = [4, 5, 6]
correct_array = [1, 3, 5]
if self.ordered:
self.assertEqual(correct_array, result)
else:
self.assertEqual(sorted(correct_array), sorted(result))

def test_one_single(self):
single = 5
result = self.func(add_1, single)
if self.generator:
result = list(result)

correct_array = [6]
if self.ordered:
self.assertEqual(correct_array, result)
else:
self.assertEqual(sorted(correct_array), sorted(result))

def test_one_single_with_num_iter(self):
single = 5
num_iter = 3
result = self.func(add_1, single, num_iter=num_iter)
if self.generator:
result = list(result)

correct_array = [6] * num_iter
if self.ordered:
self.assertEqual(correct_array, result)
else:
self.assertEqual(sorted(correct_array), sorted(result))

def test_two_singles(self):
single_1 = 5
single_2 = -2
result = self.func(add_2, single_1, single_2)
def test_list_and_generator_and_single_equal_length(self):
array = [1, 2, 3]
generator = range(3)
single = -3
result = self.func(partial(add_3, c=single), array, generator)
if self.generator:
result = list(result)

correct_array = [3]
correct_array = [-8, -5, -2]
if self.ordered:
self.assertEqual(correct_array, result)
else:
self.assertEqual(sorted(correct_array), sorted(result))

def test_two_singles_with_num_iter(self):
single_1 = 5
single_2 = -2
num_iter = 3
result = self.func(add_2, single_1, single_2, num_iter=num_iter)
def test_list_and_generator_and_single_unequal_length(self):
array = [1, 2, 3, 4, 5, 6]
generator = range(3)
single = -3
result = self.func(partial(add_3, c=single), array, generator)
if self.generator:
result = list(result)

correct_array = [3] * num_iter
correct_array = [-8, -5, -2]
if self.ordered:
self.assertEqual(correct_array, result)
else:
Expand Down

0 comments on commit eb122f7

Please sign in to comment.