From 37305bc5939d317f975eb874e2987d025605b2d9 Mon Sep 17 00:00:00 2001 From: Jules Huisman Date: Thu, 22 Sep 2022 20:38:37 +0000 Subject: [PATCH] Added automatic job loading --- .devcontainer/devcontainer.json | 1 + .devcontainer/vscode.Dockerfile | 2 + .gitignore | 185 +++++++++++++++++++- .vscode/settings.json | 3 + dagster_meltano/jobs.py | 18 +- dagster_meltano/meltano_resource.py | 48 ++++- files_dagster_ext/orchestrate/repository.py | 18 +- meltano/meltano.yml | 12 +- meltano/tmpk03xco8s/history/runs.db | Bin 102400 -> 0 bytes meltano/tmpk03xco8s/history/runs/index.db | Bin 53248 -> 0 bytes meltano/tmpk03xco8s/schedules/schedules.db | Bin 65536 -> 0 bytes pyproject.toml | 2 - 12 files changed, 252 insertions(+), 37 deletions(-) create mode 100644 .vscode/settings.json delete mode 100644 meltano/tmpk03xco8s/history/runs.db delete mode 100644 meltano/tmpk03xco8s/history/runs/index.db delete mode 100644 meltano/tmpk03xco8s/schedules/schedules.db diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 4def534..760d251 100755 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -41,6 +41,7 @@ "python.linting.pylintEnabled": true, "python.linting.lintOnSave": true, "python.linting.pylintArgs": ["--rcfile", "${workspaceFolder}/.pylintrc"], + "python.analysis.extraPaths": ["/workspace"], "python.sortImports.args": ["--profile", "black"], "[python]": { "editor.codeActionsOnSave": { diff --git a/.devcontainer/vscode.Dockerfile b/.devcontainer/vscode.Dockerfile index 75a63f3..f3f2631 100755 --- a/.devcontainer/vscode.Dockerfile +++ b/.devcontainer/vscode.Dockerfile @@ -1,3 +1,5 @@ FROM quantiledevelopment/vscode-python:3.9 +ENV MELTANO_PROJECT_ROOT=/workspace/meltano + RUN pipx install meltano==2.6.0 \ No newline at end of file diff --git a/.gitignore b/.gitignore index c6075d3..0830ecb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,185 @@ **/.venv/ -**/.meltano/ \ No newline at end of file +**/.meltano/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# Local .terraform directories +**/.terraform/* + +# .tfstate files +*.tfstate +*.tfstate.* + +# Crash log files +crash.log + +# Exclude all .tfvars files, which are likely to contain sentitive data, such as +# password, private keys, and other secrets. These should not be part of version +# control as they are data points which are potentially sensitive and subject +# to change depending on the environment. +# +*.tfvars + +# Ignore override files as they are usually used to override resources locally and so +# are not checked in +override.tf +override.tf.json +*_override.tf +*_override.tf.json + +# Include override files you do wish to add to version control using negated pattern +# +# !example_override.tf + +# Include tfplan files to ignore the plan output of command: terraform plan -out=tfplan +# example: *tfplan* + +# Ignore CLI configuration files +.terraformrc +terraform.rc + +# Dask +**/dask-worker-space + +# Sqlite +*.sqlite + +# Modules +**/node_modules \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7731c26 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.analysis.extraPaths": ["/workspace"] +} diff --git a/dagster_meltano/jobs.py b/dagster_meltano/jobs.py index a36963a..244e39c 100644 --- a/dagster_meltano/jobs.py +++ b/dagster_meltano/jobs.py @@ -21,16 +21,14 @@ def load_jobs_from_meltano_project( List[Union[JobDefinition, ScheduleDefinition]]: Returns a list of either Dagster JobDefinitions or ScheduleDefinitions """ meltano_resource = MeltanoResource(project_dir) + meltano_jobs = meltano_resource.jobs - list_result = subprocess.run( - [meltano_resource.meltano_bin, "schedule", "list", "--format=json"], - cwd=meltano_resource.project_dir, - stdout=subprocess.PIPE, - universal_newlines=True, - check=True, - ) - - schedule_export = json.loads(list_result.stdout) - logging.info(schedule_export) + return list(meltano_jobs) + logging.error("------------") + logging.error(meltano_resource.schedules) return [] + + +if __name__ == "__main__": + load_jobs_from_meltano_project("/workspace/meltano") diff --git a/dagster_meltano/meltano_resource.py b/dagster_meltano/meltano_resource.py index 7173c3a..9fe8af5 100644 --- a/dagster_meltano/meltano_resource.py +++ b/dagster_meltano/meltano_resource.py @@ -1,9 +1,53 @@ -from typing import Optional +import json +import subprocess +from typing import List, Optional -from dagster_meltano.utils import Singleton +from dagster import job, op + +from dagster_meltano.utils import Singleton, generate_dagster_name class MeltanoResource(metaclass=Singleton): def __init__(self, project_dir: str, meltano_bin: Optional[str] = "meltano"): self.project_dir = project_dir self.meltano_bin = meltano_bin + + # TODO: Refactor to different file + def run_cli(self, commands: List[str]): + return json.loads( + subprocess.run( + [self.meltano_bin] + commands, + cwd=self.project_dir, + stdout=subprocess.PIPE, + universal_newlines=True, + check=True, + ).stdout + ) + + @property + def jobs(self) -> List[dict]: + meltano_jobs = self.run_cli(["job", "list", "--format=json"]) + + for meltano_job in meltano_jobs["jobs"]: + + @op(name=f'run_{generate_dagster_name(meltano_job["job_name"])}') + def dagster_op(): + print( + subprocess.run( + [self.meltano_bin, "--help"], + cwd=self.project_dir, + stdout=subprocess.PIPE, + universal_newlines=True, + check=True, + ).stdout + ) + + @job(name=generate_dagster_name(meltano_job["job_name"])) + def dagster_job(): + dagster_op() + + yield dagster_job + + @property + def schedules(self) -> List[dict]: + return self.run_cli(["schedule", "list", "--format=json"]) diff --git a/files_dagster_ext/orchestrate/repository.py b/files_dagster_ext/orchestrate/repository.py index baabb8c..b978c24 100644 --- a/files_dagster_ext/orchestrate/repository.py +++ b/files_dagster_ext/orchestrate/repository.py @@ -6,31 +6,17 @@ from dagster import job, op, repository, with_resources from dagster._utils import file_relative_path from dagster_dbt import dbt_cli_resource - -# from dagster_meltano import ( -# load_assets_from_meltano_project, -# load_jobs_from_meltano_project, -# meltano_resource, -# ) from dagster_meltano.jobs import load_jobs_from_meltano_project MELTANO_PROJECT_ROOT = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd()) MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano") - -@op -def hello(): - print("world") - - -@job -def hello_job(): - hello() +meltano_jobs = load_jobs_from_meltano_project(MELTANO_PROJECT_ROOT) @repository def repository(): - return [load_jobs_from_meltano_project(MELTANO_PROJECT_ROOT)] + return [meltano_jobs] # MELTANO_PROJECT_DIR = file_relative_path( diff --git a/meltano/meltano.yml b/meltano/meltano.yml index 3fd2fe6..6d19c6f 100644 --- a/meltano/meltano.yml +++ b/meltano/meltano.yml @@ -5,7 +5,7 @@ plugins: utilities: - name: dagster namespace: dagster - pip_url: -e .. dbt-postgres + pip_url: -e /workspace dbt-postgres executable: dagster_invoker commands: initialize: @@ -17,11 +17,11 @@ plugins: # invoke: # executable: airflow_extension # args: invoke - # settings: - # - name: core.dags_folder - # label: DAGs Folder - # value: $MELTANO_PROJECT_ROOT/orchestrate/dags - # env: AIRFLOW__CORE__DAGS_FOLDER + settings: + - name: dagster_home + label: The home folder of Dagster + value: $MELTANO_PROJECT_ROOT/.meltano + env: DAGSTER_HOME # - name: core.plugins_folder # label: Plugins Folder # value: $MELTANO_PROJECT_ROOT/orchestrate/plugins diff --git a/meltano/tmpk03xco8s/history/runs.db b/meltano/tmpk03xco8s/history/runs.db deleted file mode 100644 index 0685e4d3951be666a605418e4c48d687d947484e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 102400 zcmeI*+i%;}9l&u>k|j%ttvIgB6hjcMfEyF>O;TjXErv#M=+28{C$d~5KnH@7m{hQC z5v8QY`r>3L_PV#d4cPxM40{;{Y=6Vnf1ub)G3;@VJESCtZ3?TM?gzvJ~02s#P8>RJO9nqZ|1(ZlD_&l{@3_k>>sgmY<~7Hv*pM~v-fBIF!N+4 z99|3kJ@|d-H)<8*3SbifL5LYN1@zUz<&9uxM`8=E3{b@G0l1{10;Za^WJUKX6RpPPTqr>BTF1vr2@2^2` zoEDWQx&6o4+;ioT_FP%<)ThSPpWKMZPwz;9a;;>3WxcA1gH$)#t-7`V9{ z|9(VXzA3ex?B&u|db3^ATgFSP_bGR~w}1Fhd!~4EC`X4%FPF0N;i-A{;i~f5sI*P> zJiio`Z{C#N{=xz5 zcGrAl<&u+s?3qRgS;J_y$}JIOPKECct?(kx zMWt?_O80_xiI$L{2YCXKuirzP5BR zxmbF;R({ns#p<?TE+XH)Er~G^-w)-Q_^cI8ybiL zRMwWI^B=hYg?8msH;T@*(EZd~XARSvKG$vO!os`NTgpc9Zc|n|RXbIxKfe-@*OsL> zLmT%NyE?ML!A6|9TKzgcvU8SMF^d93Xn5PYGO{gkT)L$O+x2WztFfs3iJ*-R?VT73 zdnWfb9RDv3I|YWc>7=V~B%0v3Gg4+eqgG!n&<5UrxTr? z_`WZFq3llKiaR;^w)>_;Pk6XBiU#dz#DnY3jH70tP7uV^?b(R@X!)|qz)PSfy=$@a zdbTbV(3Hd>m zy{XvKrkESEk%)YU*9gin`XJyGHa>LRBCN=XDyXdl6OB&W7vZJQhBR<1A|!{>PMN-Y3&Dku>il-v#mxfy z`M>z3e_jY6fB*srAbP|I8@E^yStextQ9u5GHc1rQf8~TZ5lh91^)lP2^&~; z7XbtiKmY**5I_I{1Q0*~fiVR5{r@r0D1rb22q1s}0tg_000IagfWU+a@ce(m28`WB z009ILKmY**5I_I{1Q0-A3;~}1kAX%J1Q0*~0R#|0009ILKmY**CQN|q{|Or~b{7Ez z5I_I{1Q0*~0R#|00D&70tg_0 z00IagfB*srAbd sC(ZQsmi_#n>;EYkT)Ke(0tg_000IagfB*srAb>zuAQ4)Qd9VNf12GxV1ONa4 diff --git a/meltano/tmpk03xco8s/history/runs/index.db b/meltano/tmpk03xco8s/history/runs/index.db deleted file mode 100644 index 01be3ff5efe3f4f07f691752d8907b032028d5cb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 53248 zcmeI*&vM#E9Kdl2Y%tiEv=Qb)rrlYl9r^MmfAFBV&fQ#HaTe&qk1&qUqDV{ z-<(Vzpbyf%K_4R12k4>a&h*?)(DNQgZ&-bvBXdP zVb^p-Nx7y(BFZ;HD2kGFpDFh_c_iG0g~Tj!G zrv6NwraoQ%d*!cWW%>6+Bk|v?jVVU}0R#|0009Kve}OY?Ii6WxSDz&u^I^v_J689h zeP}$fh9kS*3%y*~YSyZHP3YBccWNTEQsk0}^UaLjaaa6MZEk&EZRScvO*Hm&(P-`L zTo=j2)?VX4Z&vFKJsiOpJUSHjn)Tgk^MSZqdmwTb;%Q0ktCd)0e_d7VUfX&)dfai} z1{&tE)AwH+q3#%kP(kY1FcHsWv+6IKN@685cF_pxgESvb=%Fa7g}uBzN@x& zbdlR?HJi1DZpa4xK3+fK%iPJ%%$XE+(A$ckC>$9pWt76|9`X6Lj({& z009ILKmY**5I_I{1Q2+20q-Ked(oc<0tg_000IagfB*srAblZ`$H!z&4pRc^6XrofB6w6UH|9)e{VV|#76)D1Q0*~0R#|0 z009ILK;Yd9@b~}k)}Pi9KmY**5I_I{1Q0*~0R#|u8v@?-e>MHD;(qZ!009ILKmY** z5I_I{1Q0*~fjJRaRU>O?lUP|mj zdqAk0_6P8{aOH@&@+WZM!jU6FLgIOx#&Mh`n#QHFzE+$1dF_3a;*vwv&ZWK~<) z&HlFh?~=Lv>*CLu$LZhGUoQN)aFp6uC?~7Qf2MUoJpu?IfWY||ILWW1j4N047iq7t z-?kfVyR+{!t#9qV>-4&jVs^7uG3ymkH$UE~h{!>a%cM`6S>3~q_}r{*erDEkrD9%G zck80Mx3hCqWYU|v)kpQ3xmB%4d$10kH^qb6)_t?~L~K`{h+L?@d?x>4HfcP(qH9jK zWgofUwB@%!t8wV{{IV5!J60i5aq;AMDP`Qap}&0B57o7sy>6?~Keqh#wmVU=JQnW6 zVHL2`5;Bm=r^M*R4Ns=SYW6w@ZQHY3L0i@F7@5=Z+w4`h9`02{PQ4RJ zXD%m=y&Kc$CPK3c6ICnS#gtLHq@Nf*m(x13p7r)+G|lIh497ckhedbk)NQMJuku)& zai-XgoA>Y7S@CJq>oiTQL zV(iJ2Bg3i_0=w|QFB++oap{u&>MehH3_BX7b>Vcd!Ril=&sYXyV+HG>ST6fhGz2ht z%%4#~^{tZM!aF_~EvsQ&u$+QLqt=9;G%{9UipaY~4oJWhQ@;NT!Uf zHT~rBa8f&6*K?jW2OWut&ys}L$AAIv35I_I{1Q0*~0R#|0AbtYe|Hm(@D z2q1s}0tg_000IagfB*sryt%+mVqP;#rrZ45ZXLF5%X6B~UCXuGcGK(iEvHpnFBY$@ zZ(J)DMd9Y1LitW%{fw&Xx8J{2S}$+w^V;1|Z_j%BZ_-)$w%l&2=l_Z9FIx5w`ND<( z0tg_000IagfB*srAb