diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py index 6459b552..b4bd029e 100644 --- a/tap_facebook/__init__.py +++ b/tap_facebook/__init__.py @@ -193,6 +193,11 @@ def advance_bookmark(state, tap_stream_id, bookmark_key, date): class InsightsJobTimeout(Exception): pass +def log_insights_retry_attempt(details): + _, exception, _ = sys.exc_info() + LOGGER.info(exception) + LOGGER.info('Retrying Insights job...') + @attr.s class AdsInsights(Stream): field_class = adsinsights.AdsInsights.Field @@ -219,11 +224,6 @@ def __attrs_post_init__(self): if self.options.get('primary-keys'): self.key_properties.extend(self.options['primary-keys']) - @backoff.on_exception( - backoff.expo, - (InsightsJobTimeout), - max_tries=3, - factor=2) def job_params(self): start_date = pendulum.parse(get_start(self.state, self.name, self.bookmark_key)) @@ -252,7 +252,13 @@ def job_params(self): } buffered_start_date = buffered_start_date.add(days=1) - + @backoff.on_exception( + backoff.constant, + InsightsJobTimeout, + max_tries=2, + interval=0, + on_backoff=log_insights_retry_attempt + ) def run_job(self, params): LOGGER.info('Starting adsinsights job with params %s', params) job = self.account.get_insights( # pylint: disable=no-member @@ -279,17 +285,18 @@ def run_job(self, params): job_id, INSIGHTS_MAX_WAIT_TO_START_SECONDS)) elif duration > INSIGHTS_MAX_WAIT_TO_FINISH_SECONDS and status != "Job Completed": pretty_error_message = ('Insights job {} did not complete after {} minutes. ' + - 'This is an intermittent error and may resolve itself on subsequent queries to the Facebook API. ' + - 'You should deselect fields from the schema that are not necessary, ' + - 'as that may help improve the reliability of the Facebook API.') - raise Exception(pretty_error_message.format(job_id, INSIGHTS_MAX_WAIT_TO_FINISH_SECONDS//60)) + 'This is an intermittent error and may resolve itself on subsequent queries to the Facebook API. ' + + 'You should deselect fields from the schema that are not necessary, ' + + 'as that may help improve the reliability of the Facebook API.') + raise InsightsJobTimeout(pretty_error_message.format(job_id, + INSIGHTS_MAX_WAIT_TO_FINISH_SECONDS//60)) + LOGGER.info("sleeping for %d seconds until job is done", sleep_time) time.sleep(sleep_time) if sleep_time < INSIGHTS_MAX_ASYNC_SLEEP_SECONDS: sleep_time = 2 * sleep_time return job - def __iter__(self): for params in self.job_params(): with metrics.job_timer('insights'):