From f498349aeed1fcf45adf9f52e044ac3dd8a0f818 Mon Sep 17 00:00:00 2001 From: Aksel Stokseth <36400733+aksestok@users.noreply.github.com> Date: Fri, 19 Nov 2021 10:42:22 +0100 Subject: [PATCH] Publish v2.0.0 (#50) * add: Support for inserting time series points with known time for point in time support * add: Support for an entity mapping file to override resource name normalisation and identifier lookup against the Exabel API when using the CSV upload scripts * add: Option to override API key in the BaseScript * change: When using the CSV upload scripts, a point in time policy must be set * fix: Allow creating relationships on global read-only entities when using the CSV upload scripts * fix: Validate data points before uploading them to the Exabel API when using the CSV upload scripts --- VERSION | 2 +- exabel_data_sdk/client/api/time_series_api.py | 149 ++++++++++++++---- exabel_data_sdk/scripts/base_script.py | 30 ++-- .../scripts/csv_script_with_entity_mapping.py | 138 ++++++++++++++++ exabel_data_sdk/scripts/get_time_series.py | 16 +- .../scripts/load_relationships_from_csv.py | 20 ++- .../scripts/load_time_series_from_csv.py | 137 ++++++++++++++-- .../api/data/v1/time_series_messages_pb2.py | 32 ++-- .../api/data/v1/time_series_messages_pb2.pyi | 14 +- .../tests/client/api/test_time_series_api.py | 51 ++++++ .../tests/resources/data/entity_mapping.csv | 2 + .../tests/resources/data/entity_mapping.json | 5 + .../resources/data/entity_mapping_invalid.csv | 2 + .../data/entity_mapping_invalid_0.json | 3 + .../data/entity_mapping_invalid_1.json | 8 + .../time_series_with_invalid_data_points.csv | 9 ++ .../resources/data/timeseries_known_time.csv | 5 + .../test_csv_script_with_entity_mapping.py | 87 ++++++++++ .../scripts/test_load_time_series_from_csv.py | 112 +++++++++++-- .../util/test_resource_name_normalization.py | 92 +++++++++++ .../util/resource_name_normalization.py | 46 ++++-- 21 files changed, 853 insertions(+), 107 deletions(-) create mode 100644 exabel_data_sdk/scripts/csv_script_with_entity_mapping.py create mode 100644 exabel_data_sdk/tests/resources/data/entity_mapping.csv create mode 100644 exabel_data_sdk/tests/resources/data/entity_mapping.json create mode 100644 exabel_data_sdk/tests/resources/data/entity_mapping_invalid.csv create mode 100644 exabel_data_sdk/tests/resources/data/entity_mapping_invalid_0.json create mode 100644 exabel_data_sdk/tests/resources/data/entity_mapping_invalid_1.json create mode 100644 exabel_data_sdk/tests/resources/data/time_series_with_invalid_data_points.csv create mode 100644 exabel_data_sdk/tests/resources/data/timeseries_known_time.csv create mode 100644 exabel_data_sdk/tests/scripts/test_csv_script_with_entity_mapping.py diff --git a/VERSION b/VERSION index 3eefcb9..227cea2 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.0 +2.0.0 diff --git a/exabel_data_sdk/client/api/time_series_api.py b/exabel_data_sdk/client/api/time_series_api.py index 65de333..c695144 100644 --- a/exabel_data_sdk/client/api/time_series_api.py +++ b/exabel_data_sdk/client/api/time_series_api.py @@ -18,6 +18,7 @@ from exabel_data_sdk.stubs.exabel.api.data.v1.all_pb2 import ( BatchDeleteTimeSeriesPointsRequest, CreateTimeSeriesRequest, + DefaultKnownTime, DeleteTimeSeriesRequest, GetTimeSeriesRequest, ListTimeSeriesRequest, @@ -83,7 +84,11 @@ def get_entity_time_series( ) def get_time_series( - self, name: str, start: pd.Timestamp = None, end: pd.Timestamp = None + self, + name: str, + start: pd.Timestamp = None, + end: pd.Timestamp = None, + known_time: 
pd.Timestamp = None, ) -> Optional[pd.Series]: """ Get one time series. @@ -91,22 +96,31 @@ def get_time_series( If start and end are not specified, all data points will be returned. If start or end is specified, both must be specified. + If known_time is specified, the data will be returned as if it was requested at the given + time (in the past). Values inserted after this known time are disregarded. + If not set, the newest values are returned. + If time series does not exist, None is returned. Args: - name: The resource name of the requested time series, for example - "entityTypes/ns.type1/entities/ns.entity1/signals/ns.signal1" or - "signals/ns.signal1/entityTypes/ns.type1/entities/ns.entity1". - start: Start of the period to get data for. - end: End of the period to get data for. + name: The resource name of the requested time series, for example + "entityTypes/ns.type1/entities/ns.entity1/signals/ns.signal1" or + "signals/ns.signal1/entityTypes/ns.type1/entities/ns.entity1". + start: Start of the period to get data for. + end: End of the period to get data for. + known_time: The point-in-time at which to request the time series. """ - if bool(start) != bool(end): - raise ValueError("Either specify both 'start' and 'end' or none of them.") time_range = self._get_time_range(start, end) try: time_series = self.client.get_time_series( - GetTimeSeriesRequest(name=name, view=TimeSeriesView(time_range=time_range)), + GetTimeSeriesRequest( + name=name, + view=TimeSeriesView( + time_range=time_range, + known_time=TimeSeriesApi._pandas_timestamp_to_proto(known_time), + ), + ), ) except RequestError as error: if error.error_type == ErrorType.NOT_FOUND: @@ -115,7 +129,13 @@ def get_time_series( return self._time_series_points_to_series(time_series.points, time_series.name) - def create_time_series(self, name: str, series: pd.Series, create_tag: bool = False) -> None: + def create_time_series( + self, + name: str, + series: pd.Series, + create_tag: bool = False, + default_known_time: DefaultKnownTime = None, + ) -> None: """ Create a time series. @@ -135,16 +155,28 @@ def create_time_series(self, name: str, series: pd.Series, create_tag: bool = Fa create_tag: Set to true to create a tag for every entity type a signal has time series for. If a tag already exists, it will be updated when time series are created (or deleted) regardless of the value of this flag. + default_known_time: + Specify a default known time policy. This is used to determine + the Known Time for data points where a specific known time timestamp + has not been given. If not provided, the Exabel API defaults to the + current time (upload time) as the Known Time. """ time_series_points = self._series_to_time_series_points(series) self.client.create_time_series( CreateTimeSeriesRequest( time_series=ProtoTimeSeries(name=name, points=time_series_points), create_tag=create_tag, + default_known_time=default_known_time, ), ) - def upsert_time_series(self, name: str, series: pd.Series, create_tag: bool = False) -> bool: + def upsert_time_series( + self, + name: str, + series: pd.Series, + create_tag: bool = False, + default_known_time: DefaultKnownTime = None, + ) -> bool: """ Create or update a time series. @@ -160,6 +192,11 @@ def upsert_time_series(self, name: str, series: pd.Series, create_tag: bool = Fa create_tag: Set to true to create a tag for every entity type a signal has time series for. If a tag already exists, it will be updated when time series are created (or deleted) regardless of the value of this flag. 
+ default_known_time: + Specify a default known time policy. This is used to determine + the Known Time for data points where a specific known time timestamp + has not been given. If not provided, the Exabel API defaults to the + current time (upload time) as the Known Time. Returns: True if the time series already existed, or False if it is created @@ -167,11 +204,11 @@ def upsert_time_series(self, name: str, series: pd.Series, create_tag: bool = Fa try: # Optimistically assume that the time series exists, and append to it. # If it doesn't exist, we catch the error below and create the time series instead. - self.append_time_series_data(name, series) + self.append_time_series_data(name, series, default_known_time) return True except RequestError as error: if error.error_type == ErrorType.NOT_FOUND: - self.create_time_series(name, series, create_tag) + self.create_time_series(name, series, create_tag, default_known_time) return False raise @@ -189,7 +226,9 @@ def clear_time_series_data(self, name: str, start: pd.Timestamp, end: pd.Timesta BatchDeleteTimeSeriesPointsRequest(name=name, time_ranges=[time_range]), ) - def append_time_series_data(self, name: str, series: pd.Series) -> None: + def append_time_series_data( + self, name: str, series: pd.Series, default_known_time: DefaultKnownTime = None + ) -> None: """ Append data to the given time series. @@ -199,16 +238,24 @@ def append_time_series_data(self, name: str, series: pd.Series) -> None: Args: name: The resource name of the time series. series: Series with data to append. + default_known_time: + Specify a default known time policy. This is used to determine + the Known Time for data points where a specific known time timestamp + has not been given. If not provided, the Exabel API defaults to the + current time (upload time) as the Known Time. """ self.client.update_time_series( UpdateTimeSeriesRequest( time_series=ProtoTimeSeries( name=name, points=self._series_to_time_series_points(series) ), + default_known_time=default_known_time, ), ) - def append_time_series_data_and_return(self, name: str, series: pd.Series) -> pd.Series: + def append_time_series_data_and_return( + self, name: str, series: pd.Series, default_known_time: DefaultKnownTime = None + ) -> pd.Series: """ Append data to the given time series, and return the full series. @@ -218,6 +265,11 @@ def append_time_series_data_and_return(self, name: str, series: pd.Series) -> pd Args: name: The resource name of the time series. series: Series with data to append. + default_known_time: + Specify a default known time policy. This is used to determine + the Known Time for data points where a specific known time timestamp + has not been given. If not provided, the Exabel API defaults to the + current time (upload time) as the Known Time. Returns: A series with all data for the given time series. @@ -229,6 +281,7 @@ def append_time_series_data_and_return(self, name: str, series: pd.Series) -> pd name=name, points=self._series_to_time_series_points(series) ), view=TimeSeriesView(time_range=TimeRange()), + default_known_time=default_known_time, ), ) return self._time_series_points_to_series(time_series.points, time_series.name) @@ -259,6 +312,7 @@ def bulk_upsert_time_series( series: Sequence[pd.Series], create_tag: bool = False, threads: int = 40, + default_known_time: DefaultKnownTime = None, ) -> ResourceCreationResults[pd.Series]: """ Calls upsert_time_series for each of the provided time series, @@ -273,10 +327,17 @@ def bulk_upsert_time_series( series for. 
If a tag already exists, it will be updated when time series are created (or deleted) regardless of the value of this flag. threads: The number of parallel upload threads to use. + default_known_time: + Specify a default known time policy. This is used to determine + the Known Time for data points where a specific known time timestamp + has not been given. If not provided, the Exabel API defaults to the + current time (upload time) as the Known Time. """ def insert(ts: pd.Series) -> ResourceCreationStatus: - existed = self.upsert_time_series(str(ts.name), ts, create_tag=create_tag) + existed = self.upsert_time_series( + str(ts.name), ts, create_tag=create_tag, default_known_time=default_known_time + ) return ResourceCreationStatus.EXISTS if existed else ResourceCreationStatus.CREATED return bulk_insert(series, insert, threads=threads) @@ -285,13 +346,26 @@ def insert(ts: pd.Series) -> ResourceCreationStatus: def _series_to_time_series_points(series: pd.Series) -> Sequence[TimeSeriesPoint]: """Convert a pandas Series to a sequence of TimeSeriesPoint.""" points = [] - for timestamp, value in series.iteritems(): - proto_timestamp = timestamp_pb2.Timestamp() - if not timestamp.tz: - timestamp = timestamp.tz_localize(tz=tz.tzutc()) - proto_timestamp.FromJsonString(timestamp.isoformat()) - proto_value = DoubleValue(value=value) - points.append(TimeSeriesPoint(time=proto_timestamp, value=proto_value)) + for index, value in series.iteritems(): + if isinstance(index, tuple): + # (timestamp, known_time) + if len(index) != 2: + raise ValueError( + "A time series with a MultiIndex is expected to have exactly " + f"two elements: (timestamp, known_time), but got {index}" + ) + timestamp = index[0] + known_time = index[1] + else: + timestamp = index + known_time = None + points.append( + TimeSeriesPoint( + time=TimeSeriesApi._pandas_timestamp_to_proto(timestamp), + value=DoubleValue(value=value), + known_time=TimeSeriesApi._pandas_timestamp_to_proto(known_time), + ) + ) return points @staticmethod @@ -321,26 +395,33 @@ def _get_time_range( include_start: Whether to include the start timestamp. include_end: Whether to include the end timestamp. """ + if bool(start) != bool(end): + raise ValueError("Either specify both 'start' and 'end' or none of them.") if start is None: return TimeRange() - start_timestamp = timestamp_pb2.Timestamp() - start_timestamp.FromJsonString(TimeSeriesApi._convert_utc(start).isoformat()) - - end_timestamp = timestamp_pb2.Timestamp() - end_timestamp.FromJsonString(TimeSeriesApi._convert_utc(end).isoformat()) - return TimeRange( - from_time=start_timestamp, - to_time=end_timestamp, + from_time=TimeSeriesApi._pandas_timestamp_to_proto(start), + to_time=TimeSeriesApi._pandas_timestamp_to_proto(end), exclude_from=not include_start, include_to=include_end, ) @staticmethod - def _proto_timestamp_to_pandas_time( - timestamp: timestamp_pb2.Timestamp, - ) -> pd.Timestamp: + def _pandas_timestamp_to_proto( + timestamp: Optional[pd.Timestamp], + ) -> Optional[timestamp_pb2.Timestamp]: + """ + Convert a pandas Timestamp to a protobuf Timestamp. + Note that second time resolution is used, and any fraction of a second is discarded. + If the input is None, the result is None. 
+        """
+        if timestamp is None:
+            return None
+        return timestamp_pb2.Timestamp(seconds=timestamp.value // 1000000000)
+
+    @staticmethod
+    def _proto_timestamp_to_pandas_time(timestamp: timestamp_pb2.Timestamp) -> pd.Timestamp:
         """Convert a protobuf Timestamp to a pandas Timestamp."""
         pts = pd.Timestamp(timestamp.ToJsonString())
         return TimeSeriesApi._convert_utc(pts)
diff --git a/exabel_data_sdk/scripts/base_script.py b/exabel_data_sdk/scripts/base_script.py
index ff1bc11..234380b 100644
--- a/exabel_data_sdk/scripts/base_script.py
+++ b/exabel_data_sdk/scripts/base_script.py
@@ -1,7 +1,7 @@
 import abc
 import argparse
 import os
-from typing import Sequence
+from typing import Callable, Sequence
 
 from exabel_data_sdk import ExabelClient
 
@@ -9,16 +9,23 @@ class BaseScript(abc.ABC):
     """Base class for scripts using the Exabel Python SDK."""
 
-    def __init__(self, argv: Sequence[str], description: str):
+    def __init__(
+        self,
+        argv: Sequence[str],
+        description: str,
+        api_key_retriever: Callable[[argparse.Namespace], str] = None,
+    ):
         self.argv = argv
         self.parser = argparse.ArgumentParser(description=description)
-        api_key = os.getenv("EXABEL_API_KEY")
-        help_text = "The API key to use"
-        if api_key:
-            help_text += " (found in EXABEL_API_KEY environment variable)"
-        else:
-            help_text += ". Can also be specified in the EXABEL_API_KEY environment variable."
-        self.parser.add_argument("--api-key", required=not api_key, type=str, help=help_text)
+        self.api_key_retriever = api_key_retriever
+        if api_key_retriever is None:
+            api_key = os.getenv("EXABEL_API_KEY")
+            help_text = "The API key to use"
+            if api_key:
+                help_text += " (found in EXABEL_API_KEY environment variable)"
+            else:
+                help_text += ". Can also be specified in the EXABEL_API_KEY environment variable."
+            self.parser.add_argument("--api-key", required=not api_key, type=str, help=help_text)
         self.parser.add_argument(
             "--exabel-api-host",
             required=False,
@@ -36,9 +43,10 @@ def __init__(self, argv: Sequence[str], description: str):
     def run(self) -> None:
         """Runs the script."""
         args = self.parse_arguments()
-        client = ExabelClient(
-            host=args.exabel_api_host, api_key=args.api_key, use_json=args.use_json
+        api_key = (
+            self.api_key_retriever(args) if self.api_key_retriever is not None else args.api_key
         )
+        client = ExabelClient(host=args.exabel_api_host, api_key=api_key, use_json=args.use_json)
         self.run_script(client, args)
 
     def parse_arguments(self) -> argparse.Namespace:
diff --git a/exabel_data_sdk/scripts/csv_script_with_entity_mapping.py b/exabel_data_sdk/scripts/csv_script_with_entity_mapping.py
new file mode 100644
index 0000000..af7dc9c
--- /dev/null
+++ b/exabel_data_sdk/scripts/csv_script_with_entity_mapping.py
@@ -0,0 +1,138 @@
+import argparse
+import json
+import sys
+from typing import Mapping, Optional, Sequence
+
+import pandas as pd
+
+from exabel_data_sdk.scripts.csv_script import CsvScript
+
+
+class CsvScriptWithEntityMapping(CsvScript):
+    """
+    Base class for scripts that process CSV files with data to be loaded into the Exabel API,
+    with the added option of overriding how the given identifiers are normalised or looked up
+    in the Exabel API.
+
+    The entity mapping file is either a CSV file with the following columns and headers:
+        <identifier>: The identifier of the entity.
+        <identifier>_entity: The full resource name of the entity to map this identifier to.
+
+    E.g.
+        isin,isin_entity
+        US0000001,entityTypes/company/entities/company_1
+        US0000002,entityTypes/company/entities/company_2
+        ...
+
+    Or, the entity mapping file is a JSON file with the following format:
+        {
+            <identifier>: {
+                "identifier": "entity"
+            }
+        }
+
+    E.g.
+        {
+            "isin": {
+                "US0000001": "entityTypes/company/entities/company_1",
+                "US0000002": "entityTypes/company/entities/company_2",
+                ...
+            }
+        }
+    """
+
+    def __init__(self, argv: Sequence[str], description: str):
+        super().__init__(argv, description)
+        help_text = (
+            "The path of the entity mapping file to use. "
+            "Supports *.json and *.csv extensions only."
+        )
+        self.parser.add_argument(
+            "--entity_mapping_filename", required=False, type=str, help=help_text
+        )
+
+    def read_entity_mapping_file(
+        self, args: argparse.Namespace
+    ) -> Optional[Mapping[str, Mapping[str, str]]]:
+        """
+        Read the entity mapping file from disk with the filename specified by the command line
+        argument. Only supports *.json and *.csv file extensions.
+        """
+        if args.entity_mapping_filename is None:
+            return None
+        if args.entity_mapping_filename.endswith(".json"):
+            with open(args.entity_mapping_filename, "r", encoding="utf-8") as f:
+                mappings = json.load(f)
+                # validate that the mapping is a dictionary (and not a list)
+                if not isinstance(mappings, dict):
+                    print(
+                        "Expected entity mapping file to be a JSON key-value object, "
+                        f"but got: {mappings}"
+                    )
+                    sys.exit(1)
+                else:
+                    for value in mappings.values():
+                        if not isinstance(value, dict):
+                            print(
+                                "Expected all values of the JSON object to be objects as well, "
+                                f"but got: {value}"
+                            )
+                            sys.exit(1)
+                # validate that each sub-dictionary is a string-to-string mapping
+                for mapping in mappings.values():
+                    for key, value in mapping.items():
+                        if not isinstance(key, str) or not isinstance(value, str):
+                            print(
+                                "Expected the key-value pairs in the entity mapping JSON file to be "
+                                f"str-str mappings, but got:\n"
+                                f"Key ({type(key)}): {key}\n"
+                                f"Value ({type(value)}): {value}"
+                            )
+                            sys.exit(1)
+                return mappings
+
+        if args.entity_mapping_filename.endswith(".csv"):
+            csv_data_frame = pd.read_csv(
+                args.entity_mapping_filename, header=0, sep=args.sep, dtype="str"
+            )
+            identifier_columns = [
+                col for col in csv_data_frame.columns if not col.endswith("_entity")
+            ]
+            entity_columns = [col for col in csv_data_frame.columns if col.endswith("_entity")]
+            invalid_identifiers = [
+                identifier
+                for identifier in identifier_columns
+                if f"{identifier}_entity" not in entity_columns
+            ]
+            if invalid_identifiers:
+                print(
+                    "The entity mapping CSV file is missing one or more entity columns: "
+                    f"{[identifier + '_entity' for identifier in invalid_identifiers]}"
+                )
+                sys.exit(1)
+            invalid_entities = [
+                entity
+                for entity in entity_columns
+                if entity[: -len("_entity")] not in identifier_columns
+            ]
+            if invalid_entities:
+                print(
+                    "The entity mapping CSV file is missing one or more identifier columns: "
+                    f"{[entity[:-len('_entity')] for entity in invalid_entities]}"
+                )
+                sys.exit(1)
+
+            mappings = {}
+            for identifier in identifier_columns:
+                mappings[identifier] = {
+                    getattr(row, identifier): getattr(row, f"{identifier}_entity")
+                    for row in csv_data_frame[~csv_data_frame[identifier].isna()].itertuples()
+                }
+            return mappings
+
+        # The file extension is not supported; exit.
+        print(
+            "Expected the entity mapping file to be a *.json or *.csv file, "
+            f"but got: '{args.entity_mapping_filename}'."
+ ) + sys.exit(1) diff --git a/exabel_data_sdk/scripts/get_time_series.py b/exabel_data_sdk/scripts/get_time_series.py index 9d66433..f056815 100644 --- a/exabel_data_sdk/scripts/get_time_series.py +++ b/exabel_data_sdk/scripts/get_time_series.py @@ -13,7 +13,8 @@ class GetTimeSeries(BaseScript): Gets a time series. """ - def __init__(self, argv: Sequence[str], description: str): + def __init__(self, argv: Sequence[str]): + description = "Gets a time series." super().__init__(argv, description) self.parser.add_argument( "--name", @@ -36,13 +37,22 @@ def __init__(self, argv: Sequence[str], description: str): type=str, help="The last date of the time series", ) + self.parser.add_argument( + "--known_time", + required=False, + type=str, + help="The point-in-time to retrieve the time series at", + ) def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: start = pd.Timestamp(args.start) if args.start is not None else None end = pd.Timestamp(args.end) if args.end is not None else None - result = client.time_series_api.get_time_series(args.name, start=start, end=end) + known_time = pd.Timestamp(args.known_time) if args.known_time is not None else None + result = client.time_series_api.get_time_series( + args.name, start=start, end=end, known_time=known_time + ) print(result) if __name__ == "__main__": - GetTimeSeries(sys.argv, "Gets a time series.").run() + GetTimeSeries(sys.argv).run() diff --git a/exabel_data_sdk/scripts/load_relationships_from_csv.py b/exabel_data_sdk/scripts/load_relationships_from_csv.py index defd5b5..5440f1d 100644 --- a/exabel_data_sdk/scripts/load_relationships_from_csv.py +++ b/exabel_data_sdk/scripts/load_relationships_from_csv.py @@ -6,11 +6,11 @@ from exabel_data_sdk.client.api.bulk_insert import BulkInsertFailedError from exabel_data_sdk.client.api.data_classes.relationship import Relationship from exabel_data_sdk.client.api.data_classes.relationship_type import RelationshipType -from exabel_data_sdk.scripts.csv_script import CsvScript +from exabel_data_sdk.scripts.csv_script_with_entity_mapping import CsvScriptWithEntityMapping from exabel_data_sdk.util.resource_name_normalization import to_entity_resource_names -class LoadRelationshipsFromCsv(CsvScript): +class LoadRelationshipsFromCsv(CsvScriptWithEntityMapping): """ Processes a CSV file with relationships and creates them in the Exabel API. 
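
The --known_time option added to the get_time_series script above maps directly onto the new SDK parameter. A minimal sketch of calling it programmatically, assuming a valid API key, the client's default host, and a made-up time series resource name:

    import pandas as pd

    from exabel_data_sdk import ExabelClient

    client = ExabelClient(api_key="my-api-key")
    # Data points inserted after the given known time are disregarded,
    # i.e. the series is returned as it was known on 2021-06-30.
    series = client.time_series_api.get_time_series(
        "entityTypes/company/entities/acme.company_a/signals/acme.my_signal",
        known_time=pd.Timestamp("2021-06-30"),
    )
    print(series)  # None if the time series does not exist
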
@@ -76,10 +76,7 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: f"to {args.entity_to_column} from {args.filename}" ) - string_columns = { - args.entity_from_column, - args.entity_to_column, - } + string_columns = {args.entity_from_column, args.entity_to_column} if args.description_column: string_columns.add(args.description_column) @@ -100,11 +97,18 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: for rel_type in client.relationship_api.list_relationship_types().results: print(" ", rel_type) + entity_mapping = self.read_entity_mapping_file(args) relationships_df[entity_from_col] = to_entity_resource_names( - client.entity_api, relationships_df[entity_from_col], namespace=args.namespace + client.entity_api, + relationships_df[entity_from_col], + namespace=args.namespace, + entity_mapping=entity_mapping, ) relationships_df[entity_to_col] = to_entity_resource_names( - client.entity_api, relationships_df[entity_to_col], namespace=args.namespace + client.entity_api, + relationships_df[entity_to_col], + namespace=args.namespace, + entity_mapping=entity_mapping, ) # Drop rows where either the from or to entity is missing diff --git a/exabel_data_sdk/scripts/load_time_series_from_csv.py b/exabel_data_sdk/scripts/load_time_series_from_csv.py index b0e78ed..8c14784 100644 --- a/exabel_data_sdk/scripts/load_time_series_from_csv.py +++ b/exabel_data_sdk/scripts/load_time_series_from_csv.py @@ -5,18 +5,21 @@ import pandas as pd from dateutil import tz +from google.protobuf.duration_pb2 import Duration +from pandas.api.types import is_numeric_dtype from exabel_data_sdk import ExabelClient from exabel_data_sdk.client.api.bulk_insert import BulkInsertFailedError from exabel_data_sdk.client.api.data_classes.signal import Signal -from exabel_data_sdk.scripts.csv_script import CsvScript +from exabel_data_sdk.scripts.csv_script_with_entity_mapping import CsvScriptWithEntityMapping +from exabel_data_sdk.stubs.exabel.api.data.v1.all_pb2 import DefaultKnownTime from exabel_data_sdk.util.resource_name_normalization import ( to_entity_resource_names, validate_signal_name, ) -class LoadTimeSeriesFromCsv(CsvScript): +class LoadTimeSeriesFromCsv(CsvScriptWithEntityMapping): """ Processes a timeseries CSV file and uploads the time series to Exabel. @@ -35,7 +38,8 @@ class LoadTimeSeriesFromCsv(CsvScript): The rows do not have to be sorted in any particular order. """ - def __init__(self, argv: Sequence[str], description: str): + def __init__(self, argv: Sequence[str]): + description = "Upload timeseries file." super().__init__(argv, description) self.parser.add_argument( "--create_missing_signals", @@ -44,12 +48,50 @@ def __init__(self, argv: Sequence[str], description: str): default=False, help="Automatically create signals that are not already present in the API.", ) + self.parser.add_argument( + "--pit_current_time", + required=False, + action="store_true", + default=False, + help="Set the Known-Time of the uploaded data to be " + "the time at which it is inserted into the Exabel system.", + ) + self.parser.add_argument( + "--pit_offset", + required=False, + type=int, + choices=range(31), + metavar="[0-30]", + help="Set the Known-Time of the uploaded data to be the timestamp of each data point, " + "plus the specified number of days as an offset. 
For instance, if the data is "
+            "available to the user the day after, one would set --pit_offset 1",
+        )
+
+    @staticmethod
+    def set_time_index(ts_data: pd.DataFrame) -> None:
+        """
+        Creates a new index for the given data frame.
+        There must be a 'date' column, which will be used as (the first level of) the new index.
+        If there is a 'known_time' column, then the new index will be a MultiIndex with two levels,
+        where the first level is 'date' and the second level is 'known_time'.
+        If not, the new index will be a DatetimeIndex.
+        The 'date' column is removed from the DataFrame, and so is the 'known_time' column,
+        if present.
+        """
+        date_index = pd.DatetimeIndex(ts_data.date, tz=tz.tzutc())
+        date_index.name = None
+        if "known_time" in ts_data.columns:
+            known_time_index = pd.DatetimeIndex(ts_data.known_time, tz=tz.tzutc())
+            known_time_index.name = None
+            ts_data.set_index([date_index, known_time_index], inplace=True)
+            ts_data.drop(columns=["date", "known_time"], inplace=True)
+        else:
+            ts_data.set_index(date_index, inplace=True)
+            ts_data.drop(columns="date", inplace=True)
 
     def get_time_series(self, ts_data: pd.DataFrame, prefix: str) -> Sequence[pd.Series]:
         """Extract all the time series from the given data frame."""
-        signals = ts_data.columns[2:]
-        ts_data.index = pd.DatetimeIndex(ts_data.date, tz=tz.tzutc())
-        ts_data.index.name = None
+        signals = ts_data.columns[1:]
         series = []
         for entity, group in ts_data.groupby("entity"):
             for signal in signals:
@@ -65,18 +107,82 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
         if args.dry_run:
             print("Running dry-run...")
 
-        ts_data = self.read_csv(args, string_columns=[0])
+        default_known_time = None
+        if args.pit_current_time:
+            default_known_time = DefaultKnownTime(current_time=True)
+        if args.pit_offset is not None:
+            if default_known_time:
+                print("Cannot specify both pit_current_time and pit_offset; use one or the other.")
+                sys.exit(1)
+            time_offset = Duration(seconds=86400 * args.pit_offset)
+            default_known_time = DefaultKnownTime(time_offset=time_offset)
 
-        ts_data.iloc[:, 0] = to_entity_resource_names(
-            client.entity_api, ts_data.iloc[:, 0], namespace=args.namespace
-        )
-        ts_data.rename(columns={0: "entity"}, inplace=True)
+        ts_data = self.read_csv(args, string_columns=[0])
+        entity_mapping = self.read_entity_mapping_file(args)
 
         if ts_data.columns[1] != "date":
-            print("Expected first column to be named 'date', got", ts_data.columns[1])
+            print("Expected second column to be named 'date', got", ts_data.columns[1])
 
         # signals to produce from this csv file
         signals = list(ts_data.columns[2:])
+        if "known_time" in ts_data.columns:
+            if args.pit_current_time:
+                print(
+                    "Specified pit_current_time on the command line, but file contains known_time"
+                    " column.\nEither drop the pit_current_time command line argument, or"
+                    " remove the known_time column from the file."
+                )
+                sys.exit(1)
+            if args.pit_offset:
+                print(
+                    "Specified pit_offset on the command line, but file contains known_time"
+                    " column.\nEither drop the pit_offset command line argument, or"
+                    " remove the known_time column from the file."
+                )
+                sys.exit(1)
+            # This column shall not be loaded as a signal
+            signals.remove("known_time")
+        else:
+            if default_known_time is None:
+                print("The Known-Time of the data must be specified.")
+                print(
+                    "Please add a column called known_time to the input file, or specify a "
+                    "default policy with the pit_current_time or pit_offset command line "
+                    "arguments."
+ ) + sys.exit(1) + + ts_data.iloc[:, 0] = to_entity_resource_names( + client.entity_api, + ts_data.iloc[:, 0], + namespace=args.namespace, + entity_mapping=entity_mapping, + ) + ts_data.rename(columns={ts_data.columns[0]: "entity"}, inplace=True) + + # validate all data points are numeric + columns_with_invalid_data_points = {} + for col in signals: + if not is_numeric_dtype(ts_data[col]): + examples = { + index: ts_data[col][index] + for index in ts_data[col][~ts_data[col].str.isnumeric()][:5].index + } + columns_with_invalid_data_points[col] = examples + + if columns_with_invalid_data_points: + print( + "Signal column(s) contain non-numeric values. Please ensure all values " + "can be parsed to numeric values." + ) + print("Columns with non-numeric values (with up to 5 examples):") + for col, examples in columns_with_invalid_data_points.items(): + pretty_examples = ", ".join( + f"'{value}' at index {index}" for index, value in examples.items() + ) + print(f" {col}: {pretty_examples}") + sys.exit(1) + print("Loading signals", ", ".join(str(s) for s in signals), "...") # validate signal names @@ -131,8 +237,7 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: print("Aborting script. Please create the missing signals, and try again.") sys.exit(1) - ts_data.columns = ["entity", "date"] + signals - + LoadTimeSeriesFromCsv.set_time_index(ts_data) series = self.get_time_series(ts_data, prefix) if args.dry_run: @@ -143,7 +248,7 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: try: client.time_series_api.bulk_upsert_time_series( - series, create_tag=True, threads=args.threads + series, create_tag=True, threads=args.threads, default_known_time=default_known_time ) except BulkInsertFailedError: # An error summary has already been printed. 
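
For reference, the two point-in-time flags wired up above translate into DefaultKnownTime policies as in the following sketch, which assumes a client and a list of series prepared as in the loader; the constructors are the ones imported in the diff:

    from google.protobuf.duration_pb2 import Duration

    from exabel_data_sdk.stubs.exabel.api.data.v1.all_pb2 import DefaultKnownTime

    # --pit_current_time: the known time of each point is the upload time.
    policy = DefaultKnownTime(current_time=True)

    # --pit_offset 1: the known time of each point is its own timestamp plus one day.
    policy = DefaultKnownTime(time_offset=Duration(seconds=86400 * 1))

    client.time_series_api.bulk_upsert_time_series(
        series, create_tag=True, default_known_time=policy
    )
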
@@ -151,4 +256,4 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: if __name__ == "__main__": - LoadTimeSeriesFromCsv(sys.argv, "Upload timeseries file.").run() + LoadTimeSeriesFromCsv(sys.argv).run() diff --git a/exabel_data_sdk/stubs/exabel/api/data/v1/time_series_messages_pb2.py b/exabel_data_sdk/stubs/exabel/api/data/v1/time_series_messages_pb2.py index 013c3b4..f2de40e 100644 --- a/exabel_data_sdk/stubs/exabel/api/data/v1/time_series_messages_pb2.py +++ b/exabel_data_sdk/stubs/exabel/api/data/v1/time_series_messages_pb2.py @@ -12,6 +12,7 @@ from exabel_data_sdk.stubs.exabel.api.time import time_range_pb2 as exabel_dot_api_dot_time_dot_time__range__pb2 +from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 from google.protobuf import wrappers_pb2 as google_dot_protobuf_dot_wrappers__pb2 @@ -22,9 +23,9 @@ syntax='proto3', serialized_options=b'\n\026com.exabel.api.data.v1B\027TimeSeriesMessagesProtoP\001', create_key=_descriptor._internal_create_key, - serialized_pb=b'\n-exabel/api/data/v1/time_series_messages.proto\x12\x12\x65xabel.api.data.v1\x1a exabel/api/time/time_range.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"b\n\nTimeSeries\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x33\n\x06points\x18\x02 \x03(\x0b\x32#.exabel.api.data.v1.TimeSeriesPoint\x12\x11\n\tread_only\x18\x03 \x01(\x08\"\x98\x01\n\x0fTimeSeriesPoint\x12(\n\x04time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.DoubleValue\x12.\n\nknown_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"p\n\x0eTimeSeriesView\x12.\n\ntime_range\x18\x01 \x01(\x0b\x32\x1a.exabel.api.time.TimeRange\x12.\n\nknown_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"m\n\x10\x44\x65\x66\x61ultKnownTime\x12\x16\n\x0c\x63urrent_time\x18\x01 \x01(\x08H\x00\x12\x30\n\nknown_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x42\x0f\n\rspecificationB3\n\x16\x63om.exabel.api.data.v1B\x17TimeSeriesMessagesProtoP\x01\x62\x06proto3' + serialized_pb=b'\n-exabel/api/data/v1/time_series_messages.proto\x12\x12\x65xabel.api.data.v1\x1a exabel/api/time/time_range.proto\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/wrappers.proto\"b\n\nTimeSeries\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x33\n\x06points\x18\x02 \x03(\x0b\x32#.exabel.api.data.v1.TimeSeriesPoint\x12\x11\n\tread_only\x18\x03 \x01(\x08\"\x98\x01\n\x0fTimeSeriesPoint\x12(\n\x04time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1c.google.protobuf.DoubleValue\x12.\n\nknown_time\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"p\n\x0eTimeSeriesView\x12.\n\ntime_range\x18\x01 \x01(\x0b\x32\x1a.exabel.api.time.TimeRange\x12.\n\nknown_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"\x9f\x01\n\x10\x44\x65\x66\x61ultKnownTime\x12\x16\n\x0c\x63urrent_time\x18\x01 \x01(\x08H\x00\x12\x30\n\nknown_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x12\x30\n\x0btime_offset\x18\x03 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x42\x0f\n\rspecificationB3\n\x16\x63om.exabel.api.data.v1B\x17TimeSeriesMessagesProtoP\x01\x62\x06proto3' , - dependencies=[exabel_dot_api_dot_time_dot_time__range__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_protobuf_dot_wrappers__pb2.DESCRIPTOR,]) + 
dependencies=[exabel_dot_api_dot_time_dot_time__range__pb2.DESCRIPTOR,google_dot_protobuf_dot_duration__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_protobuf_dot_wrappers__pb2.DESCRIPTOR,]) @@ -70,8 +71,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=168, - serialized_end=266, + serialized_start=200, + serialized_end=298, ) @@ -116,8 +117,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=269, - serialized_end=421, + serialized_start=301, + serialized_end=453, ) @@ -155,8 +156,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=423, - serialized_end=535, + serialized_start=455, + serialized_end=567, ) @@ -182,6 +183,13 @@ message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='time_offset', full_name='exabel.api.data.v1.DefaultKnownTime.time_offset', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -199,8 +207,8 @@ create_key=_descriptor._internal_create_key, fields=[]), ], - serialized_start=537, - serialized_end=646, + serialized_start=570, + serialized_end=729, ) _TIMESERIES.fields_by_name['points'].message_type = _TIMESERIESPOINT @@ -210,12 +218,16 @@ _TIMESERIESVIEW.fields_by_name['time_range'].message_type = exabel_dot_api_dot_time_dot_time__range__pb2._TIMERANGE _TIMESERIESVIEW.fields_by_name['known_time'].message_type = google_dot_protobuf_dot_timestamp__pb2._TIMESTAMP _DEFAULTKNOWNTIME.fields_by_name['known_time'].message_type = google_dot_protobuf_dot_timestamp__pb2._TIMESTAMP +_DEFAULTKNOWNTIME.fields_by_name['time_offset'].message_type = google_dot_protobuf_dot_duration__pb2._DURATION _DEFAULTKNOWNTIME.oneofs_by_name['specification'].fields.append( _DEFAULTKNOWNTIME.fields_by_name['current_time']) _DEFAULTKNOWNTIME.fields_by_name['current_time'].containing_oneof = _DEFAULTKNOWNTIME.oneofs_by_name['specification'] _DEFAULTKNOWNTIME.oneofs_by_name['specification'].fields.append( _DEFAULTKNOWNTIME.fields_by_name['known_time']) _DEFAULTKNOWNTIME.fields_by_name['known_time'].containing_oneof = _DEFAULTKNOWNTIME.oneofs_by_name['specification'] +_DEFAULTKNOWNTIME.oneofs_by_name['specification'].fields.append( + _DEFAULTKNOWNTIME.fields_by_name['time_offset']) +_DEFAULTKNOWNTIME.fields_by_name['time_offset'].containing_oneof = _DEFAULTKNOWNTIME.oneofs_by_name['specification'] DESCRIPTOR.message_types_by_name['TimeSeries'] = _TIMESERIES DESCRIPTOR.message_types_by_name['TimeSeriesPoint'] = _TIMESERIESPOINT DESCRIPTOR.message_types_by_name['TimeSeriesView'] = _TIMESERIESVIEW diff --git a/exabel_data_sdk/stubs/exabel/api/data/v1/time_series_messages_pb2.pyi b/exabel_data_sdk/stubs/exabel/api/data/v1/time_series_messages_pb2.pyi index 05da7da..d6e611b 100644 --- a/exabel_data_sdk/stubs/exabel/api/data/v1/time_series_messages_pb2.pyi +++ b/exabel_data_sdk/stubs/exabel/api/data/v1/time_series_messages_pb2.pyi @@ -11,6 +11,10 @@ from google.protobuf.descriptor import ( FileDescriptor as google___protobuf___descriptor___FileDescriptor, ) +from google.protobuf.duration_pb2 import ( + Duration as google___protobuf___duration_pb2___Duration, +) + from google.protobuf.internal.containers import ( 
RepeatedCompositeFieldContainer as google___protobuf___internal___containers___RepeatedCompositeFieldContainer, ) @@ -110,12 +114,16 @@ class DefaultKnownTime(google___protobuf___message___Message): @property def known_time(self) -> google___protobuf___timestamp_pb2___Timestamp: ... + @property + def time_offset(self) -> google___protobuf___duration_pb2___Duration: ... + def __init__(self, *, current_time : typing___Optional[builtin___bool] = None, known_time : typing___Optional[google___protobuf___timestamp_pb2___Timestamp] = None, + time_offset : typing___Optional[google___protobuf___duration_pb2___Duration] = None, ) -> None: ... - def HasField(self, field_name: typing_extensions___Literal[u"current_time",b"current_time",u"known_time",b"known_time",u"specification",b"specification"]) -> builtin___bool: ... - def ClearField(self, field_name: typing_extensions___Literal[u"current_time",b"current_time",u"known_time",b"known_time",u"specification",b"specification"]) -> None: ... - def WhichOneof(self, oneof_group: typing_extensions___Literal[u"specification",b"specification"]) -> typing_extensions___Literal["current_time","known_time"]: ... + def HasField(self, field_name: typing_extensions___Literal[u"current_time",b"current_time",u"known_time",b"known_time",u"specification",b"specification",u"time_offset",b"time_offset"]) -> builtin___bool: ... + def ClearField(self, field_name: typing_extensions___Literal[u"current_time",b"current_time",u"known_time",b"known_time",u"specification",b"specification",u"time_offset",b"time_offset"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions___Literal[u"specification",b"specification"]) -> typing_extensions___Literal["current_time","known_time","time_offset"]: ... type___DefaultKnownTime = DefaultKnownTime diff --git a/exabel_data_sdk/tests/client/api/test_time_series_api.py b/exabel_data_sdk/tests/client/api/test_time_series_api.py index 4ef47e6..929a75b 100644 --- a/exabel_data_sdk/tests/client/api/test_time_series_api.py +++ b/exabel_data_sdk/tests/client/api/test_time_series_api.py @@ -2,10 +2,13 @@ import pandas as pd from dateutil import tz +from google.protobuf import timestamp_pb2 +from google.protobuf.wrappers_pb2 import DoubleValue from exabel_data_sdk.client.api.time_series_api import TimeSeriesApi # pylint: disable=protected-access +from exabel_data_sdk.stubs.exabel.api.data.v1.time_series_messages_pb2 import TimeSeriesPoint class TestTimeSeriesApi(unittest.TestCase): @@ -22,6 +25,35 @@ def test_time_series_conversion(self): ), ) + def test_time_series_conversion_known_time(self): + index = pd.MultiIndex.from_arrays( + [ + pd.DatetimeIndex(["2021-01-01", "2021-01-02"], tz=tz.tzutc()), + pd.DatetimeIndex(["2021-01-01", "2021-01-05"], tz=tz.tzutc()), + ] + ) + series = pd.Series([1.0, 2.0], index=index) + points = TimeSeriesApi._series_to_time_series_points(series) + pd.testing.assert_series_equal( + series.droplevel(1), + TimeSeriesApi._time_series_points_to_series(points), + ) + base_time = pd.Timestamp("2021-01-01").value // 1000000000 + self.assertEqual(1609459200, base_time) + expected_points = [ + TimeSeriesPoint( + time=timestamp_pb2.Timestamp(seconds=base_time), + value=DoubleValue(value=1.0), + known_time=timestamp_pb2.Timestamp(seconds=base_time), + ), + TimeSeriesPoint( + time=timestamp_pb2.Timestamp(seconds=base_time + 86400), + value=DoubleValue(value=2.0), + known_time=timestamp_pb2.Timestamp(seconds=base_time + 86400 * 4), + ), + ] + self.assertSequenceEqual(expected_points, points) + def 
test_time_series_conversion_without_time_zone(self): series = pd.Series( [1.0, 2.0, 3.0], @@ -38,3 +70,22 @@ def test_time_series_conversion_without_time_zone(self): TimeSeriesApi._series_to_time_series_points(series) ), ) + + def test_time_series_conversion_with_different_time_zone(self): + series = pd.Series( + [1.0, 2.0, 3.0], + index=pd.DatetimeIndex(["2019-01-01", "2019-02-01", "2019-03-01"], tz="US/Eastern"), + ) + expected = pd.Series( + [1.0, 2.0, 3.0], + index=pd.DatetimeIndex( + ["2019-01-01 05:00:00", "2019-02-01 05:00:00", "2019-03-01 05:00:00"], tz=tz.tzutc() + ), + ) + + pd.testing.assert_series_equal( + expected, + TimeSeriesApi._time_series_points_to_series( + TimeSeriesApi._series_to_time_series_points(series) + ), + ) diff --git a/exabel_data_sdk/tests/resources/data/entity_mapping.csv b/exabel_data_sdk/tests/resources/data/entity_mapping.csv new file mode 100644 index 0000000..77c841f --- /dev/null +++ b/exabel_data_sdk/tests/resources/data/entity_mapping.csv @@ -0,0 +1,2 @@ +isin,isin_entity +do_not_search_for,entityTypes/company/entities/was_not_searched_for diff --git a/exabel_data_sdk/tests/resources/data/entity_mapping.json b/exabel_data_sdk/tests/resources/data/entity_mapping.json new file mode 100644 index 0000000..5151ef8 --- /dev/null +++ b/exabel_data_sdk/tests/resources/data/entity_mapping.json @@ -0,0 +1,5 @@ +{ + "isin": { + "do_not_search_for": "entityTypes/company/entities/was_not_searched_for" + } +} diff --git a/exabel_data_sdk/tests/resources/data/entity_mapping_invalid.csv b/exabel_data_sdk/tests/resources/data/entity_mapping_invalid.csv new file mode 100644 index 0000000..95ee132 --- /dev/null +++ b/exabel_data_sdk/tests/resources/data/entity_mapping_invalid.csv @@ -0,0 +1,2 @@ +extra_col,identifier,identifier_entity +unused,unused,unused diff --git a/exabel_data_sdk/tests/resources/data/entity_mapping_invalid_0.json b/exabel_data_sdk/tests/resources/data/entity_mapping_invalid_0.json new file mode 100644 index 0000000..bdf9993 --- /dev/null +++ b/exabel_data_sdk/tests/resources/data/entity_mapping_invalid_0.json @@ -0,0 +1,3 @@ +{ + "do_not_search_for": ["entityTypes/company/entities/was_not_searched_for"] +} diff --git a/exabel_data_sdk/tests/resources/data/entity_mapping_invalid_1.json b/exabel_data_sdk/tests/resources/data/entity_mapping_invalid_1.json new file mode 100644 index 0000000..8b8a9dd --- /dev/null +++ b/exabel_data_sdk/tests/resources/data/entity_mapping_invalid_1.json @@ -0,0 +1,8 @@ +[ + { + "do_not_search_for_0": "entityTypes/company/entities/was_not_searched_for_0" + }, + { + "do_not_search_for_1": "entityTypes/company/entities/was_not_searched_for_1" + } +] diff --git a/exabel_data_sdk/tests/resources/data/time_series_with_invalid_data_points.csv b/exabel_data_sdk/tests/resources/data/time_series_with_invalid_data_points.csv new file mode 100644 index 0000000..e99a3ef --- /dev/null +++ b/exabel_data_sdk/tests/resources/data/time_series_with_invalid_data_points.csv @@ -0,0 +1,9 @@ +brand;date;signal_1;signal_2;signal_3 +apple;2021-01-01;$1;$1;1 +apple;2021-01-02;$2;2;2 +apple;2021-01-03;$3;3;3 +apple;2021-01-04;$4;4;4 +apple;2021-01-05;$5;5;5 +apple;2021-01-06;$6;6;6 +apple;2021-01-07;$7;7;7 +apple;2021-01-08;$8;8;8 diff --git a/exabel_data_sdk/tests/resources/data/timeseries_known_time.csv b/exabel_data_sdk/tests/resources/data/timeseries_known_time.csv new file mode 100644 index 0000000..b6804fd --- /dev/null +++ b/exabel_data_sdk/tests/resources/data/timeseries_known_time.csv @@ -0,0 +1,5 @@ 
+entity;date;known_time;signal1;signal2 +entityTypes/company/entities/company_A;2021-01-01;2021-01-01;1;10 +entityTypes/company/entities/company_A;2021-01-02;2021-01-05;2;20 +entityTypes/company/entities/company_B;2021-01-01;2021-01-10;4;40 +entityTypes/company/entities/company_B;2021-01-03;2019-12-31;5;50 diff --git a/exabel_data_sdk/tests/scripts/test_csv_script_with_entity_mapping.py b/exabel_data_sdk/tests/scripts/test_csv_script_with_entity_mapping.py new file mode 100644 index 0000000..6869b38 --- /dev/null +++ b/exabel_data_sdk/tests/scripts/test_csv_script_with_entity_mapping.py @@ -0,0 +1,87 @@ +import argparse +import unittest + +from exabel_data_sdk import ExabelClient +from exabel_data_sdk.scripts.csv_script_with_entity_mapping import CsvScriptWithEntityMapping + +common_args = [ + "script-name", + "--namespace", + "test", + "--api-key", + "123", + "--filename", + "./this_file_does_not_exist", +] + + +class ConcreteCsvScriptWithEntitySearch(CsvScriptWithEntityMapping): + """Dummy implementation used for testing""" + + def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None: + raise NotImplementedError + + +class TestCsvScriptWithEntitySearch(unittest.TestCase): + def test_read_entity_mapping_file_json(self): + args = common_args + [ + "--entity_mapping_filename", + "./exabel_data_sdk/tests/resources/data/entity_mapping.json", + ] + + loader = ConcreteCsvScriptWithEntitySearch(args, "Load") + expected_entity_mapping = { + "isin": {"do_not_search_for": "entityTypes/company/entities/was_not_searched_for"} + } + self.assertDictEqual( + loader.read_entity_mapping_file(loader.parse_arguments()), expected_entity_mapping + ) + + def test_read_entity_mapping_file_csv(self): + args = common_args + [ + "--entity_mapping_filename", + "./exabel_data_sdk/tests/resources/data/entity_mapping.csv", + ] + + loader = ConcreteCsvScriptWithEntitySearch(args, "Load") + expected_entity_mapping = { + "isin": {"do_not_search_for": "entityTypes/company/entities/was_not_searched_for"} + } + self.assertDictEqual( + loader.read_entity_mapping_file(loader.parse_arguments()), expected_entity_mapping + ) + + def test_should_fail_read_entity_mapping_file_invalid_csv(self): + args = common_args + [ + "--entity_mapping_filename", + "./exabel_data_sdk/tests/resources/data/entity_mapping_invalid.csv", + ] + + loader = ConcreteCsvScriptWithEntitySearch(args, "Load") + with self.assertRaises(SystemExit): + loader.read_entity_mapping_file(loader.parse_arguments()) + + def test_should_fail_read_entity_mapping_file_invalid_json(self): + files = [ + "./exabel_data_sdk/tests/resources/data/entity_mapping_invalid_0.json", + "./exabel_data_sdk/tests/resources/data/entity_mapping_invalid_1.json", + ] + + for file in files: + args = common_args + ["--entity_mapping_filename", file] + loader = ConcreteCsvScriptWithEntitySearch(args, "Load") + with self.assertRaises(SystemExit): + loader.read_entity_mapping_file(loader.parse_arguments()) + + def test_should_fail_read_entity_mapping_file_invalid_extension(self): + files = [ + "./file/does/not/exist/entity_mapping", + "./file/does/not/exist/entity_mapping.txt", + "./file/does/not/exist/entity_mapping.xlsx", + ] + + for file in files: + args = common_args + ["--entity_mapping_filename", file] + loader = ConcreteCsvScriptWithEntitySearch(args, "Load") + with self.assertRaises(SystemExit): + loader.read_entity_mapping_file(loader.parse_arguments()) diff --git a/exabel_data_sdk/tests/scripts/test_load_time_series_from_csv.py 
b/exabel_data_sdk/tests/scripts/test_load_time_series_from_csv.py index 0e2ea15..a364075 100644 --- a/exabel_data_sdk/tests/scripts/test_load_time_series_from_csv.py +++ b/exabel_data_sdk/tests/scripts/test_load_time_series_from_csv.py @@ -14,12 +14,12 @@ class TestUploadTimeSeries(unittest.TestCase): def test_one_signal(self): - loader = LoadTimeSeriesFromCsv(sys.argv, "Load") + loader = LoadTimeSeriesFromCsv(sys.argv) data = [["a", "2021-01-01", 1], ["a", "2021-01-02", 2], ["b", "2021-01-01", 3]] - data_frame = pd.DataFrame(data, columns=["entity", "date", "signal1"]) - - time_series = loader.get_time_series(data_frame, "signals/acme.") + ts_data = pd.DataFrame(data, columns=["entity", "date", "signal1"]) + LoadTimeSeriesFromCsv.set_time_index(ts_data) + time_series = loader.get_time_series(ts_data, "signals/acme.") pd.testing.assert_series_equal( pd.Series( [1, 2], @@ -38,16 +38,17 @@ def test_one_signal(self): ) def test_two_signals(self): - loader = LoadTimeSeriesFromCsv(sys.argv, "Load") + loader = LoadTimeSeriesFromCsv(sys.argv) data = [ ["a", "2021-01-01", 1, 100], ["a", "2021-01-02", 2, 200], ["b", "2021-01-01", 3, 300], ] - data_frame = pd.DataFrame(data, columns=["entity", "date", "signal1", "signal2"]) + ts_data = pd.DataFrame(data, columns=["entity", "date", "signal1", "signal2"]) - time_series = loader.get_time_series(data_frame, "signals/acme.") + LoadTimeSeriesFromCsv.set_time_index(ts_data) + time_series = loader.get_time_series(ts_data, "signals/acme.") pd.testing.assert_series_equal( pd.Series( @@ -82,15 +83,28 @@ def test_two_signals(self): time_series[3], ) + def test_read_file_without_pit(self): + args = common_args + [ + "--filename", + "./exabel_data_sdk/tests/resources/data/timeseries.csv", + "--namespace", + "", + ] + script = LoadTimeSeriesFromCsv(args) + client = mock.create_autospec(ExabelClient(host="host", api_key="123")) + with self.assertRaises(SystemExit): + script.run_script(client, script.parse_arguments()) + def test_read_file_use_header_for_signal(self): args = common_args + [ "--filename", "./exabel_data_sdk/tests/resources/data/timeseries.csv", "--namespace", "", + "--pit_current_time", ] - script = LoadTimeSeriesFromCsv(args, "LoadTest1") + script = LoadTimeSeriesFromCsv(args) client = mock.create_autospec(ExabelClient(host="host", api_key="123")) script.run_script(client, script.parse_arguments()) @@ -124,8 +138,10 @@ def test_read_file_with_multiple_signals(self): "./exabel_data_sdk/tests/resources/data/timeseries_multiple_signals.csv", "--namespace", "acme", + "--pit_offset", + "0", ] - script = LoadTimeSeriesFromCsv(args, "LoadTest3") + script = LoadTimeSeriesFromCsv(args) client = mock.create_autospec(ExabelClient(host="host", api_key="123")) script.run_script(client, script.parse_arguments()) @@ -166,15 +182,78 @@ def test_read_file_with_multiple_signals(self): series[3], ) + def test_read_file_with_known_time(self): + args = common_args + [ + "--filename", + "./exabel_data_sdk/tests/resources/data/timeseries_known_time.csv", + "--namespace", + "acme", + ] + script = LoadTimeSeriesFromCsv(args) + client = mock.create_autospec(ExabelClient(host="host", api_key="123")) + script.run_script(client, script.parse_arguments()) + + call_args_list = client.time_series_api.bulk_upsert_time_series.call_args_list + self.assertEqual(1, len(call_args_list)) + series = call_args_list[0][0][0] + self.assertEqual(4, len(series)) + + index_A = pd.MultiIndex.from_arrays( + [ + pd.DatetimeIndex(["2021-01-01", "2021-01-02"], tz=tz.tzutc()), + 
pd.DatetimeIndex(["2021-01-01", "2021-01-05"], tz=tz.tzutc()), + ] + ) + index_B = pd.MultiIndex.from_arrays( + [ + pd.DatetimeIndex(["2021-01-01", "2021-01-03"], tz=tz.tzutc()), + pd.DatetimeIndex(["2021-01-10", "2019-12-31"], tz=tz.tzutc()), + ] + ) + pd.testing.assert_series_equal( + pd.Series( + [1, 2], + index_A, + name="entityTypes/company/entities/company_A/signals/acme.signal1", + ), + series[0], + ) + pd.testing.assert_series_equal( + pd.Series( + [10, 20], + index_A, + name="entityTypes/company/entities/company_A/signals/acme.signal2", + ), + series[1], + ) + pd.testing.assert_series_equal( + pd.Series( + [4, 5], + index_B, + name="entityTypes/company/entities/company_B/signals/acme.signal1", + ), + series[2], + ) + pd.testing.assert_series_equal( + pd.Series( + [40, 50], + index_B, + name="entityTypes/company/entities/company_B/signals/acme.signal2", + ), + series[3], + ) + def test_read_file_with_integer_identifiers(self): args = common_args + [ "--filename", "./exabel_data_sdk/tests/resources/data/timeseries_with_integer_identifiers.csv", "--namespace", "acme", + "--pit_offset", + "30", ] - script = LoadTimeSeriesFromCsv(args, "LoadTest4") + script = LoadTimeSeriesFromCsv(args) client = mock.create_autospec(ExabelClient(host="host", api_key="123")) script.run_script(client, script.parse_arguments()) @@ -231,6 +310,19 @@ def test_valid_signal_names(self): for signal in valid_signals: validate_signal_name(signal) + def test_should_fail_with_invalid_data_points(self): + args = common_args + [ + "--filename", + "./exabel_data_sdk/tests/resources/data/time_series_with_invalid_data_points.csv", + "--namespace", + "acme", + ] + + script = LoadTimeSeriesFromCsv(args) + client = mock.create_autospec(ExabelClient(host="host", api_key="123")) + with self.assertRaises(SystemExit): + script.run_script(client, script.parse_arguments()) + if __name__ == "__main__": unittest.main() diff --git a/exabel_data_sdk/tests/util/test_resource_name_normalization.py b/exabel_data_sdk/tests/util/test_resource_name_normalization.py index 2479dc3..9f8c680 100644 --- a/exabel_data_sdk/tests/util/test_resource_name_normalization.py +++ b/exabel_data_sdk/tests/util/test_resource_name_normalization.py @@ -4,6 +4,7 @@ import pandas as pd from exabel_data_sdk.client.api.data_classes.entity import Entity +from exabel_data_sdk.client.api.data_classes.entity_type import EntityType from exabel_data_sdk.client.api.entity_api import EntityApi from exabel_data_sdk.client.client_config import ClientConfig from exabel_data_sdk.util.resource_name_normalization import ( @@ -37,6 +38,24 @@ def test_entity_type_mapping(self): result = to_entity_resource_names(entity_api, data, namespace="acme") pd.testing.assert_series_equal(expected, result) + def test_global_entity_type_mapping(self): + data = pd.Series(["I_DE", "I_US", "", "abcXYZ0189"], name="country") + expected = pd.Series( + [ + "entityTypes/country/entities/I_DE", + "entityTypes/country/entities/I_US", + None, + "entityTypes/country/entities/abcXYZ0189", + ], + name="entity", + ) + entity_api = mock.create_autospec(EntityApi(ClientConfig(api_key="123"), use_json=True)) + entity_api.list_entity_types.side_effect = [ + [EntityType("entityTypes/country", "Countries", "Countries", True)] + ] + result = to_entity_resource_names(entity_api, data, namespace="acme") + pd.testing.assert_series_equal(expected, result) + def test_isin_mapping(self): data = pd.Series(["US87612E1064", "DE000A1EWWW0", "US87612E1064"], name="isin") expected = pd.Series( @@ -67,6 +86,79 @@ def 
test_isin_mapping(self): ) pd.testing.assert_series_equal(expected, result) + def test_isin_mapping_with_entity_mapping(self): + data = pd.Series(["US87612E1064", "do_not_search_for"], name="isin") + entity_mapping = { + "isin": {"do_not_search_for": "entityTypes/company/entities/was_not_searched_for"} + } + expected = pd.Series( + [ + "entityTypes/company/entities/target_inc", + "entityTypes/company/entities/was_not_searched_for", + ], + name="entity", + ) + entity_api = mock.create_autospec(EntityApi(ClientConfig(api_key="123"), use_json=True)) + entity_api.search_for_entities.side_effect = [ + [Entity("entityTypes/company/entities/target_inc", "Target, Inc.")] + ] + result = to_entity_resource_names( + entity_api, data, namespace="acme", entity_mapping=entity_mapping + ) + call_args_list = entity_api.search_for_entities.call_args_list + self.assertEqual(1, len(call_args_list)) + self.assertEqual( + {"entity_type": "entityTypes/company", "isin": "US87612E1064"}, + call_args_list[0][1], + "Arguments not as expected", + ) + pd.testing.assert_series_equal(expected, result) + + def test_entity_mapping(self): + company_data = pd.Series(["TGT US", "ADI GE"], name="bloomberg_ticker") + brand_data = pd.Series( + [ + "should_be_mapped_not_normalized", + "should be mapped not normalized", + "should#be#mapped#not#normalized", + ], + name="brand", + ) + entity_mapping = { + "bloomberg_ticker": { + "TGT US": "entityTypes/company/entities/target_inc", + "ADI GE": "entityTypes/company/entities/adidas_ag", + }, + "brand": { + "should_be_mapped_not_normalized": "entityTypes/company/entities/brand", + "should be mapped not normalized": "entityTypes/company/entities/other_brand", + "should#be#mapped#not#normalized": "entityTypes/company/entities/another_brand", + }, + } + expected_companies = pd.Series( + ["entityTypes/company/entities/target_inc", "entityTypes/company/entities/adidas_ag"], + name="entity", + ) + expected_brands = pd.Series( + [ + "entityTypes/company/entities/brand", + "entityTypes/company/entities/other_brand", + "entityTypes/company/entities/another_brand", + ], + name="entity", + ) + entity_api = mock.create_autospec(EntityApi(ClientConfig(api_key="123"), use_json=True)) + company_result = to_entity_resource_names( + entity_api, company_data, namespace="acme", entity_mapping=entity_mapping + ) + self.assertFalse(entity_api.search_for_entities.called) + pd.testing.assert_series_equal(expected_companies, company_result) + + brand_result = to_entity_resource_names( + entity_api, brand_data, namespace="acme", entity_mapping=entity_mapping + ) + pd.testing.assert_series_equal(expected_brands, brand_result) + def test_micticker_mapping(self): # Note that "NO?COLON" and "TOO:MANY:COLONS" are illegal mic:ticker identifiers, # since any legal identifier must contain exactly one colon. 
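
A condensed sketch of the override behaviour these tests exercise, assuming entity_api is an EntityApi instance (in a script this would typically be client.entity_api):

    import pandas as pd

    from exabel_data_sdk.util.resource_name_normalization import to_entity_resource_names

    identifiers = pd.Series(["US87612E1064", "do_not_search_for"], name="isin")
    entity_mapping = {
        "isin": {"do_not_search_for": "entityTypes/company/entities/was_not_searched_for"}
    }
    # The mapped identifier bypasses normalisation and the API search entirely;
    # only the remaining ISIN is resolved through entity_api.
    resource_names = to_entity_resource_names(
        entity_api, identifiers, namespace="acme", entity_mapping=entity_mapping
    )
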
diff --git a/exabel_data_sdk/util/resource_name_normalization.py b/exabel_data_sdk/util/resource_name_normalization.py index 3f0fe64..4b72b7b 100644 --- a/exabel_data_sdk/util/resource_name_normalization.py +++ b/exabel_data_sdk/util/resource_name_normalization.py @@ -1,6 +1,6 @@ import re import sys -from typing import Mapping +from typing import Mapping, MutableMapping import pandas as pd @@ -56,7 +56,10 @@ def _assert_no_collision(mapping: Mapping[str, str]) -> None: def to_entity_resource_names( - entity_api: EntityApi, identifiers: pd.Series, namespace: str = None + entity_api: EntityApi, + identifiers: pd.Series, + namespace: str = None, + entity_mapping: Mapping[str, Mapping[str, str]] = None, ) -> pd.Series: """ Turns the given identifiers into entity resource names. @@ -98,9 +101,17 @@ def to_entity_resource_names( The given identifiers are customer provided names. The names are first normalized (using the normalize_resource_name method) and then a full resource identifier is constructed on this form: - entityTypes/{entityType}/entity/{namespace}.{normalized_name} + entityTypes/{entityType}/entities/{namespace}.{normalized_name} for example: - entityTypes/brand/entity/acme.Spring_Vine + entityTypes/brand/entities/acme.Spring_Vine + If the entity type is read-only, e.g. "country" or "currency", the namespace is + not added to the resource identifier. + For example: + entityTypes/country/entities/I_DE + + It is also possible to override the normalisation and search with the provided + `entity_mapping`. This is useful when the Exabel API is not able to find the corresponding + entity for an identifier, or one wants to hard map an identifier to a specific entity. Returns: a Series with the same index as the input Series @@ -111,12 +122,15 @@ def to_entity_resource_names( # Already resource identifiers, nothing to be done return identifiers - unique_ids = identifiers.unique() + unique_ids = identifiers.unique().tolist() + mapping: MutableMapping[str, str] = {} + if entity_mapping and entity_mapping.get(name): + mapping.update(entity_mapping[name]) + unique_ids = [unique_id for unique_id in unique_ids if unique_id not in mapping] if name in ("isin", "factset_identifier", "bloomberg_ticker", "mic:ticker"): # A company identifier print(f"Looking up {len(unique_ids)} {name}s...") - mapping = {} for identifier in unique_ids: if not identifier: # Skip empty identifiers @@ -144,6 +158,10 @@ def to_entity_resource_names( # Should be a known entity type entity_type_name = f"entityTypes/{name}" entity_type = entity_api.get_entity_type(entity_type_name) + entity_types = entity_api.list_entity_types() + read_only_entity_type_names = [ + entity_type.name for entity_type in entity_types if entity_type.read_only + ] if not entity_type: message = f"Failure: Did not find entity type {entity_type_name}" print(message) @@ -153,16 +171,22 @@ def to_entity_resource_names( if namespace is None: prefix = "" + elif entity_type_name in read_only_entity_type_names: + prefix = "" else: if "." in namespace: raise ValueError(f"Namespace cannot contain periods (.), got {namespace}") prefix = f"{namespace}." - mapping = { - identifier: f"{entity_type_name}/entities/{prefix}{normalize_resource_name(identifier)}" - for identifier in unique_ids - if identifier - } + mapping.update( + { + identifier: ( + f"{entity_type_name}/entities/{prefix}{normalize_resource_name(identifier)}" + ) + for identifier in unique_ids + if identifier + } + ) _assert_no_collision(mapping) result = identifiers.map(mapping)
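
Taken together, the changes above also allow per-point known times from the SDK by indexing a series with a two-level (timestamp, known_time) MultiIndex, as the tests demonstrate. A minimal sketch, assuming the client from the earlier example and a made-up resource name:

    import pandas as pd
    from dateutil import tz

    index = pd.MultiIndex.from_arrays(
        [
            pd.DatetimeIndex(["2021-01-01", "2021-01-02"], tz=tz.tzutc()),  # timestamps
            pd.DatetimeIndex(["2021-01-01", "2021-01-05"], tz=tz.tzutc()),  # known times
        ]
    )
    name = "entityTypes/company/entities/acme.company_a/signals/acme.my_signal"
    series = pd.Series([1.0, 2.0], index=index, name=name)
    # Each point carries its own known time, so no default policy is needed.
    client.time_series_api.upsert_time_series(name, series, create_tag=True)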