Skip to content

Commit

Permalink
[3/n][dagster-fivetran] Implement FivetranClient for rework (#25756)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This PR implements `FivetranClient`, which is based on the legacy
`FivetranResource` code. Basically reusing how `make_request` was
implemented and adding more methods to leverage in a subsequent PR to
fetch the Fivetran workspace data.

## How I Tested These Changes

Additional unit test
  • Loading branch information
maximearmstrong authored Nov 8, 2024
1 parent c27d160 commit 83582b5
Show file tree
Hide file tree
Showing 4 changed files with 375 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url

FIVETRAN_API_BASE = "https://api.fivetran.com"
FIVETRAN_API_VERSION_PATH = "v1/"
FIVETRAN_CONNECTOR_PATH = "connectors/"
FIVETRAN_API_VERSION = "v1"
FIVETRAN_CONNECTOR_ENDPOINT = "connectors"
FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

# default polling interval (in seconds)
DEFAULT_POLL_INTERVAL = 10
Expand Down Expand Up @@ -463,7 +465,7 @@ def __init__(

@property
def _auth(self) -> HTTPBasicAuth:
raise NotImplementedError()
return HTTPBasicAuth(self.api_key, self.api_secret)

@property
@cached_method
Expand All @@ -472,40 +474,102 @@ def _log(self) -> logging.Logger:

@property
def api_base_url(self) -> str:
raise NotImplementedError()
return f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}"

@property
def api_connector_url(self) -> str:
raise NotImplementedError()
return f"{self.api_base_url}/{FIVETRAN_CONNECTOR_ENDPOINT}"

def make_connector_request(
def _make_connector_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()
return self._make_request(method, f"{FIVETRAN_CONNECTOR_ENDPOINT}/{endpoint}", data)

def make_request(
def _make_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()
"""Creates and sends a request to the desired Fivetran API endpoint.
Args:
method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH").
endpoint (str): The Fivetran API endpoint to send this request to.
data (Optional[str]): JSON-formatted data string to be included in the request.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
url = f"{self.api_base_url}/{endpoint}"
headers = {
"User-Agent": f"dagster-fivetran/{__version__}",
"Content-Type": "application/json;version=2",
}

num_retries = 0
while True:
try:
response = requests.request(
method=method,
url=url,
headers=headers,
auth=self._auth,
data=data,
timeout=int(os.getenv("DAGSTER_FIVETRAN_API_REQUEST_TIMEOUT", "60")),
)
response.raise_for_status()
resp_dict = response.json()
return resp_dict["data"] if "data" in resp_dict else resp_dict
except RequestException as e:
self._log.error("Request to Fivetran API failed: %s", e)
if num_retries == self.request_max_retries:
break
num_retries += 1
time.sleep(self.request_retry_delay)

raise Failure(f"Max retries ({self.request_max_retries}) exceeded with url: {url}.")

def get_connector_details(self, connector_id: str) -> Mapping[str, Any]:
"""Fetches details about a given connector from the Fivetran API."""
raise NotImplementedError()
"""Gets details about a given connector from the Fivetran API.
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_connector_request(method="GET", endpoint=connector_id)

def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]:
"""Fetches all connectors for a given group from the Fivetran API."""
raise NotImplementedError()
"""Fetches all connectors for a given group from the Fivetran API.
Args:
group_id (str): The Fivetran Group ID.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", f"groups/{group_id}/connectors")

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Fivetran API."""
raise NotImplementedError()
"""Fetches details about a given destination from the Fivetran API.
Args:
destination_id (str): The Fivetran Destination ID.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", f"destinations/{destination_id}")

def get_groups(self) -> Mapping[str, Any]:
"""Fetches all groups from the Fivetran API."""
raise NotImplementedError()
"""Fetches all groups from the Fivetran API.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", "groups")


@experimental
class FivetranWorkspace(ConfigurableResource):
"""This class represents a Fivetran workspace and provides utilities
to interact with Fivetran APIs.
Expand Down
Empty file.
Loading

0 comments on commit 83582b5

Please sign in to comment.