Skip to content

Commit

Permalink
Add upsert for entities and relationships (#55)
Browse files Browse the repository at this point in the history
* Upsert method for entities.
* Upsert method for relationships.
* Upsert flag when using the CSV uploader for entities and relationships.
  • Loading branch information
aksestok authored Jan 4, 2022
1 parent 0707d71 commit dfb6393
Show file tree
Hide file tree
Showing 17 changed files with 286 additions and 17 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.0
2.2.0
35 changes: 35 additions & 0 deletions exabel_data_sdk/client/api/entity_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,36 @@ def update_entity(self, entity: Entity, update_mask: FieldMask = None) -> Entity
)
return Entity.from_proto(response)

def upsert_entity(self, entity: Entity, assume_exists: bool = True) -> Entity:
    """
    Upsert an entity.

    If the entity already exists, update it by replacement. Otherwise, create it.

    Args:
        entity:         The entity to upsert.
        assume_exists:  If True, the entity is assumed to exist: an update is attempted
                        first, falling back to creation when the backend reports
                        NOT_FOUND. If False, creation is attempted first, falling back
                        to an update when the backend reports ALREADY_EXISTS.

    Returns:
        The entity returned by whichever of the update or create calls succeeded.

    Raises:
        RequestError: If the first attempt fails with any error type other than the
                      expected one, or if the fallback call itself fails.
    """
    if assume_exists:
        try:
            return self.update_entity(entity)
        except RequestError as error:
            if error.error_type != ErrorType.NOT_FOUND:
                # Unexpected failure: propagate it unchanged.
                raise
            return self.create_entity(entity, entity.get_entity_type())

    try:
        return self.create_entity(entity, entity.get_entity_type())
    except RequestError as error:
        if error.error_type != ErrorType.ALREADY_EXISTS:
            # Unexpected failure: propagate it unchanged.
            raise
        return self.update_entity(entity)

def delete_entity(self, name: str) -> None:
"""
Delete one entity.
Expand Down Expand Up @@ -189,6 +219,7 @@ def bulk_create_entities(
entities: Sequence[Entity],
entity_type: str,
threads: int = 40,
upsert: bool = False,
) -> ResourceCreationResults[Entity]:
"""
Check if the provided entities exist, and create them if they don't.
Expand All @@ -197,6 +228,10 @@ def bulk_create_entities(
"""

def insert(entity: Entity) -> ResourceCreationStatus:
if upsert:
# Upsert entities assuming they already exist.
self.upsert_entity(entity=entity, assume_exists=True)
return ResourceCreationStatus.UPSERTED
# Optimistically insert the entity.
# If the entity already exists, we'll get an ALREADY_EXISTS error from the backend,
# which is handled appropriately by the bulk_insert function.
Expand Down
37 changes: 37 additions & 0 deletions exabel_data_sdk/client/api/relationship_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,38 @@ def update_relationship(
)
return Relationship.from_proto(response)

def upsert_relationship(
    self, relationship: Relationship, assume_exists: bool = True
) -> Relationship:
    """
    Upsert a relationship between two entities.

    If the relationship already exists, update it by replacement. Otherwise, create it.

    Args:
        relationship:   The relationship to upsert.
        assume_exists:  If True, the relationship is assumed to exist: an update is
                        attempted first, falling back to creation when the backend
                        reports NOT_FOUND. If False, creation is attempted first,
                        falling back to an update when the backend reports
                        ALREADY_EXISTS.

    Returns:
        The relationship returned by whichever of the update or create calls succeeded.

    Raises:
        RequestError: If the first attempt fails with any error type other than the
                      expected one, or if the fallback call itself fails.
    """
    if assume_exists:
        try:
            return self.update_relationship(relationship)
        except RequestError as error:
            if error.error_type != ErrorType.NOT_FOUND:
                # Unexpected failure: propagate it unchanged.
                raise
            return self.create_relationship(relationship)

    try:
        return self.create_relationship(relationship)
    except RequestError as error:
        if error.error_type != ErrorType.ALREADY_EXISTS:
            # Unexpected failure: propagate it unchanged.
            raise
        return self.update_relationship(relationship)

def delete_relationship(self, relationship_type: str, from_entity: str, to_entity: str) -> None:
"""
Delete a relationship.
Expand Down Expand Up @@ -277,13 +309,18 @@ def bulk_create_relationships(
self,
relationships: Sequence[Relationship],
threads: int = 40,
upsert: bool = False,
) -> ResourceCreationResults[Relationship]:
"""
Check if the provided relationships exist, and create them if they don't.
If the relationship already exists, it is only updated when upsert is True.
"""

def insert(relationship: Relationship) -> ResourceCreationStatus:
if upsert:
# Upsert relationships assuming they already exist.
self.upsert_relationship(relationship=relationship, assume_exists=True)
return ResourceCreationStatus.UPSERTED
# Optimistically insert the relationship.
# If the relationship already exists, we'll get an ALREADY_EXISTS error from the
# backend, which is handled appropriately by the bulk_insert function.
Expand Down
25 changes: 18 additions & 7 deletions exabel_data_sdk/client/api/resource_creation_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class ResourceCreationStatus(Enum):
# Denotes that creation failed.
FAILED = 3

# Denotes that a resource was upserted.
UPSERTED = 4


class ResourceCreationResult(Generic[TResource]):
"""
Expand Down Expand Up @@ -57,6 +60,7 @@ def __init__(
print_status: Whether to print status of the upload during processing.
abort_threshold: If the fraction of failed requests exceeds this threshold,
the upload is aborted.
upsert: Update resources that already exist.
"""
self.results: List[ResourceCreationResult[TResource]] = []
self.counter: Counter = Counter()
Expand Down Expand Up @@ -117,9 +121,12 @@ def check_failures(self) -> None:

def print_summary(self) -> None:
"""Prints a human legible summary of the resource creation results to screen."""
print(self.counter[ResourceCreationStatus.CREATED], "new resources created")
if self.counter[ResourceCreationStatus.CREATED]:
print(self.counter[ResourceCreationStatus.CREATED], "new resources created")
if self.counter[ResourceCreationStatus.EXISTS]:
print(self.counter[ResourceCreationStatus.EXISTS], "resources already existed")
if self.counter[ResourceCreationStatus.UPSERTED]:
print(self.counter[ResourceCreationStatus.UPSERTED], "resources upserted")
if self.counter[ResourceCreationStatus.FAILED]:
print(self.counter[ResourceCreationStatus.FAILED], "resources failed:")
for result in self.results:
Expand All @@ -134,13 +141,17 @@ def print_status(self) -> None:
Note that the previous status message is overwritten (by writing '\r'),
but this only works if nothing else has been printed to stdout since the last update.
"""
message_parts = []
fraction_complete = self.count() / self.total_count
sys.stdout.write(
f"\r{fraction_complete:.0%} - "
f"{self.count(ResourceCreationStatus.CREATED)} created, "
f"{self.count(ResourceCreationStatus.EXISTS)} exists, "
f"{self.count(ResourceCreationStatus.FAILED)} failed"
)
message_parts.append(f"\r{fraction_complete:.0%} - ")
if self.counter[ResourceCreationStatus.CREATED]:
message_parts.append(f"{self.count(ResourceCreationStatus.CREATED)} created, ")
if self.counter[ResourceCreationStatus.UPSERTED]:
message_parts.append(f"{self.count(ResourceCreationStatus.UPSERTED)} upserted, ")
if self.counter[ResourceCreationStatus.EXISTS]:
message_parts.append(f"{self.count(ResourceCreationStatus.EXISTS)} exists, ")
message_parts.append(f"{self.count(ResourceCreationStatus.FAILED)} failed")
sys.stdout.write("".join(message_parts))
if fraction_complete == 1:
sys.stdout.write("\n")
sys.stdout.flush()
2 changes: 1 addition & 1 deletion exabel_data_sdk/client/api/time_series_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ def insert(ts: pd.Series) -> ResourceCreationStatus:
existed = self.upsert_time_series(
str(ts.name), ts, create_tag=create_tag, default_known_time=default_known_time
)
return ResourceCreationStatus.EXISTS if existed else ResourceCreationStatus.CREATED
return ResourceCreationStatus.UPSERTED if existed else ResourceCreationStatus.CREATED

return bulk_insert(series, insert, threads=threads)

Expand Down
11 changes: 10 additions & 1 deletion exabel_data_sdk/scripts/load_entities_from_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ def __init__(self, argv: Sequence[str], description: str):
help="The column name for the entity description. "
"If not specified, no description is provided.",
)
self.parser.add_argument(
"--upsert",
required=False,
action="store_true",
default=False,
help="Update entities if they already exist.",
)

def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:

Expand Down Expand Up @@ -101,7 +108,9 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
return

try:
client.entity_api.bulk_create_entities(entities, entity_type_name, threads=args.threads)
client.entity_api.bulk_create_entities(
entities, entity_type_name, threads=args.threads, upsert=args.upsert
)
except BulkInsertFailedError:
# An error summary has already been printed.
pass
Expand Down
11 changes: 10 additions & 1 deletion exabel_data_sdk/scripts/load_relationships_from_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ def __init__(self, argv: Sequence[str], description: str):
help="The column name for the relationship description. "
"If not specified, no description is provided.",
)
self.parser.add_argument(
"--upsert",
required=False,
action="store_true",
default=False,
help="Update relationships if they already exist.",
)

def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:

Expand Down Expand Up @@ -130,7 +137,9 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
return

try:
client.relationship_api.bulk_create_relationships(relationships, threads=args.threads)
client.relationship_api.bulk_create_relationships(
relationships, threads=args.threads, upsert=args.upsert
)
except BulkInsertFailedError:
# An error summary has already been printed.
pass
Expand Down
3 changes: 2 additions & 1 deletion exabel_data_sdk/tests/client/api/mock_entity_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def create_entity(self, entity: Entity, entity_type: str) -> Entity:
return self.entities.create(entity)

def update_entity(self, entity: Entity, update_mask: FieldMask = None) -> Entity:
    """
    Update an entity in the mock by delegating to ``self.entities.update``.

    Args:
        entity:      The entity to store in place of the existing one.
        update_mask: Accepted for interface compatibility; not used by the mock.

    Returns:
        Whatever ``self.entities.update`` returns for the given entity.
    """
    # Note: The mock implementation ignores update_mask
    return self.entities.update(entity)

def delete_entity(self, name: str) -> None:
# Note: The mock implementation does not delete associated time series and relationships
Expand Down
3 changes: 2 additions & 1 deletion exabel_data_sdk/tests/client/api/mock_relationship_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ def create_relationship(self, relationship: Relationship) -> Relationship:
def update_relationship(
    self, relationship: Relationship, update_mask: FieldMask = None
) -> Relationship:
    """
    Update a relationship in the mock by delegating to ``self.relationships.update``.

    The store key is computed with ``self._key(relationship)`` and passed explicitly.

    Args:
        relationship: The relationship to store in place of the existing one.
        update_mask:  Accepted for interface compatibility; not used by the mock.

    Returns:
        Whatever ``self.relationships.update`` returns for the given relationship.
    """
    # Note: The mock implementation ignores update_mask
    return self.relationships.update(relationship, self._key(relationship))

def delete_relationship(self, relationship_type: str, from_entity: str, to_entity: str) -> None:
self.relationships.delete((relationship_type, from_entity, to_entity))
Expand Down
20 changes: 20 additions & 0 deletions exabel_data_sdk/tests/client/api/mock_resource_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ def create(self, resource: TResource, key: object = None) -> TResource:
self.resources[key] = resource
return resource

@failure_prone
def update(self, resource: TResource, key: object = None) -> TResource:
    """
    Replace an existing resource in the store.

    Resources that carry a resource name are keyed by that name, so ``key`` can be
    left unset for them. Resources without one (such as Relationship resources) must
    be given an explicit key.

    Args:
        resource: the resource to store in place of the existing one
        key:      the key of the resource. Defaults to the resource's name.

    Returns:
        The resource that was stored.

    Raises:
        RequestError: with ErrorType.NOT_FOUND if nothing is stored under the key.
    """
    lookup_key = resource.name if key is None else key  # type: ignore[attr-defined]
    if lookup_key in self.resources:
        self.resources[lookup_key] = resource
        return resource
    raise RequestError(ErrorType.NOT_FOUND, f"Does not exist: {lookup_key}")

def delete(self, key: object):
"""Delete the resource with the given key."""
if key in self.resources:
Expand Down
38 changes: 38 additions & 0 deletions exabel_data_sdk/tests/client/api/test_entity_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import unittest

from exabel_data_sdk.client.api.data_classes.entity import Entity
from exabel_data_sdk.client.api.entity_api import EntityApi
from exabel_data_sdk.tests.client.api.mock_entity_api import MockEntityApi


class TestEntityApi(unittest.TestCase):
    """Tests for entity upserts, run against the mock entity API."""

    def test_upsert(self):
        """Upserting the same entity twice returns that entity both times."""
        for assume_exists in (True, False):
            # subTest reports which strategy failed and still runs the other one.
            with self.subTest(assume_exists=assume_exists):
                entity_api: EntityApi = MockEntityApi()
                expected = Entity(
                    name="entityTypes/company/entities/Amazon",
                    display_name="Amazon",
                )
                created_entity = entity_api.upsert_entity(expected, assume_exists)
                self.assertEqual(expected, created_entity)
                updated_entity = entity_api.upsert_entity(expected, assume_exists)
                self.assertEqual(expected, updated_entity)

    def test_upsert_replaces_resource(self):
        """Upserting over an existing entity replaces it rather than merging fields."""
        for assume_exists in (True, False):
            # subTest reports which strategy failed and still runs the other one.
            with self.subTest(assume_exists=assume_exists):
                entity_api: EntityApi = MockEntityApi()
                old_entity = Entity(
                    name="entityTypes/company/entities/Amazon",
                    display_name="Amazon's old display name",
                    description="Amazon's old description",
                    properties={"old_property": "old_value"},
                )
                expected = Entity(
                    name="entityTypes/company/entities/Amazon",
                    display_name="Amazon",
                    description="Amazon's new description",
                )
                entity_api.create_entity(old_entity, old_entity.get_entity_type())
                entity_api.upsert_entity(expected, assume_exists)
                actual_entity = entity_api.get_entity(expected.name)
                self.assertEqual(expected, actual_entity)
47 changes: 47 additions & 0 deletions exabel_data_sdk/tests/client/api/test_relationship_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import unittest

from exabel_data_sdk.client.api.data_classes.relationship import Relationship
from exabel_data_sdk.client.api.relationship_api import RelationshipApi
from exabel_data_sdk.tests.client.api.mock_relationship_api import MockRelationshipApi

AMAZON = "entityTypes/company/entities/Amazon"
USA = "entityTypes/country/entities/USA"
RELATIONHIP_TYPE = "relationshipTypes/LOCATED_IN"


class TestRelationshipApi(unittest.TestCase):
    """Tests for relationship upserts, run against the mock relationship API."""

    def test_upsert(self):
        """Upserting the same relationship twice returns that relationship both times."""
        for assume_exists in (True, False):
            # subTest reports which strategy failed and still runs the other one.
            with self.subTest(assume_exists=assume_exists):
                relationship_api: RelationshipApi = MockRelationshipApi()
                expected = Relationship(
                    relationship_type=RELATIONHIP_TYPE,
                    from_entity=AMAZON,
                    to_entity=USA,
                )
                created_relationship = relationship_api.upsert_relationship(
                    expected, assume_exists
                )
                self.assertEqual(expected, created_relationship)
                updated_relationship = relationship_api.upsert_relationship(
                    expected, assume_exists
                )
                self.assertEqual(expected, updated_relationship)

    def test_upsert_replaces_resource(self):
        """Upserting over an existing relationship replaces it rather than merging fields."""
        for assume_exists in (True, False):
            # subTest reports which strategy failed and still runs the other one.
            with self.subTest(assume_exists=assume_exists):
                relationship_api: RelationshipApi = MockRelationshipApi()
                old_relationship = Relationship(
                    relationship_type=RELATIONHIP_TYPE,
                    from_entity=AMAZON,
                    to_entity=USA,
                    description="Old relationship description",
                    properties={"old_property": "old_value"},
                )
                expected = Relationship(
                    relationship_type=RELATIONHIP_TYPE,
                    from_entity=AMAZON,
                    to_entity=USA,
                    description="New relationship description",
                )
                relationship_api.create_relationship(old_relationship)
                relationship_api.upsert_relationship(expected, assume_exists)
                actual_relationship = relationship_api.get_relationship(
                    expected.relationship_type, expected.from_entity, expected.to_entity
                )
                self.assertEqual(expected, actual_relationship)
2 changes: 1 addition & 1 deletion exabel_data_sdk/tests/resources/data/entities.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
brand,description
Spring & Vine,Shampoo bars
The Coconut Tree,Sri Lankan street food
Spring & Vine,This entry will be ignored because it's a duplicate
Spring & Vine,This entry might be ignored because it's a duplicate
2 changes: 1 addition & 1 deletion exabel_data_sdk/tests/resources/data/relationships.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
entity_from,brand,description
entityTypes/company/company_x,Spring & Vine,Owned since 2019
entityTypes/company/company_x,Spring & Vine,This entry will be ignored because it's a duplicate
entityTypes/company/company_x,Spring & Vine,This entry might be ignored because it's a duplicate
entityTypes/company/company_y,The Coconut Tree,Acquired for $200M
6 changes: 4 additions & 2 deletions exabel_data_sdk/tests/scripts/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from exabel_data_sdk.tests.client.exabel_mock_client import ExabelMockClient


def load_test_data_from_csv(
    csv_script: Type[CsvScript], args: Sequence[str], client: ExabelClient = None
) -> ExabelClient:
    """
    Run the given CSV loading script against a client and return that client.

    Args:
        csv_script: The script class to instantiate and run.
        args:       The command line arguments passed to the script.
        client:     The client to load data into. If not given, a new ExabelMockClient
                    is created, so that successive calls can share one client by
                    passing the client returned from an earlier call.

    Returns:
        The client the script was run against.
    """
    # csv_script is already a class, so take its own name; type(csv_script) would be
    # the metaclass and give a meaningless description.
    script = csv_script(args, f"Test{csv_script.__name__}")
    if client is None:
        client = ExabelMockClient()
    script.run_script(client, script.parse_arguments())

    return client
Loading

0 comments on commit dfb6393

Please sign in to comment.