Commit 73ee1f1

Adjust retry logic and add arguments for disabling automatic tag/signal creation (#62)

1 parent f7be591 commit 73ee1f1

17 files changed: +191 -26 lines changed

VERSION (+1 -1)

@@ -1 +1 @@
-2.4.0
+2.5.0

exabel_data_sdk/client/api/bulk_insert.py (+10 -2)

@@ -1,5 +1,5 @@
 from concurrent.futures.thread import ThreadPoolExecutor
-from time import time
+from time import sleep, time
 from typing import Callable, Sequence

 from exabel_data_sdk.client.api.data_classes.request_error import ErrorType, RequestError
@@ -90,10 +90,15 @@ def _bulk_insert(
         raise BulkInsertFailedError()


+def _get_backoff(trial: int, min_sleep: float = 1.0, max_sleep: float = 60.0) -> float:
+    """Return the backoff in seconds for the given trial."""
+    return min(min_sleep * 2 ** trial, max_sleep)
+
+
 def bulk_insert(
     resources: Sequence[TResource],
     insert_func: Callable[[TResource], ResourceCreationStatus],
-    retries: int = 2,
+    retries: int = 5,
     threads: int = 40,
 ) -> ResourceCreationResults[TResource]:
     """
@@ -119,6 +124,9 @@ def bulk_insert(
         failures = results.extract_retryable_failures()
         if not failures:
             break
+        backoff = _get_backoff(trial)
+        print(f"Sleeping {backoff:.2f} seconds before retrying failed requests...")
+        sleep(backoff)
         resources = [result.resource for result in failures]
         print(f"Retry #{trial} with {len(resources)} resources:")
         _bulk_insert(results, resources, insert_func, threads=threads)

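With the default arguments, _get_backoff yields a capped exponential schedule: each retry round sleeps twice as long as the one before, up to a 60-second ceiling, and bulk_insert now sleeps for this backoff before every retry. A quick sketch of the resulting sleep times (the function body is copied from the hunk above):

    # Capped exponential backoff, as added in bulk_insert.py above.
    def _get_backoff(trial: int, min_sleep: float = 1.0, max_sleep: float = 60.0) -> float:
        """Return the backoff in seconds for the given trial."""
        return min(min_sleep * 2 ** trial, max_sleep)

    # Trials 0..6 sleep 1, 2, 4, 8, 16, 32, and then cap at 60 seconds.
    print([_get_backoff(trial) for trial in range(7)])
    # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]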
exabel_data_sdk/client/api/entity_api.py (+2 -1)

@@ -220,6 +220,7 @@ def bulk_create_entities(
         entity_type: str,
         threads: int = 40,
         upsert: bool = False,
+        retries: int = 5,
     ) -> ResourceCreationResults[Entity]:
         """
         Check if the provided entities exist, and create them if they don't.
@@ -238,4 +239,4 @@ def insert(entity: Entity) -> ResourceCreationStatus:
             self.create_entity(entity=entity, entity_type=entity_type)
             return ResourceCreationStatus.CREATED

-        return bulk_insert(entities, insert, threads=threads)
+        return bulk_insert(entities, insert, threads=threads, retries=retries)

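The new retries parameter is forwarded straight to bulk_insert. A minimal usage sketch, assuming ExabelClient accepts an api_key argument; the entity resource names below are illustrative, not from this commit:

    from exabel_data_sdk import ExabelClient
    from exabel_data_sdk.client.api.data_classes.entity import Entity

    client = ExabelClient(api_key="...")  # placeholder credentials

    # Hypothetical entity; the resource name format follows the SDK's conventions.
    entities = [
        Entity(name="entityTypes/brand/entities/acme.brand_1", display_name="Brand 1"),
    ]
    result = client.entity_api.bulk_create_entities(
        entities, entity_type="entityTypes/brand", threads=40, retries=5
    )
    print(result.has_failure())  # has_failure() is used the same way in the loaders below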
exabel_data_sdk/client/api/relationship_api.py (+2 -1)

@@ -310,6 +310,7 @@ def bulk_create_relationships(
         relationships: Sequence[Relationship],
         threads: int = 40,
         upsert: bool = False,
+        retries: int = 5,
     ) -> ResourceCreationResults[Relationship]:
         """
         Check if the provided relationships exist, and create them if they don't.
@@ -327,4 +328,4 @@ def insert(relationship: Relationship) -> ResourceCreationStatus:
             self.create_relationship(relationship=relationship)
             return ResourceCreationStatus.CREATED

-        return bulk_insert(relationships, insert, threads=threads)
+        return bulk_insert(relationships, insert, threads=threads, retries=retries)

exabel_data_sdk/client/api/time_series_api.py (+3 -1)

@@ -313,6 +313,7 @@ def bulk_upsert_time_series(
         create_tag: bool = False,
         threads: int = 40,
         default_known_time: DefaultKnownTime = None,
+        retries: int = 5,
     ) -> ResourceCreationResults[pd.Series]:
         """
         Calls upsert_time_series for each of the provided time series,
@@ -332,6 +333,7 @@ def bulk_upsert_time_series(
                 the Known Time for data points where a specific known time timestamp
                 has not been given. If not provided, the Exabel API defaults to the
                 current time (upload time) as the Known Time.
+            retries: Maximum number of retries to make for each failed request.
         """

         def insert(ts: pd.Series) -> ResourceCreationStatus:
@@ -340,7 +342,7 @@ def insert(ts: pd.Series) -> ResourceCreationStatus:
             )
             return ResourceCreationStatus.UPSERTED if existed else ResourceCreationStatus.CREATED

-        return bulk_insert(series, insert, threads=threads)
+        return bulk_insert(series, insert, threads=threads, retries=retries)

     @staticmethod
     def _series_to_time_series_points(series: pd.Series) -> Sequence[TimeSeriesPoint]:

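A hedged usage sketch with the new retries argument, assuming the SDK's convention that each element of series is a pd.Series named by the full time series resource name and indexed by tz-aware timestamps (client as in the earlier sketch; names and dates are illustrative):

    import pandas as pd

    # Hypothetical time series for a single signal on a single entity.
    ts = pd.Series(
        [1.0, 2.0],
        index=pd.DatetimeIndex(["2020-12-30", "2020-12-31"], tz="UTC"),
        name="entityTypes/brand/entities/acme.brand_1/signals/acme.revenue",
    )
    result = client.time_series_api.bulk_upsert_time_series(
        [ts], create_tag=True, threads=40, retries=5
    )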
exabel_data_sdk/scripts/csv_script.py (+17 -2)

@@ -5,6 +5,10 @@
 import pandas as pd

 from exabel_data_sdk.scripts.base_script import BaseScript
+from exabel_data_sdk.services.csv_loading_constants import (
+    DEFAULT_NUMBER_OF_RETRIES,
+    DEFAULT_NUMBER_OF_THREADS,
+)


 class CsvScript(BaseScript):
@@ -56,8 +60,19 @@ def __init__(self, argv: Sequence[str], description: str):
             type=int,
             choices=range(1, 101),
             metavar="[1-100]",
-            default=40,
-            help="The number of parallel upload threads to run. Defaults to 40.",
+            default=DEFAULT_NUMBER_OF_THREADS,
+            help=f"The number of parallel upload threads to run. "
+            f"Defaults to {DEFAULT_NUMBER_OF_THREADS}.",
+        )
+        self.parser.add_argument(
+            "--retries",
+            required=False,
+            type=int,
+            choices=range(1, 51),
+            metavar="[1-50]",
+            default=DEFAULT_NUMBER_OF_RETRIES,
+            help=f"The maximum number of retries to make for each failed request. Defaults to "
+            f"{DEFAULT_NUMBER_OF_RETRIES}.",
         )

     def read_csv(

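Because --retries is registered on the shared CsvScript parser, every CSV loading script inherits it alongside --threads. A standalone sketch of the same argparse pattern (this parser is illustrative, not the SDK's):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--retries",
        required=False,
        type=int,
        choices=range(1, 51),  # validate that the value lies in [1, 50]...
        metavar="[1-50]",      # ...while keeping the usage string compact
        default=5,
        help="The maximum number of retries to make for each failed request. Defaults to 5.",
    )
    print(parser.parse_args(["--retries", "10"]).retries)  # prints 10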
exabel_data_sdk/scripts/load_entities_from_csv.py (+1)

@@ -85,6 +85,7 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
                 threads=args.threads,
                 upsert=args.upsert,
                 dry_run=args.dry_run,
+                retries=args.retries,
             )
         except CsvLoadingException as e:
             print(e)

exabel_data_sdk/scripts/load_relationships_from_csv.py (+1)

@@ -87,6 +87,7 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
                 threads=args.threads,
                 upsert=args.upsert,
                 dry_run=args.dry_run,
+                retries=args.retries,
             )
         except CsvLoadingException as e:
             print(e)

exabel_data_sdk/scripts/load_time_series_from_csv.py (+25 -4)

@@ -13,16 +13,18 @@ class LoadTimeSeriesFromCsv(CsvScriptWithEntityMapping):
     Processes a timeseries CSV file and uploads the time series to Exabel.

     The CSV file should have a header line on the format
-        entity;date;`signal_1`; ... ;`signal_n`
+        entityType;date;`signal_1`; ... ;`signal_n`
+
+    For example
+        brand;date;revenue;sales

     Each subsequent row consists of the following elements:
-      * the entity referred to by the entity’s resource name, e.g.,
-            entityTypes/company/entities/company_code
+      * the entity referred to by the entity’s identifier
       * the date on ISO format, e.g. 2020-12-31
       * one numerical value for each of the signals `signal_1` to `signal_n`

     Thus, a typical row would look like:
-        entityTypes/company/entities/company_code;2020-12-31;12;1234.56;1.23e6
+        brand_1;2020-12-31;12;1234.56;1.23e6

     The rows do not have to be sorted in any particular order.
     """
@@ -59,6 +61,22 @@ def __init__(self, argv: Sequence[str]):
                 "available to the user the day after, one would set --pit-offset 1"
             ),
         )
+        self.parser.add_argument(
+            "--no-create-library-signal",
+            dest="create_library_signal",
+            required=False,
+            action="store_false",
+            default=True,
+            help="Set to not create library signal DSL expressions.",
+        )
+        self.parser.add_argument(
+            "--no-create-tag",
+            dest="create_tag",
+            required=False,
+            action="store_false",
+            default=True,
+            help="Set to not create a tag for every entity type a signal has time series for.",
+        )

     def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
         try:
@@ -70,8 +88,11 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
                 pit_current_time=args.pit_current_time,
                 pit_offset=args.pit_offset,
                 create_missing_signals=args.create_missing_signals,
+                create_tag=args.create_tag,
+                create_library_signal=args.create_library_signal,
                 threads=args.threads,
                 dry_run=args.dry_run,
+                retries=args.retries,
             )
         except CsvLoadingException as e:
             print(e)

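Both new flags use argparse's store_false action, so passing them flips create_tag or create_library_signal from the default of True to False. A hedged invocation sketch: --no-create-tag, --no-create-library-signal, and --retries are defined in this commit, while --filename and --api-key are assumed from the SDK's shared script arguments:

    python -m exabel_data_sdk.scripts.load_time_series_from_csv \
        --filename timeseries.csv \
        --api-key "$EXABEL_API_KEY" \
        --no-create-tag \
        --no-create-library-signal \
        --retries 10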
exabel_data_sdk/services/csv_entity_loader.py (+10 -3)

@@ -2,7 +2,10 @@
 from exabel_data_sdk.client.api.bulk_insert import BulkInsertFailedError
 from exabel_data_sdk.client.api.data_classes.entity import Entity
 from exabel_data_sdk.services.csv_exception import CsvLoadingException
-from exabel_data_sdk.services.csv_loading_constants import DEFAULT_NUMBER_OF_THREADS
+from exabel_data_sdk.services.csv_loading_constants import (
+    DEFAULT_NUMBER_OF_RETRIES,
+    DEFAULT_NUMBER_OF_THREADS,
+)
 from exabel_data_sdk.services.csv_reader import CsvReader
 from exabel_data_sdk.util.resource_name_normalization import normalize_resource_name

@@ -29,6 +32,7 @@ def load_entities(
         upsert: bool,
         dry_run: bool = False,
         error_on_any_failure: bool = False,
+        retries: int = DEFAULT_NUMBER_OF_RETRIES,
     ) -> None:
         """
         Load a CSV file and upload the entities specified therein to the Exabel Data API.
@@ -49,6 +53,7 @@ def load_entities(
             upsert: whether entities should be updated if they already exist
             dry_run: if True, the file is processed, but no entities are actually uploaded
             error_on_any_failure: if True, an exception is raised if any entity failed to be created
+            retries: the maximum number of retries to make for each failed request
         """
         if dry_run:
             print("Running dry-run...")
@@ -61,7 +66,9 @@ def load_entities(
         }
         if description_column:
             string_columns.add(description_column)
-        entities_df = CsvReader.read_csv(filename, separator, string_columns=string_columns)
+        entities_df = CsvReader.read_csv(
+            filename, separator, string_columns=string_columns, keep_default_na=False
+        )

         name_col = name_column or entities_df.columns[0]
         display_name_col = display_name_column or name_col
@@ -91,7 +98,7 @@ def load_entities(

         try:
             result = self._client.entity_api.bulk_create_entities(
-                entities, entity_type_name, threads=threads, upsert=upsert
+                entities, entity_type_name, threads=threads, upsert=upsert, retries=retries
             )
             if error_on_any_failure and result.has_failure():
                 raise CsvLoadingException("An error occurred while uploading entities.")
exabel_data_sdk/services/csv_loading_constants.py (+1)

@@ -1 +1,2 @@
 DEFAULT_NUMBER_OF_THREADS = 40
+DEFAULT_NUMBER_OF_RETRIES = 5

exabel_data_sdk/services/csv_reader.py (+8 -2)

@@ -8,7 +8,11 @@ class CsvReader:

     @staticmethod
     def read_csv(
-        filename: str, separator: str, string_columns: Iterable[Union[str, int]]
+        filename: str,
+        separator: str,
+        string_columns: Iterable[Union[str, int]],
+        *,
+        keep_default_na: bool
     ) -> pd.DataFrame:
         """
         Read the given file and return the content as a pandas DataFrame.
@@ -21,4 +25,6 @@ def read_csv(
         dtype: Optional[Mapping[Union[str, int], type]] = None
         if string_columns:
             dtype = {column: str for column in string_columns}
-        return pd.read_csv(filename, header=0, sep=separator, dtype=dtype, keep_default_na=False)
+        return pd.read_csv(
+            filename, header=0, sep=separator, dtype=dtype, keep_default_na=keep_default_na
+        )

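Making keep_default_na a keyword-only argument forces every caller to choose pandas' NA handling explicitly: the entity and relationship loaders pass False so that identifier strings such as "NA" survive verbatim, while the time series loader passes True so that empty cells become missing values. A small sketch of the difference (the file content is illustrative):

    import io

    import pandas as pd

    data = "entity;value\nNA;1.0\n"  # here "NA" is meant as a literal identifier

    df = pd.read_csv(io.StringIO(data), sep=";", keep_default_na=False)
    print(df.loc[0, "entity"])  # 'NA' (kept as a string)

    df = pd.read_csv(io.StringIO(data), sep=";", keep_default_na=True)
    print(df.loc[0, "entity"])  # nan (interpreted as missing)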
exabel_data_sdk/services/csv_relationship_loader.py (+10 -3)

@@ -3,7 +3,10 @@
 from exabel_data_sdk.client.api.data_classes.relationship import Relationship
 from exabel_data_sdk.client.api.data_classes.relationship_type import RelationshipType
 from exabel_data_sdk.services.csv_exception import CsvLoadingException
-from exabel_data_sdk.services.csv_loading_constants import DEFAULT_NUMBER_OF_THREADS
+from exabel_data_sdk.services.csv_loading_constants import (
+    DEFAULT_NUMBER_OF_RETRIES,
+    DEFAULT_NUMBER_OF_THREADS,
+)
 from exabel_data_sdk.services.csv_reader import CsvReader
 from exabel_data_sdk.services.entity_mapping_file_reader import EntityMappingFileReader
 from exabel_data_sdk.util.resource_name_normalization import to_entity_resource_names
@@ -32,6 +35,7 @@ def load_relationships(
         upsert: bool = False,
         dry_run: bool = False,
         error_on_any_failure: bool = False,
+        retries: int = DEFAULT_NUMBER_OF_RETRIES,
     ) -> None:
         """
         Load a CSV file and upload the relationships specified therein to the Exabel Data API.
@@ -53,6 +57,7 @@ def load_relationships(
             dry_run: if True, the file is processed, but no relationships are actually uploaded
             error_on_any_failure: if True, an exception is raised if any relationship failed to be
                 created
+            retries: the maximum number of retries to make for each failed request
         """
         if dry_run:
             print("Running dry-run...")
@@ -66,7 +71,9 @@ def load_relationships(
         if description_column:
             string_columns.add(description_column)

-        relationships_df = CsvReader.read_csv(filename, separator, string_columns=string_columns)
+        relationships_df = CsvReader.read_csv(
+            filename, separator, string_columns=string_columns, keep_default_na=False
+        )

         entity_from_col = entity_from_column
         entity_to_col = entity_to_column
@@ -119,7 +126,7 @@ def load_relationships(

         try:
             result = self._client.relationship_api.bulk_create_relationships(
-                relationships, threads=threads, upsert=upsert
+                relationships, threads=threads, upsert=upsert, retries=retries
             )
             if error_on_any_failure and result.has_failure():
                 raise CsvLoadingException("An error occurred while uploading relationships.")

exabel_data_sdk/services/csv_time_series_loader.py (+17 -4)

@@ -10,7 +10,10 @@
 from exabel_data_sdk.client.api.bulk_insert import BulkInsertFailedError
 from exabel_data_sdk.client.api.data_classes.signal import Signal
 from exabel_data_sdk.services.csv_exception import CsvLoadingException
-from exabel_data_sdk.services.csv_loading_constants import DEFAULT_NUMBER_OF_THREADS
+from exabel_data_sdk.services.csv_loading_constants import (
+    DEFAULT_NUMBER_OF_RETRIES,
+    DEFAULT_NUMBER_OF_THREADS,
+)
 from exabel_data_sdk.services.csv_reader import CsvReader
 from exabel_data_sdk.services.entity_mapping_file_reader import EntityMappingFileReader
 from exabel_data_sdk.stubs.exabel.api.data.v1.time_series_messages_pb2 import DefaultKnownTime
@@ -40,9 +43,12 @@ def load_time_series(
         pit_current_time: bool = False,
         pit_offset: int = False,
         create_missing_signals: bool = False,
+        create_tag: bool = True,
+        create_library_signal: bool = True,
         threads: int = DEFAULT_NUMBER_OF_THREADS,
         dry_run: bool = False,
         error_on_any_failure: bool = False,
+        retries: int = DEFAULT_NUMBER_OF_RETRIES,
     ) -> None:
         """
         Load a CSV file and upload the time series to the Exabel Data API
@@ -64,6 +70,7 @@ def load_time_series(
             dry_run: if True, the file is processed, but no time series are actually uploaded
             error_on_any_failure: if True, an exception is raised if any time series failed to be
                 created
+            retries: the maximum number of retries to make for each failed request
         """
         if dry_run:
             print("Running dry-run...")
@@ -79,7 +86,9 @@ def load_time_series(
             time_offset = Duration(seconds=86400 * pit_offset)
             default_known_time = DefaultKnownTime(time_offset=time_offset)

-        ts_data = CsvReader.read_csv(filename, separator=separator, string_columns=[0])
+        ts_data = CsvReader.read_csv(
+            filename, separator=separator, string_columns=[0], keep_default_na=True
+        )
         entity_mapping = EntityMappingFileReader.read_entity_mapping_file(
             entity_mapping_filename, separator=separator
         )
@@ -194,7 +203,7 @@ def load_time_series(
             for signal in missing_signals:
                 self._client.signal_api.create_signal(
                     Signal(name=prefix + signal, display_name=signal),
-                    create_library_signal=True,
+                    create_library_signal=create_library_signal,
                 )
         else:
             raise CsvLoadingException(
@@ -212,7 +221,11 @@ def load_time_series(

         try:
             result = self._client.time_series_api.bulk_upsert_time_series(
-                series, create_tag=True, threads=threads, default_known_time=default_known_time
+                series,
+                create_tag=create_tag,
+                threads=threads,
+                default_known_time=default_known_time,
+                retries=retries,
             )
             if error_on_any_failure and result.has_failure():
                 raise CsvLoadingException("An error occurred while uploading time series.")
New file (+14)

@@ -0,0 +1,14 @@
+import unittest
+
+from exabel_data_sdk.client.api.bulk_insert import _get_backoff
+
+
+class TestEntityApi(unittest.TestCase):
+    def test_get_backoff(self):
+        self.assertEqual(1.0, _get_backoff(0))
+        self.assertEqual(2.0, _get_backoff(1))
+        self.assertEqual(4.0, _get_backoff(2))
+        self.assertEqual(8.0, _get_backoff(3))
+        self.assertEqual(16.0, _get_backoff(4))
+        self.assertEqual(32.0, _get_backoff(5))
+        self.assertEqual(60.0, _get_backoff(6))
