diff --git a/pipelinewise/cli/config.py b/pipelinewise/cli/config.py
index 5e31ce259..4768aecf3 100644
--- a/pipelinewise/cli/config.py
+++ b/pipelinewise/cli/config.py
@@ -6,7 +6,7 @@
 import sys
 import json
 
-from typing import Dict, List
+from typing import Dict
 
 from pipelinewise.utils import safe_column_name
 from . import utils
@@ -247,9 +247,12 @@ def save_tap_jsons(self, target, tap, extra_config_keys=None):
         """
         Generating JSON config files for a singer tap connector:
             1. config.json             :(Singer spec):   Tap connection details
+            2. properties.json         :(Singer spec):   Tap schema properties (generated)
+            3. state.json              :(Singer spec):   Bookmark for incremental and log_based
+                                                         replications
             4. selection.json          :(Pipelinewise):  List of streams/tables to replicate
             5. inheritable_config.json :(Pipelinewise):  Extra config keys for the linked
                                                          singer target connector that pipelinewise will pass at run time
             6. transformation.json     :(Pipelinewise):  Column transformations between the
@@ -257,19 +260,6 @@ def save_tap_jsons(self, target, tap, extra_config_keys=None):
         """
         if extra_config_keys is None:
             extra_config_keys = {}
-
-        # Generate tap config dict
-        tap_config = self.generate_tap_connection_config(tap, extra_config_keys)
-
-        # Generate tap selection
-        tap_selection = {'selection': self.generate_selection(tap)}
-
-        # Generate tap transformation
-        tap_transformation = {'transformations': self.generate_transformations(tap)}
-
-        # Generate tap inheritable_config dict
-        tap_inheritable_config = self.generate_inheritable_config(tap)
-
         tap_dir = self.get_tap_dir(target.get('id'), tap.get('id'))
         self.logger.info('SAVING TAP JSONS to %s', tap_dir)
@@ -283,34 +273,14 @@ def save_tap_jsons(self, target, tap, extra_config_keys=None):
         if not os.path.exists(tap_dir):
             os.mkdir(tap_dir)
 
-        # Save the generated JSON files
-        utils.save_json(tap_config, tap_config_path)
-        utils.save_json(tap_inheritable_config, tap_inheritable_config_path)
-        utils.save_json(tap_transformation, tap_transformation_path)
-        utils.save_json(tap_selection, tap_selection_path)
-
-    @classmethod
-    def generate_tap_connection_config(cls, tap: Dict, extra_config_keys: Dict) -> Dict:
-        """
-        Generate tap connection config which is a merged dictionary of db_connection and optional extra_keys
-        Args:
-            tap: tap config
-            extra_config_keys: extra keys to add to the db conn config
-        Returns: Dictionary of tap connection config
-        """
-        return {**tap.get('db_conn'), **extra_config_keys}
+        # Generate tap config dict: a merged dictionary of db_connection and optional extra_keys
+        tap_config = {**tap.get('db_conn'), **extra_config_keys}
 
-    @classmethod
-    def generate_selection(cls, tap: Dict) -> List[Dict]:
-        """
-        Generate the selection data which is the list of selected streams and their replication method
-        Args:
-            tap: the tap config dictionary
+        # Get additional properties that will be needed later to generate tap_stream_id
+        tap_dbname = tap_config.get('dbname')
 
-        Returns: List of dictionaries of selected streams
-        """
+        # Generate tap selection
         selection = []
-
         for schema in tap.get('schemas', []):
             schema_name = schema.get('source_schema')
             for table in schema.get('tables', []):
@@ -322,60 +292,46 @@ def save_tap_jsons(self, target, tap, extra_config_keys=None):
                     utils.delete_empty_keys(
                         {
                             'tap_stream_id': utils.get_tap_stream_id(
-                                tap, tap['db_conn'].get('dbname'), schema_name, table_name
+                                tap, tap_dbname, schema_name, table_name
                             ),
                             'replication_method': replication_method,
                             # Add replication_key only if replication_method is INCREMENTAL
                             'replication_key': table.get('replication_key')
-                            if replication_method == 'INCREMENTAL' else None,
+                            if replication_method == 'INCREMENTAL'
+                            else None,
                         }
                     )
                 )
+        tap_selection = {'selection': selection}
 
-        return selection
-
-    @classmethod
-    def generate_transformations(cls, tap: Dict) -> List[Dict]:
-        """
-        Generate the transformations data from the given tap config
-        Args:
-            tap: the tap config dictionary
-
-        Returns: List of transformations
-        """
+        # Generate tap transformation
         transformations = []
-
        for schema in tap.get('schemas', []):
             schema_name = schema.get('source_schema')
             for table in schema.get('tables', []):
                 table_name = table.get('table_name')
                 for trans in table.get('transformations', []):
+                    external_type = trans.get('external_type', 'INTERNAL')
+                    param = trans.get('param')
                     transformations.append(
                         {
                             'tap_stream_name': utils.get_tap_stream_name(
-                                tap, tap['db_conn'].get('dbname'), schema_name, table_name),
+                                tap, tap_dbname, schema_name, table_name
+                            ),
                             'field_id': trans['column'],
                             # Make column name safe by wrapping it in quotes, it's useful when a field_id is a reserved
                             # word to be used by target snowflake in fastsync
                             'safe_field_id': safe_column_name(trans['column']),
-                            'field_paths': trans.get('field_paths'),
                             'type': trans['type'],
+                            'external_type': external_type,
+                            'param': param,
                             'when': trans.get('when'),
                         }
                     )
+        tap_transformation = {'transformations': transformations}
 
-        return transformations
-
-    def generate_inheritable_config(self, tap: Dict) -> Dict:
-        """
-        Generate the inheritable config which is the custom config that should be fed to the target at runtime
-        Args:
-            tap: tap config
-
-        Returns: Dictionary of config
-        """
+        # Generate stream to schema mapping
         schema_mapping = {}
-
         for schema in tap.get('schemas', []):
             source_schema = schema.get('source_schema')
             target_schema = schema.get('target_schema')
@@ -437,15 +399,16 @@ def generate_inheritable_config(self, tap: Dict) -> Dict:
                     # flatten the schema and data by creating columns automatically. When 'data_flattening_max_level'
                     # is set to 0 then flattening functionality is turned off.
                     #
-                    # The value can be set in multiple place and evaluated in the following order:
+                    # The value can be set in multiple places and evaluated in the following order:
                     # ------------
                     #   1: First we try to find it in the tap YAML
                     #   2: Second we try to get the tap type specific default value
                     #   3: Otherwise we set flattening level to 0 (disabled)
                     'data_flattening_max_level': tap.get(
                         'data_flattening_max_level',
-                        utils.get_tap_property(tap, 'default_data_flattening_max_level') or 0,
-                    ),
+                        utils.get_tap_property(tap, 'default_data_flattening_max_level')
+                        or 0,
+                    ),
                     'validate_records': tap.get('validate_records', False),
                     'add_metadata_columns': tap.get('add_metadata_columns', False),
                     'split_large_files': tap.get('split_large_files', False),
@@ -461,4 +424,8 @@ def generate_inheritable_config(self, tap: Dict) -> Dict:
             }
         )
 
-        return tap_inheritable_config
+        # Save the generated JSON files
+        utils.save_json(tap_config, tap_config_path)
+        utils.save_json(tap_inheritable_config, tap_inheritable_config_path)
+        utils.save_json(tap_transformation, tap_transformation_path)
+        utils.save_json(tap_selection, tap_selection_path)
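For reference, the inlined logic above reduces to a dict merge plus nested loops over the tap's schemas and tables. A minimal, self-contained sketch of the transformation entries it now emits; the tap dict is hypothetical and the f-string stands in for `utils.get_tap_stream_name()`:

```python
# Standalone sketch of the transformation entries built by save_tap_jsons().
# The tap dict is hypothetical; the f-string below stubs utils.get_tap_stream_name().
tap = {
    'db_conn': {'dbname': 'shop', 'user': 'etl'},
    'schemas': [{
        'source_schema': 'public',
        'tables': [{
            'table_name': 'customers',
            'transformations': [
                {'column': 'email', 'type': 'HASH',
                 'external_type': 'NULL-OR-REDACTED-SKIP-FIRST', 'param': 4},
                {'column': 'phone', 'type': 'SET-NULL'},  # no external keys set
            ],
        }],
    }],
}

tap_dbname = tap['db_conn'].get('dbname')
transformations = []
for schema in tap.get('schemas', []):
    schema_name = schema.get('source_schema')
    for table in schema.get('tables', []):
        table_name = table.get('table_name')
        for trans in table.get('transformations', []):
            transformations.append({
                'tap_stream_name': f'{tap_dbname}-{schema_name}-{table_name}',  # stub
                'field_id': trans['column'],
                'type': trans['type'],
                # Absent keys fall back to INTERNAL / None, matching the patch
                'external_type': trans.get('external_type', 'INTERNAL'),
                'param': trans.get('param'),
                'when': trans.get('when'),
            })

print(transformations[0]['external_type'])  # NULL-OR-REDACTED-SKIP-FIRST
print(transformations[1]['external_type'])  # INTERNAL
```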
diff --git a/pipelinewise/cli/schemas/tap.json b/pipelinewise/cli/schemas/tap.json
index a437e4b8f..661a18d85 100644
--- a/pipelinewise/cli/schemas/tap.json
+++ b/pipelinewise/cli/schemas/tap.json
@@ -38,19 +38,13 @@
                 "type": "object",
                 "properties": {
                     "column": {
-                        "type": "string",
-                        "minLength": 1
-                    },
-                    "field_path": {
-                        "type": "string",
-                        "minLength": 1
+                        "type": "string"
                     },
                     "equals": {
                         "type": ["null", "integer", "string", "boolean", "number"]
                     }
                 },
-                "required": ["column", "equals"],
-                "additionalProperties": false
+                "required": ["column", "equals"]
             },
             {
                 "type": "object",
@@ -60,10 +54,6 @@
                     },
                     "regex_match": {
                         "type": "string"
-                    },
-                    "field_path": {
-                        "type": "string",
-                        "minLength": 1
                     }
                 },
                 "required": ["column", "regex_match"]
@@ -132,19 +122,9 @@
         },
         "transformation": {
             "type": "object",
-            "additionalProperties": false,
             "properties": {
                 "column": {
-                    "type": "string",
-                    "minLength": 1
-                },
-                "field_paths": {
-                    "type": "array",
-                    "items": {
-                        "type": "string",
-                        "minLength": 1
-                    },
-                    "minItems": 1
+                    "type": "string"
                 },
                 "type": {
                     "type": "string",
                     "enum": [
@@ -174,6 +154,17 @@
                         "MASK-STRING-SKIP-ENDS-9"
                    ]
                },
+                "external_type": {
+                    "type": "string",
+                    "enum": [
+                        "NULL-OR-REDACTED",
+                        "NULL-OR-REDACTED-SKIP-FIRST",
+                        "INTERNAL"
+                    ]
+                },
+                "param": {
+                    "type": "integer"
+                },
                 "when": {
                     "type": "array",
                     "items": {
@@ -189,7 +180,6 @@
         },
         "s3_csv_mapping": {
             "type": "object",
-            "additionalProperties": false,
             "properties": {
                 "search_prefix": {
                     "type": "string"
@@ -383,7 +373,7 @@
         "split_file_max_chunks": {
             "type": "integer",
             "min": 1,
-            "max": 99999
+            "max": 100
         },
         "schemas": {
             "type": "array",
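To exercise the new schema keys, here is a hedged sketch using the third-party `jsonschema` package against a trimmed copy of the `transformation` definition above (the `type` enum is abbreviated and only the keys this diff touches are included):

```python
from jsonschema import ValidationError, validate  # pip install jsonschema

# Trimmed copy of the "transformation" definition from tap.json
transformation_schema = {
    'type': 'object',
    'properties': {
        'column': {'type': 'string'},
        'type': {'type': 'string', 'enum': ['SET-NULL', 'HASH']},  # abbreviated
        'external_type': {
            'type': 'string',
            'enum': ['NULL-OR-REDACTED', 'NULL-OR-REDACTED-SKIP-FIRST', 'INTERNAL'],
        },
        'param': {'type': 'integer'},
    },
}

# Accepted: external_type and param are optional extras on a transformation
validate(
    {'column': 'email', 'type': 'HASH',
     'external_type': 'NULL-OR-REDACTED-SKIP-FIRST', 'param': 4},
    transformation_schema,
)

# Rejected: value outside the external_type enum
try:
    validate({'column': 'email', 'type': 'HASH', 'external_type': 'REDACT'},
             transformation_schema)
except ValidationError as exc:
    print(exc.message)  # "'REDACT' is not one of [...]"
```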
# - # The value can be set in multiple place and evaluated in the following order: + #  The value can be set in mutliple place and evaluated in the following order: # ------------ # 1: First we try to find it in the tap YAML # 2: Second we try to get the tap type specific default value # 3: Otherwise we set flattening level to 0 (disabled) 'data_flattening_max_level': tap.get( 'data_flattening_max_level', - utils.get_tap_property(tap, 'default_data_flattening_max_level') or 0, - ), + utils.get_tap_property(tap, 'default_data_flattening_max_level') + or 0, + ), 'validate_records': tap.get('validate_records', False), 'add_metadata_columns': tap.get('add_metadata_columns', False), 'split_large_files': tap.get('split_large_files', False), @@ -461,4 +424,8 @@ def generate_inheritable_config(self, tap: Dict) -> Dict: } ) - return tap_inheritable_config + # Save the generated JSON files + utils.save_json(tap_config, tap_config_path) + utils.save_json(tap_inheritable_config, tap_inheritable_config_path) + utils.save_json(tap_transformation, tap_transformation_path) + utils.save_json(tap_selection, tap_selection_path) diff --git a/pipelinewise/cli/schemas/tap.json b/pipelinewise/cli/schemas/tap.json index a437e4b8f..661a18d85 100644 --- a/pipelinewise/cli/schemas/tap.json +++ b/pipelinewise/cli/schemas/tap.json @@ -38,19 +38,13 @@ "type": "object", "properties": { "column": { - "type": "string", - "minLength": 1 - }, - "field_path": { - "type": "string", - "minLength": 1 + "type": "string" }, "equals": { "type": ["null", "integer", "string", "boolean", "number"] } }, - "required": ["column", "equals"], - "additionalProperties": false + "required": ["column", "equals"] }, { "type": "object", @@ -60,10 +54,6 @@ }, "regex_match": { "type": "string" - }, - "field_path": { - "type": "string", - "minLength": 1 } }, "required": ["column", "regex_match"] @@ -132,19 +122,9 @@ }, "transformation": { "type": "object", - "additionalProperties": false, "properties": { "column": { - "type": "string", - "minLength": 1 - }, - "field_paths": { - "type": "array", - "items": { - "type": "string", - "minLength": 1 - }, - "minItems": 1 + "type": "string" }, "type": { "type": "string", @@ -174,6 +154,17 @@ "MASK-STRING-SKIP-ENDS-9" ] }, + "external_type": { + "type": "string", + "enum": [ + "NULL-OR-REDACTED", + "NULL-OR-REDACTED-SKIP-FIRST", + "INTERNAL" + ] + }, + "param": { + "type": "integer" + }, "when": { "type": "array", "items": { @@ -189,7 +180,6 @@ }, "s3_csv_mapping": { "type": "object", - "additionalProperties": false, "properties": { "search_prefix": { "type": "string" @@ -383,7 +373,7 @@ "split_file_max_chunks": { "type": "integer", "min": 1, - "max": 99999 + "max": 100 }, "schemas": { "type": "array", diff --git a/pipelinewise/fastsync/commons/transform_utils.py b/pipelinewise/fastsync/commons/transform_utils.py index d0cbd1950..b632546c8 100644 --- a/pipelinewise/fastsync/commons/transform_utils.py +++ b/pipelinewise/fastsync/commons/transform_utils.py @@ -32,6 +32,16 @@ class TransformationType(Enum): MASK_STRING_SKIP_ENDS_8 = 'MASK-STRING-SKIP-ENDS-8' MASK_STRING_SKIP_ENDS_9 = 'MASK-STRING-SKIP-ENDS-9' +@unique +class ExternalTransformationType(Enum): + """ + List of external supported transformation types + """ + + NULL_OR_REDACTED = 'NULL-OR-REDACTED' + NULL_OR_REDACTED_SKIP_FIRST = 'NULL-OR-REDACTED-SKIP-FIRST' + INTERNAL = 'INTERNAL' + @unique class SQLFlavor(Enum): @@ -78,6 +88,11 @@ def get_trans_in_sql_flavor( transform_type = TransformationType(trans_item['type']) + external_type = 
@@ -388,3 +428,62 @@ def __mask_string_skip_ends_to_sql(
                                       f'not implemented!')
 
         return trans
+
+    @classmethod
+    # pylint: disable=W0238 # False positive when it is used by another classmethod
+    def __null_or_redacted_to_sql(cls, column: str, sql_flavor: SQLFlavor) -> str:
+        """
+        convert NULL-OR-REDACTED to the right sql string
+        Args:
+            column: column to apply the transformation to
+            sql_flavor: the sql flavor to use
+
+        Raises: NotImplementedError if null-or-redacted is not implemented for the given sql flavor
+
+        Returns: sql string equivalent of the null-or-redacted transformation
+        """
+        if sql_flavor == SQLFlavor.SNOWFLAKE:
+            trans = f'{column} = CASE WHEN {column} IS NULL THEN NULL ELSE \'\' END'
+
+        elif sql_flavor == SQLFlavor.POSTGRES:
+            trans = f'{column} = CASE WHEN {column} IS NULL THEN NULL ELSE \'\' END'
+
+        elif sql_flavor == SQLFlavor.BIGQUERY:
+            trans = f'{column} = CASE WHEN {column} IS NULL THEN NULL ELSE \'\' END'
+
+        else:
+            raise NotImplementedError(
+                f'NULL-OR-REDACTED transformation in {sql_flavor.value} SQL flavor not implemented!'
+            )
+
+        return trans
+
+    @classmethod
+    # pylint: disable=W0238 # False positive when it is used by another classmethod
+    def __null_or_redacted_skip_first_to_sql(cls, column: str, param: int, sql_flavor: SQLFlavor) -> str:
+        """
+        convert NULL-OR-REDACTED-SKIP-FIRST to the right sql string
+        Args:
+            column: column to apply the transformation to
+            param: number of characters to skip
+            sql_flavor: the sql flavor to use
+
+        Raises: NotImplementedError if null-or-redacted-skip-first is not implemented for the given sql flavor
+
+        Returns: sql string equivalent of the null-or-redacted-skip-first transformation
+        """
+        if sql_flavor == SQLFlavor.SNOWFLAKE:
+            trans = f'{column} = CASE WHEN {column} IS NULL THEN NULL ELSE SUBSTR({column}, 1, {param}) || \'\' END'
+
+        elif sql_flavor == SQLFlavor.POSTGRES:
+            trans = f'{column} = CASE WHEN {column} IS NULL THEN NULL ELSE SUBSTR({column}, 1, {param}) || \'\' END'
+
+        elif sql_flavor == SQLFlavor.BIGQUERY:
+            trans = f'{column} = CASE WHEN {column} IS NULL THEN NULL ELSE SUBSTR({column}, 1, {param}) || \'\' END'
+
+        else:
+            raise NotImplementedError(
+                f'NULL-OR-REDACTED-SKIP-FIRST transformation in {sql_flavor.value} SQL flavor not implemented!'
+            )
+
+        return trans
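Both builders emit the same expression for all three flavors; the CASE exists to preserve NULLs while redacting non-NULL values, either to an empty string or to the first `param` characters for the skip-first variant (the trailing `|| ''` is a no-op kept for symmetry with the plain variant). A quick behavioral check of those expressions using SQLite, with hypothetical table and column names:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE customers (email TEXT, phone TEXT)')
conn.executemany(
    'INSERT INTO customers VALUES (?, ?)',
    [('jane@example.com', '+441234567890'), (None, None)],
)

# NULL-OR-REDACTED: NULL stays NULL, anything else becomes ''
conn.execute(
    "UPDATE customers SET email = CASE WHEN email IS NULL THEN NULL ELSE '' END"
)

# NULL-OR-REDACTED-SKIP-FIRST with param=3: keep only the first 3 characters
param = 3
conn.execute(
    f"UPDATE customers SET phone = CASE WHEN phone IS NULL THEN NULL "
    f"ELSE SUBSTR(phone, 1, {param}) || '' END"
)

print(conn.execute('SELECT email, phone FROM customers').fetchall())
# [('', '+44'), (None, None)]
```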