Skip to content

Commit

Permalink
Merge pull request #33 from quantile-development/feature/retry-policy
Browse files Browse the repository at this point in the history
Added the ability to retry
  • Loading branch information
JulesHuisman authored May 18, 2023
2 parents d6fe6ac + 4bb1804 commit bf013c9
Show file tree
Hide file tree
Showing 9 changed files with 1,051 additions and 878 deletions.
8 changes: 7 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,11 @@
},
"python.testing.pytestArgs": ["tests"],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
"python.testing.pytestEnabled": true,
"[python]": {
"diffEditor.ignoreTrimWhitespace": false,
"editor.formatOnType": true,
"editor.wordBasedSuggestions": false,
"editor.formatOnSave": true
}
}
3 changes: 3 additions & 0 deletions dagster_meltano/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@

def load_jobs_from_meltano_project(
meltano_project_dir: Optional[str],
retries: int = 0,
) -> List[Union[JobDefinition, ScheduleDefinition]]:
"""This function generates dagster jobs for all jobs defined in the Meltano project. If there are schedules connected
to the jobs, it also returns those.
Args:
project_dir (Optional[str], optional): The location of the Meltano project. Defaults to os.getenv("MELTANO_PROJECT_ROOT").
retries (int, optional): The number of retries to attempt if the Meltano CLI fails to run. Defaults to 0.
Returns:
List[Union[JobDefinition, ScheduleDefinition]]: Returns a list of either Dagster JobDefinitions or ScheduleDefinitions
"""
meltano_resource = MeltanoResource(
project_dir=meltano_project_dir,
meltano_bin="meltano",
retries=retries,
)

meltano_jobs = meltano_resource.jobs
Expand Down
5 changes: 4 additions & 1 deletion dagster_meltano/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
get_dagster_logger,
job,
op,
RetryPolicy,
)

from dagster_meltano.ops import meltano_run_op as meltano_run_op_factory
from dagster_meltano.utils import generate_dagster_name


class Job:
def __init__(self, meltano_job: dict) -> None:
def __init__(self, meltano_job: dict, retries: int = 0) -> None:
self.name = meltano_job["job_name"]
self.tasks = meltano_job["tasks"]
self.retries = retries

@property
def dagster_name(self) -> str:
Expand All @@ -40,6 +42,7 @@ def dagster_job(self) -> JobDefinition:
name=self.dagster_name,
description=f"Runs the `{self.name}` job from Meltano.",
resource_defs={"meltano": meltano_resource},
op_retry_policy=RetryPolicy(max_retries=self.retries),
)
def dagster_job():
op_layers = [[], []]
Expand Down
34 changes: 28 additions & 6 deletions dagster_meltano/meltano_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@


class MeltanoResource(metaclass=Singleton):
def __init__(self, project_dir: str = None, meltano_bin: Optional[str] = "meltano"):
def __init__(
self,
project_dir: str = None,
meltano_bin: Optional[str] = "meltano",
retries: int = 0,
):
self.project_dir = str(project_dir)
self.meltano_bin = meltano_bin
self.retries = retries

@property
def default_env(self) -> Dict[str, str]:
Expand Down Expand Up @@ -65,8 +71,6 @@ def execute_command(
f"Command '{command}' failed with exit code {exit_code}"
)

print(output)

return output

async def load_json_from_cli(self, command: List[str]) -> dict:
Expand Down Expand Up @@ -118,7 +122,13 @@ def meltano_yaml(self) -> dict:
@cached_property
def meltano_jobs(self) -> List[Job]:
meltano_job_list = self.meltano_yaml["jobs"]
return [Job(meltano_job) for meltano_job in meltano_job_list]
return [
Job(
meltano_job=meltano_job,
retries=self.retries,
)
for meltano_job in meltano_job_list
]

@cached_property
def meltano_schedules(self) -> List[Schedule]:
Expand Down Expand Up @@ -146,14 +156,26 @@ def jobs(self) -> List[dict]:
config_schema={
"project_dir": Field(
str,
description="The path to the Meltano project.",
default_value=os.getenv("MELTANO_PROJECT_ROOT", os.getcwd()),
is_required=False,
)
),
"retries": Field(
int,
description="The number of times to retry a failed job.",
default_value=0,
is_required=False,
),
},
)
def meltano_resource(init_context):
project_dir = init_context.resource_config["project_dir"]
return MeltanoResource(project_dir)
retries = init_context.resource_config["retries"]

return MeltanoResource(
project_dir=project_dir,
retries=retries,
)


if __name__ == "__main__":
Expand Down
11 changes: 8 additions & 3 deletions meltano_project/orchestrate/dagster/repository.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import os

from dagster import repository
from dagster import job, repository, Definitions

from dagster_meltano import load_jobs_from_meltano_project
from dagster_meltano import load_jobs_from_meltano_project, meltano_resource

MELTANO_PROJECT_DIR = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano")


@repository
def meltano_jobs():
return [load_jobs_from_meltano_project(MELTANO_PROJECT_DIR)]
return [
load_jobs_from_meltano_project(
meltano_project_dir=MELTANO_PROJECT_DIR,
retries=1,
)
]
Empty file removed meltano_project/requirements.txt
Empty file.
Loading

0 comments on commit bf013c9

Please sign in to comment.