script for exporting data + SnowflakeReader (#155)
ivarurdalen authored Jan 16, 2024
1 parent 18b29d1 commit 407f38b
Showing 8 changed files with 282 additions and 15 deletions.
8 changes: 6 additions & 2 deletions exabel_data_sdk/client/api/export_api.py
@@ -11,6 +11,7 @@
from exabel_data_sdk.query.predicate import Predicate
from exabel_data_sdk.query.query import Query
from exabel_data_sdk.query.signals import Signals
from exabel_data_sdk.scripts.utils import conditional_progress_bar


class ExportApi:
@@ -80,7 +81,7 @@ def run_query_bytes(self, query: Union[str, Query], file_format: str) -> bytes:
error_message = response.content.decode()
if error_message.startswith('"') and error_message.endswith('"'):
error_message = error_message[1:-1]
error_message = f"{response.status_code}: {error_message}"
error_message = f"Got {response.status_code}: {error_message} for query {query}"
raise ValueError(error_message)

def run_query(self, query: Union[str, Query]) -> pd.DataFrame:
@@ -225,6 +226,7 @@ def batched_signal_query(
end_time: Optional[Union[str, pd.Timestamp]] = None,
identifier: Optional[Union[Column, Sequence[Column]]] = None,
version: Optional[Union[str, pd.Timestamp, Sequence[str], Sequence[pd.Timestamp]]] = None,
show_progress: bool = False,
) -> Union[pd.Series, pd.DataFrame]:
"""
Run a query for one or more signals.
@@ -271,6 +273,8 @@
version=version,
**{entity_identifier: entities[i : i + batch_size]},
)
for i in range(0, len(entities), batch_size)
for i in conditional_progress_bar(
range(0, len(entities), batch_size), show_progress=show_progress
)
]
return pd.concat(results)
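
For reference, a minimal sketch of how the updated batched_signal_query might be called from client code. The API key, signal name and entity resource names below are placeholders; the keyword arguments follow the signature and the call made by the new export script in this commit.

    # Usage sketch (not part of the commit); all values are placeholders.
    from exabel_data_sdk.client.api.export_api import ExportApi

    export_api = ExportApi.from_api_key("my-api-key")  # placeholder API key
    data = export_api.batched_signal_query(
        batch_size=100,
        signal=["my_signal"],  # placeholder signal name(s)
        resource_name=["entityTypes/company/entities/company_a"],  # placeholder entities
        start_time="2023-01-01",
        end_time="2023-12-31",
        show_progress=True,  # new flag: wraps the batch loop in a progress bar
    )
    print(data.head())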
2 changes: 1 addition & 1 deletion exabel_data_sdk/scripts/export_data.py
@@ -7,7 +7,7 @@


class ExportData:
"""Script for exporting data from the Exabel API."""
"""Script for exporting data from the Exabel API with a user-provided query string."""

def __init__(self, argv: Sequence[str]):
self.argv = argv
135 changes: 135 additions & 0 deletions exabel_data_sdk/scripts/export_signals.py
@@ -0,0 +1,135 @@
import argparse
import logging
import os
import sys
from datetime import timedelta
from time import time
from typing import List, Sequence, Set

import pandas as pd

from exabel_data_sdk import ExabelClient
from exabel_data_sdk.client.api.export_api import ExportApi
from exabel_data_sdk.scripts.base_script import BaseScript
from exabel_data_sdk.scripts.file_utils import (
supported_formats_message,
to_file,
validate_file_extension,
)


class ExportSignals(BaseScript):
"""Script for exporting time series from the Exabel API for specified signals."""

def __init__(self, argv: Sequence[str], description: str):
super().__init__(argv, description)
self.parser.add_argument(
"--signal",
nargs="+",
required=True,
type=str,
help="The signal(s) to export",
)
self.parser.add_argument(
"--tag",
nargs="+",
required=True,
type=str,
help="The tag(s) with entities for which to evaluate the signal(s)",
)
self.parser.add_argument(
"--filename",
required=True,
type=validate_file_extension,
help=(
"The filename where the exported data should be saved. "
+ supported_formats_message()
),
)
self.parser.add_argument(
"--start-date",
required=False,
type=pd.Timestamp,
help="The first date to evaluate the signals for",
)
self.parser.add_argument(
"--end-date",
required=False,
type=pd.Timestamp,
help="The last date to evaluate the signals for",
)
self.parser.add_argument(
"--known-time",
required=False,
type=pd.Timestamp,
help="The point-in-time to retrieve the time series at",
)
self.parser.add_argument(
"--batch-size",
type=int,
help="The number of entities to evaluate in each batch.",
default=100,
)
self.parser.add_argument(
"--show-progress",
required=False,
action="store_true",
default=False,
help="Show progress bar",
)

@staticmethod
def get_api_key(args: argparse.Namespace) -> str:
"""
Get the API key to use, either from the command line arguments or the environment.
Raises SystemExit if there is no API key provided.
"""
api_key = args.api_key or os.getenv("EXABEL_API_KEY")
if not api_key:
print("No API key specified.")
print("Use the --api-key command line argument or EXABEL_API_KEY environment variable.")
sys.exit(1)
return api_key

def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
"""Download data from the Exabel API and store it to file."""
api_key = self.get_api_key(args)
start_time = time()
tag_results: List[Set[str]] = []
for tag in args.tag:
if not tag.startswith("tags/"):
tag = f"tags/{tag}"
result = set(client.tag_api.get_entity_iterator(tag))
tag_results.append(result)
print("Found", len(result), "entities in tag", tag)
entities = tag_results[0]
if len(tag_results) > 1:
entities = entities.union(*tag_results[1:])
print("In total", len(entities), "entities")
export_api = ExportApi.from_api_key(api_key)
signals = args.signal
print("Downloading signal(s):", ", ".join(signals))
logging.getLogger("exabel_data_sdk.client.api.export_api").setLevel(logging.WARNING)
data = export_api.batched_signal_query(
batch_size=args.batch_size,
signal=signals,
resource_name=list(entities),
start_time=args.start_date,
end_time=args.end_date,
version=args.known_time,
show_progress=args.show_progress,
)
if isinstance(data, pd.Series):
data = data.to_frame()
# Remove timezone information (all dates are UTC, and Excel doesn't support timezone)
data.index = pd.MultiIndex.from_arrays(
[data.index.get_level_values(0), data.index.get_level_values(1).tz_localize(None)],
names=data.index.names,
)
to_file(data, args.filename)
spent_time = timedelta(seconds=int(time() - start_time))
print(f"{data.shape[0]} rows of data written to {args.filename}, spent {spent_time}")


if __name__ == "__main__":
ExportSignals(sys.argv, "Export time series from the Exabel API for specified signals").run()
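
For context, an illustrative programmatic invocation of the new script, mirroring the __main__ block above. The signal, tag, file name and API key values are placeholders; the same flags can equally be passed on the command line.

    # Illustrative invocation (not part of the commit); all argument values are placeholders.
    from exabel_data_sdk.scripts.export_signals import ExportSignals

    ExportSignals(
        [
            "export_signals",  # argv[0]: the script name
            "--signal", "signalA", "expression() AS signalB",
            "--tag", "rbics:5010",
            "--start-date", "2023-01-01",
            "--end-date", "2023-12-31",
            "--filename", "signals.xlsx",
            "--api-key", "my-api-key",  # or set the EXABEL_API_KEY environment variable
            "--show-progress",
        ],
        "Export time series from the Exabel API for specified signals",
    ).run()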
44 changes: 44 additions & 0 deletions exabel_data_sdk/scripts/file_utils.py
@@ -0,0 +1,44 @@
from argparse import ArgumentTypeError

import pandas as pd


def supported_formats_message() -> str:
"""Returns a message with the supported file formats."""
return "Supported file formats are .csv, .xlsx, .pickle, .feather and .parquet."


def validate_file_extension(file: str) -> str:
"""
Raises ArgumentTypeError if the file extension is not one of those accepted by 'to_file'.
Otherwise, returns the given file name.
Intended to be used as the 'type' of an argparse argument.
"""
extension = file.split(".")[-1]
if extension not in ("csv", "xlsx", "pickle", "feather", "parquet"):
raise ArgumentTypeError(
f"Unknown file extension {extension}. " + supported_formats_message()
)
return file


def to_file(data: pd.DataFrame, file: str) -> None:
"""Writes the given data to a file, with the file type dictated by the file extension."""
extension = file.split(".")[-1]
if extension == "csv":
data.to_csv(file)
elif extension == "xlsx":
data.to_excel(file)
elif extension == "pickle":
data.to_pickle(file)
elif extension == "feather":
# feather only supports string column names.
if isinstance(data.columns, pd.MultiIndex):
data.columns = [str(column) for column in data.columns]
if isinstance(data.index, pd.MultiIndex):
data = data.reset_index()
data.to_feather(file)
elif extension == "parquet":
data.to_parquet(file)
else:
raise ValueError(f"Unknown file extension {extension} in file name {file}")
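
A short sketch exercising the new helpers; the DataFrame contents and file names are arbitrary examples.

    # Sketch (not part of the commit): arbitrary example data and file names.
    import pandas as pd

    from exabel_data_sdk.scripts.file_utils import to_file, validate_file_extension

    df = pd.DataFrame(
        {"signalA": [1.0, 2.0]},
        index=pd.to_datetime(["2023-01-31", "2023-02-28"]),
    )
    to_file(df, validate_file_extension("signals.csv"))      # dispatches to DataFrame.to_csv
    to_file(df, validate_file_extension("signals.parquet"))  # dispatches to DataFrame.to_parquet
    # validate_file_extension("signals.pdf") raises ArgumentTypeError,
    # so argparse rejects an unsupported --filename before any data is downloaded.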
18 changes: 7 additions & 11 deletions exabel_data_sdk/scripts/get_time_series.py
@@ -29,32 +29,28 @@ def __init__(self, argv: Sequence[str]):
self.parser.add_argument(
"--start",
required=False,
type=str,
type=pd.Timestamp,
help="The first date of the time series",
)
self.parser.add_argument(
"--end",
required=False,
type=str,
type=pd.Timestamp,
help="The last date of the time series",
)
self.parser.add_argument(
"--known-time",
required=False,
type=str,
type=pd.Timestamp,
help="The point-in-time to retrieve the time series at",
)

def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
start = pd.Timestamp(args.start) if args.start is not None else None
end = pd.Timestamp(args.end) if args.end is not None else None
known_time = pd.Timestamp(args.known_time) if args.known_time is not None else None

result = client.time_series_api.get_time_series(
args.name,
start=start,
end=end,
known_time=known_time,
start=args.start,
end=args.end,
known_time=args.known_time,
include_metadata=True,
)
if result is None:
@@ -67,7 +63,7 @@ def run_script(self, client: ExabelClient, args: argparse.Namespace) -> None:
[(t.strftime("%Y-%m-%d"), k.strftime("%Y-%m-%d")) for t, k in result.series.index],
names=["date", "known_time"],
)
if not known_time:
if not args.known_time:
result.series = result.series.droplevel("known_time")
else:
result.series.index = pd.Index(
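
The switch in get_time_series.py to type=pd.Timestamp removes the manual conversions in run_script. A stand-alone sketch of the mechanism: argparse calls the type callable on the raw string and treats ValueError as an invalid argument.

    # Stand-alone sketch (not part of the commit) of pd.Timestamp as an argparse type.
    import argparse

    import pandas as pd

    parser = argparse.ArgumentParser()
    parser.add_argument("--start", required=False, type=pd.Timestamp)

    args = parser.parse_args(["--start", "2023-01-31"])
    print(args.start)  # Timestamp('2023-01-31 00:00:00')

    # parser.parse_args(["--start", "not-a-date"]) prints a usage error and exits,
    # because pd.Timestamp raises ValueError and argparse reports an invalid value.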
10 changes: 10 additions & 0 deletions exabel_data_sdk/scripts/sql/read_snowflake.py
@@ -2,6 +2,7 @@
from typing import Sequence

from exabel_data_sdk.scripts.sql.sql_script import SqlScript
from exabel_data_sdk.services.sql.snowflake_reader import SnowflakeReader
from exabel_data_sdk.services.sql.snowflake_reader_configuration import SnowflakeReaderConfiguration


@@ -46,6 +47,15 @@ def __init__(self, argv: Sequence[str]):
help="The role to use. Required if no default is set for the user.",
)

def run(self) -> None:
args = self.parse_arguments()
self.setup_logging()
configuration = self.reader_configuration_class.from_args(args)
reader = SnowflakeReader(configuration.get_connection_args())
reader.read_sql_query_and_write_result(
args.query, args.output_file, batch_size=args.batch_size
)


if __name__ == "__main__":
ReadSnowflake(sys.argv).run()
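
For orientation, a hedged sketch of using SnowflakeReader directly, outside the script. The connection parameters are placeholders, and it is assumed that SnowflakeReader accepts the same connection-args mapping that SnowflakeReaderConfiguration.get_connection_args() produces for the Snowflake connector.

    # Hedged sketch (not part of the commit); all connection values are placeholders.
    from exabel_data_sdk.services.sql.snowflake_reader import SnowflakeReader

    connection_args = {  # assumed: a dict of Snowflake connector keyword arguments
        "account": "my_account",
        "user": "my_user",
        "password": "my_password",
        "warehouse": "my_warehouse",
    }
    reader = SnowflakeReader(connection_args)
    reader.read_sql_query_and_write_result(
        "SELECT * FROM my_table",  # placeholder query
        "output.csv",              # placeholder output file
        batch_size=10000,          # same keyword as in run() above; value is illustrative
    )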
2 changes: 1 addition & 1 deletion exabel_data_sdk/tests/scripts/sql/test_read_snowflake.py
@@ -5,7 +5,7 @@
from exabel_data_sdk.tests.decorators import requires_modules


@requires_modules("snowflake.sqlalchemy", "sqlalchemy")
@requires_modules("snowflake.connector")
class TestReadSnowflake(unittest.TestCase):
def test_read_snowflake_parse_args(self):
args = [
78 changes: 78 additions & 0 deletions exabel_data_sdk/tests/scripts/test_export_signals.py
@@ -0,0 +1,78 @@
import argparse
import os
import unittest

import pandas as pd

from exabel_data_sdk.scripts.export_signals import ExportSignals


class TestExportSignals(unittest.TestCase):
def setUp(self):
self.common_args = [
"script",
"--signal",
"signalA",
"expression() AS signalB",
"--tag",
"rbics:5010",
"rbics:5012",
"--start-date",
"2023-01-31",
"--end-date",
"2024-02-29",
"--filename",
]

def _assert_common_args(self, args: argparse.Namespace):
self.assertEqual(args.signal, ["signalA", "expression() AS signalB"])
self.assertEqual(args.tag, ["rbics:5010", "rbics:5012"])
self.assertEqual(args.start_date, pd.Timestamp("2023-01-31"))
self.assertEqual(args.end_date, pd.Timestamp("2024-02-29"))

def test_args_api_key(self):
script = ExportSignals(self.common_args + ["foo.csv", "--api-key", "api-key"], "desc")
self.assertEqual(script.parser.description, "desc")
args = script.parse_arguments()
self._assert_common_args(args)
self.assertEqual(args.filename, "foo.csv")
self.assertEqual(args.api_key, "api-key")
self.assertEqual(script.get_api_key(args), "api-key")

def test_args_env_variable(self):
os.environ["EXABEL_API_KEY"] = "env_key"
try:
script = ExportSignals(self.common_args + ["foo.csv"], "desc")
self.assertEqual(script.parser.description, "desc")
args = script.parse_arguments()
self._assert_common_args(args)
self.assertIsNone(args.api_key)
self.assertEqual(args.filename, "foo.csv")
self.assertEqual(script.get_api_key(args), "env_key")
finally:
del os.environ["EXABEL_API_KEY"]

def test_args_known_time(self):
script = ExportSignals(
self.common_args + ["foo.csv", "--api-key", "api-key", "--known-time", "2024-01-03"],
"desc",
)
self.assertEqual(script.parser.description, "desc")
args = script.parse_arguments()
self._assert_common_args(args)
self.assertEqual(args.known_time, pd.Timestamp("2024-01-03"))

def test_args_missing_api_key(self):
script = ExportSignals(self.common_args + ["foo.csv"], "desc")
self.assertRaises(SystemExit, script.parse_arguments)

def test_args_unknown_file_extension(self):
script = ExportSignals(self.common_args + ["foo.pdf", "--api-key", "api-key"], "desc")
self.assertRaises(SystemExit, script.parse_arguments)

def test_args_unknown_date_format(self):
script = ExportSignals(
self.common_args + ["foo.pickle", "--api-key", "api-key", "--known-time", "Monday"],
"desc",
)
self.assertRaises(SystemExit, script.parse_arguments)
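
The new tests can be run with the standard library test runner, for example via a small sketch like the following.

    # Sketch: run the new test module with the standard unittest runner.
    import unittest

    unittest.main(
        module="exabel_data_sdk.tests.scripts.test_export_signals",
        argv=["unittest"],
        exit=False,
    )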
