From 8a8a3ee3b6441b72ce0b783a1493612503053c46 Mon Sep 17 00:00:00 2001 From: Henrik Mathias Eiding <51706349+HMEiding@users.noreply.github.com> Date: Tue, 20 Aug 2019 12:32:06 +0200 Subject: [PATCH] Deduplicate during fetching when retrieving related resources on AssetList (#504) * Implement deduplication and update tests. * Update CHANGELOG.md * Updates after review. * Move lock out of loop. --- CHANGELOG.md | 1 + cognite/client/data_classes/assets.py | 31 +++++++++++++------ .../test_data_classes/test_assets.py | 15 +++++---- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ec83ef5f1..57eea3849d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ Changes are grouped as follows ### Changed - assets.create() no longer validates asset hierarchy and sorts assets before posting. This functionality has been moved to assets.create_hierarchy(). +- AssetList.files() and AssetList.events() now deduplicate results during fetching instead of as a postprocessing step. ## [1.0.5] - 2019-08-15 ### Added diff --git a/cognite/client/data_classes/assets.py b/cognite/client/data_classes/assets.py index 09be433fc3..c006e09576 100644 --- a/cognite/client/data_classes/assets.py +++ b/cognite/client/data_classes/assets.py @@ -1,3 +1,4 @@ +import threading from typing import * from cognite.client.data_classes._base import * @@ -173,6 +174,10 @@ class AssetList(CogniteResourceList): _RESOURCE = Asset _UPDATE = AssetUpdate + def __init__(self, resources: List[Any], cognite_client=None): + super().__init__(resources, cognite_client) + self._retrieve_chunk_size = 100 + def time_series(self) -> "TimeSeriesList": """Retrieve all time series related to these assets. @@ -204,21 +209,29 @@ def files(self) -> "FileMetadataList": return self._retrieve_related_resources(FileMetadataList, self._cognite_client.files) def _retrieve_related_resources(self, resource_list_class, resource_api): + seen = set() + lock = threading.Lock() + + def retrieve_and_deduplicate(asset_ids): + res = resource_api.list(asset_ids=asset_ids, limit=-1) + resources = resource_list_class([]) + with lock: + for resource in res: + if resource.id not in seen: + resources.append(resource) + seen.add(resource.id) + return resources + ids = [a.id for a in self.data] tasks = [] - chunk_size = 100 - for i in range(0, len(ids), chunk_size): - tasks.append({"asset_ids": ids[i : i + chunk_size], "limit": -1}) + for i in range(0, len(ids), self._retrieve_chunk_size): + tasks.append({"asset_ids": ids[i : i + self._retrieve_chunk_size]}) res_list = utils._concurrency.execute_tasks_concurrently( - resource_api.list, tasks, resource_api._config.max_workers + retrieve_and_deduplicate, tasks, resource_api._config.max_workers ).results resources = resource_list_class([]) - seen = set() for res in res_list: - for resource in res: - if resource.id not in seen: - resources.append(resource) - seen.add(resource.id) + resources.extend(res) return resources diff --git a/tests/tests_unit/test_data_classes/test_assets.py b/tests/tests_unit/test_data_classes/test_assets.py index 47b76ff5ca..1265db4726 100644 --- a/tests/tests_unit/test_data_classes/test_assets.py +++ b/tests/tests_unit/test_data_classes/test_assets.py @@ -79,11 +79,7 @@ def test_get_files(self): "resource_class, resource_list_class, method", [(FileMetadata, FileMetadataList, "files"), (Event, EventList, "events")], ) - @mock.patch("cognite.client.utils._concurrency") - def test_get_related_resources_should_not_return_duplicates( - self, mock_concurrency, resource_class, resource_list_class, method - ): - assets = AssetList([Asset(id=1), Asset(id=2), Asset(id=3)], cognite_client=mock.MagicMock()) + def test_get_related_resources_should_not_return_duplicates(self, resource_class, resource_list_class, method): r1 = resource_class(id=1) r2 = resource_class(id=2) r3 = resource_class(id=3) @@ -91,7 +87,14 @@ def test_get_related_resources_should_not_return_duplicates( resources_a2 = resource_list_class([r2, r3]) resources_a3 = resource_list_class([r2, r3]) - mock_concurrency.execute_tasks_concurrently.return_value.results = [resources_a1, resources_a2, resources_a3] + mock_cognite_client = mock.MagicMock() + mock_method = getattr(mock_cognite_client, method) + mock_method.list.side_effect = [resources_a1, resources_a2, resources_a3] + mock_method._config = mock.Mock(max_workers=3) + + assets = AssetList([Asset(id=1), Asset(id=2), Asset(id=3)], cognite_client=mock_cognite_client) + assets._retrieve_chunk_size = 1 + resources = getattr(assets, method)() expected = [r1, r2, r3] assert expected == resources