This package contains a lightweight ETL framework with a focus on transparency and complexity reduction. It has a number of baked-in assumptions/ principles:
-
Data integration pipelines as code: pipelines, tasks and commands are created using declarative Python code.
-
PostgreSQL as a data processing engine.
-
Extensive web ui. The web browser as the main tool for inspecting, running and debugging pipelines.
-
GNU make semantics. Nodes depend on the completion of upstream nodes. No data dependencies or data flows.
-
No in-app data processing: command line tools as the main tool for interacting with databases and data.
-
Single machine pipeline execution based on Python's multiprocessing. No need for distributed task queues. Easy debugging and output logging.
-
Cost based priority queues: nodes with higher cost (based on recorded run times) are run first.
To use the library directly, use pip:
pip install --process-dependency-links git+https://github.com/mara/data-integration.git
For an example of an integration into a flask application, have a look at the mara example project.
Here is a pipeline "demo" consisting of three nodes that depend on each other: the task ping_localhost
, the pipeline sub_pipeline
and the task sleep
:
from data_integration.commands.bash import RunBash
from data_integration.pipelines import Pipeline, Task
from data_integration.ui.cli import run_pipeline, run_interactively
pipeline = Pipeline(
id='demo',
description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')
pipeline.add(Task(id='ping_localhost', description='Pings localhost',
commands=[RunBash('ping -c 3 localhost')]))
sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts')
for host in ['google', 'amazon', 'facebook']:
sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}',
commands=[RunBash(f'ping -c 3 {host}.com')]))
sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')
sub_pipeline.add(Task(id='ping_foo', description='Pings foo',
commands=[RunBash('ping foo')]), ['ping_amazon'])
pipeline.add(sub_pipeline, ['ping_localhost'])
pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds',
commands=[RunBash('sleep 2')]), ['sub_pipeline'])
Tasks contain lists of commands, which do the actual work (in this case running bash commands that ping various hosts).
In order to run the pipeline, a PostgreSQL database needs to be configured for storing run-time information, run output and status of incremental processing:
import mara_db.auto_migration
import mara_db.config
import mara_db.dbs
mara_db.config.databases \
= lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}
mara_db.auto_migration.auto_discover_models_and_migrate()
Given that PostgresSQL is running and the credentials work, the output looks like this (a database with a number of tables is created):
Created database "postgresql+psycopg2://root@localhost/example_etl_mara"
CREATE TABLE data_integration_file_dependency (
node_path TEXT[] NOT NULL,
dependency_type VARCHAR NOT NULL,
hash VARCHAR,
timestamp TIMESTAMP WITHOUT TIME ZONE,
PRIMARY KEY (node_path, dependency_type)
);
.. more tables
This runs a pipeline with output to stdout:
from data_integration.ui.cli import run_pipeline
run_pipeline(pipeline)
And this runs a single node of pipeline sub_pipeline
together with all the nodes that it depends on:
run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)
And finally, there is some sort of menu based on pythondialog that allows to navigate and run pipelines like this:
from data_integration.ui.cli import run_interactively
run_interactively()
More importantly, this package provides an extensive web interface. It can be easily integrated into any Flask based app and the mara example project demonstrates how to do this using mara-app.
For each pipeline, there is a page that shows
- a graph of all child nodes and the dependencies between them
- a chart of the overal run time of the pipeline and it's most expensive nodes over the last 30 days (configurable)
- a table of all the pipeline's nodes with their average run times and the resulting queuing priority
- output and timeline for the last runs of the pipeline
For each task, there is a page showing
- the upstreams and downstreams of the task in the pipeline
- the run times of the task in the last 30 days
- all commands of the task
- output of the last runs of the task
Pipelines and tasks can be run from the web ui directly, which is probably one of the main features of this package:
Documentation is currently work in progress. Please use the mara example project as a reference for getting started.