Merge pull request #199 from dlt-hub/rfix/post-run-introspection
post run introspection
rudolfix authored Mar 30, 2023
2 parents 6732c4f + c032a6b commit 4ac4507
Showing 80 changed files with 2,237 additions and 619 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_runner.yml
@@ -11,7 +11,7 @@ on:
env:
# all credentials must be present to be passed to dbt runner
DESTINATION__POSTGRES__CREDENTIALS: postgresql://[email protected]:5432/dlt_data
DESTINATION__REDSHIFT__CREDENTIALS: postgresql://[email protected]:5439/chat_analytics_rasa_ci
DESTINATION__REDSHIFT__CREDENTIALS: postgresql://[email protected]:5439/dlt_ci
DESTINATION__CREDENTIALS__PASSWORD: ${{ secrets.PG_PASSWORD }}

DESTINATION__CREDENTIALS__PROJECT_ID: chat-analytics-rasa-ci
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -10,7 +10,7 @@ on:

env:
DESTINATION__POSTGRES__CREDENTIALS: postgresql://[email protected]:5432/dlt_data
DESTINATION__REDSHIFT__CREDENTIALS: postgresql://[email protected]:5439/chat_analytics_rasa_ci
DESTINATION__REDSHIFT__CREDENTIALS: postgresql://[email protected]:5439/dlt_ci
DESTINATION__DUCKDB__CREDENTIALS: duckdb:///_storage/test_quack.duckdb
# password is the same so it will be shared
CREDENTIALS__PASSWORD: ${{ secrets.PG_PASSWORD }}
10 changes: 8 additions & 2 deletions README.md
@@ -60,11 +60,17 @@ Please use `poetry version prerelease` to bump patch and then `make build-librar
To test destinations use `make test`. You will need the following external resources:
1. `BigQuery` project
2. `Redshift` cluster
3. `Postgres` instance. You can find a docker compose for postgres instance [here](tests/load/postgres/docker-compose.yml)
3. `Postgres` instance. You can find a docker compose for postgres instance [here](tests/load/postgres/docker-compose.yml). When run, the instance is configured to work with the tests.
```shell
cd tests/load/postgres/
docker-compose up --build -d
```

See `tests/.example.env` for the expected environment variables. Then create `tests/.env` from it. You configure the tests as you would configure the dlt pipeline.
See `tests/.example.env` for the expected environment variables and a command line example to run the tests. Then create `tests/.env` from it. You configure the tests as you would configure the dlt pipeline.
We'll provide you with access to the resources above if you wish to test locally.

To test local destinations (`duckdb` and `postgres`) run `make test-local`. You can run these tests without additional credentials (just copy `.example.env` into `.env`).

## publishing

1. Make sure that you are on `devel` branch and you have the newest code that passed all tests on CI.
39 changes: 23 additions & 16 deletions dlt/cli/_dlt.py
@@ -80,9 +80,9 @@ def deploy_command_wrapper(pipeline_script_path: str, deployment_method: str, sc


@utils.track_command("pipeline", True, "operation")
def pipeline_command_wrapper(operation: str, pipeline_name: str, pipelines_dir: str) -> int:
def pipeline_command_wrapper(operation: str, pipeline_name: str, pipelines_dir: str, verbosity: int, load_id: str = None) -> int:
try:
pipeline_command(operation, pipeline_name, pipelines_dir)
pipeline_command(operation, pipeline_name, pipelines_dir, verbosity, load_id)
return 0
except (CannotRestorePipelineException, Exception) as ex:
click.secho(str(ex), err=True, fg="red")
@@ -180,8 +180,8 @@ def main() -> int:
init_cmd.add_argument("--generic", default=False, action="store_true", help="When present, a generic template with all the dlt loading code will be used. Otherwise a debug template is used that can be immediately run to get familiar with the dlt sources.")

deploy_cmd = subparsers.add_parser("deploy", help="Creates a deployment package for a selected pipeline script")
deploy_cmd.add_argument("pipeline_script_path", help="Path to a pipeline script")
deploy_cmd.add_argument("deployment_method", choices=["github-action"], default="github-action", help="Deployment method")
deploy_cmd.add_argument("pipeline-script-path", help="Path to a pipeline script")
deploy_cmd.add_argument("deployment-method", choices=["github-action"], default="github-action", help="Deployment method")
deploy_cmd.add_argument("--schedule", required=True, help="A schedule with which to run the pipeline, in cron format. Example: '*/30 * * * *' will run the pipeline every 30 minutes.")
deploy_cmd.add_argument("--run-manually", default=True, action="store_true", help="Allows the pipeline to be run manually form Github Actions UI.")
deploy_cmd.add_argument("--run-on-push", default=False, action="store_true", help="Runs the pipeline with every push to the repository.")
@@ -193,25 +193,32 @@ def main() -> int:
schema.add_argument("--remove-defaults", action="store_true", help="Does not show default hint values")

pipe_cmd = subparsers.add_parser("pipeline", help="Operations on pipelines that were run locally")
pipe_cmd.add_argument("name", help="Pipeline name")
pipe_cmd.add_argument(
"operation",
choices=["info", "show", "list", "failed_loads", "sync"],
default="info",
help="""'info' - displays state of the pipeline,
'show' - launches Streamlit app with the loading status and dataset explorer,
'failed_loads' - displays information on all the failed loads, failed jobs and associated error messages,
'sync' - drops the local state of the pipeline and resets all the schemas and restores it from destination. The destination state, data and schemas are left intact."""
)
pipe_cmd.add_argument("--pipelines_dir", help="Pipelines working directory", default=None)
pipe_cmd.add_argument("--list-pipelines", "-l", default=False, action="store_true", help="List local pipelines")
pipe_cmd.add_argument("name", nargs='?', help="Pipeline name")
pipe_cmd.add_argument("--pipelines-dir", help="Pipelines working directory", default=None)
pipe_cmd.add_argument("--verbose", "-v", action='count', default=0, help="Provides more information for certain commands.")

pipeline_subparsers = pipe_cmd.add_subparsers(dest="operation", required=False)
pipeline_subparsers.add_parser("info", help="Displays state of the pipeline, use -v or -vv for more info")
pipeline_subparsers.add_parser("show", help="Generates and launches Streamlit app with the loading status and dataset explorer")
pipeline_subparsers.add_parser("failed-jobs", help="Displays information on all the failed loads in all completed packages, failed jobs and associated error messages")
pipeline_subparsers.add_parser("sync", help="Drops the local state of the pipeline and resets all the schemas and restores it from destination. The destination state, data and schemas are left intact.")
pipeline_subparsers.add_parser("trace", help="Displays last run trace, use -v or -vv for more info")
pipe_cmd_package = pipeline_subparsers.add_parser("load-package", help="Displays information on load package, use -v or -vv for more info")
pipe_cmd_package.add_argument("load-id", nargs='?', help="Load id of completed or normalized package. Defaults to the most recent package.")

subparsers.add_parser("telemetry", help="Shows telemetry status")

args = parser.parse_args()

if args.command == "schema":
return schema_command_wrapper(args.file, args.format, args.remove_defaults)
elif args.command == "pipeline":
return pipeline_command_wrapper(args.operation, args.name, args.pipelines_dir)
if args.list_pipelines:
return pipeline_command_wrapper("list", "-", args.pipelines_dir, args.verbose)
else:
load_id = args.load_id if args.operation == "load_package" else None
return pipeline_command_wrapper(args.operation or "info", args.name, args.pipelines_dir, args.verbose, load_id)
elif args.command == "init":
if args.list_pipelines:
return list_pipelines_command_wrapper(args.location, args.branch)
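
The new `--verbose` / `-v` flag uses argparse's `count` action, so repeating it raises the verbosity integer that is passed down to `pipeline_command_wrapper`. A minimal standalone sketch of that behaviour (editorial, not part of the commit):

```python
import argparse

# Same pattern as the -v flag added above: each repetition bumps the count.
parser = argparse.ArgumentParser()
parser.add_argument("--verbose", "-v", action="count", default=0)

print(parser.parse_args([]).verbose)       # 0 - terse output
print(parser.parse_args(["-v"]).verbose)   # 1 - more info
print(parser.parse_args(["-vv"]).verbose)  # 2 - even more info
```
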
3 changes: 1 addition & 2 deletions dlt/cli/deploy_command.py
@@ -23,7 +23,6 @@
from dlt.common.typing import StrAny
from dlt.common.utils import set_working_dir

from dlt.pipeline.trace import load_trace
from dlt.reflection import names as n

from dlt.cli import utils
@@ -95,7 +94,7 @@ def deploy_command(pipeline_script_path: str, deployment_method: str, schedule:
# attach to pipeline name, get state and trace
pipeline = dlt.attach(pipeline_name=pipeline_name, pipelines_dir=pipelines_dir)
# trace must exist and end with successful loading step
trace = load_trace(pipeline.working_dir)
trace = pipeline.last_trace
if trace is None or len(trace.steps) == 0:
raise PipelineWasNotRun("Pipeline run trace could not be found. Please run the pipeline at least once locally.")
last_step = trace.steps[-1]
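
`last_trace` is the new property used in place of `load_trace(pipeline.working_dir)`. A hedged sketch of inspecting it directly (editorial, not part of the commit; the pipeline name is a placeholder and the `step` attribute on a trace step is assumed from how it is used here):

```python
import dlt

# Attach to a pipeline that was already run locally; "my_pipeline" is a placeholder.
pipeline = dlt.attach(pipeline_name="my_pipeline")

trace = pipeline.last_trace  # replaces load_trace(pipeline.working_dir)
if trace is None or len(trace.steps) == 0:
    print("No run trace found - run the pipeline at least once locally.")
else:
    last_step = trace.steps[-1]
    print("last step:", last_step.step)  # the deploy check expects a successful load step here
```
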
3 changes: 2 additions & 1 deletion dlt/cli/init_command.py
@@ -131,7 +131,8 @@ def _welcome_message(pipeline_name: str, destination_name: str, pipeline_files:
fmt.echo(" " + fmt.bold(dep))
fmt.echo(" If the python-dlt dependency is already added, make sure you install the extra for %s to it" % fmt.bold(destination_name))
if dependency_system == utils.REQUIREMENTS_TXT:
fmt.echo(" To install with pip: %s" % fmt.bold(f"pip3 install {' '.join(pipeline_files.requirements)}"))
qs = "' '"
fmt.echo(" To install with pip: %s" % fmt.bold(f"pip3 install '{qs.join(pipeline_files.requirements)}'"))
elif dependency_system == utils.PYPROJECT_TOML:
fmt.echo(" If you are using poetry you may issue the following command:")
fmt.echo(fmt.bold(" poetry add %s -E %s" % (DLT_PKG_NAME, destination_name)))
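
The quoting added above matters because requirement strings may contain extras brackets and version specifiers that a shell would otherwise mangle. A small illustration of the join, with made-up requirement strings:

```python
# Reproduces the quoting logic above with hypothetical requirements.
requirements = ["python-dlt[bigquery]>=0.2.0", "requests>=2.26"]
qs = "' '"
print(f"pip3 install '{qs.join(requirements)}'")
# -> pip3 install 'python-dlt[bigquery]>=0.2.0' 'requests>=2.26'
```
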
91 changes: 83 additions & 8 deletions dlt/cli/pipeline_command.py
@@ -1,14 +1,19 @@
import os
import yaml
import dlt
from dlt.cli.exceptions import CliCommandException

from dlt.common import json
from dlt.common.pipeline import get_dlt_pipelines_dir
from dlt.common.runners import Venv
from dlt.common.runners.stdout import iter_stdout
from dlt.common.schema.utils import remove_defaults
from dlt.common.storages.file_storage import FileStorage

from dlt.cli import echo as fmt
from dlt.pipeline.state import TSourceState


def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str) -> None:
def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, verbosity: int, load_id: str = None) -> None:
if operation == "list":
pipelines_dir = pipelines_dir or get_dlt_pipelines_dir()
storage = FileStorage(pipelines_dir)
@@ -31,23 +36,93 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str) ->
fmt.echo(line)

if operation == "info":
state = p.state
state: TSourceState = p.state # type: ignore
fmt.echo("Synchronized state:")
for k, v in state.items():
if not isinstance(v, dict):
fmt.echo("%s: %s" % (fmt.style(k, fg="green"), v))
if state.get("sources"):
fmt.echo()
fmt.secho("sources:", fg="green")
if verbosity > 0:
fmt.echo(json.dumps(state["sources"], pretty=True))
else:
fmt.echo("Add -v option to see sources state. Note that it could be large.")

fmt.echo()
fmt.echo("Local state:")
for k, v in state["_local"].items():
if not isinstance(v, dict):
fmt.echo("%s: %s" % (fmt.style(k, fg="green"), v))
fmt.echo()
fmt.echo("Working dir content:")
extracted_files = p.list_extracted_resources()
if extracted_files:
fmt.echo("Has %s extracted files ready to be normalized" % fmt.bold(str(len(extracted_files))))
norm_packages = p.list_normalized_load_packages()
if norm_packages:
fmt.echo("Has %s load packages ready to be loaded with following load ids:" % fmt.bold(str(len(norm_packages))))
for load_id in norm_packages:
fmt.echo(load_id)
fmt.echo()
loaded_packages = p.list_completed_load_packages()
if loaded_packages:
fmt.echo("Has %s completed load packages with following load ids:" % fmt.bold(str(len(loaded_packages))))
for load_id in loaded_packages:
fmt.echo(load_id)
fmt.echo()
trace = p.last_trace
if trace is None or len(trace.steps) == 0:
fmt.echo("Pipeline does not have last run trace.")
else:
fmt.echo("Pipeline has last run trace. Use 'dlt pipeline %s trace' to inspect " % pipeline_name)

if operation == "trace":
trace = p.last_trace
if trace is None or len(trace.steps) == 0:
fmt.warning("Pipeline does not have last run trace.")
return
fmt.echo(trace.asstr(verbosity))

if operation == "failed_loads":
if operation == "failed-jobs":
completed_loads = p.list_completed_load_packages()
for load_id in completed_loads:
normalized_loads = p.list_normalized_load_packages()
for load_id in completed_loads + normalized_loads: # type: ignore
fmt.echo("Checking failed jobs in load id '%s'" % fmt.bold(load_id))
for job, failed_message in p.list_failed_jobs_in_package(load_id):
fmt.echo("JOB: %s" % fmt.bold(os.path.abspath(job)))
fmt.secho(failed_message, fg="red")
failed_jobs = p.list_failed_jobs_in_package(load_id)
if failed_jobs:
for failed_job in p.list_failed_jobs_in_package(load_id):
fmt.echo("JOB: %s(%s)" % (fmt.bold(failed_job.job_file_info.job_id()), fmt.bold(failed_job.job_file_info.table_name)))
fmt.echo("JOB file type: %s" % fmt.bold(failed_job.job_file_info.file_format))
fmt.echo("JOB file path: %s" % fmt.bold(failed_job.file_path))
if verbosity > 0:
fmt.echo(failed_job.asstr(verbosity))
fmt.secho(failed_job.failed_message, fg="red")
fmt.echo()
else:
fmt.echo("No failed jobs found")


if operation == "sync":
if fmt.confirm("About to drop the local state of the pipeline and reset all the schemas. The destination state, data and schemas are left intact. Proceed?", default=False):
p = p.drop()
p.sync_destination()

if operation == "load-package":
if not load_id:
packages = sorted(p.list_normalized_load_packages())
if not packages:
packages = sorted(p.list_completed_load_packages())
if not packages:
raise CliCommandException("pipeline", "There are no load packages for that pipeline")
load_id = packages[-1]

package_info = p.get_load_package_info(load_id)
fmt.echo(package_info.asstr(verbosity))
if len(package_info.schema_update) > 0:
if verbosity == 0:
print("Add -v option to see schema update. Note that it could be large.")
else:
tables = remove_defaults({"tables": package_info.schema_update})
fmt.echo(fmt.bold("Schema update:"))
fmt.echo(yaml.dump(tables, allow_unicode=True, default_flow_style=False, sort_keys=False))
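
All of the operations above sit on top of the pipeline's own introspection API, so the same information is available programmatically. A hedged sketch (editorial, not part of the commit; the pipeline name is a placeholder):

```python
import dlt

p = dlt.attach(pipeline_name="my_pipeline")  # placeholder name

# packages still waiting to be loaded and packages already loaded
print(p.list_normalized_load_packages())
print(p.list_completed_load_packages())

# what the `failed-jobs` operation prints, per completed package
for load_id in p.list_completed_load_packages():
    for failed_job in p.list_failed_jobs_in_package(load_id):
        print(failed_job.job_file_info.job_id(), failed_job.failed_message)

# what the `load-package` operation prints for the most recent completed package
packages = sorted(p.list_completed_load_packages())
if packages:
    print(p.get_load_package_info(packages[-1]).asstr(1))
```
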
7 changes: 6 additions & 1 deletion dlt/cli/pipeline_files.py
@@ -144,7 +144,12 @@ def get_remote_pipeline_index(repo_path: str, files: Sequence[str]) -> TPipeline


def get_pipeline_names(pipelines_storage: FileStorage) -> List[str]:
return [n for n in pipelines_storage.list_folder_dirs(".", to_root=False) if not any(fnmatch.fnmatch(n, ignore) for ignore in IGNORE_PIPELINES)]
candidates: List[str] = []
for name in [n for n in pipelines_storage.list_folder_dirs(".", to_root=False) if not any(fnmatch.fnmatch(n, ignore) for ignore in IGNORE_PIPELINES)]:
# must contain at least one valid python script
if any(f.endswith(".py") for f in pipelines_storage.list_folder_files(name, to_root=False)):
candidates.append(name)
return candidates


def get_pipeline_files(pipelines_storage: FileStorage, pipeline_name: str) -> PipelineFiles:
8 changes: 4 additions & 4 deletions dlt/common/configuration/exceptions.py
@@ -1,6 +1,6 @@
from typing import Any, Mapping, Type, Tuple, Union, NamedTuple, Sequence
from typing import Any, Mapping, Type, Tuple, NamedTuple, Sequence

from dlt.common.exceptions import DltException
from dlt.common.exceptions import DltException, TerminalException


class LookupTrace(NamedTuple):
@@ -10,11 +10,11 @@ class LookupTrace(NamedTuple):
value: Any


class ConfigurationException(DltException):
class ConfigurationException(DltException, TerminalException):
pass


class ContainerException(ConfigurationException):
class ContainerException(DltException):
"""base exception for all exceptions related to injectable container"""
pass

3 changes: 2 additions & 1 deletion dlt/common/configuration/providers/toml.py
@@ -62,7 +62,8 @@ def get_value(self, key: str, hint: Type[Any], *sections: str) -> Tuple[Optional
if not isinstance(node, dict):
raise KeyError(k)
node = node[k]
return node, full_key
rv = node.unwrap() if isinstance(node, (TOMLContainer, TOMLItem)) else node
return rv, full_key
except KeyError:
return None, full_key
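
For context, values read from a tomlkit document are wrapper items, which is why the provider now unwraps them into plain Python objects. A small editorial sketch, assuming a tomlkit version that exposes `unwrap()` (as the code above does):

```python
import tomlkit

doc = tomlkit.parse("port = 5432\n\n[credentials]\nusername = 'loader'\n")

node = doc["credentials"]
print(type(node))           # a tomlkit table item, not a plain dict
print(type(node.unwrap()))  # a plain dict after unwrapping
```
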

5 changes: 3 additions & 2 deletions dlt/common/configuration/specs/run_configuration.py
@@ -2,6 +2,7 @@
from os.path import isfile, join
from pathlib import Path
from typing import Any, Optional, Tuple, IO
from dlt.common.typing import TSecretStrValue

from dlt.common.utils import encoding_for_mode, main_module_file_path, reveal_pseudo_secret
from dlt.common.configuration.specs.base_configuration import BaseConfiguration, configspec
@@ -12,7 +13,7 @@
class RunConfiguration(BaseConfiguration):
pipeline_name: Optional[str] = None
sentry_dsn: Optional[str] = None # keep None to disable Sentry
slack_incoming_hook: Optional[str] = None
slack_incoming_hook: Optional[TSecretStrValue] = None
prometheus_port: Optional[int] = None # keep None to disable Prometheus
dlthub_telemetry: bool = True # enable or disable dlthub telemetry
dlthub_telemetry_segment_write_key: str = "a1F2gc6cNYw2plyAt02sZouZcsRjG7TD"
@@ -31,7 +32,7 @@ def on_resolved(self) -> None:
# it may be obfuscated base64 value
# TODO: that needs to be removed ASAP
try:
self.slack_incoming_hook = reveal_pseudo_secret(self.slack_incoming_hook, b"dlt-runtime-2022")
self.slack_incoming_hook = TSecretStrValue(reveal_pseudo_secret(self.slack_incoming_hook, b"dlt-runtime-2022"))
except binascii.Error:
# just keep the original value
pass
2 changes: 1 addition & 1 deletion dlt/common/configuration/utils.py
@@ -105,7 +105,7 @@ def log_traces(config: Optional[BaseConfiguration], key: str, hint: Type[Any], v
_RESOLVED_TRACES[path] = ResolvedValueTrace(key, resolved_trace.value, default_value, hint, resolved_trace.sections, resolved_trace.provider, config)


def get_resolved_traces() -> Mapping[str, ResolvedValueTrace]:
def get_resolved_traces() -> Dict[str, ResolvedValueTrace]:
return _RESOLVED_TRACES


7 changes: 4 additions & 3 deletions dlt/common/destination/reference.py
@@ -5,8 +5,7 @@

from dlt.common.configuration.utils import serialize_value
from dlt.common.exceptions import IdentifierTooLongException, InvalidDestinationReference, UnknownDestinationModule
from dlt.common.schema import Schema
from dlt.common.schema.typing import TTableSchema
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.configuration import configspec
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
@@ -92,8 +91,10 @@ def initialize_storage(self) -> None:
def is_storage_initialized(self) -> bool:
pass

def update_storage_schema(self) -> None:
def update_storage_schema(self, schema_update: Optional[TSchemaTables] = None) -> Optional[TSchemaTables]:
"""Performs schema update according to held schema and/or schema update passed. Returns an update that was applied at the destination."""
self._verify_schema_identifier_lengths()
return schema_update

@abstractmethod
def start_file_load(self, table: TTableSchema, file_path: str) -> LoadJob:
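
An editorial sketch of how a destination client might honor the new `update_storage_schema` contract, returning the portion of the update it actually applied. The `JobClientBase` import path is assumed from this file; the body is illustrative only:

```python
from typing import Optional

from dlt.common.schema import TSchemaTables
from dlt.common.destination.reference import JobClientBase  # assumed location of the abstract base


class IllustrativeJobClient(JobClientBase):
    def update_storage_schema(self, schema_update: Optional[TSchemaTables] = None) -> Optional[TSchemaTables]:
        # the base implementation verifies identifier lengths and echoes the update back
        applied_update = super().update_storage_schema(schema_update)
        # ... a real client would execute DDL for the tables in applied_update here ...
        return applied_update
```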