dagster SDA workflow using @asset #28

Open
sohooo opened this issue Mar 9, 2023 · 13 comments

sohooo commented Mar 9, 2023

Hi, I want to define dagster @assets for meltano runs, as the dagster SDA (software-defined asset) approach seems to be the recommended and most reasonable way to design new data flows. How could this be set up with dagster-meltano?


JulesHuisman commented Mar 10, 2023

I once created a proof of concept of this idea; however, it is currently not possible to see the schema of the taps. (The POC was done using the Meltano Python interface, which is not stable.)

[screenshot of the proof of concept]

We are possibly able to create assets for the taps themselves, would this help you?


sohooo commented Mar 10, 2023

Dagster recommends using SDAs for new pipelines, and I'd like to follow that shift in mindset when approaching a pipeline setup:

Dagster is mainly used to build data pipelines, and most data pipelines can be expressed in Dagster as sets of software-defined assets. If you’re a new Dagster user and your goal is to build a data pipeline, we recommend starting with software-defined assets and not worrying about ops or graphs. This is because most of the code you’ll be writing will directly relate to producing data assets.

It would be great if this library supported this new way of pipeline design. Please correct me if my reasoning is wrong or I'm missing something! I'm new to the field of DataOps, with a history of tooling like Jenkins, Rundeck, GitLab, etc. :)


sohooo commented May 12, 2023

We are possibly able to create assets for the taps themselves, would this help you?

Yeah I think this would help!

I tried using @multi_asset to explicitly define the assets coming from the Meltano tap-oracle:

@multi_asset(
    resource_defs={"meltano": meltano_resource},
    compute_kind="meltano",
    group_name="sources",
    outs={
        "departments": AssetOut(key_prefix=["hr"]),
        "employees": AssetOut(key_prefix=["hr"]),
        "jobs": AssetOut(key_prefix=["hr"]),
    }
)
def meltano_run_job():
    meltano_command_op("run tap-oracle target-postgres")()

Now there's a continuous lineage from Meltano-produced tables (dbt sources) to the following dbt assets:

[screenshot of the asset lineage]

However, when running meltano_run_job() (by materializing the assets in the sources group), the job fails with:

dagster._core.errors.DagsterInvalidInvocationError: Compute function of op 'run_tap_oracle_target_postgres' has context argument, but no context was provided when invoking.

Same error when using meltano_run_op(). What am I missing? How would you design this, @JulesHuisman? :)

@JulesHuisman
Contributor

I don't think you are able to call an op like that from inside the asset function. You might be able to achieve this by replicating this line inside the asset:

output = meltano_resource.execute_command(f"{command}", env, context.log)
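
For example, a minimal sketch along those lines (the asset and function names are placeholders, and it assumes the meltano resource exposes execute_command(command, env, logger) as in the line above):

from dagster import AssetOut, ConfigurableResource, OpExecutionContext, multi_asset

from dagster_meltano import meltano_resource

@multi_asset(
    resource_defs={"meltano": meltano_resource},
    compute_kind="meltano",
    group_name="sources",
    outs={
        "departments": AssetOut(key_prefix=["hr"]),
        "employees": AssetOut(key_prefix=["hr"]),
        "jobs": AssetOut(key_prefix=["hr"]),
    },
)
def meltano_run_assets(context: OpExecutionContext, meltano: ConfigurableResource):
    # Call the resource directly instead of invoking an op from inside the asset.
    meltano.execute_command("run tap-oracle target-postgres", dict(), context.log)
    # One (empty) output per stream; the single Meltano run materializes them all.
    return tuple(None for _ in context.selected_output_names)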


LeqitSebi commented Jun 13, 2023

We are currently stuck at creating the Meltano assets dynamically from a list. Our approach at the moment looks somewhat like the code below, but it keeps overwriting the prior assets and only displays the last one. @JulesHuisman, do you know of any way this could be achieved?

names = ["a", "b", "c"]

for name in names:
    # Each iteration re-defines a function with the same name ("meltano_run_single"),
    # so the new asset definition shadows the previous one and only the last survives.
    @multi_asset(
        compute_kind="meltano",
        group_name="sources",
        outs={
            name: AssetOut(key_prefix=["hr"])
        }
    )
    def meltano_run_single(context: OpExecutionContext):
        return "a"

@JulesHuisman
Contributor

You could do something like this: use a factory design to automatically create assets. In this example these are individual assets, but you could do the same to dynamically create multi assets.

from dagster import asset

def meltano_asset_factory(names: list) -> list:
    assets = []

    for name in names:
        def make_asset(asset_name: str):
            # Give each asset an explicit, unique name and bind the loop variable;
            # otherwise every asset shares one name and captures the last value.
            @asset(name=asset_name)
            def compute():
                return asset_name

            return compute

        assets.append(make_asset(name))

    return assets
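
A quick usage sketch (the names and the Definitions wiring are illustrative):

from dagster import Definitions

meltano_assets = meltano_asset_factory(["a", "b", "c"])

defs = Definitions(assets=meltano_assets)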

@LeqitSebi

Thank you. With the help of the Dagster Slack channel we kinda figured it out.
Code to run the Meltano job:

from dagster_shell import execute_shell_command  # from the dagster-shell package

def meltano_run_job(context, table: str):
    context.log.info(context.selected_output_names)

    # Run the meltano job "import_hr" with logging, filtered to a single table
    execute_shell_command(
        f"NO_COLOR=1 TAP_ORACLE__HR_FILTER_TABLES={table} meltano run import_hr",
        output_logging="STREAM",
        log=context.log,
        cwd=MELTANO_PROJECT,  # path to the Meltano project directory
        # env={"FILTER_TABLES": table},
    )

Code to build the assets out of a YAML spec:

def build_asset(spec) -> AssetsDefinition:
    @asset(name=spec["name"], group_name="sources", key_prefix="hr", compute_kind="meltano")
    def _asset(context):
        meltano_run_job(context=context, table=spec["table"])

    return _asset

assets=[build_asset(spec) for spec in asset_list]

The last thing we are missing is how to correctly use the filter_table env variable.


andymcarter commented Jun 23, 2023

@LeqitSebi I think the SELECT env var might help if you just want to run meltano to update a single table.

Did you move away from the multi-asset approach? Multi-asset makes more sense if you have many child streams; with the single-asset approach, wouldn't you be repeating your API calls?

Multi-asset fits better with my intended outcome: any request for any Meltano table causes a run of the whole tap and target combo.
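
For reference, a sketch of the SELECT idea, assuming Meltano's convention of overriding a tap's select extra via a TAP_<NAME>__SELECT environment variable (a JSON-encoded list of patterns); the tap name and pattern here are placeholders:

import json
import os

from dagster_shell import execute_shell_command  # from the dagster-shell package

MELTANO_PROJECT = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())

def run_single_table(context, table: str):
    # Narrow the tap's stream selection to one table for this run only,
    # keeping the rest of the parent environment intact.
    env = {**os.environ, "TAP_ORACLE__SELECT": json.dumps([f"{table}.*"])}
    execute_shell_command(
        "NO_COLOR=1 meltano run tap-oracle target-postgres",
        output_logging="STREAM",
        log=context.log,
        cwd=MELTANO_PROJECT,
        env=env,
    )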


acarter24 commented Jul 28, 2023

I figured out a workflow that more or less works and gives the correct lineage, assuming you want to run the whole tap plus downstream tables. This goes into the Dagster repository.py. You have to define your taps and streams in the Python code (although there is probably a way to get this from meltano.yml). My sources are defined with a raw_ prefix, so tap-freshdesk becomes the raw_freshdesk source in dbt. I have also defined my groups in dbt. I tend to materialise entire dbt groups at once, so I create a job and schedule per group.

import os
from pathlib import Path

from dagster import ScheduleDefinition, DefaultScheduleStatus, Definitions, define_asset_job, AssetOut, multi_asset, OpExecutionContext, ConfigurableResource, AssetSelection
import enum

from dagster_meltano import meltano_resource

from dagster_dbt import load_assets_from_dbt_project, DbtCliResource

DBT_PROJECT_PATH = str(Path(__file__).parent.parent.parent.parent / "my_dbt_directory")
DBT_PROFILE = os.getenv('DBT_PROFILE')
DBT_TARGET = os.getenv('DBT_TARGET')

class MeltanoEnv(enum.Enum):
    dev = enum.auto()
    prod = enum.auto()

MELTANO_PROJECT_DIR = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano")

resources = {
    "dbt": DbtCliResource(project_dir=DBT_PROJECT_PATH, target=DBT_TARGET, profile=DBT_PROFILE),
    # "meltano": meltano_resource,
}

ALL_TAP_STREAMS = {
    "freshdesk": [
        "conversations",
        "ticket_fields",
        "tickets_detail",
    ],
    "mailchimp": [
        "campaigns",
        "lists",
        "lists_members",
        "reports_email_activity",
        "reports_sent_to",
        "reports_unsubscribes",
    ],
    "instagram": [
        "media",
        "media_children",
        "media_insights",
        "stories",
        "story_insights",
    ],
    "tiktok": [
        "accounts",
        "videos",
        "comments",
    ]
}

def meltano_asset_factory(all_tap_streams: dict) -> tuple:
    multi_assets = []
    jobs = []
    schedules = []

    for tap_name, tap_streams in all_tap_streams.items():
        @multi_asset(
            name=tap_name,
            resource_defs={'meltano': meltano_resource},
            compute_kind="meltano",
            group_name=tap_name,
            outs={
                stream: AssetOut(key_prefix=[f'raw_{tap_name}'])
                for stream in tap_streams
            }
        )
        def compute(context: OpExecutionContext, meltano: ConfigurableResource):
            # context.op.name resolves to this multi_asset's name (the tap name),
            # avoiding the late-binding pitfall of closing over tap_name directly.
            command = f"run tap-{context.op.name} target-postgres"
            meltano.execute_command(f"{command}", dict(), context.log)
            return tuple(None for _ in context.selected_output_names)
        
        multi_assets.append(compute)

        asset_job = define_asset_job(f"{tap_name}_assets", AssetSelection.groups(tap_name))

        basic_schedule = ScheduleDefinition(
            job=asset_job, 
            cron_schedule="@hourly", 
            default_status=DefaultScheduleStatus.RUNNING
        )

        jobs.append(asset_job)
        schedules.append(basic_schedule)

    return multi_assets, jobs, schedules

meltano_assets, jobs, schedules = meltano_asset_factory(ALL_TAP_STREAMS)

dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_PATH, profiles_dir=DBT_PROJECT_PATH,)

defs = Definitions(
    assets= (dbt_assets + meltano_assets),
    resources= resources,
    jobs=jobs,
    schedules=schedules,
)

@Jcohen010

@jaceksan and I are working on an extension of the dagster-meltano plugin that includes functionality to automatically load tap streams into Dagster as assets.

We're just getting started on it; collaborators are welcome!

@acarter24

@jaceksan and I are working on an extension of the dagster-meltano plugin that includes functionality to automatically load tap streams into Dagster as assets.

We're just getting started on it; collaborators are welcome!

If I understand correctly, that would mean it's not necessary to keep a list of taps and streams in repository.py? That would be a nice improvement! One issue I've found is regularising the job names and dbt sources/groups to make sure the dependency chain in Dagster works.

@jaceksan

@jaceksan and I are working on an extension of the dagster-meltano plugin that includes functionality to automatically load tap streams into Dagster as assets.
We're just getting started on it; collaborators are welcome!

If I understand correctly, that would mean it's not necessary to keep a list of taps and streams in repository.py? That would be a nice improvement! One issue I've found is regularising the job names and dbt sources/groups to make sure the dependency chain in Dagster works.

We are now struggling with how streams/attributes are defined in meltano.yml and, correspondingly, in meltano_manifest.json.
There may be globs.
I suggested running meltano select and identifying all selected attributes from the output (see the sketch below).
Names in this output are equal to the target table names and column names, at least in my demo (tap-github, tap-exchangeratehost, tap-s3-csv; many targets).

I loaded dbt assets into Dagster in my demo project, and the asset names are equal to the underlying table names.
So shouldn't the Meltano and Dagster asset names match?
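
For example, a sketch of that CLI call (the tap name is a placeholder; --list prints the current selection and --all also includes excluded attributes):

meltano select tap-github --list --all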


acarter24 commented Jan 16, 2024

The issue I've found is consistency in naming between Meltano and dbt. I don't worry too much about the asset names in Dagster, really.

In my instance, every tap in Meltano has an equivalent dbt source named raw_{TAP}, and I had to specify these as related downstream assets, hence the AssetOut part in the code below.

    for tap_name, tap_streams in all_tap_streams.items():
        @multi_asset(
            name=tap_name,
            resource_defs={'meltano': meltano_resource},
            compute_kind="meltano",
            group_name=tap_name,
            outs={
                stream: AssetOut(key_prefix=[f'raw_{tap_name}'])
                for stream in tap_streams
            }
        )

Without this info, Dagster can't infer that the dbt source is downstream of the associated Meltano stream.

Does that explain a bit more?
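
To make the naming concrete, here is a hypothetical dbt sources.yml that lines up with the AssetOut(key_prefix=[f'raw_{tap_name}']) keys above, so dagster-dbt can match the source tables to the Meltano outputs (the path and names are illustrative):

# models/staging/freshdesk/sources.yml (hypothetical path)
version: 2

sources:
  - name: raw_freshdesk  # matches the raw_{tap_name} key prefix
    tables:
      - name: conversations  # matches the stream / asset output names
      - name: ticket_fields
      - name: tickets_detail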
