
Add support for continuously starting load jobs as slots free up in the loader #1494

Merged · sh-rp merged 104 commits into devel from feat/continuous-load-jobs on Aug 4, 2024

Conversation

@sh-rp (Collaborator) commented Jun 19, 2024

Description

In the current implementation we more or less always start n (=max workers) load jobs, let them complete and then rerun the whole loader to schedule the next n jobs. In this PR we submit new load jobs as slots free up.
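A minimal sketch of that scheduling idea, using a plain ThreadPoolExecutor; all helper names (get_new_job_files, start_job, MAX_WORKERS) are assumptions for illustration, not the actual loader code:

# Sketch only: top the worker pool up whenever a slot frees instead of
# submitting a fixed batch of n jobs and waiting for all of them to finish.
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict, List

MAX_WORKERS = 20  # corresponds to "n (=max workers)" above


def run_until_done(
    get_new_job_files: Callable[[], List[str]],
    start_job: Callable[[str], None],
) -> None:
    running: Dict[str, Future] = {}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        while True:
            # reap finished jobs so their slots become available again
            for file_name, future in list(running.items()):
                if future.done():
                    future.result()  # re-raises any exception from the worker thread
                    del running[file_name]
            # schedule new jobs into the freed slots
            new_files = get_new_job_files()
            for file_name in new_files[: MAX_WORKERS - len(running)]:
                running[file_name] = pool.submit(start_job, file_name)
            if not running and not new_files:
                break  # nothing running and nothing new: the package is done
            time.sleep(0.1)  # idle wait between scheduling passes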

What is happening here:

  • The loader now periodically checks all jobs to see whether they are done and schedules new ones as needed.
  • Runnable jobs now manage their own internal state (file moving still needs to be done by the loader on the main thread) and have a dedicated "run" method, which is the method executed on a worker thread.
  • Renaming of the job base classes to make clearer what is going on (see the sketch after this list):
    • RunnableLoadJob: Jobs that actually do something and should be executed on a thread
    • FollowupJob: Class that creates a new job persisted to disk
    • HasFollowupJobs (ex-FollowupJob): Trait that tells the loader to look for followup jobs
    • FinalizedJob: Not runnable because it already has an actionable state (completed, failed, retry). Used for indicating failed restored jobs, completed restored jobs and cases where nothing needs to be done
  • FollowupJobs always go to the "new_jobs" folder and need to be picked up by the loader like any other jobs; previously some were executed directly on the main thread, which was not good. :) We now assume that creating FollowupJobs does not raise exceptions; I don't think handling that case is needed, and it makes the code simpler.
  • (Hopefully) simplification of the load class
  • Restoring jobs is now handled the same way as creating jobs, which simplifies the code in a few places. Jobs that are found in the running folder are now simply started again; the BigQuery job can figure out on its own whether it needs to be resumed.
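A rough sketch of how the renamed classes relate; only the class names and set_run_vars come from this PR, the other methods and state values are assumptions for illustration:

# Rough sketch of the renamed job classes; class names match the list above,
# everything else (method names, state values) is assumed for illustration.
from abc import ABC, abstractmethod
from typing import List, Literal

TJobState = Literal["ready", "running", "completed", "failed", "retry"]


class LoadJob(ABC):
    """Common base: every job exposes an actionable state to the loader."""

    @abstractmethod
    def state(self) -> TJobState: ...


class RunnableLoadJob(LoadJob):
    """Does actual work; run() is executed on a worker thread and the job
    manages its own internal state (file moves stay on the main thread)."""

    def set_run_vars(self, load_id: str, schema, load_table) -> None:
        self._load_id, self._schema, self._load_table = load_id, schema, load_table

    @abstractmethod
    def run(self) -> None: ...


class FollowupJob:
    """Creates a new job file persisted to disk (ends up in 'new_jobs')."""

    def new_file_path(self) -> str:
        raise NotImplementedError


class HasFollowupJobs:
    """Trait: tells the loader to ask for followup jobs when this job finishes."""

    def create_followup_jobs(self, final_state: TJobState) -> List[FollowupJob]:
        return []


class FinalizedJob(LoadJob):
    """Not runnable: already in an actionable state (completed, failed, retry)."""

    def __init__(self, state: TJobState) -> None:
        self._state = state

    def state(self) -> TJobState:
        return self._state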

Possible FollowupWork:

  • Performance tuning in the loader; a lot of time is spent on parsing filenames (I think).

netlify bot commented Jun 19, 2024

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: 2c38f13
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/66acccc36859290008ab7658

dlt/load/load.py (outdated):
if (
    len(self.load_storage.list_new_jobs(load_id)) == 0
    and len(self.load_storage.normalized_packages.list_started_jobs(load_id)) == 0
):
sh-rp (Collaborator, Author):
is this the correct "package completion" condition? I think so, but am not 100% sure.

Collaborator:

this was the previous condition:

        if file_count == 0:
            logger.info(f"No new jobs found in {load_id}")
            return 0, []

so it was checking just the new jobs. I do not fully get why you check it again here; the loop above should exit only when all jobs are completed, no?

@@ -96,15 +96,15 @@ def test_unsupported_write_disposition() -> None:
load.load_storage.normalized_packages.save_schema(load_id, schema)
sh-rp (Collaborator, Author):

I needed to change a bunch of tests, since we no longer rely on multiple executions of the run method. All the changes make sense; it might be good to add a few more test cases specific to the new implementation.

@sh-rp sh-rp marked this pull request as ready for review June 19, 2024 14:57
dlt/load/load.py (outdated):
remaining_jobs: List[LoadJob] = []
# if an exception condition was met, return it to the main runner
pending_exception: Exception = None
sh-rp (Collaborator, Author):

I need to collect the exception to raise in the main loop here now; we could alternatively collect all the problems we find and report them, not just raise on the first exception.
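A hedged sketch of the pattern described here; the job interface used (state(), file_name(), exception()) is a placeholder, not dlt's actual API:

# Sketch: remember the first problem as pending_exception while checking
# running jobs, and let the main loop raise it.
from typing import List, Optional, Tuple


def complete_jobs(running_jobs: List["LoadJob"]) -> Tuple[List["LoadJob"], Optional[Exception]]:
    remaining_jobs: List["LoadJob"] = []
    # if an exception condition was met, return it to the main runner
    pending_exception: Optional[Exception] = None
    for job in running_jobs:
        state = job.state()
        if state == "failed" and pending_exception is None:
            # alternatively, collect *all* problems here and report them together
            pending_exception = Exception(f"job {job.file_name()} failed: {job.exception()}")
        elif state != "completed":
            remaining_jobs.append(job)
    return remaining_jobs, pending_exception


# in the main loop (sketch):
# remaining_jobs, pending_exception = complete_jobs(running_jobs)
# if pending_exception is not None:
#     raise pending_exception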

@sh-rp sh-rp added the enhancement New feature or request label Jun 20, 2024
@rudolfix rudolfix added the sprint Marks group of tasks with core team focus at this moment label Jun 26, 2024
# Conflicts:
#	dlt/load/load.py
#	tests/load/test_dummy_client.py
@sh-rp sh-rp force-pushed the feat/continuous-load-jobs branch from c265ecc to b4d05c8 Compare June 27, 2024 13:31
@sh-rp sh-rp force-pushed the feat/continuous-load-jobs branch from 0a9b5c3 to da8c9e6 Compare July 2, 2024 10:45
@rudolfix rudolfix removed the sprint Marks group of tasks with core team focus at this moment label Jul 3, 2024
@sh-rp (Collaborator, Author) commented Jul 18, 2024

@rudolfix this can go into another review. Two open questions from my side:

  1. Should we change the behavior of is_package_partially_loaded (see above)?
  2. What exactly is the desired behavior, from a conceptual standpoint, when creating a followup job fails? In the old version, loading just continues and those jobs are marked as failed. I don't think this makes sense, because the load will be useless if, for example, a merge job can't be created and executed. So either we decide:
  • this is a transient problem (e.g. the user can manually fix the schema and restart the load); in that case we just raise an exception on the main thread, so that the job that triggered the scheduling of a followup job remains in "started_jobs" and will be rerun on the next pipeline execution, including the scheduling of the followup job (it is implemented and tested like this now)
  • or this is a terminal problem, in which case we should also stop the load but mark the load package as failed

# Conflicts:
#	dlt/destinations/impl/filesystem/filesystem.py
@sh-rp sh-rp force-pushed the feat/continuous-load-jobs branch from 835a49d to bd252f0 Compare July 18, 2024 16:31
@sh-rp sh-rp force-pushed the feat/continuous-load-jobs branch from bd252f0 to 1c73de1 Compare July 18, 2024 16:33
@rudolfix (Collaborator) left a comment

> What exactly is the desired behavior, from a conceptual standpoint, when creating a followup job fails? In the old version, loading just continues and those jobs are marked as failed. I don't think this makes sense, because the load will be useless if, for example, a merge job can't be created and executed. So either we decide:
> • this is a transient problem (e.g. the user can manually fix the schema and restart the load); in that case we just raise an exception on the main thread, so that the job that triggered the scheduling of a followup job remains in "started_jobs" and will be rerun on the next pipeline execution, including the scheduling of the followup job (it is implemented and tested like this now)
> • or this is a terminal problem, in which case we should also stop the load but mark the load package as failed

my take:
make it a transient error

and we need to decide how we deal with failed packages. right now we continue the load and do not raise an exception at the end. IMO we should change that: we should continue the load but raise an exception at the end automatically.

I'll write a ticket for that - it is a breaking change
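A hedged sketch of the transient-error option described above; the exception class, load_storage.add_new_job and the job methods are placeholders, not the actual dlt code:

# Sketch of the "transient problem" behavior: a failure while creating followup
# jobs is re-raised on the main thread, so the triggering job stays in
# "started_jobs" and the whole step is retried on the next pipeline run.
class FollowupJobCreationFailedException(Exception):
    def __init__(self, job_id: str) -> None:
        self.job_id = job_id
        super().__init__(f"could not create followup jobs for job {job_id}")


def schedule_followup_jobs(load_storage, load_id: str, job) -> None:
    try:
        for followup in job.create_followup_jobs("completed"):
            # persist to the "new_jobs" folder; the loader picks it up like any other job
            load_storage.add_new_job(load_id, followup.new_file_path())
    except Exception as ex:
        # transient: do not mark anything as failed, just surface the error;
        # the originating job is not moved out of "started_jobs"
        raise FollowupJobCreationFailedException(job.job_id()) from ex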

dlt/load/load.py (outdated):
) and job_client.should_load_data_to_staging_dataset(load_table)

# set job vars
job.set_run_vars(load_id=load_id, schema=schema, load_table=load_table)
Collaborator:

We need to change tagging stuff:

  1. we tag the session in:
def create_load_job(
        self, table: TTableSchema, file_path: str, load_id: str, restore: bool = False
    ) -> LoadJob:
        """Starts SqlLoadJob for files ending with .sql or returns None to let derived classes to handle their specific jobs"""
        self._set_query_tags_for_job(load_id, table)

which is called from the main thread and is not supposed to open any connection. IDK how it works now :) but even if it does, it will tag a session on the main thread; then we immediately close the connection and reopen it on the worker thread, and in that case tagging does not happen.

  2. Since you passed all required params in set_vars, we do not need to take any parameters in here:
def _set_query_tags_for_job(self, load_id: str, table: TTableSchema) -> None:
  3. this function should be called in the run method of all jobs that have a sql_client (created by the sql_job_client), so we need to move it from the client to the job... which is a good move
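A hedged sketch of points 2 and 3; the job class and the sql_client methods used here are placeholders, not the actual dlt implementation. The tagging call takes no parameters anymore because load_id and the table arrive via set_run_vars, and it runs inside run() on the worker thread that actually owns the connection:

# Sketch: tag the session inside run(), on the worker thread that opens the
# connection, instead of in create_load_job on the main thread.
class SqlRunnableLoadJob:
    def __init__(self, sql_client, file_path: str) -> None:
        self._sql_client = sql_client
        self._file_path = file_path
        self._load_id = self._schema = self._load_table = None

    def set_run_vars(self, load_id, schema, load_table) -> None:
        self._load_id, self._schema, self._load_table = load_id, schema, load_table

    def _set_query_tags_for_job(self) -> None:
        # no parameters needed anymore: load_id and load_table came in via set_run_vars
        self._sql_client.set_query_tags({"load_id": self._load_id, "table": self._load_table["name"]})

    def run(self) -> None:
        # the connection is opened here, on the worker thread, so the session tag sticks
        with self._sql_client.open_connection():
            self._set_query_tags_for_job()
            with open(self._file_path, encoding="utf-8") as f:
                self._sql_client.execute_sql(f.read())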

dlt/load/load.py (outdated):
# this will raise on signal
sleep(1)
sleep(
Collaborator:

see the above. are we still reading any job listings when looping on idle?

venv = Venv.restore_current()
with pytest.raises(CalledProcessError) as cpe:
    print(venv.run_script("chess_pipeline.py"))

# move job into running folder manually
Collaborator:

I think we need to change how is_package_partially_loaded works. partially loaded -> has jobs that are not completed and has jobs that are completed.

the current implementation assumes that failed jobs, started jobs and retried jobs modify the destination. this is IMO wrong (we assume that jobs are atomic - most of them are). Now I think that was wrong from the start.
WDYT?
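A hedged sketch of the proposed semantics; the shape of package_info.jobs (a dict of job-state name to job list) is an assumption, not necessarily the actual LoadPackageInfo layout:

# Sketch: a package counts as partially loaded only when it has both completed
# jobs and jobs in any other state.
def is_package_partially_loaded(package_info) -> bool:
    jobs_by_state = package_info.jobs  # e.g. {"completed_jobs": [...], "failed_jobs": [...], ...}
    completed = len(jobs_by_state.get("completed_jobs", []))
    not_completed = sum(
        len(jobs) for state, jobs in jobs_by_state.items() if state != "completed_jobs"
    )
    # partially loaded only if some jobs modified the destination while others did not
    return completed > 0 and not_completed > 0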

)

# job will be automatically found and resumed
Collaborator:

OK! I probably overlooked the lines below!

# sanity check
assert duration > 5

# we want 1000 empty processed jobs to need less than 15 seconds total (locally it runs in 10)
Collaborator:

OK. but this idle loop where we sleep 0.1 seconds worries me. do we have 100% CPU usage when this test runs? maybe it is a little bit faster, but we saturate the CPU (while having some threads working). pls take a look, with 50k or 100k jobs maybe.

we must avoid starving threads by an idle loop that reads 50k files over and over
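One possible way to address this concern (an illustration only, not what the PR implements): cache the new-jobs listing and refresh it only when a slot actually frees up, so the 0.1 second idle loop does not re-list tens of thousands of files on every pass.

# Illustration: cache the expensive directory listing between scheduling passes.
from typing import Callable, List, Optional


class CachedNewJobsListing:
    def __init__(self, list_new_jobs: Callable[[], List[str]]) -> None:
        self._list_new_jobs = list_new_jobs
        self._cache: Optional[List[str]] = None

    def get(self) -> List[str]:
        if self._cache is None:
            self._cache = self._list_new_jobs()  # expensive: lists the package folder
        return self._cache

    def invalidate(self) -> None:
        # call only when a job completes and a slot frees up (or new jobs are added)
        self._cache = None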

with pytest.raises(PipelineStepFailed):
    pipeline.run(airtable_emojis())
# move job into running folder manually
Collaborator:

    @property
    def has_pending_data(self) -> bool:
        """Tells if the pipeline contains any extracted files or pending load packages"""
        return (
            len(self.list_normalized_load_packages()) > 0
            or len(self.list_extracted_load_packages()) > 0
        )

it does not even look into the package content. any not-completed package is pending and will be executed before a new package is created. this check is the main reason this function exists

I answered the is_package_partially_loaded question above

@sh-rp (Collaborator, Author) commented Jul 30, 2024

> my take: make it a transient error
>
> and we need to decide how we deal with failed packages. right now we continue the load and do not raise an exception at the end. IMO we should change that: we should continue the load but raise an exception at the end automatically.
>
> I'll write a ticket for that - it is a breaking change

Ok, I changed it slightly and added new exceptions to indicate what went wrong, plus tests. Regarding the load package with failing jobs: I totally agree that it should raise at the end of the load if there were failed jobs. Right now these errors are pretty much hidden.

@sh-rp sh-rp force-pushed the feat/continuous-load-jobs branch from d1b2144 to ce3e1c9 Compare July 30, 2024 14:50
@@ -723,19 +723,12 @@ def build_job_file_name(

@staticmethod
def is_package_partially_loaded(package_info: LoadPackageInfo) -> bool:
sh-rp (Collaborator, Author):

the behavior is now unified between the different package states; I'd say this is correct.

@rudolfix (Collaborator) left a comment

LGTM!

@sh-rp sh-rp merged commit 3bb677f into devel Aug 4, 2024
54 checks passed
@sh-rp sh-rp deleted the feat/continuous-load-jobs branch August 4, 2024 22:08
Labels: ci full (run the full load tests on pr), enhancement (New feature or request)
Projects: Status: Done
2 participants