Skip to content

Commit

Permalink
Added local dagster meltano
Browse files Browse the repository at this point in the history
  • Loading branch information
JulesHuisman committed Sep 22, 2022
1 parent 3f11d6b commit ff07649
Show file tree
Hide file tree
Showing 38 changed files with 2,626 additions and 408 deletions.
Binary file modified dagster_ext/__pycache__/__init__.cpython-39.pyc
Binary file not shown.
Binary file modified dagster_ext/__pycache__/extension.cpython-39.pyc
Binary file not shown.
Binary file modified dagster_ext/__pycache__/main.cpython-39.pyc
Binary file not shown.
Binary file modified dagster_ext/__pycache__/pass_through.cpython-39.pyc
Binary file not shown.
4 changes: 2 additions & 2 deletions dagster_ext/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def pass_through_invoker(
command_args=command_args,
)
try:
self.pre_invoke()
self.pre_invoke(None)
except Exception:
logger.exception(
"pre_invoke failed with uncaught exception, please report to maintainer"
Expand All @@ -95,7 +95,7 @@ def pass_through_invoker(
sys.exit(1)

try:
self.post_invoke()
self.post_invoke(None)
except Exception:
logger.exception(
"post_invoke failed with uncaught exception, please report to maintainer" # noqa: E501
Expand Down
Binary file added dagster_meltano/__pycache__/jobs.cpython-39.pyc
Binary file not shown.
Binary file not shown.
Binary file added dagster_meltano/__pycache__/utils.cpython-39.pyc
Binary file not shown.
40 changes: 40 additions & 0 deletions dagster_meltano/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import List, Optional

from dagster import AssetsDefinition
from dagster_dbt import load_assets_from_dbt_project

# from dagster_meltano.meltano_resource import MeltanoResource
# from .utils import generate_dbt_group_name


def load_assets_from_meltano_project(
meltano_project_dir: str,
dbt_project_dir: Optional[str] = None,
dbt_profiles_dir: Optional[str] = None,
dbt_target_dir: Optional[str] = None,
dbt_use_build_command: bool = True,
) -> List[AssetsDefinition]:
"""This function generates all Assets it can find in the supplied Meltano project.
This currently includes the taps and dbt assets.
Args:
project_dir (Optional[str], optional): The location of the Meltano project. Defaults to os.getenv("MELTANO_PROJECT_ROOT").
Returns:
List[AssetsDefinition]: Returns a list of all Meltano assets
"""
return []
meltano_resource = MeltanoResource(meltano_project_dir)
meltano_assets = [extractor.asset for extractor in meltano_resource.extractors]

if dbt_project_dir:
dbt_assets = load_assets_from_dbt_project(
project_dir=dbt_project_dir,
profiles_dir=dbt_profiles_dir,
target_dir=dbt_target_dir,
use_build_command=dbt_use_build_command,
node_info_to_group_fn=generate_dbt_group_name,
)
meltano_assets += dbt_assets

return meltano_assets
Empty file added dagster_meltano/cli/__init__.py
Empty file.
Empty file added dagster_meltano/cli/utils.py
Empty file.
36 changes: 36 additions & 0 deletions dagster_meltano/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import json
import logging
import subprocess
from typing import List, Optional, Union

from dagster import JobDefinition, ScheduleDefinition

from dagster_meltano.meltano_resource import MeltanoResource


def load_jobs_from_meltano_project(
project_dir: Optional[str],
) -> List[Union[JobDefinition, ScheduleDefinition]]:
"""This function generates dagster jobs for all jobs defined in the Meltano project. If there are schedules connected
to the jobs, it also returns those.
Args:
project_dir (Optional[str], optional): The location of the Meltano project. Defaults to os.getenv("MELTANO_PROJECT_ROOT").
Returns:
List[Union[JobDefinition, ScheduleDefinition]]: Returns a list of either Dagster JobDefinitions or ScheduleDefinitions
"""
meltano_resource = MeltanoResource(project_dir)

list_result = subprocess.run(
[meltano_resource.meltano_bin, "schedule", "list", "--format=json"],
cwd=meltano_resource.project_dir,
stdout=subprocess.PIPE,
universal_newlines=True,
check=True,
)

schedule_export = json.loads(list_result.stdout)
logging.info(schedule_export)

return []
9 changes: 9 additions & 0 deletions dagster_meltano/meltano_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import Optional

from dagster_meltano.utils import Singleton


class MeltanoResource(metaclass=Singleton):
def __init__(self, project_dir: str, meltano_bin: Optional[str] = "meltano"):
self.project_dir = project_dir
self.meltano_bin = meltano_bin
21 changes: 21 additions & 0 deletions dagster_meltano/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
class Singleton(type):
_instances = {}

def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]


def generate_dagster_name(string) -> str:
"""
Generate a dagster safe name (^[A-Za-z0-9_]+$.)
"""
return string.replace("-", "_").replace(" ", "_")


def generate_dbt_group_name(node_info):
if len(node_info.get("fqn", [])) >= 3:
return "_".join(node_info["fqn"][1:-1])

return "dbt"
5 changes: 5 additions & 0 deletions dagster_meltano_old/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .assets import load_assets_from_meltano_project
from .jobs import load_jobs_from_meltano_project
from .meltano.extractor import Extractor
from .meltano.resource import MeltanoResource, meltano_resource
from .ops import meltano_install_op, meltano_run_op
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added dagster_meltano_old/__pycache__/utils.cpython-38.pyc
Binary file not shown.
40 changes: 40 additions & 0 deletions dagster_meltano_old/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import List, Optional

from dagster import AssetsDefinition

from dagster_dbt import load_assets_from_dbt_project

from .meltano.resource import MeltanoResource
from .utils import generate_dbt_group_name


def load_assets_from_meltano_project(
meltano_project_dir: str,
dbt_project_dir: Optional[str] = None,
dbt_profiles_dir: Optional[str] = None,
dbt_target_dir: Optional[str] = None,
dbt_use_build_command: bool = True,
) -> List[AssetsDefinition]:
"""This function generates all Assets it can find in the supplied Meltano project.
This currently includes the taps and dbt assets.
Args:
project_dir (Optional[str], optional): The location of the Meltano project. Defaults to os.getenv("MELTANO_PROJECT_ROOT").
Returns:
List[AssetsDefinition]: Returns a list of all Meltano assets
"""
meltano_resource = MeltanoResource(meltano_project_dir)
meltano_assets = [extractor.asset for extractor in meltano_resource.extractors]

if dbt_project_dir:
dbt_assets = load_assets_from_dbt_project(
project_dir=dbt_project_dir,
profiles_dir=dbt_profiles_dir,
target_dir=dbt_target_dir,
use_build_command=dbt_use_build_command,
node_info_to_group_fn=generate_dbt_group_name,
)
meltano_assets += dbt_assets

return meltano_assets
24 changes: 24 additions & 0 deletions dagster_meltano_old/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from functools import cache
from typing import List, Optional, Union

from dagster import JobDefinition, ScheduleDefinition

from .meltano.resource import MeltanoResource


@cache
def load_jobs_from_meltano_project(
project_dir: Optional[str],
) -> List[Union[JobDefinition, ScheduleDefinition]]:
"""This function generates dagster jobs for all jobs defined in the Meltano project. If there are schedules connected
to the jobs, it also returns those.
Args:
project_dir (Optional[str], optional): The location of the Meltano project. Defaults to os.getenv("MELTANO_PROJECT_ROOT").
Returns:
List[Union[JobDefinition, ScheduleDefinition]]: Returns a list of either Dagster JobDefinitions or ScheduleDefinitions
"""

meltano_resource = MeltanoResource(project_dir)
return list(meltano_resource.jobs)
Empty file.
163 changes: 163 additions & 0 deletions dagster_meltano_old/meltano/extract_load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
from __future__ import annotations

import asyncio
import logging
from contextlib import closing
from typing import TYPE_CHECKING, Dict, Optional

from dagster import Field, OpExecutionContext, Out, Output, op
from meltano.cli.elt import _elt_context_builder, _run_extract_load
from meltano.core.job import Job
from meltano.core.logging import JobLoggingService, OutputLogger

if TYPE_CHECKING:
from .resource import MeltanoResource

from dataclasses import dataclass

from .logger import Metrics, add_repeat_handler


@dataclass
class Config:
full_refresh: str = False
state_suffix: Optional[str] = None
state_id: Optional[str] = None


def generate_state_id(
environment: str,
extractor_name: str,
loader_name: str,
suffix: str,
):
state_id = f"{environment}:{extractor_name}-to-{loader_name}"

if suffix:
state_id += f":{suffix}"

return state_id


def load_config_from_dagster(context: OpExecutionContext) -> Config:
"""
If no config was provided the `op_config` value is None.
In this case we create a Config instance with default values.
"""
if context.op_config == None:
return Config()

full_refresh = context.op_config.get("full_refresh", False)
state_suffix = context.op_config.get("state_suffix", None)
state_id = context.op_config.get("state_id", None)

return Config(
full_refresh=full_refresh,
state_suffix=state_suffix,
state_id=state_id,
)


def extract_load_factory(
name: str,
extractor_name: str,
loader_name: str,
outs: Dict[str, Out],
):
@op(
name=name,
out=outs,
required_resource_keys={"meltano"},
tags={"kind": "singer"},
config_schema={
"full_refresh": Field(
bool,
default_value=False,
description="Whether to ignore existing state.",
),
"state_id": Field(
str,
is_required=False,
description="Use this field to overwrite the default generated state_id.",
),
"state_suffix": Field(
str,
is_required=False,
description="This will be appended to the Meltano state id. Ignored if `state_id` provided.",
),
},
)
def extract_load(context: OpExecutionContext):
log = context.log
meltano_resource: MeltanoResource = context.resources.meltano
environment = meltano_resource.environment

config = load_config_from_dagster(context)

if config.state_id == None:
config.state_id = generate_state_id(
environment,
extractor_name,
loader_name,
config.state_suffix,
)

repeat_handler = add_repeat_handler(
logger=logging.getLogger("meltano"),
dagster_logger=log,
)

select_filter = list(context.selected_output_names)
log.debug(f"Selected the following streams: {select_filter}")

job = Job(job_name=config.state_id)

with closing(meltano_resource.session()) as session:
plugins_service = meltano_resource.plugins_service
context_builder = _elt_context_builder(
project=meltano_resource.project,
job=job,
session=session,
extractor=extractor_name,
loader=loader_name,
transform="skip",
full_refresh=config.full_refresh,
select_filter=select_filter,
plugins_service=plugins_service,
).context()

job_logging_service = JobLoggingService(meltano_resource.project)

log_file = job_logging_service.generate_log_name(job.job_name, job.run_id)
output_logger = OutputLogger(log_file)

log.debug(f"Logging to file: {log_file}")

loop = asyncio.get_event_loop()
loop.run_until_complete(
_run_extract_load(
log,
context_builder,
output_logger,
)
)

record_counts = repeat_handler.metrics.record_counts
request_durations = repeat_handler.metrics.request_durations

for stream_name in context.selected_output_names:
metadata = {}

if stream_name in record_counts:
metadata["Records extracted"] = record_counts[stream_name]

if stream_name in request_durations:
metadata["Average request duration"] = Metrics.mean(request_durations[stream_name])

yield Output(
value=None,
output_name=stream_name,
metadata=metadata,
)

return extract_load
Loading

0 comments on commit ff07649

Please sign in to comment.