Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3/n][dagster-fivetran] Implement FivetranClient for rework #25756

Merged
merged 9 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url

FIVETRAN_API_BASE = "https://api.fivetran.com"
FIVETRAN_API_VERSION_PATH = "v1/"
FIVETRAN_CONNECTOR_PATH = "connectors/"
FIVETRAN_API_VERSION = "v1"
FIVETRAN_CONNECTOR_ENDPOINT = "connectors"
FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

# default polling interval (in seconds)
DEFAULT_POLL_INTERVAL = 10
Expand Down Expand Up @@ -463,7 +465,7 @@ def __init__(

@property
def _auth(self) -> HTTPBasicAuth:
raise NotImplementedError()
return HTTPBasicAuth(self.api_key, self.api_secret)

@property
@cached_method
Expand All @@ -472,40 +474,102 @@ def _log(self) -> logging.Logger:

@property
def api_base_url(self) -> str:
raise NotImplementedError()
return f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}"

@property
def api_connector_url(self) -> str:
raise NotImplementedError()
return f"{self.api_base_url}/{FIVETRAN_CONNECTOR_ENDPOINT}"

def make_connector_request(
def _make_connector_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()
return self._make_request(method, f"{FIVETRAN_CONNECTOR_ENDPOINT}/{endpoint}", data)

def make_request(
def _make_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()
"""Creates and sends a request to the desired Fivetran API endpoint.

Args:
method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH").
endpoint (str): The Fivetran API endpoint to send this request to.
data (Optional[str]): JSON-formatted data string to be included in the request.

Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
url = f"{self.api_base_url}/{endpoint}"
headers = {
"User-Agent": f"dagster-fivetran/{__version__}",
"Content-Type": "application/json;version=2",
}

num_retries = 0
while True:
try:
response = requests.request(
method=method,
url=url,
headers=headers,
auth=self._auth,
data=data,
timeout=int(os.getenv("DAGSTER_FIVETRAN_API_REQUEST_TIMEOUT", "60")),
)
response.raise_for_status()
resp_dict = response.json()
return resp_dict["data"] if "data" in resp_dict else resp_dict
except RequestException as e:
self._log.error("Request to Fivetran API failed: %s", e)
if num_retries == self.request_max_retries:
break
num_retries += 1
time.sleep(self.request_retry_delay)

raise Failure(f"Max retries ({self.request_max_retries}) exceeded with url: {url}.")

def get_connector_details(self, connector_id: str) -> Mapping[str, Any]:
"""Fetches details about a given connector from the Fivetran API."""
raise NotImplementedError()
"""Gets details about a given connector from the Fivetran API.

Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.

Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_connector_request(method="GET", endpoint=connector_id)

def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]:
"""Fetches all connectors for a given group from the Fivetran API."""
raise NotImplementedError()
"""Fetches all connectors for a given group from the Fivetran API.

Args:
group_id (str): The Fivetran Group ID.

Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", f"groups/{group_id}/connectors")

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Fivetran API."""
raise NotImplementedError()
"""Fetches details about a given destination from the Fivetran API.

Args:
destination_id (str): The Fivetran Destination ID.

Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", f"destinations/{destination_id}")

def get_groups(self) -> Mapping[str, Any]:
"""Fetches all groups from the Fivetran API."""
raise NotImplementedError()
"""Fetches all groups from the Fivetran API.

Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", "groups")


@experimental
class FivetranWorkspace(ConfigurableResource):
"""This class represents a Fivetran workspace and provides utilities
to interact with Fivetran APIs.
Expand Down
Loading