diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index a2c82215d0e46..c519248e752d9 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -630,6 +630,24 @@ def update_schedule_type_for_connector( method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type}) ) + def get_columns_config_for_table( + self, connector_id: str, schema_name: str, table_name: str + ) -> Mapping[str, Any]: + """Fetches the source table columns config for a given table from the Fivetran API. + + Args: + connector_id (str): The Fivetran Connector ID. + schema_name (str): The Fivetran Schema name. + table_name (str): The Fivetran Table name. + + Returns: + Dict[str, Any]: Parsed json data from the response to this request. + """ + return self._make_connector_request( + method="GET", + endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns", + ) + def start_sync(self, connector_id: str) -> None: """Initiates a sync of a Fivetran connector. diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index e19e422235d6f..7e675e4b74b25 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -153,7 +153,7 @@ class FivetranTable: enabled: bool name_in_destination: str - # We keep the raw data for columns to add it as `column_info in the metadata. + # We keep the raw data for columns to add it as `column_info` in the metadata. columns: Optional[Mapping[str, Any]] @classmethod diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 3d8c5db724e0c..76df67e6416e3 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -18,6 +18,10 @@ TEST_API_SECRET = "test_api_secret" TEST_ANOTHER_ACCOUNT_ID = "test_another_account_id" +TEST_SCHEMA_NAME = "schema_name_in_destination_1" +TEST_TABLE_NAME = "table_name_in_destination_1" +TEST_ANOTHER_TABLE_NAME = "another_table_name_in_destination_1" + # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/groups/list-all-groups SAMPLE_GROUPS = { @@ -401,16 +405,47 @@ def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any] SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector( - table_name="table_name_in_destination_1" + table_name=TEST_TABLE_NAME ) # We change the name of the original example to test the sync and poll materialization method ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector( - table_name="another_table_name_in_destination_1" + table_name=TEST_ANOTHER_TABLE_NAME ) SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} +SAMPLE_SOURCE_TABLE_COLUMNS_CONFIG = { + "code": "Success", + "message": "Operation performed.", + "data": { + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + } + }, +} + def get_fivetran_connector_api_url(connector_id: str) -> str: return ( @@ -480,9 +515,10 @@ def all_api_mocks_fixture( group_id: str, fetch_workspace_data_api_mocks: responses.RequestsMock, ) -> Iterator[responses.RequestsMock]: + test_connector_api_url = get_fivetran_connector_api_url(connector_id) fetch_workspace_data_api_mocks.add( method=responses.GET, - url=get_fivetran_connector_api_url(connector_id), + url=test_connector_api_url, json=get_sample_connection_details( succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR ), @@ -490,7 +526,7 @@ def all_api_mocks_fixture( ) fetch_workspace_data_api_mocks.add( method=responses.PATCH, - url=get_fivetran_connector_api_url(connector_id), + url=test_connector_api_url, json=get_sample_connection_details( succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR ), @@ -498,22 +534,28 @@ def all_api_mocks_fixture( ) fetch_workspace_data_api_mocks.add( method=responses.POST, - url=f"{get_fivetran_connector_api_url(connector_id)}/force", + url=f"{test_connector_api_url}/force", json=SAMPLE_SUCCESS_MESSAGE, status=200, ) fetch_workspace_data_api_mocks.add( method=responses.POST, - url=f"{get_fivetran_connector_api_url(connector_id)}/resync", + url=f"{test_connector_api_url}/resync", json=SAMPLE_SUCCESS_MESSAGE, status=200, ) fetch_workspace_data_api_mocks.add( method=responses.POST, - url=f"{get_fivetran_connector_api_url(connector_id)}/schemas/tables/resync", + url=f"{test_connector_api_url}/schemas/tables/resync", json=SAMPLE_SUCCESS_MESSAGE, status=200, ) + fetch_workspace_data_api_mocks.add( + method=responses.GET, + url=f"{test_connector_api_url}/schemas/{TEST_SCHEMA_NAME}/tables/{TEST_TABLE_NAME}/columns", + json=SAMPLE_SOURCE_TABLE_COLUMNS_CONFIG, + status=200, + ) yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index a371a44feec57..48da73164c512 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -19,6 +19,8 @@ TEST_API_SECRET, TEST_MAX_TIME_STR, TEST_PREVIOUS_MAX_TIME_STR, + TEST_SCHEMA_NAME, + TEST_TABLE_NAME, get_fivetran_connector_api_url, get_sample_connection_details, ) @@ -58,6 +60,17 @@ def test_basic_resource_request( assert connector_id in all_api_mocks.calls[1].request.url assert all_api_mocks.calls[1].request.method == "PATCH" + # columns config calls + all_api_mocks.calls.reset() + client.get_columns_config_for_table( + connector_id=connector_id, schema_name=TEST_SCHEMA_NAME, table_name=TEST_TABLE_NAME + ) + assert len(all_api_mocks.calls) == 1 + assert ( + f"{connector_id}/schemas/{TEST_SCHEMA_NAME}/tables/{TEST_TABLE_NAME}/columns" + in all_api_mocks.calls[0].request.url + ) + # sync calls all_api_mocks.calls.reset() client.start_sync(connector_id=connector_id)