Bulk import: Fix for abort functionality (#48)
hallarak authored Oct 29, 2021
1 parent f3a656e commit f0042ba
Showing 7 changed files with 85 additions and 31 deletions.
1 change: 1 addition & 0 deletions .pylintrc
@@ -15,6 +15,7 @@ disable=duplicate-code,
too-many-locals,
too-many-return-statements,
too-many-instance-attributes,
too-many-statements,

[BASIC]

2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.0.26
1.0.0
58 changes: 45 additions & 13 deletions exabel_data_sdk/client/api/bulk_insert.py
@@ -11,10 +11,15 @@
)


class BulkInsertFailedError(Exception):
"""Error indicating that the bulk insert failed."""


def _process(
results: ResourceCreationResults[TResource],
resource: TResource,
insert_func: Callable[[TResource], ResourceCreationStatus],
abort: Callable,
) -> None:
"""
Insert the given resource using the provided function.
@@ -24,7 +29,12 @@ def _process(
results: the result set to append to
resource: the resource to be inserted
insert_func: the function to use to insert the resource
abort: the function to call when the insert is aborted
"""
if results.abort:
abort()
return

try:
status = insert_func(resource)
results.add(ResourceCreationResult(status, resource))
@@ -37,6 +47,11 @@ def _process(
results.add(ResourceCreationResult(status, resource, error))


def _raise_error() -> None:
"""Raise a BulkInsertFailedError."""
raise BulkInsertFailedError()


def _bulk_insert(
results: ResourceCreationResults[TResource],
resources: Sequence[TResource],
@@ -55,11 +70,24 @@ def _bulk_insert(
"""
if threads == 1:
for resource in resources:
_process(results, resource, insert_func)
_process(results, resource, insert_func, _raise_error)

else:
with ThreadPoolExecutor(max_workers=threads) as executor:
for resource in resources:
executor.submit(_process, results, resource, insert_func)
if not results.abort:
executor.submit(
_process,
results,
resource,
insert_func,
# Python 3.9 added support for the shutdown argument 'cancel_futures'.
# We should set this argument to True once we have moved to this python
# version.
lambda: executor.shutdown(wait=False),
)
if results.abort:
raise BulkInsertFailedError()


def bulk_insert(
@@ -72,6 +100,8 @@
Calls the provided insert function with each of the provided resources,
while catching errors and tracking progress.
Raises a BulkInsertFailedError if more than 50% of the resources fail to insert.
Args:
resources: the resources to be inserted
insert_func: the function to call for each insert.
@@ -83,15 +113,17 @@
"""
start_time = time()
results: ResourceCreationResults[TResource] = ResourceCreationResults(len(resources))
for trial in range(retries + 1):
if trial > 0:
failures = results.extract_retryable_failures()
if not failures:
break
resources = [result.resource for result in failures]
print(f"Retry #{trial} with {len(resources)} resources:")
_bulk_insert(results, resources, insert_func, threads=threads)
spent_time = int(time() - start_time)
print(f"Spent {spent_time} seconds loading {len(resources)} resources ({threads} threads)")
results.print_summary()
try:
for trial in range(retries + 1):
if trial > 0:
failures = results.extract_retryable_failures()
if not failures:
break
resources = [result.resource for result in failures]
print(f"Retry #{trial} with {len(resources)} resources:")
_bulk_insert(results, resources, insert_func, threads=threads)
finally:
spent_time = int(time() - start_time)
print(f"Spent {spent_time} seconds loading {len(resources)} resources ({threads} threads)")
results.print_summary()
return results
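
The change above introduces an abort callback: each worker checks results.abort before inserting, and once the flag is set it calls the supplied callback, which in the threaded case shuts the executor down so no further tasks are started. Below is a minimal standalone sketch of that pattern, not the SDK's code: DemoResults, process, and the simulated failure rate are made up for illustration, and the 0.5 threshold is an assumption.

    from concurrent.futures import ThreadPoolExecutor
    from threading import Lock

    class DemoResults:
        """Stand-in for ResourceCreationResults: counts failures and sets an abort flag."""

        def __init__(self, abort_threshold: float = 0.5) -> None:
            self.total = 0
            self.failed = 0
            self.abort_threshold = abort_threshold
            self.abort = False
            self._lock = Lock()

        def add(self, ok: bool) -> None:
            with self._lock:
                self.total += 1
                if not ok:
                    self.failed += 1
                if self.failed / self.total > self.abort_threshold:
                    self.abort = True

    def process(results: DemoResults, item: int, abort) -> None:
        """Analogue of _process: bail out via the abort callback once the flag is set."""
        if results.abort:
            abort()  # in the SDK this is lambda: executor.shutdown(wait=False)
            return
        results.add(ok=item % 3 == 0)  # simulate two thirds of the inserts failing

    results = DemoResults()
    with ThreadPoolExecutor(max_workers=4) as executor:
        for item in range(100):
            if results.abort:
                break
            try:
                executor.submit(process, results, item, lambda: executor.shutdown(wait=False))
            except RuntimeError:
                break  # a worker shut the executor down between the check and the submit
    print("aborted:", results.abort)

With shutdown(wait=False), tasks already queued still run their abort check and return immediately; the 'cancel_futures' argument mentioned in the comment above would drop them outright on Python 3.9+.
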
30 changes: 18 additions & 12 deletions exabel_data_sdk/client/api/resource_creation_result.py
@@ -56,21 +56,23 @@ def __init__(
total_count: The total number of resources expected to be loaded.
print_status: Whether to print status of the upload during processing.
abort_threshold: If the fraction of failed requests exceeds this threshold,
the upload is aborted, and the script exits.
Note that this only happens if print_status is set to True.
the upload is aborted.
"""
self.results: List[ResourceCreationResult[TResource]] = []
self.counter: Counter = Counter()
self.total_count = total_count
self.do_print_status = print_status
self.abort_threshold = abort_threshold
self.abort = False

def add(self, result: ResourceCreationResult[TResource]) -> None:
"""Add the result for a resource."""
self.results.append(result)
self.counter.update([result.status])
if self.do_print_status and (self.count() % 20 == 0 or self.count() == self.total_count):
self.print_status()
if self.count() % 20 == 0 and self.count() != self.total_count:
self.check_failures()

def count(self, status: ResourceCreationStatus = None) -> int:
"""
@@ -99,6 +101,20 @@ def extract_retryable_failures(self) -> List[ResourceCreationResult[TResource]]:
self.results = rest
return failed

def check_failures(self) -> None:
"""
Set the member field 'abort' to True if the fraction of errors exceeds the abort threshold.
"""
fraction_error = self.count(ResourceCreationStatus.FAILED) / self.count()
if fraction_error > self.abort_threshold and not self.abort:
self.abort = True
if self.do_print_status:
sys.stdout.write(
f"\nAborting - more than {self.abort_threshold:.0%} "
"of the requests are failing.\n"
)
sys.stdout.flush()

def print_summary(self) -> None:
"""Prints a human legible summary of the resource creation results to screen."""
print(self.counter[ResourceCreationStatus.CREATED], "new resources created")
@@ -127,14 +143,4 @@ def print_status(self) -> None:
)
if fraction_complete == 1:
sys.stdout.write("\n")
else:
fraction_error = self.count(ResourceCreationStatus.FAILED) / self.count()
if fraction_error > self.abort_threshold:
sys.stdout.write(
f"\nAborting - more than {self.abort_threshold:.0%} "
"of the requests are failing.\n"
)
self.print_summary()
sys.stdout.flush()
sys.exit(1)
sys.stdout.flush()
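
To make the new check_failures logic concrete, here is a small worked example with hypothetical numbers; the 0.5 threshold is an assumption, as the actual default is defined where ResourceCreationResults is constructed. Note that check_failures only runs every 20 results, and not on the final one, so the fraction is evaluated on the results processed so far.

    abort_threshold = 0.5                # assumed value for illustration
    processed, failed = 40, 24           # hypothetical counts after 40 resources
    fraction_error = failed / processed  # 0.6
    should_abort = fraction_error > abort_threshold
    print(should_abort)                  # True: abort is set and new submissions stop

Once abort is True, _bulk_insert stops submitting new resources and raises BulkInsertFailedError, which the load scripts below catch, since an error summary has already been printed.
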
7 changes: 6 additions & 1 deletion exabel_data_sdk/scripts/load_entities_from_csv.py
@@ -3,6 +3,7 @@
from typing import Sequence

from exabel_data_sdk import ExabelClient
from exabel_data_sdk.client.api.bulk_insert import BulkInsertFailedError
from exabel_data_sdk.client.api.data_classes.entity import Entity
from exabel_data_sdk.scripts.csv_script import CsvScript
from exabel_data_sdk.util.resource_name_normalization import normalize_resource_name
@@ -99,7 +100,11 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
print(entities)
return

client.entity_api.bulk_create_entities(entities, entity_type_name, threads=args.threads)
try:
client.entity_api.bulk_create_entities(entities, entity_type_name, threads=args.threads)
except BulkInsertFailedError:
# An error summary has already been printed.
pass


if __name__ == "__main__":
7 changes: 6 additions & 1 deletion exabel_data_sdk/scripts/load_relationships_from_csv.py
@@ -3,6 +3,7 @@
from typing import Sequence

from exabel_data_sdk import ExabelClient
from exabel_data_sdk.client.api.bulk_insert import BulkInsertFailedError
from exabel_data_sdk.client.api.data_classes.relationship import Relationship
from exabel_data_sdk.client.api.data_classes.relationship_type import RelationshipType
from exabel_data_sdk.scripts.csv_script import CsvScript
@@ -124,7 +125,11 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
print(relationships)
return

client.relationship_api.bulk_create_relationships(relationships, threads=args.threads)
try:
client.relationship_api.bulk_create_relationships(relationships, threads=args.threads)
except BulkInsertFailedError:
# An error summary has already been printed.
pass


if __name__ == "__main__":
11 changes: 8 additions & 3 deletions exabel_data_sdk/scripts/load_time_series_from_csv.py
@@ -7,6 +7,7 @@
from dateutil import tz

from exabel_data_sdk import ExabelClient
from exabel_data_sdk.client.api.bulk_insert import BulkInsertFailedError
from exabel_data_sdk.client.api.data_classes.signal import Signal
from exabel_data_sdk.scripts.csv_script import CsvScript
from exabel_data_sdk.util.resource_name_normalization import (
@@ -140,9 +141,13 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
print(f" {ts.name}")
return

client.time_series_api.bulk_upsert_time_series(
series, create_tag=True, threads=args.threads
)
try:
client.time_series_api.bulk_upsert_time_series(
series, create_tag=True, threads=args.threads
)
except BulkInsertFailedError:
# An error summary has already been printed.
pass


if __name__ == "__main__":
