diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 75ed479f..e55a619c 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -56,11 +56,20 @@ def do_sync_full_table(conn_config, stream, state, desired_columns, md_map): """ LOGGER.info("Stream %s is using full_table replication", stream['tap_stream_id']) sync_common.send_schema_message(stream, []) - if md_map.get((), {}).get('is-view'): - state = full_table.sync_view(conn_config, stream, state, desired_columns, md_map) - else: - state = full_table.sync_table(conn_config, stream, state, desired_columns, md_map) - return state + attemp = 0 + while True: + try: + if md_map.get((), {}).get('is-view'): + state = full_table.sync_view(conn_config, stream, state, desired_columns, md_map) + else: + state = full_table.sync_table(conn_config, stream, state, desired_columns, md_map) + return state + except Exception as e: + LOGGER.warn("error on read for a stream: %s. Message: %s", stream['tap_stream_id'], e) + if attemp > post_db.TRY_NUMBER: + raise e + else: + attemp = attemp + 1 # Possible state keys: replication_key, replication_key_value, version @@ -411,6 +420,8 @@ def main_impl(): conn_config['sslmode'] = 'require' post_db.CURSOR_ITER_SIZE = int(args.config.get('itersize', post_db.CURSOR_ITER_SIZE)) + post_db.TRY_NUMBER = int(args.config.get('trynumber', post_db.TRY_NUMBER)) + if args.discover: do_discovery(conn_config) diff --git a/tap_postgres/db.py b/tap_postgres/db.py index c7711c42..737148da 100644 --- a/tap_postgres/db.py +++ b/tap_postgres/db.py @@ -14,6 +14,7 @@ LOGGER = singer.get_logger('tap_postgres') CURSOR_ITER_SIZE = 20000 +TRY_NUMBER = 10 # pylint: disable=invalid-name,missing-function-docstring