From e4b252f510f1dd80ea68bc4603ef7f7353ebd5e0 Mon Sep 17 00:00:00 2001 From: izzy <60406698+izzye84@users.noreply.github.com> Date: Wed, 27 Sep 2023 14:33:11 -0600 Subject: [PATCH] Add asset checks and project cleanup (#41) * ignore local duckdb and dbt logs * moved sensor related files to the sensors directory * moved analytics_job and predict_job out of definitions.py * moved analytics_schedule to this directory for better organization * added check_avg_orders * added check_users asset check * added check_country_stats asset check * moved jobs, resources, schedules, and sensor logic to their respective directories * moved resources from definitions.py to resources directory --- .gitignore | 6 +- hooli_basics/definitions.py | 13 +- hooli_data_eng/assets/marketing/__init__.py | 24 ++- hooli_data_eng/assets/raw_data/__init__.py | 29 ++- hooli_data_eng/definitions.py | 190 +----------------- hooli_data_eng/jobs/__init__.py | 25 +++ hooli_data_eng/resources/__init__.py | 126 ++++++++++++ hooli_data_eng/schedules/__init__.py | 6 + hooli_data_eng/sensors/__init__.py | 17 ++ .../delayed_asset_alerts.py | 0 .../{jobs => sensors}/test_sensor_util.py | 0 .../{jobs => sensors}/touch_s3_file.py | 0 hooli_data_eng/{jobs => sensors}/watch_s3.py | 0 13 files changed, 250 insertions(+), 186 deletions(-) create mode 100644 hooli_data_eng/resources/__init__.py create mode 100644 hooli_data_eng/schedules/__init__.py create mode 100644 hooli_data_eng/sensors/__init__.py rename hooli_data_eng/{assets => sensors}/delayed_asset_alerts.py (100%) rename hooli_data_eng/{jobs => sensors}/test_sensor_util.py (100%) rename hooli_data_eng/{jobs => sensors}/touch_s3_file.py (100%) rename hooli_data_eng/{jobs => sensors}/watch_s3.py (100%) diff --git a/.gitignore b/.gitignore index 8fec5f89..638d3ddf 100644 --- a/.gitignore +++ b/.gitignore @@ -168,4 +168,8 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. 
#.idea/ -tmp*/ \ No newline at end of file +tmp*/ + +# dbt +dbt_project/example.duckdb +dbt_project/logs diff --git a/hooli_basics/definitions.py b/hooli_basics/definitions.py index 3805f3d2..d5699e24 100644 --- a/hooli_basics/definitions.py +++ b/hooli_basics/definitions.py @@ -1,4 +1,4 @@ -from dagster import asset +from dagster import asset, asset_check, AssetCheckResult, Definitions from pandas import DataFrame, read_html, get_dummies, to_numeric from sklearn.linear_model import LinearRegression as Regression @@ -9,6 +9,12 @@ def country_stats() -> DataFrame: df["pop_change"] = ((to_numeric(df["pop_2023"]) / to_numeric(df["pop_2022"])) - 1)*100 return df +@asset_check( + asset=country_stats +) +def check_country_stats(country_stats): + return AssetCheckResult(success=True) + @asset def change_model(country_stats: DataFrame) -> Regression: data = country_stats.dropna(subset=["pop_change"]) @@ -21,3 +27,8 @@ def continent_stats(country_stats: DataFrame, change_model: Regression) -> DataF result["pop_change_factor"] = change_model.coef_ return result +defs = Definitions( + assets=[country_stats, continent_stats, change_model], + asset_checks=[check_country_stats] +) + diff --git a/hooli_data_eng/assets/marketing/__init__.py b/hooli_data_eng/assets/marketing/__init__.py index f907d1be..e46c951d 100644 --- a/hooli_data_eng/assets/marketing/__init__.py +++ b/hooli_data_eng/assets/marketing/__init__.py @@ -1,4 +1,14 @@ -from dagster import asset, FreshnessPolicy, AssetIn, DynamicPartitionsDefinition, MetadataValue, AutoMaterializePolicy +from dagster import ( + asset, + FreshnessPolicy, + AssetIn, + DynamicPartitionsDefinition, + MetadataValue, + AutoMaterializePolicy, + AssetExecutionContext, + AssetCheckResult, + asset_check +) import pandas as pd # These assets take data from a SQL table managed by @@ -13,13 +23,23 @@ op_tags={"owner": "bi@hooli.com"}, ins={"company_perf": AssetIn(key_prefix=["ANALYTICS"])} ) -def avg_orders(company_perf: pd.DataFrame) -> pd.DataFrame: +def avg_orders(context: AssetExecutionContext, company_perf: pd.DataFrame) -> pd.DataFrame: """ Computes avg order KPI, must be updated regularly for exec dashboard """ return pd.DataFrame({ "avg_order": company_perf['total_revenue'] / company_perf['n_orders'] }) +@asset_check( + description="check that avg orders are expected", + asset=avg_orders +) +def check_avg_orders(context, avg_orders: pd.DataFrame): + avg = avg_orders['avg_order'][0] + return AssetCheckResult( + success= True if (avg < 50) else False, + metadata={"actual average": avg, "threshold": 50} + ) @asset( key_prefix="MARKETING", diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py index a7632b96..9f59c79b 100644 --- a/hooli_data_eng/assets/raw_data/__init__.py +++ b/hooli_data_eng/assets/raw_data/__init__.py @@ -1,6 +1,19 @@ +from datetime import timedelta + +from dagster import ( + asset, + asset_check, + AssetCheckSeverity, + AssetCheckResult, + AssetKey, + Backoff, + DailyPartitionsDefinition, + Jitter, + RetryPolicy, +) import pandas as pd -from dagster import asset, RetryPolicy, Backoff, Jitter, DailyPartitionsDefinition, OpExecutionContext, build_op_context, build_resources -from datetime import datetime, timedelta + + from hooli_data_eng.resources.api import RawDataAPI @@ -37,6 +50,18 @@ def users(context, api: RawDataAPI) -> pd.DataFrame: return pd.concat(all_users) +@asset_check( + asset=AssetKey(["RAW_DATA", "users"]), + description="check that users are from expected companies", + 
#severity=AssetCheckSeverity.WARN, +) +def check_users(context, users: pd.DataFrame): + unique_companies = pd.unique(users['company']).tolist() + return AssetCheckResult( + success= (unique_companies == ["FoodCo", "ShopMart", "SportTime", "FamilyLtd"]), + metadata={"companies": unique_companies}, + severity=AssetCheckSeverity.WARN + ) @asset( compute_kind="api", diff --git a/hooli_data_eng/definitions.py b/hooli_data_eng/definitions.py index a5582f12..39822d2c 100644 --- a/hooli_data_eng/definitions.py +++ b/hooli_data_eng/definitions.py @@ -1,43 +1,20 @@ -import os - -from dagster_pyspark import pyspark_resource - -from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets -from hooli_data_eng.assets.delayed_asset_alerts import asset_delay_alert_sensor -from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem -from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert -from hooli_data_eng.resources.databricks import db_step_launcher -from hooli_data_eng.resources.api import RawDataAPI -from hooli_data_eng.jobs.watch_s3 import watch_s3_sensor -from dagster_duckdb_pandas import DuckDBPandasIOManager -from dagster_dbt import DbtCliClientResource -from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli -from dagster_snowflake_pandas import SnowflakePandasIOManager -#from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager -from dagster_aws.s3 import ConfigurablePickledObjectS3IOManager, S3Resource -from dagstermill import ConfigurableLocalOutputNotebookIOManager - from dagster import ( - build_schedule_from_partitioned_job, - AssetSelection, Definitions, - EnvVar, - EventLogEntry, - RunRequest, - SensorEvaluationContext, - ResourceDefinition, - asset_sensor, - define_asset_job, - FilesystemIOManager, load_assets_from_modules, load_assets_from_package_module, - AssetKey, - AutoMaterializePolicy, multiprocess_executor, ) -from dagster._utils import file_relative_path +from hooli_data_eng.assets import forecasting, raw_data, marketing, dbt_assets +from hooli_data_eng.assets.marketing import check_avg_orders +from hooli_data_eng.assets.raw_data import check_users +from hooli_data_eng.jobs import analytics_job, predict_job +from hooli_data_eng.resources import get_env, resource_def +from hooli_data_eng.schedules import analytics_schedule +from hooli_data_eng.sensors import orders_sensor +from hooli_data_eng.sensors.delayed_asset_alerts import asset_delay_alert_sensor +from hooli_data_eng.sensors.watch_s3 import watch_s3_sensor # --------------------------------------------------- # Assets @@ -62,10 +39,6 @@ # specifies what databases to targets, and locally will # execute against a DuckDB - -DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project") -DBT_PROFILES_DIR = file_relative_path(__file__, "../dbt_project/config") - dbt_assets = load_assets_from_modules([dbt_assets]) # Our final set of assets represent Python code that @@ -77,150 +50,6 @@ marketing_assets = load_assets_from_package_module(marketing, group_name="MARKETING") -# --------------------------------------------------- -# Resources - -# Resources represent external systems and, and specifically IO Managers -# tell dagster where our assets should be materialized. 
In dagster -# resources are separate from logical code to make it possible -# to develop locally, run tests, and run integration tests -# -# This project is designed for everything to run locally -# using the file system and DuckDB as the primary development resources -# -# PRs use a "branch" environment that mirrors production with -# staging Snowflake and S3 resources -# -# The production deployment on Dagster Cloud uses production Snowflake -# and S3 resources - - -def get_env(): - if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1": - return "BRANCH" - if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod": - return "PROD" - return "LOCAL" - - -# Similar to having different dbt targets, here we create the resource -# configuration by environment - -resource_def = { - "LOCAL": { - "io_manager": DuckDBPandasIOManager( - database=os.path.join(DBT_PROJECT_DIR, "example.duckdb") - ), - "model_io_manager": FilesystemIOManager(), - "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), - "api": RawDataAPI.configure_at_launch(), - "s3": ResourceDefinition.none_resource(), - "dbt": DbtCliClientResource( - project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL" - ), - "dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"), - "pyspark": pyspark_resource, - "step_launcher": ResourceDefinition.none_resource(), - "monitor_fs": LocalFileSystem(base_dir=file_relative_path(__file__, ".")), - "email": LocalEmailAlert( - smtp_email_to=["data@awesome.com"], smtp_email_from="no-reply@awesome.com" - ), - }, - "BRANCH": { - "io_manager": SnowflakePandasIOManager( - database="DEMO_DB2_BRANCH", - account=EnvVar("SNOWFLAKE_ACCOUNT"), - user=EnvVar("SNOWFLAKE_USER"), - password=EnvVar("SNOWFLAKE_PASSWORD"), - warehouse="TINY_WAREHOUSE", - ), - "model_io_manager": ConfigurablePickledObjectS3IOManager( - s3_bucket="hooli-demo-branch", - s3_resource=S3Resource(region_name="us-west-2"), - ), - "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), - "api": RawDataAPI.configure_at_launch(), - "dbt": DbtCliClientResource( - project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH" - ), - "dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"), - "pyspark": pyspark_resource, - "step_launcher": db_step_launcher, - "monitor_fs": s3FileSystem( - region_name="us-west-2", s3_bucket="hooli-demo-branch" - ), - "email": ResourceDefinition.none_resource(), - }, - "PROD": { - "io_manager": SnowflakePandasIOManager( - database="DEMO_DB2", - account=EnvVar("SNOWFLAKE_ACCOUNT"), - user=EnvVar("SNOWFLAKE_USER"), - password=EnvVar("SNOWFLAKE_PASSWORD"), - warehouse="TINY_WAREHOUSE", - ), - "model_io_manager": ConfigurablePickledObjectS3IOManager( - s3_bucket="hooli-demo-branch", - s3_resource=S3Resource(region_name="us-west-2"), - ), - "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), - "api": RawDataAPI.configure_at_launch(), - "dbt": DbtCliClientResource( - project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD" - ), - "dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"), - "pyspark": pyspark_resource, - "step_launcher": db_step_launcher, - "monitor_fs": s3FileSystem(region_name="us-west-2", s3_bucket="hooli-demo"), - "email": SESEmailAlert( - smtp_host="email-smtp.us-west-2.amazonaws.com", - smtp_email_from="lopp@elementl.com", - smtp_email_to=["lopp@elementl.com"], - 
smtp_username=EnvVar("SMTP_USERNAME"), - smtp_password=EnvVar("SMTP_PASSWORD"), - ), - }, -} - -# --------------------------------------------------- -# Jobs and Sensors - -# With assets defined we have everything to run Dagster -# ourselves if we wanted to manually create assets. -# Most of the time you will want to automate asset creation. -# In dagster, jobs allow you to update all or some assets. -# Jobs can be run on a schedule, or in response to an external -# event using a sensor. - -# This job updates all of the assets upstream of "orders_augmented", -# which is an asset representing a model in dbt -analytics_job = define_asset_job( - name="refresh_analytics_model_job", - selection=AssetSelection.keys(["ANALYTICS", "orders_augmented"]).upstream(), - tags={"dagster/max_retries": "1"}, - # config = {"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}} -) - -# This schedule tells dagster to run the analytics_job daily -analytics_schedule = build_schedule_from_partitioned_job(analytics_job) - -# This job selects the predicted_orders asset defined in -# assets/forecasting/__init__.py -predict_job = define_asset_job( - "predict_job", - selection=AssetSelection.keys(["FORECASTING", "predicted_orders"]), - tags={"alert_team": "ml"}, -) - - -# This sensor listens for changes to the orders_augmented asset which -# represents a dbt model. When the table managed by dbt is updated, -# this sensor will trigger the predict_job above, ensuring that anytime -# new order data is produced the forecast is updated -@asset_sensor(asset_key=AssetKey(["ANALYTICS", "orders_augmented"]), job=predict_job) -def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry): - yield RunRequest(run_key=context.cursor) - # --------------------------------------------------- # Definitions @@ -232,6 +61,7 @@ def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry): {"max_concurrent": 3} ), assets=[*dbt_assets, *raw_data_assets, *forecasting_assets, *marketing_assets], + asset_checks=[check_users, check_avg_orders], resources=resource_def[get_env()], schedules=[analytics_schedule], sensors=[ diff --git a/hooli_data_eng/jobs/__init__.py b/hooli_data_eng/jobs/__init__.py index e69de29b..52051272 100644 --- a/hooli_data_eng/jobs/__init__.py +++ b/hooli_data_eng/jobs/__init__.py @@ -0,0 +1,25 @@ +from dagster import AssetSelection, define_asset_job + +# With assets defined we have everything to run Dagster +# ourselves if we wanted to manually create assets. +# Most of the time you will want to automate asset creation. +# In dagster, jobs allow you to update all or some assets. +# Jobs can be run on a schedule, or in response to an external +# event using a sensor. 
+ +# This job updates all of the assets upstream of "orders_augmented", +# which is an asset representing a model in dbt +analytics_job = define_asset_job( + name="refresh_analytics_model_job", + selection=AssetSelection.keys(["ANALYTICS", "orders_augmented"]).upstream(), + tags={"dagster/max_retries": "1"}, + # config = {"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}} +) + +# This job selects the predicted_orders asset defined in +# assets/forecasting/__init__.py +predict_job = define_asset_job( + "predict_job", + selection=AssetSelection.keys(["FORECASTING", "predicted_orders"]), + tags={"alert_team": "ml"}, +) \ No newline at end of file diff --git a/hooli_data_eng/resources/__init__.py b/hooli_data_eng/resources/__init__.py new file mode 100644 index 00000000..647cd911 --- /dev/null +++ b/hooli_data_eng/resources/__init__.py @@ -0,0 +1,126 @@ +import os + +from dagster import EnvVar, FilesystemIOManager, ResourceDefinition +from dagster._utils import file_relative_path +from dagster_aws.s3 import ConfigurablePickledObjectS3IOManager, S3Resource +from dagster_dbt import DbtCliClientResource +from dagster_duckdb_pandas import DuckDBPandasIOManager +from dagster_pyspark import pyspark_resource +from dagster_snowflake_pandas import SnowflakePandasIOManager +from dagstermill import ConfigurableLocalOutputNotebookIOManager + +from hooli_data_eng.resources.api import RawDataAPI +from hooli_data_eng.resources.databricks import db_step_launcher +from hooli_data_eng.resources.dbt import DbtCli2 as DbtCli +#from hooli_data_eng.resources.warehouse import MySnowflakeIOManager as SnowflakePandasIOManager +from hooli_data_eng.resources.sensor_file_managers import s3FileSystem, LocalFileSystem +from hooli_data_eng.resources.sensor_smtp import LocalEmailAlert, SESEmailAlert + + +# Resources represent external systems, and specifically IO Managers +tell dagster where our assets should be materialized.
In dagster +# resources are separate from logical code to make it possible +# to develop locally, run tests, and run integration tests +# +# This project is designed for everything to run locally +# using the file system and DuckDB as the primary development resources +# +# PRs use a "branch" environment that mirrors production with +# staging Snowflake and S3 resources +# +# The production deployment on Dagster Cloud uses production Snowflake +# and S3 resources + +def get_env(): + if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1": + return "BRANCH" + if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod": + return "PROD" + return "LOCAL" + + +# The dbt file dbt_project/config/profiles.yaml +# specifies what databases to targets, and locally will +# execute against a DuckDB + +DBT_PROJECT_DIR = file_relative_path(__file__, "../../dbt_project") +DBT_PROFILES_DIR = file_relative_path(__file__, "../../dbt_project/config") + +# Similar to having different dbt targets, here we create the resource +# configuration by environment + +resource_def = { + "LOCAL": { + "io_manager": DuckDBPandasIOManager( + database=os.path.join(DBT_PROJECT_DIR, "example.duckdb") + ), + "model_io_manager": FilesystemIOManager(), + "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), + "api": RawDataAPI.configure_at_launch(), + "s3": ResourceDefinition.none_resource(), + "dbt": DbtCliClientResource( + project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL" + ), + "dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="LOCAL"), + "pyspark": pyspark_resource, + "step_launcher": ResourceDefinition.none_resource(), + "monitor_fs": LocalFileSystem(base_dir=file_relative_path(__file__, ".")), + "email": LocalEmailAlert( + smtp_email_to=["data@awesome.com"], smtp_email_from="no-reply@awesome.com" + ), + }, + "BRANCH": { + "io_manager": SnowflakePandasIOManager( + database="DEMO_DB2_BRANCH", + account=EnvVar("SNOWFLAKE_ACCOUNT"), + user=EnvVar("SNOWFLAKE_USER"), + password=EnvVar("SNOWFLAKE_PASSWORD"), + warehouse="TINY_WAREHOUSE", + ), + "model_io_manager": ConfigurablePickledObjectS3IOManager( + s3_bucket="hooli-demo-branch", + s3_resource=S3Resource(region_name="us-west-2"), + ), + "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), + "api": RawDataAPI.configure_at_launch(), + "dbt": DbtCliClientResource( + project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH" + ), + "dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="BRANCH"), + "pyspark": pyspark_resource, + "step_launcher": db_step_launcher, + "monitor_fs": s3FileSystem( + region_name="us-west-2", s3_bucket="hooli-demo-branch" + ), + "email": ResourceDefinition.none_resource(), + }, + "PROD": { + "io_manager": SnowflakePandasIOManager( + database="DEMO_DB2", + account=EnvVar("SNOWFLAKE_ACCOUNT"), + user=EnvVar("SNOWFLAKE_USER"), + password=EnvVar("SNOWFLAKE_PASSWORD"), + warehouse="TINY_WAREHOUSE", + ), + "model_io_manager": ConfigurablePickledObjectS3IOManager( + s3_bucket="hooli-demo-branch", + s3_resource=S3Resource(region_name="us-west-2"), + ), + "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(), + "api": RawDataAPI.configure_at_launch(), + "dbt": DbtCliClientResource( + project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD" + ), + "dbt2": DbtCli(project_dir=DBT_PROJECT_DIR, profiles_dir=DBT_PROFILES_DIR, target="PROD"), + "pyspark": pyspark_resource, + 
"step_launcher": db_step_launcher, + "monitor_fs": s3FileSystem(region_name="us-west-2", s3_bucket="hooli-demo"), + "email": SESEmailAlert( + smtp_host="email-smtp.us-west-2.amazonaws.com", + smtp_email_from="lopp@elementl.com", + smtp_email_to=["lopp@elementl.com"], + smtp_username=EnvVar("SMTP_USERNAME"), + smtp_password=EnvVar("SMTP_PASSWORD"), + ), + }, +} \ No newline at end of file diff --git a/hooli_data_eng/schedules/__init__.py b/hooli_data_eng/schedules/__init__.py new file mode 100644 index 00000000..1d19d0b0 --- /dev/null +++ b/hooli_data_eng/schedules/__init__.py @@ -0,0 +1,6 @@ +from dagster import build_schedule_from_partitioned_job + +from hooli_data_eng.jobs import analytics_job + +# This schedule tells dagster to run the analytics_job daily +analytics_schedule = build_schedule_from_partitioned_job(analytics_job) \ No newline at end of file diff --git a/hooli_data_eng/sensors/__init__.py b/hooli_data_eng/sensors/__init__.py new file mode 100644 index 00000000..fcd19e3d --- /dev/null +++ b/hooli_data_eng/sensors/__init__.py @@ -0,0 +1,17 @@ +from dagster import ( + asset_sensor, + AssetKey, + EventLogEntry, + RunRequest, + SensorEvaluationContext, +) + +from hooli_data_eng.jobs import predict_job + +# This sensor listens for changes to the orders_augmented asset which +# represents a dbt model. When the table managed by dbt is updated, +# this sensor will trigger the predict_job above, ensuring that anytime +# new order data is produced the forecast is updated +@asset_sensor(asset_key=AssetKey(["ANALYTICS", "orders_augmented"]), job=predict_job) +def orders_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry): + yield RunRequest(run_key=context.cursor) \ No newline at end of file diff --git a/hooli_data_eng/assets/delayed_asset_alerts.py b/hooli_data_eng/sensors/delayed_asset_alerts.py similarity index 100% rename from hooli_data_eng/assets/delayed_asset_alerts.py rename to hooli_data_eng/sensors/delayed_asset_alerts.py diff --git a/hooli_data_eng/jobs/test_sensor_util.py b/hooli_data_eng/sensors/test_sensor_util.py similarity index 100% rename from hooli_data_eng/jobs/test_sensor_util.py rename to hooli_data_eng/sensors/test_sensor_util.py diff --git a/hooli_data_eng/jobs/touch_s3_file.py b/hooli_data_eng/sensors/touch_s3_file.py similarity index 100% rename from hooli_data_eng/jobs/touch_s3_file.py rename to hooli_data_eng/sensors/touch_s3_file.py diff --git a/hooli_data_eng/jobs/watch_s3.py b/hooli_data_eng/sensors/watch_s3.py similarity index 100% rename from hooli_data_eng/jobs/watch_s3.py rename to hooli_data_eng/sensors/watch_s3.py