Skip to content

Commit

Permalink
Added automatic job loading
Browse files Browse the repository at this point in the history
  • Loading branch information
JulesHuisman committed Sep 22, 2022
1 parent ff07649 commit 37305bc
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 37 deletions.
1 change: 1 addition & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"python.linting.pylintEnabled": true,
"python.linting.lintOnSave": true,
"python.linting.pylintArgs": ["--rcfile", "${workspaceFolder}/.pylintrc"],
"python.analysis.extraPaths": ["/workspace"],
"python.sortImports.args": ["--profile", "black"],
"[python]": {
"editor.codeActionsOnSave": {
Expand Down
2 changes: 2 additions & 0 deletions .devcontainer/vscode.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
FROM quantiledevelopment/vscode-python:3.9

ENV MELTANO_PROJECT_ROOT=/workspace/meltano

RUN pipx install meltano==2.6.0
185 changes: 184 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,185 @@
**/.venv/
**/.meltano/
**/.meltano/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# Local .terraform directories
**/.terraform/*

# .tfstate files
*.tfstate
*.tfstate.*

# Crash log files
crash.log

# Exclude all .tfvars files, which are likely to contain sentitive data, such as
# password, private keys, and other secrets. These should not be part of version
# control as they are data points which are potentially sensitive and subject
# to change depending on the environment.
#
*.tfvars

# Ignore override files as they are usually used to override resources locally and so
# are not checked in
override.tf
override.tf.json
*_override.tf
*_override.tf.json

# Include override files you do wish to add to version control using negated pattern
#
# !example_override.tf

# Include tfplan files to ignore the plan output of command: terraform plan -out=tfplan
# example: *tfplan*

# Ignore CLI configuration files
.terraformrc
terraform.rc

# Dask
**/dask-worker-space

# Sqlite
*.sqlite

# Modules
**/node_modules
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"python.analysis.extraPaths": ["/workspace"]
}
18 changes: 8 additions & 10 deletions dagster_meltano/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ def load_jobs_from_meltano_project(
List[Union[JobDefinition, ScheduleDefinition]]: Returns a list of either Dagster JobDefinitions or ScheduleDefinitions
"""
meltano_resource = MeltanoResource(project_dir)
meltano_jobs = meltano_resource.jobs

list_result = subprocess.run(
[meltano_resource.meltano_bin, "schedule", "list", "--format=json"],
cwd=meltano_resource.project_dir,
stdout=subprocess.PIPE,
universal_newlines=True,
check=True,
)

schedule_export = json.loads(list_result.stdout)
logging.info(schedule_export)
return list(meltano_jobs)
logging.error("------------")
logging.error(meltano_resource.schedules)

return []


if __name__ == "__main__":
load_jobs_from_meltano_project("/workspace/meltano")
48 changes: 46 additions & 2 deletions dagster_meltano/meltano_resource.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,53 @@
from typing import Optional
import json
import subprocess
from typing import List, Optional

from dagster_meltano.utils import Singleton
from dagster import job, op

from dagster_meltano.utils import Singleton, generate_dagster_name


class MeltanoResource(metaclass=Singleton):
def __init__(self, project_dir: str, meltano_bin: Optional[str] = "meltano"):
self.project_dir = project_dir
self.meltano_bin = meltano_bin

# TODO: Refactor to different file
def run_cli(self, commands: List[str]):
return json.loads(
subprocess.run(
[self.meltano_bin] + commands,
cwd=self.project_dir,
stdout=subprocess.PIPE,
universal_newlines=True,
check=True,
).stdout
)

@property
def jobs(self) -> List[dict]:
meltano_jobs = self.run_cli(["job", "list", "--format=json"])

for meltano_job in meltano_jobs["jobs"]:

@op(name=f'run_{generate_dagster_name(meltano_job["job_name"])}')
def dagster_op():
print(
subprocess.run(
[self.meltano_bin, "--help"],
cwd=self.project_dir,
stdout=subprocess.PIPE,
universal_newlines=True,
check=True,
).stdout
)

@job(name=generate_dagster_name(meltano_job["job_name"]))
def dagster_job():
dagster_op()

yield dagster_job

@property
def schedules(self) -> List[dict]:
return self.run_cli(["schedule", "list", "--format=json"])
18 changes: 2 additions & 16 deletions files_dagster_ext/orchestrate/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,17 @@
from dagster import job, op, repository, with_resources
from dagster._utils import file_relative_path
from dagster_dbt import dbt_cli_resource

# from dagster_meltano import (
# load_assets_from_meltano_project,
# load_jobs_from_meltano_project,
# meltano_resource,
# )
from dagster_meltano.jobs import load_jobs_from_meltano_project

MELTANO_PROJECT_ROOT = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano")


@op
def hello():
print("world")


@job
def hello_job():
hello()
meltano_jobs = load_jobs_from_meltano_project(MELTANO_PROJECT_ROOT)


@repository
def repository():
return [load_jobs_from_meltano_project(MELTANO_PROJECT_ROOT)]
return [meltano_jobs]


# MELTANO_PROJECT_DIR = file_relative_path(
Expand Down
12 changes: 6 additions & 6 deletions meltano/meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins:
utilities:
- name: dagster
namespace: dagster
pip_url: -e .. dbt-postgres
pip_url: -e /workspace dbt-postgres
executable: dagster_invoker
commands:
initialize:
Expand All @@ -17,11 +17,11 @@ plugins:
# invoke:
# executable: airflow_extension
# args: invoke
# settings:
# - name: core.dags_folder
# label: DAGs Folder
# value: $MELTANO_PROJECT_ROOT/orchestrate/dags
# env: AIRFLOW__CORE__DAGS_FOLDER
settings:
- name: dagster_home
label: The home folder of Dagster
value: $MELTANO_PROJECT_ROOT/.meltano
env: DAGSTER_HOME
# - name: core.plugins_folder
# label: Plugins Folder
# value: $MELTANO_PROJECT_ROOT/orchestrate/plugins
Expand Down
Binary file removed meltano/tmpk03xco8s/history/runs.db
Binary file not shown.
Binary file removed meltano/tmpk03xco8s/history/runs/index.db
Binary file not shown.
Binary file removed meltano/tmpk03xco8s/schedules/schedules.db
Binary file not shown.
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ dagster-dbt = ">=0.16,<0.17"
black = "^22.3.0"
isort = "^5.10.1"
flake8 = "^3.9.0"

[tool.poetry.group.dev.dependencies]
pylint = "^2.15.3"

[build-system]
Expand Down

0 comments on commit 37305bc

Please sign in to comment.