From 407f38b64953456b63815373168e8b8569dddcc3 Mon Sep 17 00:00:00 2001
From: Ivar Soares Urdalen
Date: Wed, 17 Jan 2024 00:12:47 +0100
Subject: [PATCH] script for exporting data + SnowflakeReader (#155)

---
 exabel_data_sdk/client/api/export_api.py      |   8 +-
 exabel_data_sdk/scripts/export_data.py        |   2 +-
 exabel_data_sdk/scripts/export_signals.py     | 135 ++++++++++++++++++
 exabel_data_sdk/scripts/file_utils.py         |  44 ++++++
 exabel_data_sdk/scripts/get_time_series.py    |  18 +--
 exabel_data_sdk/scripts/sql/read_snowflake.py |  10 ++
 .../tests/scripts/sql/test_read_snowflake.py  |   2 +-
 .../tests/scripts/test_export_signals.py      |  78 ++++++++++
 8 files changed, 282 insertions(+), 15 deletions(-)
 create mode 100644 exabel_data_sdk/scripts/export_signals.py
 create mode 100644 exabel_data_sdk/scripts/file_utils.py
 create mode 100644 exabel_data_sdk/tests/scripts/test_export_signals.py

diff --git a/exabel_data_sdk/client/api/export_api.py b/exabel_data_sdk/client/api/export_api.py
index fea6f6c..dd38242 100644
--- a/exabel_data_sdk/client/api/export_api.py
+++ b/exabel_data_sdk/client/api/export_api.py
@@ -11,6 +11,7 @@
 from exabel_data_sdk.query.predicate import Predicate
 from exabel_data_sdk.query.query import Query
 from exabel_data_sdk.query.signals import Signals
+from exabel_data_sdk.scripts.utils import conditional_progress_bar
 
 
 class ExportApi:
@@ -80,7 +81,7 @@ def run_query_bytes(self, query: Union[str, Query], file_format: str) -> bytes:
         error_message = response.content.decode()
         if error_message.startswith('"') and error_message.endswith('"'):
             error_message = error_message[1:-1]
-        error_message = f"{response.status_code}: {error_message}"
+        error_message = f"Got {response.status_code}: {error_message} for query {query}"
         raise ValueError(error_message)
 
     def run_query(self, query: Union[str, Query]) -> pd.DataFrame:
@@ -225,6 +226,7 @@ def batched_signal_query(
         end_time: Optional[Union[str, pd.Timestamp]] = None,
         identifier: Optional[Union[Column, Sequence[Column]]] = None,
         version: Optional[Union[str, pd.Timestamp, Sequence[str], Sequence[pd.Timestamp]]] = None,
+        show_progress: bool = False,
     ) -> Union[pd.Series, pd.DataFrame]:
         """
         Run a query for one or more signals.
@@ -271,6 +273,8 @@ def batched_signal_query(
                 version=version,
                 **{entity_identifier: entities[i : i + batch_size]},
             )
-            for i in range(0, len(entities), batch_size)
+            for i in conditional_progress_bar(
+                range(0, len(entities), batch_size), show_progress=show_progress
+            )
         ]
         return pd.concat(results)
diff --git a/exabel_data_sdk/scripts/export_data.py b/exabel_data_sdk/scripts/export_data.py
index 0bba7d3..0852471 100644
--- a/exabel_data_sdk/scripts/export_data.py
+++ b/exabel_data_sdk/scripts/export_data.py
@@ -7,7 +7,7 @@
 
 
 class ExportData:
-    """Script for exporting data from the Exabel API."""
+    """Script for exporting data from the Exabel API with a user-provided query string."""
 
     def __init__(self, argv: Sequence[str]):
         self.argv = argv
diff --git a/exabel_data_sdk/scripts/export_signals.py b/exabel_data_sdk/scripts/export_signals.py
new file mode 100644
index 0000000..901c831
--- /dev/null
+++ b/exabel_data_sdk/scripts/export_signals.py
@@ -0,0 +1,135 @@
+import argparse
+import logging
+import os
+import sys
+from datetime import timedelta
+from time import time
+from typing import List, Sequence, Set
+
+import pandas as pd
+
+from exabel_data_sdk import ExabelClient
+from exabel_data_sdk.client.api.export_api import ExportApi
+from exabel_data_sdk.scripts.base_script import BaseScript
+from exabel_data_sdk.scripts.file_utils import (
+    supported_formats_message,
+    to_file,
+    validate_file_extension,
+)
+
+
+class ExportSignals(BaseScript):
+    """Script for exporting time series from the Exabel API for specified signals."""
+
+    def __init__(self, argv: Sequence[str], description: str):
+        super().__init__(argv, description)
+        self.parser.add_argument(
+            "--signal",
+            nargs="+",
+            required=True,
+            type=str,
+            help="The signal(s) to export",
+        )
+        self.parser.add_argument(
+            "--tag",
+            nargs="+",
+            required=True,
+            type=str,
+            help="The tag(s) with entities for which to evaluate the signal(s)",
+        )
+        self.parser.add_argument(
+            "--filename",
+            required=True,
+            type=validate_file_extension,
+            help=(
+                "The filename where the exported data should be saved. "
+                + supported_formats_message()
+            ),
+        )
+        self.parser.add_argument(
+            "--start-date",
+            required=False,
+            type=pd.Timestamp,
+            help="The first date to evaluate the signals for",
+        )
+        self.parser.add_argument(
+            "--end-date",
+            required=False,
+            type=pd.Timestamp,
+            help="The last date to evaluate the signals for",
+        )
+        self.parser.add_argument(
+            "--known-time",
+            required=False,
+            type=pd.Timestamp,
+            help="The point-in-time to retrieve the time series at",
+        )
+        self.parser.add_argument(
+            "--batch-size",
+            type=int,
+            help="The number of entities to evaluate in each batch.",
+            default=100,
+        )
+        self.parser.add_argument(
+            "--show-progress",
+            required=False,
+            action="store_true",
+            default=False,
+            help="Show progress bar",
+        )
+
+    @staticmethod
+    def get_api_key(args: argparse.Namespace) -> str:
+        """
+        Get the API key to use, either from the command line arguments or the environment.
+        Raises SystemExit if there is no API key provided.
+        """
+        api_key = args.api_key or os.getenv("EXABEL_API_KEY")
+        if not api_key:
+            print("No API key specified.")
+            print("Use the --api-key command line argument or EXABEL_API_KEY environment variable.")
+            sys.exit(1)
+        return api_key
+
+    def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
+        """Download data from the Exabel API and store it to file."""
+        api_key = self.get_api_key(args)
+        start_time = time()
+        tag_results: List[Set[str]] = []
+        for tag in args.tag:
+            if not tag.startswith("tags/"):
+                tag = f"tags/{tag}"
+            result = set(client.tag_api.get_entity_iterator(tag))
+            tag_results.append(result)
+            print("Found", len(result), "entities in tag", tag)
+        entities = tag_results[0]
+        if len(tag_results) > 1:
+            entities = entities.union(*tag_results[1:])
+        print("In total", len(entities), "entities")
+        export_api = ExportApi.from_api_key(api_key)
+        signals = args.signal
+        print("Downloading signal(s):", ", ".join(signals))
+        logging.getLogger("exabel_data_sdk.client.api.export_api").setLevel(logging.WARNING)
+        data = export_api.batched_signal_query(
+            batch_size=args.batch_size,
+            signal=signals,
+            resource_name=list(entities),
+            start_time=args.start_date,
+            end_time=args.end_date,
+            version=args.known_time,
+            show_progress=args.show_progress,
+        )
+        if isinstance(data, pd.Series):
+            data = data.to_frame()
+        # Remove timezone information (all dates are UTC, and Excel doesn't support timezone)
+        data.index = pd.MultiIndex.from_arrays(
+            [data.index.get_level_values(0), data.index.get_level_values(1).tz_localize(None)],
+            names=data.index.names,
+        )
+        to_file(data, args.filename)
+        spent_time = timedelta(seconds=int(time() - start_time))
+        print(f"{data.shape[0]} rows of data written to {args.filename}, spent {spent_time}")
+
+
+if __name__ == "__main__":
+    ExportSignals(sys.argv, "Export time series from the Exabel API for specified signals").run()
diff --git a/exabel_data_sdk/scripts/file_utils.py b/exabel_data_sdk/scripts/file_utils.py
new file mode 100644
index 0000000..66cbda2
--- /dev/null
+++ b/exabel_data_sdk/scripts/file_utils.py
@@ -0,0 +1,44 @@
+from argparse import ArgumentTypeError
+
+import pandas as pd
+
+
+def supported_formats_message() -> str:
+    """Returns a message with the supported file formats."""
+    return "Supported file formats are .csv, .xlsx, .pickle, .feather and .parquet."
+
+
+def validate_file_extension(file: str) -> str:
+    """
+    Raises ArgumentTypeError if the file extension is not one of those accepted by 'to_file'.
+    Otherwise, returns the given file name.
+    Intended to be used as the 'type' of an argparse argument.
+    """
+    extension = file.split(".")[-1]
+    if extension not in ("csv", "xlsx", "pickle", "feather", "parquet"):
+        raise ArgumentTypeError(
+            f"Unknown file extension {extension}. " + supported_formats_message()
+        )
+    return file
+
+
+def to_file(data: pd.DataFrame, file: str) -> None:
+    """Writes the given data to a file, with the file type dictated by the file extension."""
+    extension = file.split(".")[-1]
+    if extension == "csv":
+        data.to_csv(file)
+    elif extension == "xlsx":
+        data.to_excel(file)
+    elif extension == "pickle":
+        data.to_pickle(file)
+    elif extension == "feather":
+        # feather only supports string column names.
+        if isinstance(data.columns, pd.MultiIndex):
+            data.columns = [str(column) for column in data.columns]
+        if isinstance(data.index, pd.MultiIndex):
+            data = data.reset_index()
+        data.to_feather(file)
+    elif extension == "parquet":
+        data.to_parquet(file)
+    else:
+        raise ValueError(f"Unknown file extension {extension} in file name {file}")
diff --git a/exabel_data_sdk/scripts/get_time_series.py b/exabel_data_sdk/scripts/get_time_series.py
index 985adc8..69ffba0 100644
--- a/exabel_data_sdk/scripts/get_time_series.py
+++ b/exabel_data_sdk/scripts/get_time_series.py
@@ -29,32 +29,28 @@ def __init__(self, argv: Sequence[str]):
         self.parser.add_argument(
             "--start",
             required=False,
-            type=str,
+            type=pd.Timestamp,
             help="The first date of the time series",
         )
         self.parser.add_argument(
             "--end",
             required=False,
-            type=str,
+            type=pd.Timestamp,
             help="The last date of the time series",
         )
         self.parser.add_argument(
             "--known-time",
             required=False,
-            type=str,
+            type=pd.Timestamp,
             help="The point-in-time to retrieve the time series at",
         )
 
     def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
-        start = pd.Timestamp(args.start) if args.start is not None else None
-        end = pd.Timestamp(args.end) if args.end is not None else None
-        known_time = pd.Timestamp(args.known_time) if args.known_time is not None else None
-
         result = client.time_series_api.get_time_series(
             args.name,
-            start=start,
-            end=end,
-            known_time=known_time,
+            start=args.start,
+            end=args.end,
+            known_time=args.known_time,
             include_metadata=True,
         )
         if result is None:
@@ -67,7 +63,7 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
                 [(t.strftime("%Y-%m-%d"), k.strftime("%Y-%m-%d")) for t, k in result.series.index],
                 names=["date", "known_time"],
             )
-            if not known_time:
+            if not args.known_time:
                 result.series = result.series.droplevel("known_time")
             else:
                 result.series.index = pd.Index(
diff --git a/exabel_data_sdk/scripts/sql/read_snowflake.py b/exabel_data_sdk/scripts/sql/read_snowflake.py
index cf44eca..ea1c424 100644
--- a/exabel_data_sdk/scripts/sql/read_snowflake.py
+++ b/exabel_data_sdk/scripts/sql/read_snowflake.py
@@ -2,6 +2,7 @@
 from typing import Sequence
 
 from exabel_data_sdk.scripts.sql.sql_script import SqlScript
+from exabel_data_sdk.services.sql.snowflake_reader import SnowflakeReader
 from exabel_data_sdk.services.sql.snowflake_reader_configuration import SnowflakeReaderConfiguration
 
 
@@ -46,6 +47,15 @@ def __init__(self, argv: Sequence[str]):
             help="The role to use. Required if no default is set for the user.",
         )
 
+    def run(self) -> None:
+        args = self.parse_arguments()
+        self.setup_logging()
+        configuration = self.reader_configuration_class.from_args(args)
+        reader = SnowflakeReader(configuration.get_connection_args())
+        reader.read_sql_query_and_write_result(
+            args.query, args.output_file, batch_size=args.batch_size
+        )
+
 
 if __name__ == "__main__":
     ReadSnowflake(sys.argv).run()
diff --git a/exabel_data_sdk/tests/scripts/sql/test_read_snowflake.py b/exabel_data_sdk/tests/scripts/sql/test_read_snowflake.py
index ecf5152..93bcf35 100644
--- a/exabel_data_sdk/tests/scripts/sql/test_read_snowflake.py
+++ b/exabel_data_sdk/tests/scripts/sql/test_read_snowflake.py
@@ -5,7 +5,7 @@
 from exabel_data_sdk.tests.decorators import requires_modules
 
 
-@requires_modules("snowflake.sqlalchemy", "sqlalchemy")
+@requires_modules("snowflake.connector")
 class TestReadSnowflake(unittest.TestCase):
     def test_read_snowflake_parse_args(self):
         args = [
diff --git a/exabel_data_sdk/tests/scripts/test_export_signals.py b/exabel_data_sdk/tests/scripts/test_export_signals.py
new file mode 100644
index 0000000..8eb4697
--- /dev/null
+++ b/exabel_data_sdk/tests/scripts/test_export_signals.py
@@ -0,0 +1,78 @@
+import argparse
+import os
+import unittest
+
+import pandas as pd
+
+from exabel_data_sdk.scripts.export_signals import ExportSignals
+
+
+class TestExportSignals(unittest.TestCase):
+    def setUp(self):
+        self.common_args = [
+            "script",
+            "--signal",
+            "signalA",
+            "expression() AS signalB",
+            "--tag",
+            "rbics:5010",
+            "rbics:5012",
+            "--start-date",
+            "2023-01-31",
+            "--end-date",
+            "2024-02-29",
+            "--filename",
+        ]
+
+    def _assert_common_args(self, args: argparse.Namespace):
+        self.assertEqual(args.signal, ["signalA", "expression() AS signalB"])
+        self.assertEqual(args.tag, ["rbics:5010", "rbics:5012"])
+        self.assertEqual(args.start_date, pd.Timestamp("2023-01-31"))
+        self.assertEqual(args.end_date, pd.Timestamp("2024-02-29"))
+
+    def test_args_api_key(self):
+        script = ExportSignals(self.common_args + ["foo.csv", "--api-key", "api-key"], "desc")
+        self.assertEqual(script.parser.description, "desc")
+        args = script.parse_arguments()
+        self._assert_common_args(args)
+        self.assertEqual(args.filename, "foo.csv")
+        self.assertEqual(args.api_key, "api-key")
+        self.assertEqual(script.get_api_key(args), "api-key")
+
+    def test_args_env_variable(self):
+        os.environ["EXABEL_API_KEY"] = "env_key"
+        try:
+            script = ExportSignals(self.common_args + ["foo.csv"], "desc")
+            self.assertEqual(script.parser.description, "desc")
+            args = script.parse_arguments()
+            self._assert_common_args(args)
+            self.assertIsNone(args.api_key)
+            self.assertEqual(args.filename, "foo.csv")
+            self.assertEqual(script.get_api_key(args), "env_key")
+        finally:
+            del os.environ["EXABEL_API_KEY"]
+
+    def test_args_known_time(self):
+        script = ExportSignals(
+            self.common_args + ["foo.csv", "--api-key", "api-key", "--known-time", "2024-01-03"],
+            "desc",
+        )
+        self.assertEqual(script.parser.description, "desc")
+        args = script.parse_arguments()
+        self._assert_common_args(args)
+        self.assertEqual(args.known_time, pd.Timestamp("2024-01-03"))
+
+    def test_args_missing_api_key(self):
+        script = ExportSignals(self.common_args + ["foo.csv"], "desc")
+        self.assertRaises(SystemExit, script.parse_arguments)
+
+    def test_args_unknown_file_extension(self):
+        script = ExportSignals(self.common_args + ["foo.pdf", "--api-key", "api-key"], "desc")
+        self.assertRaises(SystemExit, script.parse_arguments)
+
+    def test_args_unknown_date_format(self):
+        script = ExportSignals(
+            self.common_args + ["foo.pickle", "--api-key", "api-key", "--known-time", "Monday"],
+            "desc",
+        )
+        self.assertRaises(SystemExit, script.parse_arguments)
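
A minimal usage sketch of the pieces added in this patch, used directly rather than through the command-line script: it exercises ExportApi.batched_signal_query with the new show_progress flag and the to_file/validate_file_extension helpers from file_utils. It assumes an API key in the EXABEL_API_KEY environment variable, mirroring ExportSignals.get_api_key; the signal name, entity resource names and output filename are hypothetical placeholders.

    import os

    import pandas as pd

    from exabel_data_sdk.client.api.export_api import ExportApi
    from exabel_data_sdk.scripts.file_utils import to_file, validate_file_extension

    # Build an ExportApi directly from an API key, as ExportSignals.run_script does.
    export_api = ExportApi.from_api_key(os.environ["EXABEL_API_KEY"])

    # Evaluate one signal for a list of entities, batching the requests and showing
    # the progress bar introduced in this patch. The signal and entity resource
    # names below are hypothetical placeholders.
    data = export_api.batched_signal_query(
        batch_size=100,
        signal=["signalA"],
        resource_name=["entityTypes/company/entities/company_a"],
        start_time=pd.Timestamp("2023-01-31"),
        end_time=pd.Timestamp("2024-02-29"),
        show_progress=True,
    )
    if isinstance(data, pd.Series):
        data = data.to_frame()

    # validate_file_extension raises ArgumentTypeError for unsupported extensions;
    # to_file dispatches on the extension (.csv, .xlsx, .pickle, .feather or .parquet).
    to_file(data, validate_file_extension("signal_a.csv"))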