diff --git a/VERSION b/VERSION index 7ec1d6d..ccbccc3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.1.0 +2.2.0 diff --git a/exabel_data_sdk/client/api/entity_api.py b/exabel_data_sdk/client/api/entity_api.py index e772c07..1e7acef 100644 --- a/exabel_data_sdk/client/api/entity_api.py +++ b/exabel_data_sdk/client/api/entity_api.py @@ -146,6 +146,36 @@ def update_entity(self, entity: Entity, update_mask: FieldMask = None) -> Entity ) return Entity.from_proto(response) + def upsert_entity(self, entity: Entity, assume_exists: bool = True) -> Entity: + """ + Upsert an entity. + + If the entity already exists, update it by replacement. Otherwise, create it. + + Args: + entity: The entity to upsert. + assume_exists: If True, the entity is assumed to exist. Will try to to update + the entity, and fall back to creating it if it does not exist, + and vice versa. + """ + if assume_exists: + try: + entity = self.update_entity(entity) + except RequestError as error: + if error.error_type == ErrorType.NOT_FOUND: + entity = self.create_entity(entity, entity.get_entity_type()) + else: + raise error + else: + try: + entity = self.create_entity(entity, entity.get_entity_type()) + except RequestError as error: + if error.error_type == ErrorType.ALREADY_EXISTS: + entity = self.update_entity(entity) + else: + raise error + return entity + def delete_entity(self, name: str) -> None: """ Delete one entity. @@ -189,6 +219,7 @@ def bulk_create_entities( entities: Sequence[Entity], entity_type: str, threads: int = 40, + upsert: bool = False, ) -> ResourceCreationResults[Entity]: """ Check if the provided entities exist, and create them if they don't. @@ -197,6 +228,10 @@ def bulk_create_entities( """ def insert(entity: Entity) -> ResourceCreationStatus: + if upsert: + # Upsert entities assuming they already exist. + self.upsert_entity(entity=entity, assume_exists=True) + return ResourceCreationStatus.UPSERTED # Optimistically insert the entity. # If the entity already exists, we'll get an ALREADY_EXISTS error from the backend, # which is handled appropriately by the bulk_insert function. diff --git a/exabel_data_sdk/client/api/relationship_api.py b/exabel_data_sdk/client/api/relationship_api.py index 7f980af..a268dc3 100644 --- a/exabel_data_sdk/client/api/relationship_api.py +++ b/exabel_data_sdk/client/api/relationship_api.py @@ -243,6 +243,38 @@ def update_relationship( ) return Relationship.from_proto(response) + def upsert_relationship( + self, relationship: Relationship, assume_exists: bool = True + ) -> Relationship: + """ + Upsert a relationship between two entities. + + If the relationship already exists, update it by replacement. Otherwise, create it. + + Args: + relationship: The relationship to upsert. + assume_exists: If True, the relationship is assumed to exist. Will try to to update + the relationship, and fall back to creating it if it does not exist, + and vice versa. + """ + if assume_exists: + try: + relationship = self.update_relationship(relationship) + except RequestError as error: + if error.error_type == ErrorType.NOT_FOUND: + relationship = self.create_relationship(relationship) + else: + raise error + else: + try: + relationship = self.create_relationship(relationship) + except RequestError as error: + if error.error_type == ErrorType.ALREADY_EXISTS: + relationship = self.update_relationship(relationship) + else: + raise error + return relationship + def delete_relationship(self, relationship_type: str, from_entity: str, to_entity: str) -> None: """ Delete a relationship. @@ -277,6 +309,7 @@ def bulk_create_relationships( self, relationships: Sequence[Relationship], threads: int = 40, + upsert: bool = False, ) -> ResourceCreationResults[Relationship]: """ Check if the provided relationships exist, and create them if they don't. @@ -284,6 +317,10 @@ def bulk_create_relationships( """ def insert(relationship: Relationship) -> ResourceCreationStatus: + if upsert: + # Upsert relationships assuming they already exist. + self.upsert_relationship(relationship=relationship, assume_exists=True) + return ResourceCreationStatus.UPSERTED # Optimistically insert the relationship. # If the relationship already exists, we'll get an ALREADY_EXISTS error from the # backend, which is handled appropriately by the bulk_insert function. diff --git a/exabel_data_sdk/client/api/resource_creation_result.py b/exabel_data_sdk/client/api/resource_creation_result.py index f66f5ed..da54749 100644 --- a/exabel_data_sdk/client/api/resource_creation_result.py +++ b/exabel_data_sdk/client/api/resource_creation_result.py @@ -26,6 +26,9 @@ class ResourceCreationStatus(Enum): # Denotes that creation failed. FAILED = 3 + # Denotes that a resource was upserted. + UPSERTED = 4 + class ResourceCreationResult(Generic[TResource]): """ @@ -57,6 +60,7 @@ def __init__( print_status: Whether to print status of the upload during processing. abort_threshold: If the fraction of failed requests exceeds this threshold, the upload is aborted. + upsert: Update resources that already exist. """ self.results: List[ResourceCreationResult[TResource]] = [] self.counter: Counter = Counter() @@ -117,9 +121,12 @@ def check_failures(self) -> None: def print_summary(self) -> None: """Prints a human legible summary of the resource creation results to screen.""" - print(self.counter[ResourceCreationStatus.CREATED], "new resources created") + if self.counter[ResourceCreationStatus.CREATED]: + print(self.counter[ResourceCreationStatus.CREATED], "new resources created") if self.counter[ResourceCreationStatus.EXISTS]: print(self.counter[ResourceCreationStatus.EXISTS], "resources already existed") + if self.counter[ResourceCreationStatus.UPSERTED]: + print(self.counter[ResourceCreationStatus.UPSERTED], "resources upserted") if self.counter[ResourceCreationStatus.FAILED]: print(self.counter[ResourceCreationStatus.FAILED], "resources failed:") for result in self.results: @@ -134,13 +141,17 @@ def print_status(self) -> None: Note that the previous status message is overwritten (by writing '\r'), but this only works if nothing else has been printed to stdout since the last update. """ + message_parts = [] fraction_complete = self.count() / self.total_count - sys.stdout.write( - f"\r{fraction_complete:.0%} - " - f"{self.count(ResourceCreationStatus.CREATED)} created, " - f"{self.count(ResourceCreationStatus.EXISTS)} exists, " - f"{self.count(ResourceCreationStatus.FAILED)} failed" - ) + message_parts.append(f"\r{fraction_complete:.0%} - ") + if self.counter[ResourceCreationStatus.CREATED]: + message_parts.append(f"{self.count(ResourceCreationStatus.CREATED)} created, ") + if self.counter[ResourceCreationStatus.UPSERTED]: + message_parts.append(f"{self.count(ResourceCreationStatus.UPSERTED)} upserted, ") + if self.counter[ResourceCreationStatus.EXISTS]: + message_parts.append(f"{self.count(ResourceCreationStatus.EXISTS)} exists, ") + message_parts.append(f"{self.count(ResourceCreationStatus.FAILED)} failed") + sys.stdout.write("".join(message_parts)) if fraction_complete == 1: sys.stdout.write("\n") sys.stdout.flush() diff --git a/exabel_data_sdk/client/api/time_series_api.py b/exabel_data_sdk/client/api/time_series_api.py index c695144..138ab2b 100644 --- a/exabel_data_sdk/client/api/time_series_api.py +++ b/exabel_data_sdk/client/api/time_series_api.py @@ -338,7 +338,7 @@ def insert(ts: pd.Series) -> ResourceCreationStatus: existed = self.upsert_time_series( str(ts.name), ts, create_tag=create_tag, default_known_time=default_known_time ) - return ResourceCreationStatus.EXISTS if existed else ResourceCreationStatus.CREATED + return ResourceCreationStatus.UPSERTED if existed else ResourceCreationStatus.CREATED return bulk_insert(series, insert, threads=threads) diff --git a/exabel_data_sdk/scripts/load_entities_from_csv.py b/exabel_data_sdk/scripts/load_entities_from_csv.py index 59ee011..e617121 100644 --- a/exabel_data_sdk/scripts/load_entities_from_csv.py +++ b/exabel_data_sdk/scripts/load_entities_from_csv.py @@ -57,6 +57,13 @@ def __init__(self, argv: Sequence[str], description: str): help="The column name for the entity description. " "If not specified, no description is provided.", ) + self.parser.add_argument( + "--upsert", + required=False, + action="store_true", + default=False, + help="Update entities if they already exist.", + ) def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: @@ -101,7 +108,9 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: return try: - client.entity_api.bulk_create_entities(entities, entity_type_name, threads=args.threads) + client.entity_api.bulk_create_entities( + entities, entity_type_name, threads=args.threads, upsert=args.upsert + ) except BulkInsertFailedError: # An error summary has already been printed. pass diff --git a/exabel_data_sdk/scripts/load_relationships_from_csv.py b/exabel_data_sdk/scripts/load_relationships_from_csv.py index 4c252fd..edb6540 100644 --- a/exabel_data_sdk/scripts/load_relationships_from_csv.py +++ b/exabel_data_sdk/scripts/load_relationships_from_csv.py @@ -65,6 +65,13 @@ def __init__(self, argv: Sequence[str], description: str): help="The column name for the relationship description. " "If not specified, no description is provided.", ) + self.parser.add_argument( + "--upsert", + required=False, + action="store_true", + default=False, + help="Update relationships if they already exist.", + ) def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: @@ -130,7 +137,9 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: return try: - client.relationship_api.bulk_create_relationships(relationships, threads=args.threads) + client.relationship_api.bulk_create_relationships( + relationships, threads=args.threads, upsert=args.upsert + ) except BulkInsertFailedError: # An error summary has already been printed. pass diff --git a/exabel_data_sdk/tests/client/api/mock_entity_api.py b/exabel_data_sdk/tests/client/api/mock_entity_api.py index 3207a5f..d01811f 100644 --- a/exabel_data_sdk/tests/client/api/mock_entity_api.py +++ b/exabel_data_sdk/tests/client/api/mock_entity_api.py @@ -44,7 +44,8 @@ def create_entity(self, entity: Entity, entity_type: str) -> Entity: return self.entities.create(entity) def update_entity(self, entity: Entity, update_mask: FieldMask = None) -> Entity: - raise NotImplementedError() + # Note: The mock implementation ignores update_mask + return self.entities.update(entity) def delete_entity(self, name: str) -> None: # Note: The mock implementation does not delete associated time series and relationships diff --git a/exabel_data_sdk/tests/client/api/mock_relationship_api.py b/exabel_data_sdk/tests/client/api/mock_relationship_api.py index fa160db..58246ff 100644 --- a/exabel_data_sdk/tests/client/api/mock_relationship_api.py +++ b/exabel_data_sdk/tests/client/api/mock_relationship_api.py @@ -76,7 +76,8 @@ def create_relationship(self, relationship: Relationship) -> Relationship: def update_relationship( self, relationship: Relationship, update_mask: FieldMask = None ) -> Relationship: - raise NotImplementedError() + # Note: The mock implementation ignores update_mask + return self.relationships.update(relationship, self._key(relationship)) def delete_relationship(self, relationship_type: str, from_entity: str, to_entity: str) -> None: self.relationships.delete((relationship_type, from_entity, to_entity)) diff --git a/exabel_data_sdk/tests/client/api/mock_resource_store.py b/exabel_data_sdk/tests/client/api/mock_resource_store.py index b72ff29..7c10b18 100644 --- a/exabel_data_sdk/tests/client/api/mock_resource_store.py +++ b/exabel_data_sdk/tests/client/api/mock_resource_store.py @@ -64,6 +64,26 @@ def create(self, resource: TResource, key: object = None) -> TResource: self.resources[key] = resource return resource + @failure_prone + def update(self, resource: TResource, key: object = None) -> TResource: + """ + Update the given resource in the store. + + Typically, resources have a resource name, which should be used as the key. + In this case, the key parameter should not be set. + If not, as in the case of Relationship resources, an explicit key must be provided. + + Args: + resource: the resource to create in the store + key: the key of the resource. Defaults to the resource's name. + """ + if key is None: + key = resource.name # type: ignore[attr-defined] + if key not in self.resources: + raise RequestError(ErrorType.NOT_FOUND, f"Does not exist: {key}") + self.resources[key] = resource + return resource + def delete(self, key: object): """Delete the resource with the given key.""" if key in self.resources: diff --git a/exabel_data_sdk/tests/client/api/test_entity_api.py b/exabel_data_sdk/tests/client/api/test_entity_api.py new file mode 100644 index 0000000..d409790 --- /dev/null +++ b/exabel_data_sdk/tests/client/api/test_entity_api.py @@ -0,0 +1,38 @@ +import unittest + +from exabel_data_sdk.client.api.data_classes.entity import Entity +from exabel_data_sdk.client.api.entity_api import EntityApi +from exabel_data_sdk.tests.client.api.mock_entity_api import MockEntityApi + + +class TestEntityApi(unittest.TestCase): + def test_upsert(self): + for assume_exists in (True, False): + entity_api: EntityApi = MockEntityApi() + expected = Entity( + name="entityTypes/company/entities/Amazon", + display_name="Amazon", + ) + created_entity = entity_api.upsert_entity(expected, assume_exists) + self.assertEqual(expected, created_entity) + updated_entity = entity_api.upsert_entity(expected, assume_exists) + self.assertEqual(expected, updated_entity) + + def test_upsert_replaces_resource(self): + for assume_exists in (True, False): + entity_api: EntityApi = MockEntityApi() + old_entity = Entity( + name="entityTypes/company/entities/Amazon", + display_name="Amazon's old display name", + description="Amazon's old description", + properties={"old_property": "old_value"}, + ) + expected = Entity( + name="entityTypes/company/entities/Amazon", + display_name="Amazon", + description="Amazon's new description", + ) + entity_api.create_entity(old_entity, old_entity.get_entity_type()) + entity_api.upsert_entity(expected, assume_exists) + actual_entity = entity_api.get_entity(expected.name) + self.assertEqual(expected, actual_entity) diff --git a/exabel_data_sdk/tests/client/api/test_relationship_api.py b/exabel_data_sdk/tests/client/api/test_relationship_api.py new file mode 100644 index 0000000..9f4c219 --- /dev/null +++ b/exabel_data_sdk/tests/client/api/test_relationship_api.py @@ -0,0 +1,47 @@ +import unittest + +from exabel_data_sdk.client.api.data_classes.relationship import Relationship +from exabel_data_sdk.client.api.relationship_api import RelationshipApi +from exabel_data_sdk.tests.client.api.mock_relationship_api import MockRelationshipApi + +AMAZON = "entityTypes/company/entities/Amazon" +USA = "entityTypes/country/entities/USA" +RELATIONHIP_TYPE = "relationshipTypes/LOCATED_IN" + + +class TestRelationshipApi(unittest.TestCase): + def test_upsert(self): + for assume_exists in (True, False): + relationship_api: RelationshipApi = MockRelationshipApi() + expected = Relationship( + relationship_type=RELATIONHIP_TYPE, + from_entity=AMAZON, + to_entity=USA, + ) + created_relationship = relationship_api.upsert_relationship(expected, assume_exists) + self.assertEqual(expected, created_relationship) + updated_relationship = relationship_api.upsert_relationship(expected, assume_exists) + self.assertEqual(expected, updated_relationship) + + def test_upsert_replaces_resource(self): + for assume_exists in (True, False): + relationship_api: RelationshipApi = MockRelationshipApi() + old_relationship = Relationship( + relationship_type=RELATIONHIP_TYPE, + from_entity=AMAZON, + to_entity=USA, + description="Old relationship description", + properties={"old_property": "old_value"}, + ) + expected = Relationship( + relationship_type=RELATIONHIP_TYPE, + from_entity=AMAZON, + to_entity=USA, + description="New relationship description", + ) + relationship_api.create_relationship(old_relationship) + relationship_api.upsert_relationship(expected, assume_exists) + actual_relationship = relationship_api.get_relationship( + expected.relationship_type, expected.from_entity, expected.to_entity + ) + self.assertEqual(expected, actual_relationship) diff --git a/exabel_data_sdk/tests/resources/data/entities.csv b/exabel_data_sdk/tests/resources/data/entities.csv index d6646dc..620ef47 100644 --- a/exabel_data_sdk/tests/resources/data/entities.csv +++ b/exabel_data_sdk/tests/resources/data/entities.csv @@ -1,4 +1,4 @@ brand,description Spring & Vine,Shampoo bars The Coconut Tree,Sri Lankan street food -Spring & Vine,This entry will be ignored because it's a duplicate +Spring & Vine,This entry might be ignored because it's a duplicate diff --git a/exabel_data_sdk/tests/resources/data/relationships.csv b/exabel_data_sdk/tests/resources/data/relationships.csv index 60b99ce..6e4355f 100644 --- a/exabel_data_sdk/tests/resources/data/relationships.csv +++ b/exabel_data_sdk/tests/resources/data/relationships.csv @@ -1,4 +1,4 @@ entity_from,brand,description entityTypes/company/company_x,Spring & Vine,Owned since 2019 -entityTypes/company/company_x,Spring & Vine,This entry will be ignored because it's a duplicate +entityTypes/company/company_x,Spring & Vine,This entry might be ignored because it's a duplicate entityTypes/company/company_y,The Coconut Tree,Acquired for $200M diff --git a/exabel_data_sdk/tests/scripts/common_utils.py b/exabel_data_sdk/tests/scripts/common_utils.py index 8678dce..37c7231 100644 --- a/exabel_data_sdk/tests/scripts/common_utils.py +++ b/exabel_data_sdk/tests/scripts/common_utils.py @@ -5,10 +5,12 @@ from exabel_data_sdk.tests.client.exabel_mock_client import ExabelMockClient -def load_test_data_from_csv(csv_script: Type[CsvScript], args: Sequence[str]) -> ExabelClient: +def load_test_data_from_csv( + csv_script: Type[CsvScript], args: Sequence[str], client: ExabelClient = None +) -> ExabelClient: """Loads entities to an ExabelMockClient using exabel_data_sdk.scripts.load_entities_from_csv""" script = csv_script(args, f"Test{type(csv_script).__name__}") - client = ExabelMockClient() + client = client or ExabelMockClient() script.run_script(client, script.parse_arguments()) return client diff --git a/exabel_data_sdk/tests/scripts/test_load_entities_from_csv.py b/exabel_data_sdk/tests/scripts/test_load_entities_from_csv.py index 0839cbf..97efa1e 100644 --- a/exabel_data_sdk/tests/scripts/test_load_entities_from_csv.py +++ b/exabel_data_sdk/tests/scripts/test_load_entities_from_csv.py @@ -76,3 +76,28 @@ def test_read_file_random_errors(self): for letter in "ABCDEFGHIJ" ] self.check_entities(client, expected_entities) + + def test_read_file_with_upsert(self): + args = common_args + [ + "--filename", + "./exabel_data_sdk/tests/resources/data/entities.csv", + "--description-col", + "description", + "--upsert", + ] + client = load_test_data_from_csv(LoadEntitiesFromCsv, args) + expected_entities = [ + Entity( + name="entityTypes/brand/entities/test.Spring_Vine", + display_name="Spring & Vine", + description="This entry might be ignored because it's a duplicate", + ), + Entity( + name="entityTypes/brand/entities/test.The_Coconut_Tree", + display_name="The Coconut Tree", + description="Sri Lankan street food", + ), + ] + self.check_entities(client, expected_entities) + client = load_test_data_from_csv(LoadEntitiesFromCsv, args, client) + self.check_entities(client, expected_entities) diff --git a/exabel_data_sdk/tests/scripts/test_load_relationships_from_csv.py b/exabel_data_sdk/tests/scripts/test_load_relationships_from_csv.py index 904ea7a..d24d4d7 100644 --- a/exabel_data_sdk/tests/scripts/test_load_relationships_from_csv.py +++ b/exabel_data_sdk/tests/scripts/test_load_relationships_from_csv.py @@ -83,3 +83,37 @@ def check_relationships(self, client, expected_relationships): expected_relationship.to_entity, ) self.assertEqual(expected_relationship, relationship) + + def test_read_file_with_upsert(self): + args = common_args + [ + "--filename", + "./exabel_data_sdk/tests/resources/data/relationships.csv", + "--entity-from-column", + "entity_from", + "--description-column", + "description", + "--upsert", + ] + client = load_test_data_from_csv(LoadRelationshipsFromCsv, args) + # Check that the relationship type was created + self.assertEqual( + RelationshipType("relationshipTypes/acme.PART_OF"), + client.relationship_api.get_relationship_type("relationshipTypes/acme.PART_OF"), + ) + expected_relationships = [ + Relationship( + relationship_type="relationshipTypes/acme.PART_OF", + from_entity="entityTypes/company/company_x", + to_entity="entityTypes/brand/entities/acme.Spring_Vine", + description="This entry might be ignored because it's a duplicate", + ), + Relationship( + relationship_type="relationshipTypes/acme.PART_OF", + from_entity="entityTypes/company/company_y", + to_entity="entityTypes/brand/entities/acme.The_Coconut_Tree", + description="Acquired for $200M", + ), + ] + self.check_relationships(client, expected_relationships) + client = load_test_data_from_csv(LoadRelationshipsFromCsv, args, client) + self.check_relationships(client, expected_relationships)