Skip to content

Commit

Permalink
[dagster-airbyte] Implement sync_and_poll method in AirbyteCloudClient
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Dec 12, 2024
1 parent 01c67e0 commit 4b9b2c1
Showing 1 changed file with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,84 @@ def get_job_details(self, job_id: int) -> Mapping[str, Any]:
def cancel_job(self, job_id: int) -> Mapping[str, Any]:
    """Cancel the Airbyte job identified by ``job_id``.

    Sends a ``DELETE`` request to the ``jobs/{job_id}`` endpoint through
    ``make_request``.

    Args:
        job_id (int): The ID of the job to cancel.

    Returns:
        Mapping[str, Any]: The value returned by ``make_request`` for this call.
    """
    job_endpoint = f"jobs/{job_id}"
    return self.make_request(method="DELETE", endpoint=job_endpoint)

def sync_and_poll(
    self,
    connection_id: str,
    poll_interval: Optional[float] = None,
    poll_timeout: Optional[float] = None,
) -> AirbyteOutput:
    """Initializes a sync operation for the given connection, and polls until it completes.

    The connection details are fetched first, then a sync is started and its job is polled
    via ``get_job_status`` until it reaches a terminal state. While polling, any log lines
    attached to the job's attempts are forwarded to stdout when ``_should_forward_logs``
    is set.

    If polling stops for any reason before the job reached a terminal state (timeout,
    unexpected state, or an exception such as an interrupt) and
    ``cancel_sync_on_run_termination`` is set, the job is cancelled before returning or
    re-raising.

    Args:
        connection_id (str): The Airbyte Connection ID. You can retrieve this value from the
            "Connection" tab of a given connection in the Airbyte UI.
        poll_interval (float): The time (in seconds) to wait between successive polls.
            Falls back to ``self.poll_interval`` when not provided (or zero).
        poll_timeout (float): The maximum time (in seconds) to wait before this operation
            is timed out. By default (``None`` or zero), this will never time out.

    Returns:
        :py:class:`~AirbyteOutput`:
            Details of the sync job, together with the connection details.

    Raises:
        Failure: If the timeout elapses, the job ends in the error or cancelled state, or
            the job reports a state this method does not know about.
    """
    connection_details = self.get_connection_details(connection_id)
    job_details = self.start_sync(connection_id)
    # TODO: Use AirbyteJob class
    job_id = job_details["id"]

    self._log.info(f"Job {job_id} initialized for connection_id={connection_id}.")
    start = time.monotonic()
    # Index of the attempt whose logs are currently being forwarded, and how many of its
    # lines have already been written.
    logged_attempts = 0
    logged_lines = 0
    state = None

    try:
        while True:
            if poll_timeout and start + poll_timeout < time.monotonic():
                raise Failure(
                    f"Timeout: Airbyte job {job_id} is not ready after the timeout"
                    f" {poll_timeout} seconds"
                )
            time.sleep(poll_interval or self.poll_interval)
            job_details = self.get_job_status(connection_id, job_id)
            attempts = cast(List, job_details.get("attempts", []))
            cur_attempt = len(attempts)
            # spit out the available Airbyte log info
            #
            # Drain every attempt up to the latest one on each poll. Advancing by only one
            # attempt per poll would drop the logs of any attempt that first shows up in the
            # same poll that reports a terminal status, because the loop exits right after.
            while logged_attempts < cur_attempt:
                if self._should_forward_logs:
                    log_lines = attempts[logged_attempts].get("logs", {}).get("logLines", [])

                    for line in log_lines[logged_lines:]:
                        sys.stdout.write(line + "\n")
                    # one flush per batch is enough to make the new lines visible
                    sys.stdout.flush()
                    logged_lines = len(log_lines)

                # the latest attempt may still receive more lines; stay on it
                if logged_attempts >= cur_attempt - 1:
                    break

                # if there's a next attempt, this one will have no more log messages
                logged_lines = 0
                logged_attempts += 1

            state = job_details["status"]
            if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE):
                continue
            elif state == AirbyteState.SUCCEEDED:
                break
            elif state == AirbyteState.ERROR:
                raise Failure(f"Job failed: {job_id}")
            elif state == AirbyteState.CANCELLED:
                raise Failure(f"Job was cancelled: {job_id}")
            else:
                raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}")
    finally:
        # if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive
        # the python process
        if (
            state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED)
            and self.cancel_sync_on_run_termination
        ):
            self.cancel_job(job_id)

    return AirbyteOutput(job_details=job_details, connection_details=connection_details)


@experimental
class AirbyteCloudWorkspace(ConfigurableResource):
Expand Down

0 comments on commit 4b9b2c1

Please sign in to comment.