Skip to content

Commit

Permalink
Push changes for quota stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
nf679 committed Sep 17, 2024
1 parent 8f96f13 commit a3fda9b
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 3 deletions.
7 changes: 6 additions & 1 deletion nlds/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from .nlds_setup import API_VERSION

from .routers import list, files, probe, status, find, meta, system, init
from .routers import list, files, probe, status, find, meta, system, init, quota

nlds = FastAPI()

Expand Down Expand Up @@ -56,4 +56,9 @@
init.router,
tags = ["init", ],
prefix = PREFIX + "/init"
)
nlds.include_router(
quota.router,
tags = ["quota", ],
prefix = PREFIX + "/quota"
)
1 change: 1 addition & 0 deletions nlds/rabbit/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class RabbitMQPublisher():
RK_STAT = "stat"
RK_FIND = "find"
RK_META = "meta"
RK_QUOTA = "quota"

# Exchange routing key parts – root
RK_ROOT = "nlds-api"
Expand Down
62 changes: 62 additions & 0 deletions nlds/routers/quota.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
"""

from fastapi import Depends, APIRouter, status
from fastapi.exceptions import HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import json

from typing import Optional, List, Dict

from ..rabbit.publisher import RabbitMQPublisher as RMQP
from ..errors import ResponseError
from ..authenticators.authenticate_methods import authenticate_token, \
authenticate_group, \
authenticate_user

router = APIRouter()

class QuotaResponse(BaseModel):
quota: int

############################ GET METHOD ############################
@router.get("/",
status_code = status.HTTP_202_ACCEPTED,
responses = {
status.HTTPS_202_ACCEPTED: {"model": QuotaResponse},
status.HTTP_400_BAD_REQUEST: {"model": ResponseError},
status.HTTP_401_UNAUTHORIZED: {"model": ResponseError},
status.HTTP_403_FORBIDDEN: {"model": ResponseError},
status.HTTP_404_NOT_FOUND: {"model": ResponseError},
status.HTTP_504_GATEWAY: {"model": ResponseError},
}
)
async def get(token: str = Depends(authenticate_token),
user: str = Depends(authenticate_user),
group: str = Depends(authenticate_group),
label: Optional[str] = None,
holding_id: Optional[int] = None,
tag: Optional[str] = None
):
# create the message dictionary

routing_key = f"{RMQP.RK_ROOT}.{RMQP.RK_ROUTE}.{RMQP.RK_QUOTA}"
api_action = f"{RMQP.RK_QUOTA}"
msg_dict = {
RMQP.MSG_DETAILS: {
RMQP.MSG_USER: user,
RMQP.MSG_GROUP: group,
},
RMQP.MSG_DATA: {},
RMQP.MSG_TYPE: RMQP.MSG_TYPE_STANDARD
}
# add the metadata
meta_dict = {}
if (label):
meta_dict[RMQP.MSG_LABEL] = label
if (holding_id):
meta_dict[RMQP.MSG_HOLDING_ID] = holding_id
if (tag):
# convert the string into a dictionary
81 changes: 80 additions & 1 deletion nlds_processors/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
from sqlalchemy import func, Enum
from sqlalchemy.exc import IntegrityError, OperationalError, ArgumentError, \
NoResultFound
from retry import retry
import requests
import json
from nlds.server_config import load_config
from nlds.utils.construct_url import construct_url

from nlds_processors.catalog.catalog_models import CatalogBase, File, Holding,\
Location, Transaction, Aggregation, Storage, Tag
Expand Down Expand Up @@ -874,4 +879,78 @@ def get_unarchived_files(self, holding: Holding) -> List[File]:
f"Couldn't find unarchived files for holding with "
f"id:{holding.id}"
)
return unarchived_files
return unarchived_files

@retry(requests.ConnectTimeout, tries=5, delay=1, backoff=2)
def get_projects_services(self, oauth_token: str, service_name):
"""Make a call to the JASMIN Projects Portal to get the service information."""
self.config = load_config()
self.name = "jasmin_authenticator"
self.auth_name = "authentication"
self._timeout = 10.0

config = self.config[self.auth_name][self.name]
token_headers = {
"Content-Type": "application/x-ww-form-urlencoded",
"cache-control": "no-cache",
"Authorization": f"Bearer {oauth_token}",
}
# Contact the user_services_url to get the information about the services
url = construct_url([config["user_services_urk"]], {"name": {service_name}})
try:
response = requests.get(
url,
headers=token_headers,
timeout=self._timeout,
)
except requests.exceptions.ConnectionError:
raise RuntimeError(f"User services url {url} could not be reached.")
except KeyError:
raise RuntimeError(f"Could not find 'user_services_url' key in the {self.name} section of the .server_config file.")
if response.status_code == requests.codes.ok: # status code 200
try:
response_json = json.loads(response.text)
return response_json
except json.JSONDecodeError:
raise RuntimeError(f"Invalid JSON returned from the user services url: {url}")
else:
raise RuntimeError(f"Error getting data for {service_name}")

def extract_tape_quota(self, oauth_token: str, service_name):
"""Get the service information then process it to extract the quota for the service."""
try:
result = self.get_projects_services(self, oauth_token, service_name)
except (RuntimeError, ValueError) as e:
raise type(e)(f"Error getting information for {service_name}: {e}")

# Process the result to get the requirements
for attr in result:
# Check that the category is Group Workspace
if attr["category"] == 1:
# Check that there are requirements, otherwise throw an error
if attr["requirements"]:
requirements = attr["requirements"]
else:
raise ValueError(f"Cannot find any requirements for {service_name}.")
else:
raise ValueError(f"Cannot find a Group Workspace with the name {service_name}. Check the category.")

# Go through the requirements to find the tape resource requirement
for requirement in requirements:
# Only return provisioned requirements
if requirement["status"] == 50:
# Find the tape resource and get its quota
if requirement["resource"]["short_name"] == "tape":
try:
tape_quota = requirement["amount"]
if tape_quota:
return tape_quota
else:
raise ValueError(f"Issue getting tape quota for {service_name}. Quota is zero.")
except KeyError:
raise KeyError(f"Issue getting tape quota for {service_name}. No 'value' field exists.")
else:
raise ValueError(f"No tape resources could be found for {service_name}")
else:
raise ValueError(f"No provisioned requirements found for {service_name}.Check the status of your requested resources.")

22 changes: 21 additions & 1 deletion nlds_processors/catalog/catalog_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1850,7 +1850,23 @@ def _catalog_meta(self, body: Dict, properties: Header) -> None:
msg_dict=body,
exchange={'name': ''},
correlation_id=properties.correlation_id
)
)

def _catalog_quota(self, body: Dict, properties: Header) -> None:
"""Return the users quota for the given service."""
message_vars = self._parse_user_vars(body)
if message_vars is None:
# Check if any problems have occured in the parsing of the message
# body and exit if necessary
self.log("Could not parse one or more mandatory variables, exiting"
"callback", self.RK_LOG_ERROR)
return
else:
# Unpack if no problems found in parsing
user, group = message_vars

try:
group_quota =


def attach_database(self, create_db_fl: bool = True):
Expand Down Expand Up @@ -2024,6 +2040,10 @@ def callback(self, ch: Channel, method: Method, properties: Header,
elif (api_method == self.RK_STAT):
self._catalog_stat(body, properties)

elif (api_method == self.RK_QUOTA):
# don't need to split any routing key for an RPC method
self._catalog_quota(body, properties)

# If received system test message, reply to it (this is for system status check)
elif api_method == "system_stat":
if properties.correlation_id is not None and properties.correlation_id != self.channel.consumer_tags[0]:
Expand Down

0 comments on commit a3fda9b

Please sign in to comment.