diff --git a/README.md b/README.md index 11055da..d6eeb83 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,16 @@ def repository(): return [meltano_run_job] ``` +You can inject Meltano config with the following Dagster config. + +```yaml +ops: + tap_smoke_test_target_jsonl: + config: + env: + TAP_SMOKE_TEST_STREAMS: '[{"stream_name": "new-stream", "input_filename": "demo.json"}]' +``` + ## Development using VSCode 1. Open this repository in Visual Studio Code. diff --git a/dagster_meltano/meltano_invoker.py b/dagster_meltano/meltano_invoker.py index ca0d50e..2cf032c 100644 --- a/dagster_meltano/meltano_invoker.py +++ b/dagster_meltano/meltano_invoker.py @@ -11,7 +11,6 @@ from dagster_meltano.log_processing import LogProcessor from dagster_meltano.log_processing.passthrough_processor import PassthroughLogProcessor -# log = structlog.get_logger() log = get_dagster_logger() diff --git a/dagster_meltano/meltano_resource.py b/dagster_meltano/meltano_resource.py index f255c3e..557d6b7 100644 --- a/dagster_meltano/meltano_resource.py +++ b/dagster_meltano/meltano_resource.py @@ -1,7 +1,7 @@ import asyncio import os from functools import lru_cache -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from dagster import resource, Field @@ -19,13 +19,15 @@ def __init__( self, project_dir: str = None, meltano_bin: Optional[str] = "meltano", + env: Optional[Dict[str, Any]] = {}, ): self.project_dir = project_dir self.meltano_bin = meltano_bin self.meltano_invoker = MeltanoInvoker( bin=meltano_bin, cwd=project_dir, - log_level="info", # TODO: Get this from the resource config + log_level="info", # TODO: Get this from the resource config + env=env, ) async def load_json_from_cli(self, command: List[str]) -> dict: diff --git a/dagster_meltano/ops.py b/dagster_meltano/ops.py index 48b7c80..9cacd77 100644 --- a/dagster_meltano/ops.py +++ b/dagster_meltano/ops.py @@ -1,8 +1,7 @@ from __future__ import annotations - from functools import lru_cache -from typing import TYPE_CHECKING - +from typing import TYPE_CHECKING, Any, Dict, Optional +import json from dagster import ( In, Nothing, @@ -10,6 +9,7 @@ OpExecutionContext, get_dagster_logger, op, + Field, ) from dagster_meltano.log_processing.metadata_processor import MetadataLogProcessor @@ -24,7 +24,9 @@ @lru_cache -def meltano_run_op(command: str) -> OpDefinition: +def meltano_run_op( + command: str, +) -> OpDefinition: """ Run `meltano run ` using a Dagster op. @@ -32,31 +34,44 @@ def meltano_run_op(command: str) -> OpDefinition: same repository. """ dagster_name = generate_dagster_name(command) + ins = { + "after": In(Nothing), + } @op( name=dagster_name, description=f"Run `{command}` using Meltano.", - ins={"after": In(Nothing)}, + ins=ins, tags={"kind": "meltano"}, required_resource_keys={"meltano"}, + config_schema={ + "env": Field( + dict, + description="Environment variables to inject into the Meltano run process.", + default_value={}, + is_required=False, + ) + }, ) def dagster_op(context: OpExecutionContext): + """ + Run `meltano run ` using a Dagster op. + + Args: + context (OpExecutionContext): The Dagster op execution context. + """ meltano_resource: MeltanoResource = context.resources.meltano - log_results = meltano_resource.meltano_invoker.run_and_log( + + # Get the environment variables from the config and + # add them to the Meltano invoker + env = context.op_config.get("env") + meltano_resource.meltano_invoker.env.update(env) + + _log_results = meltano_resource.meltano_invoker.run_and_log( "run", MetadataLogProcessor, command.split(), ) - # dagster_logger.info(log_results[STDOUT]) - - # yield AssetMaterialization( - # asset_key="my_dataset", - # metadata={ - # "my_text_label": "hello", - # "dashboard_url": MetadataValue.url("http://mycoolsite.com/my_dashboard"), - # "num_rows": 0, - # }, - # ) return dagster_op diff --git a/tests/test_meltano_run.py b/tests/test_meltano_run.py index 02563c3..56c5453 100644 --- a/tests/test_meltano_run.py +++ b/tests/test_meltano_run.py @@ -1,4 +1,6 @@ import os +import subprocess +import pytest from dagster import job from dagster_meltano import meltano_resource, meltano_run_op @@ -32,7 +34,7 @@ def test_meltano_run(): assert job_response.success -def test_meltano_run_using_env(): +def test_meltano_run_using_env_root(): """ Check if we can run abitrary `meltano run` commands, with the project root defined using an env variable. @@ -41,3 +43,32 @@ def test_meltano_run_using_env(): job_response = meltano_run_job.execute_in_process() assert job_response.success + + +def test_meltano_run_injecting_env(): + """ + Check if we can inject environment variables into the `meltano run` command. + We test this by injecting a non existing Meltano environment, which should + cause the command to fail. + """ + with pytest.raises(subprocess.CalledProcessError): + meltano_run_job.execute_in_process( + { + "resources": { + "meltano": { + "config": { + "project_dir": MELTANO_PROJECT_TEST_PATH, + }, + } + }, + "ops": { + "tap_smoke_test_target_jsonl": { + "config": { + "env": { + "MELTANO_ENVIRONMENT": "non_existing_env", + } + } + } + }, + } + )