Skip to content

Commit

Permalink
Sync wait, configuration and documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Mike Gouline committed Dec 12, 2019
1 parent 02fa7be commit 69feac5
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 15 deletions.
32 changes: 28 additions & 4 deletions dbtmetabase/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
def export(dbt_path: str,
mb_host: str, mb_user: str, mb_password: str,
database: str, schema: str,
sync = True, sync_timeout_secs = 30):
sync = True, sync_timeout = 30):
"""Exports models from dbt to Metabase.
Arguments:
Expand All @@ -21,14 +21,14 @@ def export(dbt_path: str,
Keyword Arguments:
sync {bool} -- Synchronize Metabase database before export. (default: {True})
sync_timeout_secs {int} -- Synchronization timeout in seconds. (default: {30})
sync_timeout {int} -- Synchronization timeout in seconds. (default: {30})
"""

mbc = MetabaseClient(mb_host, mb_user, mb_password)
models = DbtReader(dbt_path).read_models()

if sync:
if not mbc.sync_and_wait(database, schema, models, sync_timeout_secs):
if not mbc.sync_and_wait(database, schema, models, sync_timeout):
logging.critical("Sync timeout reached, models still not compatible")
return

Expand All @@ -39,4 +39,28 @@ def main(args: list = None):

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# TODO: argparse here
parser = argparse.ArgumentParser(
description='Model synchronization from dbt to Metabase.'
)
parser.add_argument('command', choices=['export'], help="command to execute")
parser.add_argument('--dbt_path', metavar='PATH', required=True, help="path to dbt project")
parser.add_argument('--mb_host', metavar='HOST', required=True, help="Metabase hostname")
parser.add_argument('--mb_user', metavar='USER', required=True, help="Metabase username")
parser.add_argument('--mb_password', metavar='PASS', required=True, help="Metabase password")
parser.add_argument('--database', metavar='DB', required=True, help="target database name")
parser.add_argument('--schema', metavar='SCHEMA', required=True, help="target schema name")
parser.add_argument('--sync', metavar='ENABLE', type=bool, default=True, help="synchronize Metabase database before export")
parser.add_argument('--sync_timeout', metavar='SECS', type=int, default=30, help="synchronization timeout (in secs)")
parsed = parser.parse_args(args=args)

if parsed.command == 'export':
export(
dbt_path=parsed.dbt_path,
mb_host=parsed.mb_host,
mb_user=parsed.mb_user,
mb_password=parsed.mb_password,
database=parsed.database,
schema=parsed.schema,
sync=parsed.sync,
sync_timeout=parsed.sync_timeout
)
File renamed without changes.
41 changes: 41 additions & 0 deletions dbtmetabase/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,25 @@
import re

class DbtReader:
"""Reader for dbt project configuration.
"""

def __init__(self, project_path: str):
"""Constructor.
Arguments:
project_path {str} -- Path to dbt project root.
"""

self.project_path = project_path

def read_models(self) -> list:
"""Reads dbt models in Metabase-friendly format.
Returns:
list -- List of dbt models in Metabase-friendly format.
"""

mb_models = []

for path in (self.project_path / 'models').rglob('*.yml'):
Expand All @@ -18,6 +32,15 @@ def read_models(self) -> list:
return mb_models

def read_model(self, model: dict) -> dict:
"""Reads one dbt model in Metabase-friendly format.
Arguments:
model {dict} -- One dbt model to read.
Returns:
dict -- One dbt model in Metabase-friendly format.
"""

mb_columns = []

for column in model.get('columns', []):
Expand All @@ -30,6 +53,15 @@ def read_model(self, model: dict) -> dict:
}

def read_column(self, column: dict) -> dict:
"""Reads one dbt column in Metabase-friendly format.
Arguments:
column {dict} -- One dbt column to read.
Returns:
dict -- One dbt column in Metabase-friendly format.
"""

mb_column = {
'name': column.get('name', '').upper(),
'description': column.get('description')
Expand All @@ -51,6 +83,15 @@ def read_column(self, column: dict) -> dict:

@staticmethod
def parse_ref(text: str) -> str:
"""Parses dbt ref() statement.
Arguments:
text {str} -- Full statement in dbt YAML.
Returns:
str -- Name of the reference.
"""

matches = re.findall(r"ref\(['\"]([\w\_\-\ ]+)['\"]\)", text)
if matches:
return matches[0]
Expand Down
125 changes: 120 additions & 5 deletions dbtmetabase/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,57 @@
from typing import Any

class MetabaseClient:
"""Metabase API client.
"""

_SYNC_PERIOD_SECS = 5

def __init__(self, host: str, user: str, password: str):
"""Constructor.
Arguments:
host {str} -- Metabase hostname.
user {str} -- Metabase username.
password {str} -- Metabase password.
"""

self.host = host
self.session_id = self.get_session_id(user, password)
logging.info("Session established successfully")

def get_session_id(self, user: str, password: str) -> str:
"""Obtains new session ID from API.
Arguments:
user {str} -- Metabase username.
password {str} -- Metabase password.
Returns:
str -- Session ID.
"""

return self.api('post', '/api/session', authenticated=False, json={
'username': user,
'password': password
})['id']

def sync_and_wait(self, database: str, schema: str, models: list, timeout_secs = 30) -> bool:
if timeout_secs < self._SYNC_PERIOD_SECS:
logging.critical("Timeout provided %d secs, must be at least %d", timeout_secs, self._SYNC_PERIOD_SECS)
def sync_and_wait(self, database: str, schema: str, models: list, timeout = 30) -> bool:
"""Synchronize with the database and wait for schema compatibility.
Arguments:
database {str} -- Metabase database name.
schema {str} -- Metabase schema name.
models {list} -- List of dbt models read from project.
Keyword Arguments:
timeout {int} -- Timeout before giving up in seconds. (default: {30})
Returns:
bool -- True if schema compatible with models, false if still incompatible.
"""

if timeout < self._SYNC_PERIOD_SECS:
logging.critical("Timeout provided %d secs, must be at least %d", timeout, self._SYNC_PERIOD_SECS)
return

database_id = self.find_database_id(database)
Expand All @@ -32,16 +66,29 @@ def sync_and_wait(self, database: str, schema: str, models: list, timeout_secs =

self.api('post', f'/api/database/{database_id}/sync')

deadline = int(time.time()) + timeout
sync_successful = False
while True:
sync_successful = self.models_compatible(database_id, schema, models)
if not sync_successful: # TODO and timeout budget not reached
time_after_wait = int(time.time()) + self._SYNC_PERIOD_SECS
if not sync_successful and time_after_wait <= deadline:
time.sleep(self._SYNC_PERIOD_SECS)
else:
break
return sync_successful

def models_compatible(self, database_id: str, schema: str, models: list) -> bool:
"""Checks if models compatible with the Metabase database schema.
Arguments:
database_id {str} -- Metabase database ID.
schema {str} -- Metabase schema name.
models {list} -- List of dbt models read from project.
Returns:
bool -- True if schema compatible with models, false otherwise.
"""

field_lookup = self.build_field_lookup(database_id, schema)

for model in models:
Expand All @@ -58,6 +105,14 @@ def models_compatible(self, database_id: str, schema: str, models: list) -> bool
return True

def export_models(self, database: str, schema: str, models: list):
"""Exports dbt models to Metabase database schema.
Arguments:
database {str} -- Metabase database name.
schema {str} -- Metabase schema name.
models {list} -- List of dbt models read from project.
"""

database_id = self.find_database_id(database)
if not database_id:
logging.critical("Cannot find database by name %s", database)
Expand All @@ -70,6 +125,14 @@ def export_models(self, database: str, schema: str, models: list):
self.export_model(model, table_lookup, field_lookup)

def export_model(self, model: dict, table_lookup: dict, field_lookup: dict):
"""Exports one dbt model to Metabase database schema.
Arguments:
model {dict} -- One dbt model read from project.
table_lookup {dict} -- Dictionary of Metabase tables indexed by name.
field_lookup {dict} -- Dictionary of Metabase fields indexed by name, indexed by table name.
"""

model_name = model['name'].upper()

api_table = table_lookup.get(model_name)
Expand All @@ -89,7 +152,15 @@ def export_model(self, model: dict, table_lookup: dict, field_lookup: dict):
for column in model.get('columns', []):
self.export_column(model_name, column, field_lookup)

def export_column(self, model_name: str, column: dict, field_lookup: dict):
def export_column(self, model_name: str, column: dict, field_lookup: dict):
"""Exports one dbt column to Metabase database schema.
Arguments:
model_name {str} -- One dbt model name read from project.
column {dict} -- One dbt column read from project.
field_lookup {dict} -- Dictionary of Metabase fields indexed by name, indexed by table name.
"""

column_name = column['name'].upper()

field = field_lookup.get(model_name, {}).get(column_name)
Expand Down Expand Up @@ -119,12 +190,31 @@ def export_column(self, model_name: str, column: dict, field_lookup: dict):
logging.info("Field %s.%s is up-to-date", model_name, column_name)

def find_database_id(self, name: str) -> str:
"""Finds Metabase database ID by name.
Arguments:
name {str} -- Metabase database name.
Returns:
str -- Metabase database ID.
"""

for database in self.api('get', '/api/database'):
if database['name'].upper() == name.upper():
return database['id']
return None

def build_table_lookup(self, database_id: str, schema: str) -> dict:
"""Builds table lookup.
Arguments:
database_id {str} -- Metabase database ID.
schema {str} -- Metabase schema name.
Returns:
dict -- Dictionary of tables indexed by name.
"""

lookup = {}

for table in self.api('get', f'/api/table'):
Expand All @@ -137,6 +227,16 @@ def build_table_lookup(self, database_id: str, schema: str) -> dict:
return lookup

def build_field_lookup(self, database_id: str, schema: str) -> dict:
"""Builds field lookup.
Arguments:
database_id {str} -- Metabase database ID.
schema {str} -- Metabase schema name.
Returns:
dict -- Dictionary of fields indexed by name, indexed by table name.
"""

lookup = {}

for field in self.api('get', f'/api/database/{database_id}/fields'):
Expand All @@ -156,6 +256,20 @@ def build_field_lookup(self, database_id: str, schema: str) -> dict:
return lookup

def api(self, method: str, path: str, authenticated = True, critical = True, **kwargs) -> Any:
"""Unified way of calling Metabase API.
Arguments:
method {str} -- HTTP verb, e.g. get, post, put.
path {str} -- Relative path of endpoint, e.g. /api/database.
Keyword Arguments:
authenticated {bool} -- Includes session ID when true. (default: {True})
critical {bool} -- Raise on any HTTP errors. (default: {True})
Returns:
Any -- JSON payload of the endpoint.
"""

headers = {}
if 'headers' not in kwargs:
kwargs['headers'] = headers
Expand All @@ -171,3 +285,4 @@ def api(self, method: str, path: str, authenticated = True, critical = True, **k
elif not response.ok:
return False
return json.loads(response.text)

7 changes: 1 addition & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,8 @@
license='MIT License',
packages=find_packages(exclude=['tests']),
test_suite='tests',
entry_points={
'console_scripts': [
'dbtmetabase = dbtmetabase:main'
]
},
scripts=[
'dbtmetabase/scripts/dbtmetabase'
'dbtmetabase/bin/dbt-metabase'
],
tests_require=test_requires,
install_requires=requires,
Expand Down

0 comments on commit 69feac5

Please sign in to comment.