v0.0.25 (#46)
* Parallel upload requests for faster processing
* Automatic retries of failed requests
* Support mic:ticker company identifiers
* Verify that there are no resource name collisions introduced by identifier normalization
* Read integer-only identifier columns as strings
* Prefix relationship_type command line argument with namespace
grotmol authored Oct 18, 2021
1 parent 6f13c86 commit f7786d5
Showing 29 changed files with 896 additions and 148 deletions.
1 change: 1 addition & 0 deletions .pylintrc
@@ -11,6 +11,7 @@ disable=duplicate-code,
no-self-use,
too-few-public-methods,
too-many-arguments,
too-many-branches,
too-many-locals,
too-many-return-statements,
too-many-instance-attributes,
3 changes: 2 additions & 1 deletion VERSION
@@ -1 +1,2 @@
0.0.24
0.0.25

97 changes: 97 additions & 0 deletions exabel_data_sdk/client/api/bulk_insert.py
@@ -0,0 +1,97 @@
from concurrent.futures.thread import ThreadPoolExecutor
from time import time
from typing import Callable, Sequence

from exabel_data_sdk.client.api.data_classes.request_error import ErrorType, RequestError
from exabel_data_sdk.client.api.resource_creation_result import (
    ResourceCreationResult,
    ResourceCreationResults,
    ResourceCreationStatus,
    TResource,
)


def _process(
    results: ResourceCreationResults[TResource],
    resource: TResource,
    insert_func: Callable[[TResource], ResourceCreationStatus],
) -> None:
    """
    Insert the given resource using the provided function.
    Catches and handles RequestErrors.
    Args:
        results: the result set to append to
        resource: the resource to be inserted
        insert_func: the function to use to insert the resource
    """
    try:
        status = insert_func(resource)
        results.add(ResourceCreationResult(status, resource))
    except RequestError as error:
        status = (
            ResourceCreationStatus.EXISTS
            if error.error_type == ErrorType.ALREADY_EXISTS
            else ResourceCreationStatus.FAILED
        )
        results.add(ResourceCreationResult(status, resource, error))


def _bulk_insert(
    results: ResourceCreationResults[TResource],
    resources: Sequence[TResource],
    insert_func: Callable[[TResource], ResourceCreationStatus],
    threads: int = 40,
) -> None:
    """
    Calls the provided insert function with each of the provided resources,
    while catching errors and tracking progress.
    Args:
        results: add the results to this result set
        resources: the resources to be inserted
        insert_func: the function to call for each insert.
        threads: the number of parallel upload threads to use
    """
    if threads == 1:
        for resource in resources:
            _process(results, resource, insert_func)
    else:
        with ThreadPoolExecutor(max_workers=threads) as executor:
            for resource in resources:
                executor.submit(_process, results, resource, insert_func)


def bulk_insert(
    resources: Sequence[TResource],
    insert_func: Callable[[TResource], ResourceCreationStatus],
    retries: int = 2,
    threads: int = 40,
) -> ResourceCreationResults[TResource]:
    """
    Calls the provided insert function with each of the provided resources,
    while catching errors and tracking progress.
    Args:
        resources: the resources to be inserted
        insert_func: the function to call for each insert.
        retries: the maximum number of retries to make for each failed request
        threads: the number of parallel upload threads to use
    Returns:
        the result set showing the current status for each insert
    """
    start_time = time()
    results: ResourceCreationResults[TResource] = ResourceCreationResults(len(resources))
    for trial in range(retries + 1):
        if trial > 0:
            failures = results.extract_retryable_failures()
            if not failures:
                break
            resources = [result.resource for result in failures]
            print(f"Retry #{trial} with {len(resources)} resources:")
        _bulk_insert(results, resources, insert_func, threads=threads)
    spent_time = int(time() - start_time)
    print(f"Spent {spent_time} seconds loading {len(resources)} resources ({threads} threads)")
    results.print_summary()
    return results
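
A quick sketch of how the new helper might be driven. The insert_one function and the toy resource list are made up for illustration; bulk_insert, ResourceCreationStatus and the retries/threads parameters are taken from the file above.

# Hypothetical usage of the new bulk_insert helper (not part of this commit).
from exabel_data_sdk.client.api.bulk_insert import bulk_insert
from exabel_data_sdk.client.api.resource_creation_result import ResourceCreationStatus

def insert_one(resource: str) -> ResourceCreationStatus:
    # Stand-in for a real API call; a RequestError raised here would be caught
    # by bulk_insert and recorded as EXISTS or FAILED in the result set.
    print(f"Inserting {resource}")
    return ResourceCreationStatus.CREATED

# Uploads on 4 parallel threads and retries retryable failures up to 2 times.
results = bulk_insert(["resource_a", "resource_b"], insert_one, retries=2, threads=4)
print(results.count())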
14 changes: 14 additions & 0 deletions exabel_data_sdk/client/api/data_classes/entity.py
@@ -1,3 +1,4 @@
import re
from typing import Mapping, Union

from exabel_data_sdk.client.api.proto_utils import from_struct, to_struct
@@ -89,3 +90,16 @@ def __repr__(self) -> str:
f"description='{self.description}', properties={self.properties}, "
f"read_only={self.read_only})"
)

def __lt__(self, other: object) -> bool:
if not isinstance(other, Entity):
raise ValueError(f"Cannot compare Entity to non-Entity: {other}")
return self.name < other.name

def get_entity_type(self) -> str:
"""Extracts the entity type name from the entity's resource name."""
p = re.compile(r"(entityTypes/[a-zA-Z0-9_\-.]+)/entities/[a-zA-Z0-9_\-.]+")
m = p.match(self.name)
if m:
return m.group(1)
raise ValueError(f"Could not parse entity resource name: {self.name}")
7 changes: 6 additions & 1 deletion exabel_data_sdk/client/api/data_classes/request_error.py
@@ -1,6 +1,7 @@
from enum import Enum
from enum import Enum, unique


@unique
class ErrorType(Enum):
"""
Error types.
@@ -24,6 +25,10 @@ class ErrorType(Enum):
    # Any internal error.
    INTERNAL = 10

    def retryable(self) -> bool:
        """Return whether it makes sense to retry the request if this error is given."""
        return self in (ErrorType.UNAVAILABLE, ErrorType.TIMEOUT, ErrorType.INTERNAL)


class RequestError(Exception):
"""
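
For context, a small illustration (not from the diff) of what the new retryable flag reports; both error types appear elsewhere in this commit:

from exabel_data_sdk.client.api.data_classes.request_error import ErrorType

# Transient server-side problems are worth retrying; a duplicate resource is not.
assert ErrorType.UNAVAILABLE.retryable()
assert not ErrorType.ALREADY_EXISTS.retryable()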
34 changes: 12 additions & 22 deletions exabel_data_sdk/client/api/entity_api.py
@@ -1,15 +1,15 @@
from typing import Callable, Optional, Sequence
from typing import Optional, Sequence

from google.protobuf.field_mask_pb2 import FieldMask

from exabel_data_sdk.client.api.api_client.grpc.entity_grpc_client import EntityGrpcClient
from exabel_data_sdk.client.api.api_client.http.entity_http_client import EntityHttpClient
from exabel_data_sdk.client.api.bulk_insert import bulk_insert
from exabel_data_sdk.client.api.data_classes.entity import Entity
from exabel_data_sdk.client.api.data_classes.entity_type import EntityType
from exabel_data_sdk.client.api.data_classes.paging_result import PagingResult
from exabel_data_sdk.client.api.data_classes.request_error import ErrorType, RequestError
from exabel_data_sdk.client.api.resource_creation_result import (
    ResourceCreationResult,
    ResourceCreationResults,
    ResourceCreationStatus,
)
@@ -188,29 +188,19 @@ def bulk_create_entities(
        self,
        entities: Sequence[Entity],
        entity_type: str,
        status_callback: Callable[[ResourceCreationResults, int], None] = None,
        threads: int = 40,
    ) -> ResourceCreationResults[Entity]:
        """
        Check if the provided entities exist, and create them if they don't.
        All entities must be of the given entity_type.
        If an entity with the given name already exists, it is not updated.
        """

        def insert(entity: Entity) -> ResourceCreationStatus:
            # Optimistically insert the entity.
            # If the entity already exists, we'll get an ALREADY_EXISTS error from the backend,
            # which is handled appropriately by the bulk_insert function.
            self.create_entity(entity=entity, entity_type=entity_type)
            return ResourceCreationStatus.CREATED

        Optionally, a callback can be provided to track the progress.
        The callback is called after every 10th entity is processed.
        """
        results: ResourceCreationResults[Entity] = ResourceCreationResults()
        for entity in entities:
            try:
                existing_entity = self.get_entity(entity.name)
                if existing_entity is None:
                    new_entity = self.create_entity(entity=entity, entity_type=entity_type)
                    results.add(ResourceCreationResult(ResourceCreationStatus.CREATED, new_entity))
                else:
                    results.add(
                        ResourceCreationResult(ResourceCreationStatus.EXISTS, existing_entity)
                    )
            except RequestError as error:
                results.add(ResourceCreationResult(ResourceCreationStatus.FAILED, entity, error))
            if status_callback and (results.count() % 10 == 0 or results.count() == len(entities)):
                status_callback(results, len(entities))
        return results
        return bulk_insert(entities, insert, threads=threads)
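
A possible call site for the refactored method. The client construction, API key and entity values are assumptions for illustration; the entity_type and threads arguments follow the new signature above.

# Hypothetical call site (not part of this commit); assumes an ExabelClient
# constructed with an API key and an Entity with only name and display_name set.
from exabel_data_sdk import ExabelClient
from exabel_data_sdk.client.api.data_classes.entity import Entity

client = ExabelClient(api_key="my-api-key")
entities = [
    Entity(name="entityTypes/brand/entities/acme_brand", display_name="ACME brand"),
]
# Creation is now optimistic: each entity is inserted directly, and an
# ALREADY_EXISTS error from the backend is recorded as the EXISTS status.
results = client.entity_api.bulk_create_entities(
    entities, entity_type="entityTypes/brand", threads=4
)
results.print_summary()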
43 changes: 12 additions & 31 deletions exabel_data_sdk/client/api/relationship_api.py
@@ -1,4 +1,4 @@
from typing import Callable, Optional, Sequence
from typing import Optional, Sequence

from google.protobuf.field_mask_pb2 import FieldMask

@@ -8,12 +8,12 @@
from exabel_data_sdk.client.api.api_client.http.relationship_http_client import (
    RelationshipHttpClient,
)
from exabel_data_sdk.client.api.bulk_insert import bulk_insert
from exabel_data_sdk.client.api.data_classes.paging_result import PagingResult
from exabel_data_sdk.client.api.data_classes.relationship import Relationship
from exabel_data_sdk.client.api.data_classes.relationship_type import RelationshipType
from exabel_data_sdk.client.api.data_classes.request_error import ErrorType, RequestError
from exabel_data_sdk.client.api.resource_creation_result import (
    ResourceCreationResult,
    ResourceCreationResults,
    ResourceCreationStatus,
)
@@ -276,37 +276,18 @@ def relationship_exists(self, relationship_type: str, from_entity: str, to_entit
    def bulk_create_relationships(
        self,
        relationships: Sequence[Relationship],
        status_callback: Callable[[ResourceCreationResults, int], None] = None,
        threads: int = 40,
    ) -> ResourceCreationResults[Relationship]:
        """
        Check if the provided relationships exist, and create them if they don't.
        If the relationship already exists, it is not updated.
        Optionally, a callback can be provided to track the progress.
        The callback is called after every 10th relationship is processed.
        """
        total = len(relationships)
        results: ResourceCreationResults[Relationship] = ResourceCreationResults()
        for relationship in relationships:
            try:
                existing_relationship = self.get_relationship(
                    relationship_type=relationship.relationship_type,
                    from_entity=relationship.from_entity,
                    to_entity=relationship.to_entity,
                )
                if existing_relationship is None:
                    new_relationship = self.create_relationship(relationship=relationship)
                    results.add(
                        ResourceCreationResult(ResourceCreationStatus.CREATED, new_relationship)
                    )
                else:
                    results.add(
                        ResourceCreationResult(ResourceCreationStatus.EXISTS, existing_relationship)
                    )
            except RequestError as error:
                results.add(
                    ResourceCreationResult(ResourceCreationStatus.FAILED, relationship, error)
                )
            if status_callback and (results.count() % 10 == 0 or results.count() == total):
                status_callback(results, total)
        return results

        def insert(relationship: Relationship) -> ResourceCreationStatus:
            # Optimistically insert the relationship.
            # If the relationship already exists, we'll get an ALREADY_EXISTS error from the
            # backend, which is handled appropriately by the bulk_insert function.
            self.create_relationship(relationship=relationship)
            return ResourceCreationStatus.CREATED

        return bulk_insert(relationships, insert, threads=threads)
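
The relationship upload follows the same pattern, reusing the client from the sketch above. The Relationship field values (including the namespaced relationship type) are made-up examples.

# Hypothetical call site (not part of this commit).
from exabel_data_sdk.client.api.data_classes.relationship import Relationship

relationships = [
    Relationship(
        relationship_type="relationshipTypes/acme.HAS_BRAND",
        from_entity="entityTypes/company/entities/acme",
        to_entity="entityTypes/brand/entities/acme_brand",
    ),
]
# Existence checks are gone here as well: every relationship is inserted
# optimistically, and ALREADY_EXISTS responses are recorded as EXISTS.
results = client.relationship_api.bulk_create_relationships(relationships, threads=4)
print(results.count())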
90 changes: 65 additions & 25 deletions exabel_data_sdk/client/api/resource_creation_result.py
@@ -48,14 +48,29 @@ class ResourceCreationResults(Generic[TResource]):
    Class for returning resource creation results.
    """

    def __init__(self) -> None:
    def __init__(
        self, total_count: int, print_status: bool = True, abort_threshold: float = 0.5
    ) -> None:
        """
        Args:
            total_count: The total number of resources expected to be loaded.
            print_status: Whether to print status of the upload during processing.
            abort_threshold: If the fraction of failed requests exceeds this threshold,
                the upload is aborted, and the script exits.
                Note that this only happens if print_status is set to True.
        """
        self.results: List[ResourceCreationResult[TResource]] = []
        self.counter: Counter = Counter()
        self.total_count = total_count
        self.do_print_status = print_status
        self.abort_threshold = abort_threshold

    def add(self, result: ResourceCreationResult[TResource]) -> None:
        """Add the result for a resource."""
        self.results.append(result)
        self.counter.update([result.status])
        if self.do_print_status and (self.count() % 20 == 0 or self.count() == self.total_count):
            self.print_status()

    def count(self, status: ResourceCreationStatus = None) -> int:
        """
@@ -64,37 +79,62 @@ def count(self, status: ResourceCreationStatus = None) -> int:
"""
return len(self.results) if status is None else self.counter[status]

def extract_retryable_failures(self) -> List[ResourceCreationResult[TResource]]:
"""
Remove all retryable failures from this result set,
and return them.
"""
failed = []
rest = []
for result in self.results:
if (
result.status == ResourceCreationStatus.FAILED
and result.error
and result.error.error_type.retryable()
):
failed.append(result)
else:
rest.append(result)
self.counter.subtract([result.status for result in failed])
self.results = rest
return failed

def print_summary(self) -> None:
"""Prints a human legible summary of the resource creation results to screen."""
print(self.counter[ResourceCreationStatus.CREATED], "new resources created")
print(self.counter[ResourceCreationStatus.EXISTS], "resources already existed")
if self.counter[ResourceCreationStatus.EXISTS]:
print(self.counter[ResourceCreationStatus.EXISTS], "resources already existed")
if self.counter[ResourceCreationStatus.FAILED]:
print(self.counter[ResourceCreationStatus.FAILED], "resources failed:")
for result in self.results:
if result.status == ResourceCreationStatus.FAILED:
print(" ", result.resource, ":\n ", result.error)

def print_status(self) -> None:
"""
Prints a status update on the progress of the data loading, showing the percentage complete
and how many objects were created, already existed or failed.
def status_callback(results: ResourceCreationResults, total_count: int) -> None:
"""
Prints a status update on the progress of the data loading, showing the percentage complete
and how many objects were created, already existed or failed.
Note that the previous status message is overwritten (by writing '\r'),
but this only works if nothing else has been printed to stdout since the last update.
"""
fraction_complete = results.count() / total_count
sys.stdout.write(
f"\r{fraction_complete:.0%} - "
f"{results.count(ResourceCreationStatus.CREATED)} created, "
f"{results.count(ResourceCreationStatus.EXISTS)} exists, "
f"{results.count(ResourceCreationStatus.FAILED)} failed"
)
if fraction_complete == 1:
sys.stdout.write("\n")
fraction_error = results.count(ResourceCreationStatus.FAILED) / results.count()
if fraction_error > 0.5:
sys.stdout.write("\nAborting - more than half the requests are failing.\n")
results.print_summary()
sys.exit(-1)
sys.stdout.flush()
Note that the previous status message is overwritten (by writing '\r'),
but this only works if nothing else has been printed to stdout since the last update.
"""
fraction_complete = self.count() / self.total_count
sys.stdout.write(
f"\r{fraction_complete:.0%} - "
f"{self.count(ResourceCreationStatus.CREATED)} created, "
f"{self.count(ResourceCreationStatus.EXISTS)} exists, "
f"{self.count(ResourceCreationStatus.FAILED)} failed"
)
if fraction_complete == 1:
sys.stdout.write("\n")
else:
fraction_error = self.count(ResourceCreationStatus.FAILED) / self.count()
if fraction_error > self.abort_threshold:
sys.stdout.write(
f"\nAborting - more than {self.abort_threshold:.0%} "
"of the requests are failing.\n"
)
self.print_summary()
sys.stdout.flush()
sys.exit(1)
sys.stdout.flush()
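
A small illustration of how the reworked result set feeds the retry loop in bulk_insert. The toy resources are strings, and the RequestError(error_type, message) construction is assumed from its use elsewhere in the SDK.

# Illustration only (not part of this commit).
from exabel_data_sdk.client.api.data_classes.request_error import ErrorType, RequestError
from exabel_data_sdk.client.api.resource_creation_result import (
    ResourceCreationResult,
    ResourceCreationResults,
    ResourceCreationStatus,
)

# Disable progress printing (and thus the abort threshold) for this toy run.
results = ResourceCreationResults(total_count=2, print_status=False)
results.add(ResourceCreationResult(ResourceCreationStatus.CREATED, "resource_a"))
results.add(
    ResourceCreationResult(
        ResourceCreationStatus.FAILED, "resource_b", RequestError(ErrorType.UNAVAILABLE, "down")
    )
)

# UNAVAILABLE is retryable, so the failure is pulled out of the result set
# and would be resubmitted by bulk_insert on the next trial.
retryable = results.extract_retryable_failures()
assert [r.resource for r in retryable] == ["resource_b"]
assert results.count() == 1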