Commit

Ruff ify (#139)
cnolanminich authored Jan 3, 2025
1 parent 5847de8 commit bc1dc3e
Showing 41 changed files with 489 additions and 393 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/deploy-dagster-cloud.yml
@@ -32,6 +32,7 @@ jobs:
         if: steps.prerun.outputs.result != 'skip'
         with:
           ref: ${{ github.head_ref }}
+
       - name: Get changed files
         id: changed-files
         uses: tj-actions/changed-files@v45
@@ -107,7 +108,12 @@ jobs:
         run: |
           uv venv
           source .venv/bin/activate
-          uv pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb dbt-snowflake --upgrade;
+          uv pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb "dbt-snowflake<=1.8.4" --upgrade;
+      - name: Run Ruff
+        uses: astral-sh/ruff-action@v3
+        with:
+          args: check --fix --output-format=github .
+
       - name: Validate configuration
         id: ci-validate
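Note: the replacement pin is quoted ("dbt-snowflake<=1.8.4") so the shell does not parse <= as a redirection operator. The new Ruff step only lints (check --fix); formatting is left to the ruff target added to the Makefile below.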
3 changes: 3 additions & 0 deletions Makefile
@@ -28,6 +28,9 @@ update_packages:
 	uv lock --upgrade --directory hooli-data-ingest;
 	uv lock --upgrade --directory hooli-bi;
 
+ruff:
+	-ruff check --fix .
+	ruff format .
 
 # ensure that DAGSTER_GIT_REPO_DIR is set to the path of the dagster repo
 # see https://www.notion.so/dagster/Local-Dev-Setup-e58aba352f704dcc88a8dc44cb1ce7fc for more details
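Note: the leading "-" on the check recipe line is make syntax for ignoring a non-zero exit status, so ruff format . still runs even when ruff check --fix . reports violations it cannot fix. A plain make ruff therefore lints and then formats in one pass.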
2 changes: 1 addition & 1 deletion hooli-bi/hooli_bi/__init__.py
@@ -1 +1 @@
-from hooli_bi.definitions import defs as defs
\ No newline at end of file
+from hooli_bi.definitions import defs as defs
53 changes: 31 additions & 22 deletions hooli-bi/hooli_bi/powerbi_assets.py
@@ -8,6 +8,7 @@
 from dagster_powerbi.translator import PowerBIContentData
 from hooli_bi.powerbi_workspace import power_bi_workspace
 
+
 class MyCustomPowerBITranslator(DagsterPowerBITranslator):
     def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
         spec = super().get_report_spec(data)
@@ -16,44 +17,52 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
description=f"Report link: https://app.powerbi.com/groups/{EnvVar("AZURE_POWERBI_WORKSPACE_ID").get_value()}/reports/{data.properties["id"]}",
group_name="BI",
)
return merge_attributes(specs_replaced, tags={"core_kpis":""})
return merge_attributes(specs_replaced, tags={"core_kpis": ""})

def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
spec = super().get_semantic_model_spec(data)
spec_replaced = replace_attributes(
spec,
description=f"Semantic model link: https://app.powerbi.com/groups/{EnvVar("AZURE_POWERBI_WORKSPACE_ID").get_value()}/datasets/{data.properties["id"]}/details",
group_name="BI",
deps=[AssetKey(path=[dep.asset_key.path[1].upper(), dep.asset_key.path[2]]) for dep in spec.deps],
deps=[
AssetKey(path=[dep.asset_key.path[1].upper(), dep.asset_key.path[2]])
for dep in spec.deps
],
)
return merge_attributes(spec_replaced,
metadata={"dagster/column_schema": TableSchema(
columns=[TableColumn(
name=col["name"],
type=col["dataType"],
tags={"PII":""} if col["name"] == "USER_ID" else None)
for col in data.properties["tables"][0]["columns"]])},
tags={"core_kpis":""},
return merge_attributes(
spec_replaced,
metadata={
"dagster/column_schema": TableSchema(
columns=[
TableColumn(
name=col["name"],
type=col["dataType"],
tags={"PII": ""} if col["name"] == "USER_ID" else None,
)
for col in data.properties["tables"][0]["columns"]
]
)
},
tags={"core_kpis": ""},
)

def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
spec = super().get_dashboard_spec(data)
return replace_attributes(
spec,
group_name="BI"
)

return replace_attributes(spec, group_name="BI")

def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
spec = super().get_data_source_spec(data)
return replace_attributes(
spec,
group_name="BI"
)
return replace_attributes(spec, group_name="BI")


powerbi_assets = [
build_semantic_model_refresh_asset_definition(resource_key="power_bi", spec=spec)
if spec.tags.get("dagster-powerbi/asset_type") == "semantic_model"
else spec
for spec in load_powerbi_asset_specs(
power_bi_workspace, dagster_powerbi_translator=MyCustomPowerBITranslator, use_workspace_scan=True)
]
power_bi_workspace,
dagster_powerbi_translator=MyCustomPowerBITranslator,
use_workspace_scan=True,
)
]
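The translator methods lean on two AssetSpec helpers with different semantics: replace_attributes overwrites a field outright, while merge_attributes folds new entries into dict-valued fields such as tags and metadata. A minimal sketch of that distinction, assuming dagster's public AssetSpec API and made-up key and tag values:

    from dagster import AssetSpec

    spec = AssetSpec(key="example_report", group_name="default", tags={"tier": "gold"})

    # replace_attributes swaps the attribute's value wholesale.
    replaced = spec.replace_attributes(group_name="BI")
    assert replaced.group_name == "BI"

    # merge_attributes adds entries to dict-valued attributes instead.
    merged = replaced.merge_attributes(tags={"core_kpis": ""})
    assert merged.tags == {"tier": "gold", "core_kpis": ""}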
2 changes: 1 addition & 1 deletion hooli-bi/hooli_bi/powerbi_workspace.py
@@ -9,4 +9,4 @@
         tenant_id=EnvVar("AZURE_POWERBI_TENANT_ID"),
     ),
     workspace_id=EnvVar("AZURE_POWERBI_WORKSPACE_ID"),
-)
\ No newline at end of file
+)
2 changes: 1 addition & 1 deletion hooli-data-ingest/hooli_data_ingest/__init__.py
@@ -1 +1 @@
-from hooli_data_ingest.definitions import defs as defs
\ No newline at end of file
+from hooli_data_ingest.definitions import defs as defs
35 changes: 18 additions & 17 deletions hooli-data-ingest/hooli_data_ingest/assets/sling.py
@@ -1,33 +1,34 @@
 from dagster_embedded_elt.sling import (
-  sling_assets,
-  SlingResource,
+    sling_assets,
+    SlingResource,
 )
 from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
 
 from hooli_data_ingest.resources import replication_config
 
+
 class CustomSlingTranslator(DagsterSlingTranslator):
-  def __init__(self, target_prefix="RAW_DATA"):
+    def __init__(self, target_prefix="RAW_DATA"):
         super().__init__(target_prefix=target_prefix)
         self.replication_config = replication_config
-  def get_group_name(self, stream_definition):
-    return "RAW_DATA"
-  def get_kinds(self, stream_definition):
-    storage_kind = self.replication_config.get("target", "DUCKDB")
-    if storage_kind.startswith("SNOWFLAKE"):
+
+    def get_group_name(self, stream_definition):
+        return "RAW_DATA"
+
+    def get_kinds(self, stream_definition):
+        storage_kind = self.replication_config.get("target", "DUCKDB")
+        if storage_kind.startswith("SNOWFLAKE"):
             storage_kind = "SNOWFLAKE"
-    return {"sling",storage_kind}
-
+        return {"sling", storage_kind}
+
+
 @sling_assets(
-  replication_config=replication_config,
-  dagster_sling_translator=CustomSlingTranslator(),
+    replication_config=replication_config,
+    dagster_sling_translator=CustomSlingTranslator(),
 )
 def my_sling_assets(context, sling: SlingResource):
-  yield from sling.replicate(context=context).fetch_column_metadata().fetch_row_count()
-  for row in sling.stream_raw_logs():
-    context.log.info(row)
+    yield from (
+        sling.replicate(context=context).fetch_column_metadata().fetch_row_count()
+    )
+    for row in sling.stream_raw_logs():
+        context.log.info(row)
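The reshaped get_kinds makes the normalization easier to see: any target whose name starts with SNOWFLAKE collapses to a single SNOWFLAKE kind alongside sling. A rough illustration of that behavior (hypothetical target values, not taken from this repo's replication config; get_kinds ignores its stream_definition argument):

    translator = CustomSlingTranslator()

    translator.replication_config = {"target": "SNOWFLAKE_PROD"}
    assert translator.get_kinds(stream_definition=None) == {"sling", "SNOWFLAKE"}

    translator.replication_config = {"target": "DUCKDB"}
    assert translator.get_kinds(stream_definition=None) == {"sling", "DUCKDB"}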
23 changes: 10 additions & 13 deletions hooli-data-ingest/hooli_data_ingest/definitions.py
@@ -1,11 +1,10 @@
 from pathlib import Path
 
 from dagster import (
-  AnchorBasedFilePathMapping,
-  Definitions,
-  with_source_code_references,
+    AnchorBasedFilePathMapping,
+    Definitions,
+    with_source_code_references,
 )
-from dagster._core.definitions.metadata import with_source_code_references
 from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud
 
 from hooli_data_ingest.assets.sling import my_sling_assets
Expand All @@ -15,16 +14,14 @@


 defs = Definitions(
-  assets=link_code_references_to_git_if_cloud(
-    with_source_code_references([my_sling_assets]),
-    file_path_mapping=AnchorBasedFilePathMapping(
+    assets=link_code_references_to_git_if_cloud(
+        with_source_code_references([my_sling_assets]),
+        file_path_mapping=AnchorBasedFilePathMapping(
             local_file_anchor=Path(__file__),
             file_anchor_path_in_repository="hooli-data-ingest/hooli_data_ingest/definitions.py",
         ),
-  ),
-  schedules=[daily_sling_assets],
-  jobs=[daily_sling_job],
-  resources={
-    "sling": sling_resource
-  },
+    ),
+    schedules=[daily_sling_assets],
+    jobs=[daily_sling_job],
+    resources={"sling": sling_resource},
 )
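Note: the deleted dagster._core.definitions.metadata import duplicated the with_source_code_references name already imported from the public dagster package; Ruff flags re-imports like this as F811 (redefinition of an unused name), and the public import is the one kept.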
4 changes: 2 additions & 2 deletions hooli-data-ingest/hooli_data_ingest/jobs/__init__.py
@@ -5,6 +5,6 @@


 daily_sling_job = define_asset_job(
-  name="daily_sling_job",
-  selection=raw_location_by_day,
+    name="daily_sling_job",
+    selection=raw_location_by_day,
 )
96 changes: 44 additions & 52 deletions hooli-data-ingest/hooli_data_ingest/resources/__init__.py
@@ -3,16 +3,18 @@

 from dagster import EnvVar
 from dagster_embedded_elt.sling import (
-  SlingResource,
-  SlingConnectionResource,
+    SlingResource,
+    SlingConnectionResource,
 )
 
+
 def get_env():
-  if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
-    return "BRANCH"
-  if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
-    return "PROD"
-  return "LOCAL"
+    if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
+        return "BRANCH"
+    if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
+        return "PROD"
+    return "LOCAL"
 
+
 # Paths for local dev
 current_file_path = Path(__file__)
@@ -23,25 +25,19 @@ def get_env():


if get_env() == "LOCAL":
replication_config = {
"source": "LOCAL",
"target": "DUCKDB",
"defaults": {
"mode": "full-refresh",
"object": "{stream_file_folder}.{stream_file_name}",
"source_options": {
"format": "csv"
}
},
"streams": {
LOCATIONS_CSV_PATH: {
"object": "locations"
}
}
}
replication_config = {
"source": "LOCAL",
"target": "DUCKDB",
"defaults": {
"mode": "full-refresh",
"object": "{stream_file_folder}.{stream_file_name}",
"source_options": {"format": "csv"},
},
"streams": {LOCATIONS_CSV_PATH: {"object": "locations"}},
}

sling_resource = SlingResource(
connections=[
sling_resource = SlingResource(
connections=[
SlingConnectionResource(
name="Local",
type="local",
@@ -55,7 +51,7 @@ def get_env():
schema="raw_data",
),
]
)
)

if get_env() != "LOCAL":
replication_config = {
@@ -64,36 +60,32 @@
"defaults": {
"mode": "full-refresh",
"object": "{stream_file_folder}.{stream_file_name}",
"source_options": {
"format": "csv"
}
"source_options": {"format": "csv"},
},
"streams": {
"s3://hooli-demo/embedded-elt/locations.csv": {
"object": "locations"
}
}
"s3://hooli-demo/embedded-elt/locations.csv": {"object": "locations"}
},
}

sling_resource = SlingResource(
connections=[
SlingConnectionResource(
name="S3",
type="s3",
bucket=EnvVar("AWS_S3_BUCKET"),
region=EnvVar("AWS_REGION"),
access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
),
SlingConnectionResource(
name="SNOWFLAKE_PROD" if get_env() == "PROD" else "SNOWFLAKE_BRANCH",
type="snowflake",
host=EnvVar("SNOWFLAKE_HOST"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
role=EnvVar("SNOWFLAKE_ROLE"),
database="DEMO_DB2" if get_env() == "PROD" else "DEMO_DB2_BRANCH",
schema="RAW_DATA",
)
SlingConnectionResource(
name="S3",
type="s3",
bucket=EnvVar("AWS_S3_BUCKET"),
region=EnvVar("AWS_REGION"),
access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
),
SlingConnectionResource(
name="SNOWFLAKE_PROD" if get_env() == "PROD" else "SNOWFLAKE_BRANCH",
type="snowflake",
host=EnvVar("SNOWFLAKE_HOST"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
role=EnvVar("SNOWFLAKE_ROLE"),
database="DEMO_DB2" if get_env() == "PROD" else "DEMO_DB2_BRANCH",
schema="RAW_DATA",
),
]
)
)
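Note: the target key of the non-local replication_config (collapsed above) presumably names one of these Snowflake connections, SNOWFLAKE_PROD or SNOWFLAKE_BRANCH; that is the string that CustomSlingTranslator.get_kinds in sling.py reads and normalizes to a single SNOWFLAKE kind.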
4 changes: 2 additions & 2 deletions hooli-data-ingest/hooli_data_ingest/schedules/__init__.py
@@ -3,6 +3,6 @@


 daily_sling_assets = ScheduleDefinition(
-  job=daily_sling_job,
-  cron_schedule="0 0 * * *", # every day at midnight
+    job=daily_sling_job,
+    cron_schedule="0 0 * * *",  # every day at midnight
 )