-
Notifications
You must be signed in to change notification settings - Fork 6
DATAUP-530 Refactor to bulk insert #406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e9ac3c2
b3884a6
a8cc2ae
606d292
18b0109
9dc9fdf
6404b86
0b65043
400c1ab
f16943a
ad74923
dd8cfcf
36cbb6f
1d977a6
cac5241
c3e08b3
a7256a4
f55ab10
27ef29a
2a0722f
9bbbd61
d857a37
add6f44
a637bb4
9be3578
0183f98
af03d77
76fcb1a
3d517d9
8e19ccf
2822e77
ac4b6af
f244544
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,18 +3,20 @@ | |
import time | ||
import traceback | ||
from contextlib import contextmanager | ||
from typing import Dict | ||
from datetime import datetime | ||
from typing import Dict, List | ||
|
||
from bson.objectid import ObjectId | ||
from mongoengine import connect, connection | ||
from pymongo import MongoClient | ||
from pymongo import MongoClient, UpdateOne | ||
from pymongo.errors import ServerSelectionTimeoutError | ||
|
||
from lib.execution_engine2.db.models.models import JobLog, Job, Status, TerminatedCode | ||
from lib.execution_engine2.exceptions import ( | ||
from execution_engine2.db.models.models import JobLog, Job, Status, TerminatedCode | ||
from execution_engine2.exceptions import ( | ||
RecordNotFoundException, | ||
InvalidStatusTransitionException, | ||
) | ||
from execution_engine2.sdk.EE2Runjob import JobIdPair | ||
|
||
|
||
class MongoUtil: | ||
|
@@ -216,7 +218,9 @@ def get_job(self, job_id=None, exclude_fields=None) -> Job: | |
|
||
return job | ||
|
||
def get_jobs(self, job_ids=None, exclude_fields=None, sort_id_ascending=None): | ||
def get_jobs( | ||
self, job_ids=None, exclude_fields=None, sort_id_ascending=None | ||
) -> List[Job]: | ||
if not (job_ids and isinstance(job_ids, list)): | ||
raise ValueError("Please provide a non empty list of job ids") | ||
|
||
|
@@ -263,6 +267,68 @@ def check_if_already_finished(job_status): | |
return True | ||
return False | ||
|
||
def update_jobs_to_queued( | ||
self, job_id_pairs: List[JobIdPair], scheduler_type: str = "condor" | ||
) -> None: | ||
f""" | ||
* Adds scheduler id to list of jobs | ||
* Updates a list of {Status.created.value} jobs to queued. Does not work on jobs that already have gone through any other | ||
status transition. If the record is not in the {Status.created.value} status, nothing will happen | ||
:param job_id_pairs: A list of pairs of Job Ids and Scheduler Ids | ||
:param scheduler_type: The scheduler this job was queued in, default condor | ||
""" | ||
|
||
bulk_update_scheduler_jobs = [] | ||
bulk_update_created_to_queued = [] | ||
queue_time_now = datetime.utcnow().timestamp() | ||
for job_id_pair in job_id_pairs: | ||
if job_id_pair.job_id is None: | ||
raise ValueError( | ||
f"Provided a bad job_id_pair, missing job_id for {job_id_pair.scheduler_id}" | ||
) | ||
elif job_id_pair.scheduler_id is None: | ||
raise ValueError( | ||
f"Provided a bad job_id_pair, missing scheduler_id for {job_id_pair.job_id}" | ||
) | ||
|
||
bulk_update_scheduler_jobs.append( | ||
UpdateOne( | ||
{ | ||
"_id": ObjectId(job_id_pair.job_id), | ||
}, | ||
{ | ||
"$set": { | ||
"scheduler_id": job_id_pair.scheduler_id, | ||
"scheduler_type": scheduler_type, | ||
} | ||
}, | ||
) | ||
) | ||
bulk_update_created_to_queued.append( | ||
UpdateOne( | ||
{ | ||
"_id": ObjectId(job_id_pair.job_id), | ||
"status": Status.created.value, | ||
}, | ||
{ | ||
"$set": { | ||
"status": Status.queued.value, | ||
"queued": queue_time_now, | ||
} | ||
}, | ||
) | ||
) | ||
# Update provided jobs with scheduler id. Then only update non terminated jobs into updated status. | ||
mongo_collection = self.config["mongo-jobs-collection"] | ||
|
||
if bulk_update_scheduler_jobs: | ||
with self.pymongo_client(mongo_collection) as pymongo_client: | ||
ee2_jobs_col = pymongo_client[self.mongo_database][mongo_collection] | ||
# Bulk Update to add scheduler ids | ||
ee2_jobs_col.bulk_write(bulk_update_scheduler_jobs, ordered=False) | ||
# Bulk Update to add queued status ids | ||
ee2_jobs_col.bulk_write(bulk_update_created_to_queued, ordered=False) | ||
|
||
def cancel_job(self, job_id=None, terminated_code=None): | ||
""" | ||
#TODO Should we check for a valid state transition here also? | ||
|
@@ -420,6 +486,18 @@ def update_job_status(self, job_id, status, msg=None, error_message=None): | |
def mongo_engine_connection(self): | ||
yield self.me_connection | ||
|
||
def insert_jobs(self, jobs_to_insert: List[Job]) -> List[ObjectId]: | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There aren't any unit tests for this function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, wrote a basic test There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It tests that the method returns job IDs, but not that the jobs are actually correctly inserted to the DB. If this were me writing the tests I'd retrieve the jobs either via the MongoUtil API or directly from mongo and ensure the data was what I expected There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright, added that, but not quite sure if that's what you had in mind There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is, except the problem is that equality on Job objects only tests the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright well how about using
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems reasonable There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bah, realized that the modify command isn't updating the "updated" timestamp There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added this test in a new PR #416 |
||
Insert multiple job records using MongoEngine | ||
:param jobs_to_insert: Multiple jobs to insert at once | ||
:return: List of job ids from the insertion | ||
""" | ||
# TODO Look at pymongo write_concerns that may be useful | ||
# TODO see if pymongo is faster | ||
# TODO: Think about error handling | ||
inserted = Job.objects.insert(doc_or_docs=jobs_to_insert, load_bulk=False) | ||
bio-boris marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return inserted | ||
|
||
def insert_one(self, doc): | ||
""" | ||
insert a doc into collection | ||
|
Uh oh!
There was an error while loading. Please reload this page.