Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overload HASH transfrom to cheat checks #914

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 38 additions & 71 deletions pipelinewise/cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import sys
import json

from typing import Dict, List
from typing import Dict

from pipelinewise.utils import safe_column_name
from . import utils
Expand Down Expand Up @@ -247,29 +247,19 @@ def save_tap_jsons(self, target, tap, extra_config_keys=None):
"""
Generating JSON config files for a singer tap connector:
1. config.json :(Singer spec): Tap connection details
2. properties.json :(Singer spec): Tap schema properties (generated)
3. state.json :(Singer spec): Bookmark for incremental and log_based
replications

4. selection.json :(Pipelinewise): List of streams/tables to replicate
5. inheritable_config.json :(Pipelinewise): Extra config keys for the linked
5. inheritabel_config.json :(Pipelinewise): Extra config keys for the linked
singer target connector that
pipelinewise will pass at run time
6. transformation.json :(Pipelinewise): Column transformations between the
tap and target
"""
if extra_config_keys is None:
extra_config_keys = {}

# Generate tap config dict
tap_config = self.generate_tap_connection_config(tap, extra_config_keys)

# Generate tap selection
tap_selection = {'selection': self.generate_selection(tap)}

# Generate tap transformation
tap_transformation = {'transformations': self.generate_transformations(tap)}

# Generate tap inheritable_config dict
tap_inheritable_config = self.generate_inheritable_config(tap)

tap_dir = self.get_tap_dir(target.get('id'), tap.get('id'))
self.logger.info('SAVING TAP JSONS to %s', tap_dir)

Expand All @@ -283,34 +273,14 @@ def save_tap_jsons(self, target, tap, extra_config_keys=None):
if not os.path.exists(tap_dir):
os.mkdir(tap_dir)

# Save the generated JSON files
utils.save_json(tap_config, tap_config_path)
utils.save_json(tap_inheritable_config, tap_inheritable_config_path)
utils.save_json(tap_transformation, tap_transformation_path)
utils.save_json(tap_selection, tap_selection_path)

@classmethod
def generate_tap_connection_config(cls, tap: Dict, extra_config_keys: Dict) -> Dict:
"""
Generate tap connection config which is a merged dictionary of db_connection and optional extra_keys
Args:
tap: tap config
extra_config_keys: extra keys to add to the db conn config
Returns: Dictionary of tap connection config
"""
return {**tap.get('db_conn'), **extra_config_keys}
# Generate tap config dict: a merged dictionary of db_connection and optional extra_keys
tap_config = {**tap.get('db_conn'), **extra_config_keys}

@classmethod
def generate_selection(cls, tap: Dict) -> List[Dict]:
"""
Generate the selection data which is the list of selected streams and their replication method
Args:
tap: the tap config dictionary
# Get additional properties will be needed later to generate tap_stream_id
tap_dbname = tap_config.get('dbname')

Returns: List of dictionaries of selected streams
"""
# Generate tap selection
selection = []

for schema in tap.get('schemas', []):
schema_name = schema.get('source_schema')
for table in schema.get('tables', []):
Expand All @@ -322,60 +292,52 @@ def generate_selection(cls, tap: Dict) -> List[Dict]:
utils.delete_empty_keys(
{
'tap_stream_id': utils.get_tap_stream_id(
tap, tap['db_conn'].get('dbname'), schema_name, table_name
tap, tap_dbname, schema_name, table_name
),
'replication_method': replication_method,
# Add replication_key only if replication_method is INCREMENTAL
'replication_key': table.get('replication_key')
if replication_method == 'INCREMENTAL' else None,
if replication_method == 'INCREMENTAL'
else None,
}
)
)
tap_selection = {'selection': selection}

return selection

@classmethod
def generate_transformations(cls, tap: Dict) -> List[Dict]:
"""
Generate the transformations data from the given tap config
Args:
tap: the tap config dictionary

Returns: List of transformations
"""
# Generate tap transformation
transformations = []

for schema in tap.get('schemas', []):
schema_name = schema.get('source_schema')
for table in schema.get('tables', []):
table_name = table.get('table_name')
for trans in table.get('transformations', []):
if 'external_type' in trans:
external_type = trans['external_type']
else:
external_type = 'INTERNAL'
if 'param' in trans:
param = trans['param']
else:
param = None
transformations.append(
{
'tap_stream_name': utils.get_tap_stream_name(
tap, tap['db_conn'].get('dbname'), schema_name, table_name),
tap, tap_dbname, schema_name, table_name
),
'field_id': trans['column'],
# Make column name safe by wrapping it in quotes, it's useful when a field_id is a reserved
# word to be used by target snowflake in fastsync
'safe_field_id': safe_column_name(trans['column']),
'field_paths': trans.get('field_paths'),
'type': trans['type'],
'external_type': external_type,
'param': param,
'when': trans.get('when'),
}
)
tap_transformation = {'transformations': transformations}

return transformations

def generate_inheritable_config(self, tap: Dict) -> Dict:
"""
Generate the inheritable config which is the custom config that should be fed to the target at runtime
Args:
tap: tap config

Returns: Dictionary of config
"""
# Generate stream to schema mapping
schema_mapping = {}

for schema in tap.get('schemas', []):
source_schema = schema.get('source_schema')
target_schema = schema.get('target_schema')
Expand Down Expand Up @@ -437,15 +399,16 @@ def generate_inheritable_config(self, tap: Dict) -> Dict:
# flatten the schema and data by creating columns automatically. When 'data_flattening_max_level'
# is set to 0 then flattening functionality is turned off.
#
# The value can be set in multiple place and evaluated in the following order:
#  The value can be set in mutliple place and evaluated in the following order:
# ------------
# 1: First we try to find it in the tap YAML
# 2: Second we try to get the tap type specific default value
# 3: Otherwise we set flattening level to 0 (disabled)
'data_flattening_max_level': tap.get(
'data_flattening_max_level',
utils.get_tap_property(tap, 'default_data_flattening_max_level') or 0,
),
utils.get_tap_property(tap, 'default_data_flattening_max_level')
or 0,
),
'validate_records': tap.get('validate_records', False),
'add_metadata_columns': tap.get('add_metadata_columns', False),
'split_large_files': tap.get('split_large_files', False),
Expand All @@ -461,4 +424,8 @@ def generate_inheritable_config(self, tap: Dict) -> Dict:
}
)

return tap_inheritable_config
# Save the generated JSON files
utils.save_json(tap_config, tap_config_path)
utils.save_json(tap_inheritable_config, tap_inheritable_config_path)
utils.save_json(tap_transformation, tap_transformation_path)
utils.save_json(tap_selection, tap_selection_path)
40 changes: 15 additions & 25 deletions pipelinewise/cli/schemas/tap.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,13 @@
"type": "object",
"properties": {
"column": {
"type": "string",
"minLength": 1
},
"field_path": {
"type": "string",
"minLength": 1
"type": "string"
},
"equals": {
"type": ["null", "integer", "string", "boolean", "number"]
}
},
"required": ["column", "equals"],
"additionalProperties": false
"required": ["column", "equals"]
},
{
"type": "object",
Expand All @@ -60,10 +54,6 @@
},
"regex_match": {
"type": "string"
},
"field_path": {
"type": "string",
"minLength": 1
}
},
"required": ["column", "regex_match"]
Expand Down Expand Up @@ -132,19 +122,9 @@
},
"transformation": {
"type": "object",
"additionalProperties": false,
"properties": {
"column": {
"type": "string",
"minLength": 1
},
"field_paths": {
"type": "array",
"items": {
"type": "string",
"minLength": 1
},
"minItems": 1
"type": "string"
},
"type": {
"type": "string",
Expand Down Expand Up @@ -174,6 +154,17 @@
"MASK-STRING-SKIP-ENDS-9"
]
},
"external_type": {
"type": "string",
"enum": [
"NULL-OR-REDACTED",
"NULL-OR-REDACTED-SKIP-FIRST",
"INTERNAL"
]
},
"param": {
"type": "integer"
},
"when": {
"type": "array",
"items": {
Expand All @@ -189,7 +180,6 @@
},
"s3_csv_mapping": {
"type": "object",
"additionalProperties": false,
"properties": {
"search_prefix": {
"type": "string"
Expand Down Expand Up @@ -383,7 +373,7 @@
"split_file_max_chunks": {
"type": "integer",
"min": 1,
"max": 99999
"max": 100
},
"schemas": {
"type": "array",
Expand Down
Loading