-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[8/n][dagster-fivetran] Implement FivetranConnector and FivetranDestination #25889
Conversation
FivetranWorkspaceData
to FivetranConnectorTableProps
method
#25797
@@ -127,7 +131,9 @@ def _build_fivetran_assets( | |||
connector_id=connection_metadata.connector_id, | |||
name=connection_metadata.name, | |||
connector_url=connection_metadata.connector_url, | |||
schema_config=connection_metadata.schemas, | |||
schema_config=FivetranSchemaConfig.from_schema_config_details( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[1] FivetranConnectorTableProps
was updated to take a FivetranSchemaConfig
object instead of a Mapping[str, Any]
one. The goal here is to keep FivetranConnectorTableProps
as close as possible to the new implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that the two implementations are built across the same version of fivetran's API, it makes sense to me that we should be able to share implementation across these subclasses.
@@ -122,6 +122,9 @@ def get_complex_sample_connector_schema_config( | |||
"allowed": False, | |||
"reason_code": "SYSTEM_TABLE", | |||
}, | |||
"columns": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[1] Columns are required in the schema config API docs. By updating the legacy test samples, the goal is that the legacy tests succeed with the updated FivetranConnectorTableProps
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Historically this wasn't the case (#23631) and if the user did not customize which columns to sync for a table, the Fivetran API would not return column schema info - might be good to verify that this is not the case now and/or still make sure we have a fallback in case we don't get column level info
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's good feedback - I reverted the test changes in a171504 and made columns optional
@@ -23,6 +23,7 @@ def get_fivetran_connector_table_name(schema_name: str, table_name: str) -> str: | |||
return f"{schema_name}.{table_name}" | |||
|
|||
|
|||
# TODO: move to translator.py and make table_data a FivetranTable object |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO added because this is out-of-scope for this PR, but should be addressed when the legacy code is removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit; let's not leave todos in the code. At least 50% of the time they end up staying there 😢 - any time there's a todo, I feel like it should probably be a linear ticket (or subticket)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very fair - removed in a1cea87 and I created a linear ticket
connector_details: Mapping[str, Any], | ||
destination_details: Mapping[str, Any], | ||
schema_config_details: Mapping[str, Any], | ||
) -> "FivetranDestination": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return type annotation is incorrect - this method returns a FivetranConnector
instance but is annotated to return FivetranDestination
. The annotation should be updated to -> "FivetranConnector"
.
Spotted by Graphite Reviewer
Is this helpful? React 👍 or 👎 to let us know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will leave for @dpeng817 to approve but looks good to me - left one quick comment about column schema which historically hasn't been included in all API responses - this has caused us troubl ein the past, might not be the case anymore though
@@ -122,6 +122,9 @@ def get_complex_sample_connector_schema_config( | |||
"allowed": False, | |||
"reason_code": "SYSTEM_TABLE", | |||
}, | |||
"columns": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Historically this wasn't the case (#23631) and if the user did not customize which columns to sync for a table, the Fivetran API would not return column schema info - might be good to verify that this is not the case now and/or still make sure we have a fallback in case we don't get column level info
0923217
to
14f49b7
Compare
4418da5
to
0997b15
Compare
0997b15
to
47a8a9d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few comments + the test behavior change is a bit scary, i think we should aim to, within this PR, not have to update tests at all
for connector_id, connector_data in self.connectors_by_id.items(): | ||
connector_details = connector_data.properties | ||
connector_name = connector_details["schema"] | ||
connector_url = get_fivetran_connector_url(connector_details) | ||
|
||
schema_config = connector_details["schema_config"] | ||
|
||
destination_details = self.destinations_by_id[ | ||
connector_details["destination_id"] | ||
].properties | ||
database = destination_details.get("config", {}).get("database") | ||
service = destination_details.get("service") | ||
|
||
schemas_data = cast(Dict[str, Any], schema_config["schemas"]) | ||
for schema in schemas_data.values(): | ||
if schema["enabled"]: | ||
schema_name = schema["name_in_destination"] | ||
schema_tables: Dict[str, Dict[str, Any]] = cast( | ||
Dict[str, Dict[str, Any]], schema["tables"] | ||
) | ||
for table in schema_tables.values(): | ||
if table["enabled"]: | ||
table_name = table["name_in_destination"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🥳
@@ -23,6 +23,7 @@ def get_fivetran_connector_table_name(schema_name: str, table_name: str) -> str: | |||
return f"{schema_name}.{table_name}" | |||
|
|||
|
|||
# TODO: move to translator.py and make table_data a FivetranTable object |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit; let's not leave todos in the code. At least 50% of the time they end up staying there 😢 - any time there's a todo, I feel like it should probably be a linear ticket (or subticket)
@@ -127,7 +131,9 @@ def _build_fivetran_assets( | |||
connector_id=connection_metadata.connector_id, | |||
name=connection_metadata.name, | |||
connector_url=connection_metadata.connector_url, | |||
schema_config=connection_metadata.schemas, | |||
schema_config=FivetranSchemaConfig.from_schema_config_details( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that the two implementations are built across the same version of fivetran's API, it makes sense to me that we should be able to share implementation across these subclasses.
destination_details = client.get_destination_details(destination_id=group_id) | ||
destinations.append( | ||
FivetranContentData( | ||
content_type=FivetranContentType.DESTINATION, properties=destination_details | ||
) | ||
destination_id = destination_details["id"] | ||
|
||
destinations_by_id[destination_id] = FivetranDestination.from_destination_details( | ||
destination_details=destination_details |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm feels like all this string level munging should be avoidable now right? You should be able to directly construct a FivetranDestination
, FivetranConnector
, etc right and avoid all the bare indexing as a result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in 6e42757
@dpeng817 That's fair - I can revert the changes for the tests and support optional columns. Fivetran API reference states that columns are required in the schema config, but we can that when the the legacy code is deprecated. |
47a8a9d
to
6e42757
Compare
@maximearmstrong yea I think the change is probably reasonable, but generally good to consider that type of thing in isolation I think. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, a few design proposals for your consideration. Will leave it up to you whether you want to go forward with them.
) | ||
|
||
return FivetranWorkspaceData.from_content_data(connectors + destinations) | ||
if not connector.has_bad_status: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know I sent this property name privately, but definitely open to other names 😆
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the property to is_connected
in 5d9e68b
schema_config_details: Mapping[str, Any], | ||
) -> "FivetranConnector": | ||
return cls( | ||
id=connector_details["id"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the connector_details dictionary large? I think my preference is for the record to have just connector_details
and schema_config
as an attribute, and then destination_id
, has_bad_status
etc could all just be properties built off of it. But if the dictionary is too large, then I understand.
Given that this is only being persisted in the cache, I view it as a reversible decision either way, and will leave that up to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The dictionary is quite large. That said, I think you are right and values like has_bad_status should be properties.
Now working on the next PR to implement sync and poll methods, we often use the connector details to test conditions. In hindsight, I believe FivetranConnector
should be built only using the connector details retrieved from the API - the schema configs should be decoupled and stored in the FivetranWorkspaceData
object. I will update the PR to reflect these changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code updated in 6685d1b
service: str | ||
schema_config: "FivetranSchemaConfig" | ||
destination_id: str | ||
has_bad_status: bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do end up making this an attribute instead of a property, I think I would have a preference for storing the setup_state directly as an attribute, and then you can have is_invalid_state or something as a property on the enum.
a8daee3
to
283f3d8
Compare
Summary & Motivation
This PR implements
FivetranConnector
andFivetranDestination
, and removesFivetranContentData
andFivetranContentType
.This addresses the concerns raised here about the legacy code.
How I Tested These Changes
BK with same tests.