
Release 5.2.0 (#163)
ivarurdalen authored May 28, 2024
1 parent 21670be commit 653d0a4
Showing 47 changed files with 1,208 additions and 725 deletions.
2 changes: 1 addition & 1 deletion Pipfile
@@ -16,7 +16,7 @@ pandas = "*"
protobuf = "*"
pyathena = "*"
pyarrow = "*"
pylint = "<3.0.0"
pylint = "*"
requests = "*"
setuptools = "*"
snowflake-connector-python = "*"
981 changes: 454 additions & 527 deletions Pipfile.lock

Large diffs are not rendered by default.

48 changes: 47 additions & 1 deletion exabel_data_sdk/client/api/api_client/grpc/base_grpc_client.py
@@ -1,10 +1,18 @@
import json

import grpc

from exabel_data_sdk.client.api.api_client.exabel_api_group import ExabelApiGroup
from exabel_data_sdk.client.client_config import ClientConfig

SIXTEEN_MEGABYTES_IN_BYTES = 16 * 1024 * 1024

# Retry parameters
DEFAULT_MAX_ATTEMPTS = 5
DEFAULT_INITIAL_BACKOFF_SECONDS = 0.1
DEFAULT_MAX_BACKOFF_SECONDS = 5
DEFAULT_BACKOFF_MULTIPLIER = 2


class BaseGrpcClient:
"""
@@ -18,7 +26,10 @@ def __init__(self, config: ClientConfig, api_group: ExabelApiGroup):
"target": f"{api_group.get_host(config)}:{api_group.get_port(config)}",
# When importing time series, we may receive a large amount of precondition failure
# violations in the trailing metadata, therefore we increase the maximum metadata size.
"options": (("grpc.max_metadata_size", SIXTEEN_MEGABYTES_IN_BYTES),),
"options": (
("grpc.max_metadata_size", SIXTEEN_MEGABYTES_IN_BYTES),
("grpc.service_config", self._get_service_config()),
),
}
if config.api_key == "NO_KEY":
# Use an insecure channel. This can be used for local testing.
@@ -34,3 +45,38 @@ def __init__(self, config: ClientConfig, api_group: ExabelApiGroup)
)
for header in self.config.extra_headers:
self.metadata.append(header)

def _get_service_config(
self,
max_attempts: int = DEFAULT_MAX_ATTEMPTS,
initial_backoff: float = DEFAULT_INITIAL_BACKOFF_SECONDS,
max_backoff: float = DEFAULT_MAX_BACKOFF_SECONDS,
backoff_multiplier: int = DEFAULT_BACKOFF_MULTIPLIER,
) -> str:
"""
Returns a JSON string to use for grpc.service_config option when creating a grpc channel
"""
return json.dumps(
{
"methodConfig": [
{
"name": [{}],
"retryPolicy": {
"maxAttempts": max_attempts,
"initialBackoff": f"{initial_backoff}s",
"maxBackoff": f"{max_backoff}s",
"backoffMultiplier": backoff_multiplier,
"retryableStatusCodes": [
"CANCELLED",
"UNKNOWN",
"DEADLINE_EXCEEDED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"INTERNAL",
"UNAVAILABLE",
],
},
}
]
}
)
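The retry policy added above is handed to gRPC as a JSON service config when the channel is created. As a rough, standalone sketch of how such a config attaches to a channel outside the SDK (the target address and the single retryable status code below are placeholders, not the SDK's actual values):

import json

import grpc

service_config = json.dumps(
    {
        "methodConfig": [
            {
                "name": [{}],  # an empty name entry applies the policy to all methods
                "retryPolicy": {
                    "maxAttempts": 5,
                    "initialBackoff": "0.1s",
                    "maxBackoff": "5s",
                    "backoffMultiplier": 2,
                    "retryableStatusCodes": ["UNAVAILABLE"],
                },
            }
        ]
    }
)

# "localhost:50051" is a placeholder target; the SDK derives host and port from its config.
channel = grpc.insecure_channel(
    "localhost:50051",
    options=[("grpc.service_config", service_config)],
)

With this option set, calls that fail with one of the listed status codes are retried by the channel itself, backing off exponentially between attempts.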
19 changes: 11 additions & 8 deletions exabel_data_sdk/client/api/bulk_import.py
@@ -25,6 +25,7 @@
)
from exabel_data_sdk.util.deprecate_arguments import deprecate_arguments
from exabel_data_sdk.util.import_ import get_batches_for_import
from exabel_data_sdk.util.logging_thread_pool_executor import LoggingThreadPoolExecutor

logger = logging.getLogger(__name__)

@@ -47,15 +48,13 @@ def __post_init__(self) -> None:
@overload
def get_time_series_result_from_violations(
resource: TimeSeries, violations: Mapping[str, Violation]
) -> ResourceCreationResult[TimeSeries]:
...
) -> ResourceCreationResult[TimeSeries]: ...

@staticmethod
@overload
def get_time_series_result_from_violations(
resource: pd.Series, violations: Mapping[str, Violation]
) -> ResourceCreationResult[pd.Series]:
...
) -> ResourceCreationResult[pd.Series]: ...

@staticmethod
def get_time_series_result_from_violations(
@@ -262,17 +261,21 @@ def _bulk_import(
)

else:
with ThreadPoolExecutor(max_workers=threads) as executor:
with (
LoggingThreadPoolExecutor(max_workers=threads)
if logger.getEffectiveLevel() == logging.DEBUG
else ThreadPoolExecutor(max_workers=threads)
) as executor:
for resource_batch in resource_batches:
if not results.abort:
# The generic type hints do not guarantee that TResource refers to the same
# class for each of the three parameters. This leads to mypy warnings for the
# following function call.
executor.submit(
_process,
results, # type: ignore[arg-type]
resource_batch, # type: ignore[arg-type]
import_func, # type: ignore[arg-type]
results,
resource_batch,
import_func,
# Python 3.9 added support for the shutdown argument 'cancel_futures'.
# We should set this argument to True once we have moved to this python
# version.
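The change above selects LoggingThreadPoolExecutor only when the logger is at DEBUG level, and falls back to the plain ThreadPoolExecutor otherwise. A minimal sketch of that pattern, with a stand-in executor subclass rather than the SDK's own implementation:

import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)


class LoggingThreadPoolExecutor(ThreadPoolExecutor):
    # Stand-in for the SDK's executor: logs each task as it is submitted.
    def submit(self, fn, /, *args, **kwargs):
        logger.debug("Submitting task %s", getattr(fn, "__name__", fn))
        return super().submit(fn, *args, **kwargs)


def run_batches(batches, process, threads=4):
    # Only pay for per-task logging when debug output is actually enabled.
    executor_cls = (
        LoggingThreadPoolExecutor
        if logger.getEffectiveLevel() == logging.DEBUG
        else ThreadPoolExecutor
    )
    with executor_cls(max_workers=threads) as executor:
        for batch in batches:
            executor.submit(process, batch)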
6 changes: 3 additions & 3 deletions exabel_data_sdk/client/api/bulk_insert.py
@@ -93,9 +93,9 @@ def _bulk_insert(
# following function call.
executor.submit(
_process,
results, # type: ignore[arg-type]
resource, # type: ignore[arg-type]
insert_func, # type: ignore[arg-type]
results,
resource,
insert_func,
# Python 3.9 added support for the shutdown argument 'cancel_futures'.
# We should set this argument to True once we have moved to this python
# version.
@@ -82,9 +82,11 @@ def from_proto(model_run: ProtoPredictionModelRun) -> "PredictionModelRun":
name=model_run.name,
description=model_run.description,
configuration=ModelConfiguration(model_run.configuration),
configuration_source=model_run.configuration_source
if model_run.HasField("configuration_source")
else None,
configuration_source=(
model_run.configuration_source
if model_run.HasField("configuration_source")
else None
),
auto_activate=model_run.auto_activate,
)

3 changes: 2 additions & 1 deletion exabel_data_sdk/client/api/data_classes/time_series.py
@@ -22,6 +22,7 @@ class Dimension(Enum):
DIMENSION_MASS = ProtoUnit.Dimension.DIMENSION_MASS
DIMENSION_LENGTH = ProtoUnit.Dimension.DIMENSION_LENGTH
DIMENSION_TIME = ProtoUnit.Dimension.DIMENSION_TIME
DIMENSION_RATIO = ProtoUnit.Dimension.DIMENSION_RATIO

@classmethod
def from_string(cls, dimension: str) -> Dimension:
@@ -32,7 +33,7 @@ def from_string(cls, dimension: str) -> Dimension:
except KeyError as e:
raise ValueError(
f"Unknown dimension: {dimension}. "
+ "Supported values are: unknown, currency, mass, length, time."
+ "Supported values are: unknown, currency, mass, length, time, and ratio."
) from e


3 changes: 1 addition & 2 deletions exabel_data_sdk/client/api/pageable_resource.py
@@ -38,8 +38,7 @@ def _get_resource_iterator(
resource_count = 0
while True:
result = pageable_func(**kwargs, page_token=page_token)
for resource in result.results:
yield resource
yield from result.results
page_token = result.next_page_token
resource_count += len(result.results)
if resource_count >= result.total_size:
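The yield from change above sits inside the page-token loop that walks every page of a listing endpoint. A self-contained sketch of that loop: the field names results, next_page_token and total_size mirror the diff, but PageResult and list_page are illustrative stand-ins for the SDK's types.

from dataclasses import dataclass
from typing import Callable, Iterator, Sequence


@dataclass
class PageResult:
    results: Sequence[object]
    next_page_token: str
    total_size: int


def iterate_all(list_page: Callable[[str], PageResult]) -> Iterator[object]:
    page_token = ""
    resource_count = 0
    while True:
        result = list_page(page_token)
        yield from result.results  # replaces the explicit for-loop removed in the diff
        page_token = result.next_page_token
        resource_count += len(result.results)
        if resource_count >= result.total_size:
            break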
44 changes: 36 additions & 8 deletions exabel_data_sdk/client/api/resource_creation_result.py
@@ -15,6 +15,7 @@
from exabel_data_sdk.services.csv_loading_constants import (
DEFAULT_ABORT_THRESHOLD,
DEFAULT_BULK_LOAD_CHECKPOINTS,
FAILURE_LOG_LIMIT,
)

logger = logging.getLogger(__name__)
@@ -90,7 +91,11 @@ def get_printable_error(self) -> str:
Return a printable request error message if it is set, otherwise return the string
representation of the error
"""
return self.error.message if self.error and self.error.message else str(self.error)
return (
f"{self.error.error_type.name}: {self.error.message}"
if self.error and self.error.message
else str(self.error)
)


class ResourceCreationResults(Generic[ResourceT]):
@@ -165,7 +170,9 @@ def get_failures(self) -> Sequence[ResourceCreationResult[ResourceT]]:
"""Return all the failed results."""
return list(filter(lambda r: r.status == ResourceCreationStatus.FAILED, self.results))

def extract_retryable_failures(self) -> Sequence[ResourceCreationResult[ResourceT]]:
def extract_retryable_failures(
self, log_summary: bool = True
) -> Sequence[ResourceCreationResult[ResourceT]]:
"""
Remove all retryable failures from this result set,
and return them.
@@ -183,6 +190,14 @@ def extract_retryable_failures(self) -> Sequence[ResourceCreationResult[Resource
rest.append(result)
self.counter.subtract([result.status for result in failed])
self.results = rest

if log_summary and failed:
errors = [failure.error for failure in failed if failure.error]
error_types = Counter(error.error_type for error in errors)

logger.info("The following retryable failures were returned:")
for error_type, count in error_types.items():
logger.info("%d failures with error type: %s", count, error_type.name)
return failed

def check_failures(self) -> None:
@@ -198,7 +213,7 @@ def check_failures(self) -> None:
self.abort_threshold * 100,
)

def print_summary(self) -> None:
def print_summary(self, failure_log_limit: Optional[int] = FAILURE_LOG_LIMIT) -> None:
"""Prints a human legible summary of the resource creation results to screen."""
if self.counter[ResourceCreationStatus.CREATED]:
logger.info("%s new resources created", self.counter[ResourceCreationStatus.CREATED])
@@ -208,13 +223,26 @@ def print_summary(self) -> None:
logger.info("%s resources upserted", self.counter[ResourceCreationStatus.UPSERTED])
if self.counter[ResourceCreationStatus.FAILED]:
logger.warning("%s resources failed", self.counter[ResourceCreationStatus.FAILED])
for result in self.results:
if result.status == ResourceCreationStatus.FAILED:
failures = self.get_failures()
for i, failure in enumerate(failures):
if failure_log_limit and i > failure_log_limit:
logger.warning(
" %s\n %s",
result.get_printable_resource(),
result.get_printable_error(),
"%d resources failed. Only %d resources shown.",
len(failures),
failure_log_limit,
)
break
logger.warning(
" %s\n %s",
failure.get_printable_resource(),
failure.get_printable_error(),
)

errors = [failure.error for failure in failures if failure.error]
error_types = Counter(error.error_type for error in errors)
logger.warning("Summary of the errors for the failed resources:")
for error_type, count in error_types.items():
logger.warning(" %s: %d", error_type.name, count)

def print_status(self) -> None:
"""
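Both new blocks in this file summarize failures by grouping their errors with collections.Counter before logging. A standalone sketch of that aggregation, with an illustrative ErrorType enum in place of the SDK's own error types:

import logging
from collections import Counter
from enum import Enum

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)


class ErrorType(Enum):
    # Illustrative placeholder for the SDK's request error types.
    TIMEOUT = 1
    INVALID_ARGUMENT = 2


errors = [ErrorType.TIMEOUT, ErrorType.TIMEOUT, ErrorType.INVALID_ARGUMENT]
error_types = Counter(errors)

logger.warning("Summary of the errors for the failed resources:")
for error_type, count in error_types.items():
    logger.warning("  %s: %d", error_type.name, count)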
6 changes: 3 additions & 3 deletions exabel_data_sdk/scripts/check_company_identifiers_in_csv.py
@@ -72,9 +72,9 @@ def _process_entity_resource_names(
for identifier, entity in entity_resource_names.mapping.items()
)
df = pd.DataFrame(all_rows)
df.loc[
(df["entity"].duplicated(keep=False) & ~df["entity"].isna()), "warning"
] = "Multiple identifiers mapping to the same company"
df.loc[(df["entity"].duplicated(keep=False) & ~df["entity"].isna()), "warning"] = (
"Multiple identifiers mapping to the same company"
)
if not keep_all_identifiers:
df = df[df["warning"].notnull()]
df = df.fillna("").sort_values(["entity", identifier_type])
1 change: 1 addition & 0 deletions exabel_data_sdk/scripts/load_time_series_from_csv.py
@@ -1,6 +1,7 @@
"""
This file is here to keep backwards compatibility with the old name of the time series import script.
"""

import sys

from exabel_data_sdk.scripts.load_time_series_from_file import LoadTimeSeriesFromFile
11 changes: 0 additions & 11 deletions exabel_data_sdk/scripts/load_time_series_metadata_from_file.py
@@ -91,16 +91,6 @@ def __init__(self, argv: Sequence[str]):
"'known_time'. Take care to maintain correct casing in the file when using this "
"option.",
)
self.parser.add_argument(
"--unit-type",
required=False,
type=str,
default=None,
help=(
"The unit type of the time series. "
"One of 'currency', 'time', 'mass', 'length' or 'unknown'."
),
)

def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
try:
@@ -119,7 +109,6 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
batch_size=args.batch_size,
skip_validation=args.skip_validation,
case_sensitive_signals=args.case_sensitive_signals,
unit_type=args.unit_type,
)

except FileLoadingException as e:
2 changes: 1 addition & 1 deletion exabel_data_sdk/scripts/upsert_time_series_unit.py
@@ -35,7 +35,7 @@ def __init__(self, argv: Sequence[str]):
default=None,
help=(
"The unit type of the time series. "
"One of 'unknown', 'currency', 'mass', 'length', 'time'."
"One of 'unknown', 'currency', 'mass', 'length', 'time', 'ratio'."
),
)
self.parser.add_argument(
13 changes: 12 additions & 1 deletion exabel_data_sdk/services/csv_entity_loader.py
@@ -51,6 +51,7 @@ def load_entities(
abort_threshold: Optional[float] = DEFAULT_ABORT_THRESHOLD,
batch_size: Optional[int] = None,
return_results: bool = True,
total_rows: Optional[int] = None,
# Deprecated arguments
name_column: Optional[str] = None, # pylint: disable=unused-argument
namespace: Optional[str] = None, # pylint: disable=unused-argument
@@ -82,6 +83,9 @@ def load_entities(
upload to be aborted; if it is `None`, the upload is never aborted
batch_size: the number of entities to upload in each batch; if not specified, the
entire file will be read into memory and uploaded in a single batch
return_results: if True, returns a FileLoadingResult with info, else returns an
empty FileLoadingResult
total_rows: the total number of rows to be processed
"""
if dry_run:
logger.info("Running dry-run...")
@@ -155,6 +159,13 @@
)
if return_results:
combined_result.update(result)
if combined_result.processed_rows is not None and total_rows:
logger.info(
"Rows processed: %d / %d. %.1f %%",
combined_result.processed_rows,
total_rows,
100 * combined_result.processed_rows / total_rows,
)

return combined_result

@@ -229,7 +240,7 @@ def _load_entities(
"An error occurred while uploading entities.",
failures=result.get_failures(),
)
return FileLoadingResult(result)
return FileLoadingResult(result, processed_rows=len(data_frame))
except BulkInsertFailedError as e:
# An error summary has already been printed.
if error_on_any_failure:
1 change: 1 addition & 0 deletions exabel_data_sdk/services/csv_exception.py
@@ -1,4 +1,5 @@
"""This file aliases an import for backwards compatibility after an exception object was renamed."""

# pylint: disable=unused-import
from exabel_data_sdk.services.file_loading_exception import (
FileLoadingException as CsvLoadingException,
3 changes: 2 additions & 1 deletion exabel_data_sdk/services/csv_loading_constants.py
@@ -3,4 +3,5 @@
DEFAULT_NUMBER_OF_THREADS = 40
DEFAULT_NUMBER_OF_THREADS_FOR_IMPORT = 4
DEFAULT_NUMBER_OF_RETRIES = 5
MAX_THREADS_FOR_IMPORT = 40
MAX_THREADS_FOR_IMPORT = 100
FAILURE_LOG_LIMIT = None # type: ignore[var-annotated]
1 change: 1 addition & 0 deletions exabel_data_sdk/services/csv_loading_result.py
@@ -1,3 +1,4 @@
"""This file aliases an import for backwards compatibility after the result object was renamed."""

# pylint: disable=unused-import
from exabel_data_sdk.services.file_loading_result import FileLoadingResult as CsvLoadingResult
