Skip to content

Commit

Permalink
improved error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
DevDaveFrame committed Dec 5, 2023
1 parent e5b2722 commit 32403b9
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 24 deletions.
63 changes: 46 additions & 17 deletions bigquery_exporter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from google.cloud import bigquery
from google.oauth2 import service_account
from google.api_core.exceptions import GoogleAPICallError
from google.api_core.exceptions import RetryError
from google.api_core.retry import Retry

from bigquery_exporter.errors import BigQueryExporterInitError, BigQueryExporterValidationError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,16 +44,35 @@ def batch_qs(qs, batch_size=1000):


class BigQueryExporter:
"""
Base class for exporting Django models to BigQuery.
"""
model = None
fields = []
batch = 1000
table_name = ''
replace_nulls_with_empty = False

def __init__(self, project=None, credentials=None):
    """
    Initializes the BigQueryExporter.

    Args:
        project (str, optional): The Google Cloud project id. If not provided, the default project
            will be used.
        credentials (str, optional): The path to the service account credentials file. If not provided,
            the default credentials will be used.

    Raises:
        BigQueryExporterInitError: If an error occurs while initializing the BigQuery client.
        BigQueryExporterValidationError: If an error occurs while validating the fields.
        AssertionError: If the model or table_name is not defined.
    """
    # Raised explicitly instead of via `assert` so the checks still run under `python -O`;
    # the exception type is kept as AssertionError for existing callers.
    if self.model is None:
        raise AssertionError('Model is not defined')
    if self.table_name == '':
        raise AssertionError('BigQuery table name is not defined')

    exporter_name = self.__class__.__name__
    logger.info(f'Initializing BigQuery client for {exporter_name}')
    self._initialize_client(project, credentials)
    logger.info(f'Validating fields for {exporter_name}')
    self._validate_fields()

def define_queryset(self):
def export(self, pull_date=None):
    """
    Exports the queryset returned by define_queryset() to BigQuery in batches.

    Args:
        pull_date (datetime.datetime, optional): Timestamp handed to the row processing step.
            Defaults to the current time.

    Raises:
        GoogleAPICallError: Logged and re-raised if raised while exporting.
        RetryError: Logged and re-raised if raised while exporting.

    Returns:
        errors: A list of errors that occurred while exporting the data.
    """
    # Tracked separately from pull_time so the elapsed-time log stays correct
    # (and cannot fail on naive/aware subtraction) when a pull_date is supplied.
    started_at = datetime.datetime.now()
    pull_time = pull_date if pull_date else started_at
    errors = []
    try:
        queryset = self.define_queryset()
        for start, end, total, qs in batch_qs(queryset, self.batch):
            logger.info(f'Processing {start} - {end} of {total} {self.model}')
            if reporting_data := self._process_queryset(qs, pull_time):
                if batch_errors := self._push_to_bigquery(reporting_data):
                    # updating the row index to account for the batch offset
                    for error in batch_errors:
                        error['index'] += start
                    # Collect this batch's errors (extending the list with itself dropped them).
                    errors.extend(batch_errors)

        logger.info(f'Finished exporting {len(queryset)} {self.model} in {datetime.datetime.now() - started_at}')
    except (GoogleAPICallError, RetryError) as e:
        logger.error(f'GoogleAPIError while exporting {self.__class__.__name__}: {e}')
        raise
    return errors

def table_has_data(self, pull_date=None):
"""
Expand Down Expand Up @@ -127,14 +155,15 @@ def _initialize_client(self, project=None, credentials=None):
self.table = self.client.get_table(self.table_name)
except GoogleAPICallError as e:
logging.error(f'Error while creating BigQuery client: {e}')
raise BigQueryExporterInitError(e)

def _push_to_bigquery(self, data, retry_deadline=600):
    """
    Inserts the given rows into the exporter's BigQuery table.

    Args:
        data: The rows handed to the client's insert_rows call.
        retry_deadline (int, optional): Deadline given to the Retry policy used for the
            insert call. Defaults to 600.

    Raises:
        GoogleAPICallError: Logged and re-raised if the insert call raises it.
        RetryError: Logged and re-raised if the insert call raises it.

    Returns:
        The errors reported by insert_rows, or None when it reported none.
    """
    try:
        if errors := self.client.insert_rows(self.table, data, retry=Retry(deadline=retry_deadline)):
            return errors
    except (GoogleAPICallError, RetryError) as e:
        logger.error(f'Error pushing {self.__class__.__name__} to {self.table_name}: {e}')
        raise
    return None

def _process_queryset(self, queryset, pull_time):
processed_queryset = []
Expand Down Expand Up @@ -184,7 +213,7 @@ def _validate_fields_against_model(self):
for field in self.fields:
# check that all fields are valid (either a model field or a custom field method)
if not hasattr(self.model, field) and not hasattr(self, field):
raise Exception(
raise BigQueryExporterValidationError(
f'Invalid field {field} for model {self.model}. Must be a model field or a custom field method.'
)

Expand All @@ -195,4 +224,4 @@ def _validate_fields_against_table(self):
table_fields = [field.name for field in self.table.schema]
for field in self.fields:
if field not in table_fields:
raise Exception(f'Field {field} is not in the BigQuery table {self.table_name}')
raise BigQueryExporterValidationError(f'Field {field} is not in the BigQuery table {self.table_name}')
13 changes: 13 additions & 0 deletions bigquery_exporter/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from google.api_core.exceptions import GoogleAPICallError


class BigQueryExporterError(Exception):
    """Base class for the exceptions defined by bigquery_exporter."""


class BigQueryExporterInitError(BigQueryExporterError, GoogleAPICallError):
    """
    Raised when the BigQuery client cannot be initialized.

    Also derives from GoogleAPICallError, so handlers written for that exception still catch it.
    """


class BigQueryExporterValidationError(BigQueryExporterError):
    """Raised when an exporter field is not valid for its model or is missing from the BigQuery table."""
19 changes: 12 additions & 7 deletions tests/test_base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
import datetime
from google.api_core.exceptions import GoogleAPICallError
from google.api_core.exceptions import GoogleAPICallError, RetryError

from bigquery_exporter.base import batch_qs, custom_field, BigQueryExporter


Expand Down Expand Up @@ -69,14 +70,18 @@ def test_export_calls_push_to_bigquery(self, test_exporter):
test_exporter._push_to_bigquery.assert_called()

def test_export_logs_error_on_google_api_call_error(self, test_exporter, caplog):
    """export() logs and re-raises a GoogleAPICallError raised while pushing a batch."""
    test_exporter._push_to_bigquery.side_effect = GoogleAPICallError('Error', 'error')
    with pytest.raises(GoogleAPICallError):
        test_exporter.export()
    # Asserted after the `with` block: statements placed after the raising call inside it never run.
    # _push_to_bigquery is replaced by a mock here, so only export()'s own log line is emitted.
    assert 'GoogleAPIError while exporting' in caplog.text

def test_export_logs_error_on_exception(self, test_exporter, caplog):
    """export() logs and re-raises a RetryError raised while pushing a batch."""
    test_exporter._push_to_bigquery.side_effect = RetryError('Error', 'error')
    with pytest.raises(RetryError):
        test_exporter.export()
    # Asserted after the `with` block: statements placed after the raising call inside it never run.
    # _push_to_bigquery is replaced by a mock here, so only export()'s own log line is emitted.
    assert 'GoogleAPIError while exporting' in caplog.text

def test_custom_field_decorator_sets_custom_attribute_on_callable(self):
@custom_field
Expand Down

0 comments on commit 32403b9

Please sign in to comment.