Skip to content

Commit

Permalink
feat(meltano): ✨ Inject environment variable for ops (#20)
Browse files Browse the repository at this point in the history
* feat(dagster): ✨ Inject environment variable into dagster-meltano ops

* WIP: fix transfer dict in python API

* fix: solution with op_config

* Small env inject refactor

* Added an example to the readme

Co-authored-by: Pedot T <[email protected]>
Co-authored-by: Jules Huisman <jules.huisman>
  • Loading branch information
slamer59 and Pedot T authored Jan 15, 2023
1 parent a6e597e commit 37ccda5
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 20 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ def repository():
return [meltano_run_job]
```

You can inject environment variables into the Meltano run process through the op's Dagster config, for example:

```yaml
ops:
tap_smoke_test_target_jsonl:
config:
env:
TAP_SMOKE_TEST_STREAMS: '[{"stream_name": "new-stream", "input_filename": "demo.json"}]'
```
## Development using VSCode
1. Open this repository in Visual Studio Code.
Expand Down
1 change: 0 additions & 1 deletion dagster_meltano/meltano_invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from dagster_meltano.log_processing import LogProcessor
from dagster_meltano.log_processing.passthrough_processor import PassthroughLogProcessor

# log = structlog.get_logger()
log = get_dagster_logger()


Expand Down
6 changes: 4 additions & 2 deletions dagster_meltano/meltano_resource.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import os
from functools import lru_cache
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

from dagster import resource, Field

Expand All @@ -19,13 +19,15 @@ def __init__(
self,
project_dir: str = None,
meltano_bin: Optional[str] = "meltano",
env: Optional[Dict[str, Any]] = {},
):
self.project_dir = project_dir
self.meltano_bin = meltano_bin
self.meltano_invoker = MeltanoInvoker(
bin=meltano_bin,
cwd=project_dir,
log_level="info", # TODO: Get this from the resource config
log_level="info", # TODO: Get this from the resource config
env=env,
)

async def load_json_from_cli(self, command: List[str]) -> dict:
Expand Down
47 changes: 31 additions & 16 deletions dagster_meltano/ops.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from __future__ import annotations

from functools import lru_cache
from typing import TYPE_CHECKING

from typing import TYPE_CHECKING, Any, Dict, Optional
import json
from dagster import (
In,
Nothing,
OpDefinition,
OpExecutionContext,
get_dagster_logger,
op,
Field,
)

from dagster_meltano.log_processing.metadata_processor import MetadataLogProcessor
Expand All @@ -24,39 +24,54 @@


@lru_cache
def meltano_run_op(
    command: str,
) -> OpDefinition:
    """
    Run `meltano run <command>` using a Dagster op.

    This factory is cached to make sure the same commands can be reused in the
    same repository.

    Args:
        command (str): The arguments for `meltano run`, separated by whitespace.

    Returns:
        OpDefinition: An op named after the command. It requires the `meltano`
            resource and accepts an optional `env` mapping in its config.
    """
    dagster_name = generate_dagster_name(command)
    ins = {
        "after": In(Nothing),
    }

    @op(
        name=dagster_name,
        description=f"Run `{command}` using Meltano.",
        ins=ins,
        tags={"kind": "meltano"},
        required_resource_keys={"meltano"},
        config_schema={
            "env": Field(
                dict,
                description="Environment variables to inject into the Meltano run process.",
                default_value={},
                is_required=False,
            )
        },
    )
    def dagster_op(context: OpExecutionContext):
        """
        Run `meltano run <command>` using a Dagster op.

        Args:
            context (OpExecutionContext): The Dagster op execution context.
        """
        meltano_resource: MeltanoResource = context.resources.meltano
        invoker = meltano_resource.meltano_invoker

        # Get the environment variables from the config and
        # add them to the Meltano invoker
        env = context.op_config.get("env") or {}

        # The invoker belongs to the resource, which other ops may use too.
        # Remember the values we are about to overwrite so this op's variables
        # do not leak into ops that run afterwards.
        overridden = {key: invoker.env[key] for key in env if key in invoker.env}
        invoker.env.update(env)

        try:
            invoker.run_and_log(
                "run",
                MetadataLogProcessor,
                command.split(),
            )
        finally:
            # Undo only the keys this op touched, even when the run fails.
            for key in env:
                if key in overridden:
                    invoker.env[key] = overridden[key]
                else:
                    invoker.env.pop(key, None)

    return dagster_op

Expand Down
33 changes: 32 additions & 1 deletion tests/test_meltano_run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import subprocess
import pytest
from dagster import job

from dagster_meltano import meltano_resource, meltano_run_op
Expand Down Expand Up @@ -32,7 +34,7 @@ def test_meltano_run():
assert job_response.success


def test_meltano_run_using_env():
def test_meltano_run_using_env_root():
"""
Check if we can run arbitrary `meltano run` commands, with the project
root defined using an env variable.
Expand All @@ -41,3 +43,32 @@ def test_meltano_run_using_env():
job_response = meltano_run_job.execute_in_process()

assert job_response.success


def test_meltano_run_injecting_env():
    """
    Verify that op-level `env` config is passed on to the `meltano run` command.

    The run is given a Meltano environment that does not exist, so the
    command is expected to fail.
    """
    resource_config = {
        "meltano": {
            "config": {
                "project_dir": MELTANO_PROJECT_TEST_PATH,
            },
        }
    }
    injected_env = {
        "MELTANO_ENVIRONMENT": "non_existing_env",
    }
    op_config = {
        "tap_smoke_test_target_jsonl": {
            "config": {
                "env": injected_env,
            }
        }
    }
    run_config = {
        "resources": resource_config,
        "ops": op_config,
    }

    with pytest.raises(subprocess.CalledProcessError):
        meltano_run_job.execute_in_process(run_config)

0 comments on commit 37ccda5

Please sign in to comment.