Commit
update get_das_info to include empty and broken files
mafrahm committed Aug 14, 2024
1 parent c88e17b commit d0c6dd2
Showing 1 changed file with 76 additions and 18 deletions.
94 changes: 76 additions & 18 deletions scripts/get_das_info.py
@@ -38,6 +38,10 @@ def convert_default(data: dict, placeholder="PLACEHOLDER") -> str:
         keys=[
             "{data['name']}",  # noqa
         ],
+        aux={{
+            "broken_files": {data['broken_files']},
+            "empty_files": {data['empty_files']},
+        }},
         n_files={data['nfiles']},
         n_events={data['nevents']},
     )
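For reference, a quick standalone sketch of how the new aux block renders in the generated entry; the doubled braces escape literal braces whether the template is an f-string or uses str.format, and all values below are invented:

# Standalone sketch of the brace escaping in the template above.
# All values are invented.
data = {
    "broken_files": ["/store/mc/SomeCampaign/broken_file.root"],
    "empty_files": [],
}
template = """\
        aux={{
            "broken_files": {broken_files},
            "empty_files": {empty_files},
        }},
"""
print(template.format(**data))
# renders as:
#         aux={
#             "broken_files": ['/store/mc/SomeCampaign/broken_file.root'],
#             "empty_files": [],
#         },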
@@ -180,13 +184,11 @@ def convert_minimal(data: dict) -> str:
 }
 
 
-def get_das_info(
-    dataset: str,
-) -> dict:
+def load_das_info(dataset: str, add_file_info: bool = False) -> dict:
     from law.util import interruptable_popen
 
     # call dasgoclient command
-    cmd = f"dasgoclient -query='dataset={dataset}' -json"
+    cmd = f"dasgoclient -query='{'file ' if add_file_info else ''}dataset={dataset}' -json"
     code, out, _ = interruptable_popen(
         cmd,
         shell=True,
@@ -196,9 +198,24 @@
     if code != 0:
         raise Exception(f"dasgoclient query failed:\n{out}")
     infos = json.loads(out)
+
+    return infos
+
+
+def get_das_info(
+    dataset: str,
+    add_file_info: bool = False,
+) -> dict:
+    infos = load_das_info(dataset, add_file_info=False)  # dataset-level query
+
     info_of_interest = {"name": dataset}
     for info in infos:
         dataset_info = info["dataset"][0]
+        if "files_via_dataset" in info["das"]["services"][0]:
+            # file-level records are not expected from this dataset-level query;
+            # per-file information is fetched separately below when add_file_info is set
+            print("should not be called")
+
         # The JSON output for a single das_string contains multiple dictionaries with different info.
         # Avoid printing the same info twice by asking specifically for the key of interest.
         if "dataset_info" in info["das"]["services"][0]:
@@ -207,6 +224,58 @@ def get_das_info(
info_of_interest["nfiles"] = dataset_info.get("nfiles", "")
info_of_interest["nevents"] = dataset_info.get("nevents", "")

if add_file_info:
file_infos = load_das_info(dataset, add_file_info=True)

empty_files = [
info["file"][0]["name"]
for info in filter(lambda info: info["file"][0]["nevents"] == 0, file_infos)
]
broken_files = [
info["file"][0]["name"]
for info in filter(lambda info: info["file"][0]["is_file_valid"] == 0, file_infos)
]
info_of_interest["empty_files"] = empty_files
info_of_interest["broken_files"] = broken_files
else:
info_of_interest["empty_files"] = "UNKNOWN"
info_of_interest["broken_files"] = "UNKNOWN"

return info_of_interest


def new_get_das_info(dataset: str) -> dict:
info_of_interest = {"name": dataset}

file_infos = load_das_info(dataset, add_file_info=True)

info_of_interest["dataset_id"] = file_infos[0]["file"][0]["dataset_id"]

empty_files_filter = lambda info: info["file"][0]["nevents"] == 0
broken_files_filter = lambda info: info["file"][0]["is_file_valid"] == 0

good_files = list(filter(lambda x: not broken_files_filter(x) and not empty_files_filter(x), file_infos))

dataset_id = {info["file"][0]["dataset_id"] for info in good_files}
if len(dataset_id) == 1:
info_of_interest["dataset_id"] = dataset_id.pop()
else:
raise ValueError(f"Multiple dataset IDs ({dataset_id}) found for dataset {dataset}")

info_of_interest["nfiles"] = len(good_files)
info_of_interest["nevents"] = sum(info["file"][0]["nevents"] for info in good_files)

empty_files = [
info["file"][0]["name"]
for info in filter(empty_files_filter, file_infos)
]
broken_files = [
info["file"][0]["name"]
for info in filter(broken_files_filter, file_infos)
]
info_of_interest["empty_files"] = empty_files
info_of_interest["broken_files"] = broken_files

return info_of_interest
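For orientation, a sketch of the per-file DAS record shape that the filters above assume; the field names are taken from the code, all values are invented:

# invented per-file record, shaped like the entries in file_infos above
file_info = {
    "das": {"services": ["files_via_dataset:combined"]},  # invented service tag
    "file": [{
        "name": "/store/mc/SomeCampaign/some_file.root",  # invented LFN
        "dataset_id": 987654,  # invented id
        "nevents": 0,  # zero events marks an empty file
        "is_file_valid": 1,  # zero here would mark a broken file
    }],
}

# this record would be classified as empty but not broken
assert file_info["file"][0]["nevents"] == 0
assert file_info["file"][0]["is_file_valid"] == 1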


@@ -215,16 +284,14 @@ def print_das_info(
     keys_of_interest: tuple | None = None,
     convert_function_str: str | None = None,
 ):
-    from law.util import interruptable_popen
-
     # get the requested convert function
     convert_function = convert_functions[convert_function_str]
 
     for das_string in das_strings:
         # set default keys of interest
         # NOTE: this attribute is currently not used
         keys_of_interest = keys_of_interest or (
-            "name", "dataset_id", "nfiles", "nevents",
+            "name", "dataset_id", "nfiles", "nevents", "empty_files", "broken_files",
         )
 
         wildcard = "*" in das_string
@@ -234,22 +301,13 @@
             datasets.append(das_string)
         else:
             # using a wildcard leads to a different structure in the JSON format
-            cmd = f"dasgoclient -query='dataset={das_string}' -json"
-            code, out, _ = interruptable_popen(
-                cmd,
-                shell=True,
-                stdout=subprocess.PIPE,
-                executable="/bin/bash",
-            )
-            if code != 0:
-                raise Exception(f"dasgoclient query failed:\n{out}")
-            infos = json.loads(out)
+            infos = load_das_info(das_string, add_file_info=False)
             for info in infos:
                 dataset_name = info.get("dataset", [])[0].get("name", "")
                 datasets.append(dataset_name)
 
     for dataset in datasets:
-        info_of_interest = get_das_info(dataset)
+        info_of_interest = new_get_das_info(dataset)
         desired_output = convert_function(info_of_interest)
         print(desired_output)
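A minimal usage sketch of the reworked flow, assuming the functions are imported from the script; the dataset name is invented and a working dasgoclient environment is required:

# hypothetical call; new_get_das_info runs a single file-level DAS query
dataset = "/MyPrimaryDataset/MyCampaign-v2/NANOAODSIM"  # invented name
info = new_get_das_info(dataset)
print(info["dataset_id"], info["nfiles"], info["nevents"])  # counts over good files only
print(info["empty_files"], info["broken_files"])  # names of problematic files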

