Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datasets): Replace deprecated GBQDataset load/save funcs #826

Merged
merged 8 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kedro-datasets/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

## Bug fixes and other changes
* Refactored all datasets to set `fs_args` defaults in the same way as `load_args` and `save_args` and not have hardcoded values in the save methods.
* Fixed deprecated load and save approaches of `GBQTableDataset` and `GBQQueryDataset` by invoking `load` and `save` directly from the `pandas-gbq` library.

## Breaking Changes
## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:
* [Brandon Meek](https://github.com/bpmeek)
* [yury-fedotov](https://github.com/yury-fedotov)
* [janickspirig](https://github.com/janickspirig)


# Release 4.1.0
Expand Down
20 changes: 11 additions & 9 deletions kedro-datasets/kedro_datasets/pandas/gbq_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import fsspec
import pandas as pd
import pandas_gbq as pd_gbq
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.oauth2.credentials import Credentials
Expand Down Expand Up @@ -138,16 +139,17 @@ def _describe(self) -> dict[str, Any]:

def _load(self) -> pd.DataFrame:
sql = f"select * from {self._dataset}.{self._table_name}" # nosec
self._load_args.setdefault("query", sql)
return pd.read_gbq(
self._load_args.setdefault("query_or_table", sql)
return pd_gbq.read_gbq(
project_id=self._project_id,
credentials=self._credentials,
**self._load_args,
)

def _save(self, data: pd.DataFrame) -> None:
data.to_gbq(
f"{self._dataset}.{self._table_name}",
pd_gbq.to_gbq(
dataframe=data,
destination_table=f"{self._dataset}.{self._table_name}",
project_id=self._project_id,
credentials=self._credentials,
**self._save_args,
Expand Down Expand Up @@ -176,7 +178,7 @@ def _validate_location(self):

class GBQQueryDataset(AbstractDataset[None, pd.DataFrame]):
"""``GBQQueryDataset`` loads data from a provided SQL query from Google
BigQuery. It uses ``pandas.read_gbq`` which itself uses ``pandas-gbq``
BigQuery. It uses ``pandas_gbq.read_gbq`` from the ``pandas-gbq`` library
to read from a BigQuery table. Therefore it supports all allowed
``pandas-gbq`` options on ``read_gbq``.

Expand Down Expand Up @@ -274,7 +276,7 @@ def __init__( # noqa: PLR0913

# load sql query from arg or from file
if sql:
self._load_args["query"] = sql
self._load_args["query_or_table"] = sql
self._filepath = None
else:
# filesystem for loading sql file
Expand All @@ -291,7 +293,7 @@ def __init__( # noqa: PLR0913
def _describe(self) -> dict[str, Any]:
load_args = copy.deepcopy(self._load_args)
desc = {}
desc["sql"] = str(load_args.pop("query", None))
desc["sql"] = str(load_args.pop("query_or_table", None))
desc["filepath"] = str(self._filepath)
desc["load_args"] = str(load_args)

Expand All @@ -303,9 +305,9 @@ def _load(self) -> pd.DataFrame:
if self._filepath:
load_path = get_filepath_str(PurePosixPath(self._filepath), self._protocol)
with self._fs.open(load_path, mode="r") as fs_file:
load_args["query"] = fs_file.read()
load_args["query_or_table"] = fs_file.read()

return pd.read_gbq(
return pd_gbq.read_gbq(
project_id=self._project_id,
credentials=self._credentials,
**load_args,
Expand Down
43 changes: 31 additions & 12 deletions kedro-datasets/tests/pandas/test_gbq_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ def test_save_extra_params(self, gbq_dataset, save_args):
def test_load_missing_file(self, gbq_dataset, mocker):
"""Check the error when trying to load missing table."""
pattern = r"Failed while loading data from data set GBQTableDataset\(.*\)"
mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq")
mocked_read_gbq = mocker.patch(
"kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq"
)
mocked_read_gbq.side_effect = ValueError
with pytest.raises(DatasetError, match=pattern):
gbq_dataset.load()
Expand Down Expand Up @@ -133,30 +135,43 @@ def test_save_load_data(self, gbq_dataset, dummy_dataframe, mocker):
"""Test saving and reloading the data set."""
sql = f"select * from {DATASET}.{TABLE_NAME}"
table_id = f"{DATASET}.{TABLE_NAME}"
mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq")
mocked_to_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd_gbq.to_gbq")
mocked_read_gbq = mocker.patch(
"kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq"
)
mocked_read_gbq.return_value = dummy_dataframe
mocked_df = mocker.Mock()

gbq_dataset.save(mocked_df)
loaded_data = gbq_dataset.load()

mocked_df.to_gbq.assert_called_once_with(
table_id, project_id=PROJECT, credentials=None, progress_bar=False
mocked_to_gbq.assert_called_once_with(
dataframe=mocked_df,
destination_table=table_id,
project_id=PROJECT,
credentials=None,
progress_bar=False,
)
mocked_read_gbq.assert_called_once_with(
project_id=PROJECT, credentials=None, query=sql
project_id=PROJECT, credentials=None, query_or_table=sql
)
assert_frame_equal(dummy_dataframe, loaded_data)

@pytest.mark.parametrize("load_args", [{"query": "Select 1"}], indirect=True)
@pytest.mark.parametrize(
"load_args", [{"query_or_table": "Select 1"}], indirect=True
)
def test_read_gbq_with_query(self, gbq_dataset, dummy_dataframe, mocker, load_args):
"""Test loading data set with query in the argument."""
mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq")
mocked_read_gbq = mocker.patch(
"kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq"
)
mocked_read_gbq.return_value = dummy_dataframe
loaded_data = gbq_dataset.load()

mocked_read_gbq.assert_called_once_with(
project_id=PROJECT, credentials=None, query=load_args["query"]
project_id=PROJECT,
credentials=None,
query_or_table=load_args["query_or_table"],
)

assert_frame_equal(dummy_dataframe, loaded_data)
Expand Down Expand Up @@ -239,26 +254,30 @@ def test_credentials_propagation(self, mocker):

def test_load(self, mocker, gbq_sql_dataset, dummy_dataframe):
"""Test `load` method invocation"""
mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq")
mocked_read_gbq = mocker.patch(
"kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq"
)
mocked_read_gbq.return_value = dummy_dataframe

loaded_data = gbq_sql_dataset.load()

mocked_read_gbq.assert_called_once_with(
project_id=PROJECT, credentials=None, query=SQL_QUERY
project_id=PROJECT, credentials=None, query_or_table=SQL_QUERY
)

assert_frame_equal(dummy_dataframe, loaded_data)

def test_load_query_file(self, mocker, gbq_sql_file_dataset, dummy_dataframe):
"""Test `load` method invocation using a file as input query"""
mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq")
mocked_read_gbq = mocker.patch(
"kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq"
)
mocked_read_gbq.return_value = dummy_dataframe

loaded_data = gbq_sql_file_dataset.load()

mocked_read_gbq.assert_called_once_with(
project_id=PROJECT, credentials=None, query=SQL_QUERY
project_id=PROJECT, credentials=None, query_or_table=SQL_QUERY
)

assert_frame_equal(dummy_dataframe, loaded_data)
Expand Down