From 98e1f42c0245d6546916cd037a655637a6c0e08b Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Thu, 30 May 2024 15:48:50 +0200 Subject: [PATCH] sweep: #7632 feat (DM): add a protocol option to getReplicas method family --- .../Client/DataManager.py | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/DIRAC/DataManagementSystem/Client/DataManager.py b/src/DIRAC/DataManagementSystem/Client/DataManager.py index 75f51ce972e..1458aa61e6c 100644 --- a/src/DIRAC/DataManagementSystem/Client/DataManager.py +++ b/src/DIRAC/DataManagementSystem/Client/DataManager.py @@ -1558,10 +1558,16 @@ def put(self, lfn, fileName, diracSE, path=None): # File catalog methods # - def getActiveReplicas(self, lfns, getUrl=True, diskOnly=False, preferDisk=False): + def getActiveReplicas(self, lfns, getUrl=True, diskOnly=False, preferDisk=False, protocol=None): """Get all the replicas for the SEs which are in Active status for reading.""" return self.getReplicas( - lfns, allStatus=False, getUrl=getUrl, diskOnly=diskOnly, preferDisk=preferDisk, active=True + lfns, + allStatus=False, + getUrl=getUrl, + diskOnly=diskOnly, + preferDisk=preferDisk, + active=True, + protocol=protocol, ) def __filterTapeReplicas(self, replicaDict, diskOnly=False): @@ -1666,12 +1672,17 @@ def __checkSEStatus(self, se, status="Read"): """returns the value of a certain SE status flag (access or other)""" return StorageElement(se, vo=self.voName).status().get(status, False) - def getReplicas(self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferDisk=False, active=False): + def getReplicas( + self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferDisk=False, active=False, protocol=None + ): """get replicas from catalogue and filter if requested Warning: all filters are independent, hence active and preferDisk should be set if using forJobs """ catalogReplicas = {} failed = {} + if not protocol: + protocol = self.registrationProtocol + for lfnChunk in breakListIntoChunks(lfns, 1000): res = self.fileCatalog.getReplicas(lfnChunk, allStatus=allStatus) if res["OK"]: @@ -1692,9 +1703,7 @@ def getReplicas(self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferD for se in se_lfn: seObj = StorageElement(se, vo=self.voName) - succPfn = ( - seObj.getURL(se_lfn[se], protocol=self.registrationProtocol).get("Value", {}).get("Successful", {}) - ) + succPfn = seObj.getURL(se_lfn[se], protocol=protocol).get("Value", {}).get("Successful", {}) for lfn in succPfn: catalogReplicas[lfn][se] = succPfn[lfn] @@ -1705,10 +1714,10 @@ def getReplicas(self, lfns, allStatus=True, getUrl=True, diskOnly=False, preferD self.__filterTapeReplicas(result, diskOnly=diskOnly) return S_OK(result) - def getReplicasForJobs(self, lfns, allStatus=False, getUrl=True, diskOnly=False): + def getReplicasForJobs(self, lfns, allStatus=False, getUrl=True, diskOnly=False, protocol=None): """get replicas useful for jobs""" # Call getReplicas with no filter and enforce filters in this method - result = self.getReplicas(lfns, allStatus=allStatus, getUrl=getUrl) + result = self.getReplicas(lfns, allStatus=allStatus, getUrl=getUrl, protocol=protocol) if not result["OK"]: return result replicaDict = result["Value"]