Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[198299] RDI Access change for retrieving data from Kobo #3871

Merged
merged 9 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 68 additions & 104 deletions backend/hct_mis_api/apps/core/kobo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,25 @@
import time
import typing
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse

from django.conf import settings

import requests
from constance import config
from requests import Response
from requests.adapters import HTTPAdapter
from requests.exceptions import RetryError
from requests.packages.urllib3.util.retry import Retry

from hct_mis_api.apps.core.kobo.common import filter_by_owner
from hct_mis_api.apps.core.models import BusinessArea, XLSXKoboTemplate
from hct_mis_api.apps.core.models import XLSXKoboTemplate
from hct_mis_api.apps.utils.exceptions import log_and_raise

logger = logging.getLogger(__name__)


class TokenNotProvided(Exception):
pass


class TokenInvalid(Exception):
class CountryCodeNotProvided(Exception):
pass


Expand All @@ -32,100 +29,69 @@ class KoboRequestsSession(requests.Session):

def should_strip_auth(self, old_url: str, new_url: str) -> bool:
new_parsed = urlparse(new_url)
if new_parsed.hostname in KoboRequestsSession.AUTH_DOMAINS:
if new_parsed.hostname in KoboRequestsSession.AUTH_DOMAINS: # pragma: no cover
return False
return super().should_strip_auth(old_url, new_url) # type: ignore # FIXME: Call to untyped function "should_strip_auth" in typed context


class KoboAPI:
def __init__(self, business_area_slug: Optional[str] = None):
if business_area_slug is not None:
self.business_area = BusinessArea.objects.get(slug=business_area_slug)
self.KPI_URL = self.business_area.kobo_url or settings.KOBO_KF_URL
else:
self.business_area = None
self.KPI_URL = settings.KOBO_KF_URL
LIMIT = 30_000
FORMAT = "json"

self._get_token()
def __init__(self, kpi_url: Optional[str] = None, token: Optional[str] = None) -> None:
self._kpi_url = kpi_url or settings.KOBO_KF_URL
self._token = token or settings.KOBO_MASTER_API_TOKEN

def _handle_paginated_results(self, url: str) -> List[Dict]:
self._client = KoboRequestsSession()
self._set_token()

def _set_token(self) -> None:
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504], allowed_methods=False)
self._client.mount(self._kpi_url, HTTPAdapter(max_retries=retries))
self._client.headers.update({"Authorization": f"token {self._token}"})

def _get_paginated_request(self, url: str) -> List[Dict]:
next_url = url
results: List = []

# if there will be more than 30000 results,
# we need to make additional queries
while next_url:
data = self._handle_request(next_url)
response = self._get_request(next_url)
data = response.json()
next_url = data["next"]
results.extend(data["results"])
return results

def _get_url(
self,
endpoint: str,
append_api: bool = True,
add_limit: bool = True,
additional_query_params: Optional[Any] = None,
) -> str:
endpoint.strip("/")
if endpoint != "token" and append_api is True:
endpoint = f"api/v2/{endpoint}"
# According to the Kobo API documentation,
# the maximum limit per page is 30000
query_params = f"format=json{'&limit=30000' if add_limit else ''}"
if additional_query_params is not None:
query_params += f"&{additional_query_params}"
return f"{self.KPI_URL}/{endpoint}?{query_params}"

def _get_token(self) -> None:
self._client = KoboRequestsSession()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504], allowed_methods=False)
self._client.mount(self.KPI_URL, HTTPAdapter(max_retries=retries))

if self.business_area is None:
token = settings.KOBO_MASTER_API_TOKEN
else:
token = self.business_area.kobo_token

if not token:
msg = f"KOBO Token is not set for business area {self.business_area}"
logger.error(msg)
raise TokenNotProvided(msg)

self._client.headers.update({"Authorization": f"token {token}"})

def _handle_request(self, url: str) -> Dict:
def _get_request(self, url: str) -> Response:
response = self._client.get(url=url)
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
except requests.exceptions.HTTPError as e: # pragma: no cover
logger.exception(e)
raise
return response.json()
return response

def _post_request(
self, url: str, data: Optional[Dict] = None, files: Optional[typing.IO] = None
) -> requests.Response:
) -> Response: # pragma: no cover
return self._client.post(url=url, data=data, files=files)

def _patch_request(
self, url: str, data: Optional[Dict] = None, files: Optional[typing.IO] = None
) -> requests.Response:
return self._client.patch(url=url, data=data, files=files)

def create_template_from_file(
self, bytes_io_file: Optional[typing.IO], xlsx_kobo_template_object: XLSXKoboTemplate, template_id: str = ""
) -> Optional[Tuple[Dict, str]]:
data = {
"name": "Untitled",
"asset_type": "template",
"description": "",
"sector": "",
"country": "",
"share-metadata": False,
}
self, bytes_io_file: typing.IO, xlsx_kobo_template_object: XLSXKoboTemplate, template_id: str = ""
) -> Optional[Tuple[Dict, str]]: # pragma: no cover
# TODO: not sure if this actually works
if not template_id:
asset_response = self._post_request(url=self._get_url("assets/", add_limit=False), data=data)
data = {
"name": "Untitled",
"asset_type": "template",
"description": "",
"sector": "",
"country": "",
"share-metadata": False,
}
endpoint = "api/v2/assets"
query_params = f"format={self.FORMAT}"
url = f"{self._kpi_url}/{endpoint}?{query_params}"
asset_response = self._post_request(url=url, data=data)
try:
asset_response.raise_for_status()
except requests.exceptions.HTTPError as e:
Expand All @@ -135,12 +101,13 @@ def create_template_from_file(
asset_uid = asset_response_dict.get("uid")
else:
asset_uid = template_id

file_import_data = {
"assetUid": asset_uid,
"destination": self._get_url(f"assets/{asset_uid}/", append_api=False, add_limit=False),
"destination": f"{self._kpi_url}/assets/{asset_uid}?format={self.FORMAT}",
}
file_import_response = self._post_request(
url=self._get_url("imports/", append_api=False, add_limit=False),
url=f"{self._kpi_url}/imports?format={self.FORMAT}",
data=file_import_data,
files={"file": bytes_io_file}, # type: ignore # FIXME
)
Expand All @@ -149,7 +116,8 @@ def create_template_from_file(

attempts = 5
while attempts >= 0:
response_dict = self._handle_request(url)
response = self._get_request(url)
response_dict = response.json()
import_status = response_dict.get("status")
if import_status == "processing":
xlsx_kobo_template_object.status = XLSXKoboTemplate.PROCESSING
Expand All @@ -162,36 +130,32 @@ def create_template_from_file(
log_and_raise("Fetching import data took too long", error_type=RetryError)
return None

def get_all_projects_data(self) -> List:
if not self.business_area:
logger.error("Business area is not provided")
raise ValueError("Business area is not provided")
projects_url = self._get_url("assets")

results = self._handle_paginated_results(projects_url)
return filter_by_owner(results, self.business_area)
def get_all_projects_data(self, country_code: str) -> List:
if not country_code:
raise CountryCodeNotProvided("No country code provided")
endpoint = "api/v2/assets"
query_params = f"format={self.FORMAT}&limit={self.LIMIT}"
if config.KOBO_ENABLE_SINGLE_USER_ACCESS:
query_params += f"&q=settings__country_codes__icontains:{country_code.upper()}"
url = f"{self._kpi_url}/{endpoint}?{query_params}"
return self._get_paginated_request(url)

def get_single_project_data(self, uid: str) -> Dict:
projects_url = self._get_url(f"assets/{uid}")

return self._handle_request(projects_url)
endpoint = f"api/v2/assets/{uid}"
query_params = f"format={self.FORMAT}&limit={self.LIMIT}"
url = f"{self._kpi_url}/{endpoint}?{query_params}"
response = self._get_request(url)
return response.json()

def get_project_submissions(self, uid: str, only_active_submissions: bool) -> List:
additional_query_params = None
def get_project_submissions(self, uid: str, only_active_submissions: bool) -> List[Dict]:
endpoint = f"api/v2/assets/{uid}/data"
query_params = f"format={self.FORMAT}&limit={self.LIMIT}"
if only_active_submissions:
additional_query_params = 'query={"_validation_status.uid":"validation_status_approved"}'
submissions_url = self._get_url(
f"assets/{uid}/data",
additional_query_params=additional_query_params,
)

return self._handle_paginated_results(submissions_url)
query_params += f"&{additional_query_params}"
url = f"{self._kpi_url}/{endpoint}?{query_params}"
return self._get_paginated_request(url)

def get_attached_file(self, url: str) -> BytesIO:
response = self._client.get(url=url)
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.exception(e)
raise
def get_attached_file(self, url: str) -> BytesIO: # pragma: no cover
response = self._get_request(url)
return BytesIO(response.content)
3 changes: 1 addition & 2 deletions backend/hct_mis_api/apps/core/kobo/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ def count_population(results: list, business_area: BusinessArea) -> tuple[int, i
return total_households_count, total_individuals_count


def filter_by_owner(data: List, business_area: BusinessArea) -> List:
kobo_username = business_area.kobo_username
def filter_by_owner(data: List, kobo_username: str) -> List:
if data:
return [element for element in data if element["owner__username"] == kobo_username]
return []
9 changes: 9 additions & 0 deletions backend/hct_mis_api/apps/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from django.db.models import JSONField, UniqueConstraint
from django.utils.translation import gettext_lazy as _

from constance import config
from django_celery_beat.models import PeriodicTask
from django_celery_beat.schedulers import DatabaseScheduler, ModelEntry
from model_utils import Choices
Expand Down Expand Up @@ -71,9 +72,12 @@ class BusinessArea(NaturalKeyModel, TimeStampedUUIDModel):
long_name = models.CharField(max_length=255)
region_code = models.CharField(max_length=8)
region_name = models.CharField(max_length=8)

# TODO: deprecated to remove in the next release
kobo_username = models.CharField(max_length=255, null=True, blank=True)
kobo_token = models.CharField(max_length=255, null=True, blank=True)
kobo_url = models.URLField(max_length=255, null=True, blank=True)

rapid_pro_host = models.URLField(null=True, blank=True)
rapid_pro_payment_verification_token = models.CharField(max_length=40, null=True, blank=True)
rapid_pro_messages_token = models.CharField(max_length=40, null=True, blank=True)
Expand Down Expand Up @@ -157,6 +161,11 @@ class Meta:
def __str__(self) -> str:
return self.name

def get_kobo_token(self) -> str:
if config.KOBO_ENABLE_SINGLE_USER_ACCESS:
return settings.KOBO_MASTER_API_TOKEN
return self.kobo_token

def natural_key(self) -> Tuple[str]:
return (self.code,)

Expand Down
27 changes: 20 additions & 7 deletions backend/hct_mis_api/apps/core/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.db.models import Q
from django.db.models import F, Q

import graphene
from constance import config
Expand All @@ -28,8 +28,12 @@
from hct_mis_api.apps.core.extended_connection import ExtendedConnection
from hct_mis_api.apps.core.field_attributes.core_fields_attributes import FieldFactory
from hct_mis_api.apps.core.field_attributes.fields_types import FILTERABLE_TYPES, Scope
from hct_mis_api.apps.core.kobo.api import KoboAPI
from hct_mis_api.apps.core.kobo.common import reduce_asset, reduce_assets_list
from hct_mis_api.apps.core.kobo.api import CountryCodeNotProvided, KoboAPI
from hct_mis_api.apps.core.kobo.common import (
filter_by_owner,
reduce_asset,
reduce_assets_list,
)
from hct_mis_api.apps.core.languages import Language, Languages
from hct_mis_api.apps.core.models import (
BusinessArea,
Expand Down Expand Up @@ -281,11 +285,12 @@ def get_fields_attr_generators(

def resolve_asset(business_area_slug: str, uid: str) -> Dict:
try:
assets = KoboAPI(business_area_slug).get_single_project_data(uid)
business_area = BusinessArea.objects.get(slug=business_area_slug)
assets = KoboAPI(token=business_area.get_kobo_token()).get_single_project_data(uid)
except ObjectDoesNotExist as e:
logger.exception(f"Provided business area: {business_area_slug}, does not exist.")
raise GraphQLError("Provided business area does not exist.") from e
except AttributeError as error:
except AttributeError as error: # pragma: no cover
logger.exception(error)
raise GraphQLError(str(error)) from error

Expand All @@ -294,13 +299,21 @@ def resolve_asset(business_area_slug: str, uid: str) -> Dict:

def resolve_assets_list(business_area_slug: str, only_deployed: bool = False) -> List:
try:
assets = KoboAPI(business_area_slug).get_all_projects_data()
business_area = BusinessArea.objects.annotate(country_code=F("countries__iso_code3")).get(
slug=business_area_slug
)
assets = KoboAPI(token=business_area.get_kobo_token()).get_all_projects_data(business_area.country_code)

if not config.KOBO_ENABLE_SINGLE_USER_ACCESS: # pragma: no cover
assets = filter_by_owner(assets, business_area.kobo_username)
except ObjectDoesNotExist as e:
logger.exception(f"Provided business area: {business_area_slug}, does not exist.")
raise GraphQLError("Provided business area does not exist.") from e
except AttributeError as error:
except AttributeError as error: # pragma: no cover
logger.exception(error)
raise GraphQLError(str(error)) from error
except CountryCodeNotProvided:
raise GraphQLError(f"Business area {business_area_slug} does not have a country code.")

return reduce_assets_list(assets, only_deployed=only_deployed)

Expand Down
Loading
Loading