diff --git a/dagster_meltano/ops.py b/dagster_meltano/ops.py index 2e47f85..8e36bdc 100644 --- a/dagster_meltano/ops.py +++ b/dagster_meltano/ops.py @@ -24,7 +24,7 @@ @lru_cache def meltano_command_op( command: str, - dagster_name: Optional[str] = None, + dagster_name: str, ) -> OpDefinition: """ Run `meltano ` using a Dagster op. @@ -34,13 +34,11 @@ def meltano_command_op( Args: command (str): The Meltano command to run. - dagster_name (Optional[str], optional): The Dagster name to use for the op. - Defaults to None. + dagster_name (str): The Dagster name to use for the op. Returns: OpDefinition: The Dagster op definition. """ - dagster_name = dagster_name or generate_dagster_name(command) ins = { "after": In(Nothing), "env": In( @@ -101,14 +99,23 @@ def dagster_op( @lru_cache def meltano_run_op( command: str, + dagster_name: Optional[str] = None, + ) -> OpDefinition: """ Run `meltano run ` using a Dagster op. This factory is cached to make sure the same commands can be reused in the same repository. + + Args: + command (str): The Meltano command to run. + dagster_name (Optional[str], optional): The Dagster name to use for the op. + Defaults to None. """ - dagster_name = generate_dagster_name(command) + # if not set, generate a dagster_name from the command + dagster_name = dagster_name or generate_dagster_name(command) + return meltano_command_op( command=f"run {command} --force", dagster_name=dagster_name ) diff --git a/dagster_meltano/utils.py b/dagster_meltano/utils.py index 2c5edcc..7585c25 100644 --- a/dagster_meltano/utils.py +++ b/dagster_meltano/utils.py @@ -16,7 +16,7 @@ def generate_dagster_name(input_string) -> str: """ Generate a dagster safe name (^[A-Za-z0-9_]+$.) """ - return input_string.replace("-", "_").replace(" ", "_").replace(":", "_") + return input_string.replace("-", "_").replace(" ", "_").replace(":", "_").replace("=", "_") def generate_dbt_group_name(node_info: dict) -> str: diff --git a/tests/test_meltano_command.py b/tests/test_meltano_command.py index 184b025..8ddb4d5 100644 --- a/tests/test_meltano_command.py +++ b/tests/test_meltano_command.py @@ -9,7 +9,7 @@ @job(resource_defs={"meltano": meltano_resource}) def meltano_command_job(): - meltano_command_op("install extractor tap-smoke-test")() + meltano_command_op("install extractor tap-smoke-test", "custom_job_name")() def test_meltano_command():