From 7b03e60f9d3acac4951ddf80f395a688bf01ac47 Mon Sep 17 00:00:00 2001 From: Ken Michalak Date: Wed, 18 Oct 2023 08:08:37 -0500 Subject: [PATCH] add pagination and base_url to JSON query_runner (#6499) --- redash/query_runner/json_ds.py | 125 +++++++++++++++++++++++++---- tests/query_runner/test_json_ds.py | 88 ++++++++++++++++++++ 2 files changed, 196 insertions(+), 17 deletions(-) create mode 100644 tests/query_runner/test_json_ds.py diff --git a/redash/query_runner/json_ds.py b/redash/query_runner/json_ds.py index de763dd68e..4508430fe0 100644 --- a/redash/query_runner/json_ds.py +++ b/redash/query_runner/json_ds.py @@ -1,5 +1,6 @@ import datetime import logging +from urllib.parse import urljoin import yaml from funcy import compact, project @@ -61,7 +62,7 @@ def add_column(columns, column_name, column_type): columns.append({"name": column_name, "friendly_name": column_name, "type": column_type}) -def _apply_path_search(response, path): +def _apply_path_search(response, path, default=None): if path is None: return response @@ -71,6 +72,8 @@ def _apply_path_search(response, path): current_path = path_parts.pop() if current_path in response: response = response[current_path] + elif default is not None: + return default else: raise Exception("Couldn't find path {} in response.".format(path)) @@ -78,6 +81,8 @@ def _apply_path_search(response, path): def _normalize_json(data, path): + if not data: + return None data = _apply_path_search(data, path) if isinstance(data, dict): @@ -94,9 +99,7 @@ def _sort_columns_with_fields(columns, fields): # TODO: merge the logic here with the one in MongoDB's queyr runner -def parse_json(data, path, fields): - data = _normalize_json(data, path) - +def parse_json(data, fields): rows = [] columns = [] @@ -130,17 +133,19 @@ def parse_json(data, path, fields): class JSON(BaseHTTPQueryRunner): requires_url = False + base_url_title = "Base URL" @classmethod def configuration_schema(cls): return { "type": "object", "properties": { + "base_url": {"type": "string", "title": cls.base_url_title}, "username": {"type": "string", "title": cls.username_title}, "password": {"type": "string", "title": cls.password_title}, }, "secret": ["password"], - "order": ["username", "password"], + "order": ["base_url", "username", "password"], } def __init__(self, configuration): @@ -153,6 +158,16 @@ def test_connection(self): def run_query(self, query, user): query = parse_query(query) + results, error = self._run_json_query(query) + if error is not None: + return None, error + + data = json_dumps(results) + if data: + return data, None + return None, "Got empty response from '{}'.".format(query["url"]) + + def _run_json_query(self, query): if not isinstance(query, dict): raise QueryParseError("Query should be a YAML object describing the URL to query.") @@ -165,13 +180,15 @@ def run_query(self, query, user): fields = query.get("fields") path = query.get("path") + if "pagination" in query: + pagination = RequestPagination.from_config(self.configuration, query["pagination"]) + else: + pagination = None + if isinstance(request_options.get("auth", None), list): request_options["auth"] = tuple(request_options["auth"]) elif self.configuration.get("username") or self.configuration.get("password"): - request_options["auth"] = ( - self.configuration.get("username"), - self.configuration.get("password"), - ) + request_options["auth"] = (self.configuration.get("username"), self.configuration.get("password")) if method not in ("get", "post"): raise QueryParseError("Only GET or POST methods are allowed.") @@ -179,17 +196,91 @@ def run_query(self, query, user): if fields and not isinstance(fields, list): raise QueryParseError("'fields' needs to be a list.") - response, error = self.get_response(query["url"], http_method=method, **request_options) + results, error = self._get_all_results(query["url"], method, path, pagination, **request_options) + return parse_json(results, fields), error - if error is not None: - return None, error + def _get_all_results(self, url, method, result_path, pagination, **request_options): + """Get all results from a paginated endpoint.""" + base_url = self.configuration.get("base_url") + url = urljoin(base_url, url) - data = json_dumps(parse_json(response.json(), path, fields)) + results = [] + has_more = True + while has_more: + response, error = self._get_json_response(url, method, **request_options) + has_more = False - if data: - return data, None - else: - return None, "Got empty response from '{}'.".format(query["url"]) + result = _normalize_json(response, result_path) + if result: + results.extend(result) + if pagination: + has_more, url, request_options = pagination.next(url, request_options, response) + + return results, error + + def _get_json_response(self, url, method, **request_options): + response, error = self.get_response(url, http_method=method, **request_options) + result = response.json() if error is None else {} + return result, error + + +class RequestPagination: + def next(self, url, request_options, response): + """Checks the response for another page. + + Returns: + has_more, next_url, next_request_options + """ + return False, None, request_options + + @staticmethod + def from_config(configuration, pagination): + if not isinstance(pagination, dict) or not isinstance(pagination.get("type"), str): + raise QueryParseError("'pagination' should be an object with a `type` property") + + if pagination["type"] == "url": + return UrlPagination(pagination) + elif pagination["type"] == "token": + return TokenPagination(pagination) + + raise QueryParseError("Unknown 'pagination.type' {}".format(pagination["type"])) + + +class UrlPagination(RequestPagination): + def __init__(self, pagination): + self.path = pagination.get("path", "_links.next.href") + if not isinstance(self.path, str): + raise QueryParseError("'pagination.path' should be a string") + + def next(self, url, request_options, response): + next_url = _apply_path_search(response, self.path, "") + if not next_url: + return False, None, request_options + + next_url = urljoin(url, next_url) + return True, next_url, request_options + + +class TokenPagination(RequestPagination): + def __init__(self, pagination): + self.fields = pagination.get("fields", ["next_page_token", "page_token"]) + if not isinstance(self.fields, list) or len(self.fields) != 2: + raise QueryParseError("'pagination.fields' should be a list of 2 field names") + + def next(self, url, request_options, response): + next_token = _apply_path_search(response, self.fields[0], "") + if not next_token: + return False, None, request_options + + params = request_options.get("params", {}) + + # prevent infinite loop that can happen if self.fields[1] is wrong + if next_token == params.get(self.fields[1]): + raise Exception("{} did not change; possible misconfiguration".format(self.fields[0])) + + params[self.fields[1]] = next_token + request_options["params"] = params + return True, url, request_options register(JSON) diff --git a/tests/query_runner/test_json_ds.py b/tests/query_runner/test_json_ds.py new file mode 100644 index 0000000000..88d428ee19 --- /dev/null +++ b/tests/query_runner/test_json_ds.py @@ -0,0 +1,88 @@ +""" +Some test cases for JSON api runner +""" +from unittest import TestCase +from urllib.parse import urlencode, urljoin + +from redash.query_runner.json_ds import JSON + + +def mock_api(url, method, **request_options): + if "params" in request_options: + qs = urlencode(request_options["params"]) + url = urljoin(url, "?{}".format(qs)) + + data, error = None, None + + if url == "http://localhost/basics": + data = [{"id": 1}, {"id": 2}] + elif url == "http://localhost/token-test": + data = {"next_page_token": "2", "records": [{"id": 1}, {"id": 2}]} + elif url == "http://localhost/token-test?page_token=2": + data = {"next_page_token": "3", "records": [{"id": 3}, {"id": 4}]} + elif url == "http://localhost/token-test?page_token=3": + data = {"records": [{"id": 5}]} + elif url == "http://localhost/hateoas": + data = { + "_embedded": {"records": [{"id": 10}, {"id": 11}]}, + "_links": { + "first": {"href": "http://localhost/hateoas"}, + "self": {"href": "http://localhost/hateoas"}, + "next": {"href": "http://localhost/hateoas?page=2"}, + "last": {"href": "http://localhost/hateoas?page=2"}, + }, + "page": {"size": 2, "totalElements": 3, "totalPages": 2}, + } + elif url == "http://localhost/hateoas?page=2": + data = { + "_embedded": {"records": [{"id": 12}]}, + "_links": { + "first": {"href": "http://localhost/hateoas"}, + "self": {"href": "http://localhost/hateoas?page=2"}, + "prev": {"href": "http://localhost/hateoas"}, + "last": {"href": "http://localhost/hateoas?page=2"}, + }, + "page": {"size": 2, "totalElements": 3, "totalPages": 2}, + } + else: + error = "404: {} not found".format(url) + + return data, error + + +class TestJSON(TestCase): + def setUp(self): + self.runner = JSON({"base_url": "http://localhost/"}) + self.runner._get_json_response = mock_api + + def test_basics(self): + q = {"url": "basics"} + results, error = self.runner._run_json_query(q) + + expected = [{"id": 1}, {"id": 2}] + self.assertEqual(results["rows"], expected) + + def test_token_pagination(self): + q = { + "url": "token-test", + "pagination": {"type": "token", "fields": ["next_page_token", "page_token"]}, + "path": "records", + } + results, error = self.runner._run_json_query(q) + self.assertIsNone(error) + + expected = [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}] + self.assertEqual(results["rows"], expected) + + def test_url_pagination(self): + q = { + "url": "hateoas", + "pagination": {"type": "url", "path": "_links.next.href"}, + "path": "_embedded.records", + "fields": ["id"], + } + results, error = self.runner._run_json_query(q) + self.assertIsNone(error) + + expected = [{"id": 10}, {"id": 11}, {"id": 12}] + self.assertEqual(results["rows"], expected)