Skip to content

Commit 6cff5fa

Browse files
RWM failed to load CSV files with json content using local container (#57)
1 parent 63a5e00 commit 6cff5fa

File tree

4 files changed

+25
-24
lines changed

4 files changed

+25
-24
lines changed

cli/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ data_path="./data"
9898
| When loading each multi-part source, <br/>load all partitions (and shards) in one transaction | `--collapse-partitions-on-load` | `False` | `True` | `BooleanOptionalAction` | `True` - `--collapse-partitions-on-load`, `False` - `--no-collapse-partitions-on-load`, no argument - default value |
9999
| Logging level for cli | `--log-level` | `False` | `INFO` | `String` | `['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']` |
100100
| Log rotation option. If the `date` option is enabled, RWM rotates logs each day. If the `size` option is enabled, RWM rotates the log file when it reaches the size set by `--log-file-size` | `--log-rotation` | `False` | `date` | `String` | `['date', 'size']` |
101-
| Rotation log file size in Mb. RWM rotates log file when it reaches this size and `--log-rotation` is `size | `--log-file-size` | `False` | `5` | `Int` | |
101+
| Rotation log file size in Mb. RWM rotates log file when it reaches this size and `--log-rotation` is `size` | `--log-file-size` | `False` | `5` | `Int` | |
102102
| Drop database before workflow run, or not | `--drop-db` | `False` | `False` | `BooleanOptionalAction` | `True` - `--drop-db`, `False` - `--no-drop-db`, no argument - default value |
103103
| Remove RAI engine and database after run or not | `--cleanup-resources` | `False` | `False` | `BooleanOptionalAction` | `True` - `--cleanup-resources`, `False` - `--no-cleanup-resources`, no argument - default value |
104104
| Remove RAI engine after run or not | `--cleanup-engine` | `False` | `False` | `BooleanOptionalAction` | `True` - `--cleanup-engine`, `False` - `--no-cleanup-engine`, no argument - default value |

workflow/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
__version_info__ = (0, 0, 30)
15+
__version_info__ = (0, 0, 31)
1616
__version__ = ".".join(map(str, __version_info__))

workflow/executor.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,8 @@ def _load_resource(logger: logging.Logger, env_config: EnvConfig, rai_config: Ra
405405
container = env_config.get_container(src["container"])
406406
config = EnvConfig.get_config(container)
407407
if ContainerType.LOCAL == container.type or ContainerType.AZURE == container.type:
408-
rai.execute_query(logger, rai_config, env_config, q.load_resources(logger, config, resources, src),
408+
query_with_input = q.load_resources(logger, config, resources, src)
409+
rai.execute_query(logger, rai_config, env_config, query_with_input.query, query_with_input.inputs,
409410
readonly=False)
410411
elif ContainerType.SNOWFLAKE == container.type:
411412
snow.begin_data_sync(logger, config, rai_config, resources, src)

workflow/query.py

+21-21
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def insert:resources_data_to_delete = resources_to_delete
132132
"""
133133

134134

135-
def load_resources(logger: logging.Logger, config: AzureConfig, resources, src) -> str:
135+
def load_resources(logger: logging.Logger, config: AzureConfig, resources, src) -> QueryWithInputs:
136136
rel_name = src["source"]
137137

138138
file_stype_str = src["file_type"]
@@ -146,7 +146,7 @@ def load_resources(logger: logging.Logger, config: AzureConfig, resources, src)
146146
return _local_load_multipart_query(rel_name, file_type, resources)
147147
elif src_type == ContainerType.AZURE:
148148
logger.info(f"Loading {len(resources)} shards from Azure files")
149-
return _azure_load_multipart_query(rel_name, file_type, resources, config)
149+
return QueryWithInputs(_azure_load_multipart_query(rel_name, file_type, resources, config), {})
150150
else:
151151
logger.error(f"Unknown file type {file_stype_str}")
152152
else:
@@ -155,7 +155,7 @@ def load_resources(logger: logging.Logger, config: AzureConfig, resources, src)
155155
return _local_load_simple_query(rel_name, resources[0]["uri"], file_type)
156156
elif src_type == ContainerType.AZURE:
157157
logger.info("Loading from Azure file")
158-
return _azure_load_simple_query(rel_name, resources[0]["uri"], file_type, config)
158+
return QueryWithInputs(_azure_load_simple_query(rel_name, resources[0]["uri"], file_type, config), {})
159159

160160

161161
def get_snapshot_expiration_date(snapshot_binding: str, date_format: str) -> str:
@@ -257,13 +257,13 @@ def output_json(relation: str) -> str:
257257
return f"def output = json_string[{relation}]"
258258

259259

260-
def _local_load_simple_query(rel_name: str, uri: str, file_type: FileType) -> str:
260+
def _local_load_simple_query(rel_name: str, uri: str, file_type: FileType) -> QueryWithInputs:
261261
try:
262262
raw_data_rel_name = f"{rel_name}_data"
263263
data = utils.read(uri)
264-
return f"{_load_from_literal(data, raw_data_rel_name)}\n" \
265-
f"def {IMPORT_CONFIG_REL}:{rel_name}:data = {raw_data_rel_name}\n" \
266-
f"{_simple_insert_query(rel_name, file_type)}\n"
264+
query = f"def {IMPORT_CONFIG_REL}:{rel_name}:data = {raw_data_rel_name}\n" \
265+
f"{_simple_insert_query(rel_name, file_type)}\n"
266+
return QueryWithInputs(query, {raw_data_rel_name: data})
267267
except OSError as e:
268268
raise e
269269

@@ -275,16 +275,18 @@ def _azure_load_simple_query(rel_name: str, uri: str, file_type: FileType, confi
275275
f"{_simple_insert_query(rel_name, file_type)}"
276276

277277

278-
def _local_load_multipart_query(rel_name: str, file_type: FileType, parts) -> str:
278+
def _local_load_multipart_query(rel_name: str, file_type: FileType, parts) -> QueryWithInputs:
279279
raw_data_rel_name = f"{rel_name}_data"
280280

281281
raw_text = ""
282282
part_indexes = ""
283+
inputs = {}
283284
for part in parts:
284285
try:
285286
part_idx = part["part_index"]
286287
data = utils.read(part["uri"])
287-
raw_text += _load_from_indexed_literal(data, raw_data_rel_name, part_idx)
288+
inputs[_indexed_literal(raw_data_rel_name, part_idx)] = data
289+
raw_text += _load_from_indexed_literal(raw_data_rel_name, part_idx)
288290
part_indexes += f"{part_idx}\n"
289291
except OSError as e:
290292
raise e
@@ -293,10 +295,12 @@ def _local_load_multipart_query(rel_name: str, file_type: FileType, parts) -> st
293295
load_config = _multi_part_load_config_query(rel_name, file_type,
294296
_local_multipart_config_integration(raw_data_rel_name))
295297

296-
return f"{_part_index_relation(part_indexes)}\n" \
297-
f"{raw_text}\n" \
298-
f"{load_config}\n" \
299-
f"{insert_text}"
298+
query = f"{_part_index_relation(part_indexes)}\n" \
299+
f"{raw_text}\n" \
300+
f"{load_config}\n" \
301+
f"{insert_text}"
302+
303+
return QueryWithInputs(query, inputs)
300304

301305

302306
def _azure_load_multipart_query(rel_name: str, file_type: FileType, parts, config: AzureConfig) -> str:
@@ -356,16 +360,12 @@ def _simple_insert_query(rel_name: str, file_type: FileType) -> str:
356360
return f"def insert:simple_source_catalog:{rel_name} = {FILE_LOAD_RELATION[file_type]}[{IMPORT_CONFIG_REL}:{rel_name}]"
357361

358362

359-
def _load_from_indexed_literal(data: str, raw_data_rel_name: str, index: int) -> str:
360-
return f"def {raw_data_rel_name}[{index}] =\n" \
361-
f"raw\"\"\"{data}" \
362-
f"\"\"\""
363+
def _load_from_indexed_literal(raw_data_rel_name: str, index: int) -> str:
364+
return f"def {raw_data_rel_name}[{index}] = {_indexed_literal(raw_data_rel_name, index)}\n"
363365

364366

365-
def _load_from_literal(data: str, raw_data_rel_name: str) -> str:
366-
return f"def {raw_data_rel_name} =\n" \
367-
f"raw\"\"\"{data}" \
368-
f"\"\"\""
367+
def _indexed_literal(raw_data_rel_name: str, index: int) -> str:
368+
return f"{raw_data_rel_name}_{index}"
369369

370370

371371
def _config_rel_name(rel: str) -> str: return f"load_{rel}_config"

0 commit comments

Comments
 (0)