
Commit f433965

adding static postfix to partitioned exports (#77)
* adding static postfix to partitioned exports
* bump version
* address review comments
1 parent a7224f4 commit f433965

8 files changed, +81 -7 lines changed


cli-e2e-test/rel/config/config1.rel (+4)

```diff
@@ -14,6 +14,8 @@ module export_config
     def syntax:header = {
         1, :name
     }
+
+    def partition_size = 512 // defines the upper bound (MB) for the size of a partition
 end
 
 module stores_csv
@@ -22,6 +24,8 @@ module export_config
     def syntax:header = {
         1, :name
     }
+
+    def partition_size = 512 // defines the upper bound (MB) for the size of a partition
 end
 
 module products_csv
```

workflow/README.md (+6, -1)

```diff
@@ -311,7 +311,12 @@ workflow:
 
 * `data` relation contains the well-defined (:col, key..., val) data to be exported;
 * `syntax:header` relation contains the header of the CSV file (Int, Symbol);
-* `partition_size` defines max size of the exported files in MB, if above threshold RAI partitions it in multiple files `filename_{part_nr}.csv`
+* `partition_size` defines max size of the exported files in MB, if above threshold RAI partitions it in multiple files
+`filename_0_{part_nr}.csv`.
+
+Note: RWM puts a static `_0` postfix by default, as otherwise RAI would not add it if there's only one partition.
+Therefore, file names will look like `filename_0.csv` in case of a single partition and `filename_0_1.csv`, etc. in
+case of multiple partitions. *This only applies to CSV exports to Azure, as local exports can not be partitioned.*
 
 #### Basic exports
 The former needs to be used when a simple export config is passed as shown below:
```
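To make the naming rule described in the README change concrete, here is a minimal sketch (not part of the commit): the helper `partitioned_file_names` is hypothetical, the relation name `stores_csv` is borrowed from the e2e config above, and the partition numbering simply follows the `filename_0_1.csv` example.

```python
from typing import List


def partitioned_file_names(base_name: str, partitions: int) -> List[str]:
    """Illustrate the naming convention for partitioned Azure CSV exports described above."""
    if partitions <= 1:
        # RWM always appends the static "_0" postfix, even when there is a single partition.
        return [f"{base_name}_0.csv"]
    # With multiple partitions, RAI adds the partition number on top of the "_0" postfix.
    return [f"{base_name}_0_{part}.csv" for part in range(1, partitions + 1)]


print(partitioned_file_names("stores_csv", 1))  # ['stores_csv_0.csv']
print(partitioned_file_names("stores_csv", 3))  # ['stores_csv_0_1.csv', 'stores_csv_0_2.csv', 'stores_csv_0_3.csv']
```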

workflow/__init__.py (+1, -1)

```diff
@@ -12,5 +12,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-__version_info__ = (0, 0, 43)
+__version_info__ = (0, 0, 44)
 __version__ = ".".join(map(str, __version_info__))
```

workflow/common.py (+1)

```diff
@@ -218,6 +218,7 @@ class Export:
     snapshot_binding: str
     container: Container
     offset_by_number_of_days: int = 0
+    is_partitioned: bool = False
 
 
 @dataclasses.dataclass
```

workflow/constants.py (+3)

```diff
@@ -23,6 +23,9 @@
 
 BLOB_PAGE_SIZE = 500
 
+# query constants
+PARTITIONED_EXPORT_POSTFIX = "_0"
+
 # Step types
 CONFIGURE_SOURCES = 'ConfigureSources'
 INSTALL_MODELS = 'InstallModels'
```

workflow/executor.py (+13)

```diff
@@ -488,6 +488,7 @@ def __init__(self, idt, name, type_value, state, timing, engine_size, exports, e
 
     def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: RaiConfig):
         exports = list(filter(lambda e: self._should_export(logger, rai_config, env_config, e), self.exports))
+        self._discover_partitioned(logger, rai_config, env_config, exports)
         if self.export_jointly:
             exports.sort(key=lambda e: e.container.name)
             container_groups = {container_name: list(group) for container_name, group in
@@ -527,6 +528,18 @@ def _should_export(self, logger: logging.Logger, rai_config: RaiConfig, env_conf
                 f"Skipping export of {export.relation}: defined as a snapshot and the current one is still valid")
         return should_export
 
+    @staticmethod
+    def _discover_partitioned(logger: logging.Logger, rai_config: RaiConfig, env_config: EnvConfig,
+                              exports: List[Export]):
+        logger.info("Identifying partitioned exports...")
+        rez = rai.execute_query_take_tuples(logger, rai_config, env_config, q.discover_partitioned_exports(exports))
+        if not rez:
+            return
+        # now get the keys of the output dict `rez` and drop the `:` from the start of the key
+        partitioned_export_names = [k[1:] for k in rez.keys()]
+        for export in exports:
+            export.is_partitioned = export.relation in partitioned_export_names
+
 
 class ExportWorkflowStepFactory(WorkflowStepFactory):
```
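A rough, self-contained sketch of what the new `_discover_partitioned` step does with the parsed query result; the `FakeExport` stand-in and the sample `rez` keys are assumptions for illustration, not values captured from a real run.

```python
from dataclasses import dataclass


@dataclass
class FakeExport:  # stand-in for workflow.common.Export, illustration only
    relation: str
    is_partitioned: bool = False


exports = [FakeExport("stores_csv"), FakeExport("city_data_csv")]

# Assumed shape of the parsed output: one ":<relation>" key per export whose
# export_config defines partition_size (see the Rel query in workflow/query.py below).
rez = {":stores_csv": True}

partitioned_export_names = [k[1:] for k in rez.keys()]  # drop the leading ':'
for export in exports:
    export.is_partitioned = export.relation in partitioned_export_names

print([(e.relation, e.is_partitioned) for e in exports])
# [('stores_csv', True), ('city_data_csv', False)]
```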

workflow/query.py (+16, -2)

```diff
@@ -6,7 +6,7 @@
 
 from workflow import utils
 from workflow.common import FileType, Export, Source, ContainerType, AzureConfig
-from workflow.constants import IMPORT_CONFIG_REL, FILE_LOAD_RELATION
+from workflow.constants import IMPORT_CONFIG_REL, FILE_LOAD_RELATION, PARTITIONED_EXPORT_POSTFIX
 
 # Static queries
 DISABLE_IVM = "def insert:relconfig:disable_ivm = true"
@@ -171,6 +171,15 @@ def output(valid_until) {{
 """
 
 
+def discover_partitioned_exports(exports: List[Export]) -> str:
+    query = ""
+    for export in exports:
+        query += f"""
+def output:{export.relation} = export_config:{export.relation}:partition_size = _
+"""
+    return query
+
+
 def export_relations_local(logger: logging.Logger, exports: List[Export]) -> str:
     query = ""
     for export in exports:
@@ -414,7 +423,9 @@ def output:{rel_name}[{key_str}] = csv_string[_export_csv_config:{rel_name}[{key
 
 def _export_relation_as_csv_to_azure(config: AzureConfig, export: Export, end_date: str, date_format: str) -> str:
     rel_name = export.relation
-    export_path = f"{_compose_export_path(config, export, end_date, date_format)}/{rel_name}.csv"
+    # for partitioned exports we append "_0" to the file name postfix so that we always have a partition number
+    postfix = PARTITIONED_EXPORT_POSTFIX if export.is_partitioned else ""
+    export_path = f"{_compose_export_path(config, export, end_date, date_format)}/{rel_name}{postfix}.csv"
     return f"""
 module _export_csv_config
     def {rel_name} = export_config:{rel_name}
@@ -428,6 +439,9 @@ def export:{rel_name} = export_csv[_export_csv_config:{rel_name}]
 def _export_meta_relation_as_csv_to_azure(config: AzureConfig, export: Export, end_date: str, date_format: str) -> str:
     rel_name = export.relation
     postfix = _to_rel_meta_key_as_str(export)
+    # for partitioned exports we append "_0" to the file name postfix so that we always have a partition number
+    if export.is_partitioned:
+        postfix += PARTITIONED_EXPORT_POSTFIX
     base_path = _compose_export_path(config, export, end_date, date_format)
     export_path = f"{base_path}/{rel_name}_{postfix}.csv"
     key_str = _to_rel_meta_key_as_seq(export)
```
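For a sense of what the new `discover_partitioned_exports` helper emits, a small sketch with two hypothetical relation names; the generated Rel binds `output:<relation>` only when that relation's `partition_size` is defined, so only partitioned exports show up in the transaction output.

```python
# Illustration only: mirror the f-string used by discover_partitioned_exports
# for two hypothetical relation names.
for relation in ["stores_csv", "products_csv"]:
    print(f"def output:{relation} = export_config:{relation}:partition_size = _")

# Expected output:
# def output:stores_csv = export_config:stores_csv:partition_size = _
# def output:products_csv = export_config:products_csv:partition_size = _
```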

workflow/rai.py (+37, -3)

```diff
@@ -264,6 +264,24 @@ def execute_query_take_single(logger: logging.Logger, rai_config: RaiConfig, env
     return rsp.results[0]['table'].to_pydict()["v1"][0]
 
 
+def execute_query_take_tuples(logger: logging.Logger, rai_config: RaiConfig, env_config: EnvConfig, query: str,
+                              readonly: bool = True, ignore_problems: bool = False) -> any:
+    """
+    Execute query and take the results as an array of tuples.
+    :param logger: logger
+    :param rai_config: RAI config
+    :param env_config: Env config
+    :param query: Rel query
+    :param readonly: Parameter to specify transaction type: Read/Write
+    :param ignore_problems: Ignore SDK problems if any
+    """
+    rsp = execute_query(logger, rai_config, env_config, query, readonly=readonly, ignore_problems=ignore_problems)
+    if not rsp.results:
+        logger.debug(f"Query returned no results: {query}")
+        return {}
+    return _parse_as_dict(rsp)
+
+
 def list_transactions(logger: logging.Logger, rai_config: RaiConfig) -> List:
     """
     List all transactions for the engine
@@ -322,13 +340,29 @@ def _parse_csv_string(rsp: api.TransactionAsyncResponse) -> Dict:
     Parse the output for a csv_string query ie
     def output:{rel_name} = csv_string[...]
     """
-    resp = {}
     rel_pattern = r'^/:output/:(.*)/String$'
+    return _parse_matching_pattern(rsp, rel_pattern)
+
+
+def _parse_as_dict(rsp: api.TransactionAsyncResponse) -> Dict:
+    """
+    Parse the output as dictionary, everything starting with :output
+    """
+    rel_pattern = r'^/:output/(.*)$'
+    return _parse_matching_pattern(rsp, rel_pattern)
+
+
+def _parse_matching_pattern(rsp: api.TransactionAsyncResponse, pattern: str) -> Dict:
+    resp = {}
     for result in rsp.results:
-        match = re.search(rel_pattern, result['relationId'])
+        match = re.search(pattern, result['relationId'])
         if match:
             relation_id = match.group(1)
-            data = result['table'].to_pydict()["v1"][0]
+            result_dict = result['table'].to_pydict()
+            if "v1" in result_dict:
+                data = result_dict["v1"][0]
+            else:
+                data = True  # if only relation label present, the value is True
             resp[relation_id] = data
     return resp
```
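A minimal sketch of the parsing rule that the new `_parse_matching_pattern` helper implements; the sample `relationId` strings and the plain-dict tables are assumptions shaped after the patterns above, not output captured from the RAI SDK.

```python
import re

OUTPUT_PATTERN = r'^/:output/(.*)$'  # pattern used by _parse_as_dict

# Hypothetical stand-ins for rsp.results; real entries carry pyarrow tables,
# so a plain dict here plays the role of result['table'].to_pydict().
results = [
    {"relationId": "/:output/:stores_csv", "table": {}},                       # label only
    {"relationId": "/:output/:valid_until", "table": {"v1": ["2024-01-01"]}},  # label + value
]

resp = {}
for result in results:
    match = re.search(OUTPUT_PATTERN, result["relationId"])
    if match:
        table = result["table"]
        # No "v1" column means the relation carries only its label, so the value is True.
        resp[match.group(1)] = table["v1"][0] if "v1" in table else True

print(resp)  # {':stores_csv': True, ':valid_until': '2024-01-01'}
```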
