From e860c44516e98dc15ce7fd7a880f29e54f4bbcbc Mon Sep 17 00:00:00 2001 From: ReubenFrankel Date: Tue, 22 Mar 2022 16:55:37 +0000 Subject: [PATCH 1/2] Poll job when `status` is not `completed` --- tap_auth0/streams.py | 10 +++--- tap_auth0/tests/test_sync.py | 59 ++++++++++++++++++++++++++++++++++++ tap_auth0/tests/utils.py | 31 ++++++++++++++++--- 3 files changed, 92 insertions(+), 8 deletions(-) diff --git a/tap_auth0/streams.py b/tap_auth0/streams.py index 57821d4..52bdf2f 100644 --- a/tap_auth0/streams.py +++ b/tap_auth0/streams.py @@ -96,11 +96,13 @@ def _poll_job(self, get_job_request: requests.PreparedRequest, count=1) -> Any: get_job_response = self._request(get_job_request, None) job = get_job_response.json() - if job["status"] == "pending": - time.sleep(job_poll_interval_ms / 1000) - return self._poll_job(get_job_request, count=count + 1) + status = job["status"] - return job + if status == "completed": + return job + + time.sleep(job_poll_interval_ms / 1000) + return self._poll_job(get_job_request, count=count + 1) class ClientsStream(Auth0Stream): diff --git a/tap_auth0/tests/test_sync.py b/tap_auth0/tests/test_sync.py index aadf338..8466fe3 100644 --- a/tap_auth0/tests/test_sync.py +++ b/tap_auth0/tests/test_sync.py @@ -1,7 +1,10 @@ """Tests the tap using a mock base credentials config.""" +import gzip +import json import unittest +import ndjson import responses import singer @@ -31,6 +34,62 @@ def test_base_credentials_discovery(self): # expect valid catalog to be discovered self.assertEqual(len(catalog), 3, "Total streams from default catalog") + @responses.activate + def test_auth0_sync_users(self): + """Test sync users.""" + + tap = test_utils.set_up_tap_with_custom_catalog( + self.mock_config, ["stream_auth0_users"] + ) + + responses.add( + responses.POST, + "https://test.auth0.com/oauth/token", + json={"access_token": "12345", "expires_in": 3622}, + status=200, + ) + + job_id = "12345" + job = test_utils.users_export_job_pending(job_id) + responses.add( + responses.POST, + "https://test.auth0.com/api/v2/jobs/users-exports", + status=200, + json=job, + ) + + job = test_utils.users_export_job_processing(job_id) + responses.add( + responses.POST, + "https://test.auth0.com/api/v2/jobs/users-exports", + status=200, + json=job, + ) + + job = test_utils.users_export_job_completed(job_id) + responses.add( + responses.GET, + f"https://test.auth0.com/api/v2/jobs/{job_id}", + status=200, + json=job, + ) + + responses.add( + responses.GET, + job["location"], + status=200, + body=gzip.compress( + json.dumps(test_utils.users_data, cls=ndjson.Encoder).encode() + ), + ) + + tap.sync_all() + + self.assertEqual(len(test_utils.SINGER_MESSAGES), 3) + self.assertIsInstance(test_utils.SINGER_MESSAGES[0], singer.SchemaMessage) + self.assertIsInstance(test_utils.SINGER_MESSAGES[1], singer.RecordMessage) + self.assertIsInstance(test_utils.SINGER_MESSAGES[2], singer.StateMessage) + @responses.activate def test_auth0_sync_clients(self): """Test sync clients.""" diff --git a/tap_auth0/tests/utils.py b/tap_auth0/tests/utils.py index 3dbeb08..ab8dfda 100644 --- a/tap_auth0/tests/utils.py +++ b/tap_auth0/tests/utils.py @@ -1,14 +1,37 @@ """ Utilities used in this module """ -from tap_auth0.tap import TapAuth0 - -from singer_sdk.helpers._singer import Catalog from singer_sdk.helpers import _catalog +from singer_sdk.helpers._singer import Catalog +from tap_auth0.tap import TapAuth0 SINGER_MESSAGES = [] -clients_data = {"start": 0, "total": 100, "clients": [{"client_id": "client_id_12345"}]} +users_data = [{"user_id": "user_id_12345"}] + +def users_export_job_pending(job_id: str): + return { + "status": "pending", + "id": job_id, + } + + +def users_export_job_processing(job_id: str): + return { + "status": "processing", + "id": job_id, + } + + +def users_export_job_completed(job_id: str): + return { + "status": "completed", + "id": job_id, + "location": "https://test.com", + } + + +clients_data = {"start": 0, "total": 100, "clients": [{"client_id": "client_id_12345"}]} logs_data = [{"log_id": "log_id_12345"}] From ba7a49e0c3f89a31c5b6d3c7d52c7a95f30e41f8 Mon Sep 17 00:00:00 2001 From: ReubenFrankel Date: Tue, 22 Mar 2022 16:58:36 +0000 Subject: [PATCH 2/2] Throw `RuntimeError` with message on job failure --- tap_auth0/streams.py | 7 ++++++ tap_auth0/tests/test_sync.py | 48 ++++++++++++++++++++++++++++++++++++ tap_auth0/tests/utils.py | 13 ++++++++++ 3 files changed, 68 insertions(+) diff --git a/tap_auth0/streams.py b/tap_auth0/streams.py index 52bdf2f..959f6fd 100644 --- a/tap_auth0/streams.py +++ b/tap_auth0/streams.py @@ -101,6 +101,13 @@ def _poll_job(self, get_job_request: requests.PreparedRequest, count=1) -> Any: if status == "completed": return job + if status == "failed": + id_ = job["id"] + summary: dict[str, int] = job["summary"] + summary_format = ", ".join(f"{k}: {v}" for k, v in summary.items()) + + raise RuntimeError(f"Job '{id_}' failed ({summary_format})") + time.sleep(job_poll_interval_ms / 1000) return self._poll_job(get_job_request, count=count + 1) diff --git a/tap_auth0/tests/test_sync.py b/tap_auth0/tests/test_sync.py index 8466fe3..19427db 100644 --- a/tap_auth0/tests/test_sync.py +++ b/tap_auth0/tests/test_sync.py @@ -90,6 +90,54 @@ def test_auth0_sync_users(self): self.assertIsInstance(test_utils.SINGER_MESSAGES[1], singer.RecordMessage) self.assertIsInstance(test_utils.SINGER_MESSAGES[2], singer.StateMessage) + @responses.activate + def test_auth0_sync_users_failed(self): + """Test sync users with failed job""" + + tap = test_utils.set_up_tap_with_custom_catalog( + self.mock_config, ["stream_auth0_users"] + ) + + responses.add( + responses.POST, + "https://test.auth0.com/oauth/token", + json={"access_token": "12345", "expires_in": 3622}, + status=200, + ) + + job_id = "12345" + job = test_utils.users_export_job_pending(job_id) + responses.add( + responses.POST, + "https://test.auth0.com/api/v2/jobs/users-exports", + status=200, + json=job, + ) + + job = test_utils.users_export_job_processing(job_id) + responses.add( + responses.POST, + "https://test.auth0.com/api/v2/jobs/users-exports", + status=200, + json=job, + ) + + job = test_utils.users_export_job_failed(job_id) + responses.add( + responses.GET, + f"https://test.auth0.com/api/v2/jobs/{job_id}", + status=200, + json=job, + ) + + with self.assertRaises(RuntimeError) as err: + tap.sync_all() + + self.assertIn(f"Job '{job_id}' failed", str(err.exception)) + + self.assertEqual(len(test_utils.SINGER_MESSAGES), 1) + self.assertIsInstance(test_utils.SINGER_MESSAGES[0], singer.SchemaMessage) + @responses.activate def test_auth0_sync_clients(self): """Test sync clients.""" diff --git a/tap_auth0/tests/utils.py b/tap_auth0/tests/utils.py index ab8dfda..e0098c3 100644 --- a/tap_auth0/tests/utils.py +++ b/tap_auth0/tests/utils.py @@ -31,6 +31,19 @@ def users_export_job_completed(job_id: str): } +def users_export_job_failed(job_id: str): + return { + "status": "failed", + "id": job_id, + "summary": { + "failed": len(users_data), + "updated": 0, + "inserted": 0, + "total": len(users_data), + }, + } + + clients_data = {"start": 0, "total": 100, "clients": [{"client_id": "client_id_12345"}]} logs_data = [{"log_id": "log_id_12345"}]