Skip to content

Commit

Permalink
Merge branch 'master' into pdrm/08-22-_docs-beta_update_gh_action
Browse files Browse the repository at this point in the history
  • Loading branch information
PedramNavid authored Aug 23, 2024
2 parents 2625137 + 3747660 commit 7516c2b
Show file tree
Hide file tree
Showing 10 changed files with 770 additions and 212 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/build-docs-revamp.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
name: Deploy Docs Revamp
on:
push:
branches:
- master
- docs-prod
pull_request:
paths:
- .github/workflows/build-docs-revamp.yml
- docs/docs-beta/**
- examples/docs_beta_snippets/**
paths:
- docs/docs-beta/**
- examples/docs_beta_snippets/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ def get_webserver_url(self) -> str:
raise NotImplementedError("This method must be implemented by subclasses.")


@record
class AirflowInstance:
auth_backend: AirflowAuthBackend
name: str
def __init__(self, auth_backend: AirflowAuthBackend, name: str) -> None:
self.auth_backend = auth_backend
self.name = name

@property
def normalized_name(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import defaultdict
from datetime import timedelta
from typing import Dict, List, Set, Tuple
from typing import Dict, List, Sequence, Set, Tuple

from dagster import (
AssetKey,
Expand All @@ -18,6 +18,7 @@
from dagster._core.definitions.repository_definition.repository_definition import (
RepositoryDefinition,
)
from dagster._core.utils import toposort_flatten
from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp

from .airflow_instance import AirflowInstance, TaskInstance
Expand All @@ -42,6 +43,7 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult:
)
current_date = get_current_datetime()
materializations_to_report: List[Tuple[float, AssetMaterialization]] = []
toposorted_keys = toposorted_asset_keys(repository_def)
for dag_id, (dag_key, task_keys) in retrieve_unmigrated_dag_keys(repository_def).items():
# For now, we materialize assets representing tasks only when the whole dag completes.
# Once we have a more robust cursor that can tell us whether a particular task run has already been seen, we can relax this constraint.
Expand Down Expand Up @@ -83,8 +85,8 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult:
**metadata,
"Run Details": MarkdownMetadataValue(f"[View Run]({task_run.details_url})"),
"Task Logs": MarkdownMetadataValue(f"[View Logs]({task_run.log_url})"),
"Start Date": TimestampMetadataValue(dag_run.start_date),
"End Date": TimestampMetadataValue(dag_run.end_date),
"Start Date": TimestampMetadataValue(task_run.start_date),
"End Date": TimestampMetadataValue(task_run.end_date),
}
materializations_to_report.append(
(
Expand All @@ -96,8 +98,10 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult:
),
)
)
# Sort materialization
sorted_mats = sorted(materializations_to_report, key=lambda x: x[0])
# Sort materializations by end date and toposort order
sorted_mats = sorted(
materializations_to_report, key=lambda x: (x[0], toposorted_keys.index(x[1].asset_key))
)
context.update_cursor(str(current_date.timestamp()))
return SensorResult(
asset_events=[sorted_mat[1] for sorted_mat in sorted_mats],
Expand Down Expand Up @@ -136,3 +140,14 @@ def retrieve_unmigrated_dag_keys(
else:
task_keys_per_dag[dag_id].update((task_id, spec.key) for spec in assets_def.specs)
return {dag_id: (key, task_keys_per_dag[dag_id]) for dag_id, key in key_per_dag.items()}


def toposorted_asset_keys(
    repository_def: RepositoryDefinition,
) -> Sequence[AssetKey]:
    """Return the repository's asset keys in the order produced by ``toposort_flatten``.

    A mapping of asset key -> keys of its declared deps (its upstreams) is built
    from every spec of every assets definition in the repository, and that
    mapping is handed to ``toposort_flatten``.

    Args:
        repository_def: Repository whose ``assets_defs_by_key`` values are scanned.

    Returns:
        The flattened result of ``toposort_flatten`` over the upstream mapping.
    """
    upstreams_by_key = defaultdict(set)
    every_spec = (
        spec
        for assets_def in repository_def.assets_defs_by_key.values()
        for spec in assets_def.specs
    )
    for spec in every_spec:
        # Indexing the defaultdict registers the key even when the spec has no deps;
        # update() accumulates, so a key visited more than once keeps the union.
        upstream_keys = upstreams_by_key[spec.key]
        upstream_keys.update(dep.asset_key for dep in spec.deps)
    return toposort_flatten(upstreams_by_key)
58 changes: 48 additions & 10 deletions examples/experimental/dagster-airlift/dagster_airlift/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,61 @@
from typing import Optional

from dagster import AssetsDefinition
from dagster import (
AssetsDefinition,
_check as check,
)

# Tag keys in the "airlift/" namespace.
# DAG_ID_TAG and TASK_ID_TAG are the keys looked up on asset spec tags and on node (op)
# tags when resolving the dag id / task id of an AssetsDefinition.
# NOTE(review): MIGRATED_TAG's consumers are outside this view - confirm its usage there.
MIGRATED_TAG = "airlift/task_migrated"
DAG_ID_TAG = "airlift/dag_id"
TASK_ID_TAG = "airlift/task_id"


def get_task_id_from_asset(assets_def: AssetsDefinition) -> Optional[str]:
    """Return the task id associated with ``assets_def``, or None if none can be found.

    Delegates to ``_get_prop_from_asset`` with ``TASK_ID_TAG``. Position 1 selects the
    second part of a ``<dag_id>__<task_id>`` node name when the name is used as a source.
    """
    return _get_prop_from_asset(assets_def, TASK_ID_TAG, 1)


def get_dag_id_from_asset(assets_def: AssetsDefinition) -> Optional[str]:
    """Return the dag id associated with ``assets_def``, or None if none can be found.

    Delegates to ``_get_prop_from_asset`` with ``DAG_ID_TAG``. Position 0 selects the
    first part of a ``<dag_id>__<task_id>`` node name when the name is used as a source.
    """
    return _get_prop_from_asset(assets_def, DAG_ID_TAG, 0)


def _get_prop_from_asset(
    assets_def: AssetsDefinition, prop_tag: str, position: int
) -> Optional[str]:
    """Resolve an airlift property (such as a dag id or task id) for an AssetsDefinition.

    The value is looked up in three places:

    1. the ``prop_tag`` entry in the tags of the definition's asset specs,
    2. the ``prop_tag`` entry in the tags of the underlying node definition (op tags),
    3. the node name, when splitting it on ``"__"`` yields exactly two parts.

    Whenever two of these sources both yield a truthy value, they must agree.

    Args:
        assets_def: The definition to inspect.
        prop_tag: Tag key to look up on the specs and on the node definition.
        position: Index (0 or 1) of the part of the ``"__"``-split node name to use.

    Returns:
        The first truthy value among spec tags, op tags and node name (in that
        order), or None when no source provides one.

    Raises:
        A dagster check failure (via ``check.failed`` / ``check.invariant``) when only
        some specs carry the tag, when specs disagree on its value, or when two
        sources yield conflicting values.
    """
    prop_from_tags = None
    if any(prop_tag in spec.tags for spec in assets_def.specs):
        # Once any spec carries the tag, every spec must carry it with the same value.
        for spec in assets_def.specs:
            spec_prop = spec.tags.get(prop_tag)
            if spec_prop is None:
                # Applies to the first spec as well, so a partially tagged definition
                # fails with this message rather than a bare KeyError.
                check.failed(
                    f"Missing {prop_tag} tag in spec {spec.key} for {assets_def.node_def.name}"
                )
            if prop_from_tags is None:
                prop_from_tags = spec_prop
            else:
                check.invariant(
                    prop_from_tags == spec_prop,
                    f"{prop_tag} mismatch within same AssetsDefinition: {prop_from_tags} != {spec_prop}",
                )

    prop_from_op_tags = None
    op_tags = assets_def.node_def.tags
    if op_tags and prop_tag in op_tags:
        prop_from_op_tags = op_tags[prop_tag]

    prop_from_name = None
    name_parts = assets_def.node_def.name.split("__")
    if len(name_parts) == 2:
        prop_from_name = name_parts[position]

    # Cross-check every pair of sources that both produced a value.
    if prop_from_tags and prop_from_op_tags:
        check.invariant(
            prop_from_tags == prop_from_op_tags,
            f"ID mismatch between asset tags and op tags: {prop_from_tags} != {prop_from_op_tags}",
        )
    if prop_from_tags and prop_from_name:
        check.invariant(
            prop_from_tags == prop_from_name,
            f"ID mismatch between tags and name: {prop_from_tags} != {prop_from_name}",
        )
    if prop_from_op_tags and prop_from_name:
        check.invariant(
            prop_from_op_tags == prop_from_name,
            f"ID mismatch between op tags and name: {prop_from_op_tags} != {prop_from_name}",
        )
    return prop_from_tags or prop_from_op_tags or prop_from_name

This file was deleted.

Loading

0 comments on commit 7516c2b

Please sign in to comment.