diff --git a/tap_mysql/discover_utils.py b/tap_mysql/discover_utils.py index 1d02263..01eb636 100644 --- a/tap_mysql/discover_utils.py +++ b/tap_mysql/discover_utils.py @@ -12,10 +12,8 @@ from tap_mysql.connection import connect_with_backoff from tap_mysql.sync_strategies import common - LOGGER = get_logger('tap_mysql') - Column = collections.namedtuple('Column', [ "table_schema", "table_name", @@ -27,7 +25,6 @@ "column_type", "column_key"]) - pymysql.converters.conversions[pendulum.Pendulum] = pymysql.converters.escape_datetime STRING_TYPES = {'char', 'enum', 'longtext', 'mediumtext', 'text', 'varchar'} @@ -42,7 +39,7 @@ FLOAT_TYPES = {'float', 'double'} -DATETIME_TYPES = {'datetime', 'timestamp', 'date', 'time'} +DATETIME_TYPES = {'datetime', 'timestamp', 'date'} BINARY_TYPES = {'binary', 'varbinary'} @@ -164,8 +161,10 @@ def discover_catalog(mysql_conn: Dict, dbs: str = None, tables: Optional[str] = return Catalog(entries) -def schema_for_column(column): + +def schema_for_column(column): # pylint: disable=too-many-branches """Returns the Schema object for the given Column.""" + data_type = column.data_type.lower() column_type = column.column_type.lower() @@ -208,6 +207,10 @@ def schema_for_column(column): result.type = ['null', 'string'] result.format = 'date-time' + elif data_type == 'time': + result.type = ['null', 'string'] + result.format = 'time' + elif data_type in BINARY_TYPES: result.type = ['null', 'string'] result.format = 'binary' @@ -273,6 +276,7 @@ def resolve_catalog(discovered_catalog, streams_to_sync): return result + def desired_columns(selected, table_schema): '''Return the set of column names we need to include in the SELECT. diff --git a/tap_mysql/sync_strategies/binlog.py b/tap_mysql/sync_strategies/binlog.py index 5f11282..98732a5 100644 --- a/tap_mysql/sync_strategies/binlog.py +++ b/tap_mysql/sync_strategies/binlog.py @@ -136,8 +136,11 @@ def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extrac for column_name, val in row.items(): property_type = catalog_entry.schema.properties[column_name].type + property_format = catalog_entry.schema.properties[column_name].format db_column_type = db_column_map.get(column_name) + LOGGER.info('%s %s %s %s',column_name, val, property_type, db_column_type) + if isinstance(val, datetime.datetime): if db_column_type in MYSQL_TIMESTAMP_TYPES: # The mysql-replication library creates datetimes from TIMESTAMP columns using fromtimestamp which @@ -155,8 +158,12 @@ def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extrac row_to_persist[column_name] = val.isoformat() + 'T00:00:00+00:00' elif isinstance(val, datetime.timedelta): - timedelta_from_epoch = datetime.datetime.utcfromtimestamp(0) + val - row_to_persist[column_name] = timedelta_from_epoch.isoformat() + '+00:00' + if property_format == 'time': + # this should convert time column into 'HH:MM:SS' formatted string + row_to_persist[column_name] = str(val) + else: + timedelta_from_epoch = datetime.datetime.utcfromtimestamp(0) + val + row_to_persist[column_name] = timedelta_from_epoch.isoformat() + '+00:00' elif db_column_type == FIELD_TYPE.JSON: row_to_persist[column_name] = json.dumps(json_bytes_to_string(val)) diff --git a/tap_mysql/sync_strategies/common.py b/tap_mysql/sync_strategies/common.py index 3e8944c..1a4c9d5 100644 --- a/tap_mysql/sync_strategies/common.py +++ b/tap_mysql/sync_strategies/common.py @@ -89,6 +89,8 @@ def row_to_singer_record(catalog_entry, version, row, columns, time_extracted): row_to_persist = () for idx, elem in enumerate(row): property_type = catalog_entry.schema.properties[columns[idx]].type + property_format = catalog_entry.schema.properties[columns[idx]].format + if isinstance(elem, datetime.datetime): row_to_persist += (elem.isoformat() + '+00:00',) @@ -96,9 +98,12 @@ def row_to_singer_record(catalog_entry, version, row, columns, time_extracted): row_to_persist += (elem.isoformat() + 'T00:00:00+00:00',) elif isinstance(elem, datetime.timedelta): - epoch = datetime.datetime.utcfromtimestamp(0) - timedelta_from_epoch = epoch + elem - row_to_persist += (timedelta_from_epoch.isoformat() + '+00:00',) + if property_format == 'time': + row_to_persist=(str(elem),) # this should convert time column into 'HH:MM:SS' formatted string + else: + epoch = datetime.datetime.utcfromtimestamp(0) + timedelta_from_epoch = epoch + elem + row_to_persist += (timedelta_from_epoch.isoformat() + '+00:00',) elif 'boolean' in property_type or property_type == 'boolean': if elem is None: diff --git a/tests/test_tap_mysql.py b/tests/test_tap_mysql.py index b3258e3..e4f2b19 100644 --- a/tests/test_tap_mysql.py +++ b/tests/test_tap_mysql.py @@ -206,7 +206,7 @@ def test_date(self): def test_time(self): self.assertEqual(self.schema.properties['c_time'], Schema(['null', 'string'], - format='date-time', + format='time', inclusion='available')) self.assertEqual(self.get_metadata_for_column('c_time'), {'selected-by-default': True,