Skip to content

Commit 7119cb9

Browse files
author
Martin Traverse
authored
Feature / Runtime config plugin (#451)
* Provide a capability to load 3rd party plugins when launching from the CLI * Provide a plugin interface for config loading, and local implementation * Special load method for config plugins in plugin manager * Separate config loading and config parsing using a new config manager * Fix TRAC deprecation warnings * Update unit tests for config loading / parsing * Handle URL schemes for Windows file paths * Fix path resolution in the config manager on Windows * Allow specifying plugin_package as a launch arg for launch_model and launch_job * Test case for external config loader * Fixes for using external config loaders * Fix path resolution in the config manager on Windows * Fix path resolution in the config manager on Windows
1 parent 2ed40f1 commit 7119cb9

File tree

15 files changed

+686
-149
lines changed

15 files changed

+686
-149
lines changed

tracdap-runtime/python/src/tracdap/rt/_exec/dev_mode.py

+9-17
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class DevModeTranslator:
4646
_log: tp.Optional[_util.logging.Logger] = None
4747

4848
@classmethod
49-
def translate_sys_config(cls, sys_config: _cfg.RuntimeConfig, config_dir: tp.Optional[pathlib.Path]):
49+
def translate_sys_config(cls, sys_config: _cfg.RuntimeConfig, config_mgr: _cfg_p.ConfigManager):
5050

5151
cls._log.info(f"Applying dev mode config translation to system config")
5252

@@ -56,7 +56,7 @@ def translate_sys_config(cls, sys_config: _cfg.RuntimeConfig, config_dir: tp.Opt
5656
sys_config.storage = _cfg.StorageConfig()
5757

5858
sys_config = cls._add_integrated_repo(sys_config)
59-
sys_config = cls._resolve_relative_storage_root(sys_config, config_dir)
59+
sys_config = cls._resolve_relative_storage_root(sys_config, config_mgr)
6060

6161
return sys_config
6262

@@ -66,7 +66,7 @@ def translate_job_config(
6666
sys_config: _cfg.RuntimeConfig,
6767
job_config: _cfg.JobConfig,
6868
scratch_dir: pathlib.Path,
69-
config_dir: tp.Optional[pathlib.Path],
69+
config_mgr: _cfg_p.ConfigManager,
7070
model_class: tp.Optional[_api.TracModel.__class__]) \
7171
-> _cfg.JobConfig:
7272

@@ -84,7 +84,7 @@ def translate_job_config(
8484

8585
# Fow flows, load external flow definitions then perform auto-wiring and type inference
8686
if job_config.job.jobType == _meta.JobType.RUN_FLOW:
87-
job_config = cls._process_flow_definition(job_config, config_dir)
87+
job_config = cls._process_flow_definition(job_config, config_mgr)
8888

8989
# For run (model|flow) jobs, apply processing to the parameters, inputs and outputs
9090
if job_config.job.jobType in [_meta.JobType.RUN_MODEL, _meta.JobType.RUN_FLOW]:
@@ -109,7 +109,7 @@ def _add_integrated_repo(cls, sys_config: _cfg.RuntimeConfig) -> _cfg.RuntimeCon
109109
@classmethod
110110
def _resolve_relative_storage_root(
111111
cls, sys_config: _cfg.RuntimeConfig,
112-
sys_config_path: tp.Optional[pathlib.Path]):
112+
config_mgr: _cfg_p.ConfigManager):
113113

114114
storage_config = copy.deepcopy(sys_config.storage)
115115

@@ -128,6 +128,7 @@ def _resolve_relative_storage_root(
128128

129129
cls._log.info(f"Resolving relative path for [{bucket_key}] local storage...")
130130

131+
sys_config_path = config_mgr.config_dir_path()
131132
if sys_config_path is not None:
132133
absolute_path = sys_config_path.joinpath(root_path).resolve()
133134
if absolute_path.exists():
@@ -291,7 +292,7 @@ def _generate_model_for_entry_point(
291292
return model_id, model_object
292293

293294
@classmethod
294-
def _process_flow_definition(cls, job_config: _cfg.JobConfig, config_dir: pathlib.Path) -> _cfg.JobConfig:
295+
def _process_flow_definition(cls, job_config: _cfg.JobConfig, config_mgr: _cfg_p.ConfigManager) -> _cfg.JobConfig:
295296

296297
flow_details = job_config.job.runFlow.flow
297298

@@ -305,21 +306,12 @@ def _process_flow_definition(cls, job_config: _cfg.JobConfig, config_dir: pathli
305306
cls._log.error(err)
306307
raise _ex.EConfigParse(err)
307308

308-
flow_path = config_dir.joinpath(flow_details) if config_dir is not None else pathlib.Path(flow_details)
309-
310-
if not flow_path.exists():
311-
err = f"Flow definition not available for [{flow_details}]: File not found ({flow_path})"
312-
cls._log.error(err)
313-
raise _ex.EConfigParse(err)
314-
315309
flow_id = _util.new_object_id(_meta.ObjectType.FLOW)
316310
flow_key = _util.object_key(flow_id)
317311

318-
cls._log.info(f"Generating flow definition for [{flow_details}] with ID = [{flow_key}]")
312+
cls._log.info(f"Generating flow definition from [{flow_details}] with ID = [{flow_key}]")
319313

320-
flow_parser = _cfg_p.ConfigParser(_meta.FlowDefinition)
321-
flow_raw_data = flow_parser.load_raw_config(flow_path, flow_path.name)
322-
flow_def = flow_parser.parse(flow_raw_data, flow_path.name)
314+
flow_def = config_mgr.load_config_object(flow_details, _meta.FlowDefinition)
323315

324316
# Auto-wiring and inference only applied to externally loaded flows for now
325317
flow_def = cls._autowire_flow(flow_def, job_config)

tracdap-runtime/python/src/tracdap/rt/_exec/runtime.py

+22-15
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ def __init__(
6464
job_result_format: tp.Optional[str] = None,
6565
scratch_dir: tp.Union[str, pathlib.Path, None] = None,
6666
scratch_dir_persist: bool = False,
67+
plugin_packages: tp.List[str] = None,
6768
dev_mode: bool = False):
6869

6970
trac_version = _version.__version__
@@ -86,12 +87,13 @@ def __init__(
8687
self._log.info(f"TRAC D.A.P. Python Runtime {trac_version}")
8788

8889
self._sys_config = sys_config if isinstance(sys_config, _cfg.RuntimeConfig) else None
89-
self._sys_config_path = pathlib.Path(sys_config) if not self._sys_config else None
90+
self._sys_config_path = sys_config if not self._sys_config else None
9091
self._job_result_dir = job_result_dir
9192
self._job_result_format = job_result_format
9293
self._scratch_dir = scratch_dir
9394
self._scratch_dir_provided = True if scratch_dir is not None else False
9495
self._scratch_dir_persist = scratch_dir_persist
96+
self._plugin_packages = plugin_packages or []
9597
self._dev_mode = dev_mode
9698

9799
# Runtime control
@@ -102,6 +104,7 @@ def __init__(
102104
self._oneshot_job = None
103105

104106
# Top level resources
107+
self._config_mgr: tp.Optional[_cparse.ConfigManager] = None
105108
self._models: tp.Optional[_models.ModelLoader] = None
106109
self._storage: tp.Optional[_storage.StorageManager] = None
107110

@@ -141,30 +144,36 @@ def pre_start(self):
141144

142145
self._prepare_scratch_dir()
143146

144-
# Plugin manager and static API impl are singletons
145-
# If these methods are called multiple times, the second and subsequent calls are ignored
147+
# Plugin manager, static API and guard rails are singletons
148+
# Calling these methods multiple times is safe (e.g. for embedded or testing scenarios)
149+
# However, plugins are never un-registered for the lifetime of the processes
146150

147151
_plugins.PluginManager.register_core_plugins()
152+
153+
for plugin_package in self._plugin_packages:
154+
_plugins.PluginManager.register_plugin_package(plugin_package)
155+
148156
_static_api.StaticApiImpl.register_impl()
149157
_guard.PythonGuardRails.protect_dangerous_functions()
150158

151159
# Load sys config (or use embedded), config errors are detected before start()
152160
# Job config can also be checked before start() by using load_job_config()
153161

162+
self._config_mgr = _cparse.ConfigManager.for_root_config(self._sys_config_path)
163+
154164
if self._sys_config is None:
155165
sys_config_dev_mode = _dev_mode.DEV_MODE_SYS_CONFIG if self._dev_mode else None
156-
sys_config_parser = _cparse.ConfigParser(_cfg.RuntimeConfig, sys_config_dev_mode)
157-
sys_config_raw = sys_config_parser.load_raw_config(self._sys_config_path, config_file_name="system")
158-
self._sys_config = sys_config_parser.parse(sys_config_raw, self._sys_config_path)
166+
self._sys_config = self._config_mgr.load_root_object(
167+
_cfg.RuntimeConfig, sys_config_dev_mode,
168+
config_file_name="system")
159169
else:
160170
self._log.info("Using embedded system config")
161171

162172
# Dev mode translation is controlled by the dev mode flag
163173
# I.e. it can be applied to embedded configs
164174

165175
if self._dev_mode:
166-
config_dir = self._sys_config_path.parent if self._sys_config_path is not None else None
167-
self._sys_config = _dev_mode.DevModeTranslator.translate_sys_config(self._sys_config, config_dir)
176+
self._sys_config = _dev_mode.DevModeTranslator.translate_sys_config(self._sys_config, self._config_mgr)
168177

169178
# Runtime API server is controlled by the sys config
170179

@@ -311,20 +320,18 @@ def load_job_config(
311320

312321
if isinstance(job_config, _cfg.JobConfig):
313322
self._log.info("Using embedded job config")
314-
job_config_path = None
315323

316324
else:
317-
job_config_path = job_config
318325
job_config_dev_mode = _dev_mode.DEV_MODE_JOB_CONFIG if self._dev_mode else None
319-
job_config_parser = _cparse.ConfigParser(_cfg.JobConfig, job_config_dev_mode)
320-
job_config_raw = job_config_parser.load_raw_config(job_config_path, config_file_name="job")
321-
job_config = job_config_parser.parse(job_config_raw, job_config_path)
326+
job_config = self._config_mgr.load_config_object(
327+
job_config, _cfg.JobConfig,
328+
job_config_dev_mode,
329+
config_file_name="job")
322330

323331
if self._dev_mode:
324-
config_dir = job_config_path.parent if job_config_path is not None else None
325332
job_config = _dev_mode.DevModeTranslator.translate_job_config(
326333
self._sys_config, job_config,
327-
self._scratch_dir, config_dir,
334+
self._scratch_dir, self._config_mgr,
328335
model_class)
329336

330337
return job_config

0 commit comments

Comments
 (0)