Skip to content

Commit

Permalink
Create meltano-map-transform (#1)
Browse files Browse the repository at this point in the history
* first commit

* Update readme

* Fix docstring in activate_version mapping method

Co-authored-by: Aaron ("AJ") Steers <[email protected]>

* Update meltano_map_transform/mapper.py

Co-authored-by: Aaron ("AJ") Steers <[email protected]>

* fix: map activate_version message to all stream aliases and duplicates

* use SDK 0.3.18

* add dependabot

Co-authored-by: Aaron ("AJ") Steers <[email protected]>
  • Loading branch information
edgarrmondragon and aaronsteers authored Jan 14, 2022
1 parent 51825bf commit fc779f8
Show file tree
Hide file tree
Showing 9 changed files with 1,566 additions and 2 deletions.
19 changes: 19 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates

version: 2
updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"
time: "13:00"
day: "monday"
timezone: "US/Central"
reviewers:
- "edgarrmondragon"
- "aaronsteers"
labels:
- "dependencies"
38 changes: 38 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
### A CI workflow that runs linting (via tox) on every push

name: Test

on: [push]

jobs:
linting:

runs-on: ubuntu-latest
env:
SETUPTOOLS_USE_DISTUTILS: stdlib
strategy:
matrix:
# Only lint using the primary version used for dev
python-version: ["3.9"]

steps:
- name: Checkout code
uses: actions/checkout@v2

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}

- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: 1.1.12

- name: Install dependencies
run: |
pip install tox==3.24.4
- name: Run lint command from tox.ini
run: |
tox -e lint
133 changes: 133 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Ignore meltano internal cache and sqlite systemdb

.meltano/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
# meltano-map-transform
# `meltano-map-transformer`

A map transformer which implements the [`Stream Maps` capability](https://sdk.meltano.com/en/latest/stream_maps.html) from Meltano's tap and target SDK: https://sdk.meltano.com/

This mapper plugin is fully compliant with the Singer Spec and can be placed in between any Singer tap and target.

Status: Under Development
## Capabilities

* `stream-maps`

## Settings

| Setting | Required | Default | Description |
|:------------|:--------:|:-------:|:------------|
| stream_maps | True | None | Stream maps |

A full list of supported settings and capabilities is available by running: `meltano-map-transformer --about`
4 changes: 4 additions & 0 deletions meltano_map_transform/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""A map transformer which implements the Stream Maps capability.
Based on Meltano's tap and target SDK.
"""
153 changes: 153 additions & 0 deletions meltano_map_transform/mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""A sample inline mapper app."""

from pathlib import PurePath
from typing import Generator, List, Optional, Union

import singer
import singer_sdk.typing as th
from singer_sdk.helpers._util import utc_now
from singer_sdk.mapper import PluginMapper
from singer_sdk.mapper_base import InlineMapper


class StreamTransform(InlineMapper):
    """A map transformer which implements the Stream Maps capability.

    Incoming SCHEMA, RECORD and ACTIVATE_VERSION messages are fanned out to
    every stream map that ``PluginMapper`` holds for the message's stream,
    and are re-emitted under each map's ``stream_alias``. STATE messages are
    passed through with their ``value`` unchanged.
    """

    name = "meltano-map-transformer"

    # JSON schema of the plugin settings. ``stream_maps`` is required and
    # maps arbitrary keys to an object, a string or null; the nested object
    # declares the ``__filter__``, ``__source__``, ``__else__`` and
    # ``__key_properties__`` directives, and any other property must be a
    # string or null.
    config_jsonschema = th.PropertiesList(
        th.Property(
            "stream_maps",
            th.ObjectType(
                additional_properties=th.CustomType(
                    {
                        "type": ["object", "string", "null"],
                        "properties": {
                            "__filter__": {"type": ["string", "null"]},
                            "__source__": {"type": ["string", "null"]},
                            "__else__": {"type": ["null"]},
                            "__key_properties__": {
                                "type": ["array", "null"],
                                "items": {"type": "string"},
                            },
                        },
                        "additionalProperties": {"type": ["string", "null"]},
                    }
                )
            ),
            required=True,
            description="Stream maps",
        )
    ).to_dict()

    def __init__(
        self,
        config: Optional[Union[dict, PurePath, str, List[Union[PurePath, str]]]] = None,
        parse_env_config: bool = False,
        validate_config: bool = True,
    ) -> None:
        """Create a new inline mapper.

        Args:
            config: Mapper configuration. Can be a dictionary, a single path to a
                configuration file, or a list of paths to multiple configuration
                files.
            parse_env_config: Whether to look for configuration values in
                environment variables.
            validate_config: True to require validation of config settings.
        """
        super().__init__(
            config=config,
            parse_env_config=parse_env_config,
            validate_config=validate_config,
        )

        # Built after the base initializer so that ``self.config`` and
        # ``self.logger`` are available; the config is copied into a plain
        # dict before being handed to the mapper.
        self.mapper = PluginMapper(plugin_config=dict(self.config), logger=self.logger)

    def map_schema_message(
        self,
        message_dict: dict,
    ) -> Generator[singer.Message, None, None]:
        """Map a schema message according to config.

        The raw schema is registered with the plugin mapper first; one SCHEMA
        message is then produced per stream map held for the stream.

        Args:
            message_dict: A SCHEMA message JSON dictionary. Must contain the
                ``stream`` and ``schema`` keys; ``key_properties`` and
                ``bookmark_keys`` default to empty lists when absent.

        Yields:
            Transformed schema messages.
        """
        self._assert_line_requires(message_dict, requires={"stream", "schema"})

        stream_id: str = message_dict["stream"]
        self.mapper.register_raw_stream_schema(
            stream_id,
            message_dict["schema"],
            message_dict.get("key_properties", []),
        )
        for stream_map in self.mapper.stream_maps[stream_id]:
            yield singer.SchemaMessage(
                stream_map.stream_alias,
                stream_map.transformed_schema,
                stream_map.transformed_key_properties,
                # Bookmark keys are forwarded exactly as received.
                message_dict.get("bookmark_keys", []),
            )

    def map_record_message(
        self,
        message_dict: dict,
    ) -> Generator[singer.Message, None, None]:
        """Map a record message according to config.

        Args:
            message_dict: A RECORD message JSON dictionary. Must contain the
                ``stream`` and ``record`` keys; ``version`` is forwarded when
                present.

        Yields:
            Transformed record messages, one per stream map whose
            ``transform`` call returns a record.
        """
        self._assert_line_requires(message_dict, requires={"stream", "record"})

        stream_id: str = message_dict["stream"]
        for stream_map in self.mapper.stream_maps[stream_id]:
            mapped_record = stream_map.transform(message_dict["record"])
            if mapped_record is None:
                # This map produced no output for the record, so nothing is
                # emitted under its alias.
                continue

            # Per-record tracing is emitted at DEBUG level (it was previously
            # an INFO line containing only the alias) so that normal runs do
            # not write one log line for every record.
            self.logger.debug(
                "Mapped record for stream '%s'", stream_map.stream_alias
            )
            yield singer.RecordMessage(
                stream=stream_map.stream_alias,
                record=mapped_record,
                version=message_dict.get("version"),
                # Stamped with utc_now() rather than reusing any
                # time_extracted value carried by the incoming message.
                time_extracted=utc_now(),
            )

    def map_state_message(self, message_dict: dict) -> List[singer.Message]:
        """Do nothing to the message.

        Args:
            message_dict: A STATE message JSON dictionary. Its ``value`` key
                is read directly, so a missing key raises ``KeyError``.

        Returns:
            A single-element list holding a state message with the same value.
        """
        return [singer.StateMessage(value=message_dict["value"])]

    def map_activate_version_message(
        self,
        message_dict: dict,
    ) -> Generator[singer.Message, None, None]:
        """Duplicate the message or alias the stream name as defined in configuration.

        Args:
            message_dict: An ACTIVATE_VERSION message JSON dictionary. Must
                contain the ``stream`` and ``version`` keys.

        Yields:
            An ACTIVATE_VERSION for each duplicated or aliased stream.
        """
        self._assert_line_requires(message_dict, requires={"stream", "version"})

        stream_id: str = message_dict["stream"]
        for stream_map in self.mapper.stream_maps[stream_id]:
            yield singer.ActivateVersionMessage(
                stream=stream_map.stream_alias,
                version=message_dict["version"],
            )
Loading

0 comments on commit fc779f8

Please sign in to comment.