diff --git a/Makefile b/Makefile index d14227b..78228ad 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,6 @@ update-docs: @echo "Updating docs..." - @typer src/cdf/cli.py utils docs --name=cdf >docs/cli_reference.md @pydoc-markdown -I src/cdf >docs/api_reference.md @echo "Done." diff --git a/docs/api_reference.md b/docs/api_reference.md index 8f7ca0a..0dc9ae9 100644 --- a/docs/api_reference.md +++ b/docs/api_reference.md @@ -2,791 +2,431 @@ # \_\_init\_\_ - - -#### find\_nearest + -```python -@M.result -def find_nearest(path: PathLike = ".") -> Project -``` +# types -Find the nearest project. +A module for shared types. -Recursively searches for a project file in the parent directories. + -**Arguments**: +# types.monads -- `path` _PathLike, optional_ - The path to start searching from. Defaults to ".". - +Contains monadic types and functions for working with them. -**Raises**: + -- `FileNotFoundError` - If no project is found. - +#### T -**Returns**: +The type of the value inside the Monad -- `Project` - The nearest project. + - +#### U -#### is\_main +The transformed type of the value inside the Monad -```python -def is_main(module_name: t.Optional[str] = None) -> bool -``` + -Check if the current module is being run as the main program in cdf context. +#### K -Also injects a hook in debug mode to allow dropping into user code via pdb. +A known type that is not necessarily the same as T -**Arguments**: + -- `module_name` _str, optional_ - The name of the module to check. If None, the calling module is - checked. The most idiomatic usage is to pass `__name__` to check the current module. - +#### L -**Returns**: +A known type that is not necessarily the same as U -- `bool` - True if the current module is the main program in cdf context. + - +#### E -#### get\_active\_project +The type of the error inside the Result -```python -def get_active_project() -> Project -``` + -Get the active project. +#### TState -**Raises**: +The type of the state -- `ValueError` - If no valid project is found in the context. - + -**Returns**: +#### TMonad -- `Project` - The active project. +Generic Self type for Monad - + -#### get\_workspace +## Maybe Objects ```python -def get_workspace(path: PathLike = ".") -> M.Result[Workspace, Exception] +class Maybe(Monad[T], abc.ABC) ``` -Get a workspace from a path. - -**Arguments**: - -- `path` _PathLike, optional_ - The path to get the workspace from. Defaults to ".". - - -**Returns**: - - M.Result[Workspace, Exception]: The workspace or an error. - - - -#### transform\_gateway - -Gateway configuration for transforms. - - + -#### transform\_connection +#### pure ```python -def transform_connection(type_: str, **kwargs) -> ConnectionConfig +@classmethod +def pure(cls, value: K) -> "Maybe[K]" ``` -Create a connection configuration for transforms. +Creates a Maybe with a value. - + -#### run\_script +#### unwrap ```python -def run_script(module_name: str, - source: t.Union[t.Callable[..., dlt.sources.DltSource], - dlt.sources.DltSource], - *, - run_options: t.Optional[t.Dict[str, t.Any]] = None, - **kwargs: t.Any) -> None +def unwrap() -> T ``` -A shorthand syntax for a cdf script with a single source which should run as a pipeline. - -The first argument should almost always be `__name__`. This function conditionally executes -the source if the module is the main program in cdf context. This occurs either when invoked -through cdf pipeline command or when the script is run directly by python. - - +Unwraps the value of the Maybe. -# cli +**Returns**: -CLI for cdf. 
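A minimal sketch of the `run_script` shorthand documented above. The source body is illustrative, and `cdf.run_script` is assumed to be the package-root re-export implied by the `__init__` section:

```python
import dlt

import cdf  # run_script is assumed to be re-exported at the package root


@dlt.resource
def numbers():
    # A tiny illustrative resource.
    yield from ({"n": n} for n in range(3))


@dlt.source
def my_source():
    return numbers()


# Runs the source as a pipeline only when this module is the cdf entrypoint,
# i.e. when invoked via the cdf pipeline command or run directly by python.
cdf.run_script(__name__, my_source)
```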
+ The unwrapped value. - + -#### main +#### unwrap\_or ```python -@app.callback() -def main( - ctx: typer.Context, - workspace: str, - path: Path = typer.Option(".", - "--path", - "-p", - help="Path to the project.", - envvar="CDF_ROOT"), - debug: bool = typer.Option(False, - "--debug", - "-d", - help="Enable debug mode."), - environment: t.Optional[str] = typer.Option(None, - "--env", - "-e", - help="Environment to use."), - log_level: t.Optional[str] = typer.Option( - None, - "--log-level", - "-l", - help="The log level to use.", - envvar="LOG_LEVEL", # A common environment variable for log level - ) -) -> None +def unwrap_or(default: U) -> t.Union[T, U] ``` -CDF (continuous data framework) is a framework for end to end data processing. +Tries to unwrap the Maybe, returning a default value if the Maybe is Nothing. - +**Arguments**: -#### init +- `default` - The value to return if unwrapping Nothing. + -```python -@app.command(rich_help_panel="Project Management") -def init(ctx: typer.Context) -> None -``` +**Returns**: -:art: Initialize a new project. + The unwrapped value or the default value. - + -#### index +#### lift ```python -@app.command(rich_help_panel="Project Management") -def index(ctx: typer.Context, hydrate: bool = False) -> None +@classmethod +def lift(cls, func: t.Callable[[U], + K]) -> t.Callable[["U | Maybe[U]"], "Maybe[K]"] ``` -:page_with_curl: Print an index of [b][blue]Pipelines[/blue], [red]Models[/red], [yellow]Publishers[/yellow][/b], and other components. +Lifts a function to work within the Maybe monad. - +**Arguments**: -#### pipeline +- `func` - A function to lift. + -```python -@app.command(rich_help_panel="Core") -def pipeline( - ctx: typer.Context, - pipeline_to_sink: t.Annotated[ - str, - typer.Argument(help="The pipeline and sink separated by a colon."), - ], - select: t.List[str] = typer.Option( - ..., - "-s", - "--select", - default_factory=lambda: [], - help= - "Glob pattern for resources to run. Can be specified multiple times.", - ), - exclude: t.List[str] = typer.Option( - ..., - "-x", - "--exclude", - default_factory=lambda: [], - help= - "Glob pattern for resources to exclude. Can be specified multiple times.", - ), - force_replace: t.Annotated[ - bool, - typer.Option( - ..., - "-F", - "--force-replace", - help= - "Force the write disposition to replace ignoring state. Useful to force a reload of incremental resources.", - ), - ] = False, - no_stage: t.Annotated[ - bool, - typer.Option( - ..., - "--no-stage", - help= - "Do not stage the data in the staging destination of the sink even if defined.", - ), - ] = False -) -> t.Any -``` +**Returns**: -:inbox_tray: Ingest data from a [b blue]Pipeline[/b blue] into a data store where it can be [b red]Transformed[/b red]. + A new function that returns a Maybe value. + -**Arguments**: +#### \_\_iter\_\_ -- `ctx` - The CLI context. -- `pipeline_to_sink` - The pipeline and sink separated by a colon. -- `select` - The resources to ingest as a sequence of glob patterns. -- `exclude` - The resources to exclude as a sequence of glob patterns. -- `force_replace` - Whether to force replace the write disposition. -- `no_stage` - Allows selective disabling of intermediate staging even if configured in sink. 
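Stepping back to the `Maybe` API documented above, a small usage sketch. It assumes `Just` and `Nothing` are importable from `cdf.types.monads` (path inferred from the module headers) and that `Nothing` is constructible with no arguments:

```python
from cdf.types.monads import Just, Nothing  # hypothetical import path

key = Just("api-key")
assert key.is_just() and not key.is_nothing()
assert key.unwrap() == "api-key"

# unwrap_or never raises: Nothing simply yields the default.
assert Nothing().unwrap_or("fallback") == "fallback"
```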
- - - -#### discover - -```python -@app.command(rich_help_panel="Develop") -def discover( - ctx: typer.Context, - pipeline: t.Annotated[ - str, - typer.Argument(help="The pipeline in which to discover resources."), - ], - no_quiet: t.Annotated[ - bool, - typer.Option( - help="Pipeline stdout is suppressed by default, this disables that." - ), - ] = False -) -> None +```python +def __iter__() -> t.Iterator[T] ``` -:mag: Dry run a [b blue]Pipeline[/b blue] and enumerates the discovered resources. +Allows safely unwrapping the value of the Maybe using a for construct. + -**Arguments**: +## Just Objects -- `ctx` - The CLI context. -- `pipeline` - The pipeline in which to discover resources. -- `no_quiet` - Whether to suppress the pipeline stdout. +```python +class Just(Maybe[T]) +``` - + -#### head +#### bind ```python -@app.command(rich_help_panel="Develop") -def head(ctx: typer.Context, - pipeline: t.Annotated[str, - typer.Argument( - help="The pipeline to inspect.")], - resource: t.Annotated[str, - typer.Argument( - help="The resource to inspect.")], - n: t.Annotated[int, typer.Option("-n", "--rows")] = 5) -> None +def bind(func: t.Callable[[T], Maybe[U]]) -> Maybe[U] ``` -:wrench: Prints the first N rows of a [b green]Resource[/b green] within a [b blue]pipeline[/b blue]. Defaults to [cyan]5[/cyan]. - -This is useful for quickly inspecting data :detective: and verifying that it is coming over the wire correctly. - +Applies a function to the value inside the Just. **Arguments**: -- `ctx` - The CLI context. -- `pipeline` - The pipeline to inspect. -- `resource` - The resource to inspect. -- `n` - The number of rows to print. +- `func` - A function that takes a value of type T and returns a Maybe containing a value of type U. -**Raises**: +**Returns**: -- `typer.BadParameter` - If the resource is not found in the pipeline. + The result of applying the function to the value inside the Just. - + -#### publish +#### map ```python -@app.command(rich_help_panel="Core") -def publish( - ctx: typer.Context, - sink_to_publisher: t.Annotated[ - str, - typer.Argument(help="The sink and publisher separated by a colon."), - ], - skip_verification: t.Annotated[ - bool, - typer. - Option(help="Skip the verification of the publisher dependencies.", ), - ] = False -) -> t.Any +def map(func: t.Callable[[T], U]) -> "Maybe[U]" ``` -:outbox_tray: [b yellow]Publish[/b yellow] data from a data store to an [violet]External[/violet] system. - +Applies a mapping function to the value inside the Just. **Arguments**: -- `ctx` - The CLI context. -- `sink_to_publisher` - The sink and publisher separated by a colon. -- `skip_verification` - Whether to skip the verification of the publisher dependencies. - - - -#### script - -```python -@app.command(rich_help_panel="Core") -def script( - ctx: typer.Context, - script: t.Annotated[str, - typer.Argument(help="The script to execute.")], - quiet: t.Annotated[bool, - typer.Option( - help="Suppress the script stdout.")] = False -) -> t.Any -``` - -:hammer: Execute a [b yellow]Script[/b yellow] within the context of the current workspace. - +- `func` - A function that takes a value of type T and returns a value of type U. + -**Arguments**: +**Returns**: -- `ctx` - The CLI context. -- `script` - The script to execute. -- `quiet` - Whether to suppress the script stdout. + A new Just containing the result of applying the function to the value inside the Just. 
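A short sketch chaining the `bind` and `map` operations just described, under the same assumed import path:

```python
from cdf.types.monads import Just, Maybe, Nothing  # hypothetical import path


def parse_int(raw: str) -> Maybe[int]:
    # A bind-compatible step: it returns a Maybe rather than a bare value.
    return Just(int(raw)) if raw.isdigit() else Nothing()


doubled = Just("21").bind(parse_int).map(lambda n: n * 2)
assert doubled.unwrap() == 42
```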
- + -#### notebook +#### filter ```python -@app.command(rich_help_panel="Core") -def notebook( - ctx: typer.Context, - notebook: t.Annotated[str, - typer.Argument(help="The notebook to execute.")], - params: t.Annotated[ - str, - typer.Option( - ..., - help= - "The parameters to pass to the notebook as a json formatted string.", - ), - ] = "{}" -) -> t.Any +def filter(predicate: t.Callable[[T], bool]) -> Maybe[T] ``` -:notebook: Execute a [b yellow]Notebook[/b yellow] within the context of the current workspace. - +Filters the value inside the Just based on a predicate. **Arguments**: -- `ctx` - The CLI context. -- `notebook` - The notebook to execute. -- `params` - The parameters to pass to the notebook as a json formatted string. +- `predicate` - A function that takes a value of type T and returns a boolean. + + +**Returns**: + + A new Just containing the value inside the Just if the predicate holds. - + -#### jupyter\_lab +#### is\_just ```python -@app.command( - rich_help_panel="Utilities", - context_settings={ - "allow_extra_args": True, - "ignore_unknown_options": True - }, -) -def jupyter_lab(ctx: typer.Context) -> None +def is_just() -> bool ``` -:star2: Start a Jupyter Lab server in the context of a workspace. +Returns True if the Maybe is a Just. - + -#### spec +#### is\_nothing ```python -@app.command(rich_help_panel="Develop") -def spec(name: _SpecType, json_schema: bool = False) -> None +def is_nothing() -> bool ``` -:blue_book: Print the fields for a given spec type. +Returns False if the Maybe is a Just. + -**Arguments**: +## Nothing Objects -- `name` - The name of the spec to print. -- `json_schema` - Whether to print the JSON schema for the spec. +```python +class Nothing(Maybe[T]) +``` - + -#### schema\_dump +#### bind ```python -@schema.command("dump") -def schema_dump( - ctx: typer.Context, - pipeline_to_sink: t.Annotated[ - str, - typer.Argument( - help="The pipeline:sink combination from which to fetch the schema." - ), - ], - format: t.Annotated[_ExportFormat, - typer.Option(help="The format to dump the schema in." - )] = _ExportFormat.json -) -> None +def bind(func: t.Callable[[T], Maybe[U]]) -> "Nothing[T]" ``` -:computer: Dump the schema of a [b blue]pipeline[/b blue]:[violet]sink[/violet] combination. - +Applies a function to the value inside the Just. **Arguments**: -- `ctx` - The CLI context. -- `pipeline_to_sink` - The pipeline:sink combination from which to fetch the schema. -- `format` - The format to dump the schema in. +- `func` - A function that takes a value of type T and returns a Maybe containing a value of type U. -**Raises**: +**Returns**: -- `typer.BadParameter` - If the pipeline or sink are not found. + The result of applying the function to the value inside the Just. - + -#### schema\_edit +#### map ```python -@schema.command("edit") -def schema_edit(ctx: typer.Context, pipeline_to_sink: t.Annotated[ - str, - typer.Argument( - help="The pipeline:sink combination from which to fetch the schema."), -]) -> None +def map(func: t.Callable[[T], U]) -> "Nothing[T]" ``` -:pencil: Edit the schema of a [b blue]pipeline[/b blue]:[violet]sink[/violet] combination using the system editor. - +Applies a mapping function to the value inside the Just. **Arguments**: -- `ctx` - The CLI context. -- `pipeline_to_sink` - The pipeline:sink combination from which to fetch the schema. +- `func` - A function that takes a value of type T and returns a value of type U. -**Raises**: +**Returns**: -- `typer.BadParameter` - If the pipeline or sink are not found. 
+ A new Just containing the result of applying the function to the value inside the Just. - + -#### state\_dump +#### filter ```python -@state.command("dump") -def state_dump(ctx: typer.Context, pipeline_to_sink: t.Annotated[ - str, - typer.Argument( - help="The pipeline:sink combination from which to fetch the schema."), -]) -> None +def filter(predicate: t.Callable[[T], bool]) -> "Nothing[T]" ``` -:computer: Dump the state of a [b blue]pipeline[/b blue]:[violet]sink[/violet] combination. - +Filters the value inside the Just based on a predicate. **Arguments**: -- `ctx` - The CLI context. -- `pipeline_to_sink` - The pipeline:sink combination from which to fetch the state. +- `predicate` - A function that takes a value of type T and returns a boolean. -**Raises**: +**Returns**: -- `typer.BadParameter` - If the pipeline or sink are not found. + A new Just containing the value inside the Just if the predicate holds. - + -#### state\_edit +#### is\_just ```python -@state.command("edit") -def state_edit(ctx: typer.Context, pipeline_to_sink: t.Annotated[ - str, - typer.Argument( - help="The pipeline:sink combination from which to fetch the state."), -]) -> None +def is_just() -> bool ``` -:pencil: Edit the state of a [b blue]pipeline[/b blue]:[violet]sink[/violet] combination using the system editor. - +Returns False if the Maybe is a Nothing. -**Arguments**: + -- `ctx` - The CLI context. -- `pipeline_to_sink` - The pipeline:sink combination from which to fetch the state. - +#### is\_nothing -**Raises**: - -- `typer.BadParameter` - If the pipeline or sink are not found. - - - -#### model\_evaluate - -```python -@model.command("evaluate") -def model_evaluate( - ctx: typer.Context, - model: t.Annotated[ - str, - typer.Argument( - help="The model to evaluate. Can be prefixed with the gateway."), - ], - start: str = typer.Option( - "1 month ago", - help= - "The start time to evaluate the model from. Defaults to 1 month ago.", - ), - end: str = typer.Option( - "now", - help="The end time to evaluate the model to. Defaults to now.", - ), - limit: t.Optional[int] = typer.Option( - None, help="The number of rows to limit the evaluation to.") -) -> None +```python +def is_nothing() -> bool ``` -:bar_chart: Evaluate a [b red]Model[/b red] and print the results. A thin wrapper around `sqlmesh evaluate` +Returns True if the Maybe is a Nothing. + -**Arguments**: +## Result Objects -- `ctx` - The CLI context. -- `model` - The model to evaluate. Can be prefixed with the gateway. -- `limit` - The number of rows to limit the evaluation to. - - - -#### model\_render - -```python -@model.command("render") -def model_render( - ctx: typer.Context, - model: t.Annotated[ - str, - typer.Argument( - help="The model to evaluate. Can be prefixed with the gateway."), - ], - start: str = typer.Option( - "1 month ago", - help= - "The start time to evaluate the model from. Defaults to 1 month ago.", - ), - end: str = typer.Option( - "now", - help="The end time to evaluate the model to. Defaults to now.", - ), - expand: t.List[str] = typer.Option( - [], help="The referenced models to expand."), - dialect: t.Optional[str] = typer.Option( - None, help="The SQL dialect to use for rendering.") -) -> None +```python +class Result(Monad[T], t.Generic[T, E]) ``` -:bar_chart: Render a [b red]Model[/b red] and print the query. A thin wrapper around `sqlmesh render` + +#### pure -**Arguments**: +```python +@classmethod +def pure(cls, value: K) -> "Result[K, E]" +``` -- `ctx` - The CLI context. -- `model` - The model to evaluate. 
Can be prefixed with the gateway. -- `start` - The start time to evaluate the model from. Defaults to 1 month ago. -- `end` - The end time to evaluate the model to. Defaults to now. -- `expand` - The referenced models to expand. -- `dialect` - The SQL dialect to use for rendering. +Creates an Ok with a value. - + -#### model\_name +#### lift ```python -@model.command("name") -def model_name(ctx: typer.Context, model: t.Annotated[ - str, - typer.Argument( - help= - "The model to convert the physical name. Can be prefixed with the gateway." - ), -]) -> None +@classmethod +def lift( + cls, func: t.Callable[[U], K] +) -> t.Callable[["U | Result[U, Exception]"], "Result[K, Exception]"] ``` -:bar_chart: Get a [b red]Model[/b red]'s physical table name. A thin wrapper around `sqlmesh table_name` - +Transforms a function to work with arguments and output wrapped in Result monads. **Arguments**: -- `ctx` - The CLI context. -- `model` - The model to evaluate. Can be prefixed with the gateway. - - - -#### model\_diff - -```python -@model.command("diff") -def model_diff( - ctx: typer.Context, - model: t.Annotated[ - str, - typer.Argument( - help="The model to evaluate. Can be prefixed with the gateway."), - ], - source_target: t.Annotated[ - str, - typer.Argument( - help="The source and target environments separated by a colon."), - ], - show_sample: bool = typer.Option( - False, help="Whether to show a sample of the diff.") -) -> None -``` - -:bar_chart: Compute the diff of a [b red]Model[/b red] across 2 environments. A thin wrapper around `sqlmesh table_diff` - +- `func` - A function that takes any number of arguments and returns a value of type T. + -**Arguments**: +**Returns**: -- `ctx` - The CLI context. -- `model` - The model to evaluate. Can be prefixed with the gateway. -- `source_target` - The source and target environments separated by a colon. + A function that takes the same number of unwrapped arguments and returns a Result-wrapped result. - + -#### model\_prototype +#### \_\_iter\_\_ ```python -@model.command("prototype") -def model_prototype( - ctx: typer.Context, - dependencies: t.List[str] = typer.Option( - [], - "-d", - "--dependencies", - help="The dependencies to include in the prototype.", - ), - start: str = typer.Option( - "1 month ago", - help= - "The start time to evaluate the model from. Defaults to 1 month ago.", - ), - end: str = typer.Option( - "now", - help="The end time to evaluate the model to. Defaults to now.", - ), - limit: int = typer.Option( - 5_000_000, - help="The number of rows to limit the evaluation to.", - )) +def __iter__() -> t.Iterator[T] ``` -:bar_chart: Prototype a model and save the results to disk. +Allows safely unwrapping the value of the Result using a for construct. + -**Arguments**: +## Ok Objects -- `ctx` - The CLI context. -- `dependencies` - The dependencies to include in the prototype. -- `start` - The start time to evaluate the model from. Defaults to 1 month ago. -- `end` - The end time to evaluate the model to. Defaults to now. -- `limit` - The number of rows to limit the evaluation to. +```python +class Ok(Result[T, E]) +``` - + -# types +#### bind -A module for shared types. +```python +def bind(func: t.Callable[[T], Result[U, E]]) -> Result[U, E] +``` - +Applies a function to the result of the Ok. -# types.monads +**Arguments**: -Contains monadic types and functions for working with them. +- `func` - A function that takes a value of type T and returns a Result containing a value of type U. 
+ - +**Returns**: -#### T - -The type of the value inside the Monad - - - -#### U - -The transformed type of the value inside the Monad - - - -#### K - -A known type that is not necessarily the same as T - - - -#### L - -A known type that is not necessarily the same as U - - + A new Result containing the result of the original Result after applying the function. -#### E + -The type of the error inside the Result +#### map - +```python +def map(func: t.Callable[[T], U]) -> Result[U, E] +``` -#### TState +Applies a mapping function to the result of the Ok. -The type of the state +**Arguments**: - +- `func` - A function that takes a value of type T and returns a value of type U. + -#### TMonad +**Returns**: -Generic Self type for Monad + A new Ok containing the result of the original Ok after applying the function. - + -## Maybe Objects +#### is\_ok ```python -class Maybe(Monad[T], abc.ABC) +def is_ok() -> bool ``` - +Returns True if the Result is an Ok. -#### pure + + +#### is\_err ```python -@classmethod -def pure(cls, value: K) -> "Maybe[K]" +def is_err() -> bool ``` -Creates a Maybe with a value. +Returns False if the Result is an Ok. - + #### unwrap @@ -794,202 +434,203 @@ Creates a Maybe with a value. def unwrap() -> T ``` -Unwraps the value of the Maybe. +Unwraps the value of the Ok. **Returns**: The unwrapped value. - + #### unwrap\_or ```python -def unwrap_or(default: U) -> t.Union[T, U] +def unwrap_or(default: t.Any) -> T ``` -Tries to unwrap the Maybe, returning a default value if the Maybe is Nothing. +Tries to unwrap the Ok, returning a default value if unwrapping raises an exception. **Arguments**: -- `default` - The value to return if unwrapping Nothing. +- `default` - The value to return if unwrapping raises an exception. **Returns**: - The unwrapped value or the default value. + The unwrapped value or the default value if an exception is raised. - + -#### lift +#### unwrap\_err ```python -@classmethod -def lift(cls, func: t.Callable[[U], - K]) -> t.Callable[["U | Maybe[U]"], "Maybe[K]"] +def unwrap_err() -> BaseException ``` -Lifts a function to work within the Maybe monad. +Raises a ValueError since the Result is an Ok. + + + +#### filter + +```python +def filter(predicate: t.Callable[[T], bool]) -> Result[T, E] +``` + +Filters the result of the Ok based on a predicate. **Arguments**: -- `func` - A function to lift. +- `predicate` - A function that takes a value of type T and returns a boolean. +- `error` - The error to return if the predicate does not hold. **Returns**: - A new function that returns a Maybe value. + A new Result containing the result of the original Result if the predicate holds. - + -#### \_\_iter\_\_ +#### to\_parts ```python -def __iter__() -> t.Iterator[T] +def to_parts() -> t.Tuple[T, None] ``` -Allows safely unwrapping the value of the Maybe using a for construct. +Unpacks the value of the Ok. - + -## Just Objects +## Err Objects ```python -class Just(Maybe[T]) +class Err(Result[T, E]) ``` - + -#### bind +#### \_\_init\_\_ ```python -def bind(func: t.Callable[[T], Maybe[U]]) -> Maybe[U] +def __init__(error: E) -> None ``` -Applies a function to the value inside the Just. +Initializes an Err with an error. **Arguments**: -- `func` - A function that takes a value of type T and returns a Maybe containing a value of type U. - - -**Returns**: - - The result of applying the function to the value inside the Just. +- `error` - The error to wrap in the Err. 
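With both `Ok` and `Err` introduced, a sketch of the usual Result-returning function shape (import path assumed as before):

```python
from cdf.types.monads import Err, Ok, Result  # hypothetical import path


def safe_div(a: float, b: float) -> Result[float, Exception]:
    # Errors are returned as values instead of raised.
    if b == 0:
        return Err(ZeroDivisionError("division by zero"))
    return Ok(a / b)


assert safe_div(6, 3).unwrap() == 2.0
assert safe_div(1, 0).is_err()
```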
- + -#### map +#### bind ```python -def map(func: t.Callable[[T], U]) -> "Maybe[U]" +def bind(func: t.Callable[[T], Result[U, E]]) -> "Err[T, E]" ``` -Applies a mapping function to the value inside the Just. +Applies a function to the result of the Err. **Arguments**: -- `func` - A function that takes a value of type T and returns a value of type U. +- `func` - A function that takes a value of type T and returns a Result containing a value of type U. **Returns**: - A new Just containing the result of applying the function to the value inside the Just. + An Err containing the original error. - + -#### filter +#### map ```python -def filter(predicate: t.Callable[[T], bool]) -> Maybe[T] +def map(func: t.Callable[[T], U]) -> "Err[T, E]" ``` -Filters the value inside the Just based on a predicate. +Applies a mapping function to the result of the Err. **Arguments**: -- `predicate` - A function that takes a value of type T and returns a boolean. +- `func` - A function that takes a value of type T and returns a value of type U. **Returns**: - A new Just containing the value inside the Just if the predicate holds. + An Err containing the original error. - + -#### is\_just +#### is\_ok ```python -def is_just() -> bool +def is_ok() -> bool ``` -Returns True if the Maybe is a Just. +Returns False if the Result is an Err. - + -#### is\_nothing +#### is\_err ```python -def is_nothing() -> bool +def is_err() -> bool ``` -Returns False if the Maybe is a Just. +Returns True if the Result is an Err. - + -## Nothing Objects +#### unwrap ```python -class Nothing(Maybe[T]) +def unwrap() -> T ``` - +Raises a ValueError since the Result is an Err. -#### bind + + +#### unwrap\_or ```python -def bind(func: t.Callable[[T], Maybe[U]]) -> "Nothing[T]" +def unwrap_or(default: U) -> U ``` -Applies a function to the value inside the Just. +Returns a default value since the Result is an Err. **Arguments**: -- `func` - A function that takes a value of type T and returns a Maybe containing a value of type U. +- `default` - The value to return. **Returns**: - The result of applying the function to the value inside the Just. + The default value. - + -#### map +#### unwrap\_err ```python -def map(func: t.Callable[[T], U]) -> "Nothing[T]" +def unwrap_err() -> BaseException ``` -Applies a mapping function to the value inside the Just. - -**Arguments**: - -- `func` - A function that takes a value of type T and returns a value of type U. - +Unwraps the error of the Err. **Returns**: - A new Just containing the result of applying the function to the value inside the Just. + The unwrapped error. - + #### filter ```python -def filter(predicate: t.Callable[[T], bool]) -> "Nothing[T]" +def filter(predicate: t.Callable[[T], bool]) -> "Err[T, E]" ``` -Filters the value inside the Just based on a predicate. +Filters the result of the Err based on a predicate. **Arguments**: @@ -998,115 +639,129 @@ Filters the value inside the Just based on a predicate. **Returns**: - A new Just containing the value inside the Just if the predicate holds. + An Err containing the original error. - + -#### is\_just +#### to\_parts ```python -def is_just() -> bool +def to_parts() -> t.Tuple[None, E] ``` -Returns False if the Maybe is a Nothing. +Unpacks the error of the Err. - + -#### is\_nothing +## Promise Objects ```python -def is_nothing() -> bool +class Promise(t.Generic[T], t.Awaitable[T], Monad[T]) ``` -Returns True if the Maybe is a Nothing. 
- - + -## Result Objects +#### \_\_init\_\_ ```python -class Result(Monad[T], t.Generic[T, E]) +def __init__(coro_func: t.Callable[P, t.Coroutine[None, None, T]], *args: + P.args, **kwargs: P.kwargs) -> None ``` - +Initializes a Promise with a coroutine function. + +**Arguments**: + +- `coro_func` - A coroutine function that returns a value of type T. +- `args` - Positional arguments to pass to the coroutine function. +- `kwargs` - Keyword arguments to pass to the coroutine function. + + #### pure ```python @classmethod -def pure(cls, value: K) -> "Result[K, E]" +def pure(cls, value: K) -> "Promise[K]" ``` -Creates an Ok with a value. +Creates a Promise that is already resolved with a value. - +**Arguments**: -#### lift +- `value` - The value to resolve the Promise with. + -```python -@classmethod -def lift( - cls, func: t.Callable[[U], K] -) -> t.Callable[["U | Result[U, Exception]"], "Result[K, Exception]"] -``` +**Returns**: -Transforms a function to work with arguments and output wrapped in Result monads. + A new Promise that is already resolved with the value. -**Arguments**: + -- `func` - A function that takes any number of arguments and returns a value of type T. - +#### \_\_await\_\_ -**Returns**: +```python +def __await__() +``` - A function that takes the same number of unwrapped arguments and returns a Result-wrapped result. +Allows the Promise to be awaited. - + -#### \_\_iter\_\_ +#### set\_result ```python -def __iter__() -> t.Iterator[T] +def set_result(result: T) -> None ``` -Allows safely unwrapping the value of the Result using a for construct. +Sets a result on the Promise. - +**Arguments**: -## Ok Objects +- `result` - The result to set on the Promise. + + + +#### set\_exception ```python -class Ok(Result[T, E]) +def set_exception(exception: Exception) -> None ``` - +Sets an exception on the Promise. + +**Arguments**: + +- `exception` - The exception to set on the Promise. + + #### bind ```python -def bind(func: t.Callable[[T], Result[U, E]]) -> Result[U, E] +def bind(func: t.Callable[[T], "Promise[U]"]) -> "Promise[U]" ``` -Applies a function to the result of the Ok. +Applies a function to the result of the Promise. **Arguments**: -- `func` - A function that takes a value of type T and returns a Result containing a value of type U. +- `func` - A function that takes a value of type T and returns a Promise containing a value of type U. **Returns**: - A new Result containing the result of the original Result after applying the function. + A new Promise containing the result of the original Promise after applying the function. - + #### map ```python -def map(func: t.Callable[[T], U]) -> Result[U, E] +def map(func: t.Callable[[T], U]) -> "Promise[U]" ``` -Applies a mapping function to the result of the Ok. +Applies a mapping function to the result of the Promise. **Arguments**: @@ -1115,51 +770,42 @@ Applies a mapping function to the result of the Ok. **Returns**: - A new Ok containing the result of the original Ok after applying the function. - - + A new Promise containing the result of the original Promise after applying the function. -#### is\_ok + -```python -def is_ok() -> bool -``` +#### then -Returns True if the Result is an Ok. +syntactic sugar, equivalent to map - + -#### is\_err +#### filter ```python -def is_err() -> bool +def filter(predicate: t.Callable[[T], bool]) -> "Promise[T]" ``` -Returns False if the Result is an Ok. - - - -#### unwrap +Filters the result of the Promise based on a predicate. 
-```python -def unwrap() -> T -``` +**Arguments**: -Unwraps the value of the Ok. +- `predicate` - A function that takes a value of type T and returns a boolean. + **Returns**: - The unwrapped value. + A new Promise containing the result of the original Promise if the predicate holds. - + #### unwrap\_or ```python -def unwrap_or(default: t.Any) -> T +def unwrap_or(default: T) -> T ``` -Tries to unwrap the Ok, returning a default value if unwrapping raises an exception. +Tries to unwrap the Promise, returning a default value if unwrapping raises an exception. **Arguments**: @@ -1170,96 +816,146 @@ Tries to unwrap the Ok, returning a default value if unwrapping raises an except The unwrapped value or the default value if an exception is raised. - + -#### unwrap\_err +#### from\_value ```python -def unwrap_err() -> BaseException +@classmethod +def from_value(cls, value: T) -> "Promise[T]" ``` -Raises a ValueError since the Result is an Ok. +Creates a Promise that is already resolved with a value. - +**Arguments**: -#### filter +- `value` - The value to resolve the Promise with. + + +**Returns**: + + A new Promise that is already resolved with the value. + + + +#### from\_exception ```python -def filter(predicate: t.Callable[[T], bool]) -> Result[T, E] +@classmethod +def from_exception(cls, exception: BaseException) -> "Promise[T]" ``` -Filters the result of the Ok based on a predicate. +Creates a Promise that is already resolved with an exception. **Arguments**: -- `predicate` - A function that takes a value of type T and returns a boolean. -- `error` - The error to return if the predicate does not hold. +- `exception` - The exception to resolve the Promise with. **Returns**: - A new Result containing the result of the original Result if the predicate holds. + A new Promise that is already resolved with the exception. - + -#### to\_parts +#### lift ```python -def to_parts() -> t.Tuple[T, None] +@classmethod +def lift( + cls, + func: t.Callable[[U], + T]) -> t.Callable[["U | Promise[U]"], "Promise[T]"] ``` -Unpacks the value of the Ok. +Lifts a synchronous function to work within the Promise context, +making it return a Promise of the result and allowing it to be used +with Promise inputs. - +**Arguments**: -## Err Objects +- `func` - A synchronous function that returns a value of type T. + + +**Returns**: + + A function that, when called, returns a Promise wrapping the result of the original function. + + + +## Lazy Objects ```python -class Err(Result[T, E]) +class Lazy(Monad[T]) ``` - + #### \_\_init\_\_ ```python -def __init__(error: E) -> None +def __init__(computation: t.Callable[[], T]) -> None ``` -Initializes an Err with an error. +Initializes a Lazy monad with a computation that will be executed lazily. **Arguments**: -- `error` - The error to wrap in the Err. +- `computation` - A function that takes no arguments and returns a value of type T. - + + +#### pure + +```python +@classmethod +def pure(cls, value: T) -> "Lazy[T]" +``` + +Creates a Lazy monad with a pure value. + + + +#### evaluate + +```python +def evaluate() -> T +``` + +Evaluates the computation if it has not been evaluated yet and caches the result. + +**Returns**: + + The result of the computation. + + #### bind ```python -def bind(func: t.Callable[[T], Result[U, E]]) -> "Err[T, E]" +def bind(func: t.Callable[[T], "Lazy[U]"]) -> "Lazy[U]" ``` -Applies a function to the result of the Err. +Lazily applies a function to the result of the current computation. 
**Arguments**: -- `func` - A function that takes a value of type T and returns a Result containing a value of type U. +- `func` - A function that takes a value of type T and returns a Lazy monad containing a value of type U. **Returns**: - An Err containing the original error. + A new Lazy monad containing the result of the computation after applying the function. - + #### map ```python -def map(func: t.Callable[[T], U]) -> "Err[T, E]" +def map(func: t.Callable[[T], U]) -> "Lazy[U]" ``` -Applies a mapping function to the result of the Err. +Lazily applies a mapping function to the result of the computation. **Arguments**: @@ -1268,29 +964,28 @@ Applies a mapping function to the result of the Err. **Returns**: - An Err containing the original error. + A new Lazy monad containing the result of the computation after applying the function. - + -#### is\_ok +#### filter ```python -def is_ok() -> bool +def filter(predicate: t.Callable[[T], bool]) -> "Lazy[T]" ``` -Returns False if the Result is an Err. +Lazily filters the result of the computation based on a predicate. - +**Arguments**: -#### is\_err +- `predicate` - A function that takes a value of type T and returns a boolean. + -```python -def is_err() -> bool -``` +**Returns**: -Returns True if the Result is an Err. + A new Lazy monad containing the result of the computation if the predicate holds. - + #### unwrap @@ -1298,3934 +993,2660 @@ Returns True if the Result is an Err. def unwrap() -> T ``` -Raises a ValueError since the Result is an Err. +Forces evaluation of the computation and returns its result. - +**Returns**: + + The result of the computation. + + #### unwrap\_or ```python -def unwrap_or(default: U) -> U +def unwrap_or(default: T) -> T ``` -Returns a default value since the Result is an Err. +Tries to evaluate the computation, returning a default value if evaluation raises an exception. **Arguments**: -- `default` - The value to return. +- `default` - The value to return if the computation raises an exception. **Returns**: - The default value. + The result of the computation or the default value if an exception is raised. - + -#### unwrap\_err +#### lift ```python -def unwrap_err() -> BaseException +@classmethod +def lift(cls, func: t.Callable[[U], + T]) -> t.Callable[["U | Lazy[U]"], "Lazy[T]"] ``` -Unwraps the error of the Err. +Transforms a function to work with arguments and output wrapped in Lazy monads. -**Returns**: +**Arguments**: - The unwrapped error. +- `func` - A function that takes any number of arguments and returns a value of type U. + - +**Returns**: -#### filter + A function that takes the same number of Lazy-wrapped arguments and returns a Lazy-wrapped result. -```python -def filter(predicate: t.Callable[[T], bool]) -> "Err[T, E]" -``` + -Filters the result of the Err based on a predicate. +#### Defer -**Arguments**: +Defer is an alias for Lazy -- `predicate` - A function that takes a value of type T and returns a boolean. - + -**Returns**: +#### S - An Err containing the original error. +State type - + -#### to\_parts +#### A -```python -def to_parts() -> t.Tuple[None, E] -``` +Return type -Unpacks the error of the Err. 
+ - +#### B -## Promise Objects +Transformed type + + + +## State Objects ```python -class Promise(t.Generic[T], t.Awaitable[T], Monad[T]) +class State(t.Generic[S, A], Monad[A], abc.ABC) ``` - + -#### \_\_init\_\_ +#### lift ```python -def __init__(coro_func: t.Callable[P, t.Coroutine[None, None, T]], *args: - P.args, **kwargs: P.kwargs) -> None +@classmethod +def lift( + cls, + func: t.Callable[[U], + A]) -> t.Callable[["U | State[S, U]"], "State[S, A]"] ``` -Initializes a Promise with a coroutine function. +Lifts a function to work within the State monad. **Arguments**: -- `coro_func` - A coroutine function that returns a value of type T. -- `args` - Positional arguments to pass to the coroutine function. -- `kwargs` - Keyword arguments to pass to the coroutine function. - - +- `func` - A function to lift. -#### pure +**Returns**: -```python -@classmethod -def pure(cls, value: K) -> "Promise[K]" -``` + A new function that returns a State value. -Creates a Promise that is already resolved with a value. + -**Arguments**: +#### error -- `value` - The value to resolve the Promise with. - +noqa: E731 -**Returns**: + - A new Promise that is already resolved with the value. +# core - + -#### \_\_await\_\_ +# core.configuration -```python -def __await__() -``` +Configuration utilities for the CDF configuration resolver system. -Allows the Promise to be awaited. +There are 3 ways to request configuration values: - +1. Using a Request annotation: -#### set\_result +Pro: It's explicit and re-usable. An annotation can be used in multiple places. ```python -def set_result(result: T) -> None +import typing as t +import cdf.core.configuration as conf + +def foo(bar: t.Annotated[str, conf.Request["api.key"]]) -> None: + print(bar) ``` -Sets a result on the Promise. +2. Setting a __cdf_resolve__ attribute on a callable object. This can be done +directly or by using the `map_section` or `map_values` decorators: -**Arguments**: +Pro: It's concise and can be used in a decorator. It also works with classes. -- `result` - The result to set on the Promise. +```python +import cdf.core.configuration as conf - +@conf.map_section("api") +def foo(key: str) -> None: + print(key) -#### set\_exception +@conf.map_values(key="api.key") +def bar(key: str) -> None: + print(key) -```python -def set_exception(exception: Exception) -> None +def baz(key: str) -> None: + print(key) + +baz.__cdf_resolve__ = ("api",) ``` -Sets an exception on the Promise. +3. Using the `_cdf_resolve` kwarg to request the resolver: -**Arguments**: +Pro: It's flexible and can be used in any function. It requires no imports. -- `exception` - The exception to set on the Promise. +```python +def foo(key: str, _cdf_resolve=("api",)) -> None: + print(key) - +def bar(key: str, _cdf_resolve={"key": "api.key"}) -> None: + print(key) +``` -#### bind + + +#### load\_file ```python -def bind(func: t.Callable[[T], "Promise[U]"]) -> "Promise[U]" +def load_file(path: Path) -> M.Result[t.Dict[str, t.Any], Exception] ``` -Applies a function to the result of the Promise. +Load a configuration from a file path. **Arguments**: -- `func` - A function that takes a value of type T and returns a Promise containing a value of type U. +- `path` - The file path. **Returns**: - A new Promise containing the result of the original Promise after applying the function. + A Result monad with the configuration dictionary if the file format is JSON, YAML or TOML. + Otherwise, a Result monad with an error. 
- + -#### map +#### add\_custom\_converter ```python -def map(func: t.Callable[[T], U]) -> "Promise[U]" +def add_custom_converter(name: str, converter: t.Callable[[str], + t.Any]) -> None ``` -Applies a mapping function to the result of the Promise. +Add a custom converter to the configuration system. -**Arguments**: + -- `func` - A function that takes a value of type T and returns a value of type U. - +#### get\_converter -**Returns**: +```python +def get_converter(name: str) -> t.Callable[[str], t.Any] +``` - A new Promise containing the result of the original Promise after applying the function. +Get a custom converter from the configuration system. - + -#### then +#### remove\_converter -syntactic sugar, equivalent to map +```python +def remove_converter(name: str) -> None +``` - +Remove a custom converter from the configuration system. -#### filter + + +#### apply\_converters ```python -def filter(predicate: t.Callable[[T], bool]) -> "Promise[T]" +def apply_converters(input_value: t.Any, + resolver: t.Optional["ConfigResolver"] = None) -> t.Any ``` -Filters the result of the Promise based on a predicate. +Apply converters to a string. -**Arguments**: + -- `predicate` - A function that takes a value of type T and returns a boolean. - +## \_ConfigScopes Objects -**Returns**: +```python +class _ConfigScopes(t.NamedTuple) +``` - A new Promise containing the result of the original Promise if the predicate holds. +A struct to store named configuration scopes by precedence. - + -#### unwrap\_or +#### explicit -```python -def unwrap_or(default: T) -> T -``` +User-provided configuration passed as a dictionary. -Tries to unwrap the Promise, returning a default value if unwrapping raises an exception. + -**Arguments**: +#### environment -- `default` - The value to return if unwrapping raises an exception. - +Environment-specific configuration loaded from a file. -**Returns**: + - The unwrapped value or the default value if an exception is raised. +#### baseline - +Configuration loaded from a base config file. -#### from\_value + + +#### resolve ```python -@classmethod -def from_value(cls, value: T) -> "Promise[T]" +def resolve() -> "Box" ``` -Creates a Promise that is already resolved with a value. +Resolve the configuration scopes. -**Arguments**: + -- `value` - The value to resolve the Promise with. - +## ConfigLoader Objects -**Returns**: +```python +class ConfigLoader() +``` - A new Promise that is already resolved with the value. +Load configuration from multiple sources. - + -#### from\_exception +#### \_\_init\_\_ ```python -@classmethod -def from_exception(cls, exception: BaseException) -> "Promise[T]" +def __init__(*sources: ConfigSource, + environment: str = "dev", + deferred: bool = False) -> None ``` -Creates a Promise that is already resolved with an exception. +Initialize the configuration loader. -**Arguments**: + -- `exception` - The exception to resolve the Promise with. - +#### config -**Returns**: +```python +@property +def config() -> t.Mapping[str, t.Any] +``` - A new Promise that is already resolved with the exception. +Get the configuration dictionary. - + -#### lift +#### import\_ ```python -@classmethod -def lift( - cls, - func: t.Callable[[U], - T]) -> t.Callable[["U | Promise[U]"], "Promise[T]"] +def import_(source: ConfigSource, append: bool = True) -> None ``` -Lifts a synchronous function to work within the Promise context, -making it return a Promise of the result and allowing it to be used -with Promise inputs. 
- -**Arguments**: +Include a new source of configuration. -- `func` - A synchronous function that returns a value of type T. - + -**Returns**: +#### RESOLVER\_HINT - A function that, when called, returns a Promise wrapping the result of the original function. +A hint to engage the configuration resolver. - + -## Lazy Objects +#### map\_config\_section ```python -class Lazy(Monad[T]) +def map_config_section( + *sections: str) -> t.Callable[[t.Callable[P, T]], t.Callable[P, T]] ``` - +Mark a function to inject configuration values from a specific section. -#### \_\_init\_\_ + + +#### map\_config\_values ```python -def __init__(computation: t.Callable[[], T]) -> None +def map_config_values(**mapping: t.Any + ) -> t.Callable[[t.Callable[P, T]], t.Callable[P, T]] ``` -Initializes a Lazy monad with a computation that will be executed lazily. - -**Arguments**: - -- `computation` - A function that takes no arguments and returns a value of type T. +Mark a function to inject configuration values from a specific mapping of param names to keys. - + -#### pure +## ConfigResolver Objects ```python -@classmethod -def pure(cls, value: T) -> "Lazy[T]" +class ConfigResolver(t.MutableMapping) ``` -Creates a Lazy monad with a pure value. +Resolve configuration values. - + -#### evaluate +#### map\_section -```python -def evaluate() -> T -``` +Mark a function to inject configuration values from a specific section. -Evaluates the computation if it has not been evaluated yet and caches the result. + -**Returns**: +#### map\_values - The result of the computation. +Mark a function to inject configuration values from a specific mapping of param names to keys. - + -#### bind +#### \_\_init\_\_ ```python -def bind(func: t.Callable[[T], "Lazy[U]"]) -> "Lazy[U]" +def __init__( + *sources: ConfigSource, + environment: str = "dev", + loader: ConfigLoader = ConfigLoader("config.json") +) -> None ``` -Lazily applies a function to the result of the current computation. +Initialize the configuration resolver. -**Arguments**: + -- `func` - A function that takes a value of type T and returns a Lazy monad containing a value of type U. - +#### config -**Returns**: +```python +@property +def config() -> t.Mapping[str, t.Any] +``` - A new Lazy monad containing the result of the computation after applying the function. +Get the configuration dictionary. - + -#### map +#### \_\_getitem\_\_ ```python -def map(func: t.Callable[[T], U]) -> "Lazy[U]" +def __getitem__(key: str) -> t.Any ``` -Lazily applies a mapping function to the result of the computation. +Get a configuration value. -**Arguments**: + -- `func` - A function that takes a value of type T and returns a value of type U. - +#### \_\_setitem\_\_ -**Returns**: +```python +def __setitem__(key: str, value: t.Any) -> None +``` - A new Lazy monad containing the result of the computation after applying the function. +Set a configuration value. - + -#### filter +#### \_\_getattr\_\_ ```python -def filter(predicate: t.Callable[[T], bool]) -> "Lazy[T]" +def __getattr__(key: str) -> t.Any ``` -Lazily filters the result of the computation based on a predicate. - -**Arguments**: - -- `predicate` - A function that takes a value of type T and returns a boolean. - - -**Returns**: - - A new Lazy monad containing the result of the computation if the predicate holds. +Get a configuration value. - + -#### unwrap +#### \_\_enter\_\_ ```python -def unwrap() -> T +def __enter__() -> "ConfigResolver" ``` -Forces evaluation of the computation and returns its result. 
- -**Returns**: - - The result of the computation. +Enter a context. - + -#### unwrap\_or +#### \_\_exit\_\_ ```python -def unwrap_or(default: T) -> T +def __exit__(*args) -> None ``` -Tries to evaluate the computation, returning a default value if evaluation raises an exception. - -**Arguments**: - -- `default` - The value to return if the computation raises an exception. - - -**Returns**: - - The result of the computation or the default value if an exception is raised. +Exit a context. - + -#### lift +#### \_\_repr\_\_ ```python -@classmethod -def lift(cls, func: t.Callable[[U], - T]) -> t.Callable[["U | Lazy[U]"], "Lazy[T]"] +def __repr__() -> str ``` -Transforms a function to work with arguments and output wrapped in Lazy monads. - -**Arguments**: - -- `func` - A function that takes any number of arguments and returns a value of type U. - - -**Returns**: - - A function that takes the same number of Lazy-wrapped arguments and returns a Lazy-wrapped result. - - - -#### Defer - -Defer is an alias for Lazy - - - -#### S - -State type - - - -#### A - -Return type - - - -#### B - -Transformed type +Get a string representation of the configuration resolver. - + -## State Objects +#### set\_environment ```python -class State(t.Generic[S, A], Monad[A], abc.ABC) +def set_environment(environment: str) -> None ``` - +Set the environment of the configuration resolver. -#### lift + + +#### import\_source ```python -@classmethod -def lift( - cls, - func: t.Callable[[U], - A]) -> t.Callable[["U | State[S, U]"], "State[S, A]"] +def import_source(source: ConfigSource, append: bool = True) -> None ``` -Lifts a function to work within the State monad. - -**Arguments**: - -- `func` - A function to lift. +Include a new source of configuration. -**Returns**: + - A new function that returns a State value. +#### kwarg\_hint - +A hint supplied in a kwarg to engage the configuration resolver. -#### error + -noqa: E731 +#### resolve\_defaults - +```python +def resolve_defaults(func_or_cls: t.Callable[P, T]) -> t.Callable[..., T] +``` -# core +Resolve configuration values into a function or class. - + -# core.config +#### is\_resolvable -The config module provides a configuration provider for CDF scoped settings. +```python +def is_resolvable(param: inspect.Parameter) -> bool +``` -This allows for the configuration to be accessed and modified in a consistent manner across -the codebase leveraging dlt's configuration provider interface. It also makes all of dlt's -semantics which depend on the configuration providers seamlessly work with CDF's configuration. +Check if a parameter is injectable. - + -## CdfConfigProvider Objects +#### extract\_request\_annotation ```python -class CdfConfigProvider(_ConfigProvider) +@staticmethod +def extract_request_annotation(param: inspect.Parameter) -> t.Optional[str] ``` -A configuration provider for CDF scoped settings. +Extract a request annotation from a parameter. - + -#### \_\_init\_\_ +#### \_\_call\_\_ ```python -def __init__(scope: t.ChainMap[str, t.Any], secret: bool = False) -> None +def __call__(func_or_cls: t.Callable[P, T], *args: t.Any, + **kwargs: t.Any) -> T ``` -Initialize the provider. +Invoke a callable with injected configuration values. -**Arguments**: + -- `config` - The configuration ChainMap. 
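Putting the resolver pieces together: a sketch that marks a function for injection and invokes it through `ConfigResolver.__call__`. It assumes a plain mapping is accepted as a `ConfigSource`:

```python
import cdf.core.configuration as conf  # import style from the module docstring


@conf.map_config_values(key="api.key")
def call_api(key: str) -> str:
    return f"calling with {key}"


resolver = conf.ConfigResolver({"api": {"key": "secret"}}, environment="dev")

# __call__ resolves the mapped parameters before invoking the callable.
assert resolver(call_api) == "calling with secret"
```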
+# core.component.service - + -#### get\_value +## Service Objects ```python -def get_value(key: str, hint: t.Type[t.Any], pipeline_name: str, *sections: - str) -> t.Tuple[t.Optional[t.Any], str] +class Service(Component[ServiceProto]) ``` -Get a value from the configuration. +A service that the workspace provides. IE an API, database, requests client, etc. - + -#### set\_value +# core.component -```python -def set_value(key: str, value: t.Any, pipeline_name: str, *sections: - str) -> None -``` + -Set a value in the configuration. +# core.component.operation - + -#### name +## Operation Objects ```python -@property -def name() -> str +class Operation(Entrypoint[OperationProto]) ``` -The name of the provider - - +A generic callable that returns an exit code. -#### supports\_sections - -```python -@property -def supports_sections() -> bool -``` + -This provider supports sections +# core.component.pipeline - + -#### supports\_secrets +## DataPipeline Objects ```python -@property -def supports_secrets() -> bool +class DataPipeline(Entrypoint[DataPipelineProto]) ``` -There is no differentiation between secrets and non-secrets for the cdf provider. - -Nothing is persisted. Data is available in memory and backed by the dynaconf settings object. +A data pipeline which loads data from a source to a destination. - + -#### is\_writable +#### \_\_call\_\_ ```python -@property -def is_writable() -> bool +def __call__(*args: t.Any, **kwargs: t.Any) -> t.List["LoadInfo"] ``` -Whether the provider is writable +Run the data pipeline - + -#### get\_config\_providers +#### get\_schemas ```python -def get_config_providers( - scope: t.ChainMap[str, t.Any], - include_env: bool = True -) -> t.Union[ - t.Tuple[CdfConfigProvider, CdfConfigProvider], - t.Tuple[EnvironProvider, CdfConfigProvider, CdfConfigProvider], -] +def get_schemas(destination: t.Optional["DltDestination"] = None) ``` -Get the configuration providers for the given scope. +Get the schemas for the pipeline. - + -#### inject\_configuration +#### run\_tests ```python -@contextmanager -def inject_configuration( - scope: t.ChainMap[str, t.Any], - include_env: bool = True) -> t.Iterator[t.Mapping[str, t.Any]] +def run_tests() -> None ``` -Inject the configuration provider into the context - -This allows dlt.config and dlt.secrets to access the scope configuration. Furthermore -it makes the scope configuration available throughout dlt where things such as extract, -normalize, and load settings can be specified. - - +Run the integration test for the pipeline. -# core.filesystem + -A central interface for filesystems thinly wrapping fsspec. +# core.component.publisher - + -## FilesystemAdapter Objects +## DataPublisher Objects ```python -class FilesystemAdapter() +class DataPublisher(Entrypoint[DataPublisherProto]) ``` -Wraps an fsspec filesystem. +A data publisher which pushes data to an operational system. -The filesystem is lazily loaded. Certain methods are intercepted to include cdf-specific logic. Helper -methods are provided for specific operations. + - - -#### \_\_init\_\_ +#### \_\_call\_\_ ```python -@with_config(sections=("filesystem", )) -def __init__(uri: PathLike = dlt.config.value, - root: t.Optional[PathLike] = None, - options: t.Optional[t.Dict[str, t.Any]] = None) -> None +def __call__(*args: t.Any, **kwargs: t.Any) -> None ``` -Load a filesystem from a provider and kwargs. +Publish the data -**Arguments**: + -- `uri` - The filesystem URI. -- `options` - The filesystem provider kwargs. 
+# core.component.base - + -#### \_\_getattr\_\_ +## ServiceLevelAgreement Objects ```python -def __getattr__(name: str) -> t.Any +class ServiceLevelAgreement(Enum) ``` -Proxy attribute access to the wrapped filesystem. +An SLA to assign to a component. Users can define the meaning of each level. - + -#### \_\_getitem\_\_ +## \_Node Objects ```python -def __getitem__(value: str) -> t.Any +class _Node(pydantic.BaseModel) ``` -Get a path from the filesystem. +A node in a graph of components. - + -#### \_\_setitem\_\_ +#### owner -```python -def __setitem__(key: str, value: t.Any) -> None -``` +The owner of the node. Useful for tracking who to contact for issues or config. + + -Set a path in the filesystem. +#### description - +A description of the node. -#### open + -```python -def open(path: PathLike, mode: str = "r", **kwargs: t.Any) -> t.Any -``` +#### sla -Open a file from the filesystem. +The SLA for the node. -**Arguments**: + -- `path` - The path to the file. -- `mode` - The file mode. -- `kwargs` - Additional kwargs. - +#### enabled -**Returns**: +Whether the node is enabled or disabled. Disabled components are not loaded. - The file handle. + - +#### version -# core.constants +A semantic version for the node. Can signal breaking changes to dependents. -Constants used by CDF. + - +#### tags -#### CDF\_ENVIRONMENT +Tags to categorize the node. -Environment variable to set the environment of the project. + - +#### metadata -#### DEFAULT\_ENVIRONMENT +Additional metadata for the node. Useful for custom integrations. -Default environment for the project. + - +## Component Objects -#### CDF\_MAIN +```python +class Component(_Node, t.Generic[T]) +``` -A sentinel value that will match the __name__ attribute of a module being executed by CDF. +A component with a binding to a dependency. - + -#### CDF\_LOG\_LEVEL +#### main -Environment variable to set the log level of the project. +The dependency for the component. This is what is injected into the workspace. - + -# core.runtime +#### name - +The key to register the component in the container. -# core.runtime.common +Must be a valid Python identifier. Users can use these names as function parameters +for implicit dependency injection. Names must be unique within the workspace. - + -#### with\_activate\_project +#### \_\_call\_\_ ```python -def with_activate_project(func: t.Callable[P, T]) -> t.Callable[P, T] +def __call__() -> T ``` -Attempt to inject the Project associated with the first argument into cdf.context. - -**Arguments**: - -- `func` - The function to decorate. - - -**Returns**: - - The decorated function. +Unwrap the main dependency invoking the underlying callable. - + -# core.runtime.pipeline +## Entrypoint Objects -The runtime pipeline module is responsible for executing pipelines from pipeline specifications. +```python +class Entrypoint(_Node, t.Generic[T]) +``` -It performs the following functions: -- Injects the runtime context into the pipeline. -- Executes the pipeline. -- Captures metrics during extract. -- Intercepts sources during extract. (if specified, this makes the pipeline a no-op) -- Applies transformations to sources during extract. -- Stages data if a staging location is provided and enabled in the runtime context. -- Forces replace disposition if specified in the runtime context. -- Filters resources based on glob patterns. -- Logs a warning if dataset_name is provided in the runtime context. (since we want to manage it) -- Creates a cdf pipeline from a dlt pipeline. 
+An entrypoint representing an invokeable set of functions. - + -#### pipeline +#### main -Gets the active pipeline or creates a new one with the given arguments. +The main function associated with the entrypoint. - + -## RuntimePipeline Objects +#### name -```python -class RuntimePipeline(Pipeline) -``` +The name of the entrypoint. -Overrides certain methods of the dlt pipeline to allow for cdf specific behavior. +This is used to register the entrypoint in the workspace and CLI. Names must be +unique within the workspace. The name can contain spaces and special characters. - + -#### configure +#### \_\_call\_\_ ```python -def configure(dry_run: bool = False, - force_replace: bool = False, - select: t.Optional[t.List[str]] = None, - exclude: t.Optional[t.List[str]] = None) -> "RuntimePipeline" +def __call__(*args: t.Any, **kwargs: t.Any) -> t.Any ``` -Configures options which affect the behavior of the pipeline at runtime. - -**Arguments**: +Invoke the entrypoint. -- `dry_run` - Whether to run the pipeline in dry run mode. -- `force_replace` - Whether to force replace disposition. -- `select` - A list of glob patterns to select resources. -- `exclude` - A list of glob patterns to exclude resources. - + -**Returns**: +# core.context -- `RuntimePipeline` - The pipeline with source hooks configured. +Context management utilities for managing the active workspace. - + -#### force\_replace +#### get\_active\_workspace ```python -@property -def force_replace() -> bool +def get_active_workspace() -> t.Optional["Workspace"] ``` -Whether to force replace disposition. +Get the active workspace for resolving injected dependencies. - + -#### dry\_run +#### set\_active\_workspace ```python -@property -def dry_run() -> bool +def set_active_workspace(workspace: t.Optional["Workspace"]) -> Token ``` -Dry run mode. +Set the active workspace for resolving injected dependencies. - + -#### metric\_accumulator +#### use\_workspace ```python -@property -def metric_accumulator() -> t.Mapping[str, t.Any] +@contextlib.contextmanager +def use_workspace(workspace: t.Optional["Workspace"]) -> t.Iterator[None] ``` -A container for accumulating metrics during extract. +Context manager for temporarily setting the active workspace. - + -#### source\_hooks +#### resolve ```python -@property -def source_hooks( -) -> t.List[t.Callable[[dlt.sources.DltSource], dlt.sources.DltSource]] +def resolve( + dependencies: t.Union[t.Callable[..., T], bool] = True, + configuration: bool = True, + eagerly_bind_workspace: bool = False +) -> t.Callable[..., t.Union[T, t.Callable[..., T]]] ``` -The source hooks for the pipeline. +Decorator for injecting dependencies and resolving configuration for a function. - + -#### tracked\_sources +#### get\_default\_callable\_lifecycle ```python -@property -def tracked_sources() -> t.Set[dlt.sources.DltSource] +def get_default_callable_lifecycle() -> t.Optional["Lifecycle"] ``` -The sources tracked by the pipeline. +Get the default lifecycle for callables when otherwise unspecified. - + -## PipelineResult Objects +#### set\_default\_callable\_lifecycle ```python -class PipelineResult(t.NamedTuple) +def set_default_callable_lifecycle( + lifecycle: t.Optional["Lifecycle"]) -> Token ``` -The result of executing a pipeline specification. +Set the default lifecycle for callables when otherwise unspecified. 
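A short sketch of how these context utilities compose, assuming the import path `cdf.core.context` and a pre-built `workspace`; the dependency name `duckdb_conn` is hypothetical, and injection by parameter name follows the component-naming rule described above:

```python
from cdf.core.context import resolve, use_workspace  # assumed import path

@resolve()  # inject dependencies and resolve configuration at call time
def row_count(duckdb_conn) -> int:  # hypothetical dependency name
    return duckdb_conn.sql("SELECT COUNT(*) FROM events").fetchone()[0]

with use_workspace(workspace):  # temporarily set the active workspace
    print(row_count())          # dependencies are pulled from `workspace`
```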
- + -#### execute\_pipeline\_specification +#### use\_default\_callable\_lifecycle ```python -@with_activate_project -def execute_pipeline_specification( - pipe_spec: PipelineSpecification, - sink_spec: t.Union[ - TDestinationReferenceArg, - t.Tuple[TDestinationReferenceArg, - t.Optional[TDestinationReferenceArg]], - SinkSpecification, - ], - select: t.Optional[t.List[str]] = None, - exclude: t.Optional[t.List[str]] = None, - force_replace: bool = False, - dry_run: bool = False, - enable_stage: bool = True, - quiet: bool = False, - **pipeline_options: t.Any) -> M.Result[PipelineResult, Exception] +@contextlib.contextmanager +def use_default_callable_lifecycle( + lifecycle: t.Optional["Lifecycle"]) -> t.Iterator[None] ``` -Executes a pipeline specification. - -**Arguments**: - -- `pipe_spec` - The pipeline specification. -- `sink_spec` - The destination where the pipeline will write data. -- `select` - A list of glob patterns to select resources. -- `exclude` - A list of glob patterns to exclude resources. -- `force_replace` - Whether to force replace disposition. -- `dry_run` - Whether to run the pipeline in dry run mode. -- `enable_stage` - Whether to enable staging. If disabled, staging will be ignored. -- `quiet` - Whether to suppress output. -- `pipeline_options` - Additional dlt.pipeline constructor arguments. - - -**Returns**: - - M.Result[PipelineResult, Exception]: The result of executing the pipeline specification. - - +Context manager for temporarily setting the default callable lifecycle. -# core.runtime.publisher + -The runtime publisher module is responsible for executing publishers from publisher specifications. +# core.workspace -It performs the following functions: -- Validates the dependencies of the publisher exist. -- Verifies the dependencies are up-to-date. -- Executes the publisher script. +A workspace is a container for components and configurations. - + -#### execute\_publisher\_specification +## Workspace Objects ```python -@with_activate_project -def execute_publisher_specification( - spec: PublisherSpecification, - transform_ctx: sqlmesh.Context, - skip_verification: bool = False -) -> M.Result[t.Dict[str, t.Any], Exception] +class Workspace(pydantic.BaseModel) ``` -Execute a publisher specification. - -**Arguments**: +A CDF workspace that allows for dependency injection and configuration resolution. -- `spec` - The publisher specification to execute. -- `transform_ctx` - The SQLMesh context to use for execution. -- `skip_verification` - Whether to skip the verification of the publisher dependencies. + - +#### name -# core.runtime.notebook +A human-readable name for the workspace. -The runtime notebook module is responsible for executing notebooks from notebook specifications. + -It performs the following functions: -- Executes the notebook. -- Writes the output to a designated location in a storage provider. -- Cleans up the rendered notebook if required. +#### version - +A semver version string for the workspace. -#### execute\_notebook\_specification + -```python -@with_activate_project -def execute_notebook_specification( - spec: NotebookSpecification, - **params: t.Any) -> M.Result["NotebookNode", Exception] -``` +#### environment -Execute a notebook specification. +The runtime environment used to resolve configuration. -**Arguments**: + -- `spec` - The notebook specification to execute. -- `storage` - The filesystem to use for persisting the output. -- `**params` - The parameters to pass to the notebook. Overrides the notebook spec parameters. 
+#### conf\_resolver - +The configuration resolver for the workspace. -# core.runtime.script + -The runtime script module is responsible for executing scripts from script specifications. +#### container -It performs the following functions: -- Executes the script. -- Optionally captures stdout and returns it as a string. +The dependency injection container for the workspace. - + -#### execute\_script\_specification +#### configuration\_sources -```python -@with_activate_project -def execute_script_specification( - spec: ScriptSpecification, - capture_stdout: bool = False -) -> t.Union[M.Result[t.Dict[str, t.Any], Exception], M.Result[str, - Exception]] -``` +A list of configuration sources resolved and merged by the workspace. -Execute a script specification. + -**Arguments**: +#### service\_definitions -- `spec` - The script specification to execute. -- `capture_stdout` - Whether to capture stdout and return it. False returns an empty string. +An iterable of raw service definitions that the workspace provides. - + -# core.logger +#### pipeline\_definitions -Logger for CDF +An iterable of raw pipeline definitions that the workspace provides. - + -#### LOGGER +#### publishers\_definitions -CDF logger instance. +An iterable of raw publisher definitions that the workspace provides. - + -#### LOG\_LEVEL +#### operation\_definitions -The active log level for CDF. +An iterable of raw generic operation definitions that the workspace provides. - + -#### configure +#### sqlmesh\_path -```python -def configure(level: int | str = logging.INFO) -> None -``` +The path to the sqlmesh root for the workspace. -Configure logging. + -**Arguments**: +#### sqlmesh\_context\_kwargs -- `level` _int, optional_ - Logging level. Defaults to logging.INFO. +Keyword arguments to pass to the sqlmesh context. - + -#### create +#### activate ```python -def create(name: str | None = None) -> CDFLoggerAdapter | logging.Logger +def activate() -> Self ``` -Get or create a logger. +Activate the workspace for the current context. -**Arguments**: + -- `name` _str, optional_ - The name of the logger. If None, the package logger is - returned. Defaults to None. If a name is provided, a child logger is - created. - +#### services -**Returns**: +```python +@cached_property +def services() -> t.Dict[str, cmp.Service] +``` - The logger. +Return the resolved services of the workspace. - + -#### log\_level +#### pipelines ```python -def log_level() -> str +@cached_property +def pipelines() -> t.Dict[str, cmp.DataPipeline] ``` -Returns current log level +Return the resolved data pipelines of the workspace. - + -#### set\_level +#### publishers ```python -def set_level(level: int | str) -> None +@cached_property +def publishers() -> t.Dict[str, cmp.DataPublisher] ``` -Set the package log level. +Return the resolved data publishers of the workspace. -**Arguments**: + -- `level` _int | str_ - The new log level. - +#### operations -**Raises**: +```python +@cached_property +def operations() -> t.Dict[str, cmp.Operation] +``` -- `ValueError` - If the log level is not valid. +Return the resolved operations of the workspace. - + -#### suppress\_and\_warn +#### get\_sqlmesh\_context ```python -@contextlib.contextmanager -def suppress_and_warn() -> t.Iterator[None] +def get_sqlmesh_context(gateway: t.Optional[str] = None, + must_exist: bool = False, + **kwargs: t.Any) -> t.Optional["sqlmesh.Context"] ``` -Suppresses exception and logs it as warning +Return the transform context or raise an error if not defined. 
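To make the shape of this API concrete, here is a hedged sketch of consuming a workspace. The service name `http_client` is invented, and the exact format of the raw definitions is not shown in this reference:

```python
workspace.activate()                # make this the active workspace

print(list(workspace.pipelines))    # resolved DataPipeline components by name

def preview(http_client):          # parameter name matches a registered service
    return http_client.get("/health")

result = workspace.invoke(preview)  # configuration and dependencies injected now
bound = workspace.bind(preview)     # or bind now and call later
```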
- + -#### mute +#### cli ```python -@contextlib.contextmanager -def mute() -> t.Iterator[None] +@property +def cli() -> "click.Group" ``` -Mute the logger. +Dynamically generate a CLI entrypoint for the workspace. - + -#### \_\_getattr\_\_ +#### bind ```python -def __getattr__(name: str) -> "LogMethod" +def bind(func_or_cls: t.Callable[P, T]) -> t.Callable[..., T] ``` -Get a logger method from the package logger. +Wrap a function with configuration and dependencies defined in the workspace. - + -#### apply\_patches +#### invoke ```python -def apply_patches() -> None +def invoke(func_or_cls: t.Callable[P, T], *args: t.Any, **kwargs: t.Any) -> T ``` -Apply logger patches. +Invoke a function with configuration and dependencies defined in the workspace. + + - +# core.injector.registry -# core.specification.sink +Dependency registry with lifecycle management. - + -## SinkSpecification Objects +## Lifecycle Objects ```python -class SinkSpecification(PythonScript) +class Lifecycle(enum.Enum) ``` -A sink specification. +Lifecycle of a dependency. - + -#### ingest\_config +#### PROTOTYPE -The variable which holds the ingest configuration (a dlt destination). +A prototype dependency is created every time it is requested - + -#### stage\_config +#### SINGLETON -The variable which holds the staging configuration (a dlt destination). +A singleton dependency is created once and shared. - + -#### transform\_config +#### INSTANCE -The variable which holds the transform configuration (a sqlmesh config). +An instance dependency is a global object which is not created by the container. - + -#### get\_ingest\_config +#### is\_prototype ```python -def get_ingest_config() -> t.Tuple[Destination, t.Optional[Destination]] +@property +def is_prototype() -> bool ``` -Get the ingest configuration. +Check if the lifecycle is prototype. - + -#### get\_transform\_config +#### is\_singleton ```python -def get_transform_config() -> GatewayConfig +@property +def is_singleton() -> bool ``` -Get the transform configuration. +Check if the lifecycle is singleton. - + -#### ingest +#### is\_instance ```python @property -def ingest() -> Destination +def is_instance() -> bool ``` -The ingest destination. +Check if the lifecycle is instance. - + -#### stage +#### is\_deferred ```python @property -def stage() -> t.Optional[Destination] +def is_deferred() -> bool ``` -The stage destination. +Check if the object to be created is deferred. - + -#### transform +## TypedKey Objects ```python -@property -def transform() -> GatewayConfig +class TypedKey(t.NamedTuple) ``` -The transform configuration. +A key which is a tuple of a name and a type. + + - +#### type\_name -# core.specification +```python +@property +def type_name() -> t.Optional[str] +``` - +Get the name of the type if applicable. -# core.specification.model + - +#### \_\_eq\_\_ -# core.specification.pipeline +```python +def __eq__(other: t.Any) -> bool +``` -The spec classes for continuous data framework pipelines. +Two keys are equal if their names and base types match. - + -## PipelineMetricSpecification Objects +#### \_\_hash\_\_ ```python -class PipelineMetricSpecification(PythonEntrypoint) +def __hash__() -> int ``` -Defines metrics which can be captured during pipeline execution - - +Hash the key with the effective type if possible. -#### options + -Kwargs to pass to the metric function. +#### DependencyKey -This assumes the metric is a callable which accepts kwargs and returns a metric -interface. If the metric is not parameterized, this should be left empty. 
+A string or a typed key. - + -#### func +## Dependency Objects ```python -@property -def func() -> MetricInterface +class Dependency(pydantic.BaseModel, t.Generic[T]) ``` -A typed property to return the metric function +A Monadic type which wraps a value with lifecycle and allows simple transformations. - + -#### \_\_call\_\_ +#### factory -```python -def __call__(resource: dlt.sources.DltResource, - state: MetricStateContainer) -> None -``` +The factory or instance of the dependency. -Adds a metric aggregator to a resource + - +#### lifecycle -#### InlineMetricSpecifications +The lifecycle of the dependency. -Mapping of resource name glob patterns to metric specs + - +#### conf\_spec -## PipelineFilterSpecification Objects +A hint for configuration values. -```python -class PipelineFilterSpecification(PythonEntrypoint) -``` + -Defines filters which can be applied to pipeline execution +#### alias - +Used as an alternative to inferring the name from the factory. -#### options + -Kwargs to pass to the filter function. +#### instance -This assumes the filter is a callable which accepts kwargs and returns a filter -interface. If the filter is already a filter interface, this should be left empty. +```python +@classmethod +def instance(cls, instance: t.Any) -> "Dependency" +``` - +Create a dependency from an instance. -#### func +**Arguments**: -```python -@property -def func() -> FilterInterface -``` +- `instance` - The instance to use as the dependency. + -A typed property to return the filter function +**Returns**: - + A new Dependency object with the instance lifecycle. -#### \_\_call\_\_ + + +#### singleton ```python -def __call__(resource: dlt.sources.DltResource) -> None +@classmethod +def singleton(cls, factory: t.Callable[..., T], *args: t.Any, + **kwargs: t.Any) -> "Dependency" ``` -Adds a filter to a resource +Create a singleton dependency. + +**Arguments**: - +- `factory` - The factory function to create the dependency. +- `args` - Positional arguments to pass to the factory. +- `kwargs` - Keyword arguments to pass to the factory. + -#### InlineFilterSpecifications +**Returns**: -Mapping of resource name glob patterns to filter specs + A new Dependency object with the singleton lifecycle. - + -## PipelineSpecification Objects +#### prototype ```python -class PipelineSpecification(PythonScript, Schedulable) +@classmethod +def prototype(cls, factory: t.Callable[..., T], *args: t.Any, + **kwargs: t.Any) -> "Dependency" ``` -A pipeline specification. - - +Create a prototype dependency. -#### metrics - -A dict of resource name glob patterns to metric definitions. +**Arguments**: -Metrics are captured on a per resource basis during pipeline execution and are -accumulated into the metric_state dict. The metric definitions are callables that -take the current item and the current metric value and return the new metric value. +- `factory` - The factory function to create the dependency. +- `args` - Positional arguments to pass to the factory. +- `kwargs` - Keyword arguments to pass to the factory. + - +**Returns**: -#### filters + A new Dependency object with the prototype lifecycle. -A dict of resource name glob patterns to filter definitions. + -Filters are applied on a per resource basis during pipeline execution. The filter -definitions are callables that take the current item and return a boolean indicating -whether the item should be filtered out. +#### wrap - - -#### dataset\_name - -The name of the dataset associated with the pipeline. - -Defaults to the versioned name. 
This string is formatted with the pipeline name, version, meta, and tags. +```python +@classmethod +def wrap(cls, obj: t.Any, *args: t.Any, **kwargs: t.Any) -> Self +``` - +Wrap an object as a dependency. -#### options +Assumes singleton lifecycle for callables unless a default lifecycle context is set. -Options available in pipeline scoped dlt config resolution. +**Arguments**: - +- `obj` - The object to wrap. + -#### persist\_extract\_package +**Returns**: -Whether to persist the extract package in the project filesystem. + A new Dependency object with the object as the factory. - + -#### inject\_metrics\_and\_filters +#### map\_value ```python -def inject_metrics_and_filters( - source: dlt.sources.DltSource, - container: MetricStateContainer) -> dlt.sources.DltSource +def map_value(func: t.Callable[[T], T]) -> Self ``` -Apply metrics and filters defined by the specification to a source. - -For a source to conform to the specification, it must have this method applied to it. You -can manipulate sources without this method, but the metrics and filters will not be applied. +Apply a function to the unwrapped value. **Arguments**: -- `source` - The source to apply metrics and filters to. -- `container` - The container to store metric state in. This is mutated during execution. +- `func` - The function to apply to the unwrapped value. **Returns**: -- `dlt.sources.DltSource` - The source with metrics and filters applied. + A new Dependency object with the function applied. - + -#### create\_pipeline +#### map ```python -def create_pipeline(klass: t.Type[TPipeline] = dlt.Pipeline, - **kwargs: t.Any) -> TPipeline +def map(*funcs: t.Callable[[t.Callable[..., T]], t.Callable[..., T]], + idempotent: bool = False) -> Self ``` -Convert the pipeline specification to a dlt pipeline object. +Apply a sequence of transformations to the wrapped value. -This is a convenience method to create a dlt pipeline object from the specification. The -dlt pipeline is expected to use the name and dataset name from the specification. This -is what allows declarative definitions to be associated with runtime artifacts. +The transformations are applied in order. This is a no-op if the dependency is +already resolved and idempotent is True or the dependency is an instance. **Arguments**: -- `klass` _t.Type[TPipeline], optional_ - The pipeline class to use. Defaults to dlt.Pipeline. -- `**kwargs` - Additional keyword arguments to pass to the dlt.pipeline constructor. +- `funcs` - The functions to apply to the wrapped value. +- `idempotent` - If True, allow transformations on resolved dependencies to be a no-op. **Returns**: -- `TPipeline` - The dlt pipeline object. - - + The Dependency object with the transformations applied. -# core.specification.publisher + - - -## PublisherSpecification Objects +#### unwrap ```python -class PublisherSpecification(PythonScript, Schedulable) +def unwrap() -> T ``` -A publisher specification. - - +Unwrap the value from the factory. -#### depends\_on + -The dependencies of the publisher expressed as fully qualified names of SQLMesh tables. +#### \_\_call\_\_ - +```python +def __call__() -> T +``` -# core.specification.notebook +Alias for unwrap. - + -## NotebookSpecification Objects +#### try\_infer\_type ```python -class NotebookSpecification(WorkspaceComponent, InstallableRequirements) +def try_infer_type() -> t.Optional[t.Type[T]] ``` -A sink specification. - - +Get the effective type of the dependency. 
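The transformation methods above make `Dependency` convenient to build up incrementally. A minimal sketch, assuming `Dependency` is importable from `cdf.core.injector`:

```python
import random

from cdf.core.injector import Dependency  # assumed import path

dep = Dependency.prototype(random.random)   # re-created on every unwrap
dep = dep.map_value(lambda x: round(x, 3))  # transform the produced value

print(dep())         # __call__ is an alias for unwrap()
print(dep.unwrap())  # a different value each time under the prototype lifecycle

pi = Dependency.instance(3.14159)           # a global object, never re-created
assert pi.unwrap() == 3.14159
```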
-#### storage\_path + -The path to write the output notebook to for long term storage. +#### try\_infer\_name -Uses the configured Project fs provider. This may be gcs, s3, etc. - -This is a format string which will be formatted with the following variables: -- name: The name of the notebook. -- date: The current date. -- timestamp: An ISO formatted timestamp. -- epoch: The current epoch time. -- params: A dict of the resolved parameters passed to the notebook. +```python +def try_infer_name() -> t.Optional[str] +``` - +Infer the name of the dependency from the factory. -#### parameters + -Parameters to pass to the notebook when running. +#### generate\_key - +```python +def generate_key( + name: t.Optional[DependencyKey] = None) -> t.Union[str, TypedKey] +``` -#### gc\_duration +Generate a typed key for the dependency. -The duration in seconds to keep the locally rendered notebook in the `_rendered` folder. +**Arguments**: -Rendered notebooks are written to the `_rendered` folder of the notebook's parent directory. -That folder is not intended to be a permanent storage location. This setting controls how long -rendered notebooks are kept before being garbage collected. The default is 3 days. Set to 0 to -clean up immediately after execution. Set to -1 to never clean up. +- `name` - The name of the dependency. + - +**Returns**: -# core.specification.script + A typed key if the type can be inferred, else the name. - + -## ScriptSpecification Objects +## DependencyRegistry Objects ```python -class ScriptSpecification(PythonScript, Schedulable) +class DependencyRegistry(t.MutableMapping[DependencyKey, Dependency]) ``` -A script specification. +A registry for dependencies with lifecycle management. - +Dependencies can be registered with a name or a typed key. Typed keys are tuples +of a name and a type hint. Dependencies can be added with a lifecycle, which can be +one of prototype, singleton, or instance. Dependencies can be retrieved by name or +typed key. Dependencies can be injected into functions or classes. Dependencies can +be wired into callables to resolve a dependency graph. -# core.specification.base + -Base specification classes for continuous data framework components - - - -## BaseComponent Objects +#### \_\_init\_\_ ```python -class BaseComponent(pydantic.BaseModel) +def __init__(strict: bool = False) -> None ``` -A component specification. - -Components are the building blocks of a data platform. They declaratively describe -the functions within a workspace which extract, load, transform, and publish data. +Initialize the registry. - +**Arguments**: -#### name +- `strict` - If True, do not inject an untyped lookup for a typed dependency. -The name of the component. Must be unique within the workspace. + - +#### dependencies -#### version +```python +@property +def dependencies() -> ChainMap[t.Any, Dependency] +``` -The version of the component. +Get all dependencies. -Used internally to version datasets and serves as an external signal to dependees that something -has changed in a breaking way. All components are versioned. + - +#### add -#### owner +```python +def add(key: DependencyKey, + value: t.Any, + lifecycle: t.Optional[Lifecycle] = None, + override: bool = False, + init_args: t.Tuple[t.Any, ...] = (), + init_kwargs: t.Optional[t.Dict[str, t.Any]] = None) -> None +``` -The owners of the component. +Register a dependency with the container. - +**Arguments**: -#### description +- `key` - The name of the dependency. +- `value` - The factory or instance of the dependency. 
+- `lifecycle` - The lifecycle of the dependency. +- `override` - If True, override an existing dependency. +- `init_args` - Arguments to initialize the factory with. +- `init_kwargs` - Keyword arguments to initialize the factory with. -The description of the component. + -This should help users understand the purpose of the component. For scripts and entrypoints, we -will attempt to extract the relevant docstring. +#### add\_from\_dependency - +```python +def add_from_dependency(dependency: Dependency, + key: t.Optional[DependencyKey] = None, + override: bool = False) -> None +``` -#### tags +Add a Dependency object to the container. -Tags for this component used for component queries and integrations. +**Arguments**: - +- `key` - The name or typed key of the dependency. +- `dependency` - The dependency object. +- `override` - If True, override an existing dependency -#### enabled + -Whether this component is enabled. Respected in cdf operations. +#### remove - +```python +def remove(name_or_key: DependencyKey) -> None +``` -#### meta +Remove a dependency by name or key from the container. -Arbitrary user-defined metadata for this component. +**Arguments**: -Used for user-specific integrations and automation. +- `name_or_key` - The name or typed key of the dependency. - + -#### \_\_eq\_\_ +#### clear ```python -def __eq__(other: t.Any) -> bool +def clear() -> None ``` -Check if two components are equal. +Clear all dependencies and singletons. - + -#### \_\_hash\_\_ +#### has ```python -def __hash__() -> int +def has(name_or_key: DependencyKey) -> bool ``` -Hash the component. - - +Check if a dependency is registered. -#### workspace - -```python -@property -def workspace() -> "Workspace" -``` +**Arguments**: -Get the workspace containing the component. +- `name_or_key` - The name or typed key of the dependency. - + -#### has\_workspace\_association +#### resolve ```python -@property -def has_workspace_association() -> bool +def resolve(name_or_key: DependencyKey, must_exist: bool = False) -> t.Any ``` -Check if the component has a workspace association. +Get a dependency. - +**Arguments**: -#### versioned\_name +- `name_or_key` - The name or typed key of the dependency. +- `must_exist` - If True, raise KeyError if the dependency is not found. + -```python -@property -def versioned_name() -> str -``` +**Returns**: -Get the versioned name of the component. + The dependency if found, else None. - + -#### owners +#### \_\_contains\_\_ ```python -@property -def owners() -> t.List[str] +def __contains__(name: t.Any) -> bool ``` -Get the owners. +Check if a dependency is registered. - + #### \_\_getitem\_\_ ```python -def __getitem__(key: str) -> t.Any +def __getitem__(name: DependencyKey) -> t.Any ``` -Get a field from the component. +Get a dependency. Raises KeyError if not found. - + -## WorkspaceComponent Objects +#### \_\_setitem\_\_ ```python -class WorkspaceComponent(BaseComponent) +def __setitem__(name: DependencyKey, value: t.Any) -> None ``` -A component within a workspace. +Add a dependency. Defaults to singleton lifecycle if callable, else instance. - + -#### component\_path +#### \_\_delitem\_\_ -The path to the component within the workspace folder. +```python +def __delitem__(name: DependencyKey) -> None +``` - +Remove a dependency. -#### root\_path + -The base path from which to resolve the component path. 
+#### wire -This is typically the union of the project path and the workspace path but -for standalone components (components created programmatically outside the -context of the cdf taxonomy), it should be set to either the current working -directory (default) or the system root. +```python +def wire(func_or_cls: t.Callable[P, T]) -> t.Callable[..., T] +``` - +Inject dependencies into a function. -#### path +**Arguments**: -```python -@property -def path() -> Path -``` +- `func_or_cls` - The function or class to inject dependencies into. + + +**Returns**: -Get the path to the component. + A function that can be called with dependencies injected - + -## Schedulable Objects +#### \_\_call\_\_ ```python -class Schedulable(pydantic.BaseModel) +def __call__(func_or_cls: t.Callable[P, T], *args: t.Any, + **kwargs: t.Any) -> T ``` -A mixin for schedulable components. +Invoke a callable with dependencies injected from the registry. - +**Arguments**: -#### cron\_string +- `func_or_cls` - The function or class to invoke. +- `args` - Positional arguments to pass to the callable. +- `kwargs` - Keyword arguments to pass to the callable. + -A cron expression for scheduling the primary action associated with the component. +**Returns**: -This is intended to be leveraged by libraries like Airflow. + The result of the callable - + -#### cron +#### \_\_iter\_\_ ```python -@property -def cron() -> t.Optional[croniter] +def __iter__() -> t.Iterator[TypedKey] ``` -Get the croniter instance. +Iterate over dependency names. - + -#### next\_run +#### \_\_len\_\_ ```python -def next_run() -> t.Optional[int] +def __len__() -> int ``` -Get the next run time for the component. +Return the number of dependencies. - + -#### is\_due +#### \_\_bool\_\_ ```python -def is_due() -> bool +def __bool__() -> bool ``` -Check if the component is due to run. +True if the registry has dependencies. - + -## InstallableRequirements Objects +#### \_\_or\_\_ ```python -class InstallableRequirements(pydantic.BaseModel) +def __or__(other: "DependencyRegistry") -> "DependencyRegistry" ``` -A mixin for components that support installation of requirements. - - - -#### requirements +Merge two registries like pythons dict union overload. -The requirements for the component. + - - -#### install\_requirements +#### \_\_getstate\_\_ ```python -def install_requirements() -> None +def __getstate__() -> t.Dict[str, t.Any] ``` -Install the component. +Serialize the state. - + -## PythonScript Objects +#### \_\_setstate\_\_ ```python -class PythonScript(WorkspaceComponent, InstallableRequirements) +def __setstate__(state: t.Dict[str, t.Any]) -> None ``` -A python script component. - - +Deserialize the state. -#### auto\_install + -Whether to automatically install the requirements for the script. +#### GLOBAL\_REGISTRY -Useful for leaner Docker images which defer certain component dep installs to runtime. +A global dependency registry. - + -#### package +# core.injector -```python -def package(outputdir: str) -> None -``` + -Package the component. +# core.injector.errors - + -#### main +## DependencyCycleError Objects ```python -@property -def main() -> t.Callable[[], t.Dict[str, t.Any]] +class DependencyCycleError(Exception) ``` -Get the entrypoint function. +Raised when a dependency cycle is detected. - + -#### \_\_call\_\_ +## DependencyMutationError Objects ```python -def __call__() -> t.Dict[str, t.Any] +class DependencyMutationError(Exception) ``` -Run the script. 
+Raised when an instance/singleton dependency has already been resolved but a mutation is attempted. - + -## PythonEntrypoint Objects +# proxy -```python -class PythonEntrypoint(BaseComponent, InstallableRequirements) -``` +The proxy module provides a MySQL proxy server for the CDF. -A python entrypoint component. +The proxy server is used to intercept MySQL queries and execute them using SQLMesh. +This allows it to integrate with BI tools and other MySQL clients. Furthermore, +during interception, the server can rewrite queries expanding semantic references +making it an easy to use semantic layer for SQLMesh. - + -#### entrypoint +# proxy.planner -The entrypoint of the component in the format module:func. +An http server which executed a plan which is a pickled pydantic model - +This is purely a POC. It will be replaced by a more robust solution in the future +using flask or fastapi. It will always be designed such that input must be +trusted. In an environment where the input is not trusted, the server should +never be exposed to the internet. It should always be behind a firewall and +only accessible by trusted clients. -#### main + + +#### run\_plan\_server ```python -@property -def main() -> t.Callable[..., t.Any] +def run_plan_server(port: int, context: sqlmesh.Context) -> None ``` -Get the entrypoint function. +Listen on a port and execute plans. - + -#### \_\_call\_\_ - -```python -def __call__(*args: t.Any, **kwargs: t.Any) -> t.Any -``` +# proxy.mysql -Run the entrypoint. +A MySQL proxy server which uses SQLMesh to execute queries. - + -## CanExecute Objects +#### file\_watcher ```python -class CanExecute(t.Protocol) +async def file_watcher(context: sqlmesh.Context) -> None ``` -A protocol specifying the minimum interface executable components satisfy. - - - -# core.context - -The context module provides thread-safe context variables and injection mechanisms. +Watch for changes in the workspace and refresh the context. -It facilitates communication between specifications and runtime modules. + - +## SQLMeshSession Objects -#### active\_project +```python +class SQLMeshSession(Session) +``` -The active workspace context variable. +A session for the MySQL proxy server which uses SQLMesh. -The allows the active workspace to be passed to user-defined scripts. The workspace -has a reference to the project configuration and filesystem. + - +#### query -#### active\_pipeline +```python +async def query( + expression: exp.Expression, sql: str, + attrs: t.Dict[str, + str]) -> t.Tuple[t.Tuple[t.Tuple[t.Any], ...], t.List[str]] +``` -Stores the active pipeline. +Execute a query. -This is the primary mechanism to pass a configured pipeline to user-defined scripts. + - +#### schema -#### debug\_mode +```python +async def schema() -> t.Dict[str, t.Dict[str, t.Dict[str, str]]] +``` -The debug mode context variable. +Get the schema of the database. -Allows us to mutate certain behaviors in the runtime based on the debug mode. User can -optionally introspect this. + - +#### run\_mysql\_proxy -#### extract\_limit +```python +async def run_mysql_proxy(context: sqlmesh.Context) -> None +``` -The extract limit context variable. +Run the MySQL proxy server. -Lets us set a limit on the number of items to extract from a source. This variable -can be introspected by user-defined scripts to optimize for partial extraction. + - +# integrations -# core.project + -The project module provides a way to define a project and its workspaces. 
+# integrations.slack -Everything in CDF is described via a simple configuration structure. We parse this configuration -using dynaconf which provides a simple way to load configuration from various sources such as -environment variables, YAML, TOML, JSON, and Python files. It also provides many other features -such as loading .env files, env-specific configuration, templating via @ tokens, and more. The -configuration is then validated with pydantic to ensure it is correct and to give us well defined -types to work with. The underlying dynaconf settings object is stored in the `wrapped` attribute -of the Project and Workspace settings objects. This allows us to access the raw configuration -values if needed. ChainMaps are used to provide a scoped view of the configuration. This enables -a powerful layering mechanism where we can override configuration values at different levels. -Finally, we provide a context manager to inject the project configuration into the dlt context -which allows us to access the configuration throughout the dlt codebase and in data pipelines. + -**Example**: +## SlackMessageComposer Objects - -```toml -# cdf.toml -[default] -name = "cdf-example" -workspaces = ["alex"] -filesystem.uri = "file://_storage" -feature_flags.provider = "filesystem" -feature_flags.filename = "feature_flags.json" - -[prod] -filesystem.uri = "gcs://bucket/path" -``` - -```toml -# alex/cdf.toml -[pipelines.us_cities] # alex/pipelines/us_cities_pipeline.py -version = 1 -dataset_name = "us_cities_v0_{version}" -description = "Get US city data" -options.full_refresh = false -options.runtime.dlthub_telemetry = false +```python +class SlackMessageComposer() ``` - +Builds Slack message with primary and secondary blocks + + -## \_BaseSettings Objects +#### \_\_init\_\_ ```python -class _BaseSettings(pydantic.BaseModel) +def __init__(initial_message: t.Optional[TSlackMessage] = None) -> None ``` -A base model for CDF settings +Initialize the Slack message builder - + -#### is\_newer\_than +#### add\_primary\_blocks ```python -def is_newer_than(other: "Project") -> bool +def add_primary_blocks(*blocks: TSlackBlock) -> "SlackMessageComposer" ``` -Check if the model is newer than another model +Add blocks to the message. Blocks are always displayed - + -#### is\_older\_than +#### add\_secondary\_blocks ```python -def is_older_than(other: "Project") -> bool +def add_secondary_blocks(*blocks: TSlackBlock) -> "SlackMessageComposer" ``` -Check if the model is older than another model +Add attachments to the message + +Attachments are hidden behind "show more" button. The first 5 attachments +are always displayed. NOTICE: attachments blocks are deprecated by Slack - + -#### model\_dump +#### normalize\_message ```python -def model_dump(**kwargs: t.Any) -> t.Dict[str, t.Any] +def normalize_message( + message: t.Union[str, t.List[str], t.Iterable[str]]) -> str ``` -Dump the model to a dictionary +Normalize message to fit Slack's max text length - + -## FilesystemConfig Objects +#### divider\_block ```python -class FilesystemConfig(_BaseSettings) +def divider_block() -> dict ``` -Configuration for a filesystem provider +Create a divider block - + -#### uri +#### fields\_section\_block -The filesystem URI +```python +def fields_section_block(*messages: str) -> dict +``` -This is based on fsspec. See https://filesystem-spec.readthedocs.io/en/latest/index.html -This supports all filesystems supported by fsspec as well as filesystem chaining. 
+Create a section block with multiple fields - + -#### options\_ +#### text\_section\_block -The filesystem options +```python +def text_section_block(message: str) -> dict +``` -Options are passed to the filesystem provider as keyword arguments. +Create a section block with text - + -#### options +#### empty\_section\_block ```python -@property -def options() -> t.Dict[str, t.Any] +def empty_section_block() -> dict ``` -Get the filesystem options as a dictionary +Create an empty section block - + -#### project +#### context\_block ```python -@property -def project() -> "Project" +def context_block(*messages: str) -> dict ``` -Get the project this configuration belongs to +Create a context block with multiple fields - + -#### has\_project\_association +#### header\_block ```python -@property -def has_project_association() -> bool +def header_block(message: str) -> dict ``` -Check if the configuration is associated with a project +Create a header block - + -#### get\_adapter +#### button\_action\_block ```python -def get_adapter() -> M.Result[FilesystemAdapter, Exception] +def button_action_block(text: str, url: str) -> dict ``` -Get a filesystem adapter +Create a button action block - + -## FeatureFlagProviderType Objects +#### compacted\_sections\_blocks ```python -class FeatureFlagProviderType(str, Enum) +def compacted_sections_blocks( + *messages: t.Union[str, t.Iterable[str]]) -> t.List[dict] ``` -The feature flag provider +Create a list of compacted sections blocks - + -## BaseFeatureFlagConfig Objects +## SlackAlertIcon Objects ```python -class BaseFeatureFlagConfig(_BaseSettings) +class SlackAlertIcon(str, Enum) ``` -Base configuration for a feature flags provider - - - -#### provider - -The feature flags provider +Enum for status of the alert - + -#### project +#### stringify\_list ```python -@property -def project() -> "Project" +def stringify_list(list_variation: t.Union[t.List[str], str]) -> str ``` -Get the project this configuration belongs to +Prettify and deduplicate list of strings converting it to a newline delimited string - + -#### has\_project\_association +#### send\_basic\_slack\_message ```python -@property -def has_project_association() -> bool +def send_basic_slack_message(incoming_hook: str, + message: str, + is_markdown: bool = True) -> None ``` -Check if the configuration is associated with a project +Sends a `message` to Slack `incoming_hook`, by default formatted as markdown. 
- + -#### get\_adapter +#### send\_extract\_start\_slack\_message ```python -def get_adapter(**kwargs: t.Any - ) -> M.Result[AbstractFeatureFlagAdapter, Exception] +def send_extract_start_slack_message(incoming_hook: str, source: str, + run_id: str, tags: t.List[str], + owners: t.List[str], environment: str, + resources_selected: t.List[str], + resources_count: int) -> None ``` -Get a handle to the feature flag adapter +Sends a Slack message for the start of an extract - + -## FilesystemFeatureFlagConfig Objects +#### send\_extract\_failure\_message ```python -class FilesystemFeatureFlagConfig(BaseFeatureFlagConfig) +def send_extract_failure_message(incoming_hook: str, source: str, run_id: str, + duration: float, error: Exception) -> None ``` -Configuration for a feature flags provider that uses the configured filesystem +Sends a Slack message for the failure of an extract + + - +#### send\_extract\_success\_message -#### provider +```python +def send_extract_success_message(incoming_hook: str, source: str, run_id: str, + duration: float) -> None +``` -The feature flags provider +Sends a Slack message for the success of an extract - + -#### filename +#### send\_normalize\_start\_slack\_message -The feature flags filename. +```python +def send_normalize_start_slack_message(incoming_hook: str, source: str, + blob_name: str, run_id: str, + environment: str) -> None +``` -This is a format string that can include the following variables: -- `name`: The project name -- `workspace`: The workspace name -- `environment`: The environment name -- `source`: The source name -- `resource`: The resource name -- `version`: The version number of the component +Sends a Slack message for the start of an extract - + -## HarnessFeatureFlagConfig Objects +#### send\_normalize\_failure\_message ```python -class HarnessFeatureFlagConfig(BaseFeatureFlagConfig) +def send_normalize_failure_message(incoming_hook: str, source: str, + blob_name: str, run_id: str, + duration: float, error: Exception) -> None ``` -Configuration for a feature flags provider that uses the Harness API +Sends a Slack message for the failure of an normalization - + -#### provider +#### send\_normalization\_success\_message -The feature flags provider - - - -#### api\_key - -The harness API key. Get it from your user settings - - - -#### sdk\_key - -The harness SDK key. Get it from the environment management page of the FF module - - - -#### account - -The harness account ID. We will attempt to read it from the environment if not provided. - - - -#### organization - -The harness organization ID. We will attempt to read it from the environment if not provided. - - - -#### project\_ +```python +def send_normalization_success_message(incoming_hook: str, source: str, + blob_name: str, run_id: str, + duration: float) -> None +``` -The harness project ID. We will attempt to read it from the environment if not provided. 
+Sends a Slack message for the success of an normalization - + -## LaunchDarklyFeatureFlagSettings Objects +#### send\_load\_start\_slack\_message ```python -class LaunchDarklyFeatureFlagSettings(BaseFeatureFlagConfig) +def send_load_start_slack_message(incoming_hook: str, source: str, + destination: str, dataset: str, + run_id: str) -> None ``` -Configuration for a feature flags provider that uses the LaunchDarkly API - - - -#### provider +Sends a Slack message for the start of a load -The feature flags provider + - +#### send\_load\_failure\_message -#### api\_key +```python +def send_load_failure_message(incoming_hook: str, source: str, + destination: str, dataset: str, + run_id: str) -> None +``` -The LaunchDarkly API key. Get it from your user settings +Sends a Slack message for the failure of an load - + -## SplitFeatureFlagSettings Objects +#### send\_load\_success\_message ```python -class SplitFeatureFlagSettings(BaseFeatureFlagConfig) +def send_load_success_message(incoming_hook: str, source: str, + destination: str, dataset: str, run_id: str, + payload: str) -> None ``` -Configuration for a feature flags provider that uses the Split API - - - -#### provider - -The feature flags provider +Sends a Slack message for the success of an normalization - + -#### api\_key +# integrations.feature\_flag.launchdarkly -The Split API key. Get it from your user settings +LaunchDarkly feature flag provider. - + -## NoopFeatureFlagSettings Objects +## LaunchDarklyFeatureFlagAdapter Objects ```python -class NoopFeatureFlagSettings(BaseFeatureFlagConfig) +class LaunchDarklyFeatureFlagAdapter(AbstractFeatureFlagAdapter) ``` -Configuration for a feature flags provider that does nothing - - - -#### provider - -The feature flags provider - - - -#### FeatureFlagConfig - -A union of all feature flag provider configurations +A feature flag adapter that uses LaunchDarkly. - + -## Workspace Objects +#### \_\_init\_\_ ```python -class Workspace(_BaseSettings) +@with_config(sections=("feature_flags", )) +def __init__(sdk_key: str, **kwargs: t.Any) -> None ``` -A workspace is a collection of pipelines, sinks, publishers, scripts, and notebooks in a subdirectory of the project - - - -#### workspace\_path - -The path to the workspace within the project path +Initialize the LaunchDarkly feature flags. - +**Arguments**: -#### project\_path +- `sdk_key` - The SDK key to use for LaunchDarkly. -The path to the project + - +# integrations.feature\_flag.harness -#### name +Harness feature flag provider. -The name of the workspace + - +## HarnessFeatureFlagAdapter Objects -#### owner +```python +class HarnessFeatureFlagAdapter(AbstractFeatureFlagAdapter) +``` -The owner of the workspace + - +#### \_\_init\_\_ -#### pipelines +```python +@with_config(sections=("feature_flags", )) +def __init__(sdk_key: str = dlt.secrets.value, + api_key: str = dlt.secrets.value, + account: str = dlt.secrets.value, + organization: str = dlt.secrets.value, + project: str = dlt.secrets.value, + **kwargs: t.Any) -> None +``` -Pipelines move data from sources to sinks +Initialize the adapter. - + -#### sinks +#### client -A sink is a destination for data +```python +@property +def client() -> CfClient +``` - +Get the client and cache it in the instance. -#### publishers + -Publishers send data to external systems +#### pool - +```python +@property +def pool() -> ThreadPoolExecutor +``` -#### scripts +Get the thread pool. 
-Scripts are used to automate tasks + - +#### get -#### notebooks +```python +def get(feature_name: str) -> FlagAdapterResponse +``` -Notebooks are used for data analysis and reporting +Get a feature flag. - + -#### path +#### get\_all\_feature\_names ```python -@property -def path() -> Path +def get_all_feature_names() -> t.List[str] ``` -Get the path to the workspace +Get all the feature flags. - + -#### \_\_getitem\_\_ +#### save ```python -def __getitem__(key: str) -> t.Any +def save(feature_name: str, flag: bool) -> None ``` -Get a component by name +Create a feature flag. - + -#### \_\_setitem\_\_ +#### save\_many ```python -def __setitem__(key: str, value: t.Any) -> None +def save_many(flags: t.Dict[str, bool]) -> None ``` -Set a component by name +Create many feature flags. - + -#### \_\_delitem\_\_ +#### delete ```python -def __delitem__(key: str) -> None +def delete(feature_name: str) -> None ``` -Delete a component by name +Drop a feature flag. - + -#### \_\_len\_\_ +#### delete\_many ```python -def __len__() -> int +def delete_many(feature_names: t.List[str]) -> None ``` -Get the number of components +Drop many feature flags. - + -#### \_\_iter\_\_ +#### apply\_source ```python -def __iter__() -> t.Iterator[spec.CoreSpecification] +def apply_source(source: DltSource, *namespace: str) -> DltSource ``` -Iterate over the components +Apply the feature flags to a dlt source. - + -#### \_\_contains\_\_ +#### \_\_del\_\_ ```python -def __contains__(key: str) -> bool +def __del__() -> None ``` -Check if a component exists - - +Close the client. -#### get\_component\_names + -```python -def get_component_names() -> t.List[str] -``` +# integrations.feature\_flag -Get the component names +Feature flag providers implement a uniform interface and are wrapped by an adapter. - +The adapter is responsible for loading the correct provider and applying the feature flags within +various contexts in cdf. This allows for a clean separation of concerns and makes it easy to +implement new feature flag providers in the future. -#### items + -```python -def items() -> t.Iterator[t.Tuple[str, spec.CoreSpecification]] -``` +#### ADAPTERS -Iterate over the components +Feature flag provider adapters classes by name. - + -#### get\_pipeline\_spec +#### get\_feature\_flag\_adapter\_cls ```python -def get_pipeline_spec( - name: str) -> M.Result[spec.PipelineSpecification, Exception] +@with_config(sections=("feature_flags", )) +def get_feature_flag_adapter_cls( + provider: str = dlt.config.value +) -> M.Result[t.Type[AbstractFeatureFlagAdapter], Exception] ``` -Get a pipeline by name - - +Get a feature flag adapter by name. -#### get\_sink\_spec +**Arguments**: -```python -def get_sink_spec(name: str) -> M.Result[spec.SinkSpecification, Exception] -``` +- `provider` - The name of the feature flag adapter. +- `options` - The configuration for the feature flag adapter. + -Get a sink by name +**Returns**: - + The feature flag adapter. -#### get\_publisher\_spec + -```python -def get_publisher_spec( - name: str) -> M.Result[spec.PublisherSpecification, Exception] -``` +# integrations.feature\_flag.file -Get a publisher by name +File-based feature flag provider. - + -#### get\_script\_spec +## FilesystemFeatureFlagAdapter Objects ```python -def get_script_spec( - name: str) -> M.Result[spec.ScriptSpecification, Exception] +class FilesystemFeatureFlagAdapter(AbstractFeatureFlagAdapter) ``` -Get a script by name +A feature flag adapter that uses the filesystem. 
- + -#### get\_notebook\_spec +#### \_\_init\_\_ ```python -def get_notebook_spec( - name: str) -> M.Result[spec.NotebookSpecification, Exception] +@with_config(sections=("feature_flags", )) +def __init__(filesystem: fsspec.AbstractFileSystem, + filename: str = dlt.config.value, + **kwargs: t.Any) -> None ``` -Get a notebook by name +Initialize the filesystem feature flags. + +**Arguments**: + +- `filesystem` - The filesystem to use. +- `filename` - The filename to use for the feature flags. - + -#### project +#### get ```python -@property -def project() -> "Project" +def get(feature_name: str) -> FlagAdapterResponse ``` -Get the project this workspace belongs to +Get a feature flag. - +**Arguments**: -#### has\_project\_association +- `feature_name` - The name of the feature flag. + -```python -@property -def has_project_association() -> bool -``` +**Returns**: -Check if the workspace is associated with a project + The feature flag. - + -#### inject\_configuration +#### get\_all\_feature\_names ```python -@contextmanager -def inject_configuration() -> t.Iterator[None] +def get_all_feature_names() -> t.List[str] ``` -Inject the workspace configuration into the context +Get all feature flag names. + +**Returns**: + + The feature flag names. - + -#### fs\_adapter +#### save ```python -@property -def fs_adapter() -> FilesystemAdapter +def save(feature_name: str, flag: bool) -> None ``` -Get a handle to the project filesystem adapter +Save a feature flag. + +**Arguments**: + +- `feature_name` - The name of the feature flag. +- `flag` - The value of the feature flag. - + -#### ff\_adapter +#### save\_many ```python -@property -def ff_adapter() -> AbstractFeatureFlagAdapter +def save_many(flags: t.Dict[str, bool]) -> None ``` -Get a handle to the project feature flag adapter +Save multiple feature flags. + +**Arguments**: - +- `flags` - The feature flags to save. -#### state + -```python -@property -def state() -> StateStore -``` +# integrations.feature\_flag.split -Get a handle to the project state store +Split feature flag provider. - + -#### get\_transform\_gateways +## SplitFeatureFlagAdapter Objects ```python -def get_transform_gateways() -> t.Iterator[t.Tuple[str, "GatewayConfig"]] +class SplitFeatureFlagAdapter(AbstractFeatureFlagAdapter) ``` -Get the SQLMesh gateway configurations +A feature flag adapter that uses Split. - + -#### get\_transform\_context +#### \_\_init\_\_ ```python -def get_transform_context(name: t.Optional[str] = None) +def __init__(sdk_key: str, **kwargs: t.Any) -> None ``` -Get the SQLMesh context for the workspace - -We expect a config.py file in the workspace directory that uses the -`get_transform_gateways` method to populate the SQLMesh Config.gateways key. +Initialize the Split feature flags. **Arguments**: -- `name` - The name of the gateway to use. - +- `sdk_key` - The SDK key to use for Split. -**Returns**: + + +# integrations.feature\_flag.noop - The SQLMesh context. +No-op feature flag provider. - + -## Project Objects +## NoopFeatureFlagAdapter Objects ```python -class Project(_BaseSettings) +class NoopFeatureFlagAdapter(AbstractFeatureFlagAdapter) ``` -A project is a collection of workspaces and configuration settings +A feature flag adapter that does nothing. - + -#### path +#### \_\_init\_\_ -The path to the project +```python +def __init__(**kwargs: t.Any) -> None +``` - +Initialize the adapter. 
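For example, the file-based adapter can be exercised against an in-memory fsspec filesystem; the flag name below is hypothetical, and passing `filename` explicitly is assumed to bypass dlt config resolution:

```python
import fsspec

fs = fsspec.filesystem("memory")
flags = FilesystemFeatureFlagAdapter(fs, filename="feature_flags.json")

flags.save("pipelines__us_cities", True)  # hypothetical flag name
if flags["pipelines__us_cities"]:         # FlagAdapterResponse is truthy when enabled
    print("resource enabled")
print(flags.get_all_feature_names())
```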
-#### name + -The name of the project +# integrations.feature\_flag.base - + -#### version +## FlagAdapterResponse Objects -The version of the project +```python +class FlagAdapterResponse(Enum) +``` - +Feature flag response. -#### owner +This enum is used to represent the state of a feature flag. It is similar +to a boolean but with an extra state for when the flag is not found. -The owner of the project + - +#### ENABLED -#### documentation +The feature flag is enabled. -The project documentation + - +#### DISABLED -#### workspaces +The feature flag is disabled. -The project workspaces + - +#### NOT\_FOUND -#### fs +The feature flag is not found. -The project filesystem settings + - +#### \_\_bool\_\_ -#### ff +```python +def __bool__() -> bool +``` -The project feature flags provider settings +Return True if the flag is enabled and False otherwise. - + -#### state - -The project state connection settings - - - -#### \_\_getitem\_\_ - -```python -def __getitem__(key: str) -> t.Any -``` - -Get an item from the configuration - - - -#### \_\_setitem\_\_ - -```python -def __setitem__(key: str, value: t.Any) -> None -``` - -Set an item in the configuration - - - -#### \_\_delitem\_\_ - -```python -def __delitem__(key: str) -> None -``` - -Delete a workspace - - - -#### \_\_len\_\_ - -```python -def __len__() -> int -``` - -Get the number of workspaces - - - -#### \_\_iter\_\_ - -```python -def __iter__() -> t.Iterator[Workspace] -``` - -Iterate over the workspaces - - - -#### \_\_contains\_\_ - -```python -def __contains__(key: str) -> bool -``` - -Check if a workspace exists - - - -#### get\_workspace\_names - -```python -def get_workspace_names() -> t.List[str] -``` - -Get the workspace names - - - -#### items - -```python -def items() -> t.Iterator[t.Tuple[str, Workspace]] -``` - -Iterate over the workspaces - - - -#### get\_workspace +#### \_\_eq\_\_ ```python -def get_workspace(name: str) -> M.Result[Workspace, Exception] +def __eq__(value: object) -> bool ``` -Get a workspace by name +Compare the flag to a boolean. - + -#### get\_workspace\_from\_path +#### from\_bool ```python -def get_workspace_from_path(path: PathLike) -> M.Result[Workspace, Exception] +@classmethod +def from_bool(cls, flag: bool) -> "FlagAdapterResponse" ``` -Get a workspace by path. +Convert a boolean to a flag response. - + -#### to\_scoped\_dict +## AbstractFeatureFlagAdapter Objects ```python -def to_scoped_dict(workspace: t.Optional[str] = None) -> ChainMap[str, t.Any] +class AbstractFeatureFlagAdapter(abc.ABC) ``` -Convert the project settings to a scoped dictionary - -Lookups are performed in the following order: -- The extra configuration, holding data set via __setitem__. -- The workspace configuration, if passed. -- The project configuration. -- The wrapped configuration, if available. Typically a dynaconf settings object. - -Boxing allows us to access nested values using dot notation. This is doubly useful -since ChainMaps will move to the next map in the chain if the dotted key is not -fully resolved in the current map. +Abstract feature flag adapter. - + -#### inject\_configuration +#### \_\_init\_\_ ```python -@contextmanager -def inject_configuration( - workspace: t.Optional[str] = None) -> t.Iterator[None] +def __init__(**kwargs: t.Any) -> None ``` -Inject the project configuration into the context +Initialize the adapter. 
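The boolean semantics of the response enum defined above are worth pinning down with a few assertions:

```python
assert FlagAdapterResponse.from_bool(True) is FlagAdapterResponse.ENABLED
assert bool(FlagAdapterResponse.NOT_FOUND) is False  # only ENABLED is truthy
assert FlagAdapterResponse.DISABLED == False         # __eq__ compares against booleans
```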
- + -#### fs\_adapter +#### get ```python -@cached_property -def fs_adapter() -> FilesystemAdapter +@abc.abstractmethod +def get(feature_name: str) -> FlagAdapterResponse ``` -Get a configured filesystem adapter +Get the feature flag. - + -#### ff\_adapter +#### \_\_getitem\_\_ ```python -@cached_property -def ff_adapter() -> AbstractFeatureFlagAdapter +def __getitem__(feature_name: str) -> FlagAdapterResponse ``` -Get a handle to the project's configured feature flag adapter +Get the feature flag. - + -#### duckdb +#### get\_many ```python -@cached_property -def duckdb() -> duckdb.DuckDBPyConnection +def get_many(feature_names: t.List[str]) -> t.Dict[str, FlagAdapterResponse] ``` -Get a handle to the project's DuckDB connection - - - -#### get\_workspace\_path - -```python -def get_workspace_path(name: str) -> M.Result[Path, Exception] -``` +Get many feature flags. -Get the path to a workspace by name +Implementations should override this method if they can optimize it. The default +will call get in a loop. - + -#### from\_path +#### save ```python -@classmethod -def from_path(cls, root: PathLike) +@abc.abstractmethod +def save(feature_name: str, flag: bool) -> None ``` -Load configuration data from a project root path using dynaconf. - -**Arguments**: - -- `root` - The root path to the project. - - -**Returns**: - - A Project object. +Save the feature flag. - + -#### activate +#### \_\_setitem\_\_ ```python -def activate() -> t.Callable[[], None] +def __setitem__(feature_name: str, flag: bool) -> None ``` -Activate the project and return a deactivation function +Save the feature flag. - + -#### activated +#### save\_many ```python -@contextmanager -def activated() -> t.Iterator[None] +def save_many(flags: t.Dict[str, bool]) -> None ``` -Activate the project for the duration of the context - - - -#### load\_project - -Load configuration data from a project root path using dynaconf. - -**Arguments**: - -- `root` - The root path to the project. - - -**Returns**: - - A Result monad with a Project object if successful. Otherwise, a Result monad with an error. - - - -# core.state +Save many feature flags. -The state module is responible for providing an adapter through which we can persist data +Implementations should override this method if they can optimize it. The default +will call save in a loop. - + -## StateStore Objects +#### get\_all\_feature\_names ```python -class StateStore(pydantic.BaseModel) +@abc.abstractmethod +def get_all_feature_names() -> t.List[str] ``` -The state store is responsible for persisting data - - - -#### schema - -The schema in which to store data - - - -#### protected - -Whether the state store is protected, i.e. should never be torn down - -A safety measure to prevent accidental data loss when users are consuming the cdf API -directly. This should be set to False when running tests or you know what you're doing. - - - -#### connection - -The connection configuration to the state store +Get all feature names. - + -#### adapter +#### keys ```python -@property -def adapter() -> EngineAdapter +def keys() -> t.List[str] ``` -The adapter to the state store +Get all feature names. - + -#### setup +#### \_\_iter\_\_ ```python -def setup() -> None +def __iter__() -> t.Iterator[str] ``` -Setup the state store +Iterate over the feature names. - + -#### teardown +#### \_\_contains\_\_ ```python -def teardown() -> None +def __contains__(feature_name: str) -> bool ``` -Teardown the state store +Check if a feature flag exists. 
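
Only `get`, `save`, and `get_all_feature_names` are abstract, so a concrete adapter stays small while inheriting the batched helpers (`get_many`, `save_many`) and the mapping-style dunders documented here. A minimal in-memory sketch; the import path is hypothetical:

```python
import typing as t

# Hypothetical import path -- the reference documents these names, not their module.
from cdf.integrations.feature_flag import (
    AbstractFeatureFlagAdapter,
    FlagAdapterResponse,
)


class InMemoryFeatureFlagAdapter(AbstractFeatureFlagAdapter):
    """Implements only the abstract methods; everything else is inherited."""

    def __init__(self, **kwargs: t.Any) -> None:
        super().__init__(**kwargs)
        self._flags: t.Dict[str, bool] = {}

    def get(self, feature_name: str) -> FlagAdapterResponse:
        if feature_name not in self._flags:
            return FlagAdapterResponse.NOT_FOUND
        return FlagAdapterResponse.from_bool(self._flags[feature_name])

    def save(self, feature_name: str, flag: bool) -> None:
        self._flags[feature_name] = flag

    def get_all_feature_names(self) -> t.List[str]:
        return list(self._flags)


adapter = InMemoryFeatureFlagAdapter()
adapter["new_ui"] = True          # __setitem__ delegates to save()
assert "new_ui" in adapter        # __contains__
assert adapter["new_ui"] == True  # noqa: E712 -- the response compares to booleans
assert len(adapter) == 1 and list(adapter) == ["new_ui"]
```
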
- + -#### store\_json +#### \_\_len\_\_ ```python -def store_json(key: str, value: t.Any) -> None +def __len__() -> int ``` -Store a JSON value +Get the number of feature flags. - + -#### \_\_del\_\_ +#### delete ```python -def __del__() -> None +def delete(feature_name: str) -> None ``` -Close the connection to the state store - - +Delete a feature flag. -# core.utility +By default, this will disable the flag but implementations can override this method +to delete the flag. - + -#### find\_item +#### delete\_many ```python -def find_item(lst: t.List[TDict], key: t.Union[t.Callable[[TDict], t.Any], - str], value: t.Any) -> TDict +def delete_many(feature_names: t.List[str]) -> None ``` -Find an item in a list by a key-value pair. - -**Example**: - - >>> find_item([{"name": "Alice"}, {"name": "Bob"}], "name", "Bob") -- `{"name"` - "Bob"} - - -**Arguments**: - -- `lst` - The list to search. -- `key` - The key function to extract the value from an item or the key name. -- `value` - The value to find. - - -**Returns**: - - The item with the matching value. - - - -# core.utility.file +Delete many feature flags. - + -#### load\_file +#### apply\_source ```python -def load_file(path: Path) -> M.Result[t.Dict[str, t.Any], Exception] +def apply_source(source: "DltSource", *namespace: str) -> "DltSource" ``` -Load a configuration from a file path. +Apply the feature flags to a dlt source. **Arguments**: -- `path` - The file path. +- `source` - The source to apply the feature flags to. **Returns**: - A Result monad with the configuration dictionary if the file format is JSON, YAML or TOML. - Otherwise, a Result monad with an error. - - - -# core.feature\_flag.launchdarkly - -LaunchDarkly feature flag provider. - - - -## LaunchDarklyFeatureFlagAdapter Objects - -```python -class LaunchDarklyFeatureFlagAdapter(AbstractFeatureFlagAdapter) -``` - -A feature flag adapter that uses LaunchDarkly. - - - -#### \_\_init\_\_ - -```python -@with_config(sections=("feature_flags", )) -def __init__(sdk_key: str, **kwargs: t.Any) -> None -``` - -Initialize the LaunchDarkly feature flags. - -**Arguments**: - -- `sdk_key` - The SDK key to use for LaunchDarkly. - - - -# core.feature\_flag.harness - -Harness feature flag provider. - - - -## HarnessFeatureFlagAdapter Objects - -```python -class HarnessFeatureFlagAdapter(AbstractFeatureFlagAdapter) -``` - - - -#### \_\_init\_\_ - -```python -@with_config(sections=("feature_flags", )) -def __init__(sdk_key: str = dlt.secrets.value, - api_key: str = dlt.secrets.value, - account: str = dlt.secrets.value, - organization: str = dlt.secrets.value, - project: str = dlt.secrets.value, - **kwargs: t.Any) -> None -``` - -Initialize the adapter. - - - -#### client - -```python -@property -def client() -> CfClient -``` - -Get the client and cache it in the instance. - - - -#### pool - -```python -@property -def pool() -> ThreadPoolExecutor -``` - -Get the thread pool. - - - -#### get - -```python -def get(feature_name: str) -> FlagAdapterResponse -``` - -Get a feature flag. - - - -#### get\_all\_feature\_names - -```python -def get_all_feature_names() -> t.List[str] -``` - -Get all the feature flags. - - - -#### save - -```python -def save(feature_name: str, flag: bool) -> None -``` - -Create a feature flag. - - - -#### save\_many - -```python -def save_many(flags: t.Dict[str, bool]) -> None -``` - -Create many feature flags. - - - -#### delete - -```python -def delete(feature_name: str) -> None -``` - -Drop a feature flag. 
- - - -#### delete\_many - -```python -def delete_many(feature_names: t.List[str]) -> None -``` - -Drop many feature flags. - - - -#### apply\_source - -```python -def apply_source(source: DltSource, *namespace: str) -> DltSource -``` - -Apply the feature flags to a dlt source. - - - -#### \_\_del\_\_ - -```python -def __del__() -> None -``` - -Close the client. - - - -# core.feature\_flag - -Feature flag providers implement a uniform interface and are wrapped by an adapter. - -The adapter is responsible for loading the correct provider and applying the feature flags within -various contexts in cdf. This allows for a clean separation of concerns and makes it easy to -implement new feature flag providers in the future. - - - -#### ADAPTERS - -Feature flag provider adapters classes by name. - - - -#### get\_feature\_flag\_adapter\_cls - -```python -@with_config(sections=("feature_flags", )) -def get_feature_flag_adapter_cls( - provider: str = dlt.config.value -) -> M.Result[t.Type[AbstractFeatureFlagAdapter], Exception] -``` - -Get a feature flag adapter by name. - -**Arguments**: - -- `provider` - The name of the feature flag adapter. -- `options` - The configuration for the feature flag adapter. - - -**Returns**: - - The feature flag adapter. - - - -# core.feature\_flag.file - -File-based feature flag provider. - - - -## FilesystemFeatureFlagAdapter Objects - -```python -class FilesystemFeatureFlagAdapter(AbstractFeatureFlagAdapter) -``` - -A feature flag adapter that uses the filesystem. - - - -#### \_\_init\_\_ - -```python -@with_config(sections=("feature_flags", )) -def __init__(filesystem: fsspec.AbstractFileSystem, - filename: str = dlt.config.value, - **kwargs: t.Any) -> None -``` - -Initialize the filesystem feature flags. - -**Arguments**: - -- `filesystem` - The filesystem to use. -- `filename` - The filename to use for the feature flags. - - - -#### get - -```python -def get(feature_name: str) -> FlagAdapterResponse -``` - -Get a feature flag. - -**Arguments**: - -- `feature_name` - The name of the feature flag. - - -**Returns**: - - The feature flag. - - - -#### get\_all\_feature\_names - -```python -def get_all_feature_names() -> t.List[str] -``` - -Get all feature flag names. - -**Returns**: - - The feature flag names. - - - -#### save - -```python -def save(feature_name: str, flag: bool) -> None -``` - -Save a feature flag. - -**Arguments**: - -- `feature_name` - The name of the feature flag. -- `flag` - The value of the feature flag. - - - -#### save\_many - -```python -def save_many(flags: t.Dict[str, bool]) -> None -``` - -Save multiple feature flags. - -**Arguments**: - -- `flags` - The feature flags to save. - - - -# core.feature\_flag.split - -Split feature flag provider. - - - -## SplitFeatureFlagAdapter Objects - -```python -class SplitFeatureFlagAdapter(AbstractFeatureFlagAdapter) -``` - -A feature flag adapter that uses Split. - - - -#### \_\_init\_\_ - -```python -def __init__(sdk_key: str, **kwargs: t.Any) -> None -``` - -Initialize the Split feature flags. - -**Arguments**: - -- `sdk_key` - The SDK key to use for Split. - - - -# core.feature\_flag.noop - -No-op feature flag provider. - - - -## NoopFeatureFlagAdapter Objects - -```python -class NoopFeatureFlagAdapter(AbstractFeatureFlagAdapter) -``` - -A feature flag adapter that does nothing. - - - -#### \_\_init\_\_ - -```python -def __init__(**kwargs: t.Any) -> None -``` - -Initialize the adapter. 
- - - -# core.feature\_flag.base - - - -## FlagAdapterResponse Objects - -```python -class FlagAdapterResponse(Enum) -``` - -Feature flag response. - -This enum is used to represent the state of a feature flag. It is similar -to a boolean but with an extra state for when the flag is not found. - - - -#### ENABLED - -The feature flag is enabled. - - - -#### DISABLED - -The feature flag is disabled. - - - -#### NOT\_FOUND - -The feature flag is not found. - - - -#### \_\_bool\_\_ - -```python -def __bool__() -> bool -``` - -Return True if the flag is enabled and False otherwise. - - - -#### \_\_eq\_\_ - -```python -def __eq__(value: object) -> bool -``` - -Compare the flag to a boolean. - - - -#### from\_bool - -```python -@classmethod -def from_bool(cls, flag: bool) -> "FlagAdapterResponse" -``` - -Convert a boolean to a flag response. - - - -## AbstractFeatureFlagAdapter Objects - -```python -class AbstractFeatureFlagAdapter(abc.ABC) -``` - -Abstract feature flag adapter. - - - -#### \_\_init\_\_ - -```python -def __init__(**kwargs: t.Any) -> None -``` - -Initialize the adapter. - - - -#### get - -```python -@abc.abstractmethod -def get(feature_name: str) -> FlagAdapterResponse -``` - -Get the feature flag. - - - -#### \_\_getitem\_\_ - -```python -def __getitem__(feature_name: str) -> FlagAdapterResponse -``` - -Get the feature flag. - - - -#### get\_many - -```python -def get_many(feature_names: t.List[str]) -> t.Dict[str, FlagAdapterResponse] -``` - -Get many feature flags. - -Implementations should override this method if they can optimize it. The default -will call get in a loop. - - - -#### save - -```python -@abc.abstractmethod -def save(feature_name: str, flag: bool) -> None -``` - -Save the feature flag. - - - -#### \_\_setitem\_\_ - -```python -def __setitem__(feature_name: str, flag: bool) -> None -``` - -Save the feature flag. - - - -#### save\_many - -```python -def save_many(flags: t.Dict[str, bool]) -> None -``` - -Save many feature flags. - -Implementations should override this method if they can optimize it. The default -will call save in a loop. - - - -#### get\_all\_feature\_names - -```python -@abc.abstractmethod -def get_all_feature_names() -> t.List[str] -``` - -Get all feature names. - - - -#### keys - -```python -def keys() -> t.List[str] -``` - -Get all feature names. - - - -#### \_\_iter\_\_ - -```python -def __iter__() -> t.Iterator[str] -``` - -Iterate over the feature names. - - - -#### \_\_contains\_\_ - -```python -def __contains__(feature_name: str) -> bool -``` - -Check if a feature flag exists. - - - -#### \_\_len\_\_ - -```python -def __len__() -> int -``` - -Get the number of feature flags. - - - -#### delete - -```python -def delete(feature_name: str) -> None -``` - -Delete a feature flag. - -By default, this will disable the flag but implementations can override this method -to delete the flag. - - - -#### delete\_many - -```python -def delete_many(feature_names: t.List[str]) -> None -``` - -Delete many feature flags. - - - -#### apply\_source - -```python -def apply_source(source: "DltSource", *namespace: str) -> "DltSource" -``` - -Apply the feature flags to a dlt source. - -**Arguments**: - -- `source` - The source to apply the feature flags to. - - -**Returns**: - - The source with the feature flags applied. 
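
`apply_source` is the bridge between flags and ingestion: it receives a dlt source and toggles its resources according to the adapter's flags. A hedged sketch of the call shape, reusing the illustrative in-memory adapter from the earlier sketch; how flag names are derived from the namespace components is an implementation detail this reference does not spell out:

```python
import dlt


@dlt.resource
def users():
    yield from [{"id": 1}, {"id": 2}]


@dlt.source
def app_source():
    return users()


adapter = InMemoryFeatureFlagAdapter()  # illustrative adapter from the earlier sketch

# The namespace components presumably qualify the per-resource flag lookups.
gated = adapter.apply_source(app_source(), "my_workspace", "my_pipeline")
```
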
- 

 

# integrations

 

# integrations.slack

 

## SlackMessageComposer Objects

```python
class SlackMessageComposer()
```

Builds a Slack message with primary and secondary blocks

 

#### \_\_init\_\_

```python
def __init__(initial_message: t.Optional[TSlackMessage] = None) -> None
```

Initialize the Slack message builder

 

#### add\_primary\_blocks

```python
def add_primary_blocks(*blocks: TSlackBlock) -> "SlackMessageComposer"
```

Add blocks to the message. Blocks are always displayed

 

#### add\_secondary\_blocks

```python
def add_secondary_blocks(*blocks: TSlackBlock) -> "SlackMessageComposer"
```

Add attachments to the message

Attachments are hidden behind a "show more" button. The first 5 attachments
are always displayed. NOTICE: attachment blocks are deprecated by Slack

 

#### normalize\_message

```python
def normalize_message(
        message: t.Union[str, t.List[str], t.Iterable[str]]) -> str
```

Normalize message to fit Slack's max text length

 

#### divider\_block

```python
def divider_block() -> dict
```

Create a divider block

 

#### fields\_section\_block

```python
def fields_section_block(*messages: str) -> dict
```

Create a section block with multiple fields

 

#### text\_section\_block

```python
def text_section_block(message: str) -> dict
```

Create a section block with text

 

#### empty\_section\_block

```python
def empty_section_block() -> dict
```

Create an empty section block

 

#### context\_block

```python
def context_block(*messages: str) -> dict
```

Create a context block with multiple fields

 

#### header\_block

```python
def header_block(message: str) -> dict
```

Create a header block

 

#### button\_action\_block

```python
def button_action_block(text: str, url: str) -> dict
```

Create a button action block

 

#### compacted\_sections\_blocks

```python
def compacted_sections_blocks(
        *messages: t.Union[str, t.Iterable[str]]) -> t.List[dict]
```

Create a list of compacted sections blocks

 

## SlackAlertIcon Objects

```python
class SlackAlertIcon(str, Enum)
```

Enum for the status of the alert

 

#### stringify\_list

```python
def stringify_list(list_variation: t.Union[t.List[str], str]) -> str
```

Prettify and deduplicate a list of strings, converting it to a newline-delimited string

 

#### send\_basic\_slack\_message

```python
def send_basic_slack_message(incoming_hook: str,
                             message: str,
                             is_markdown: bool = True) -> None
```

Sends a `message` to Slack `incoming_hook`, by default formatted as markdown. 
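
As a usage sketch: the block helpers return plain dicts that the composer threads into primary (always visible) and secondary (collapsed) slots, while `send_basic_slack_message` covers the plain-text case. The import path and webhook URL below are placeholders:

```python
# Hypothetical import path -- the reference documents these helpers, not their module.
from cdf.integrations.slack import (
    SlackMessageComposer,
    divider_block,
    header_block,
    send_basic_slack_message,
    text_section_block,
)

# Plain-text case: post markdown straight to an incoming webhook.
send_basic_slack_message(
    "https://hooks.slack.com/services/T000/B000/XXXX",  # placeholder webhook URL
    "*nightly load* finished without errors",
)

# Composed case: chain the builder; posting the composed payload is left to the caller.
composer = (
    SlackMessageComposer()
    .add_primary_blocks(
        header_block("Nightly load"),
        text_section_block("All resources loaded"),
        divider_block(),
    )
    .add_secondary_blocks(text_section_block("run_id: 1234"))
)
```
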
- 

 

#### send\_extract\_start\_slack\_message

```python
def send_extract_start_slack_message(incoming_hook: str, source: str,
                                     run_id: str, tags: t.List[str],
                                     owners: t.List[str], environment: str,
                                     resources_selected: t.List[str],
                                     resources_count: int) -> None
```

Sends a Slack message for the start of an extract

 

#### send\_extract\_failure\_message

```python
def send_extract_failure_message(incoming_hook: str, source: str, run_id: str,
                                 duration: float, error: Exception) -> None
```

Sends a Slack message for the failure of an extract

 

#### send\_extract\_success\_message

```python
def send_extract_success_message(incoming_hook: str, source: str, run_id: str,
                                 duration: float) -> None
```

Sends a Slack message for the success of an extract

 

#### send\_normalize\_start\_slack\_message

```python
def send_normalize_start_slack_message(incoming_hook: str, source: str,
                                       blob_name: str, run_id: str,
                                       environment: str) -> None
```

Sends a Slack message for the start of a normalization

 

#### send\_normalize\_failure\_message

```python
def send_normalize_failure_message(incoming_hook: str, source: str,
                                   blob_name: str, run_id: str,
                                   duration: float, error: Exception) -> None
```

Sends a Slack message for the failure of a normalization

 

#### send\_normalization\_success\_message

```python
def send_normalization_success_message(incoming_hook: str, source: str,
                                       blob_name: str, run_id: str,
                                       duration: float) -> None
```

Sends a Slack message for the success of a normalization

 

#### send\_load\_start\_slack\_message

```python
def send_load_start_slack_message(incoming_hook: str, source: str,
                                  destination: str, dataset: str,
                                  run_id: str) -> None
```

Sends a Slack message for the start of a load

 

#### send\_load\_failure\_message

```python
def send_load_failure_message(incoming_hook: str, source: str,
                              destination: str, dataset: str,
                              run_id: str) -> None
```

Sends a Slack message for the failure of a load

 

#### send\_load\_success\_message

```python
def send_load_success_message(incoming_hook: str, source: str,
                              destination: str, dataset: str, run_id: str,
                              payload: str) -> None
```

Sends a Slack message for the success of a load

diff --git a/docs/cli_reference.md b/docs/cli_reference.md
deleted file mode 100644
index ebc52ae..0000000
--- a/docs/cli_reference.md
+++ /dev/null
@@ -1,575 +0,0 @@
-# `cdf`
-
-CDF (continuous data framework) is a framework for end to end data processing.
-
-**Usage**:
-
-```console
-$ cdf [OPTIONS] WORKSPACE COMMAND [ARGS]...
-```
-
-**Arguments**:
-
-* `WORKSPACE`: [required]
-
-**Options**:
-
-* `-p, --path PATH`: Path to the project. [env var: CDF_ROOT; default: .]
-* `-d, --debug`: Enable debug mode.
-* `-e, --env TEXT`: Environment to use.
-* `-l, --log-level TEXT`: The log level to use. [env var: LOG_LEVEL]
-* `--help`: Show this message and exit.
-
-Made with [red]♥[/red] by [bold]z3z1ma[/bold].
-
-**Commands**:
-
-* `discover`: :mag: Dry run a [b blue]Pipeline[/b blue]...
-* `head`: :wrench: Prints the first N rows of a [b...
-* `index`: :page_with_curl: Print an index of...
-* `init`: :art: Initialize a new project.
-* `jupyter-lab`: :star2: Start a Jupyter Lab server in the...
-* `model`: :construction: Model management commands.
-* `notebook`: :notebook: Execute a [b yellow]Notebook[/b...
-* `pipeline`: :inbox_tray: Ingest data from a [b... 
-* `publish`: :outbox_tray: [b yellow]Publish[/b yellow]... -* `schema`: :construction: Schema management commands. -* `script`: :hammer: Execute a [b yellow]Script[/b... -* `spec`: :blue_book: Print the fields for a given... -* `state`: :construction: State management commands. - -## `cdf discover` - -:mag: Dry run a [b blue]Pipeline[/b blue] and enumerates the discovered resources. - - -Args: - ctx: The CLI context. - pipeline: The pipeline in which to discover resources. - no_quiet: Whether to suppress the pipeline stdout. - -**Usage**: - -```console -$ cdf discover [OPTIONS] PIPELINE -``` - -**Arguments**: - -* `PIPELINE`: The pipeline in which to discover resources. [required] - -**Options**: - -* `--no-quiet / --no-no-quiet`: Pipeline stdout is suppressed by default, this disables that. [default: no-no-quiet] -* `--help`: Show this message and exit. - -## `cdf head` - -:wrench: Prints the first N rows of a [b green]Resource[/b green] within a [b blue]pipeline[/b blue]. Defaults to [cyan]5[/cyan]. - -This is useful for quickly inspecting data :detective: and verifying that it is coming over the wire correctly. - - -Args: - ctx: The CLI context. - pipeline: The pipeline to inspect. - resource: The resource to inspect. - n: The number of rows to print. - -Raises: - typer.BadParameter: If the resource is not found in the pipeline. - -**Usage**: - -```console -$ cdf head [OPTIONS] PIPELINE RESOURCE -``` - -**Arguments**: - -* `PIPELINE`: The pipeline to inspect. [required] -* `RESOURCE`: The resource to inspect. [required] - -**Options**: - -* `-n, --rows INTEGER`: [default: 5] -* `--help`: Show this message and exit. - -## `cdf index` - -:page_with_curl: Print an index of [b][blue]Pipelines[/blue], [red]Models[/red], [yellow]Publishers[/yellow][/b], and other components. - -**Usage**: - -```console -$ cdf index [OPTIONS] -``` - -**Options**: - -* `--hydrate / --no-hydrate`: [default: no-hydrate] -* `--help`: Show this message and exit. - -## `cdf init` - -:art: Initialize a new project. - -**Usage**: - -```console -$ cdf init [OPTIONS] -``` - -**Options**: - -* `--help`: Show this message and exit. - -## `cdf jupyter-lab` - -:star2: Start a Jupyter Lab server in the context of a workspace. - -**Usage**: - -```console -$ cdf jupyter-lab [OPTIONS] -``` - -**Options**: - -* `--help`: Show this message and exit. - -## `cdf model` - -:construction: Model management commands. - -**Usage**: - -```console -$ cdf model [OPTIONS] COMMAND [ARGS]... -``` - -**Options**: - -* `--help`: Show this message and exit. - -Made with [red]♥[/red] by [bold]z3z1ma[/bold]. - -**Commands**: - -* `diff`: :bar_chart: Compute the diff of a [b... -* `evaluate`: :bar_chart: Evaluate a [b red]Model[/b... -* `name`: :bar_chart: Get a [b red]Model[/b red]'s... -* `prototype`: :bar_chart: Prototype a model and save the... -* `render`: :bar_chart: Render a [b red]Model[/b red]... - -### `cdf model diff` - -:bar_chart: Compute the diff of a [b red]Model[/b red] across 2 environments. A thin wrapper around `sqlmesh table_diff` - - -Args: - ctx: The CLI context. - model: The model to evaluate. Can be prefixed with the gateway. - source_target: The source and target environments separated by a colon. - -**Usage**: - -```console -$ cdf model diff [OPTIONS] MODEL SOURCE_TARGET -``` - -**Arguments**: - -* `MODEL`: The model to evaluate. Can be prefixed with the gateway. [required] -* `SOURCE_TARGET`: The source and target environments separated by a colon. 
[required] - -**Options**: - -* `--show-sample / --no-show-sample`: Whether to show a sample of the diff. [default: no-show-sample] -* `--help`: Show this message and exit. - -### `cdf model evaluate` - -:bar_chart: Evaluate a [b red]Model[/b red] and print the results. A thin wrapper around `sqlmesh evaluate` - - -Args: - ctx: The CLI context. - model: The model to evaluate. Can be prefixed with the gateway. - limit: The number of rows to limit the evaluation to. - -**Usage**: - -```console -$ cdf model evaluate [OPTIONS] MODEL -``` - -**Arguments**: - -* `MODEL`: The model to evaluate. Can be prefixed with the gateway. [required] - -**Options**: - -* `--start TEXT`: The start time to evaluate the model from. Defaults to 1 month ago. [default: 1 month ago] -* `--end TEXT`: The end time to evaluate the model to. Defaults to now. [default: now] -* `--limit INTEGER`: The number of rows to limit the evaluation to. -* `--help`: Show this message and exit. - -### `cdf model name` - -:bar_chart: Get a [b red]Model[/b red]'s physical table name. A thin wrapper around `sqlmesh table_name` - - -Args: - ctx: The CLI context. - model: The model to evaluate. Can be prefixed with the gateway. - -**Usage**: - -```console -$ cdf model name [OPTIONS] MODEL -``` - -**Arguments**: - -* `MODEL`: The model to convert the physical name. Can be prefixed with the gateway. [required] - -**Options**: - -* `--help`: Show this message and exit. - -### `cdf model prototype` - -:bar_chart: Prototype a model and save the results to disk. - - -Args: - ctx: The CLI context. - dependencies: The dependencies to include in the prototype. - start: The start time to evaluate the model from. Defaults to 1 month ago. - end: The end time to evaluate the model to. Defaults to now. - limit: The number of rows to limit the evaluation to. - -**Usage**: - -```console -$ cdf model prototype [OPTIONS] -``` - -**Options**: - -* `-d, --dependencies TEXT`: The dependencies to include in the prototype. -* `--start TEXT`: The start time to evaluate the model from. Defaults to 1 month ago. [default: 1 month ago] -* `--end TEXT`: The end time to evaluate the model to. Defaults to now. [default: now] -* `--limit INTEGER`: The number of rows to limit the evaluation to. [default: 5000000] -* `--help`: Show this message and exit. - -### `cdf model render` - -:bar_chart: Render a [b red]Model[/b red] and print the query. A thin wrapper around `sqlmesh render` - - -Args: - ctx: The CLI context. - model: The model to evaluate. Can be prefixed with the gateway. - start: The start time to evaluate the model from. Defaults to 1 month ago. - end: The end time to evaluate the model to. Defaults to now. - expand: The referenced models to expand. - dialect: The SQL dialect to use for rendering. - -**Usage**: - -```console -$ cdf model render [OPTIONS] MODEL -``` - -**Arguments**: - -* `MODEL`: The model to evaluate. Can be prefixed with the gateway. [required] - -**Options**: - -* `--start TEXT`: The start time to evaluate the model from. Defaults to 1 month ago. [default: 1 month ago] -* `--end TEXT`: The end time to evaluate the model to. Defaults to now. [default: now] -* `--expand TEXT`: The referenced models to expand. -* `--dialect TEXT`: The SQL dialect to use for rendering. -* `--help`: Show this message and exit. - -## `cdf notebook` - -:notebook: Execute a [b yellow]Notebook[/b yellow] within the context of the current workspace. - - -Args: - ctx: The CLI context. - notebook: The notebook to execute. 
- params: The parameters to pass to the notebook as a json formatted string. - -**Usage**: - -```console -$ cdf notebook [OPTIONS] NOTEBOOK -``` - -**Arguments**: - -* `NOTEBOOK`: The notebook to execute. [required] - -**Options**: - -* `--params TEXT`: The parameters to pass to the notebook as a json formatted string. [default: {}] -* `--help`: Show this message and exit. - -## `cdf pipeline` - -:inbox_tray: Ingest data from a [b blue]Pipeline[/b blue] into a data store where it can be [b red]Transformed[/b red]. - - -Args: - ctx: The CLI context. - pipeline_to_sink: The pipeline and sink separated by a colon. - select: The resources to ingest as a sequence of glob patterns. - exclude: The resources to exclude as a sequence of glob patterns. - force_replace: Whether to force replace the write disposition. - no_stage: Allows selective disabling of intermediate staging even if configured in sink. - -**Usage**: - -```console -$ cdf pipeline [OPTIONS] PIPELINE_TO_SINK -``` - -**Arguments**: - -* `PIPELINE_TO_SINK`: The pipeline and sink separated by a colon. [required] - -**Options**: - -* `-s, --select TEXT`: Glob pattern for resources to run. Can be specified multiple times. [default: (dynamic)] -* `-x, --exclude TEXT`: Glob pattern for resources to exclude. Can be specified multiple times. [default: (dynamic)] -* `-F, --force-replace`: Force the write disposition to replace ignoring state. Useful to force a reload of incremental resources. -* `--no-stage`: Do not stage the data in the staging destination of the sink even if defined. -* `--help`: Show this message and exit. - -## `cdf publish` - -:outbox_tray: [b yellow]Publish[/b yellow] data from a data store to an [violet]External[/violet] system. - - -Args: - ctx: The CLI context. - sink_to_publisher: The sink and publisher separated by a colon. - skip_verification: Whether to skip the verification of the publisher dependencies. - -**Usage**: - -```console -$ cdf publish [OPTIONS] SINK_TO_PUBLISHER -``` - -**Arguments**: - -* `SINK_TO_PUBLISHER`: The sink and publisher separated by a colon. [required] - -**Options**: - -* `--skip-verification / --no-skip-verification`: Skip the verification of the publisher dependencies. [default: no-skip-verification] -* `--help`: Show this message and exit. - -## `cdf schema` - -:construction: Schema management commands. - -**Usage**: - -```console -$ cdf schema [OPTIONS] COMMAND [ARGS]... -``` - -**Options**: - -* `--help`: Show this message and exit. - -Made with [red]♥[/red] by [bold]z3z1ma[/bold]. - -**Commands**: - -* `dump`: :computer: Dump the schema of a [b... -* `edit`: :pencil: Edit the schema of a [b... - -### `cdf schema dump` - -:computer: Dump the schema of a [b blue]pipeline[/b blue]:[violet]sink[/violet] combination. - - -Args: - ctx: The CLI context. - pipeline_to_sink: The pipeline:sink combination from which to fetch the schema. - format: The format to dump the schema in. - -Raises: - typer.BadParameter: If the pipeline or sink are not found. - -**Usage**: - -```console -$ cdf schema dump [OPTIONS] PIPELINE_TO_SINK -``` - -**Arguments**: - -* `PIPELINE_TO_SINK`: The pipeline:sink combination from which to fetch the schema. [required] - -**Options**: - -* `--format [json|yaml|yml|py|python|dict]`: The format to dump the schema in. [default: json] -* `--help`: Show this message and exit. - -### `cdf schema edit` - -:pencil: Edit the schema of a [b blue]pipeline[/b blue]:[violet]sink[/violet] combination using the system editor. - - -Args: - ctx: The CLI context. 
- pipeline_to_sink: The pipeline:sink combination from which to fetch the schema. - -Raises: - typer.BadParameter: If the pipeline or sink are not found. - -**Usage**: - -```console -$ cdf schema edit [OPTIONS] PIPELINE_TO_SINK -``` - -**Arguments**: - -* `PIPELINE_TO_SINK`: The pipeline:sink combination from which to fetch the schema. [required] - -**Options**: - -* `--help`: Show this message and exit. - -## `cdf script` - -:hammer: Execute a [b yellow]Script[/b yellow] within the context of the current workspace. - - -Args: - ctx: The CLI context. - script: The script to execute. - quiet: Whether to suppress the script stdout. - -**Usage**: - -```console -$ cdf script [OPTIONS] SCRIPT -``` - -**Arguments**: - -* `SCRIPT`: The script to execute. [required] - -**Options**: - -* `--quiet / --no-quiet`: Suppress the script stdout. [default: no-quiet] -* `--help`: Show this message and exit. - -## `cdf spec` - -:blue_book: Print the fields for a given spec type. - - -Args: - name: The name of the spec to print. - json_schema: Whether to print the JSON schema for the spec. - -**Usage**: - -```console -$ cdf spec [OPTIONS] NAME:{pipeline|publisher|script|notebook|sink|feature_flags|filesystem} -``` - -**Arguments**: - -* `NAME:{pipeline|publisher|script|notebook|sink|feature_flags|filesystem}`: [required] - -**Options**: - -* `--json-schema / --no-json-schema`: [default: no-json-schema] -* `--help`: Show this message and exit. - -## `cdf state` - -:construction: State management commands. - -**Usage**: - -```console -$ cdf state [OPTIONS] COMMAND [ARGS]... -``` - -**Options**: - -* `--help`: Show this message and exit. - -Made with [red]♥[/red] by [bold]z3z1ma[/bold]. - -**Commands**: - -* `dump`: :computer: Dump the state of a [b... -* `edit`: :pencil: Edit the state of a [b... - -### `cdf state dump` - -:computer: Dump the state of a [b blue]pipeline[/b blue]:[violet]sink[/violet] combination. - - -Args: - ctx: The CLI context. - pipeline_to_sink: The pipeline:sink combination from which to fetch the state. - -Raises: - typer.BadParameter: If the pipeline or sink are not found. - -**Usage**: - -```console -$ cdf state dump [OPTIONS] PIPELINE_TO_SINK -``` - -**Arguments**: - -* `PIPELINE_TO_SINK`: The pipeline:sink combination from which to fetch the schema. [required] - -**Options**: - -* `--help`: Show this message and exit. - -### `cdf state edit` - -:pencil: Edit the state of a [b blue]pipeline[/b blue]:[violet]sink[/violet] combination using the system editor. - - -Args: - ctx: The CLI context. - pipeline_to_sink: The pipeline:sink combination from which to fetch the state. - -Raises: - typer.BadParameter: If the pipeline or sink are not found. - -**Usage**: - -```console -$ cdf state edit [OPTIONS] PIPELINE_TO_SINK -``` - -**Arguments**: - -* `PIPELINE_TO_SINK`: The pipeline:sink combination from which to fetch the state. [required] - -**Options**: - -* `--help`: Show this message and exit. - diff --git a/pyproject.toml b/pyproject.toml index 9659589..4627e24 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cdf" -version = "0.8.0" +version = "0.9.0" description = "A framework to manage data continuously" authors = [ { name = "z3z1ma", email = "butler.alex2010@gmail.com" }, @@ -40,9 +40,6 @@ dev = [ "pydoc-markdown>4", ] -[project.scripts] -cdf = "cdf.cli:app" - [build-system] requires = ["hatchling"] build-backend = "hatchling.build"