[dagster-airlift] complete revamp #23369

Merged

@dpeng817 merged 1 commit into master from dpeng817/airflow_blueprints on Aug 5, 2024

Conversation

@dpeng817 (Contributor) commented Aug 1, 2024

Complete project revamp.

@dpeng817 marked this pull request as ready for review on August 1, 2024 22:17
@dpeng817 requested a review from schrockn on August 2, 2024 16:21
Comment on lines 404 to 410
# Each task_id.yaml will look something like this:
# - asset: my/asset/key
#   tags: {}
#   metadata: {}
#   deps: [upstream/asset/key]
#   compute_pointer: my.compute.pointer
# - asset: my/other/asset/key

Member

this is totally configurable on a per-blueprint basis, correct?

@@ -0,0 +1,5 @@
dags:

Member

can easily change this later but I do think we will want one file per dag

Comment on lines 419 to 472
class AssetComputePointer(BaseModel):
    module_name: str
    fn_name: str


class AssetInfo(BaseModel):
    asset_key: str
    tags: Dict[str, str] = {}
    metadata: Dict[str, Any] = {}
    deps: List[str] = []
    asset_compute_pointer: Optional[AssetComputePointer] = None


class MultiAssetSpec(BaseModel):
    asset_infos: List[AssetInfo]


class MigratingAssetsBlueprint(BaseModel):
    type: Literal["external_asset"] = "external_asset"
    asset_infos: List[AssetInfo]
    migrated: bool
    compute_kind: str
    name: str

    def build_defs(self) -> Definitions:
        asset_specs = [
            AssetSpec(
                key=AssetKey.from_user_string(info.asset_key),
                metadata=info.metadata,
                tags=info.tags,
                deps=[AssetDep(AssetKey.from_user_string(dep)) for dep in info.deps],
            )
            for info in self.asset_infos
        ]

        @multi_asset(
            specs=asset_specs,
            name=self.name,
            compute_kind=self.compute_kind,
        )
        def _external_asset() -> None:
            if self.migrated:
                for info in self.asset_infos:
                    ptr = check.not_none(info.asset_compute_pointer)
                    module = import_module(ptr.module_name)
                    compute_fn = getattr(module, ptr.fn_name)
                    compute_fn()

        return Definitions(assets=[_external_asset])


class TaskMigrationState(BaseModel):
    task_id: str
    migrated: bool

Member

For our own sake, I think we should split these into different files, and make it super clear what we expect to be in the airflow process versus the dagster process. We will probably want to have different modules eventually.

Contributor Author

Heavily agreed about the different modules. I figure for a lot of people, adding a heavyweight dependency on dagster within their Airflow code will be a non-starter. I've been thinking that airlift will consist of the Dagster component, plus a lightweight, Pipes-esque package whose only dependency is requests. Maybe we could even get away without that.

I'll put them into their own file for now.
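
For concreteness, a minimal sketch of what such a requests-only shim could look like (all names, endpoints, and payload shapes here are hypothetical illustrations, not part of this PR): the Airflow process reports task completion to a Dagster-side endpoint without importing dagster at all.

# Hypothetical sketch only -- not code from this PR. An Airflow-side shim whose
# sole dependency is `requests`: it notifies a Dagster-side endpoint that an
# Airflow task finished, so no dagster package is imported in the Airflow process.
import requests


def report_task_complete(
    base_url: str, dag_id: str, task_id: str, run_id: str, timeout: float = 10.0
) -> None:
    # The endpoint path and JSON payload are assumptions for illustration.
    response = requests.post(
        f"{base_url}/airlift/task-complete",
        json={"dag_id": dag_id, "task_id": task_id, "run_id": run_id},
        timeout=timeout,
    )
    response.raise_for_status()

Something like this could be invoked from an Airflow on_success_callback or at the end of a task's Python callable, keeping the Airflow-side footprint tiny.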

Comment on lines 436 to 467
class MigratingAssetsBlueprint(BaseModel):
    type: Literal["external_asset"] = "external_asset"
    asset_infos: List[AssetInfo]
    migrated: bool
    compute_kind: str
    name: str

    def build_defs(self) -> Definitions:
        asset_specs = [
            AssetSpec(
                key=AssetKey.from_user_string(info.asset_key),
                metadata=info.metadata,
                tags=info.tags,
                deps=[AssetDep(AssetKey.from_user_string(dep)) for dep in info.deps],
            )
            for info in self.asset_infos
        ]

        @multi_asset(
            specs=asset_specs,
            name=self.name,
            compute_kind=self.compute_kind,
        )
        def _external_asset() -> None:
            if self.migrated:
                for info in self.asset_infos:
                    ptr = check.not_none(info.asset_compute_pointer)
                    module = import_module(ptr.module_name)
                    compute_fn = getattr(module, ptr.fn_name)
                    compute_fn()

        return Definitions(assets=[_external_asset])

Member

I'd also like to see the pattern where we have an existing blueprint (e.g. dbt) and have it only return specs when it is not migrated.
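
For reference, a minimal sketch of that pattern using generic dagster APIs rather than an actual dbt blueprint (the helper name and wiring are illustrative assumptions, not code from this PR): before migration the assets are exposed as external specs only; after migration the same specs back an executable multi_asset.

from typing import Sequence

from dagster import AssetSpec, Definitions, external_assets_from_specs, multi_asset


def build_defs_for_migration_state(
    specs: Sequence[AssetSpec], migrated: bool, name: str
) -> Definitions:
    # Hypothetical helper illustrating the suggestion above. Before the Airflow
    # task is migrated, return the specs as external (non-executable) assets;
    # once migrated, back the same specs with an executable multi_asset.
    if not migrated:
        return Definitions(assets=external_assets_from_specs(list(specs)))

    @multi_asset(name=name, specs=list(specs))
    def _assets() -> None:
        ...  # invoke the real compute (e.g. dbt) once the task is migrated

    return Definitions(assets=[_assets])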

@schrockn (Member) left a comment

This one probably worth talking through live.

#   metadata: {}
#   deps: [upstream/asset/key]
#   compute_pointer: my.compute.pointer
# - asset: my/other/asset/key

Member

type: python
python_fn: my_module.python_fn
assets:
  - key:
     tags:
     metadata:
     deps:

Comment on lines 460 to 465
            if self.migrated:
                for info in self.asset_infos:
                    ptr = check.not_none(info.asset_compute_pointer)
                    module = import_module(ptr.module_name)
                    compute_fn = getattr(module, ptr.fn_name)
                    compute_fn()

Member

accessing the attr should happen in the factory function, not the body of the multi-asset
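
A sketch of how build_defs could be rearranged along those lines, with the module import and attribute lookup hoisted into the factory instead of the asset body (the class fields are as in the diff above; imports are shown for completeness and this is an illustration, not the merged code):

from importlib import import_module

import dagster._check as check
from dagster import AssetDep, AssetKey, AssetSpec, Definitions, multi_asset


def build_defs(self) -> Definitions:
    asset_specs = [
        AssetSpec(
            key=AssetKey.from_user_string(info.asset_key),
            metadata=info.metadata,
            tags=info.tags,
            deps=[AssetDep(AssetKey.from_user_string(dep)) for dep in info.deps],
        )
        for info in self.asset_infos
    ]

    # Resolve the compute callables here, in the factory, so a bad module path
    # or missing attribute fails at definition-load time rather than mid-run.
    compute_fns = []
    if self.migrated:
        for info in self.asset_infos:
            ptr = check.not_none(info.asset_compute_pointer)
            compute_fns.append(getattr(import_module(ptr.module_name), ptr.fn_name))

    @multi_asset(specs=asset_specs, name=self.name, compute_kind=self.compute_kind)
    def _external_asset() -> None:
        for compute_fn in compute_fns:
            compute_fn()

    return Definitions(assets=[_external_asset])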

@schrockn (Member) left a comment

Flushing through comments

@dpeng817 force-pushed the dpeng817/mwaa_auth_backend branch from 1eb3e6b to 587e720 on August 3, 2024 23:59
@dpeng817 force-pushed the dpeng817/airflow_blueprints branch from 4ff80f5 to de7ca4a on August 3, 2024 23:59
@dpeng817 force-pushed the dpeng817/mwaa_auth_backend branch from 587e720 to 2c7d6eb on August 4, 2024 00:03
@dpeng817 force-pushed the dpeng817/airflow_blueprints branch from de7ca4a to a57a8c1 on August 4, 2024 00:04
@dpeng817 force-pushed the dpeng817/mwaa_auth_backend branch from 2c7d6eb to 3247e12 on August 5, 2024 02:08
@dpeng817 force-pushed the dpeng817/airflow_blueprints branch from a57a8c1 to 7c7c8b4 on August 5, 2024 02:08
@dpeng817 changed the title from "[dagster-airlift] airflow migration blueprints" to "[dagster-airlift] airflow migration tooling + defs from yaml" on Aug 5, 2024
@dpeng817 changed the title from "[dagster-airlift] airflow migration tooling + defs from yaml" to "[dagster-airlift] complete revamp" on Aug 5, 2024
@dpeng817 force-pushed the dpeng817/airflow_blueprints branch 2 times, most recently from e745d9a to 15efcd4 on August 5, 2024 16:22
@dpeng817 force-pushed the dpeng817/airflow_blueprints branch 2 times, most recently from 1ad114f to a70502a on August 5, 2024 17:54
Base automatically changed from dpeng817/mwaa_auth_backend to master on August 5, 2024 19:37
Comment on lines +12 to +25
class FromEnvVar(BaseModel):
    env_var: str


class DbtProjectDefs(Blueprint):
    type: Literal["dbt_project"] = "dbt_project"
    dbt_project_path: FromEnvVar
    group: Optional[str] = None

    def build_defs(self) -> Definitions:
        source_file_name_without_ext = self.source_file_name.split(".")[0]
        dbt_project_path = Path(os.environ[self.dbt_project_path.env_var])
        dbt_manifest_path = dbt_project_path / "target" / "manifest.json"

Member

Would prefer to see FromEnvVar go away so we don't rely on it
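
One way that could look (an assumption about the direction, not code from the PR): take the project path as a plain string field and let the caller resolve any environment variable before constructing the object. Sketched here on a plain pydantic BaseModel, with a hypothetical env var name, purely for illustration.

import os
from pathlib import Path
from typing import Literal, Optional

from pydantic import BaseModel


# Sketch of the same config without the FromEnvVar wrapper: the path is a plain
# string, and env-var resolution (if any) happens at the call site.
class DbtProjectDefsSketch(BaseModel):
    type: Literal["dbt_project"] = "dbt_project"
    dbt_project_path: str
    group: Optional[str] = None

    def manifest_path(self) -> Path:
        return Path(self.dbt_project_path) / "target" / "manifest.json"


# The caller resolves the environment variable itself (env var name and
# fallback path are hypothetical).
sketch = DbtProjectDefsSketch(
    dbt_project_path=os.environ.get("DBT_PROJECT_DIR", "analytics/dbt")
)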

Comment on lines +15 to +18
-e ../../../../../python_modules/libraries/dagster-databricks
-e ../../../../../python_modules/libraries/dagster-pandas
-e ../../../../../python_modules/libraries/dagster-pyspark
-e ../../../../../python_modules/libraries/dagster-spark

Member

why do we need all of these?

@schrockn (Member) left a comment

OK, given that this is experimental we can fix things forward, but:

  1. Eliminating unnecessary dependencies is very important (e.g. early design partners who do not use Databricks should not have to install dagster-databricks).
  2. We should cut our dependency on blueprints.

@dpeng817 force-pushed the dpeng817/airflow_blueprints branch from a70502a to ba0ad3f on August 5, 2024 20:53
@dpeng817 force-pushed the dpeng817/airflow_blueprints branch from ba0ad3f to 6ea9e6b on August 5, 2024 21:05
@dpeng817 merged commit bb9aec7 into master on Aug 5, 2024
1 check passed
@dpeng817 deleted the dpeng817/airflow_blueprints branch on August 5, 2024 21:21