Version 6.1.0 #175
Merged 5 commits on Jan 3, 2025
905 changes: 472 additions & 433 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-6.0.0
+6.1.0
8 changes: 7 additions & 1 deletion exabel_data_sdk/client/api/bulk_insert.py
@@ -12,6 +12,8 @@
)
from exabel_data_sdk.services.csv_loading_constants import (
DEFAULT_ABORT_THRESHOLD,
DEFAULT_MAX_BACKOFF_SECONDS,
DEFAULT_MIN_BACKOFF_SECONDS,
DEFAULT_NUMBER_OF_RETRIES,
DEFAULT_NUMBER_OF_THREADS,
)
@@ -105,7 +107,11 @@ def _bulk_insert(
raise BulkInsertFailedError()


-def _get_backoff(trial: int, min_sleep: float = 1.0, max_sleep: float = 60.0) -> float:
+def _get_backoff(
+    trial: int,
+    min_sleep: float = DEFAULT_MIN_BACKOFF_SECONDS,
+    max_sleep: float = DEFAULT_MAX_BACKOFF_SECONDS,
+) -> float:
"""Return the backoff in seconds for the given trial."""
return min(min_sleep * 2**trial, max_sleep)

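For context, this change replaces the hard-coded defaults (1 s minimum, 60 s cap) with shared constants, raising the cap to 600 s (see the csv_loading_constants.py change below). A minimal self-contained sketch of the resulting backoff progression, with the constant values inlined:

```python
# Minimal sketch of the backoff schedule, with the new constants inlined
# (DEFAULT_MIN_BACKOFF_SECONDS = 1, DEFAULT_MAX_BACKOFF_SECONDS = 600).
def get_backoff(trial: int, min_sleep: float = 1.0, max_sleep: float = 600.0) -> float:
    """Return the backoff in seconds for the given trial."""
    return min(min_sleep * 2**trial, max_sleep)

print([get_backoff(t) for t in range(11)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 600.0]
# The exponential doubling is capped at 600 seconds from trial 10 onwards.
```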
30 changes: 30 additions & 0 deletions exabel_data_sdk/client/api/time_series_api.py
@@ -170,6 +170,7 @@ def create_time_series(
series: Union[pd.Series, TimeSeries],
create_tag: Optional[bool] = None, # pylint: disable=unused-argument
default_known_time: Optional[DefaultKnownTime] = None,
should_optimise: Optional[bool] = None,
) -> None:
"""
Create a time series.
@@ -193,13 +194,17 @@ def create_time_series(
the Known Time for data points where a specific known time timestamp
has not been given. If not provided, the Exabel API defaults to the
current time (upload time) as the Known Time.
should_optimise:
Whether time series storage optimisation should be enabled or not. If not
set, optimisation is at the discretion of the server.
"""
series = self._handle_time_series(name, series)

self.client.create_time_series(
CreateTimeSeriesRequest(
time_series=series.to_proto(),
default_known_time=default_known_time,
insert_options=InsertOptions(should_optimise=should_optimise),
),
)

@@ -210,6 +215,7 @@ def upsert_time_series(
series: pd.Series,
create_tag: Optional[bool] = None, # pylint: disable=unused-argument
default_known_time: Optional[DefaultKnownTime] = None,
should_optimise: Optional[bool] = None,
) -> None:
"""
Create or update a time series.
@@ -229,12 +235,16 @@
the Known Time for data points where a specific known time timestamp
has not been given. If not provided, the Exabel API defaults to the
current time (upload time) as the Known Time.
should_optimise:
Whether time series storage optimisation should be enabled or not. If not
set, optimisation is at the discretion of the server.
"""
self.append_time_series_data(
name,
series,
default_known_time,
allow_missing=True,
should_optimise=should_optimise,
)

@deprecate_arguments(create_tag=None)
@@ -245,6 +255,7 @@ def append_time_series_data(
default_known_time: Optional[DefaultKnownTime] = None,
allow_missing: bool = False,
create_tag: bool = False, # pylint: disable=unused-argument
should_optimise: Optional[bool] = None,
) -> None:
"""
Append data to the given time series.
@@ -263,6 +274,9 @@
allow_missing: If set to true, and the resource is not found, a new resource will be
created. In this situation, the "update_mask" is ignored.
create_tag: Deprecated.
should_optimise:
Whether time series storage optimisation should be enabled or not. If not
set, optimisation is at the discretion of the server.
"""
series = self._handle_time_series(name, series)

@@ -271,6 +285,7 @@
time_series=series.to_proto(),
insert_options=InsertOptions(
default_known_time=default_known_time,
should_optimise=should_optimise,
),
update_options=UpdateOptions(
allow_missing=allow_missing,
@@ -289,6 +304,7 @@ def import_time_series(
status_in_response: bool = False,
replace_existing_time_series: bool = False,
replace_existing_data_points: bool = False,
should_optimise: Optional[bool] = None,
) -> Optional[Sequence[ResourceCreationResult]]:
"""
Import multiple time series.
@@ -325,6 +341,9 @@
inserted time series points. Data points at times not present in the
request will be left untouched. Only one of replace_existing_data_points
or replace_existing_time_series can be set to true.
should_optimise:
Whether time series storage optimisation should be enabled or not. If
not set, optimisation is at the discretion of the server.
Returns:
If status_in_response is set to true, a list of ResourceCreationResult will be returned.
Otherwise, None is returned.
@@ -348,6 +367,7 @@
status_in_response=status_in_response,
insert_options=InsertOptions(
default_known_time=default_known_time,
should_optimise=should_optimise,
),
update_options=update_options,
)
@@ -367,6 +387,7 @@ def append_time_series_data_and_return(
allow_missing: Optional[bool] = False,
create_tag: Optional[bool] = None, # pylint: disable=unused-argument
include_metadata: Optional[bool] = False,
should_optimise: Optional[bool] = None,
) -> Union[pd.Series, TimeSeries]:
"""
Append data to the given time series, and return the full series.
@@ -388,6 +409,9 @@
include_metadata:
Whether to include the metadata of the time series in the response.
Returns a TimeSeries object if set to True, otherwise a pandas Series.
should_optimise:
Whether time series storage optimisation should be enabled or not. If not
set, optimisation is at the discretion of the server.

Returns:
A series with all data for the given time series.
@@ -401,6 +425,7 @@
view=TimeSeriesView(time_range=TimeRange()),
insert_options=InsertOptions(
default_known_time=default_known_time,
should_optimise=should_optimise,
),
update_options=UpdateOptions(
allow_missing=allow_missing,
@@ -441,6 +466,7 @@ def bulk_upsert_time_series(
default_known_time: Optional[DefaultKnownTime] = None,
replace_existing_time_series: bool = False,
replace_existing_data_points: bool = False,
should_optimise: Optional[bool] = None,
retries: int = DEFAULT_NUMBER_OF_RETRIES,
abort_threshold: Optional[float] = DEFAULT_ABORT_THRESHOLD,
# Deprecated arguments
@@ -477,6 +503,9 @@
inserted time series points. Data points at times not present in the
request will be left untouched. Only one of replace_existing_data_points
or replace_existing_time_series can be set to true.
should_optimise:
Whether time series storage optimisation should be enabled or not. If
not set, optimisation is at the discretion of the server.
retries: Maximum number of retries to make for each failed request.
abort_threshold:
The threshold for the proportion of failed requests that will cause the
@@ -497,6 +526,7 @@ def import_func(
status_in_response=True,
replace_existing_time_series=replace_existing_time_series,
replace_existing_data_points=replace_existing_data_points,
should_optimise=should_optimise,
)
assert result is not None
return result
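Taken together, the new `should_optimise` parameter threads through every insert path in this API. A usage sketch (the client construction, entity and signal names are hypothetical, and credentials are assumed to be configured in the environment):

```python
import pandas as pd

from exabel_data_sdk import ExabelClient

client = ExabelClient()  # assumes an API key is configured in the environment

series = pd.Series(
    [1.0, 2.0],
    index=pd.DatetimeIndex(["2024-01-01", "2024-01-02"], tz="UTC"),
    name="entityTypes/company/entities/acme/signals/my_signal",  # hypothetical resource name
)

# Explicitly enable storage optimisation; pass False to disable it,
# or omit the argument to leave the decision to the server.
client.time_series_api.upsert_time_series(series.name, series, should_optimise=True)
```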
118 changes: 118 additions & 0 deletions exabel_data_sdk/scripts/delete_time_series_points.py
@@ -0,0 +1,118 @@
import argparse
import sys
from typing import Sequence

import pandas as pd

from exabel_data_sdk import ExabelClient
from exabel_data_sdk.scripts.list_time_series import ListTimeSeries
from exabel_data_sdk.services.csv_loading_constants import (
DEFAULT_NUMBER_OF_RETRIES,
DEFAULT_NUMBER_OF_THREADS,
)


class DeleteTimeSeriesPoints(ListTimeSeries):
"""
Deletes all time series data points for a specific date and known time.
The script fetches all time series for a given signal, entity type or
entity, or a combination of these.
"""

def __init__(self, argv: Sequence[str], description: str):
super().__init__(argv, description)
self.parser.add_argument(
"--date",
required=True,
type=str,
help="The date of the time series point to delete.",
)
self.parser.add_argument(
"--known-time",
required=False,
type=str,
help="The known time of the time series point to delete.",
)
self.parser.add_argument(
"--dry-run",
required=False,
action="store_true",
default=False,
help="Only print to console instead of deleting",
)
self.parser.add_argument(
"--threads",
required=False,
type=int,
choices=range(1, 101),
metavar="[1-100]",
default=DEFAULT_NUMBER_OF_THREADS,
help=f"The number of parallel upload threads to run. "
f"Defaults to {DEFAULT_NUMBER_OF_THREADS}.",
)
self.parser.add_argument(
"--retries",
required=False,
type=int,
choices=range(1, 51),
metavar="[1-50]",
default=DEFAULT_NUMBER_OF_RETRIES,
help=f"The maximum number of retries to make for each failed request. Defaults to "
f"{DEFAULT_NUMBER_OF_RETRIES}.",
)

def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
all_time_series = self._list_time_series(
client,
entity=args.entity,
signal=args.signal,
entity_type=args.entity_type,
show_progress=args.show_progress,
)

if not all_time_series:
print("Did not find any time series.")
return

num_time_series = len(all_time_series)
print(f"Number of time series data points to delete: {num_time_series}")

date = pd.Timestamp(args.date)
known_time = pd.Timestamp(args.known_time) if args.known_time else None

index = (
pd.MultiIndex.from_tuples([(date, known_time)], names=["date", "known_time"])
if known_time
else pd.Index([date], name="date")
)

series = [pd.Series([1], index=index, name=time_series) for time_series in all_time_series]

print(f"Deleting time series data points with date {args.date}", end=" ")
if known_time:
print(f"and known time {args.known_time}", end=" ")
print(f"from the following {num_time_series} time series.")

for ts in series:
print(ts.name)

if args.dry_run:
print(f"Would have deleted {num_time_series} time series data points.")
return

result = client.time_series_api.batch_delete_time_series_points(
series, args.threads, args.retries
)

print(f"Successfully deleted {result.total_count} time series data points.")
if result.has_failure():
print(f"Failed to delete {len(result.get_failures())} time series data points.")


if __name__ == "__main__":
DeleteTimeSeriesPoints(
sys.argv,
"Deletes all time series data points for a given date and known time. "
"A signal, entity type, or entity, or a combination of these, can be specified "
"to filter the time series to delete.",
).run()
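A dry-run invocation of the new script could look like the sketch below (the signal resource name is a placeholder, and an API key is assumed to be configured in the environment):

```python
import sys

from exabel_data_sdk.scripts.delete_time_series_points import DeleteTimeSeriesPoints

# Dry run: print the data points that would be deleted, without deleting anything.
argv = [
    sys.argv[0],
    "--signal", "signals/acme.my_signal",  # hypothetical signal resource name
    "--date", "2024-01-01",
    "--dry-run",
]
DeleteTimeSeriesPoints(argv, "Delete time series points for a given date.").run()
```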
18 changes: 18 additions & 0 deletions exabel_data_sdk/scripts/load_time_series_from_file.py
@@ -137,6 +137,21 @@ def __init__(self, argv: Sequence[str]):
default=False,
help="Replace any existing data points on the specified dates when importing",
)
group = self.parser.add_mutually_exclusive_group()
group.add_argument(
"--optimise",
required=False,
action="store_true",
help="Enable time series storage optimisation. If neither this nor --no-optimise is "
"set, optimisation is at the discretion of the server.",
)
group.add_argument(
"--no-optimise",
required=False,
action="store_true",
help="Disable time series storage optimisation. If neither this nor -optimise is set, "
"optimisation is at the discretion of the server.",
)

def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
try:
@@ -159,6 +174,9 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
abort_threshold=args.abort_threshold,
replace_existing_time_series=args.replace_existing_time_series,
replace_existing_data_points=args.replace_existing_data_points,
should_optimise=(
True if args.optimise is True else False if args.no_optimise is True else None
),
)
except FileLoadingException as e:
print("ERROR: Loading time series failed.")
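The mutually exclusive flags map onto the tri-state `should_optimise` argument, as the ternary in `run_script` above shows. A small self-contained sketch of that mapping, using plain argparse and independent of the SDK:

```python
import argparse
from typing import Optional

parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group()
group.add_argument("--optimise", action="store_true")
group.add_argument("--no-optimise", action="store_true")

def should_optimise(args: argparse.Namespace) -> Optional[bool]:
    # True/False when a flag is given; None when neither is, so the server decides.
    return True if args.optimise else False if args.no_optimise else None

print(should_optimise(parser.parse_args([])))                 # None
print(should_optimise(parser.parse_args(["--optimise"])))     # True
print(should_optimise(parser.parse_args(["--no-optimise"])))  # False
```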
2 changes: 2 additions & 0 deletions exabel_data_sdk/services/csv_loading_constants.py
@@ -5,3 +5,5 @@
DEFAULT_NUMBER_OF_RETRIES = 5
MAX_THREADS_FOR_IMPORT = 100
FAILURE_LOG_LIMIT = None # type: ignore[var-annotated]
DEFAULT_MIN_BACKOFF_SECONDS = 1
DEFAULT_MAX_BACKOFF_SECONDS = 60 * 10
7 changes: 7 additions & 0 deletions exabel_data_sdk/services/file_time_series_loader.py
@@ -78,6 +78,7 @@ def load_time_series(
case_sensitive_signals: bool = False,
replace_existing_time_series: bool = False,
replace_existing_data_points: bool = False,
should_optimise: Optional[bool] = None,
return_results: bool = True,
processed_rows: int = 0,
total_rows: Optional[int] = None,
@@ -122,6 +123,8 @@
case_sensitive_signals: if True, signals are case sensitive
replace_existing_time_series: if True, any existing time series are replaced
replace_existing_data_points: if True, any existing time series data points are replaced
should_optimise: Whether time series storage optimisation should be enabled or not. If
not set, optimisation is at the discretion of the server.
return_results: if True, returns a list of TimeSeriesFileLoadingResults
or otherwise an empty list.
processed_rows: the number of rows already processed
@@ -169,6 +172,7 @@ def load_time_series(
replace_existing_time_series=replace_existing_time_series,
replace_existing_data_points=replace_existing_data_points,
replaced_time_series=replaced_time_series,
should_optimise=should_optimise,
)
if result.processed_rows is not None and total_rows:
processed_rows = processed_rows + result.processed_rows
@@ -206,6 +210,7 @@ def _load_time_series(
replace_existing_time_series: bool = False,
replace_existing_data_points: bool = False,
replaced_time_series: Optional[Sequence[str]] = None,
should_optimise: Optional[bool] = None,
) -> TimeSeriesFileLoadingResult:
"""
Load time series from a parser.
@@ -371,6 +376,7 @@ def _load_time_series(
retries=retries,
abort_threshold=abort_threshold,
replace_existing_time_series=True,
should_optimise=should_optimise,
)
if error_on_any_failure and (replace_result.has_failure() or invalid_series):
raise FileLoadingException(
@@ -387,6 +393,7 @@
retries=retries,
abort_threshold=abort_threshold,
replace_existing_data_points=replace_existing_data_points,
should_optimise=should_optimise,
)
if error_on_any_failure and (result.has_failure() or invalid_series):
raise FileLoadingException(
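At the service layer, the flag is forwarded from `load_time_series` down to the bulk upsert calls. A sketch of passing it through `FileTimeSeriesLoader` (the file name is hypothetical, and the `filename` keyword is an assumption, since it is not visible in this diff):

```python
from exabel_data_sdk import ExabelClient
from exabel_data_sdk.services.file_time_series_loader import FileTimeSeriesLoader

client = ExabelClient()  # assumes an API key is configured in the environment

# Load time series from a CSV file with storage optimisation explicitly enabled;
# pass should_optimise=None (or omit it) to leave the decision to the server.
FileTimeSeriesLoader(client).load_time_series(
    filename="time_series.csv",  # hypothetical input file; keyword assumed
    should_optimise=True,
)
```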