Skip to content

Commit 8530d11

Browse files
RWM is unable to process larger input files split into chunks (#53)
* RWM is unable to process larger input files split into chunks
1 parent e49762c commit 8530d11

File tree

5 files changed

+23
-16
lines changed

5 files changed

+23
-16
lines changed

cli/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ This Command-Line Interface (CLI) is designed to provide an easy and interactive
66
1. Create a batch configuration (ex. `poc.json`) file using the syntax and structure outlined in the [RAI Workflow Framework README](../workflow/README.md).
77
2. Add `rai-workflow-manager` as dependency to your `requirements.txt` file:
88
```txt
9-
rai-workflow-manager==0.0.28
9+
rai-workflow-manager==0.0.29
1010
```
1111
3. Build the project:
1212
```bash

workflow/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
__version_info__ = (0, 0, 28)
15+
__version_info__ = (0, 0, 29)
1616
__version__ = ".".join(map(str, __version_info__))

workflow/executor.py

+15-8
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: Ra
351351
def await_pending(self, env_config, logger, missed_resources):
352352
loop = asyncio.get_event_loop()
353353
if loop.is_running():
354-
raise Exception('Waiting for resource would interrupt unexpected event loop - aborting to avoid confusion.')
354+
raise Exception('Waiting for resource would interrupt unexpected event loop - aborting to avoid confusion')
355355
pending = [src for src in missed_resources if self._resource_is_async(src)]
356356
pending_cos = [self._await_async_resource(logger, env_config, resource) for resource in pending]
357357
loop.run_until_complete(asyncio.gather(*pending_cos))
@@ -365,28 +365,35 @@ async def _await_async_resource(self, logger: logging.Logger, env_config: EnvCon
365365
def _load_source(self, logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig, src):
366366
source_name = src["source"]
367367
if 'is_date_partitioned' in src and src['is_date_partitioned'] == 'Y':
368+
logger.info(f"Loading source '{source_name}' partitioned by date")
368369
if self.collapse_partitions_on_load:
369370
srcs = src["dates"]
370371
first_date = srcs[0]["date"]
371372
last_date = srcs[-1]["date"]
372373

373374
logger.info(
374-
f"Loading '{source_name}' from all partitions simultaneously, range {first_date} to {last_date}")
375+
f"Loading '{source_name}' all date partitions simultaneously, range {first_date} to {last_date}")
375376

376377
resources = []
377378
for d in srcs:
378379
resources += d["resources"]
379380
self._load_resource(logger, env_config, rai_config, resources, src)
380381
else:
381-
logger.info(f"Loading '{source_name}' one partition at a time")
382+
logger.info(f"Loading '{source_name}' one date partition at a time")
382383
for d in src["dates"]:
383384
logger.info(f"Loading partition for date {d['date']}")
384385

385-
resources = d["resources"]
386-
self._load_resource(logger, env_config, rai_config, resources, src)
386+
for res in d["resources"]:
387+
self._load_resource(logger, env_config, rai_config, [res], src)
387388
else:
388-
logger.info(f"Loading source '{source_name}' not partitioned by date ")
389-
self._load_resource(logger, env_config, rai_config, src["resources"], src)
389+
logger.info(f"Loading source '{source_name}' not partitioned by date")
390+
if self.collapse_partitions_on_load:
391+
logger.info(f"Loading '{source_name}' all chunk partitions simultaneously")
392+
self._load_resource(logger, env_config, rai_config, src["resources"], src)
393+
else:
394+
logger.info(f"Loading '{source_name}' one chunk partition at a time")
395+
for res in src["resources"]:
396+
self._load_resource(logger, env_config, rai_config, [res], src)
390397

391398
@staticmethod
392399
def _resource_is_async(src):
@@ -539,7 +546,7 @@ def _load_exports(logger: logging.Logger, env_config: EnvConfig, src) -> List[Ex
539546
container=env_config.get_container(e.get("container", default_container)),
540547
offset_by_number_of_days=e.get("offsetByNumberOfDays", 0)))
541548
except KeyError as ex:
542-
logger.warning(f"Unsupported FileType: {ex}. Skipping export: {e}.")
549+
logger.warning(f"Unsupported FileType: {ex}. Skipping export: {e}")
543550
return exports
544551

545552

workflow/manager.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def add_engine(self, size: str = "XS") -> None:
8787
:param size: RAI engine size
8888
:return:
8989
"""
90-
self.__logger.info(f"Trying to add engine with `{size}` size to manager.")
90+
self.__logger.info(f"Trying to add engine with `{size}` size to manager")
9191
config = self.get_rai_config(size)
9292
if self.__engines:
9393
if size in self.__engines:
@@ -108,7 +108,7 @@ def remove_engine(self, size: str = "XS") -> None:
108108
:param size: RAI engine size
109109
:return:
110110
"""
111-
self.__logger.info(f"Trying to remove engine with `{size}` size from manager.")
111+
self.__logger.info(f"Trying to remove engine with `{size}` size from manager")
112112
if size not in self.__engines:
113113
self.__logger.info(f"`{size}` isn't managed. Ignore deletion")
114114
else:
@@ -128,7 +128,7 @@ def provision_engine(self, size: str) -> None:
128128
:param size: RAI engine size
129129
:return:
130130
"""
131-
self.__logger.info(f"Trying to provision engine with `{size}` size and add to manager.")
131+
self.__logger.info(f"Trying to provision engine with `{size}` size and add to manager")
132132
config = self.get_rai_config(size)
133133
if self.__engines:
134134
if size in self.__engines:
@@ -141,7 +141,7 @@ def provision_engine(self, size: str) -> None:
141141
self.__create_engine(config, size)
142142
self.__engines[size] = EngineMetaInfo(config.engine, size, False)
143143
else:
144-
self.__logger.info(f"Provision engine `{config.engine}` as default.")
144+
self.__logger.info(f"Provision engine `{config.engine}` as default")
145145
self.__recreate_engine(config, size)
146146
self.__engines = {size: EngineMetaInfo(config.engine, size, True)}
147147

workflow/query.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,10 @@ def load_resources(logger: logging.Logger, config: AzureConfig, resources, src)
151151
logger.error(f"Unknown file type {file_stype_str}")
152152
else:
153153
if src_type == ContainerType.LOCAL:
154-
logger.info("Loading from local file.")
154+
logger.info("Loading from local file")
155155
return _local_load_simple_query(rel_name, resources[0]["uri"], file_type)
156156
elif src_type == ContainerType.AZURE:
157-
logger.info("Loading from Azure file.")
157+
logger.info("Loading from Azure file")
158158
return _azure_load_simple_query(rel_name, resources[0]["uri"], file_type, config)
159159

160160

0 commit comments

Comments (0)