Skip to content

Commit

Permalink
Merge pull request #174 from dlt-hub/rfix/adds-segment-telemetry
Browse files Browse the repository at this point in the history
adds segment telemetry and sentry tracing
  • Loading branch information
rudolfix authored Mar 14, 2023
2 parents f6773c4 + 8599d1c commit 1049059
Show file tree
Hide file tree
Showing 104 changed files with 2,060 additions and 700 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_destination_bigquery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ env:
CREDENTIALS__PRIVATE_KEY: ${{ secrets.BQ_CRED_PRIVATE_KEY }}
CREDENTIALS__TOKEN_URI: https://oauth2.googleapis.com/token

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR

jobs:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ env:
# password is the same so it will be shared
CREDENTIALS__PASSWORD: ${{ secrets.PG_PASSWORD }}

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB

jobs:

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ lint:
./check-package.sh
poetry run mypy --config-file mypy.ini dlt docs/examples
poetry run flake8 --max-line-length=200 dlt docs/examples
poetry run flake8 --max-line-length=200 tests
poetry run flake8 --max-line-length=200 tests --exclude tests/reflection/module_cases
# $(MAKE) lint-security

lint-security:
Expand Down
159 changes: 94 additions & 65 deletions dlt/cli/_dlt.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
from typing import Any, Sequence
import yaml
import os
import argparse
import click
from dlt.common.pipeline import get_default_pipelines_dir
from dlt.common.runners.stdout import iter_stdout
from dlt.common.runners.venv import Venv
from dlt.common.storages.file_storage import FileStorage
from dlt.cli.telemetry_command import DLT_TELEMETRY_DOCS_URL, change_telemetry_status_command, telemetry_status_command

from dlt.version import __version__
from dlt.common import json
from dlt.common.schema import Schema
from dlt.common.typing import DictStrAny

from dlt.pipeline import attach

import dlt.cli.echo as fmt
from dlt.cli import utils
from dlt.cli.init_command import init_command, list_pipelines_command, DLT_INIT_DOCS_URL, DEFAULT_PIPELINES_REPO
from dlt.cli.deploy_command import PipelineWasNotRun, deploy_command, DLT_DEPLOY_DOCS_URL
from dlt.cli.pipeline_command import pipeline_command
from dlt.pipeline.exceptions import CannotRestorePipelineException


@utils.track_command("init", False, "pipeline_name", "destination_name")
def init_command_wrapper(pipeline_name: str, destination_name: str, use_generic_template: bool, repo_location: str, branch: str) -> int:
try:
init_command(pipeline_name, destination_name, use_generic_template, repo_location, branch)
Expand All @@ -31,6 +29,7 @@ def init_command_wrapper(pipeline_name: str, destination_name: str, use_generic_
return 0


@utils.track_command("list_pipelines", False)
def list_pipelines_command_wrapper(repo_location: str, branch: str) -> int:
try:
list_pipelines_command(repo_location, branch)
Expand All @@ -41,6 +40,7 @@ def list_pipelines_command_wrapper(repo_location: str, branch: str) -> int:
return 0


@utils.track_command("deploy", False, "deployment_method")
def deploy_command_wrapper(pipeline_script_path: str, deployment_method: str, schedule: str, run_on_push: bool, run_on_dispatch: bool, branch: str) -> int:
try:
utils.ensure_git_command("deploy")
Expand Down Expand Up @@ -79,60 +79,96 @@ def deploy_command_wrapper(pipeline_script_path: str, deployment_method: str, sc
return 0


@utils.track_command("pipeline", True, "operation")
def pipeline_command_wrapper(operation: str, pipeline_name: str, pipelines_dir: str) -> int:
    """Execute a `dlt pipeline` CLI operation and map the outcome to an exit code.

    Delegates all operation logic (list/show/info/failed_loads/sync) to
    `pipeline_command`; this wrapper only adds telemetry tracking and
    uniform error reporting.

    Args:
        operation: one of the pipeline subcommands (e.g. "info", "show", "list", "failed_loads", "sync")
        pipeline_name: name of the pipeline to attach to
        pipelines_dir: pipelines working directory, or None/empty for the default

    Returns:
        0 on success, 1 on any error.
    """
    try:
        pipeline_command(operation, pipeline_name, pipelines_dir)
        return 0
    except Exception as ex:
        # CannotRestorePipelineException is a subclass of Exception, so listing it
        # separately in the except tuple was redundant; one handler covers both.
        click.secho(str(ex), err=True, fg="red")
        return 1


@utils.track_command("schema", False, "operation")
def schema_command_wrapper(file_path: str, format_: str, remove_defaults: bool) -> int:
    """Load a schema file (JSON or YAML, decided by extension) and print it in `format_`.

    Args:
        file_path: path to the schema file; a ".json" extension selects the JSON parser,
            anything else is parsed as YAML
        format_: output format, "json" for pretty JSON, anything else for pretty YAML
        remove_defaults: when True, default values are stripped from the rendered schema

    Returns:
        0 always (parse errors propagate to the caller).
    """
    extension = os.path.splitext(file_path)[1][1:]
    with open(file_path, "br") as f:
        if extension == "json":
            schema_dict: DictStrAny = json.load(f)
        else:
            schema_dict = yaml.safe_load(f)
    schema = Schema.from_dict(schema_dict)
    if format_ == "json":
        rendered = json.dumps(schema.to_dict(remove_defaults=remove_defaults), pretty=True)
    else:
        rendered = schema.to_pretty_yaml(remove_defaults=remove_defaults)
    print(rendered)
    return 0


@utils.track_command("telemetry", False)
def telemetry_status_command_wrapper() -> int:
    """Display the current telemetry status.

    Returns:
        0 on success, -1 when the status command raised (the error is echoed
        in red and a pointer to the telemetry docs is printed).
    """
    try:
        telemetry_status_command()
        return 0
    except Exception as ex:
        click.secho(str(ex), err=True, fg="red")
        fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_TELEMETRY_DOCS_URL))
        return -1


@utils.track_command("telemetry_switch", False, "enabled")
def telemetry_change_status_command_wrapper(enabled: bool) -> int:
    """Enable or disable telemetry.

    Args:
        enabled: True to switch telemetry on, False to switch it off

    Returns:
        0 on success, -1 when the change failed (the error is echoed in red
        and a pointer to the telemetry docs is printed).
    """
    try:
        change_telemetry_status_command(enabled)
        return 0
    except Exception as ex:
        click.secho(str(ex), err=True, fg="red")
        fmt.note("Please refer to %s for further assistance" % fmt.bold(DLT_TELEMETRY_DOCS_URL))
        return -1


# Set to True by eager argparse actions (e.g. TelemetryAction) once they have run,
# so the fallback help screen is suppressed.
ACTION_EXECUTED = False


def print_help(parser: argparse.ArgumentParser) -> None:
    """Print the top-level help screen, unless an eager action already executed."""
    if ACTION_EXECUTED:
        return
    parser.print_help()


class TelemetryAction(argparse.Action):
    """Zero-argument flag that toggles telemetry as soon as argparse sees it.

    Registered for both --enable-telemetry and --disable-telemetry; the option
    string itself decides the direction of the switch.
    """

    def __init__(self, option_strings: Sequence[str], dest: Any = argparse.SUPPRESS, default: Any = argparse.SUPPRESS, help: str = None) -> None:  # noqa
        super().__init__(option_strings=option_strings, dest=dest, default=default, nargs=0, help=help)

    def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str = None) -> None:
        global ACTION_EXECUTED

        # mark that an eager action ran so main() skips the fallback help screen
        ACTION_EXECUTED = True
        telemetry_change_status_command_wrapper(option_string == "--enable-telemetry")


class NonInteractiveAction(argparse.Action):
    """Zero-argument flag that switches all prompts into choose-the-default mode."""

    def __init__(self, option_strings: Sequence[str], dest: Any = argparse.SUPPRESS, default: Any = argparse.SUPPRESS, help: str = None) -> None:  # noqa
        super().__init__(option_strings=option_strings, dest=dest, default=default, nargs=0, help=help)

    def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: Any, option_string: str = None) -> None:
        # from here on, confirmations and prompts in the echo module auto-accept defaults
        fmt.ALWAYS_CHOOSE_DEFAULT = True


def main() -> int:
parser = argparse.ArgumentParser(description="Runs various DLT modules", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--version', action='version', version='%(prog)s {version}'.format(version=__version__))
parser.add_argument('--version', action="version", version='%(prog)s {version}'.format(version=__version__))
parser.add_argument('--disable-telemetry', action=TelemetryAction, help="Disables telemetry before command is executed")
parser.add_argument('--enable-telemetry', action=TelemetryAction, help="Enables telemetry before command is executed")
parser.add_argument('--non-interactive', action=NonInteractiveAction, help="Non interactive mode. Default choices are automatically made for confirmations and prompts.")
subparsers = parser.add_subparsers(dest="command")

init_cmd = subparsers.add_parser("init", help="Adds or creates a pipeline in the current folder.")
Expand Down Expand Up @@ -163,26 +199,17 @@ def main() -> int:
choices=["info", "show", "list", "failed_loads", "sync"],
default="info",
help="""'info' - displays state of the pipeline,
'show' - launches streamlit app with the loading status and dataset explorer,
'show' - launches Streamlit app with the loading status and dataset explorer,
'failed_loads' - displays information on all the failed loads, failed jobs and associated error messages,
'sync' - drops the local state of the pipeline and resets all the schemas and restores it from destination. The destination state, data and schemas are left intact."""
)
pipe_cmd.add_argument("--pipelines_dir", help="Pipelines working directory", default=None)
subparsers.add_parser("telemetry", help="Shows telemetry status")

args = parser.parse_args()

if args.command == "schema":
with open(args.file, "br") as f:
if os.path.splitext(args.file)[1][1:] == "json":
schema_dict: DictStrAny = json.load(f)
else:
schema_dict = yaml.safe_load(f)
s = Schema.from_dict(schema_dict)
if args.format == "json":
schema_str = json.dumps(s.to_dict(remove_defaults=args.remove_defaults), pretty=True)
else:
schema_str = s.to_pretty_yaml(remove_defaults=args.remove_defaults)
print(schema_str)
return schema_command_wrapper(args.file, args.format, args.remove_defaults)
elif args.command == "pipeline":
return pipeline_command_wrapper(args.operation, args.name, args.pipelines_dir)
elif args.command == "init":
Expand All @@ -196,13 +223,15 @@ def main() -> int:
return init_command_wrapper(args.pipeline, args.destination, args.generic, args.location, args.branch)
elif args.command == "deploy":
return deploy_command_wrapper(args.pipeline_script_path, args.deployment_method, args.schedule, args.run_on_push, args.run_manually, args.branch)
elif args.command == "telemetry":
return telemetry_status_command_wrapper()
else:
parser.print_help()
print_help(parser)
return -1
return 0


def _main() -> None:
    """Console-script entry point: run the CLI and exit the process with its return code."""
    exit(main())


Expand Down
2 changes: 1 addition & 1 deletion dlt/cli/config_toml_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,4 @@ def write_values(toml: tomlkit.TOMLDocument, values: Iterable[WritableConfigValu
else:
toml_table = toml_table[section] # type: ignore

write_value(toml_table, value.name, value.hint, overwrite_existing, default_value=value.default_value)
write_value(toml_table, value.name, value.hint, overwrite_existing, default_value=value.default_value, is_default_of_interest=True)
Loading

0 comments on commit 1049059

Please sign in to comment.