+import time
+import logging
+import sys
+
+import tomli
+
+import cli.args
+import cli.logger
+import workflow.constants
+import workflow.manager
+import workflow.common
+import workflow.utils
+import workflow.executor
+
+
+def start():
+    # parse arguments
+    args = cli.args.parse()
+    # configure logger
+    logger = cli.logger.configure(logging.getLevelName(args.log_level))
+    try:
+        with open(args.env_config, "rb") as fp:
+            loader_config = tomli.load(fp)
+    except (OSError, tomli.TOMLDecodeError):
+        logger.exception(f"Failed to load config from '{args.env_config}'.")
+        sys.exit(1)
+    # init env config
+    env_config = workflow.common.EnvConfig.from_env_vars(loader_config)
+    # init Workflow resource manager
+    loader_config[workflow.constants.RAI_SDK_HTTP_RETRIES] = args.rai_sdk_http_retries
+    resource_manager = workflow.manager.ResourceManager.init(logger, args.engine, args.database, loader_config)
+    logger.info("Using: " + ", ".join(f"{k}={v}" for k, v in vars(args).items()))
+    try:
+        logger.info(f"Activating batch with config from '{args.batch_config}'")
+        start_time = time.time()
+        # load batch config as json string
+        batch_config_json = workflow.utils.read(args.batch_config)
+        # create engine if it doesn't exist
+        resource_manager.create_engine(args.engine_size)
+        # Skip infrastructure setup during recovery
+        if not args.recover and not args.recover_step:
+            # Create the database and disable IVM if the flag is enabled
+            resource_manager.create_database(args.drop_db, args.disable_ivm)
+        # Init workflow executor
+        parameters = {
+            workflow.constants.REL_CONFIG_DIR: args.rel_config_dir,
+            workflow.constants.OUTPUT_ROOT: args.output_root,
+            workflow.constants.LOCAL_DATA_DIR: args.dev_data_dir,
+            workflow.constants.START_DATE: args.start_date,
+            workflow.constants.END_DATE: args.end_date,
+            workflow.constants.COLLAPSE_PARTITIONS_ON_LOAD: args.collapse_partitions_on_load
+        }
+        config = workflow.executor.WorkflowConfig(
+            env_config, args.run_mode,
+            workflow.common.BatchConfig(args.batch_config_name, batch_config_json),
+            args.recover, args.recover_step, parameters)
+        executor = workflow.executor.WorkflowExecutor.init(logger, config, resource_manager)
+        # Capture setup time before the run; the run reports its own timings below
+        end_time = time.time()
+        executor.run()
+        # Print execution time information
+        executor.print_timings()
+        logger.info(f"Infrastructure setup time is {workflow.utils.format_duration(end_time - start_time)}")
+
+        if args.cleanup_resources:
+            resource_manager.cleanup_resources()
+    except Exception as e:
+        # Cleanup resources in case of any failure.
+        logger.exception(e)
+        if args.cleanup_resources:
+            resource_manager.cleanup_resources()
+        sys.exit(1)
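+
+
+# NOTE: the __main__ guard below is an illustrative assumption, not part of
+# this change; the actual entrypoint may be wired up elsewhere (e.g. as a
+# console_scripts hook).
+if __name__ == "__main__":
+    start()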