Skip to content

Commit a8be2e3

Browse files
authored
20998: Adds series IFA multiprocessing (#297)
1 parent d0b5109 commit a8be2e3

File tree

2 files changed

+62
-4
lines changed

2 files changed

+62
-4
lines changed

howso/utilities/feature_attributes/infer_feature_attributes.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,8 @@ def infer_feature_attributes(data: pd.DataFrame | SQLRelationalDatastoreProtocol
344344
If unset or set to None (recommended), let the ProcessPoolExecutor
345345
choose the best maximum number of process pool workers to process
346346
columns in a multi-process fashion. In this case, if the product of the
347-
data's rows and columns < 25,000,000, multiprocessing will not be used.
347+
data's rows and columns > 25,000,000 or if the data is time series and the
348+
number of rows > 500,000 multiprocessing will be used.
348349
349350
If defined with an integer > 0, manually set the number of max workers.
350351
Otherwise, the feature attributes will be calculated serially. Setting

howso/utilities/feature_attributes/time_series.py

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
1+
from concurrent.futures import (
2+
as_completed,
3+
Future,
4+
ProcessPoolExecutor,
5+
)
16
import logging
27
from math import e, isnan
8+
import multiprocessing as mp
9+
import os
310
from typing import (
411
Dict, Iterable, Optional, Union
512
)
@@ -18,6 +25,11 @@
1825
SMALLEST_TIME_DELTA = 0.001
1926

2027

28+
def _apply_chunks_shard(df: pd.DataFrame, feature_name: str, dt_format: str):
29+
"""Internal function to aid multiprocessing of series feature attributes."""
30+
return df[feature_name].apply(lambda x: date_to_epoch(x, dt_format))
31+
32+
2133
class InferFeatureAttributesTimeSeries:
2234
"""Infer feature attributes for time series data."""
2335

@@ -35,6 +47,7 @@ def _infer_delta_min_and_max( # noqa: C901
3547
id_feature_name: Optional[Union[str, Iterable[str]]] = None,
3648
orders_of_derivatives: Optional[Dict] = None,
3749
derived_orders: Optional[Dict] = None,
50+
max_workers: Optional[int] = None
3851
) -> Dict:
3952
"""
4053
Infer continuous feature delta_min, delta_max for each feature.
@@ -79,6 +92,17 @@ def _infer_delta_min_and_max( # noqa: C901
7992
to 2 will synthesize the 3rd order derivative value, and then use
8093
that synthed value to derive the 2nd and 1st order.
8194
95+
max_workers: int, optional
96+
If unset or set to None (recommended), let the ProcessPoolExecutor
97+
choose the best maximum number of process pool workers to process
98+
columns in a multi-process fashion. In this case, if the product of the
99+
data's rows and columns > 25,000,000 or the number of rows > 500,000
100+
multiprocessing will used.
101+
102+
If defined with an integer > 0, manually set the number of max workers.
103+
Otherwise, the feature attributes will be calculated serially. Setting
104+
this parameter to zero (0) will disable multiprocessing.
105+
82106
Returns
83107
-------
84108
features : dict
@@ -134,8 +158,39 @@ def _infer_delta_min_and_max( # noqa: C901
134158
df_c = self.data.loc[:, [f_name]]
135159

136160
# convert time feature to epoch
137-
df_c[f_name] = df_c[f_name].apply(
138-
lambda x: date_to_epoch(x, dt_format))
161+
if len(df_c) < 500_000 and max_workers is None:
162+
max_workers = 0
163+
if max_workers is None or max_workers >= 1:
164+
if max_workers is None:
165+
max_workers = os.cpu_count()
166+
mp_context = mp.get_context("spawn")
167+
futures: dict[Future, str] = dict()
168+
169+
with ProcessPoolExecutor(max_workers=max_workers, mp_context=mp_context) as pool:
170+
df_chunks = np.array_split(df_c, max_workers)
171+
for chunk in df_chunks:
172+
future = pool.submit(
173+
_apply_chunks_shard,
174+
df=chunk,
175+
feature_name=f_name,
176+
dt_format=dt_format
177+
)
178+
futures[future] = f_name
179+
180+
temp_results = []
181+
for future in as_completed(futures):
182+
try:
183+
response = future.result()
184+
temp_results.append(response)
185+
except Exception as exception:
186+
warnings.warn(
187+
f"Infer_feature_attributes raised an exception "
188+
f"while processing '{futures[future]}' ({str(exception)})."
189+
)
190+
191+
df_c[f_name] = pd.concat(temp_results)
192+
else:
193+
df_c[f_name] = df_c[f_name].apply(lambda x: date_to_epoch(x, dt_format))
139194

140195
# use Pandas' diff() to pull all the deltas for this feature
141196
if isinstance(id_feature_name, list):
@@ -578,6 +633,7 @@ def _process( # noqa: C901
578633
include_sample=include_sample,
579634
tight_bounds=set(tight_bounds) if tight_bounds else None,
580635
mode_bound_features=mode_bound_features,
636+
max_workers=max_workers
581637
)
582638

583639
# Add any features with unsupported data to this object's list
@@ -678,7 +734,8 @@ def _process( # noqa: C901
678734
datetime_feature_formats=datetime_feature_formats,
679735
id_feature_name=id_feature_name,
680736
orders_of_derivatives=orders_of_derivatives,
681-
derived_orders=derived_orders
737+
derived_orders=derived_orders,
738+
max_workers=max_workers
682739
)
683740

684741
# Set any manually specified rate/delta boundaries

0 commit comments

Comments
 (0)