Skip to content

Commit

Permalink
feat!: make test concept tighter to pipeline main func, improve cli for pipe and list
Browse files Browse the repository at this point in the history
  • Loading branch information
z3z1ma committed Aug 19, 2024
1 parent cb259a6 commit a9a4c2a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 38 deletions.
20 changes: 9 additions & 11 deletions src/cdf/core/component/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,28 @@ class DataPipeline(
t.Callable[..., "LoadInfo"],
t.Callable[..., t.Iterator["LoadInfo"]],
],
t.List[t.Callable[..., None]],
]
],
frozen=True,
):
"""A data pipeline which loads data from a source to a destination."""

integration_test: t.Optional[t.Callable[..., bool]] = None
"""A function to test the pipeline in an integration environment"""

@pydantic.field_validator("integration_test", mode="before")
@classmethod
def _bind_ancillary(cls, value: t.Any, info: pydantic.ValidationInfo) -> t.Any:
    """Wrap the raw ``integration_test`` value before field validation.

    The binder is looked up from the validation info via ``_get_bind_func``
    and applied to the result of ``_unwrap_entrypoint(value)``.
    NOTE(review): per the original docstring the binder attaches the active
    workspace to the function -- confirm against ``_get_bind_func``.
    """
    # Resolve the binder first, then unwrap, matching the original call order.
    bind = _get_bind_func(info)
    entrypoint = _unwrap_entrypoint(value)
    return bind(entrypoint)

def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.List["LoadInfo"]:
    """Run the data pipeline and collect its load results.

    ``self.main`` is called with the given arguments and must return a
    three-item ``(pipeline, runner, tests)`` result, the same shape that
    ``run_tests`` unpacks. Only the runner is used here.

    Args:
        *args: Positional arguments forwarded to ``self.main``.
        **kwargs: Keyword arguments forwarded to ``self.main``.

    Returns:
        Every item produced by the runner when it is a generator function,
        otherwise a one-element list holding the runner's return value.

    Raises:
        ValueError: If ``self.main`` does not return exactly three items.
    """
    # main() yields three items; unpacking into two names would raise
    # ValueError, so only the three-item form is kept.
    _, runner, _ = self.main(*args, **kwargs)
    if inspect.isgeneratorfunction(runner):
        return list(runner())
    return [t.cast("LoadInfo", runner())]

def get_schemas(self, destination: t.Optional["DltDestination"] = None):
    """Get the schemas for the pipeline.

    Args:
        destination: Forwarded as ``destination=`` to the pipeline's
            ``sync_destination`` call; defaults to ``None``.

    Returns:
        The ``schemas`` attribute of the pipeline object returned by
        ``self.main()``, read after ``sync_destination`` has been called.

    Raises:
        ValueError: If ``self.main()`` does not return exactly three items.
    """
    # main() returns (pipeline, runner, tests); call it once and keep only
    # the pipeline. A two-name unpack would raise ValueError here.
    pipeline, _, _ = self.main()
    pipeline.sync_destination(destination=destination)
    return pipeline.schemas

def run_tests(self) -> None:
    """Invoke every test callable supplied by the pipeline's main function.

    The third item returned by ``self.main()`` is iterated in order and each
    element is called with no arguments. Return values are ignored; an
    exception raised by a test propagates to the caller.
    """
    _pipeline, _runner, checks = self.main()
    for check in checks:
        check()
62 changes: 35 additions & 27 deletions src/cdf/core/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,19 @@ def cli() -> None:
"""A dynamically generated CLI for the workspace."""
self.activate()

def _list(d: t.Dict[str, cmp.TComponent]) -> int:
def _list(d: t.Dict[str, cmp.TComponent], verbose: bool = False) -> None:
for name in sorted(d.keys()):
click.echo(name)
return 1
if verbose:
click.echo(d[name].model_dump_json(indent=2, exclude={"main"}))
else:
click.echo(d[name])

cli.command("list-services")(lambda: _list(self.services))
cli.command("list-pipelines")(lambda: _list(self.pipelines))
cli.command("list-publishers")(lambda: _list(self.publishers))
cli.command("list-operations")(lambda: _list(self.operations))
for k in ("services", "pipelines", "publishers", "operations"):
cli.command(f"list-{k}")(
click.option("-v", "--verbose", is_flag=True)(
lambda verbose=False, k=k: _list(getattr(self, k), verbose=verbose)
)
)

@cli.command("run-pipeline")
@click.argument(
Expand All @@ -218,46 +222,51 @@ def _list(d: t.Dict[str, cmp.TComponent]) -> int:
)
@click.pass_context
def run_pipeline(
ctx: click.Context, pipeline: t.Optional[str] = None, test: bool = False
ctx: click.Context,
pipeline_name: t.Optional[str] = None,
test: bool = False,
) -> None:
"""Run a data pipeline."""
# Prompt for a pipeline if not specified
if pipeline is None:
pipeline = click.prompt(
if pipeline_name is None:
pipeline_name = click.prompt(
"Enter a pipeline",
type=click.Choice(list(self.pipelines.keys())),
show_choices=True,
)
if pipeline is None:
if pipeline_name is None:
raise click.BadParameter(
"Pipeline must be specified.", ctx=ctx, param_hint="pipeline"
)

# Get the pipeline definition
pipeline_definition = self.pipelines[pipeline]
pipeline = self.pipelines[pipeline_name]

# Run the integration test if specified
if test:
if not pipeline_definition.integration_test:
raise click.UsageError(
f"Pipeline `{pipeline}` does not define an integration test."
)
click.echo("Running integration test.", err=True)
if pipeline_definition.integration_test():
click.echo("Running pipeline tests.", err=True)
try:
pipeline.run_tests()
except Exception as e:
click.echo(f"Pipeline test failed: {e}", err=True)
ctx.exit(1)
else:
click.echo("Integration test passed.", err=True)
ctx.exit(0)
else:
ctx.fail("Integration test failed.")

# Run the pipeline
start = time.time()
jobs = pipeline_definition()
try:
jobs = pipeline()
except Exception as e:
click.echo(
f"Pipeline failed after {time.time() - start:.2f} seconds: {e}",
err=True,
)
ctx.exit(1)

click.echo(
f"Pipeline process finished in {time.time() - start:.2f} seconds.",
err=True,
)

# Check for failed jobs
for job in jobs:
if job.has_failed_jobs:
ctx.fail("Pipeline failed.")
Expand Down Expand Up @@ -402,7 +411,7 @@ def run():
)
return load

return pipeline, run
return pipeline, run, []

# Switch statement on environment
# to scaffold a FF provider, which is hereforward dictated by the user
Expand Down Expand Up @@ -453,7 +462,6 @@ def run():
pipeline_definitions=[
cmp.DataPipeline(
main=test_pipeline,
integration_test=lambda: True,
name="exchangerate_pipeline",
owner="Alex",
description="A test pipeline",
Expand Down

0 comments on commit a9a4c2a

Please sign in to comment.