Skip to content

Commit

Permalink
Poll job when status is not completed
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenFrankel committed Mar 22, 2022
1 parent 0606f09 commit e860c44
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 8 deletions.
10 changes: 6 additions & 4 deletions tap_auth0/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ def _poll_job(self, get_job_request: requests.PreparedRequest, count=1) -> Any:
get_job_response = self._request(get_job_request, None)
job = get_job_response.json()

if job["status"] == "pending":
time.sleep(job_poll_interval_ms / 1000)
return self._poll_job(get_job_request, count=count + 1)
status = job["status"]

return job
if status == "completed":
return job

time.sleep(job_poll_interval_ms / 1000)
return self._poll_job(get_job_request, count=count + 1)


class ClientsStream(Auth0Stream):
Expand Down
59 changes: 59 additions & 0 deletions tap_auth0/tests/test_sync.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Tests the tap using a mock base credentials config."""

import gzip
import json
import unittest

import ndjson
import responses
import singer

Expand Down Expand Up @@ -31,6 +34,62 @@ def test_base_credentials_discovery(self):
# expect valid catalog to be discovered
self.assertEqual(len(catalog), 3, "Total streams from default catalog")

@responses.activate
def test_auth0_sync_users(self):
"""Test sync users."""

tap = test_utils.set_up_tap_with_custom_catalog(
self.mock_config, ["stream_auth0_users"]
)

responses.add(
responses.POST,
"https://test.auth0.com/oauth/token",
json={"access_token": "12345", "expires_in": 3622},
status=200,
)

job_id = "12345"
job = test_utils.users_export_job_pending(job_id)
responses.add(
responses.POST,
"https://test.auth0.com/api/v2/jobs/users-exports",
status=200,
json=job,
)

job = test_utils.users_export_job_processing(job_id)
responses.add(
responses.POST,
"https://test.auth0.com/api/v2/jobs/users-exports",
status=200,
json=job,
)

job = test_utils.users_export_job_completed(job_id)
responses.add(
responses.GET,
f"https://test.auth0.com/api/v2/jobs/{job_id}",
status=200,
json=job,
)

responses.add(
responses.GET,
job["location"],
status=200,
body=gzip.compress(
json.dumps(test_utils.users_data, cls=ndjson.Encoder).encode()
),
)

tap.sync_all()

self.assertEqual(len(test_utils.SINGER_MESSAGES), 3)
self.assertIsInstance(test_utils.SINGER_MESSAGES[0], singer.SchemaMessage)
self.assertIsInstance(test_utils.SINGER_MESSAGES[1], singer.RecordMessage)
self.assertIsInstance(test_utils.SINGER_MESSAGES[2], singer.StateMessage)

@responses.activate
def test_auth0_sync_clients(self):
"""Test sync clients."""
Expand Down
31 changes: 27 additions & 4 deletions tap_auth0/tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,37 @@
""" Utilities used in this module """
from tap_auth0.tap import TapAuth0

from singer_sdk.helpers._singer import Catalog
from singer_sdk.helpers import _catalog
from singer_sdk.helpers._singer import Catalog

from tap_auth0.tap import TapAuth0

SINGER_MESSAGES = []

clients_data = {"start": 0, "total": 100, "clients": [{"client_id": "client_id_12345"}]}
users_data = [{"user_id": "user_id_12345"}]


def users_export_job_pending(job_id: str):
return {
"status": "pending",
"id": job_id,
}


def users_export_job_processing(job_id: str):
return {
"status": "processing",
"id": job_id,
}


def users_export_job_completed(job_id: str):
return {
"status": "completed",
"id": job_id,
"location": "https://test.com",
}


clients_data = {"start": 0, "total": 100, "clients": [{"client_id": "client_id_12345"}]}
logs_data = [{"log_id": "log_id_12345"}]


Expand Down

0 comments on commit e860c44

Please sign in to comment.