Add a get_lumisection_ranges_by_component method for getting per-component, per-ls information for offline datasets #9

Merged · 9 commits · Aug 9, 2024
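This PR adds get_lumisection_ranges_by_component, which returns per-component, per-lumisection-range information for offline datasets. A minimal usage sketch, assuming the package re-exports the new helper like the existing lumisection methods (the run number, dataset name, and component key are illustrative, not taken from this PR):

import runregistry

# Per-component lumisection ranges for an offline dataset.
ranges_by_component = runregistry.get_lumisection_ranges_by_component(
    run_number=362874,
    dataset_name="/PromptReco/Collisions2022/DQM",
)

# Keys are components (e.g. 'rpc-rpc'); values are lists of range dicts
# whose 'start' and 'stop' keys delimit each range.
for component, ranges in ranges_by_component.items():
    print(component, ranges[0]["start"], ranges[0]["stop"])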
4 changes: 2 additions & 2 deletions .github/workflows/test_package.yaml
@@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v4
@@ -34,4 +34,4 @@ jobs:
SSO_CLIENT_SECRET: ${{ secrets.SSO_CLIENT_SECRET }}
ENVIRONMENT: ${{ vars.ENVIRONMENT }}
run: |
pytest tests -s --retries 4
pytest tests --retries 2
162 changes: 86 additions & 76 deletions runregistry/runregistry.py
@@ -2,6 +2,7 @@
import time
import json
import requests
import warnings
from dotenv import load_dotenv
from cernrequests import get_api_token, get_with_token
from runregistry.utils import (
@@ -25,6 +26,16 @@
# Offline table
WAITING_DQM_GUI_CONSTANT = "waiting dqm gui"

# Valid Lumisection statuses
LUMISECTION_STATES = ["GOOD", "BAD", "STANDBY", "EXCLUDED", "NOTSET"]

ONLINE_RUN_STATES = ["SIGNOFF", "OPEN", "COMPLETED"]

OFFLINE_DATASET_STATES = ["SIGNOFF", "OPEN", "COMPLETED", WAITING_DQM_GUI_CONSTANT]

# Time to sleep between JSON creation checks
JSON_CREATION_SLEEP_TIME = 15

staging_cert = ""
staging_key = ""
api_url = ""
@@ -70,14 +81,12 @@ def _get_headers(token: str = ""):


def _get_token():
# if not use_cookies:
# return {"dummy": "yammy"}
"""
Gets the token required to query RR API through the CERN SSO.
:return: the token required to query Run Registry API. In particular 'connect.sid' is the one we are interested in
:return: the token required to query Run Registry API.
"""
# if os.getenv("ENVIRONMENT") == "development":
# return None
if os.getenv("ENVIRONMENT") == "local":
return ""
token, expiration_date = get_api_token(
client_id=client_id,
client_secret=client_secret,
@@ -100,7 +109,7 @@ def _get_page(
query_filter = transform_to_rr_run_filter(run_filter=query_filter)
elif data_type == "datasets" and not ignore_filter_transformation:
query_filter = transform_to_rr_dataset_filter(dataset_filter=query_filter)
if os.getenv("ENVIRONMENT") == "development":
if os.getenv("ENVIRONMENT") in ["development", "local"]:
print(url)
print(query_filter)
payload = json.dumps(
@@ -130,8 +139,12 @@ def get_run(run_number, **kwargs):
:param run_number: run_number of specified run
"""
run = get_runs(filter={"run_number": run_number}, **kwargs)
if len(run) != 1:
return None
if not run:
return {}
if len(run) > 1:
raise Exception(
f"Unexpected number of results returned for run {run_number} ({len(run)}), was expecting exactly 1"
)
return run[0]
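# Usage sketch (run number illustrative): get_run returns a dict of the run's
# attributes, {} when the run is not found, and raises if the API returns
# more than one match:
#
#     run = get_run(362874)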


@@ -160,10 +173,9 @@ def get_runs(limit=40000, compress_attributes=True, **kwargs):
"WARNING: fetching more than 10,000 runs from run registry. you probably want to pass a filter into get_runs, or else this will take a while."
)
if resource_count > 20000 and "filter" not in kwargs:
print(
raise Exception(
"ERROR: For run registry queries that retrieve more than 20,000 runs, you must pass a filter into get_runs, an empty filter get_runs(filter={}) works"
)
return None
for page_number in range(1, page_count):
additional_runs = _get_page(
page=page_number, url=url, data_type="runs", **kwargs
@@ -199,8 +211,12 @@ def get_dataset(run_number, dataset_name="online", **kwargs):
dataset = get_datasets(
filter={"run_number": run_number, "dataset_name": dataset_name}, **kwargs
)
if len(dataset) != 1:
return None
if not dataset:
return {}
if len(dataset) > 1:
raise Exception(
f"Unexpected number of results returned for dataset {dataset_name} of run {run_number} ({len(dataset)}), was expecting exactly 1"
)
return dataset[0]
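# Usage sketch (values illustrative): like get_run, but for a named dataset
# of a run; returns {} when no such dataset exists:
#
#     dataset = get_dataset(362874, dataset_name="/PromptReco/Collisions2022/DQM")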


@@ -228,10 +244,9 @@ def get_datasets(limit=40000, compress_attributes=True, **kwargs) -> list:
"WARNING: fetching more than 10,000 datasets. you probably want to pass a filter into get_datasets, or else this will take a while."
)
if resource_count > 20000 and "filter" not in kwargs:
print(
raise Exception(
"ERROR: For queries that retrieve more than 20,000 datasets, you must pass a filter into get_datasets, an empty filter get_datasets(filter={}) works"
)
return []
for page_number in range(1, page_count):
additional_datasets = _get_page(
page=page_number, url=url, data_type="datasets", **kwargs
@@ -259,14 +274,14 @@ def get_datasets(limit=40000, compress_attributes=True, **kwargs) -> list:
def get_cycles():
url = "{}/cycles/global".format(api_url)
headers = _get_headers(token=_get_token())
if os.getenv("ENVIRONMENT") == "development":
if os.getenv("ENVIRONMENT") in ["development", "local"]:
print(url)
return requests.get(url, headers=headers).json()


def _get_lumisection_helper(url, run_number, dataset_name="online", **kwargs):
"""
Puts the headers for all other lumisection methods
Prepares the headers and POST data for all other lumisection methods
"""

headers = _get_headers(token=_get_token())
@@ -292,12 +307,28 @@ def get_oms_lumisections(run_number, dataset_name="online", **kwargs):

def get_lumisection_ranges(run_number, dataset_name="online", **kwargs):
"""
Gets the lumisection ranges of the specified dataset
Gets the lumisection ranges of the specified dataset. Returns
a list of dicts, each one describing a lumisection range delimited
by its 'start' and 'stop' keys. The same dict also holds the
per-component status, cause, and comments.
"""
url = "{}/lumisections/rr_lumisection_ranges".format(api_url)
return _get_lumisection_helper(url, run_number, dataset_name, **kwargs)
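# Illustrative return shape, inferred from the docstring above (the component
# name and nested fields are assumptions, not confirmed API output):
#
#     [
#         {"start": 1, "stop": 20,
#          "rpc-rpc": {"status": "GOOD", "cause": None, "comment": None}},
#         ...
#     ]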


def get_lumisection_ranges_by_component(run_number, dataset_name="online", **kwargs):
"""
Gets the lumisection ranges of the specified dataset as a dict,
where the components are the keys (e.g. 'rpc-rpc'). Each dict value is
a list of lumisection "ranges" (dicts) for the specific component. The exact
range is dictated by the 'start' and 'stop' keys of the nested dict.

Similar to get_lumisection_ranges, but organized by component.
"""
url = "{}/lumisections/rr_lumisection_ranges_by_component".format(api_url)
return _get_lumisection_helper(url, run_number, dataset_name, **kwargs)
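# Illustrative return shape, inferred from the docstring above (the component
# key and per-range fields are assumptions, not confirmed API output):
#
#     {
#         "rpc-rpc": [{"start": 1, "stop": 20, "status": "GOOD"}, ...],
#         ...
#     }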


def get_oms_lumisection_ranges(run_number, **kwargs):
"""
Gets the OMS lumisection ranges of the specified dataset (saved in RR database)
@@ -320,6 +351,10 @@ def generate_json(json_logic, **kwargs):
DO NOT USE, USE THE ONE BELOW (create_json)...
It receives a json logic configuration and returns a json with lumisections which pass the filter
"""
warnings.warn(
"The generate_json is unsafe and will be deprecated. Please use create_json instead",
PendingDeprecationWarning,
)
if not isinstance(json_logic, str):
json_logic = json.dumps(json_logic)
url = "{}/json_creation/generate".format(api_url)
@@ -357,21 +392,18 @@ def create_json(json_logic, dataset_name_filter, **kwargs):
response = requests.post(url, headers=headers, data=payload)
if response.status_code == 200:
return response.json()["final_json"]
elif response.status_code == 202:
# still processing
print(f"progress creating json: {response.json()['progress']}")
time.sleep(JSON_CREATION_SLEEP_TIME)
elif response.status_code == 203:
# still processing
print("json process is submitted and pending, please wait...")
time.sleep(JSON_CREATION_SLEEP_TIME)
else:
if response.status_code == 202:
# still processing
print("progress creating json: ", response.json()["progress"])
time.sleep(15)
elif response.status_code == 203:
# still processing
print("json process is submitted and pending, please wait...")
time.sleep(15)
elif response.status_code == 500:
print("Error creating json")
return
else:
print("error generating json")
return
raise Exception(
f"Error {response.status_code} during JSON creation: {response.text}"
)
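# A minimal create_json usage sketch. The json_logic below is illustrative
# only (an assumption about the filter grammar, not taken from this diff);
# the call polls the server, sleeping JSON_CREATION_SLEEP_TIME seconds
# between checks, until the final JSON is returned or an error is raised:
#
#     json_logic = {
#         "and": [
#             {">=": [{"var": "run.oms.energy"}, 6000]},
#             {"==": [{"var": "lumisection.rr.dt-dt"}, "GOOD"]},
#         ]
#     }
#     golden_json = create_json(json_logic, dataset_name_filter="/PromptReco%")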


# advanced RR operations ==============================================================================
@@ -381,21 +413,13 @@ def move_runs(from_, to_, run=None, runs=[], **kwargs):
move run/runs from one state to another
"""
if not run and not runs:
print("move_runs(): no 'run' and 'runs' arguments were provided")
return
raise ValueError("move_runs(): no 'run' and 'runs' arguments were provided")

states = ["SIGNOFF", "OPEN", "COMPLETED"]
if from_ not in states or to_ not in states:
print(
"move_runs(): get states '",
from_,
"' , '",
to_,
"', while allowed states are ",
states,
", return",
if from_ not in ONLINE_RUN_STATES or to_ not in ONLINE_RUN_STATES:
raise ValueError(
f"move_runs(): got states '{from_}, '{to_}'",
f" but allowed states are {ONLINE_RUN_STATES}",
)
return

url = "%s/runs/move_run/%s/%s" % (api_url, from_, to_)

@@ -408,7 +432,7 @@ def move_runs(from_, to_, run=None, runs=[], **kwargs):
answers = []
for run_number in runs:
payload = json.dumps({"run_number": run_number})
answer = requests.post(url, headers=headers, data=payload).json()
answer = requests.post(url, headers=headers, data=payload)
answers.append(answer)

return answers
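# A minimal move_runs usage sketch (run numbers are illustrative; moving
# runs between states is likely to require a privileged token):
#
#     move_runs("OPEN", "SIGNOFF", run=362874)
#     answers = move_runs("OPEN", "SIGNOFF", runs=[362874, 362875])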
@@ -419,8 +443,9 @@ def make_significant_runs(run=None, runs=[], **kwargs):
mark run/runs significant
"""
if not run and not runs:
print("make_significant_runs(): no 'run' and 'runs' arguments were provided")
return
raise ValueError(
"make_significant_runs(): no 'run' and 'runs' arguments were provided"
)

url = "%s/runs/mark_significant" % (api_url)

@@ -445,10 +470,9 @@ def reset_RR_attributes_and_refresh_runs(runs=[], **kwargs):
"""
runs = __parse_runs_arg(runs)
if not runs:
print(
"reset_RR_attributes_and_refresh_runs(): no 'runs' arguments were provided"
raise ValueError(
"reset_RR_attributes_and_refresh_runs(): no 'runs' argument was provided"
)
return
headers = _get_headers(token=_get_token())
answers = []
for run_number in runs:
@@ -467,10 +491,9 @@ def manually_refresh_components_statuses_for_runs(runs=[], **kwargs):
runs = __parse_runs_arg(runs)

if not runs:
print(
"manually_refresh_components_statuses_for_runs(): no 'runs' arguments were provided, return"
raise ValueError(
"manually_refresh_components_statuses_for_runs(): no 'runs' argument was provided"
)
return

headers = _get_headers(token=_get_token())
answers = []
@@ -496,16 +519,11 @@ def edit_rr_lumisections(
"""
WIP edit RR lumisections attributes
"""
states = ["GOOD", "BAD", "STANDBY", "EXCLUDED", "NONSET"]
if status not in states:
print(
"edit_rr_lumisections(): get status '",
status,
"', while allowed statuses are ",
states,
", return",
if status not in LUMISECTION_STATES:
raise Exception(
f"edit_rr_lumisections(): got status '{status}'",
f" but allowed statuses are {LUMISECTION_STATES}",
)
return

url = "%s/lumisections/edit_rr_lumisections" % (api_url)

@@ -535,21 +553,13 @@ def move_datasets(
Requires a privileged token.
"""
if not run and not runs:
print("move_datasets(): no 'run' and 'runs' arguments were provided, return")
return
raise ValueError("move_datasets(): no 'run' and 'runs' arguments were provided")

states = ["SIGNOFF", "OPEN", "COMPLETED", WAITING_DQM_GUI_CONSTANT]
if from_ not in states or to_ not in states:
print(
"move_datasets(): get states '",
from_,
"' , '",
to_,
"', while allowed states are ",
states,
", return",
if from_ not in OFFLINE_DATASET_STATES or to_ not in OFFLINE_DATASET_STATES:
raise ValueError(
f"move_datasets(): got states '{from_}', '{to_}",
f" but allowed states are {OFFLINE_DATASET_STATES}",
)
return

url = "%s/datasets/%s/move_dataset/%s/%s" % (api_url, workspace, from_, to_)

@@ -559,7 +569,7 @@ def move_datasets(
payload = json.dumps(
{"run_number": run, "dataset_name": dataset_name, "workspace": workspace}
)
return requests.post(url, headers=headers, data=payload)
return [requests.post(url, headers=headers, data=payload)]

answers = []
for run_number in runs:
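# A minimal move_datasets usage sketch (all values are illustrative; per the
# docstring above, a privileged token is required):
#
#     move_datasets(
#         from_="OPEN",
#         to_="SIGNOFF",
#         run=362874,
#         dataset_name="/PromptReco/Collisions2022/DQM",
#         workspace="global",
#     )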