Skip to content

Commit

Permalink
refactor: Perform bulk lookup of job parameters from elasticsearch
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisburr committed May 5, 2024
1 parent 2ed3778 commit 3e88ad6
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 65 deletions.
17 changes: 17 additions & 0 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,23 @@ def getDoc(self, index: str, docID: str) -> dict:
except RequestError as re:
return S_ERROR(re)

@ifConnected
def getDocs(self, indexFunc, docIDs: list[str]) -> dict:
    """Efficiently retrieve many documents with a single multi-get request.

    :param indexFunc: callable invoked once per document ID; it must return the
                      name of the index in which that document is looked up
    :param docIDs: document IDs; each ID must be convertible with ``int()``
                   because the result is keyed by the integer form of ``_id``
    :return: S_OK with a dictionary ``{int(docID): document source}`` holding only
             the documents reported as found, or S_ERROR if the request is rejected
    """
    sLog.debug(f"Retrieving documents {docIDs}")

    # An empty multi-get is rejected by the server as an invalid request;
    # asking for nothing is not an error, so answer it locally.
    if not docIDs:
        return S_OK({})

    docs = [{"_index": indexFunc(docID), "_id": docID} for docID in docIDs]
    try:
        response = self.client.mget({"docs": docs})
    except RequestError as exc:
        return S_ERROR(exc)

    # Documents that do not exist come back with found=False and no _source,
    # so they are left out of the result rather than reported as errors.
    found = {int(doc["_id"]): doc["_source"] for doc in response["docs"] if doc.get("found")}
    return S_OK(found)

@ifConnected
def updateDoc(self, index: str, docID: str, body) -> dict:
"""Update an existing document with a script or partial document
Expand Down
92 changes: 33 additions & 59 deletions src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
- setJobParameter()
- deleteJobParameters()
"""
from collections import defaultdict
from typing import Union

from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals
from DIRAC.Core.Base.ElasticDB import ElasticDB
Expand Down Expand Up @@ -82,7 +85,7 @@ def _createIndex(self, indexName: str) -> None:
raise RuntimeError(result["Message"])
self.log.always("Index created:", indexName)

def getJobParameters(self, jobID: int, paramList=None) -> dict:
def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dict:
"""Get Job Parameters defined for jobID.
Returns a dictionary with the Job Parameters.
If paramList is empty - all the parameters are returned.
Expand All @@ -92,51 +95,30 @@ def getJobParameters(self, jobID: int, paramList=None) -> dict:
:param paramList: list of parameters to be returned (also a string is treated)
:return: dict with all Job Parameter values
"""
if isinstance(jobIDs, int):
jobIDs = [jobIDs]
if isinstance(paramList, str):
paramList = paramList.replace(" ", "").split(",")
self.log.debug(f"JobDB.getParameters: Getting Parameters for job {jobID}")
resultDict = {}
inNewIndex = self.existsDoc(self._indexName(jobID), jobID)
inOldIndex = self._isInOldIndex(self.oldIndexName, jobID)
# Case 1: the parameters are stored in both indices
if inNewIndex and inOldIndex:
# First we get the parameters from the old index
self.log.debug(
f"A document with JobID {jobID} was found in the old index {self.oldIndexName} and in the new index {self._indexName(jobID)}"
)
resultDict = self._searchInOldIndex(jobID, paramList)
self.log.debug(f"JobDB.getParameters: Getting Parameters for jobs {jobIDs}")

# Now we get the parameters from the new index
res = self.getDoc(self._indexName(jobID), str(jobID))
if not res["OK"]:
self.log.error("Could not retrieve the data from the new index!", res["Message"])
else:
for key in res["Value"]:
if paramList and key not in paramList:
continue
# Add new parameters or overwrite the old ones
resultDict[key] = res["Value"][key]
# First search the old index
result = self._searchInOldIndex(jobIDs, paramList)

# Case 2: only in the old index
elif inOldIndex:
self.log.debug(f"A document with JobID {jobID} was found in the old index {self.oldIndexName}")
resultDict = self._searchInOldIndex(jobID, paramList)
# Next search the new index
res = self.getDocs(self._indexName, jobIDs)
if not res["OK"]:
return res

# Case 3: only in the new index
else:
self.log.debug(
f"The searched parameters with JobID {jobID} exists in the new index {self._indexName(jobID)}"
)
res = self.getDoc(self._indexName(jobID), str(jobID))
if not res["OK"]:
return res
resultDict = res["Value"]
# Update result, preferring parameters from the new index
for job_id, doc in res["Value"].items():
if job_id not in result:
result[job_id] = {}
if paramList:
for k in list(resultDict):
if k not in paramList:
resultDict.pop(k)
result[job_id] |= {k: v for k, v in doc.items() if k in paramList}
else:
result[job_id] |= doc

return S_OK({jobID: resultDict})
return S_OK(result)

def setJobParameter(self, jobID: int, key: str, value: str) -> dict:
"""
Expand Down Expand Up @@ -247,36 +229,28 @@ def _isInOldIndex(self, old_index: str, jobID: int) -> bool:
except (RequestError, NotFoundError):
return False

def _searchInOldIndex(self, jobID: int, paramList: list) -> bool:
def _searchInOldIndex(self, jobIDs: list[int], paramList: list):
"""Searches for a document with this jobID in the old index"""
if paramList:
if isinstance(paramList, str):
paramList = paramList.replace(" ", "").split(",")
else:
paramList = []

resultDict = {}

# the following should be equivalent to
# {
# "query": {
# "bool": {
# "filter": { # no scoring
# "term": {"JobID": jobID} # term level query, does not pass through the analyzer
# }
# }
# }
# }

s = self.dslSearch.query("bool", filter=self._Q("term", JobID=jobID))
s = self.dslSearch.query("bool", filter=self._Q("terms", JobID=jobIDs))

res = s.scan()

for hit in res:
pname = hit.Name
if paramList and pname not in paramList:
continue
resultDict[pname] = hit.Value
resultDict = {}
try:
for hit in res:
jobID = int(hit.JobID)
pname = hit.Name
if paramList and pname not in paramList:
continue
resultDict.setdefault(jobID, {})[pname] = hit.Value
except NotFoundError:
pass
return resultDict

def _deleteInOldIndex(self, jobID: int, paramList: list) -> dict:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,12 +483,10 @@ def export_getJobParameters(cls, jobIDs, parName=None):
if cls.elasticJobParametersDB:
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
parameters = {}
for jobID in jobIDs:
res = cls.elasticJobParametersDB.getJobParameters(jobID, parName)
if not res["OK"]:
return res
parameters.update(res["Value"])
res = cls.elasticJobParametersDB.getJobParameters(jobIDs, parName)
if not res["OK"]:
return res
parameters = res["Value"]

# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
res = cls.jobDB.getJobParameters(jobIDs, parName)
Expand Down

0 comments on commit 3e88ad6

Please sign in to comment.