diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 4def534..760d251 100755 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -41,6 +41,7 @@ "python.linting.pylintEnabled": true, "python.linting.lintOnSave": true, "python.linting.pylintArgs": ["--rcfile", "${workspaceFolder}/.pylintrc"], + "python.analysis.extraPaths": ["/workspace"], "python.sortImports.args": ["--profile", "black"], "[python]": { "editor.codeActionsOnSave": { diff --git a/.devcontainer/vscode.Dockerfile b/.devcontainer/vscode.Dockerfile index 75a63f3..f3f2631 100755 --- a/.devcontainer/vscode.Dockerfile +++ b/.devcontainer/vscode.Dockerfile @@ -1,3 +1,5 @@ FROM quantiledevelopment/vscode-python:3.9 +ENV MELTANO_PROJECT_ROOT=/workspace/meltano + RUN pipx install meltano==2.6.0 \ No newline at end of file diff --git a/.gitignore b/.gitignore index c6075d3..0830ecb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,185 @@ **/.venv/ -**/.meltano/ \ No newline at end of file +**/.meltano/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# Local .terraform directories +**/.terraform/* + +# .tfstate files +*.tfstate +*.tfstate.* + +# Crash log files +crash.log + +# Exclude all .tfvars files, which are likely to contain sentitive data, such as +# password, private keys, and other secrets. These should not be part of version +# control as they are data points which are potentially sensitive and subject +# to change depending on the environment. +# +*.tfvars + +# Ignore override files as they are usually used to override resources locally and so +# are not checked in +override.tf +override.tf.json +*_override.tf +*_override.tf.json + +# Include override files you do wish to add to version control using negated pattern +# +# !example_override.tf + +# Include tfplan files to ignore the plan output of command: terraform plan -out=tfplan +# example: *tfplan* + +# Ignore CLI configuration files +.terraformrc +terraform.rc + +# Dask +**/dask-worker-space + +# Sqlite +*.sqlite + +# Modules +**/node_modules \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..7731c26 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "python.analysis.extraPaths": ["/workspace"] +} diff --git a/dagster_meltano/jobs.py b/dagster_meltano/jobs.py index a36963a..244e39c 100644 --- a/dagster_meltano/jobs.py +++ b/dagster_meltano/jobs.py @@ -21,16 +21,14 @@ def load_jobs_from_meltano_project( List[Union[JobDefinition, ScheduleDefinition]]: Returns a list of either Dagster JobDefinitions or ScheduleDefinitions """ meltano_resource = MeltanoResource(project_dir) + meltano_jobs = meltano_resource.jobs - list_result = subprocess.run( - [meltano_resource.meltano_bin, "schedule", "list", "--format=json"], - cwd=meltano_resource.project_dir, - stdout=subprocess.PIPE, - universal_newlines=True, - check=True, - ) - - schedule_export = json.loads(list_result.stdout) - logging.info(schedule_export) + return list(meltano_jobs) + logging.error("------------") + logging.error(meltano_resource.schedules) return [] + + +if __name__ == "__main__": + load_jobs_from_meltano_project("/workspace/meltano") diff --git a/dagster_meltano/meltano_resource.py b/dagster_meltano/meltano_resource.py index 7173c3a..9fe8af5 100644 --- a/dagster_meltano/meltano_resource.py +++ b/dagster_meltano/meltano_resource.py @@ -1,9 +1,53 @@ -from typing import Optional +import json +import subprocess +from typing import List, Optional -from dagster_meltano.utils import Singleton +from dagster import job, op + +from dagster_meltano.utils import Singleton, generate_dagster_name class MeltanoResource(metaclass=Singleton): def __init__(self, project_dir: str, meltano_bin: Optional[str] = "meltano"): self.project_dir = project_dir self.meltano_bin = meltano_bin + + # TODO: Refactor to different file + def run_cli(self, commands: List[str]): + return json.loads( + subprocess.run( + [self.meltano_bin] + commands, + cwd=self.project_dir, + stdout=subprocess.PIPE, + universal_newlines=True, + check=True, + ).stdout + ) + + @property + def jobs(self) -> List[dict]: + meltano_jobs = self.run_cli(["job", "list", "--format=json"]) + + for meltano_job in meltano_jobs["jobs"]: + + @op(name=f'run_{generate_dagster_name(meltano_job["job_name"])}') + def dagster_op(): + print( + subprocess.run( + [self.meltano_bin, "--help"], + cwd=self.project_dir, + stdout=subprocess.PIPE, + universal_newlines=True, + check=True, + ).stdout + ) + + @job(name=generate_dagster_name(meltano_job["job_name"])) + def dagster_job(): + dagster_op() + + yield dagster_job + + @property + def schedules(self) -> List[dict]: + return self.run_cli(["schedule", "list", "--format=json"]) diff --git a/files_dagster_ext/orchestrate/repository.py b/files_dagster_ext/orchestrate/repository.py index baabb8c..b978c24 100644 --- a/files_dagster_ext/orchestrate/repository.py +++ b/files_dagster_ext/orchestrate/repository.py @@ -6,31 +6,17 @@ from dagster import job, op, repository, with_resources from dagster._utils import file_relative_path from dagster_dbt import dbt_cli_resource - -# from dagster_meltano import ( -# load_assets_from_meltano_project, -# load_jobs_from_meltano_project, -# meltano_resource, -# ) from dagster_meltano.jobs import load_jobs_from_meltano_project MELTANO_PROJECT_ROOT = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd()) MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano") - -@op -def hello(): - print("world") - - -@job -def hello_job(): - hello() +meltano_jobs = load_jobs_from_meltano_project(MELTANO_PROJECT_ROOT) @repository def repository(): - return [load_jobs_from_meltano_project(MELTANO_PROJECT_ROOT)] + return [meltano_jobs] # MELTANO_PROJECT_DIR = file_relative_path( diff --git a/meltano/meltano.yml b/meltano/meltano.yml index 3fd2fe6..6d19c6f 100644 --- a/meltano/meltano.yml +++ b/meltano/meltano.yml @@ -5,7 +5,7 @@ plugins: utilities: - name: dagster namespace: dagster - pip_url: -e .. dbt-postgres + pip_url: -e /workspace dbt-postgres executable: dagster_invoker commands: initialize: @@ -17,11 +17,11 @@ plugins: # invoke: # executable: airflow_extension # args: invoke - # settings: - # - name: core.dags_folder - # label: DAGs Folder - # value: $MELTANO_PROJECT_ROOT/orchestrate/dags - # env: AIRFLOW__CORE__DAGS_FOLDER + settings: + - name: dagster_home + label: The home folder of Dagster + value: $MELTANO_PROJECT_ROOT/.meltano + env: DAGSTER_HOME # - name: core.plugins_folder # label: Plugins Folder # value: $MELTANO_PROJECT_ROOT/orchestrate/plugins diff --git a/meltano/tmpk03xco8s/history/runs.db b/meltano/tmpk03xco8s/history/runs.db deleted file mode 100644 index 0685e4d..0000000 Binary files a/meltano/tmpk03xco8s/history/runs.db and /dev/null differ diff --git a/meltano/tmpk03xco8s/history/runs/index.db b/meltano/tmpk03xco8s/history/runs/index.db deleted file mode 100644 index 01be3ff..0000000 Binary files a/meltano/tmpk03xco8s/history/runs/index.db and /dev/null differ diff --git a/meltano/tmpk03xco8s/schedules/schedules.db b/meltano/tmpk03xco8s/schedules/schedules.db deleted file mode 100644 index d5bcd41..0000000 Binary files a/meltano/tmpk03xco8s/schedules/schedules.db and /dev/null differ diff --git a/pyproject.toml b/pyproject.toml index 90d4e1b..41bd91c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,8 +18,6 @@ dagster-dbt = ">=0.16,<0.17" black = "^22.3.0" isort = "^5.10.1" flake8 = "^3.9.0" - -[tool.poetry.group.dev.dependencies] pylint = "^2.15.3" [build-system]