Skip to content

Commit

Permalink
Update post review
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Dec 19, 2024
1 parent d2c39d3 commit 3e9f644
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ def airbyte_assets(
@airbyte_assets(
connection_id="airbyte_connection_id",
name="airbyte_connection_id",
group_name="airbyte_connection_id",
workspace=airbyte_workspace,
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
Expand Down Expand Up @@ -74,8 +72,8 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt
class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
default_spec = super().get_asset_spec(props)
return default_spec.replace_attributes(
key=asset_spec.key.with_prefix("my_prefix"),
return default_spec.merge_attributes(
metadata={"custom": "metadata"},
)
airbyte_workspace = AirbyteCloudWorkspace(
Expand All @@ -87,8 +85,6 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
@airbyte_assets(
connection_id="airbyte_connection_id",
name="airbyte_connection_id",
group_name="airbyte_connection_id",
workspace=airbyte_workspace,
dagster_airbyte_translator=CustomDagsterAirbyteTranslator()
)
Expand All @@ -104,7 +100,7 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt
return multi_asset(
name=name,
group_name=group_name,
can_subset=False,
can_subset=True,
specs=[
spec
for spec in workspace.load_asset_specs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1102,8 +1102,8 @@ def build_airbyte_assets_definitions(
class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
default_spec = super().get_asset_spec(props)
return default_spec.replace_attributes(
key=asset_spec.key.with_prefix("my_prefix"),
return default_spec.merge_attributes(
metadata={"custom": "metadata"},
)
airbyte_workspace = AirbyteCloudWorkspace(
Expand All @@ -1129,19 +1129,22 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
dagster_airbyte_translator=dagster_airbyte_translator
)

connection_ids = {
check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id)
connections = {
(
check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id),
check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_name),
)
for spec in all_asset_specs
}

_asset_fns = []
for connection_id in connection_ids:
for connection_id, connection_name in connections:

@airbyte_assets(
connection_id=connection_id,
workspace=workspace,
name=_clean_name(connection_id),
group_name=_clean_name(connection_id),
name=_clean_name(connection_name),
group_name=_clean_name(connection_name),
dagster_airbyte_translator=dagster_airbyte_translator,
)
def _asset_fn(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
Expand Down

0 comments on commit 3e9f644

Please sign in to comment.