diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index 7daf470948a46..a196a9ee5d10a 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -1011,6 +1011,84 @@ def get_job_details(self, job_id: int) -> Mapping[str, Any]: def cancel_job(self, job_id: int) -> Mapping[str, Any]: return self.make_request(method="DELETE", endpoint=f"jobs/{job_id}") + def sync_and_poll( + self, + connection_id: str, + poll_interval: Optional[float] = None, + poll_timeout: Optional[float] = None, + ) -> AirbyteOutput: + """Initializes a sync operation for the given connection, and polls until it completes. + + Args: + connection_id (str): The Airbyte Connection ID. You can retrieve this value from the + "Connection" tab of a given connection in the Airbyte UI. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will wait before this operation is timed + out. By default, this will never time out. + + Returns: + :py:class:`~AirbyteOutput`: + Details of the sync job. + """ + connection_details = self.get_connection_details(connection_id) + job_details = self.start_sync(connection_id) + # TODO: Use AirbyteJob class + job_id = job_details["id"] + + self._log.info(f"Job {job_id} initialized for connection_id={connection_id}.") + start = time.monotonic() + logged_attempts = 0 + logged_lines = 0 + state = None + + try: + while True: + if poll_timeout and start + poll_timeout < time.monotonic(): + raise Failure( + f"Timeout: Airbyte job {job_id} is not ready after the timeout" + f" {poll_timeout} seconds" + ) + time.sleep(poll_interval or self.poll_interval) + job_details = self.get_job_status(connection_id, job_id) + attempts = cast(List, job_details.get("attempts", [])) + cur_attempt = len(attempts) + # spit out the available Airbyte log info + if cur_attempt: + if self._should_forward_logs: + log_lines = attempts[logged_attempts].get("logs", {}).get("logLines", []) + + for line in log_lines[logged_lines:]: + sys.stdout.write(line + "\n") + sys.stdout.flush() + logged_lines = len(log_lines) + + # if there's a next attempt, this one will have no more log messages + if logged_attempts < cur_attempt - 1: + logged_lines = 0 + logged_attempts += 1 + + state = job_details["status"] + if state in (AirbyteState.RUNNING, AirbyteState.PENDING, AirbyteState.INCOMPLETE): + continue + elif state == AirbyteState.SUCCEEDED: + break + elif state == AirbyteState.ERROR: + raise Failure(f"Job failed: {job_id}") + elif state == AirbyteState.CANCELLED: + raise Failure(f"Job was cancelled: {job_id}") + else: + raise Failure(f"Encountered unexpected state `{state}` for job_id {job_id}") + finally: + # if Airbyte sync has not completed, make sure to cancel it so that it doesn't outlive + # the python process + if ( + state not in (AirbyteState.SUCCEEDED, AirbyteState.ERROR, AirbyteState.CANCELLED) + and self.cancel_sync_on_run_termination + ): + self.cancel_job(job_id) + + return AirbyteOutput(job_details=job_details, connection_details=connection_details) + @experimental class AirbyteCloudWorkspace(ConfigurableResource):