Skip to content

Commit

Permalink
[dagster-airlift] move mark_as_dagster_migrating tests to unit (#24168)
Browse files Browse the repository at this point in the history
The tests for mark_as_dagster_migrating (including checking swizzling
behavior) were overkill for what they were actually doing. Can be better
handled with unit testing.
  • Loading branch information
dpeng817 authored Sep 4, 2024
1 parent 36dce92 commit c2ccdeb
Show file tree
Hide file tree
Showing 16 changed files with 105 additions and 373 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ def _get_validated_session(self, context: Context) -> requests.Session:
session = self.get_dagster_session(context)
dagster_url = self.get_dagster_url(context)
response = session.post(
f"{dagster_url}/graphql", json={"query": VERIFICATION_QUERY}, timeout=3
# Timeout in seconds
f"{dagster_url}/graphql",
json={"query": VERIFICATION_QUERY},
timeout=3,
)
if response.status_code != 200:
raise Exception(
Expand All @@ -50,7 +53,10 @@ def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> N
assets_to_trigger = {} # key is (repo_location, repo_name, job_name), value is list of asset keys
# create graphql client
response = session.post(
f"{dagster_url}/graphql", json={"query": ASSET_NODES_QUERY}, timeout=3
# Timeout in seconds
f"{dagster_url}/graphql",
json={"query": ASSET_NODES_QUERY},
timeout=3,
)
for asset_node in response.json()["data"]["assetNodes"]:
tags = {tag["key"]: tag["value"] for tag in asset_node["tags"]}
Expand Down Expand Up @@ -90,6 +96,7 @@ def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> N
"query": TRIGGER_ASSETS_MUTATION,
"variables": {"executionParams": execution_params},
},
# Timeout in seconds
timeout=3,
)
run_id = response.json()["data"]["launchPipelineExecution"]["run"]["id"]
Expand All @@ -103,6 +110,7 @@ def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> N
response = session.post(
f"{dagster_url}/graphql",
json={"query": RUNS_QUERY, "variables": {"runId": run_id}},
# Timeout in seconds
timeout=3,
)
run_status = response.json()["data"]["runOrError"]["status"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ def get_migration_dict_for_dag(
]
}

@staticmethod
def from_dict(migration_dict: Dict[str, Any]) -> "AirflowMigrationState":
dags = {}
for dag_id, dag_dict in migration_dict.items():
dags[dag_id] = DagMigrationState.from_dict(dag_dict)
return AirflowMigrationState(dags=dags)


class MigrationStateParsingError(Exception):
pass
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit c2ccdeb

Please sign in to comment.