Add support for continuously starting load jobs as slots free up in the loader #1494

Merged 104 commits from feat/continuous-load-jobs into devel on Aug 4, 2024. The diff below shows changes from 88 of the 104 commits.

Commits
78a5989
add support for starting load jobs as slots free up
sh-rp Jun 19, 2024
b4d05c8
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jun 27, 2024
8f1c9bc
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 2, 2024
c516fbc
update loader class to devel changes
sh-rp Jul 2, 2024
da8c9e6
update failed w_d test
sh-rp Jul 2, 2024
b8ff71d
reduce sleep time for now
sh-rp Jul 2, 2024
fa66386
add first implementation of futures on custom destination
sh-rp Jul 2, 2024
d59e4eb
rename start_file_load to get_load_job
sh-rp Jul 2, 2024
3a8ec86
add first version of working follow up jobs for new loader setup
sh-rp Jul 2, 2024
1768e17
require jobclient in constructor for duckdb
sh-rp Jul 2, 2024
1707413
fixes some dummy tests
sh-rp Jul 2, 2024
189988c
update all jobs to have the new run method
sh-rp Jul 3, 2024
a53a9b7
unify file_path argument in loadjobs
sh-rp Jul 3, 2024
37108a6
fixes some filepath related tests
sh-rp Jul 3, 2024
aaa14fe
renames job classes for more clarity and small updates
sh-rp Jul 3, 2024
78f5dbc
re-organize jobs a bit more
sh-rp Jul 4, 2024
a8d4a7a
fix destination parallelism
sh-rp Jul 4, 2024
2d1c3b0
remove changed in config.toml
sh-rp Jul 4, 2024
c93fea8
replace emptyloadjob with finalized load job
sh-rp Jul 4, 2024
9c4ee47
make sure files are only moved on main thread
sh-rp Jul 4, 2024
331d74a
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 4, 2024
2f6d3db
tmp
sh-rp Jul 8, 2024
f61151a
wrap job instantiation in try catch block (still needs improvement)
sh-rp Jul 8, 2024
145dbfb
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 8, 2024
3765b01
post devel merge fix
sh-rp Jul 8, 2024
4d05dd5
simplify followupjob creation
sh-rp Jul 8, 2024
5ddb8ed
refactor job restoring
sh-rp Jul 8, 2024
14794e7
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 8, 2024
efb21b1
simplify common fields on loadjobs
sh-rp Jul 8, 2024
75bbb59
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 9, 2024
1f857a0
completely separate followupjobs from regular loadjobs
sh-rp Jul 9, 2024
d6ad935
unify some more loadjob vars
sh-rp Jul 9, 2024
d6d2dc7
fix job client tests
sh-rp Jul 9, 2024
1a5d2de
amend last commit
sh-rp Jul 9, 2024
58ae445
fix handling of jobs in loader
sh-rp Jul 9, 2024
802b168
fix a couple more tests
sh-rp Jul 9, 2024
18fbca2
fix deltalake load jobs
sh-rp Jul 9, 2024
26d3ca1
fix pending exceptions code
sh-rp Jul 10, 2024
47f5298
fix partial load tests
sh-rp Jul 10, 2024
0d97352
fix custom destination and delta table tests
sh-rp Jul 10, 2024
6f7c940
remove one unclear assertion for now
sh-rp Jul 10, 2024
7980cd1
fix clickhouse loadjob
sh-rp Jul 10, 2024
20ad945
fix databricks loadjob
sh-rp Jul 10, 2024
dafd93c
fix one weaviate and the qdrant local tests (hopefully :)
sh-rp Jul 11, 2024
6cb31b5
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 11, 2024
acdac15
fix one pipeline test
sh-rp Jul 11, 2024
b2f1ad6
add a couple of loader test stubs
sh-rp Jul 11, 2024
9903b18
update bigquery load jobs to new format
sh-rp Jul 11, 2024
933e962
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 11, 2024
12adb5c
fix bigquery resume test
sh-rp Jul 11, 2024
695c209
add additional check to bigquery job resume test
sh-rp Jul 12, 2024
59c09cc
write to delta tables in single commit
sh-rp Jul 12, 2024
7feafab
fix broken filesystem loading
sh-rp Jul 12, 2024
4275308
add some simple jobs tests
sh-rp Jul 12, 2024
79a610a
fix recursion problem
sh-rp Jul 12, 2024
8dcac5b
remove a bit of unneeded code
sh-rp Jul 15, 2024
ffdb01b
Merge remote-tracking branch 'origin/feat/continuous-load-jobs' into …
sh-rp Jul 15, 2024
5ba9124
do not open remote connection when creating a load job
sh-rp Jul 15, 2024
b25b857
fix weaviate
sh-rp Jul 15, 2024
9179c99
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 15, 2024
124316e
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 15, 2024
3f79ddc
post devel merge fixes
sh-rp Jul 15, 2024
7109d33
only update load package info if jobs were finalized
sh-rp Jul 15, 2024
3d43ddb
fix two obviously wrong tests...
sh-rp Jul 15, 2024
06015e0
create client on thread for jobs
sh-rp Jul 16, 2024
00eda96
fix sql_client / job_client vars
sh-rp Jul 16, 2024
928e070
add tests for available slots and update tests for getting filtering …
sh-rp Jul 16, 2024
1785641
clean up complete package condition
sh-rp Jul 16, 2024
dcb683f
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 16, 2024
187a5eb
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 16, 2024
258f5d4
improve table-sequential job filtering
sh-rp Jul 16, 2024
607990c
fix resume job test
sh-rp Jul 16, 2024
ea25801
fix load job init exceptions tests
sh-rp Jul 16, 2024
828bf4c
remove test stubs for tests that already exist
sh-rp Jul 16, 2024
f3ca312
add some benchmark code to loader tests (in progress)
sh-rp Jul 16, 2024
0e87a69
amend loader benchmark test
sh-rp Jul 16, 2024
9fb8c5c
remove job_client from RunnableLoadJob initializer params
sh-rp Jul 16, 2024
b8f7420
fix bg streaming insert
sh-rp Jul 16, 2024
eb882d0
fix bigquery streaming insert
sh-rp Jul 17, 2024
f3161af
small renaming and logging changes
sh-rp Jul 18, 2024
07c279a
remove delta job type in favor of using the reference jobs
sh-rp Jul 18, 2024
26fef1b
nicer logging when jobs pool is being drained
sh-rp Jul 18, 2024
36b1997
small comment change
sh-rp Jul 18, 2024
999ab9d
test exception in followup job creation
sh-rp Jul 18, 2024
2d4c7d4
add tests for followup jobs
sh-rp Jul 18, 2024
19a90ac
improve dummy tests for better followup job testing
sh-rp Jul 18, 2024
b6e4fca
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 18, 2024
1c73de1
fix linter
sh-rp Jul 18, 2024
d35842c
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Jul 30, 2024
90f820c
put sleep amount back to 1.0 while checking for completed load jobs
sh-rp Jul 30, 2024
6ba32f8
create explicit exceptions for failed table chain jobs
sh-rp Jul 30, 2024
9142a1b
make the large load package test faster
sh-rp Jul 30, 2024
9fc995e
fix trace test
sh-rp Jul 30, 2024
bf9f912
allow clients to prepare for job execution on thread and move query t…
sh-rp Jul 30, 2024
5c07c07
fix runnable job tests and linter
sh-rp Jul 30, 2024
ce3e1c9
fix linter again and remove wrong value from tests
sh-rp Jul 30, 2024
7fe2f46
test
sh-rp Jul 31, 2024
7e569af
update detection of pending jobs, will probably break some tests
sh-rp Jul 31, 2024
960f309
fix two tests of pending packages
sh-rp Jul 31, 2024
1cf2207
fix test_remove_pending_packages test
sh-rp Jul 31, 2024
5b6717c
Merge branch 'devel' into feat/continuous-load-jobs
sh-rp Aug 2, 2024
3423ca7
switch to docker compose subcommand
sh-rp Aug 2, 2024
4b21365
fix compose deployments
sh-rp Aug 2, 2024
2c38f13
fix test for arrow version in delta tables
sh-rp Aug 2, 2024
163 changes: 109 additions & 54 deletions dlt/common/destination/reference.py
@@ -27,6 +27,7 @@
from dlt.common import logger
from dlt.common.configuration.specs.base_configuration import extract_inner_hint
from dlt.common.destination.utils import verify_schema_capabilities
from dlt.common.exceptions import TerminalValueError
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.utils import (
@@ -42,6 +43,8 @@
InvalidDestinationReference,
UnknownDestinationModule,
DestinationSchemaTampered,
DestinationTransientException,
DestinationTerminalException,
)
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
@@ -258,11 +261,45 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura
"""configuration of the staging, if present, injected at runtime"""


TLoadJobState = Literal["running", "failed", "retry", "completed"]
TLoadJobState = Literal["ready", "running", "failed", "retry", "completed"]


class LoadJob:
"""Represents a job that loads a single file
class LoadJob(ABC):
"""
A stateful load job that represents one job file
"""

def __init__(self, file_path: str) -> None:
self._file_path = file_path
self._file_name = FileStorage.get_file_name_from_file_path(file_path)
# NOTE: we only accept a full filepath in the constructor
assert self._file_name != self._file_path
self._parsed_file_name = ParsedLoadJobFileName.parse(self._file_name)

def job_id(self) -> str:
"""The job id that is derived from the file name and does not changes during job lifecycle"""
return self._parsed_file_name.job_id()

def file_name(self) -> str:
"""A name of the job file"""
return self._file_name

def job_file_info(self) -> ParsedLoadJobFileName:
return self._parsed_file_name

@abstractmethod
def state(self) -> TLoadJobState:
"""Returns current state. Should poll external resource if necessary."""
pass

@abstractmethod
def exception(self) -> str:
"""The exception associated with failed or retry states"""
pass

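For illustration, a minimal sketch of the new base class contract (the subclass and the file path are hypothetical; job-file names follow dlt's {table}.{file_id}.{retry_count}.{format} convention):

from dlt.common.destination.reference import LoadJob, TLoadJobState

class NoopJob(LoadJob):
    # hypothetical concrete subclass, used only to show the base API
    def state(self) -> TLoadJobState:
        return "completed"

    def exception(self) -> str:
        return ""

# the constructor now takes a full file path, not just a file name
job = NoopJob("/loads/1690000000.123/started_jobs/event.abc123.0.jsonl")
assert job.file_name() == "event.abc123.0.jsonl"
assert job.job_id() == job.job_file_info().job_id()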

class RunnableLoadJob(LoadJob, ABC):
"""Represents a runnable job that loads a single file

Each job starts in "running" state and ends in one of terminal states: "retry", "failed" or "completed".
Each job is uniquely identified by a file name. The file is guaranteed to exist in "running" state. In terminal state, the file may not be present.
@@ -273,75 +310,96 @@ class LoadJob:
immediately transition job into "failed" or "retry" state respectively.
"""

def __init__(self, file_name: str) -> None:
def __init__(self, file_path: str) -> None:
"""
File name is also a job id (or job id is deterministically derived) so it must be globally unique
"""
# ensure file name
assert file_name == FileStorage.get_file_name_from_file_path(file_name)
self._file_name = file_name
self._parsed_file_name = ParsedLoadJobFileName.parse(file_name)
super().__init__(file_path)
self._state: TLoadJobState = "ready"
self._exception: Exception = None

@abstractmethod
def state(self) -> TLoadJobState:
"""Returns current state. Should poll external resource if necessary."""
pass
# variables needed by most jobs, set by the loader in set_run_vars
self._schema: Schema = None
self._load_table: TTableSchema = None
self._load_id: str = None
self._job_client: "JobClientBase" = None

def file_name(self) -> str:
"""A name of the job file"""
return self._file_name
def set_run_vars(self, load_id: str, schema: Schema, load_table: TTableSchema) -> None:
"""
Called by the loader right before the job is run.
"""
self._load_id = load_id
self._schema = schema
self._load_table = load_table

def job_id(self) -> str:
"""The job id that is derived from the file name and does not changes during job lifecycle"""
return self._parsed_file_name.job_id()
@property
def load_table_name(self) -> str:
return self._load_table["name"]

def job_file_info(self) -> ParsedLoadJobFileName:
return self._parsed_file_name
def run_managed(
self,
job_client: "JobClientBase",
) -> None:
"""
Wrapper around the user-implemented run method.
"""
# only jobs in the "ready" or "retry" state may be started
assert self._state in ("ready", "retry")
self._job_client = job_client

# filepath is now moved to running
try:
self._state = "running"
self.run()
self._state = "completed"
except (DestinationTerminalException, TerminalValueError) as e:
logger.exception(f"Terminal problem when starting job {self.file_name}")
self._state = "failed"
self._exception = e
except (DestinationTransientException, Exception) as e:
logger.exception(f"Temporary problem when starting job {self.file_name}")
self._state = "retry"
self._exception = e
finally:
# sanity check
assert self._state in ("completed", "retry", "failed")

@abstractmethod
def run(self) -> None:
"""
Run the actual job; executed on a worker thread and implemented by the user.
Exceptions raised here are handled outside of this method (in run_managed).
"""
raise NotImplementedError()

def state(self) -> TLoadJobState:
"""Returns current state. Should poll external resource if necessary."""
return self._state

def exception(self) -> str:
"""The exception associated with failed or retry states"""
pass
return str(self._exception)

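To show how the pieces fit together, a hedged sketch of a runnable job (the class, its body, and the loader calls are illustrative, not part of this diff). The loader creates the job in the "ready" state and calls run_managed() on a worker thread once a slot frees up:

from dlt.common.destination.reference import RunnableLoadJob

class MyCopyJob(RunnableLoadJob):
    # hypothetical destination job
    def run(self) -> None:
        # executed on a worker thread; raise DestinationTerminalException or
        # DestinationTransientException to end in "failed" or "retry"
        with open(self._file_path, "rb") as f:
            pass  # push the file contents to the destination here

# roughly what the loader does:
# job = client.create_load_job(table, file_path, load_id)
# job.set_run_vars(load_id, schema, table)
# job.run_managed(client)  # job.state() is now "completed", "retry" or "failed"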

class NewLoadJob(LoadJob):
"""Adds a trait that allows to save new job file"""
class FollowupJob:
"""Base class for follow up jobs that should be created"""

@abstractmethod
def new_file_path(self) -> str:
"""Path to a newly created temporary job file. If empty, no followup job should be created"""
pass


class FollowupJob:
"""Adds a trait that allows to create a followup job"""
class HasFollowupJobs:
"""Adds a trait that allows to create single or table chain followup jobs"""

def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJob]:
"""Return list of new jobs. `final_state` is state to which this job transits"""
return []

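A hedged sketch of how the two traits compose (all names hypothetical): a FollowupJob only has to materialize a new job file, while HasFollowupJobs lets a finishing job request such files from the loader:

from typing import List
from dlt.common.destination.reference import (
    FollowupJob,
    HasFollowupJobs,
    RunnableLoadJob,
    TLoadJobState,
)

class MyFollowupJob(FollowupJob):
    def __init__(self, tmp_file_path: str) -> None:
        self._path = tmp_file_path

    def new_file_path(self) -> str:
        # the loader imports this file as a new job in the current package
        return self._path

class MyJobWithFollowups(RunnableLoadJob, HasFollowupJobs):
    def run(self) -> None:
        pass  # load the file

    def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJob]:
        # called by the loader when this job transitions to `final_state`
        if final_state == "completed":
            return [MyFollowupJob("/tmp/my_table.deadbeef.0.reference")]
        return []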

class DoNothingJob(LoadJob):
"""The most lazy class of dlt"""

def __init__(self, file_path: str) -> None:
super().__init__(FileStorage.get_file_name_from_file_path(file_path))

def state(self) -> TLoadJobState:
# this job is always done
return "completed"

def exception(self) -> str:
# this part of the code should never be reached
raise NotImplementedError()


class DoNothingFollowupJob(DoNothingJob, FollowupJob):
"""The second most lazy class of dlt"""

pass


class JobClientBase(ABC):
def __init__(
self,
@@ -394,13 +452,10 @@ def update_stored_schema(
return expected_update

@abstractmethod
def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
"""Creates and starts a load job for a particular `table` with content in `file_path`"""
pass

@abstractmethod
def restore_file_load(self, file_path: str) -> LoadJob:
"""Finds and restores already started loading job identified by `file_path` if destination supports it."""
def create_load_job(
self, table: TTableSchema, file_path: str, load_id: str, restore: bool = False
) -> LoadJob:
"""Creates a load job for a particular `table` with content in `file_path`"""
pass

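A hedged sketch of the merged method in a concrete client (MyCopyJob and MyResumedJob are hypothetical). restore=True replaces the old restore_file_load() path: the loader is re-attaching to a job that may already have been started before an interruption:

# inside a hypothetical JobClientBase subclass
def create_load_job(
    self, table: TTableSchema, file_path: str, load_id: str, restore: bool = False
) -> LoadJob:
    if restore:
        # re-attach to a previously started job instead of starting a new one
        return MyResumedJob(file_path)
    return MyCopyJob(file_path)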
def should_truncate_table_before_load(self, table: TTableSchema) -> bool:
@@ -410,7 +465,7 @@ def create_table_chain_completed_followup_jobs(
self,
table_chain: Sequence[TTableSchema],
completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
) -> List[NewLoadJob]:
) -> List[FollowupJob]:
"""Creates a list of followup jobs that should be executed after a table chain is completed"""
return []

5 changes: 5 additions & 0 deletions dlt/common/runtime/signals.py
@@ -32,6 +32,11 @@ def raise_if_signalled() -> None:
raise SignalReceivedException(_received_signal)


def signal_received() -> bool:
"""check if a signal was received"""
return bool(_received_signal)


def sleep(sleep_seconds: float) -> None:
"""A signal-aware version of sleep function. Will raise SignalReceivedException if signal was received during sleep period."""
# do not allow sleeping if signal was received
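signal_received() complements raise_if_signalled(): it lets a scheduling loop drain gracefully instead of aborting. A hedged usage sketch (the job-pool helpers are hypothetical):

import time
from dlt.common.runtime import signals

def drain_loop(pending_jobs: list, start_next_job_if_slot_free) -> None:
    # stop scheduling new jobs once a signal arrives, without raising,
    # so jobs that are already running can finish
    while pending_jobs:
        if signals.signal_received():
            break
        start_next_job_if_slot_free()
        time.sleep(1.0)  # plain sleep; signals.sleep() would raise on a signal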
2 changes: 1 addition & 1 deletion dlt/common/typing.py
@@ -106,7 +106,7 @@
VARIANT_FIELD_FORMAT = "v_%s"
TFileOrPath = Union[str, PathLike, IO[Any]]
TSortOrder = Literal["asc", "desc"]
TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "insert_values", "parquet", "csv"]
TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "insert_values", "parquet", "csv", "reference"]
"""known loader file formats"""


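The new "reference" format ties into the followup-job mechanism above (see commit 07c279a, which replaced the dedicated delta job type with reference jobs). A hedged sketch, assuming a reference job file simply carries a pointer to data stored elsewhere rather than the data itself:

def write_reference_job(path: str, referenced_uri: str) -> None:
    # assumption: the file body is just the referenced location
    with open(path, "w", encoding="utf-8") as f:
        f.write(referenced_uri)

write_reference_job("/tmp/my_table.deadbeef.0.reference", "s3://bucket/data.parquet")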
35 changes: 22 additions & 13 deletions dlt/destinations/impl/athena/athena.py
@@ -45,10 +45,10 @@
)
from dlt.common.schema.utils import table_schema_has_type
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import LoadJob, DoNothingFollowupJob, DoNothingJob
from dlt.common.destination.reference import NewLoadJob, SupportsStagingDestination
from dlt.common.destination.reference import LoadJob
from dlt.common.destination.reference import FollowupJob, SupportsStagingDestination
from dlt.common.data_writers.escape import escape_hive_identifier
from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlMergeJob
from dlt.destinations.sql_jobs import SqlStagingCopyFollowupJob, SqlMergeFollowupJob

from dlt.destinations.typing import DBApi, DBTransaction
from dlt.destinations.exceptions import (
Expand All @@ -65,6 +65,7 @@
)
from dlt.destinations.typing import DBApiCursor
from dlt.destinations.job_client_impl import SqlJobClientWithStaging
from dlt.destinations.job_impl import FinalizedLoadJobWithFollowupJobs, FinalizedLoadJob
from dlt.destinations.impl.athena.configuration import AthenaClientConfiguration
from dlt.destinations.type_mapping import TypeMapper
from dlt.destinations import path_utils
@@ -160,7 +161,7 @@ def __init__(self) -> None:
DLTAthenaFormatter._INSTANCE = self


class AthenaMergeJob(SqlMergeJob):
class AthenaMergeJob(SqlMergeFollowupJob):
@classmethod
def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
# reproducible name so we know which table to drop
@@ -468,40 +469,48 @@ def _get_table_update_sql(
LOCATION '{location}';""")
return sql

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
def create_load_job(
self, table: TTableSchema, file_path: str, load_id: str, restore: bool = False
) -> LoadJob:
"""Starts SqlLoadJob for files ending with .sql or returns None to let derived classes to handle their specific jobs"""
if table_schema_has_type(table, "time"):
raise LoadJobTerminalException(
file_path,
"Athena cannot load TIME columns from parquet tables. Please convert"
" `datetime.time` objects in your data to `str` or `datetime.datetime`.",
)
job = super().start_file_load(table, file_path, load_id)
job = super().create_load_job(table, file_path, load_id, restore)
if not job:
job = (
DoNothingFollowupJob(file_path)
FinalizedLoadJobWithFollowupJobs(file_path)
if self._is_iceberg_table(self.prepare_load_table(table["name"]))
else DoNothingJob(file_path)
else FinalizedLoadJob(file_path)
)
return job

def _create_append_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
def _create_append_followup_jobs(
self, table_chain: Sequence[TTableSchema]
) -> List[FollowupJob]:
if self._is_iceberg_table(self.prepare_load_table(table_chain[0]["name"])):
return [
SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": False})
SqlStagingCopyFollowupJob.from_table_chain(
table_chain, self.sql_client, {"replace": False}
)
]
return super()._create_append_followup_jobs(table_chain)

def _create_replace_followup_jobs(
self, table_chain: Sequence[TTableSchema]
) -> List[NewLoadJob]:
) -> List[FollowupJob]:
if self._is_iceberg_table(self.prepare_load_table(table_chain[0]["name"])):
return [
SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": True})
SqlStagingCopyFollowupJob.from_table_chain(
table_chain, self.sql_client, {"replace": True}
)
]
return super()._create_replace_followup_jobs(table_chain)

def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[FollowupJob]:
return [AthenaMergeJob.from_table_chain(table_chain, self.sql_client)]

def _is_iceberg_table(self, table: TTableSchema) -> bool: