Skip to content

DATAUP-530 Refactor to bulk insert #406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into the base branch on
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 83 additions & 5 deletions lib/execution_engine2/db/MongoUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
import time
import traceback
from contextlib import contextmanager
from typing import Dict
from datetime import datetime
from typing import Dict, List

from bson.objectid import ObjectId
from mongoengine import connect, connection
from pymongo import MongoClient
from pymongo import MongoClient, UpdateOne
from pymongo.errors import ServerSelectionTimeoutError

from lib.execution_engine2.db.models.models import JobLog, Job, Status, TerminatedCode
from lib.execution_engine2.exceptions import (
from execution_engine2.db.models.models import JobLog, Job, Status, TerminatedCode
from execution_engine2.exceptions import (
RecordNotFoundException,
InvalidStatusTransitionException,
)
from execution_engine2.sdk.EE2Runjob import JobIdPair


class MongoUtil:
Expand Down Expand Up @@ -216,7 +218,9 @@ def get_job(self, job_id=None, exclude_fields=None) -> Job:

return job

def get_jobs(self, job_ids=None, exclude_fields=None, sort_id_ascending=None):
def get_jobs(
self, job_ids=None, exclude_fields=None, sort_id_ascending=None
) -> List[Job]:
if not (job_ids and isinstance(job_ids, list)):
raise ValueError("Please provide a non empty list of job ids")

Expand Down Expand Up @@ -263,6 +267,68 @@ def check_if_already_finished(job_status):
return True
return False

def update_jobs_to_queued(
self, job_id_pairs: List[JobIdPair], scheduler_type: str = "condor"
) -> None:
f"""
* Adds scheduler id to list of jobs
* Updates a list of {Status.created.value} jobs to queued. Does not work on jobs that already have gone through any other
status transition. If the record is not in the {Status.created.value} status, nothing will happen
:param job_id_pairs: A list of pairs of Job Ids and Scheduler Ids
:param scheduler_type: The scheduler this job was queued in, default condor
"""

bulk_update_scheduler_jobs = []
bulk_update_created_to_queued = []
queue_time_now = datetime.utcnow().timestamp()
for job_id_pair in job_id_pairs:
if job_id_pair.job_id is None:
raise ValueError(
f"Provided a bad job_id_pair, missing job_id for {job_id_pair.scheduler_id}"
)
elif job_id_pair.scheduler_id is None:
raise ValueError(
f"Provided a bad job_id_pair, missing scheduler_id for {job_id_pair.job_id}"
)

bulk_update_scheduler_jobs.append(
UpdateOne(
{
"_id": ObjectId(job_id_pair.job_id),
},
{
"$set": {
"scheduler_id": job_id_pair.scheduler_id,
"scheduler_type": scheduler_type,
}
},
)
)
bulk_update_created_to_queued.append(
UpdateOne(
{
"_id": ObjectId(job_id_pair.job_id),
"status": Status.created.value,
},
{
"$set": {
"status": Status.queued.value,
"queued": queue_time_now,
}
},
)
)
# Update provided jobs with scheduler id. Then only update non terminated jobs into updated status.
mongo_collection = self.config["mongo-jobs-collection"]

if bulk_update_scheduler_jobs:
with self.pymongo_client(mongo_collection) as pymongo_client:
ee2_jobs_col = pymongo_client[self.mongo_database][mongo_collection]
# Bulk Update to add scheduler ids
ee2_jobs_col.bulk_write(bulk_update_scheduler_jobs, ordered=False)
# Bulk Update to add queued status ids
ee2_jobs_col.bulk_write(bulk_update_created_to_queued, ordered=False)

def cancel_job(self, job_id=None, terminated_code=None):
"""
#TODO Should we check for a valid state transition here also?
Expand Down Expand Up @@ -420,6 +486,18 @@ def update_job_status(self, job_id, status, msg=None, error_message=None):
def mongo_engine_connection(self):
yield self.me_connection

def insert_jobs(self, jobs_to_insert: List[Job]) -> List[ObjectId]:
    """
    Insert multiple job records in one round trip using MongoEngine.

    :param jobs_to_insert: Multiple jobs to insert at once
    :return: List of job ids from the insertion
    """
    # TODO Look at pymongo write_concerns that may be useful
    # TODO see if pymongo is faster
    # TODO: Think about error handling
    # load_bulk=False returns the inserted ObjectIds rather than re-fetching
    # the full documents from the database.
    inserted = Job.objects.insert(doc_or_docs=jobs_to_insert, load_bulk=False)
    return inserted

def insert_one(self, doc):
"""
insert a doc into collection
Expand Down
5 changes: 2 additions & 3 deletions lib/execution_engine2/sdk/EE2Logs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from enum import Enum
from typing import Dict, NamedTuple

from lib.execution_engine2.db.models.models import JobLog as JLModel, LogLines
from lib.execution_engine2.exceptions import RecordNotFoundException
from execution_engine2.db.models.models import JobLog as JLModel, LogLines
from execution_engine2.exceptions import RecordNotFoundException


# if TYPE_CHECKING:
Expand Down Expand Up @@ -104,7 +104,6 @@ def add_job_logs(self, job_id, log_lines, as_admin=False) -> AddLogResult:
self.sdkmr.get_job_with_permission(
job_id, JobPermissions.WRITE, as_admin=as_admin
)
self.sdkmr.logger.debug(f"About to add logs for {job_id}")
try:
try:
job_log = self.sdkmr.mongo_util.get_job_log_pymongo(job_id)
Expand Down
Loading