diff --git a/src/DIRAC/TransformationSystem/Client/TransformationClient.py b/src/DIRAC/TransformationSystem/Client/TransformationClient.py
index 9c82d2683d3..3710e25df53 100644
--- a/src/DIRAC/TransformationSystem/Client/TransformationClient.py
+++ b/src/DIRAC/TransformationSystem/Client/TransformationClient.py
@@ -163,6 +163,7 @@ def getTransformationFiles(
         timeout=1800,
         offset=0,
         maxfiles=None,
+        columns=None,
     ):
         """gets all the transformation files for a transformation, incrementally.
         "limit" here is just used to determine the offset.
@@ -173,34 +174,37 @@ def getTransformationFiles(
             condDict = {}
         if timeStamp is None:
             timeStamp = "LastUpdate"
-        # getting transformationFiles - incrementally
-        if "LFN" in condDict:
-            if isinstance(condDict["LFN"], str):
-                lfnList = [condDict["LFN"]]
-            else:
-                lfnList = sorted(condDict["LFN"])
-            # If a list of LFNs is given, use chunks of 1000 only
-            limit = limit if limit else 1000
+
+        if "LFN" not in condDict:
+            res = rpcClient.getTransformationFiles(
+                condDict, older, newer, timeStamp, orderAttribute, maxfiles, offset, columns
+            )
+            if not res["OK"]:
+                return res
+            return S_OK(res["Value"])
+
+        # If LFNs are requested, continue with the old behavior of requesting in small batches
+        # Probably not needed?
+        if isinstance(condDict["LFN"], str):
+            lfnList = [condDict["LFN"]]
         else:
-            # By default get by chunks of 10000 files
-            lfnList = []
-            limit = limit if limit else 10000
+            lfnList = sorted(condDict["LFN"])
+        # If a list of LFNs is given, default to chunks of 1000 only
+        limit = limit if limit else 1000
+
         transID = condDict.get("TransformationID", "Unknown")
         offsetToApply = offset
         retries = 5
         while True:
-            if lfnList:
-                # If list is exhausted, exit
-                if offsetToApply >= len(lfnList):
-                    break
-                # Apply the offset to the list of LFNs
-                condDict["LFN"] = lfnList[offsetToApply : offsetToApply + limit]
-                # No limit and no offset as the list is limited already
-                res = rpcClient.getTransformationFiles(condDict, older, newer, timeStamp, orderAttribute, None, None)
-            else:
-                res = rpcClient.getTransformationFiles(
-                    condDict, older, newer, timeStamp, orderAttribute, limit, offsetToApply
-                )
+            # If list is exhausted, exit
+            if offsetToApply >= len(lfnList):
+                break
+            # Apply the offset to the list of LFNs
+            condDict["LFN"] = lfnList[offsetToApply : offsetToApply + limit]
+            # No limit and no offset as the list is limited already
+            res = rpcClient.getTransformationFiles(
+                condDict, older, newer, timeStamp, orderAttribute, None, None, columns
+            )
             if not res["OK"]:
                 gLogger.error(
                     "Error getting files for transformation %s (offset %d), %s"
@@ -211,36 +215,19 @@ def getTransformationFiles(
                 if retries:
                     continue
                 return res
-            else:
-                condDictStr = str(condDict)
-                log = gLogger.debug if len(condDictStr) > 100 else gLogger.verbose
-                if not log(
-                    "For conditions %s: result for limit %d, offset %d: %d files"
-                    % (condDictStr, limit, offsetToApply, len(res["Value"]))
-                ):
-                    gLogger.verbose(
-                        "For condition keys %s (trans %s): result for limit %d, offset %d: %d files"
-                        % (
-                            str(sorted(condDict)),
-                            condDict.get("TransformationID", "None"),
-                            limit,
-                            offsetToApply,
-                            len(res["Value"]),
-                        )
-                    )
-            if res["Value"]:
-                transformationFiles += res["Value"]
-                # Limit the number of files returned
-                if maxfiles and len(transformationFiles) >= maxfiles:
-                    transformationFiles = transformationFiles[:maxfiles]
-                    break
-            # Less data than requested, exit only if LFNs were not given
-            if not lfnList and len(res["Value"]) < limit:
+            gLogger.verbose(f"Result for limit {limit}, offset {offsetToApply}: {len(res['Value'])} files")
+            if res["Value"]:
+                transformationFiles += res["Value"]
+                # Limit the number of files returned
+                if maxfiles and len(transformationFiles) >= maxfiles:
+                    transformationFiles = transformationFiles[:maxfiles]
                     break
-            offsetToApply += limit
-            # Reset number of retries for next chunk
-            retries = 5
-
+            # Less data than requested, exit only if LFNs were not given
+            if not lfnList and len(res["Value"]) < limit:
+                break
+            offsetToApply += limit
+            # Reset number of retries for next chunk
+            retries = 5
         return S_OK(transformationFiles)

     def getTransformationTasks(
diff --git a/src/DIRAC/TransformationSystem/DB/TransformationDB.py b/src/DIRAC/TransformationSystem/DB/TransformationDB.py
index 174188b5b28..3763f4f2217 100755
--- a/src/DIRAC/TransformationSystem/DB/TransformationDB.py
+++ b/src/DIRAC/TransformationSystem/DB/TransformationDB.py
@@ -602,60 +602,47 @@ def getTransformationFiles(
         limit=None,
         offset=None,
         connection=False,
+        columns=None,
     ):
         """Get files for the supplied transformations with support for the web standard structure"""
         connection = self.__getConnection(connection)
-        req = f"SELECT {intListToString(self.TRANSFILEPARAMS)} FROM TransformationFiles"
-        originalFileIDs = {}
-        if condDict is None:
-            condDict = {}
-        if condDict or older or newer:
-            lfns = condDict.pop("LFN", None)
-            if lfns:
-                if isinstance(lfns, str):
-                    lfns = [lfns]
-                res = self.__getFileIDsForLfns(lfns, connection=connection)
-                if not res["OK"]:
-                    return res
-                originalFileIDs = res["Value"][0]
-                condDict["FileID"] = list(originalFileIDs)
-            for val in condDict.values():
-                if not val:
-                    return S_OK([])
+        all_columns = ["LFN"] + self.TRANSFILEPARAMS
+        if columns is None:
+            columns = all_columns
+        elif not set(columns).issubset(all_columns):
+            return S_ERROR(f"Invalid columns requested, valid columns are: {all_columns}")
+
+        req = ", ".join(f"d.{x}" if x == "LFN" else f"t.{x}" for x in columns)
+        req = f"SELECT {req} FROM TransformationFiles t"
+        if "LFN" in columns or (condDict and "LFN" in condDict):
+            req = f"{req} JOIN DataFiles d ON t.FileID = d.FileID"
+
+        fixedCondDict = {}
+        if condDict:
+            for key, value in condDict.items():
+                if key in self.TRANSFILEPARAMS:
+                    fixedCondDict[f"t.{key}"] = value
+                elif key in ["LFN"]:
+                    fixedCondDict[f"d.{key}"] = value
+                else:
+                    return S_ERROR(f"Invalid key {key} in condDict")
+        if timeStamp:
+            timeStamp = f"t.{timeStamp}"
+        if fixedCondDict or older or newer:
+            cond = self.buildCondition(fixedCondDict, older, newer, timeStamp, orderAttribute, limit, offset=offset)
+            req = f"{req} {cond}"
-        req = "{} {}".format(
-            req,
-            self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset),
-        )
         res = self._query(req, conn=connection)
         if not res["OK"]:
             return res
-        transFiles = res["Value"]
-        fileIDs = [int(row[1]) for row in transFiles]
-        webList = []
-        resultList = []
-        if not fileIDs:
-            originalFileIDs = {}
-        else:
-            if not originalFileIDs:
-                res = self.__getLfnsForFileIDs(fileIDs, connection=connection)
-                if not res["OK"]:
-                    return res
-                originalFileIDs = res["Value"][1]
-            for row in transFiles:
-                lfn = originalFileIDs[row[1]]
-                # Prepare the structure for the web
-                fDict = {"LFN": lfn}
-                fDict.update(dict(zip(self.TRANSFILEPARAMS, row)))
-                # Note: the line below is returning "None" if the item is None... This seems to work but is ugly...
-                rList = [lfn] + [str(item) if not isinstance(item, int) else item for item in row]
-                webList.append(rList)
-                resultList.append(fDict)
+        resultList = [dict(zip(columns, row)) for row in res["Value"]]
+        webList = [[str(item) if not isinstance(item, int) else item for item in row] for row in res["Value"]]
+
         result = S_OK(resultList)
         result["Records"] = webList
-        result["ParameterNames"] = ["LFN"] + self.TRANSFILEPARAMS
+        result["ParameterNames"] = columns
         return result

     def getFileSummary(self, lfns, connection=False):
diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py
index 145a2872f5d..a9449b1bba4 100644
--- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py
+++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py
@@ -288,6 +288,7 @@ def export_getTransformationFiles(
         orderAttribute=None,
         limit=None,
         offset=None,
+        columns=None,
     ):
         if not condDict:
             condDict = {}
@@ -300,6 +301,7 @@ def export_getTransformationFiles(
             limit=limit,
             offset=offset,
             connection=False,
+            columns=columns,
         )

    ####################################################################
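For context, a minimal usage sketch of the new `columns` argument as seen from `TransformationClient`. This is not part of the patch: the transformation ID, status value, and column selection below are made-up examples, and an initialized DIRAC client environment (configuration and proxy) is assumed.

```python
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient

tc = TransformationClient()

# Hypothetical query: fetch only the columns actually needed instead of the
# full row for every file; transformation 123 and the "Unused" status are
# illustrative values, not taken from the patch.
res = tc.getTransformationFiles(
    condDict={"TransformationID": 123, "Status": "Unused"},
    columns=["LFN", "Status", "TaskID"],
)
if not res["OK"]:
    print("Failed:", res["Message"])
else:
    for fileDict in res["Value"]:
        # Each entry is a dict keyed by the requested column names
        print(fileDict["LFN"], fileDict["Status"], fileDict["TaskID"])
```

The point of restricting `columns` is that the DB layer now builds a single SELECT with a JOIN on DataFiles instead of resolving LFNs per FileID afterwards, so callers that do not need every column avoid transferring and converting unused data.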