
Add partitions for list on assets and events (#544)
sanderland authored Sep 19, 2019
1 parent 74fbf5a commit cc40354
Showing 13 changed files with 152 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ Changes are grouped as follows
### Added
- Support for aggregated properties of assets.
- `Asset` and `AssetList` classes now have a `sequences` function which retrieves related sequences.
- Support for partitioned listing of assets and events.

### Changed
- `list` and `__call__` methods for assets now support list parameters for `root_ids`, `root_external_ids`.
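
A minimal usage sketch of the new partitioned listing (assumes an already configured `CogniteClient`; the partition count of 10 is arbitrary):

```python
# Hypothetical usage of the partitioned listing added in this commit.
# partitions requires limit=None (or -1 / inf), i.e. "fetch everything".
from cognite.client import CogniteClient

c = CogniteClient()

# Fetch all assets using 10 parallel workers, one API partition per worker.
assets = c.assets.list(partitions=10, limit=None)

# The same option is available for events.
events = c.events.list(partitions=10, limit=None)
```
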
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -15,7 +15,7 @@ podTemplate(
resourceRequestCpu: '1000m',
resourceRequestMemory: '800Mi',
resourceLimitCpu: '1000m',
resourceLimitMemory: '800Mi',
resourceLimitMemory: '1200Mi',
ttyEnabled: true),
containerTemplate(name: 'node',
image: 'node:slim',
15 changes: 9 additions & 6 deletions cognite/client/_api/assets.py
@@ -154,6 +154,7 @@ def list(
root: bool = None,
external_id_prefix: str = None,
aggregated_properties: List[str] = None,
partitions: int = None,
limit: int = 25,
) -> AssetList:
"""List assets
@@ -163,13 +164,14 @@
parent_ids (List[int]): List of parent ids to filter on.
            root_ids (List[int], optional): List of root ids to filter on.
root_external_ids (List[str], optional): List of root external ids to filter on.
metadata (Dict[str, Any]): Custom, application specific metadata. String key -> String value
source (str): The source of this asset
created_time (Dict[str, Any]): Range between two timestamps
last_updated_time (Dict[str, Any]): Range between two timestamps
root (bool): filtered assets are root assets or not
external_id_prefix (str): External Id provided by client. Should be unique within the project
metadata (Dict[str, Any]): Custom, application specific metadata. String key -> String value.
source (str): The source of this asset.
created_time (Dict[str, Any]): Range between two timestamps.
last_updated_time (Dict[str, Any]): Range between two timestamps.
            root (bool): Whether the filtered assets are root assets.
external_id_prefix (str): External Id provided by client. Should be unique within the project.
aggregated_properties (List[str]): Set of aggregated properties to include.
partitions (int): Retrieve assets in parallel using this number of workers. Also requires `limit=None` to be passed.
limit (int, optional): Maximum number of assets to return. Defaults to 25. Set to -1, float("inf") or None
to return all items.
@@ -221,6 +223,7 @@ def list(
limit=limit,
filter=filter,
other_params={"aggregatedProperties": aggregated_properties} if aggregated_properties else {},
partitions=partitions,
)

def create(self, asset: Union[Asset, List[Asset]]) -> Union[Asset, AssetList]:
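
The docstring above states that `partitions` also requires `limit=None`; a short sketch of what a caller can expect, based on the validation added to `_api_client.py` further down in this diff (again assuming a configured client):

```python
# Sketch only: combining partitions with a finite limit is rejected by
# the check added in APIClient._list (see _api_client.py below).
from cognite.client import CogniteClient

c = CogniteClient()

try:
    c.assets.list(partitions=8, limit=25)  # finite limit not allowed with partitions
except ValueError as e:
    print(e)  # "When using partitions, limit should be `None`, `-1` or `inf`."

assets = c.assets.list(partitions=8, limit=None)  # OK: everything, in 8 partitions
```
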
14 changes: 8 additions & 6 deletions cognite/client/_api/events.py
@@ -141,22 +141,24 @@ def list(
created_time: Dict[str, Any] = None,
last_updated_time: Dict[str, Any] = None,
external_id_prefix: str = None,
partitions: int = None,
limit: int = 25,
) -> EventList:
"""List events
Args:
start_time (Dict[str, Any]): Range between two timestamps
end_time (Dict[str, Any]): Range between two timestamps
start_time (Dict[str, Any]): Range between two timestamps.
end_time (Dict[str, Any]): Range between two timestamps.
            type (str): Type of the event, e.g. 'failure'.
            subtype (str): Subtype of the event, e.g. 'electrical'.
            metadata (Dict[str, Any]): Customizable extra data about the event. String key -> String value.
            asset_ids (List[int]): Asset IDs of equipment that this event relates to.
root_asset_ids (List[Dict[str, Any]]): The IDs of the root assets that the related assets should be children of.
source (str): The source of this event.
created_time (Dict[str, Any]): Range between two timestamps
last_updated_time (Dict[str, Any]): Range between two timestamps
external_id_prefix (str): External Id provided by client. Should be unique within the project
created_time (Dict[str, Any]): Range between two timestamps.
last_updated_time (Dict[str, Any]): Range between two timestamps.
external_id_prefix (str): External Id provided by client. Should be unique within the project.
partitions (int): Retrieve events in parallel using this number of workers. Also requires `limit=None` to be passed.
limit (int, optional): Maximum number of events to return. Defaults to 25. Set to -1, float("inf") or None
to return all items.
@@ -198,7 +200,7 @@ def list(
type=type,
subtype=subtype,
).dump(camel_case=True)
return self._list(method="POST", limit=limit, filter=filter)
return self._list(method="POST", limit=limit, filter=filter, partitions=partitions)

def create(self, event: Union[Event, List[Event]]) -> Union[Event, EventList]:
"""Create one or more events.
51 changes: 50 additions & 1 deletion cognite/client/_api_client.py
@@ -324,8 +324,21 @@ def _list(
limit: int = None,
filter: Dict = None,
other_params=None,
partitions=None,
headers: Dict = None,
):
if partitions:
if limit not in [None, -1, float("inf")]:
raise ValueError("When using partitions, limit should be `None`, `-1` or `inf`.")
return self._list_partitioned(
partitions=partitions,
cls=cls,
resource_path=resource_path,
filter=filter,
other_params=other_params,
headers=headers,
)

cls = cls or self._LIST_CLASS
resource_path = resource_path or self._RESOURCE_PATH
items = []
@@ -336,12 +349,48 @@
limit=limit,
chunk_size=self._LIST_LIMIT,
filter=filter,
headers=headers,
other_params=other_params,
headers=headers,
):
items.extend(resource_list.data)
return cls(items, cognite_client=self._cognite_client)

def _list_partitioned(
self,
partitions,
cls=None,
resource_path: str = None,
filter: Dict = None,
other_params=None,
headers: Dict = None,
):
cls = cls or self._LIST_CLASS
resource_path = resource_path or self._RESOURCE_PATH

def get_partition(partition):
next_cursor = None
retrieved_items = []
while True:
body = {
"filter": filter or {},
"limit": self._LIST_LIMIT,
"cursor": next_cursor,
"partition": partition,
**(other_params or {}),
}
res = self._post(url_path=resource_path + "/list", json=body, headers=headers)
retrieved_items.extend(res.json()["items"])
next_cursor = res.json().get("nextCursor")
if next_cursor is None:
break
return retrieved_items

tasks = [("{}/{}".format(i + 1, partitions),) for i in range(partitions)]
tasks_summary = utils._concurrency.execute_tasks_concurrently(get_partition, tasks, max_workers=partitions)
if tasks_summary.exceptions:
raise tasks_summary.exceptions[0]
return cls._load(tasks_summary.joined_results(), cognite_client=self._cognite_client)

def _create_multiple(
self,
items: Union[List[Any], Any],
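
To make the control flow in `_list_partitioned` easier to follow, here is a self-contained sketch of the same fan-out pattern using `concurrent.futures` instead of the SDK's `utils._concurrency` helper; the endpoint URL, response shape and function names are placeholders for illustration:

```python
# Illustrative sketch of partitioned /list fetching, not the SDK implementation.
# Each worker is assigned a partition label "i/n" and pages through it with cursors.
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List

import requests

LIST_URL = "https://api.example.com/assets/list"  # placeholder endpoint
LIST_LIMIT = 1000


def get_partition(partition: str, flt: Dict[str, Any]) -> List[Dict[str, Any]]:
    items, cursor = [], None
    while True:
        body = {"filter": flt, "limit": LIST_LIMIT, "cursor": cursor, "partition": partition}
        res = requests.post(LIST_URL, json=body).json()
        items.extend(res["items"])
        cursor = res.get("nextCursor")
        if cursor is None:  # this partition is exhausted
            return items


def list_partitioned(partitions: int, flt: Dict[str, Any]) -> List[Dict[str, Any]]:
    labels = ["{}/{}".format(i + 1, partitions) for i in range(partitions)]
    with ThreadPoolExecutor(max_workers=partitions) as pool:
        results = list(pool.map(get_partition, labels, [flt] * partitions))
    return [item for chunk in results for item in chunk]
```

The real implementation additionally collects exceptions from the concurrency task summary and re-raises the first one, as shown in the diff above.
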
9 changes: 9 additions & 0 deletions tests/tests_integration/test_api/test_assets.py
@@ -86,6 +86,15 @@ def test_list(self, post_spy):
assert 20 == len(res)
assert 2 == COGNITE_CLIENT.assets._post.call_count

def test_partitioned_list(self, post_spy):
# stop race conditions by cutting off max created time
maxtime = int(time.time() - 3600) * 1000
res_flat = COGNITE_CLIENT.assets.list(limit=None, created_time={"max": maxtime})
res_part = COGNITE_CLIENT.assets.list(partitions=8, limit=None, created_time={"max": maxtime})
assert len(res_flat) > 0
assert len(res_flat) == len(res_part)
assert {a.id for a in res_flat} == {a.id for a in res_part}

def test_list_with_aggregated_properties_param(self, post_spy):
res = COGNITE_CLIENT.assets.list(limit=10, aggregated_properties=["child_count"])
for asset in res:
13 changes: 13 additions & 0 deletions tests/tests_integration/test_api/test_events.py
@@ -1,3 +1,5 @@
import time
from datetime import datetime
from unittest import mock

import pytest
@@ -52,6 +54,17 @@ def test_list(self, post_spy):
assert 20 == len(res)
assert 2 == COGNITE_CLIENT.events._post.call_count

def test_partitioned_list(self, post_spy):
# stop race conditions by cutting off max created time
maxtime = utils.timestamp_to_ms(datetime(2019, 4, 29, 17, 30))
res_flat = COGNITE_CLIENT.events.list(limit=None, type="test-data-populator", start_time={"max": maxtime})
res_part = COGNITE_CLIENT.events.list(
partitions=8, type="test-data-populator", start_time={"max": maxtime}, limit=None
)
assert len(res_flat) > 0
assert len(res_flat) == len(res_part)
assert {a.id for a in res_flat} == {a.id for a in res_part}

def test_search(self):
res = COGNITE_CLIENT.events.search(
filter=EventFilter(start_time={"min": cognite.client.utils._time.timestamp_to_ms("2d-ago")})
29 changes: 13 additions & 16 deletions tests/tests_unit/test_api/test_assets.py
@@ -15,25 +15,22 @@
COGNITE_CLIENT = CogniteClient()
ASSETS_API = COGNITE_CLIENT.assets

EXAMPLE_ASSET = {
"externalId": "string",
"name": "string",
"parentId": 1,
"description": "string",
"metadata": {"metadata-key": "metadata-value"},
"source": "string",
"id": 1,
"lastUpdatedTime": 0,
"rootId": 1,
}


@pytest.fixture
def mock_assets_response(rsps):
response_body = {
"items": [
{
"externalId": "string",
"name": "string",
"parentId": 1,
"description": "string",
"metadata": {"metadata-key": "metadata-value"},
"source": "string",
"id": 1,
"lastUpdatedTime": 0,
"rootId": 1,
}
]
}

response_body = {"items": [EXAMPLE_ASSET]}
url_pattern = re.compile(re.escape(ASSETS_API._get_base_url_with_base_path()) + "/.+")
rsps.add(rsps.POST, url_pattern, status=200, json=response_body)
yield rsps
4 changes: 4 additions & 0 deletions tests/tests_unit/test_api/test_events.py
@@ -52,6 +52,10 @@ def test_list(self, mock_events_response):
assert "bla" == jsgz_load(mock_events_response.calls[0].request.body)["filter"]["source"]
assert mock_events_response.calls[0].response.json()["items"] == res.dump(camel_case=True)

def test_list_partitions(self, mock_events_response):
EVENTS_API.list(partitions=13, limit=float("inf"))
assert 13 == len(mock_events_response.calls)

def test_create_single(self, mock_events_response):
res = EVENTS_API.create(Event(external_id="1"))
assert isinstance(res, Event)
1 change: 0 additions & 1 deletion tests/tests_unit/test_api/test_relationships.py
@@ -99,7 +99,6 @@ def test_create_single(self, mock_rel_response):
def test_create_single_types(self, mock_rel_response):
types = [Asset, TimeSeries, FileMetadata, Event]
for cls in types:
print(cls)
test = cls(external_id="test")
res = REL_API.create(
Relationship(
2 changes: 0 additions & 2 deletions tests/tests_unit/test_api/test_sequences.py
@@ -404,7 +404,6 @@ def test_sequence_data_builtins(self, mock_seq_response, mock_get_sequence_data)
list_request = [call for call in mock_get_sequence_data.calls if "/data/list" in call.request.url][0]
response = list_request.response.json()
data_dump = r1.dump(camel_case=True)
print(response)
for f in ["columns", "rows", "id"]:
assert response[f] == data_dump[f]

@@ -425,7 +424,6 @@ def test_retrieve_dataframe_columns_mixed(self, mock_seq_response, mock_get_sequ
def test_retrieve_dataframe_columns_many_extid(self, mock_get_sequence_data_many_columns):
data = SEQ_API.data.retrieve(external_id="foo", start=1000000, end=1100000)
assert isinstance(data, SequenceData)
print(data.to_pandas())
assert ["ceid" + str(i) for i in range(200)] == list(data.to_pandas().columns)

def test_retrieve_dataframe_convert_null(self, mock_seq_response, mock_get_sequence_data_with_null):
6 changes: 3 additions & 3 deletions tests/tests_unit/test_api/test_signatures.py
@@ -11,15 +11,15 @@ class TestListAndIterSignatures:
@pytest.mark.parametrize(
"api, filter, ignore",
[
(assets.AssetsAPI, assets.AssetFilter, ["root_external_ids", "aggregated_properties"]),
(events.EventsAPI, events.EventFilter, []),
(assets.AssetsAPI, assets.AssetFilter, ["root_external_ids", "aggregated_properties", "partitions"]),
(events.EventsAPI, events.EventFilter, ["partitions"]),
(files.FilesAPI, files.FileMetadataFilter, []),
(sequences.SequencesAPI, sequences.SequenceFilter, []),
],
)
def test_list_and_iter_signatures_same_as_filter_signature(self, api, filter, ignore):
iter_parameters = dict(inspect.signature(api.__call__).parameters)
for name in ignore + ["chunk_size", "limit"]:
for name in set(ignore + ["chunk_size", "limit"]) - {"partitions"}:
del iter_parameters[name]

list_parameters = dict(inspect.signature(api.list).parameters)
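
For readers unfamiliar with this test, a rough sketch of the `inspect.signature` comparison it relies on, using made-up functions (`list_things`/`iter_things` are illustrative only):

```python
# Minimal sketch of comparing two function signatures after dropping the
# parameters that are intentionally allowed to differ.
import inspect


def list_things(source: str = None, partitions: int = None, limit: int = 25):
    ...


def iter_things(source: str = None, chunk_size: int = None, limit: int = None):
    ...


list_params = dict(inspect.signature(list_things).parameters)
iter_params = dict(inspect.signature(iter_things).parameters)

for name in ["partitions", "limit"]:
    list_params.pop(name, None)
for name in ["chunk_size", "limit"]:
    iter_params.pop(name, None)

assert list(list_params) == list(iter_params)  # both reduce to ["source"]
```
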
41 changes: 41 additions & 0 deletions tests/tests_unit/test_api_client.py
@@ -322,6 +322,47 @@ def test_standard_list_fail(self, rsps):
NUMBER_OF_ITEMS_FOR_AUTOPAGING = 11500
ITEMS_TO_GET_WHILE_AUTOPAGING = [{"x": 1, "y": 1} for _ in range(NUMBER_OF_ITEMS_FOR_AUTOPAGING)]

def test_list_partitions(self, rsps):
rsps.add(rsps.POST, BASE_URL + URL_PATH + "/list", status=200, json={"items": [{"x": 1, "y": 2}, {"x": 1}]})
res = API_CLIENT._list(
cls=SomeResourceList,
resource_path=URL_PATH,
method="POST",
partitions=3,
limit=None,
headers={"X-Test": "foo"},
)
assert 6 == len(res)
assert isinstance(res, SomeResourceList)
assert isinstance(res[0], SomeResource)
assert 3 == len(rsps.calls)
assert {"1/3", "2/3", "3/3"} == {jsgz_load(c.request.body)["partition"] for c in rsps.calls}
for call in rsps.calls:
request = jsgz_load(call.request.body)
assert "X-Test" in call.request.headers.keys()
del request["partition"]
assert {"cursor": None, "filter": {}, "limit": 1000} == request
assert call.response.json()["items"] == [{"x": 1, "y": 2}, {"x": 1}]

def test_list_partitions_with_failure(self, rsps):
def request_callback(request):
payload = jsgz_load(request.body)
np, total = payload["partition"].split("/")
if int(np) == 2:
return 503, {}, json.dumps({"message": "Service Unavailable"})
else:
return 200, {}, json.dumps({"items": [{"x": 42, "y": 13}]})

rsps.add_callback(
rsps.POST, BASE_URL + URL_PATH + "/list", callback=request_callback, content_type="application/json"
)
with pytest.raises(CogniteAPIError) as exc:
res = API_CLIENT._list(
cls=SomeResourceList, resource_path=URL_PATH, method="POST", partitions=4, limit=None
)
assert 503 == exc.value.code
assert 4 == len(rsps.calls)

@pytest.fixture
def mock_get_for_autopaging(self, rsps):
def callback(request):
