Skip to content

Commit

Permalink
new pagination interface, more stable reads, Bfabric.from_config (#85)
Browse files Browse the repository at this point in the history
- New pagination logic:
  - Specify max number of items
  - Specify offset of items to skip
- Stable reading
  - Reading passes a `createdbefore` query field (if it's not part of the query) to ensure consistent reads/pagination in the presence of insertions into the database. Deletions are not handled and might require an API feature.
  - It will need to be tested further before releasing, but my idea is that this is a lot more flexible moving forward
- New `Bfabric.from_config` arguably it is a bit redundant with the `get_system_auth` method, however I do feel like 99% of use cases will be handled with this method now and it will be easier to do so. I'm not changing the usage in this PR yet.
  • Loading branch information
leoschwarz authored May 7, 2024
1 parent 8f5635b commit 1472332
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 55 deletions.
180 changes: 128 additions & 52 deletions bfabric/bfabric2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,45 @@
History
The python3 library first appeared in 2014.
"""
from __future__ import annotations

import base64
import logging
import os
import sys
from contextlib import contextmanager
from copy import deepcopy
from datetime import datetime
from enum import Enum
from pprint import pprint
from typing import Union, List, Optional
from typing import Any, Literal, ContextManager
from zoneinfo import ZoneInfo

from rich.console import Console

from bfabric import __version__ as PACKAGE_VERSION
from bfabric.bfabric_config import BfabricAuth, BfabricConfig, read_config
from bfabric.src.cli_formatting import HostnameHighlighter, DEFAULT_THEME
from bfabric.src.cli_formatting import DEFAULT_THEME, HostnameHighlighter
from bfabric.src.engine_suds import EngineSUDS
from bfabric.src.engine_zeep import EngineZeep
from bfabric.src.errors import get_response_errors
from bfabric.src.math_helper import div_int_ceil
from bfabric.src.paginator import page_iter, BFABRIC_QUERY_LIMIT
from bfabric.src.result_container import ResultContainer, BfabricResultType
from bfabric.src.paginator import BFABRIC_QUERY_LIMIT, compute_requested_pages, page_iter
from bfabric.src.result_container import BfabricResultType, ResultContainer


class BfabricAPIEngineType(Enum):
SUDS = 1
ZEEP = 2


def get_system_auth(login: str = None, password: str = None, base_url: str = None,
config_path: str = None, config_env: str = None, optional_auth: bool = False, verbose: bool = False):
def get_system_auth(
login: str = None,
password: str = None,
base_url: str = None,
config_path: str = None,
config_env: str = None,
optional_auth: bool = True,
verbose: bool = False,
) -> tuple[BfabricConfig, BfabricAuth]:
"""
:param login: Login string for overriding config file
:param password: Password for overriding config file
Expand All @@ -70,22 +80,25 @@ def get_system_auth(login: str = None, password: str = None, base_url: str = Non
if not os.path.isfile(config_path):
if have_config_path:
# NOTE: If user explicitly specifies a path to a wrong config file, this has to be an exception
raise IOError(f"Explicitly specified config file does not exist: {config_path}")
raise OSError(f"Explicitly specified config file does not exist: {config_path}")
# TODO: Convert to log
print(f"Warning: could not find the config file in the default location: {config_path}")
config = BfabricConfig(base_url=base_url)
auth = BfabricAuth(login=login, password=password)
if login is None and password is None:
auth = None
else:
auth = BfabricAuth(login=login, password=password)

# Load config from file, override some of the fields with the provided ones
else:
config, auth = read_config(config_path, config_env=config_env, optional_auth=optional_auth)
config, auth = read_config(config_path, config_env=config_env)
config = config.copy_with(base_url=base_url)
if (login is not None) and (password is not None):
auth = BfabricAuth(login=login, password=password)
elif (login is None) and (password is None):
auth = auth
else:
raise IOError("Must provide both username and password, or neither.")
raise OSError("Must provide both username and password, or neither.")

if not config.base_url:
raise ValueError("base_url missing")
Expand All @@ -108,14 +121,15 @@ class Bfabric:
def __init__(
self,
config: BfabricConfig,
auth: Optional[BfabricAuth],
auth: BfabricAuth | None,
engine: BfabricAPIEngineType = BfabricAPIEngineType.SUDS,
verbose: bool = False
):
verbose: bool = False,
) -> None:
self.verbose = verbose
self.query_counter = 0
self._config = config
self._auth = auth
self._zone_info = ZoneInfo(config.server_timezone)

if engine == BfabricAPIEngineType.SUDS:
self.engine = EngineSUDS(base_url=config.base_url)
Expand All @@ -129,6 +143,29 @@ def __init__(
if self.verbose:
self.print_version_message()

@classmethod
def from_config(
cls,
config_env: str | None = None,
auth: BfabricAuth | Literal["config"] | None = "config",
engine: BfabricAPIEngineType = BfabricAPIEngineType.SUDS,
verbose: bool = False,
) -> Bfabric:
"""Returns a new Bfabric instance, configured with the user configuration file.
If the `config_env` is specified then it will be used, if it is not specified the default environment will be
determined by checking the following in order (picking the first one that is found):
- The `BFABRICPY_CONFIG_ENV` environment variable
- The `default_config` field in the config file "GENERAL" section
:param config_env: Configuration environment to use. If not given, it is deduced as described above.
:param auth: Authentication to use. If "config" is given, the authentication will be read from the config file.
If it is set to None, no authentication will be used.
:param engine: Engine to use for the API. Default is SUDS.
:param verbose: Print a system info message to standard error console
"""
config, auth_config = get_system_auth(config_env=config_env)
auth_used: BfabricAuth | None = auth_config if auth == "config" else auth
return cls(config, auth_used, engine=engine, verbose=verbose)

@property
def config(self) -> BfabricConfig:
"""Returns the config object."""
Expand All @@ -144,7 +181,7 @@ def auth(self) -> BfabricAuth:
return self._auth

@contextmanager
def with_auth(self, auth: BfabricAuth):
def with_auth(self, auth: BfabricAuth) -> ContextManager[Bfabric]:
"""Context manager that temporarily (within the scope of the context) sets the authentication for
the Bfabric object to the provided value. This is useful when authenticating multiple users, to avoid accidental
use of the wrong credentials.
Expand All @@ -156,8 +193,15 @@ def with_auth(self, auth: BfabricAuth):
finally:
self._auth = old_auth

def read(self, endpoint: str, obj: dict, max_results: Optional[int] = 100, readid: bool = False, check: bool = True,
**kwargs) -> ResultContainer:
def read(
self,
endpoint: str,
obj: dict[str, Any],
max_results: int | None = 100,
offset: int = 0,
readid: bool = False,
check: bool = True,
) -> ResultContainer:
"""Reads objects from the specified endpoint that match all specified attributes in `obj`.
By setting `max_results` it is possible to change the number of results that are returned.
:param endpoint: endpoint
Expand All @@ -173,71 +217,104 @@ def read(self, endpoint: str, obj: dict, max_results: Optional[int] = 100, readi
:param check: whether to check for errors in the response
:return: List of responses, packaged in the results container
"""
# Ensure stability
obj = self._add_query_timestamp(obj)

# Get the first page.
# NOTE: According to old interface, this is equivalent to plain=True
response = self._read_method(readid, endpoint, obj, page=1, **kwargs)
response, errors = self._read_page(readid, endpoint, obj, page=1)

try:
n_pages = response["numberofpages"]
n_available_pages = response["numberofpages"]
except AttributeError:
n_pages = 0
n_available_pages = 0

# Return empty list if nothing found
if not n_pages:
result = ResultContainer([], self.result_type, total_pages_api=0, errors=get_response_errors(response, endpoint))
if not n_available_pages:
result = ResultContainer(
[], self.result_type, total_pages_api=0, errors=get_response_errors(response, endpoint)
)
if check:
result.assert_success()
return result

# Get results from other pages as well, if need be
# Only load as many pages as user has interest in
if max_results is None:
n_pages_trg = n_pages
else:
n_pages_trg = min(n_pages, div_int_ceil(max_results, BFABRIC_QUERY_LIMIT))
requested_pages, initial_offset = compute_requested_pages(
n_page_total=n_available_pages,
n_item_per_page=BFABRIC_QUERY_LIMIT,
n_item_offset=offset,
n_item_return_max=max_results,
)
logging.info(f"Requested pages: {requested_pages}")

# NOTE: Page numbering starts at 1
response_items = response[endpoint]
errors = []
for i_page in range(2, n_pages_trg + 1):
print('-- reading page', i_page, 'of', n_pages)
response = self._read_method(readid, endpoint, obj, page=i_page, **kwargs)
errors += get_response_errors(response, endpoint)
response_items += response[endpoint]

result = ResultContainer(response_items, self.result_type, total_pages_api=n_pages, errors=errors)
response_items = []
page_offset = initial_offset
for i_iter, i_page in enumerate(requested_pages):
if not (i_iter == 0 and i_page == 1):
print("-- reading page", i_page, "of", n_available_pages)
response, errors_page = self._read_page(readid, endpoint, obj, page=i_page)
errors += errors_page

response_items += response[endpoint][page_offset:]
page_offset = 0

result = ResultContainer(response_items, self.result_type, total_pages_api=n_available_pages, errors=errors)
if check:
result.assert_success()
return result

def save(self, endpoint: str, obj: dict, check: bool = True, **kwargs) -> ResultContainer:
results = self.engine.save(endpoint, obj, auth=self.auth, **kwargs)
def _add_query_timestamp(self, query: dict[str, Any]) -> dict[str, Any]:
"""Adds the current time as a createdbefore timestamp to the query, if there is no time in the query already.
This ensures pagination will be robust to insertion of new items during the query.
If a time is already present, it will be left as is, but a warning will be printed if it is in the future as
the query will not be robust to insertion of new items.
Note that this does not ensure robustness against deletion of items.
"""
server_time = datetime.now(self._zone_info)
if "createdbefore" in query:
query_time = datetime.fromisoformat(query["createdbefore"])
if query_time > server_time:
logging.warning(
f"Warning: Query timestamp is in the future: {query_time}. "
"This will not be robust to insertion of new items."
)
return query
else:
return {**query, "createdbefore": server_time.strftime("%Y-%m-%dT%H:%M:%S")}

def save(self, endpoint: str, obj: dict, check: bool = True) -> ResultContainer:
results = self.engine.save(endpoint, obj, auth=self.auth)
result = ResultContainer(results[endpoint], self.result_type, errors=get_response_errors(results, endpoint))
if check:
result.assert_success()
return result

def delete(self, endpoint: str, id: Union[List, int], check: bool = True) -> ResultContainer:
def delete(self, endpoint: str, id: int | list[int], check: bool = True) -> ResultContainer:
results = self.engine.delete(endpoint, id, auth=self.auth)
result = ResultContainer(results[endpoint], self.result_type, errors=get_response_errors(results, endpoint))
if check:
result.assert_success()
return result

def _read_method(self, readid: bool, endpoint: str, obj: dict, page: int = 1, **kwargs):
def _read_page(self, readid: bool, endpoint: str, query: dict[str, Any], page: int = 1):
"""Reads the specified page of objects from the specified endpoint that match the query."""
if readid:
# https://fgcz-bfabric.uzh.ch/wiki/tiki-index.php?page=endpoint.workunit#Web_Method_readid_
return self.engine.readid(endpoint, obj, auth=self.auth, page=page, **kwargs)
response = self.engine.readid(endpoint, query, auth=self.auth, page=page)
else:
return self.engine.read(endpoint, obj, auth=self.auth, page=page, **kwargs)
response = self.engine.read(endpoint, query, auth=self.auth, page=page)

return response, get_response_errors(response, endpoint)

############################
# Multi-query functionality
############################

# TODO: Is this scope sufficient? Is there ever more than one multi-query parameter, and/or not at the root of dict?
def read_multi(self, endpoint: str, obj: dict, multi_query_key: str, multi_query_vals: list,
readid: bool = False, **kwargs) -> ResultContainer:
def read_multi(
self, endpoint: str, obj: dict, multi_query_key: str, multi_query_vals: list, readid: bool = False
) -> ResultContainer:
"""
Makes a 1-parameter multi-query (there is 1 parameter that takes a list of values)
Since the API only allows BFABRIC_QUERY_LIMIT queries per page, split the list into chunks before querying
Expand All @@ -252,7 +329,7 @@ def read_multi(self, endpoint: str, obj: dict, multi_query_key: str, multi_query
NOTE: It is assumed that there is only 1 response for each value.
"""

response_tot = ResultContainer([], self.result_type, total_pages_api = 0)
response_tot = ResultContainer([], self.result_type, total_pages_api=0)
obj_extended = deepcopy(obj) # Make a copy of the query, not to make edits to the argument

# Iterate over request chunks that fit into a single API page
Expand All @@ -266,7 +343,7 @@ def read_multi(self, endpoint: str, obj: dict, multi_query_key: str, multi_query
# automatically? If yes, perhaps we don't need this method at all?
# TODO: It is assumed that a user requesting multi_query always wants all of the pages. Can anybody think of
# exceptions to this?
response_this = self.read(endpoint, obj_extended, max_results=None, readid=readid, **kwargs)
response_this = self.read(endpoint, obj_extended, max_results=None, readid=readid)
response_tot.extend(response_this)

return response_tot
Expand All @@ -288,7 +365,7 @@ def delete_multi(self, endpoint: str, id_list: list) -> ResultContainer:
response_tot = ResultContainer([], self.result_type, total_pages_api=0)

if len(id_list) == 0:
print('Warning, empty list provided for deletion, ignoring')
print("Warning, empty list provided for deletion, ignoring")
return response_tot

# Iterate over request chunks that fit into a single API page
Expand All @@ -298,7 +375,7 @@ def delete_multi(self, endpoint: str, id_list: list) -> ResultContainer:

return response_tot

def exists(self, endpoint: str, key: str, value: Union[List, Union[int, str]]) -> Union[bool, List[bool]]:
def exists(self, endpoint: str, key: str, value: list[int | str] | int | str) -> bool | list[bool]:
"""
:param endpoint: endpoint
:param key: A key for the query (e.g. id or name)
Expand All @@ -321,8 +398,8 @@ def exists(self, endpoint: str, key: str, value: Union[List, Union[int, str]]) -
for r in results.results:
if key in r:
result_vals += [r[key]]
elif '_' + key in r: # TODO: Remove this if SUDS bug is ever resolved
result_vals += [r['_' + key]]
elif "_" + key in r: # TODO: Remove this if SUDS bug is ever resolved
result_vals += [r["_" + key]]

# 3. For each of the requested ids, return true if there was a response and false if there was not
if is_scalar:
Expand All @@ -347,4 +424,3 @@ def print_version_message(self, stderr: bool = True) -> None:
"""
console = Console(stderr=stderr, highlighter=HostnameHighlighter(), theme=DEFAULT_THEME)
console.print(self.get_version_message(), style="bright_yellow")

40 changes: 39 additions & 1 deletion bfabric/src/paginator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from __future__ import annotations

import math

# Single page query limit for BFabric API (as of time of writing, adapt if it changes)
BFABRIC_QUERY_LIMIT = 100


def page_iter(objs: list, page_size: int = BFABRIC_QUERY_LIMIT) -> list:
"""
:param objs: A list of objects to provide to bfabric as part of a query
Expand All @@ -9,4 +14,37 @@ def page_iter(objs: list, page_size: int = BFABRIC_QUERY_LIMIT) -> list:
"""

for i in range(0, len(objs), page_size):
yield objs[i:i + page_size]
yield objs[i : i + page_size]


def compute_requested_pages(
n_page_total: int,
n_item_per_page: int,
n_item_offset: int,
n_item_return_max: int | None,
) -> tuple[list[int], int]:
"""Returns the page indices that need to be requested to get all requested items.
:param n_page_total: Total number of pages available
:param n_item_per_page: Number of items per page
:param n_item_offset: Number of items to skip from the beginning
:param n_item_return_max: Maximum number of items to return
:return:
- list of page indices that need to be requested
- initial page offset (0-based), i.e. the i-th item from which onwards to retain results
"""
# B-Fabric API uses 1-based indexing for pages
index_start = 1

# Determine the page indices to request
# If n_item_return_max is not provided, we will return all items
if n_item_return_max is None:
n_item_return_max = n_page_total * n_item_per_page

# Determine the page indices to request
idx_max_return = math.ceil((n_item_return_max + n_item_offset) / n_item_per_page)
idx_arr = [idx + index_start for idx in range(n_item_offset // n_item_per_page, min(n_page_total, idx_max_return))]

# Determine the initial offset on the first page
initial_offset = min(n_item_offset, n_item_return_max) % n_item_per_page

return idx_arr, initial_offset
Loading

0 comments on commit 1472332

Please sign in to comment.