checkDatasetExistance.py
from enum import Enum
import json
import os
import sys
import yaml

if __name__ == "__main__":
  # Allow running this file directly as a script while keeping the
  # package-relative imports below working.
  file_dir = os.path.dirname(os.path.abspath(__file__))
  sys.path.append(os.path.dirname(file_dir))
  __package__ = 'RunKit'

from .grid_tools import run_dasgoclient
class Status(Enum):
  OK = 0                   # dataset found and marked VALID
  MISSING = 1              # no dataset info returned by DAS
  MULTIPLE_INFO = 2        # a DAS entry carries more than one info record
  INCONSISTENT_STATUS = 3  # DAS entries disagree on the dataset status
  NOT_VALID = 4            # dataset exists, but its status is not VALID
  QUERY_FAILED = 5         # the dasgoclient query raised an exception
class DasInterface:
  def __init__(self, cache_file=None):
    self.cache_file = cache_file
    if cache_file is not None and os.path.isfile(cache_file):
      with open(cache_file, 'r') as f:
        self.cache = json.load(f)
    else:
      self.cache = {}

  def get_status(self, dataset):
    # Return the cached status if available; otherwise query DAS and persist the result.
    if dataset in self.cache:
      return Status[self.cache[dataset]]
    status = self.query_status(dataset)
    self.cache[dataset] = status.name
    if self.cache_file is not None:
      with open(self.cache_file, 'w') as f:
        json.dump(self.cache, f, indent=2)
    return status
  def query_status(self, dataset):
    try:
      query = f'dataset dataset={dataset}'
      # User-produced datasets live in the prod/phys03 DBS instance.
      inputDBS = 'prod/phys03' if dataset.endswith('USER') else 'global'
      entries = run_dasgoclient(query, inputDBS=inputDBS, json_output=True)
      ds_infos = []
      for entry in entries:
        if 'dbs3:dataset_info' in entry['das']['services']:
          ds_infos.append(entry['dataset'])
      if len(ds_infos) == 0:
        return Status.MISSING
      status = None
      for ds_info in ds_infos:
        if len(ds_info) != 1:
          return Status.MULTIPLE_INFO
        if status is not None and status != ds_info[0]['status']:
          return Status.INCONSISTENT_STATUS
        status = ds_info[0]['status']
      if status != 'VALID':
        return Status.NOT_VALID
    except Exception:
      return Status.QUERY_FAILED
    return Status.OK
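
# Example usage (a minimal sketch; the cache path and dataset name are
# hypothetical, for illustration only):
#   das = DasInterface(cache_file='das_cache.json')
#   status = das.get_status('/SomePrimaryDataset/SomeCampaign-v1/NANOAODSIM')
#   if status != Status.OK:
#     print(f'{status}: dataset is not usable')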
if __name__ == '__main__':
  import argparse

  parser = argparse.ArgumentParser(description='Check that the input datasets referenced in the task configurations exist in DAS.')
  parser.add_argument('--cache', type=str, required=False, default=None, help='File with cached results.')
  parser.add_argument('task_file', type=str, nargs='+', help='file(s) with task descriptions')
  args = parser.parse_args()

  # Collect datasets: each argument is either a YAML task file or a dataset name itself.
  datasets = []
  all_ok = True
  for entry in args.task_file:
    if entry.endswith('.yaml'):
      with open(entry, 'r') as f:
        cfg = yaml.safe_load(f)
      for task_name, task_desc in cfg.items():
        if task_name == 'config': continue
        # A task is either a plain dataset string or a dict with an 'inputDataset' field.
        if isinstance(task_desc, dict):
          if 'inputDataset' not in task_desc:
            raise ValueError(f'"{entry}" task "{task_name}" does not have "inputDataset" field.')
          inputDataset = task_desc['inputDataset']
        else:
          inputDataset = task_desc
        datasets.append(inputDataset)
    else:
      datasets.append(entry)

  print(f'Checking {len(datasets)} datasets from {len(args.task_file)} config files.')
  das_interface = DasInterface(args.cache)
  for dataset in datasets:
    status = das_interface.get_status(dataset)
    if status == Status.OK: continue
    all_ok = False
    print(f'{status}: {dataset}')
  if all_ok:
    print("All datasets exist.")