Skip to content

Commit

Permalink
Publish v2.0.0 (#50)
Browse files Browse the repository at this point in the history
* add: Support for inserting time series points with known time for point in time support
* add: Support for an entity mapping file to override resource name normalisation and identifier lookup against the Exabel API when using the CSV upload scripts
* add: Option to override API key in the BaseScript
* change: When using the CSV upload scripts, a point in time policy must be set
* fix: Allow creating relationships on global read-only entities when using the CSV upload scripts
* fix: Validate data points before uploading them to the Exabel API when using the CSV upload scripts
  • Loading branch information
aksestok authored Nov 19, 2021
1 parent f0042ba commit f498349
Show file tree
Hide file tree
Showing 21 changed files with 853 additions and 107 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.0
2.0.0
149 changes: 115 additions & 34 deletions exabel_data_sdk/client/api/time_series_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from exabel_data_sdk.stubs.exabel.api.data.v1.all_pb2 import (
BatchDeleteTimeSeriesPointsRequest,
CreateTimeSeriesRequest,
DefaultKnownTime,
DeleteTimeSeriesRequest,
GetTimeSeriesRequest,
ListTimeSeriesRequest,
Expand Down Expand Up @@ -83,30 +84,43 @@ def get_entity_time_series(
)

def get_time_series(
self, name: str, start: pd.Timestamp = None, end: pd.Timestamp = None
self,
name: str,
start: pd.Timestamp = None,
end: pd.Timestamp = None,
known_time: pd.Timestamp = None,
) -> Optional[pd.Series]:
"""
Get one time series.
If start and end are not specified, all data points will be returned.
If start or end is specified, both must be specified.
If known_time is specified, the data will be returned as if it was requested at the given
time (in the past). Values inserted after this known time are disregarded.
If not set, the newest values are returned.
If time series does not exist, None is returned.
Args:
name: The resource name of the requested time series, for example
"entityTypes/ns.type1/entities/ns.entity1/signals/ns.signal1" or
"signals/ns.signal1/entityTypes/ns.type1/entities/ns.entity1".
start: Start of the period to get data for.
end: End of the period to get data for.
name: The resource name of the requested time series, for example
"entityTypes/ns.type1/entities/ns.entity1/signals/ns.signal1" or
"signals/ns.signal1/entityTypes/ns.type1/entities/ns.entity1".
start: Start of the period to get data for.
end: End of the period to get data for.
known_time: The point-in-time at which to request the time series.
"""
if bool(start) != bool(end):
raise ValueError("Either specify both 'start' and 'end' or none of them.")
time_range = self._get_time_range(start, end)

try:
time_series = self.client.get_time_series(
GetTimeSeriesRequest(name=name, view=TimeSeriesView(time_range=time_range)),
GetTimeSeriesRequest(
name=name,
view=TimeSeriesView(
time_range=time_range,
known_time=TimeSeriesApi._pandas_timestamp_to_proto(known_time),
),
),
)
except RequestError as error:
if error.error_type == ErrorType.NOT_FOUND:
Expand All @@ -115,7 +129,13 @@ def get_time_series(

return self._time_series_points_to_series(time_series.points, time_series.name)

def create_time_series(self, name: str, series: pd.Series, create_tag: bool = False) -> None:
def create_time_series(
self,
name: str,
series: pd.Series,
create_tag: bool = False,
default_known_time: DefaultKnownTime = None,
) -> None:
"""
Create a time series.
Expand All @@ -135,16 +155,28 @@ def create_time_series(self, name: str, series: pd.Series, create_tag: bool = Fa
create_tag: Set to true to create a tag for every entity type a signal has time series
for. If a tag already exists, it will be updated when time series are
created (or deleted) regardless of the value of this flag.
default_known_time:
Specify a default known time policy. This is used to determine
the Known Time for data points where a specific known time timestamp
has not been given. If not provided, the Exabel API defaults to the
current time (upload time) as the Known Time.
"""
time_series_points = self._series_to_time_series_points(series)
self.client.create_time_series(
CreateTimeSeriesRequest(
time_series=ProtoTimeSeries(name=name, points=time_series_points),
create_tag=create_tag,
default_known_time=default_known_time,
),
)

def upsert_time_series(self, name: str, series: pd.Series, create_tag: bool = False) -> bool:
def upsert_time_series(
self,
name: str,
series: pd.Series,
create_tag: bool = False,
default_known_time: DefaultKnownTime = None,
) -> bool:
"""
Create or update a time series.
Expand All @@ -160,18 +192,23 @@ def upsert_time_series(self, name: str, series: pd.Series, create_tag: bool = Fa
create_tag: Set to true to create a tag for every entity type a signal has time series
for. If a tag already exists, it will be updated when time series are
created (or deleted) regardless of the value of this flag.
default_known_time:
Specify a default known time policy. This is used to determine
the Known Time for data points where a specific known time timestamp
has not been given. If not provided, the Exabel API defaults to the
current time (upload time) as the Known Time.
Returns:
True if the time series already existed, or False if it is created
"""
try:
# Optimistically assume that the time series exists, and append to it.
# If it doesn't exist, we catch the error below and create the time series instead.
self.append_time_series_data(name, series)
self.append_time_series_data(name, series, default_known_time)
return True
except RequestError as error:
if error.error_type == ErrorType.NOT_FOUND:
self.create_time_series(name, series, create_tag)
self.create_time_series(name, series, create_tag, default_known_time)
return False
raise

Expand All @@ -189,7 +226,9 @@ def clear_time_series_data(self, name: str, start: pd.Timestamp, end: pd.Timesta
BatchDeleteTimeSeriesPointsRequest(name=name, time_ranges=[time_range]),
)

def append_time_series_data(self, name: str, series: pd.Series) -> None:
def append_time_series_data(
self, name: str, series: pd.Series, default_known_time: DefaultKnownTime = None
) -> None:
"""
Append data to the given time series.
Expand All @@ -199,16 +238,24 @@ def append_time_series_data(self, name: str, series: pd.Series) -> None:
Args:
name: The resource name of the time series.
series: Series with data to append.
default_known_time:
Specify a default known time policy. This is used to determine
the Known Time for data points where a specific known time timestamp
has not been given. If not provided, the Exabel API defaults to the
current time (upload time) as the Known Time.
"""
self.client.update_time_series(
UpdateTimeSeriesRequest(
time_series=ProtoTimeSeries(
name=name, points=self._series_to_time_series_points(series)
),
default_known_time=default_known_time,
),
)

def append_time_series_data_and_return(self, name: str, series: pd.Series) -> pd.Series:
def append_time_series_data_and_return(
self, name: str, series: pd.Series, default_known_time: DefaultKnownTime = None
) -> pd.Series:
"""
Append data to the given time series, and return the full series.
Expand All @@ -218,6 +265,11 @@ def append_time_series_data_and_return(self, name: str, series: pd.Series) -> pd
Args:
name: The resource name of the time series.
series: Series with data to append.
default_known_time:
Specify a default known time policy. This is used to determine
the Known Time for data points where a specific known time timestamp
has not been given. If not provided, the Exabel API defaults to the
current time (upload time) as the Known Time.
Returns:
A series with all data for the given time series.
Expand All @@ -229,6 +281,7 @@ def append_time_series_data_and_return(self, name: str, series: pd.Series) -> pd
name=name, points=self._series_to_time_series_points(series)
),
view=TimeSeriesView(time_range=TimeRange()),
default_known_time=default_known_time,
),
)
return self._time_series_points_to_series(time_series.points, time_series.name)
Expand Down Expand Up @@ -259,6 +312,7 @@ def bulk_upsert_time_series(
series: Sequence[pd.Series],
create_tag: bool = False,
threads: int = 40,
default_known_time: DefaultKnownTime = None,
) -> ResourceCreationResults[pd.Series]:
"""
Calls upsert_time_series for each of the provided time series,
Expand All @@ -273,10 +327,17 @@ def bulk_upsert_time_series(
series for. If a tag already exists, it will be updated when time
series are created (or deleted) regardless of the value of this flag.
threads: The number of parallel upload threads to use.
default_known_time:
Specify a default known time policy. This is used to determine
the Known Time for data points where a specific known time timestamp
has not been given. If not provided, the Exabel API defaults to the
current time (upload time) as the Known Time.
"""

def insert(ts: pd.Series) -> ResourceCreationStatus:
existed = self.upsert_time_series(str(ts.name), ts, create_tag=create_tag)
existed = self.upsert_time_series(
str(ts.name), ts, create_tag=create_tag, default_known_time=default_known_time
)
return ResourceCreationStatus.EXISTS if existed else ResourceCreationStatus.CREATED

return bulk_insert(series, insert, threads=threads)
Expand All @@ -285,13 +346,26 @@ def insert(ts: pd.Series) -> ResourceCreationStatus:
def _series_to_time_series_points(series: pd.Series) -> Sequence[TimeSeriesPoint]:
"""Convert a pandas Series to a sequence of TimeSeriesPoint."""
points = []
for timestamp, value in series.iteritems():
proto_timestamp = timestamp_pb2.Timestamp()
if not timestamp.tz:
timestamp = timestamp.tz_localize(tz=tz.tzutc())
proto_timestamp.FromJsonString(timestamp.isoformat())
proto_value = DoubleValue(value=value)
points.append(TimeSeriesPoint(time=proto_timestamp, value=proto_value))
for index, value in series.iteritems():
if isinstance(index, tuple):
# (timestamp, known_time)
if len(index) != 2:
raise ValueError(
"A time series with a MultiIndex is expected to have exactly "
f"two elements: (timestamp, known_time), but got {index}"
)
timestamp = index[0]
known_time = index[1]
else:
timestamp = index
known_time = None
points.append(
TimeSeriesPoint(
time=TimeSeriesApi._pandas_timestamp_to_proto(timestamp),
value=DoubleValue(value=value),
known_time=TimeSeriesApi._pandas_timestamp_to_proto(known_time),
)
)
return points

@staticmethod
Expand Down Expand Up @@ -321,26 +395,33 @@ def _get_time_range(
include_start: Whether to include the start timestamp.
include_end: Whether to include the end timestamp.
"""
if bool(start) != bool(end):
raise ValueError("Either specify both 'start' and 'end' or none of them.")
if start is None:
return TimeRange()

start_timestamp = timestamp_pb2.Timestamp()
start_timestamp.FromJsonString(TimeSeriesApi._convert_utc(start).isoformat())

end_timestamp = timestamp_pb2.Timestamp()
end_timestamp.FromJsonString(TimeSeriesApi._convert_utc(end).isoformat())

return TimeRange(
from_time=start_timestamp,
to_time=end_timestamp,
from_time=TimeSeriesApi._pandas_timestamp_to_proto(start),
to_time=TimeSeriesApi._pandas_timestamp_to_proto(end),
exclude_from=not include_start,
include_to=include_end,
)

@staticmethod
def _proto_timestamp_to_pandas_time(
timestamp: timestamp_pb2.Timestamp,
) -> pd.Timestamp:
def _pandas_timestamp_to_proto(
timestamp: Optional[pd.Timestamp],
) -> Optional[timestamp_pb2.Timestamp]:
"""
Convert a pandas Timestamp to a protobuf Timestamp.
Note that second time resolution is used, and any fraction of a second is discarded.
If the input is None, the result is None.
"""
if timestamp is None:
return None
return timestamp_pb2.Timestamp(seconds=timestamp.value // 1000000000)

@staticmethod
def _proto_timestamp_to_pandas_time(timestamp: timestamp_pb2.Timestamp) -> pd.Timestamp:
"""Convert a protobuf Timestamp to a pandas Timestamp."""
pts = pd.Timestamp(timestamp.ToJsonString())
return TimeSeriesApi._convert_utc(pts)
Expand Down
30 changes: 19 additions & 11 deletions exabel_data_sdk/scripts/base_script.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
import abc
import argparse
import os
from typing import Sequence
from typing import Callable, Sequence

from exabel_data_sdk import ExabelClient


class BaseScript(abc.ABC):
"""Base class for scripts using the Exabel Python SDK."""

def __init__(self, argv: Sequence[str], description: str):
def __init__(
self,
argv: Sequence[str],
description: str,
api_key_retriever: Callable[[argparse.Namespace], str] = None,
):
self.argv = argv
self.parser = argparse.ArgumentParser(description=description)
api_key = os.getenv("EXABEL_API_KEY")
help_text = "The API key to use"
if api_key:
help_text += " (found in EXABEL_API_KEY environment variable)"
else:
help_text += ". Can also be specified in the EXABEL_API_KEY environment variable."
self.parser.add_argument("--api-key", required=not api_key, type=str, help=help_text)
self.api_key_retriever = api_key_retriever
if api_key_retriever is None:
api_key = os.getenv("EXABEL_API_KEY")
help_text = "The API key to use"
if api_key:
help_text += " (found in EXABEL_API_KEY environment variable)"
else:
help_text += ". Can also be specified in the EXABEL_API_KEY environment variable."
self.parser.add_argument("--api-key", required=not api_key, type=str, help=help_text)
self.parser.add_argument(
"--exabel-api-host",
required=False,
Expand All @@ -36,9 +43,10 @@ def __init__(self, argv: Sequence[str], description: str):
def run(self) -> None:
"""Runs the script."""
args = self.parse_arguments()
client = ExabelClient(
host=args.exabel_api_host, api_key=args.api_key, use_json=args.use_json
api_key = (
self.api_key_retriever(args) if self.api_key_retriever is not None else args.api_key
)
client = ExabelClient(host=args.exabel_api_host, api_key=api_key, use_json=args.use_json)
self.run_script(client, args)

def parse_arguments(self) -> argparse.Namespace:
Expand Down
Loading

0 comments on commit f498349

Please sign in to comment.