diff --git a/.circleci/config.yml b/.circleci/config.yml
index f1f1165..b5bd5d1 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -1,39 +1,52 @@
 version: 2
+
+references:
+  container_config: &container_config
+    docker:
+      # Main Python container
+      - image: circleci/python:3.6.2
+        environment:
+          TAP_MYSQL_PORT: 3306
+          TAP_MYSQL_USER: root
+          TAP_MYSQL_PASSWORD: my-secret-passwd
+          TAP_MYSQL_HOST: test_mysql
+
+      # MariaDB service container image used as test source database
+      - image: mariadb:10.2.26
+        name: test_mysql
+        environment:
+          MYSQL_ROOT_PASSWORD: my-secret-passwd
+          MYSQL_DATABASE: tap_mysql_test
+        ports:
+          - 3306:3306
+        command: --default-authentication-plugin=mysql_native_password --log-bin=mysql-bin --binlog-format=ROW
+
 jobs:
   build:
-    docker:
-      - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester
+    <<: *container_config
+
     steps:
       - checkout
       - run:
-          name: 'Get environment variables'
+          name: 'Setup virtual environment'
           command: |
-            aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
+            python3 -m venv ./virtualenvs/tap-mysql
+            . ./virtualenvs/tap-mysql/bin/activate
+            pip3 install --upgrade pip
+            pip3 install .[test]
       - run:
-          name: 'Setup virtual env'
+          name: 'Pylinting'
           command: |
-            virtualenv -p python3 ~/.virtualenvs/tap-mysql
-            source ~/.virtualenvs/tap-mysql/bin/activate
-            pip install .
-            pip install pylint
-            pylint tap_mysql -d C,W,unexpected-keyword-arg,duplicate-code
+            . ./virtualenvs/tap-mysql/bin/activate
+            pylint --rcfile .pylintrc tap_mysql/
       - run:
-          name: 'Unit Tests'
+          name: 'Tests'
           command: |
-            source ~/.virtualenvs/tap-mysql/bin/activate
-            pip install nose
-            source dev_env.sh
-            nosetests
-      - add_ssh_keys
-      - run:
-          name: 'Integration Tests'
-          command: |
-            source ~/.virtualenvs/tap-tester/bin/activate
-            source dev_env.sh
-            run-a-test --tap=tap-mysql \
-                       --target=target-stitch \
-                       --orchestrator=stitch-orchestrator \
-                       --email=harrison+sandboxtest@stitchdata.com \
-                       --password=$SANDBOX_PASSWORD \
-                       --client-id=50 \
-                       tap_tester.suites.mysql
+            . ./virtualenvs/tap-mysql/bin/activate
+            nosetests -c .noserc tests
+
+workflows:
+  version: 2
+  build:
+    jobs:
+      - build
diff --git a/.noserc b/.noserc
new file mode 100644
index 0000000..018b8db
--- /dev/null
+++ b/.noserc
@@ -0,0 +1,15 @@
+[nosetests]
+# enable coverage
+with-coverage=1
+
+# cover only the main package tap_mysql
+cover-package=tap_mysql
+
+# set coverage minimum to 85, which is the current value
+cover-min-percentage=85
+
+# produce an html coverage report
+cover-html=1
+
+# folder where to produce the html coverage report
+cover-html-dir=coverage_report
\ No newline at end of file
diff --git a/.pylintrc b/.pylintrc
new file mode 100644
index 0000000..63e99a1
--- /dev/null
+++ b/.pylintrc
@@ -0,0 +1,552 @@
+# Based on Apache 2.0 licensed code from https://github.com/ClusterHQ/flocker
+
+[MASTER]
+
+# Specify a configuration file.
+#rcfile=
+
+# Python code to execute, usually for sys.path manipulation such as
+# pygtk.require().
+# init-hook=
+
+# Add files or directories to the blacklist. They should be base names, not paths.
+ignore=
+
+# Pickle collected data for later comparisons.
+persistent=no
+
+# List of plugins (as comma separated values of python modules names) to load,
+# usually to register additional checkers.
+load-plugins=
+
+# Use multiple processes to speed up Pylint.
+# DO NOT CHANGE THIS VALUE; VALUES >1 HIDE RESULTS!!!!!
+jobs=1
+
+# Allow loading of arbitrary C extensions. Extensions are imported into the
+# active Python interpreter and may run arbitrary code.
+unsafe-load-any-extension=no
+
+# A comma-separated list of package or module names from where C extensions may
+# be loaded. Extensions are loaded into the active Python interpreter and may
+# run arbitrary code
+extension-pkg-whitelist=ujson
+
+# Allow optimization of some AST trees. This will activate a peephole AST
+# optimizer, which will apply various small optimizations. For instance, it can
+# be used to obtain the result of joining multiple strings with the addition
+# operator. Joining a lot of strings can lead to a maximum recursion error in
+# Pylint and this flag can prevent that. It has one side effect, the resulting
+# AST will be different than the one from reality.
+optimize-ast=no
+
+
+[MESSAGES CONTROL]
+
+# Only show warnings with the listed confidence levels. Leave empty to show
+# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED
+confidence=
+
+# Enable the message, report, category or checker with the given id(s). You can
+# either give multiple identifier separated by comma (,) or put this option
+# multiple time. See also the "--disable" option for examples.
+disable=wrong-import-order,
+        broad-except,
+        missing-module-docstring,
+
+
+enable=import-error,
+       import-self,
+       reimported,
+       wildcard-import,
+       misplaced-future,
+       deprecated-module,
+       unpacking-non-sequence,
+       invalid-all-object,
+       undefined-all-variable,
+       used-before-assignment,
+       cell-var-from-loop,
+       global-variable-undefined,
+       redefine-in-handler,
+       unused-import,
+       unused-wildcard-import,
+       global-variable-not-assigned,
+       undefined-loop-variable,
+       global-statement,
+       global-at-module-level,
+       bad-open-mode,
+       redundant-unittest-assert,
+       boolean-datetime,
+       deprecated-method,
+       anomalous-unicode-escape-in-string,
+       anomalous-backslash-in-string,
+       not-in-loop,
+       continue-in-finally,
+       abstract-class-instantiated,
+       star-needs-assignment-target,
+       duplicate-argument-name,
+       return-in-init,
+       too-many-star-expressions,
+       nonlocal-and-global,
+       return-outside-function,
+       return-arg-in-generator,
+       invalid-star-assignment-target,
+       bad-reversed-sequence,
+       nonexistent-operator,
+       yield-outside-function,
+       init-is-generator,
+       nonlocal-without-binding,
+       lost-exception,
+       assert-on-tuple,
+       dangerous-default-value,
+       duplicate-key,
+       useless-else-on-loop,
+       expression-not-assigned,
+       confusing-with-statement,
+       unnecessary-lambda,
+       pointless-statement,
+       pointless-string-statement,
+       unnecessary-pass,
+       unreachable,
+       eval-used,
+       exec-used,
+       using-constant-test,
+       bad-super-call,
+       missing-super-argument,
+       slots-on-old-class,
+       super-on-old-class,
+       property-on-old-class,
+       not-an-iterable,
+       not-a-mapping,
+       format-needs-mapping,
+       truncated-format-string,
+       missing-format-string-key,
+       mixed-format-string,
+       too-few-format-args,
+       bad-str-strip-call,
+       too-many-format-args,
+       bad-format-character,
+       format-combined-specification,
+       bad-format-string-key,
+       bad-format-string,
+       missing-format-attribute,
+       missing-format-argument-key,
+       unused-format-string-argument,
+       unused-format-string-key,
+       invalid-format-index,
+       bad-indentation,
+       mixed-indentation,
+       unnecessary-semicolon,
+       lowercase-l-suffix,
+       invalid-encoded-data,
+       unpacking-in-except,
+       import-star-module-level,
+       long-suffix,
+       old-octal-literal,
+       old-ne-operator,
+       backtick,
+       old-raise-syntax,
+       metaclass-assignment,
+       next-method-called,
+       dict-iter-method,
+       dict-view-method,
+       indexing-exception,
+       raising-string,
+       using-cmp-argument,
+       cmp-method,
+       coerce-method,
+       delslice-method,
+       getslice-method,
+       hex-method,
+       nonzero-method,
+       t-method,
+       setslice-method,
+       old-division,
+       logging-format-truncated,
+       logging-too-few-args,
+       logging-too-many-args,
+       logging-unsupported-format,
+       logging-format-interpolation,
+       invalid-unary-operand-type,
+       unsupported-binary-operation,
+       not-callable,
+       redundant-keyword-arg,
+       assignment-from-no-return,
+       assignment-from-none,
+       not-context-manager,
+       repeated-keyword,
+       missing-kwoa,
+       no-value-for-parameter,
+       invalid-sequence-index,
+       invalid-slice-index,
+       unexpected-keyword-arg,
+       unsupported-membership-test,
+       unsubscriptable-object,
+       access-member-before-definition,
+       method-hidden,
+       assigning-non-slot,
+       duplicate-bases,
+       inconsistent-mro,
+       inherit-non-class,
+       invalid-slots,
+       invalid-slots-object,
+       no-method-argument,
+       no-self-argument,
+       unexpected-special-method-signature,
+       non-iterator-returned,
+       arguments-differ,
+       signature-differs,
+       bad-staticmethod-argument,
+       non-parent-init-called,
+       bad-except-order,
+       catching-non-exception,
+       bad-exception-context,
+       notimplemented-raised,
+       raising-bad-type,
+       raising-non-exception,
+       misplaced-bare-raise,
+       duplicate-except,
+       nonstandard-exception,
+       binary-op-exception,
+       bare-except,
+       not-async-context-manager,
+       yield-inside-async-function
+
+# Needs investigation:
+# abstract-method (might be indicating a bug? probably not though)
+# protected-access (requires some refactoring)
+# attribute-defined-outside-init (requires some refactoring)
+# super-init-not-called (requires some cleanup)
+
+# Things we'd like to enable someday:
+# redefined-builtin (requires a bunch of work to clean up our code first)
+# redefined-outer-name (requires a bunch of work to clean up our code first)
+# undefined-variable (re-enable when pylint fixes https://github.com/PyCQA/pylint/issues/760)
+# no-name-in-module (giving us spurious warnings https://github.com/PyCQA/pylint/issues/73)
+# unused-argument (need to clean up or code a lot, e.g. prefix unused_?)
+# function-redefined (@overload causes lots of spurious warnings)
+# too-many-function-args (@overload causes spurious warnings... I think)
+# parameter-unpacking (needed for eventual Python 3 compat)
+# print-statement (needed for eventual Python 3 compat)
+# filter-builtin-not-iterating (Python 3)
+# map-builtin-not-iterating (Python 3)
+# range-builtin-not-iterating (Python 3)
+# zip-builtin-not-iterating (Python 3)
+# many others relevant to Python 3
+# unused-variable (a little work to cleanup, is all)
+
+# ...
+[REPORTS]
+
+# Set the output format. Available formats are text, parseable, colorized, msvs
+# (visual studio) and html. You can also give a reporter class, eg
+# mypackage.mymodule.MyReporterClass.
+output-format=parseable
+
+# Put messages in a separate file for each module / package specified on the
+# command line instead of printing them on stdout. Reports (if any) will be
+# written in a file name "pylint_global.[txt|html]".
+files-output=no
+
+# Tells whether to display a full report or only the messages
+reports=no
+
+# Python expression which should return a note less than 10 (10 is the highest
+# note). You have access to the variables errors warning, statement which
+# respectively contain the number of errors / warnings messages and the total
+# number of statements analyzed. This is used by the global evaluation report
+# (RP0004).
+evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
+
+# Template used to display messages. This is a python new-style format string
+# used to format the message information. See doc for all details
+#msg-template=
+
+
+[LOGGING]
+
+# Logging modules to check that the string format arguments are in logging
+# function parameter format
+logging-modules=logging
+
+
+[FORMAT]
+
+# Maximum number of characters on a single line.
+max-line-length=120
+
+# Regexp for a line that is allowed to be longer than the limit.
+ignore-long-lines=^\s*(# )?<?https?://\S+>?$
+
+# Allow the body of an if to be on the same line as the test if there is no
+# else.
+single-line-if-stmt=no
+
+# List of optional constructs for which whitespace checking is disabled. `dict-
+# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
+# `trailing-comma` allows a space between comma and closing bracket: (a, ).
+# `empty-line` allows space-only lines.
+no-space-check=trailing-comma,dict-separator
+
+# Maximum number of lines in a module
+max-module-lines=1000
+
+# String used as indentation unit. This is usually "    " (4 spaces) or "\t" (1
+# tab).
+indent-string='    '
+
+# Number of spaces of indent required inside a hanging or continued line.
+indent-after-paren=4
+
+# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
+expected-line-ending-format=
+
+
+[TYPECHECK]
+
+# Tells whether missing members accessed in mixin class should be ignored. A
+# mixin class is detected if its name ends with "mixin" (case insensitive).
+ignore-mixin-members=yes
+
+# List of module names for which member attributes should not be checked
+# (useful for modules/projects where namespaces are manipulated during runtime
+# and thus existing member attributes cannot be deduced by static analysis. It
+# supports qualified module names, as well as Unix pattern matching.
+ignored-modules=
+
+# List of classes names for which member attributes should not be checked
+# (useful for classes with attributes dynamically set). This supports can work
+# with qualified names.
+ignored-classes=
+
+# List of members which are set dynamically and missed by pylint inference
+# system, and so shouldn't trigger E1101 when accessed. Python regular
+# expressions are accepted.
+generated-members=
+
+
+[VARIABLES]
+
+# Tells whether we should check for unused import in __init__ files.
+init-import=no
+
+# A regular expression matching the name of dummy variables (i.e. expectedly
+# not used).
+dummy-variables-rgx=_$|dummy
+
+# List of additional names supposed to be defined in builtins. Remember that
+# you should avoid to define new builtins when possible.
+additional-builtins=
+
+# List of strings which can identify a callback function by name. A callback
+# name must start or end with one of those strings.
+callbacks=cb_,_cb
+
+
+[SIMILARITIES]
+
+# Minimum lines number of a similarity.
+min-similarity-lines=4
+
+# Ignore comments when computing similarities.
+ignore-comments=yes
+
+# Ignore docstrings when computing similarities.
+ignore-docstrings=yes
+
+# Ignore imports when computing similarities.
+ignore-imports=no
+
+
+[SPELLING]
+
+# Spelling dictionary name. Available dictionaries: none. To make it working
+# install python-enchant package.
+spelling-dict=
+
+# List of comma separated words that should not be checked.
+spelling-ignore-words=
+
+# A path to a file that contains private dictionary; one word per line.
+spelling-private-dict-file=
+
+# Tells whether to store unknown words to indicated private dictionary in
+# --spelling-private-dict-file option instead of raising a message.
+spelling-store-unknown-words=no
+
+
+[MISCELLANEOUS]
+
+# List of note tags to take in consideration, separated by a comma.
+notes=FIXME,XXX,TODO
+
+
+[BASIC]
+
+# List of builtins function names that should not be used, separated by a comma
+bad-functions=map,filter,input
+
+# Good variable names which should always be accepted, separated by a comma
+good-names=i,j,k,ex,Run,_
+
+# Bad variable names which should always be refused, separated by a comma
+bad-names=foo,bar,baz,toto,tutu,tata
+
+# Colon-delimited sets of names that determine each other's naming style when
+# the name regexes allow several styles.
+name-group=
+
+# Include a hint for the correct naming format with invalid-name
+include-naming-hint=no
+
+# Regular expression matching correct function names
+function-rgx=[a-z_][a-z0-9_]{2,40}$
+
+# Naming hint for function names
+function-name-hint=[a-z_][a-z0-9_]{2,40}$
+
+# Regular expression matching correct variable names
+variable-rgx=[a-z_][a-z0-9_]{2,30}$
+
+# Naming hint for variable names
+variable-name-hint=[a-z_][a-z0-9_]{2,30}$
+
+# Regular expression matching correct constant names
+const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$
+
+# Naming hint for constant names
+const-name-hint=(([A-Z_][A-Z0-9_]*)|(__.*__))$
+
+# Regular expression matching correct attribute names
+attr-rgx=[a-z_][a-z0-9_]{2,30}$
+
+# Naming hint for attribute names
+attr-name-hint=[a-z_][a-z0-9_]{2,30}$
+
+# Regular expression matching correct argument names
+argument-rgx=[a-z_][a-z0-9_]{2,30}$
+
+# Naming hint for argument names
+argument-name-hint=[a-z_][a-z0-9_]{2,30}$
+
+# Regular expression matching correct class attribute names
+class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
+
+# Naming hint for class attribute names
+class-attribute-name-hint=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
+
+# Regular expression matching correct inline iteration names
+inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$
+
+# Naming hint for inline iteration names
+inlinevar-name-hint=[A-Za-z_][A-Za-z0-9_]*$
+
+# Regular expression matching correct class names
+class-rgx=[A-Z_][a-zA-Z0-9]+$
+
+# Naming hint for class names
+class-name-hint=[A-Z_][a-zA-Z0-9]+$
+
+# Regular expression matching correct module names
+module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
+
+# Naming hint for module names
+module-name-hint=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
+
+# Regular expression matching correct method names
+method-rgx=[a-z_][a-z0-9_]{2,30}$
+
+# Naming hint for method names
+method-name-hint=[a-z_][a-z0-9_]{2,30}$
+
+# Regular expression which should only match function or class names that do
+# not require a docstring.
+no-docstring-rgx=^_
+
+# Minimum line length for functions/classes that require docstrings, shorter
+# ones are exempt.
+docstring-min-length=-1
+
+
+[ELIF]
+
+# Maximum number of nested blocks for function / method body
+max-nested-blocks=5
+
+
+[IMPORTS]
+
+# Deprecated modules which should not be used, separated by a comma
+deprecated-modules=regsub,TERMIOS,Bastion,rexec
+
+# Create a graph of every (i.e. internal and external) dependencies in the
+# given file (report RP0402 must not be disabled)
+import-graph=
+
+# Create a graph of external dependencies in the given file (report RP0402 must
+# not be disabled)
+ext-import-graph=
+
+# Create a graph of internal dependencies in the given file (report RP0402 must
+# not be disabled)
+int-import-graph=
+
+
+[DESIGN]
+
+# Maximum number of arguments for function / method
+max-args=5
+
+# Argument names that match this expression will be ignored. Default to name
+# with leading underscore
+ignored-argument-names=_.*
+
+# Maximum number of locals for function / method body
+max-locals=15
+
+# Maximum number of return / yield for function / method body
+max-returns=6
+
+# Maximum number of branch for function / method body
+max-branches=12
+
+# Maximum number of statements in function / method body
+max-statements=50
+
+# Maximum number of parents for a class (see R0901).
+max-parents=7
+
+# Maximum number of attributes for a class (see R0902).
+max-attributes=7
+
+# Minimum number of public methods for a class (see R0903).
+min-public-methods=2
+
+# Maximum number of public methods for a class (see R0904).
+max-public-methods=20
+
+# Maximum number of boolean expressions in a if statement
+max-bool-expr=5
+
+
+[CLASSES]
+
+# List of method names used to declare (i.e. assign) instance attributes.
+defining-attr-methods=__init__,__new__,setUp
+
+# List of valid names for the first argument in a class method.
+valid-classmethod-first-arg=cls
+
+# List of valid names for the first argument in a metaclass class method.
+valid-metaclass-classmethod-first-arg=mcs
+
+# List of member names, which should be excluded from the protected access
+# warning.
+exclude-protected=_asdict,_fields,_replace,_source,_make
+
+
+[EXCEPTIONS]
+
+# Exceptions that will emit a warning when being caught. Defaults to
+# "Exception"
+overgeneral-exceptions=Exception
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..45984d3
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,20 @@
+version: '3.1'
+
+services:
+  test_mysql:
+    image: mariadb:10.2.26
+    environment:
+      MYSQL_ROOT_PASSWORD: my-secret-passwd
+      MYSQL_DATABASE: tap_mysql_test
+    ports:
+      - 3306:3306
+    command: --default-authentication-plugin=mysql_native_password --log-bin=mysql-bin --binlog-format=ROW
+    networks:
+      - dev-net
+
+networks:
+  dev-net:
+    ipam:
+      driver: default
+      config:
+        - subnet: "185.16.238.0/24"
\ No newline at end of file
diff --git a/setup.py b/setup.py
index f101df6..c4814f5 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup
 
 setup(name='pipelinewise-tap-mysql',
-      version='1.0.7',
+      version='1.1.0',
       description='Singer.io tap for extracting data from MySQL - PipelineWise compatible',
       author='Stitch',
       url='https://github.com/transferwise/pipelinewise-tap-mysql',
@@ -20,6 +20,13 @@
           'backoff==1.3.2',
           'mysql-replication==0.21',
       ],
+      extras_require={
+          'test': [
+              'nose==1.3.*',
+              'pylint==2.4.*',
+              'nose-cov==1.6'
+          ]
+      },
       entry_points='''
           [console_scripts]
           tap-mysql=tap_mysql:main
diff --git a/tap_mysql/__init__.py b/tap_mysql/__init__.py
index 1875206..adabb13 100644
--- a/tap_mysql/__init__.py
+++ b/tap_mysql/__init__.py
@@ -1,10 +1,8 @@
 #!/usr/bin/env python3
-# pylint: disable=missing-docstring,not-an-iterable,too-many-locals,too-many-arguments,too-many-branches,invalid-name,duplicate-code,too-many-statements
+# pylint: disable=missing-docstring,too-many-locals
 
-import datetime
 import collections
 import itertools
-from itertools import dropwhile
 import copy
 
 import pendulum
@@ -15,7 +13,6 @@
 import singer.metrics as metrics
 import singer.schema
 
-from singer import bookmarks
 from singer import metadata
 from singer import utils
 from singer.schema import Schema
@@ -28,7 +25,6 @@
 
 from tap_mysql.connection import connect_with_backoff, MySQLConnection
 
-
 Column = collections.namedtuple('Column', [
     "table_schema",
     "table_name",
@@ -51,15 +47,7 @@
 
 pymysql.converters.conversions[pendulum.Pendulum] = pymysql.converters.escape_datetime
 
-
-STRING_TYPES = set([
-    'char',
-    'enum',
-    'longtext',
-    'mediumtext',
-    'text',
-    'varchar'
-])
+STRING_TYPES = {'char', 'enum', 'longtext', 'mediumtext', 'text', 'varchar'}
 
 BYTES_FOR_INTEGER_TYPE = {
     'tinyint': 1,
@@ -69,19 +57,21 @@
     'bigint': 8
 }
 
-FLOAT_TYPES = set(['float', 'double'])
+FLOAT_TYPES = {'float', 'double'}
+
+DATETIME_TYPES = {'datetime', 'timestamp', 'date', 'time'}
 
-DATETIME_TYPES = set(['datetime', 'timestamp', 'date', 'time'])
+BINARY_TYPES = {'binary', 'varbinary'}
 
-def schema_for_column(c):
-    '''Returns the Schema object for the given Column.'''
-    data_type = c.data_type.lower()
-    column_type = c.column_type.lower()
+
+def schema_for_column(column):
+    """Returns the Schema object for the given Column."""
+    data_type = column.data_type.lower()
+    column_type = column.column_type.lower()
 
     inclusion = 'available'
     # We want to automatically include all primary key columns
-    if c.column_key.lower() == 'pri':
+    if column.column_key.lower() == 'pri':
         inclusion = 'automatic'
 
     result = Schema(inclusion=inclusion)
@@ -92,7 +82,7 @@ def schema_for_column(c):
     elif data_type in BYTES_FOR_INTEGER_TYPE:
         result.type = ['null', 'integer']
         bits = BYTES_FOR_INTEGER_TYPE[data_type] * 8
-        if 'unsigned' in c.column_type:
+        if 'unsigned' in column.column_type:
             result.minimum = 0
             result.maximum = 2 ** bits - 1
         else:
@@ -104,53 +94,54 @@
     elif data_type == 'decimal':
         result.type = ['null', 'number']
-        result.multipleOf = 10 ** (0 - c.numeric_scale)
+        result.multipleOf = 10 ** (0 - column.numeric_scale)
         return result
 
     elif data_type in STRING_TYPES:
         result.type = ['null', 'string']
-        result.maxLength = c.character_maximum_length
+        result.maxLength = column.character_maximum_length
 
     elif data_type in DATETIME_TYPES:
         result.type = ['null', 'string']
         result.format = 'date-time'
 
+    elif data_type in BINARY_TYPES:
+        result.type = ['null', 'string']
+        result.format = 'binary'
+
     else:
         result = Schema(None,
                         inclusion='unsupported',
-                        description='Unsupported column type {}'.format(column_type))
+                        description=f'Unsupported column type {column_type}')
     return result
 
 
 def create_column_metadata(cols):
     mdata = {}
     mdata = metadata.write(mdata, (), 'selected-by-default', False)
-    for c in cols:
-        schema = schema_for_column(c)
+    for col in cols:
+        schema = schema_for_column(col)
         mdata = metadata.write(mdata,
-                               ('properties', c.column_name),
+                               ('properties', col.column_name),
                                'selected-by-default',
                                schema.inclusion != 'unsupported')
         mdata = metadata.write(mdata,
-                               ('properties', c.column_name),
+                               ('properties', col.column_name),
                                'sql-datatype',
-                               c.column_type.lower())
+                               col.column_type.lower())
 
     return metadata.to_list(mdata)
 
 
 def discover_catalog(mysql_conn, config):
-    '''Returns a Catalog describing the structure of the database.'''
-
+    """Returns a Catalog describing the structure of the database."""
     filter_dbs_config = config.get('filter_dbs')
-
     if filter_dbs_config:
-        filter_dbs_clause = ",".join(["'{}'".format(db)
-                                      for db in filter_dbs_config.split(",")])
+        filter_dbs_clause = ",".join([f"'{db_name}'" for db_name in filter_dbs_config.split(",")])
 
-        table_schema_clause = "WHERE table_schema IN ({})".format(filter_dbs_clause)
+        table_schema_clause = f"WHERE table_schema IN ({filter_dbs_clause})"
     else:
         table_schema_clause = """
         WHERE table_schema NOT IN (
@@ -162,27 +153,27 @@ def discover_catalog(mysql_conn, config):
 
     with connect_with_backoff(mysql_conn) as open_conn:
         with open_conn.cursor() as cur:
-            cur.execute("""
+            cur.execute(f"""
             SELECT table_schema,
                    table_name,
                    table_type,
                    table_rows
                 FROM information_schema.tables
-                {}
-            """.format(table_schema_clause))
+                {table_schema_clause}
+            """)
 
             table_info = {}
 
-            for (db, table, table_type, rows) in cur.fetchall():
-                if db not in table_info:
-                    table_info[db] = {}
+            for (db_name, table, table_type, rows) in cur.fetchall():
+                if db_name not in table_info:
+                    table_info[db_name] = {}
 
-                table_info[db][table] = {
+                table_info[db_name][table] = {
                     'row_count': rows,
                     'is_view': table_type == 'VIEW'
                 }
 
-            cur.execute("""
+            cur.execute(f"""
             SELECT table_schema,
                    table_name,
                    column_name,
@@ -193,9 +184,9 @@ def discover_catalog(mysql_conn, config):
                    column_type,
                    column_key
                 FROM information_schema.columns
-                {}
+                {table_schema_clause}
                 ORDER BY table_schema, table_name
-            """.format(table_schema_clause))
+            """)
 
             columns = []
             rec = cur.fetchone()
@@ -209,8 +200,8 @@ def discover_catalog(mysql_conn, config):
                 (table_schema, table_name) = k
                 schema = Schema(type='object',
                                 properties={c.column_name: schema_for_column(c) for c in cols})
-                md = create_column_metadata(cols)
-                md_map = metadata.to_map(md)
+                mdata = create_column_metadata(cols)
+                md_map = metadata.to_map(mdata)
 
                 md_map = metadata.write(md_map,
                                         (),
@@ -233,10 +224,8 @@ def discover_catalog(mysql_conn, config):
                                         'is-view',
                                         is_view)
 
-                column_is_key_prop = lambda c, s: (
-                    c.column_key == 'PRI' and
-                    s.properties[c.column_name].inclusion != 'unsupported'
-                )
+                column_is_key_prop = lambda c, s: (c.column_key == 'PRI' and
+                                                   s.properties[c.column_name].inclusion != 'unsupported')
 
                 key_properties = [c.column_name for c in cols if column_is_key_prop(c, schema)]
 
@@ -262,9 +251,7 @@ def do_discover(mysql_conn, config):
     discover_catalog(mysql_conn, config).dump()
 
 
-# TODO: Maybe put in a singer-db-utils library.
 def desired_columns(selected, table_schema):
-
     '''Return the set of column names we need to include in the SELECT.
 
     selected - set of column names marked as selected in the input catalog
@@ -345,6 +332,7 @@ def is_valid_currently_syncing_stream(selected_stream, state):
 
     return False
 
+
 def binlog_stream_requires_historical(catalog_entry, state):
     log_file = singer.get_bookmark(state,
                                    catalog_entry.tap_stream_id,
@@ -429,7 +417,7 @@ def get_non_binlog_streams(mysql_conn, catalog, config, state):
     discovered = discover_catalog(mysql_conn, config)
 
     # Filter catalog to include only selected streams
-    selected_streams = list(filter(lambda s: common.stream_is_selected(s), catalog.streams))
+    selected_streams = list(filter(common.stream_is_selected, catalog.streams))
 
     streams_with_state = []
     streams_without_state = []
@@ -447,7 +435,8 @@
             is_view = common.get_is_view(stream)
 
             if is_view:
-                raise Exception("Unable to replicate stream({}) with binlog because it is a view.".format(stream.stream))
+                raise Exception(
+                    f"Unable to replicate stream({stream.stream}) with binlog because it is a view.")
 
             LOGGER.info("LOG_BASED stream %s will resume its historical sync", stream.tap_stream_id)
 
@@ -481,13 +470,12 @@
 def get_binlog_streams(mysql_conn, catalog, config, state):
     discovered = discover_catalog(mysql_conn, config)
 
-    selected_streams = list(filter(lambda s: common.stream_is_selected(s), catalog.streams))
+    selected_streams = list(filter(common.stream_is_selected, catalog.streams))
     binlog_streams = []
 
     for stream in selected_streams:
         stream_metadata = metadata.to_map(stream.metadata)
         replication_method = stream_metadata.get((), {}).get('replication-method')
-        stream_state = state.get('bookmarks', {}).get(stream.tap_stream_id)
 
         if replication_method == 'LOG_BASED' and not binlog_stream_requires_historical(stream, state):
             binlog_streams.append(stream)
@@ -495,7 +483,10 @@
     return resolve_catalog(discovered, binlog_streams)
 
 
-def write_schema_message(catalog_entry, bookmark_properties=[]):
+def write_schema_message(catalog_entry, bookmark_properties=None):
+    if bookmark_properties is None:
+        bookmark_properties = []
+
     key_properties = common.get_key_properties(catalog_entry)
 
     singer.write_message(singer.SchemaMessage(
@@ -513,7 +504,8 @@ def do_sync_incremental(mysql_conn, catalog_entry, state, columns):
     replication_key = md_map.get((), {}).get('replication-key')
 
     if not replication_key:
-        raise Exception("Cannot use INCREMENTAL replication for table ({}) without a replication key.".format(catalog_entry.stream))
+        raise Exception(
+            f"Cannot use INCREMENTAL replication for table ({catalog_entry.stream}) without a replication key.")
 
     write_schema_message(catalog_entry=catalog_entry,
                          bookmark_properties=[replication_key])
@@ -527,10 +519,9 @@ def do_sync_historical_binlog(mysql_conn, config, catalog_entry, state, columns)
     binlog.verify_binlog_config(mysql_conn)
 
     is_view = common.get_is_view(catalog_entry)
-    key_properties = common.get_key_properties(catalog_entry)
 
     if is_view:
-        raise Exception("Unable to replicate stream({}) with binlog because it is a view.".format(catalog_entry.stream))
+        raise Exception(f"Unable to replicate stream({catalog_entry.stream}) with binlog because it is a view.")
 
     log_file = singer.get_bookmark(state,
                                    catalog_entry.tap_stream_id,
@@ -544,10 +535,6 @@
                                     catalog_entry.tap_stream_id,
                                     'max_pk_values')
 
-    last_pk_fetched = singer.get_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'last_pk_fetched')
-
     write_schema_message(catalog_entry)
 
     stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state)
@@ -600,7 +587,6 @@
 def do_sync_full_table(mysql_conn, catalog_entry, state, columns):
     LOGGER.info("Stream %s is using full table replication", catalog_entry.stream)
 
-    key_properties = common.get_key_properties(catalog_entry)
 
     write_schema_message(catalog_entry)
 
@@ -662,7 +648,7 @@ def sync_binlog_streams(mysql_conn, binlog_catalog, config, state):
         for stream in binlog_catalog.streams:
             write_schema_message(stream)
 
-        with metrics.job_timer('sync_binlog') as timer:
+        with metrics.job_timer('sync_binlog'):
             binlog.sync_binlog_stream(mysql_conn, config, binlog_catalog.streams, state)
 
 
@@ -673,6 +659,7 @@ def do_sync(mysql_conn, config, catalog, state):
     sync_non_binlog_streams(mysql_conn, non_binlog_catalog, config, state)
     sync_binlog_streams(mysql_conn, binlog_catalog, config, state)
 
+
 def log_server_params(mysql_conn):
     with connect_with_backoff(mysql_conn) as open_conn:
         try:
@@ -695,15 +682,12 @@ def log_server_params(mysql_conn):
                 cur.execute('''
                 show session status where Variable_name IN ('Ssl_version', 'Ssl_cipher')''')
                 rows = cur.fetchall()
-                mapped_row = {k:v for (k,v) in [(r[0], r[1]) for r in rows]}
-                LOGGER.info('Server SSL Parameters (blank means SSL is not active): ' +
-                            '[ssl_version: %s], ' +
-                            '[ssl_cipher: %s]',
-                            mapped_row['Ssl_version'],
-                            mapped_row['Ssl_cipher'])
-
-        except pymysql.err.InternalError as e:
-            LOGGER.warning("Encountered error checking server params. Error: (%s) %s", *e.args)
+                mapped_row = {r[0]: r[1] for r in rows}
+                LOGGER.info('Server SSL Parameters(blank means SSL is not active): [ssl_version: %s], [ssl_cipher: %s]',
+                            mapped_row['Ssl_version'], mapped_row['Ssl_cipher'])
+
+        except pymysql.err.InternalError as exc:
+            LOGGER.warning("Encountered error checking server params. Error: (%s) %s", *exc.args)
 
 
 def main_impl():
diff --git a/tap_mysql/connection.py b/tap_mysql/connection.py
index f450b9d..9e94780 100644
--- a/tap_mysql/connection.py
+++ b/tap_mysql/connection.py
@@ -1,4 +1,5 @@
 #!/usr/bin/env python3
+# pylint: disable=missing-docstring,arguments-differ,missing-function-docstring
 
 import backoff
 
@@ -14,7 +15,8 @@
 READ_TIMEOUT_SECONDS = 3600
 
 # We need to hold onto this for self-signed SSL
-match_hostname = ssl.match_hostname
+MATCH_HOSTNAME = ssl.match_hostname
+
 
 @backoff.on_exception(backoff.expo,
                       (pymysql.err.OperationalError),
@@ -27,32 +29,31 @@ def connect_with_backoff(connection):
     with connection.cursor() as cur:
         try:
             cur.execute('SET @@session.time_zone="+0:00"')
-        except pymysql.err.InternalError as e:
-            warnings.append('Could not set session.time_zone. Error: ({}) {}'.format(*e.args))
+        except pymysql.err.InternalError as exc:
+            warnings.append(f'Could not set session.time_zone. Error: ({exc.args[0]}) {exc.args[1]}')
 
         try:
             cur.execute('SET @@session.wait_timeout=2700')
-        except pymysql.err.InternalError as e:
-            warnings.append('Could not set session.wait_timeout. Error: ({}) {}'.format(*e.args))
+        except pymysql.err.InternalError as exc:
+            warnings.append(f'Could not set session.wait_timeout. Error: ({exc.args[0]}) {exc.args[1]}')
 
         try:
-            cur.execute("SET @@session.net_read_timeout={}".format(READ_TIMEOUT_SECONDS))
-        except pymysql.err.InternalError as e:
-            warnings.append('Could not set session.net_read_timeout. Error: ({}) {}'.format(*e.args))
-
+            cur.execute(f"SET @@session.net_read_timeout={READ_TIMEOUT_SECONDS}")
+        except pymysql.err.InternalError as exc:
+            warnings.append(f'Could not set session.net_read_timeout. Error: ({exc.args[0]}) {exc.args[1]}')
 
         try:
             cur.execute('SET @@session.innodb_lock_wait_timeout=2700')
-        except pymysql.err.InternalError as e:
+        except pymysql.err.InternalError as exc:
             warnings.append(
-                'Could not set session.innodb_lock_wait_timeout. Error: ({}) {}'.format(*e.args)
-            )
+                f'Could not set session.innodb_lock_wait_timeout. Error: ({exc.args[0]}) {exc.args[1]}'
+            )
 
     if warnings:
         LOGGER.info(("Encountered non-fatal errors when configuring MySQL session that could "
                      "impact performance:"))
-    for w in warnings:
-        LOGGER.warning(w)
+    for warning in warnings:
+        LOGGER.warning(warning)
 
     return connection
 
@@ -129,7 +130,7 @@ def __init__(self, config):
         # override match hostname for google cloud
         if config.get("internal_hostname"):
             parsed_hostname = parse_internal_hostname(config["internal_hostname"])
-            ssl.match_hostname = lambda cert, hostname: match_hostname(cert, parsed_hostname)
+            ssl.match_hostname = lambda cert, hostname: MATCH_HOSTNAME(cert, parsed_hostname)
 
         super().__init__(defer_connect=True, ssl=ssl_arg, **args)
 
@@ -142,11 +143,9 @@ def __init__(self, config):
             self.ctx.verify_mode = ssl.CERT_NONE
             self.client_flag |= CLIENT.SSL
 
-
     def __enter__(self):
         return self
 
-
     def __exit__(self, *exc_info):
         del exc_info
         self.close()
@@ -154,7 +153,7 @@ def __exit__(self, *exc_info):
 
 def make_connection_wrapper(config):
     class ConnectionWrapper(MySQLConnection):
-        def __init__(self, *args, **kwargs):
+        def __init__(self, *args, **kwargs):  # pylint: disable=unused-argument
             config["cursorclass"] = kwargs.get('cursorclass')
             super().__init__(config)
 
diff --git a/tap_mysql/sync_strategies/__init__.py b/tap_mysql/sync_strategies/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/tap_mysql/sync_strategies/binlog.py b/tap_mysql/sync_strategies/binlog.py
index 1565c22..5ffafef 100644
--- a/tap_mysql/sync_strategies/binlog.py
+++ b/tap_mysql/sync_strategies/binlog.py
@@ -1,31 +1,27 @@
 #!/usr/bin/env python3
-# pylint: disable=duplicate-code,too-many-locals,too-many-arguments,too-many-branches
-
+# pylint: disable=missing-function-docstring,too-many-arguments,too-many-branches
+import codecs
 import copy
-
 import datetime
-import pytz
-import tzlocal
-
-import singer
-from singer import metadata
-from singer import utils
-from singer.schema import Schema
 
 import pymysql.connections
 import pymysql.err
-import re
-import tap_mysql.sync_strategies.common as common
-
-from tap_mysql.connection import connect_with_backoff, MySQLConnection, make_connection_wrapper
-from pymysqlreplication.constants import FIELD_TYPE
+import pytz
+import singer
+import tzlocal
 from pymysqlreplication import BinLogStreamReader
+from pymysqlreplication.constants import FIELD_TYPE
 from pymysqlreplication.event import RotateEvent
 from pymysqlreplication.row_event import (
-    DeleteRowsEvent,
-    UpdateRowsEvent,
-    WriteRowsEvent,
-    )
+    DeleteRowsEvent,
+    UpdateRowsEvent,
+    WriteRowsEvent,
+)
+from singer import utils
+from singer.schema import Schema
+
+import tap_mysql.sync_strategies.common as common
+from tap_mysql.connection import connect_with_backoff, make_connection_wrapper
 
 LOGGER = singer.get_logger()
 
@@ -35,16 +31,17 @@
 
 BOOKMARK_KEYS = {'log_file', 'log_pos', 'version'}
 
-mysql_timestamp_types = {
+MYSQL_TIMESTAMP_TYPES = {
     FIELD_TYPE.TIMESTAMP,
     FIELD_TYPE.TIMESTAMP2
 }
 
+
 def add_automatic_properties(catalog_entry, columns):
     catalog_entry.schema.properties[SDC_DELETED_AT] = Schema(
         type=["null", "string"],
         format="date-time"
-        )
+    )
 
     columns.append(SDC_DELETED_AT)
 
@@ -58,21 +55,22 @@ def verify_binlog_config(mysql_conn):
             binlog_format = cur.fetchone()[0]
 
             if binlog_format != 'ROW':
-                raise Exception("Unable to replicate binlog stream because binlog_format is not set to 'ROW': {}."
-                                .format(binlog_format))
+                raise Exception(f"Unable to replicate binlog stream because binlog_format is "
                                f"not set to 'ROW': {binlog_format}.")
 
             try:
                 cur.execute("SELECT @@binlog_row_image")
                 binlog_row_image = cur.fetchone()[0]
             except pymysql.err.InternalError as ex:
                 if ex.args[0] == 1193:
-                    raise Exception("Unable to replicate binlog stream because binlog_row_image system variable does not exist. MySQL version must be at least 5.6.2 to use binlog replication.")
-                else:
-                    raise ex
+                    raise Exception("Unable to replicate binlog stream because binlog_row_image "
+                                    "system variable does not exist. MySQL version must be at "
+                                    "least 5.6.2 to use binlog replication.")
+                raise ex
 
             if binlog_row_image != 'FULL':
-                raise Exception("Unable to replicate binlog stream because binlog_row_image is not set to 'FULL': {}."
-                                .format(binlog_row_image))
+                raise Exception(f"Unable to replicate binlog stream because binlog_row_image is "
+                                f"not set to 'FULL': {binlog_row_image}.")
 
 
 def verify_log_file_exists(mysql_conn, log_file, log_pos):
@@ -84,14 +82,13 @@
             existing_log_file = list(filter(lambda log: log[0] == log_file, result))
 
             if not existing_log_file:
-                raise Exception("Unable to replicate binlog stream because log file {} does not exist."
-                                .format(log_file))
+                raise Exception(f"Unable to replicate binlog stream because log file {log_file} does not exist.")
 
             current_log_pos = existing_log_file[0][1]
 
             if log_pos > current_log_pos:
-                raise Exception("Unable to replicate binlog stream because requested position ({}) for log file {} is greater than current position ({})."
-                                .format(log_pos, log_file, current_log_pos))
+                raise Exception(f"Unable to replicate binlog stream because requested position ({log_pos}) "
+                                f"for log file {log_file} is greater than current position ({current_log_pos}).")
 
 
 def fetch_current_log_file_and_pos(mysql_conn):
@@ -126,10 +123,11 @@ def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extrac
         db_column_type = db_column_map.get(column_name)
 
         if isinstance(val, datetime.datetime):
-            if db_column_type in mysql_timestamp_types:
-                # The mysql-replication library creates datetimes from TIMESTAMP columns using fromtimestamp
-                # which will use the local timezone thus we must set tzinfo accordingly
-                # See: https://github.com/noplay/python-mysql-replication/blob/master/pymysqlreplication/row_event.py#L143-L145
+            if db_column_type in MYSQL_TIMESTAMP_TYPES:
+                # The mysql-replication library creates datetimes from TIMESTAMP columns using fromtimestamp which
+                # will use the local timezone thus we must set tzinfo accordingly See:
+                # https://github.com/noplay/python-mysql-replication/blob/master/pymysqlreplication/row_event.py#L143
+                # -L145
                 timezone = tzlocal.get_localzone()
                 local_datetime = timezone.localize(val)
                 utc_datetime = local_datetime.astimezone(pytz.UTC)
@@ -141,10 +139,13 @@
             row_to_persist[column_name] = val.isoformat() + 'T00:00:00+00:00'
 
         elif isinstance(val, datetime.timedelta):
-            epoch = datetime.datetime.utcfromtimestamp(0)
-            timedelta_from_epoch = epoch + val
+            timedelta_from_epoch = datetime.datetime.utcfromtimestamp(0) + val
             row_to_persist[column_name] = timedelta_from_epoch.isoformat() + '+00:00'
 
+        elif isinstance(val, bytes):
+            # encode bytes as hex
+            row_to_persist[column_name] = codecs.encode(val, 'hex')
+
         elif 'boolean' in property_type or property_type == 'boolean':
             if val is None:
                 boolean_representation = None
@@ -193,6 +194,7 @@
 
     return min_log_pos_per_file
 
+
 def calculate_bookmark(mysql_conn, binlog_streams_map, state):
     min_log_pos_per_file = get_min_log_pos_per_log_file(binlog_streams_map, state)
 
@@ -208,8 +210,8 @@
         expired_logs = state_logs_set.difference(server_logs_set)
 
         if expired_logs:
-            raise Exception("Unable to replicate binlog stream because the following binary log(s) no longer exist: {}".format(
-                ", ".join(expired_logs)))
+            raise Exception('Unable to replicate binlog stream because the following binary log(s) no longer '
+                            f'exist: {", ".join(expired_logs)}')
 
         for log_file in sorted(server_logs_set):
             if min_log_pos_per_file.get(log_file):
@@ -217,6 +219,7 @@
 
     raise Exception("Unable to replicate binlog stream because no binary logs exist on the server.")
 
+
 def update_bookmarks(state, binlog_streams_map, log_file, log_pos):
     for tap_stream_id in binlog_streams_map.keys():
         state = singer.write_bookmark(state,
@@ -233,7 +236,7 @@
 
 def get_db_column_types(event):
-    return {c.name:c.type for c in event.columns}
+    return {c.name: c.type for c in event.columns}
 
 
 def handle_write_rows_event(event, catalog_entry, state, columns, rows_saved, time_extracted):
@@ -241,7 +244,7 @@ def handle_write_rows_event(event, catalog_entry, state, columns, rows_saved, ti
     db_column_types = get_db_column_types(event)
 
     for row in event.rows:
-        filtered_vals = {k:v for k,v in row['values'].items()
+        filtered_vals = {k: v for k, v in row['values'].items()
                          if k in columns}
 
         record_message = row_to_singer_record(catalog_entry,
@@ -261,7 +264,7 @@ def handle_update_rows_event(event, catalog_entry, state, columns, rows_saved, t
     db_column_types = get_db_column_types(event)
 
     for row in event.rows:
-        filtered_vals = {k:v for k,v in row['after_values'].items()
+        filtered_vals = {k: v for k, v in row['after_values'].items()
                          if k in columns}
 
         record_message = row_to_singer_record(catalog_entry,
@@ -276,6 +279,7 @@
 
     return rows_saved
 
+
 def handle_delete_rows_event(event, catalog_entry, state, columns, rows_saved, time_extracted):
     stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state)
     db_column_types = get_db_column_types(event)
@@ -286,7 +290,7 @@
         vals = row['values']
         vals[SDC_DELETED_AT] = event_ts
 
-        filtered_vals = {k:v for k,v in vals.items()
+        filtered_vals = {k: v for k, v in vals.items()
                          if k in columns}
 
         record_message = row_to_singer_record(catalog_entry,
@@ -316,6 +320,7 @@
 
     return stream_map
 
+
 def _run_binlog_sync(mysql_conn, reader, binlog_streams_map, state):
     time_extracted = utils.now()
 
@@ -385,13 +390,14 @@
                 break
 
             if ((rows_saved and rows_saved % UPDATE_BOOKMARK_PERIOD == 0) or
-                (events_skipped and events_skipped % UPDATE_BOOKMARK_PERIOD == 0)):
+                    (events_skipped and events_skipped % UPDATE_BOOKMARK_PERIOD == 0)):
                 singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
 
+
 def sync_binlog_stream(mysql_conn, config, binlog_streams, state):
     binlog_streams_map = generate_streams_map(binlog_streams)
 
-    for tap_stream_id in binlog_streams_map.keys():
+    for tap_stream_id, _ in binlog_streams_map.items():
         common.whitelist_bookmark_keys(BOOKMARK_KEYS, tap_stream_id, state)
 
     log_file, log_pos = calculate_bookmark(mysql_conn, binlog_streams_map, state)
@@ -411,7 +417,7 @@
     reader = BinLogStreamReader(
         connection_settings={},
         server_id=server_id,
-        slave_uuid='stitch-slave-{}'.format(server_id),
+        slave_uuid=f'stitch-slave-{server_id}',
         log_file=log_file,
         log_pos=log_pos,
         resume_stream=True,
diff --git a/tap_mysql/sync_strategies/common.py b/tap_mysql/sync_strategies/common.py
index cfad4a8..e026a6d 100644
--- a/tap_mysql/sync_strategies/common.py
+++ b/tap_mysql/sync_strategies/common.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
-# pylint: disable=too-many-arguments,duplicate-code,too-many-locals
-
+# pylint: disable=missing-function-docstring,too-many-arguments,too-many-locals
+import codecs
 import copy
 import datetime
 import singer
@@ -14,8 +14,7 @@
 
 def escape(string):
     if '`' in string:
-        raise Exception("Can't escape identifier {} because it contains a backtick"
-                        .format(string))
+        raise Exception(f"Can't escape identifier {string} because it contains a backtick")
 
     return '`' + string + '`'
 
@@ -78,10 +77,7 @@ def generate_select_sql(catalog_entry, columns):
     escaped_table = escape(catalog_entry.table)
     escaped_columns = [escape(c) for c in columns]
 
-    select_sql = 'SELECT {} FROM {}.{}'.format(
-        ','.join(escaped_columns),
-        escaped_db,
-        escaped_table)
+    select_sql = f'SELECT {",".join(escaped_columns)} FROM {escaped_db}.{escaped_table}'
 
     # escape percent signs
     select_sql = select_sql.replace('%', '%%')
@@ -104,9 +100,8 @@ def row_to_singer_record(catalog_entry, version, row, columns, time_extracted):
             row_to_persist += (timedelta_from_epoch.isoformat() + '+00:00',)
 
         elif isinstance(elem, bytes):
-            # for BIT value, treat 0 as False and anything else as True
-            boolean_representation = elem != b'\x00'
-            row_to_persist += (boolean_representation,)
+            # encode bytes as hex
+            row_to_persist += (codecs.encode(elem, 'hex'),)
 
         elif 'boolean' in property_type or property_type == 'boolean':
             if elem is None:
@@ -129,11 +124,10 @@
 
 def whitelist_bookmark_keys(bookmark_key_set, tap_stream_id, state):
-    for bk in [non_whitelisted_bookmark_key
-               for non_whitelisted_bookmark_key
-               in state.get('bookmarks', {}).get(tap_stream_id, {}).keys()
-               if non_whitelisted_bookmark_key not in bookmark_key_set]:
-        singer.clear_bookmark(state, tap_stream_id, bk)
+    for bookmark_key in [non_whitelisted_bookmark_key for
+                         non_whitelisted_bookmark_key in state.get('bookmarks', {}).get(tap_stream_id, {}).keys()
+                         if non_whitelisted_bookmark_key not in bookmark_key_set]:
+        singer.clear_bookmark(state, tap_stream_id, bookmark_key)
 
 
 def sync_query(cursor, catalog_entry, state, select_sql, columns, stream_version, params):
@@ -179,7 +173,7 @@
                                                'max_pk_values')
 
                 if max_pk_values:
-                    last_pk_fetched = {k:v for k,v in record_message.record.items()
+                    last_pk_fetched = {k:v for k, v in record_message.record.items()
                                        if k in key_properties}
 
                     state = singer.write_bookmark(state,
diff --git a/tap_mysql/sync_strategies/full_table.py b/tap_mysql/sync_strategies/full_table.py
index 64f7eb1..a5c93a1 100644
--- a/tap_mysql/sync_strategies/full_table.py
+++ b/tap_mysql/sync_strategies/full_table.py
@@ -1,14 +1,12 @@
 #!/usr/bin/env python3
-# pylint: disable=duplicate-code,too-many-locals,simplifiable-if-expression
+# pylint: disable=too-many-locals,missing-function-docstring
 
-import copy
 import singer
 from singer import metadata
 
 import tap_mysql.sync_strategies.binlog as binlog
 import tap_mysql.sync_strategies.common as common
-
-from tap_mysql.connection import connect_with_backoff, MySQLConnection
+from tap_mysql.connection import connect_with_backoff
 
 LOGGER = singer.get_logger()
 
@@ -45,10 +43,10 @@ def pks_are_auto_incrementing(mysql_conn, catalog_entry):
 
     with connect_with_backoff(mysql_conn) as open_conn:
         with open_conn.cursor() as cur:
-            for pk in key_properties:
+            for primary_key in key_properties:
                 cur.execute(sql.format(database_name,
-                                        catalog_entry.table,
-                                        pk))
+                                       catalog_entry.table,
+                                       primary_key))
 
                 result = cur.fetchone()
 
@@ -73,12 +71,12 @@ def get_max_pk_values(cursor, catalog_entry):
     """
 
     select_column_clause = ", ".join(escaped_columns)
-    order_column_clause = ", ".join([pk + " DESC" for pk in escaped_columns])
+    order_column_clause = ", ".join([primary_key + " DESC" for primary_key in escaped_columns])
 
     cursor.execute(sql.format(select_column_clause,
-                               escaped_db,
-                               escaped_table,
-                               order_column_clause))
+                              escaped_db,
+                              escaped_table,
+                              order_column_clause))
     result = cursor.fetchone()
 
     if result:
@@ -88,13 +86,11 @@
 
     return max_pk_values
 
+
 def generate_pk_clause(catalog_entry, state):
     key_properties = common.get_key_properties(catalog_entry)
     escaped_columns = [common.escape(c) for c in key_properties]
 
-    where_clause = " AND ".join([pk + " > `{}`" for pk in escaped_columns])
-    order_by_clause = ", ".join(['`{}`, ' for pk in escaped_columns])
-
     max_pk_values = singer.get_bookmark(state,
                                         catalog_entry.tap_stream_id,
                                         'max_pk_values')
@@ -119,12 +115,11 @@
 
     return sql
 
-
 def sync_table(mysql_conn, catalog_entry, state, columns, stream_version):
     common.whitelist_bookmark_keys(generate_bookmark_keys(catalog_entry), catalog_entry.tap_stream_id, state)
 
     bookmark = state.get('bookmarks', {}).get(catalog_entry.tap_stream_id, {})
-    version_exists = True if 'version' in bookmark else False
+    version_exists = 'version' in bookmark
 
     initial_full_table_complete = singer.get_bookmark(state,
                                                       catalog_entry.tap_stream_id,
@@ -156,9 +151,8 @@
                                                 catalog_entry.tap_stream_id,
                                                 'max_pk_values') or get_max_pk_values(cur, catalog_entry)
 
-
             if not max_pk_values:
-                LOGGER.info("No max value for auto-incrementing PK found for table {}".format(catalog_entry.table))
+                LOGGER.info("No max value for auto-incrementing PK found for table %s", catalog_entry.table)
             else:
                 state = singer.write_bookmark(state,
                                               catalog_entry.tap_stream_id,
diff --git a/tap_mysql/sync_strategies/incremental.py b/tap_mysql/sync_strategies/incremental.py
index 66b9db5..f3958dc 100644
--- a/tap_mysql/sync_strategies/incremental.py
+++ b/tap_mysql/sync_strategies/incremental.py
@@ -1,11 +1,11 @@
 #!/usr/bin/env python3
-# pylint: disable=duplicate-code
+# pylint: disable=missing-function-docstring
 
 import pendulum
 import singer
 from singer import metadata
 
-from tap_mysql.connection import connect_with_backoff, MySQLConnection
+from tap_mysql.connection import connect_with_backoff
 import tap_mysql.sync_strategies.common as common
 
 LOGGER = singer.get_logger()
@@ -58,9 +58,8 @@ def sync_table(mysql_conn, catalog_entry, state, columns):
             if catalog_entry.schema.properties[replication_key_metadata].format == 'date-time':
                 replication_key_value = pendulum.parse(replication_key_value)
 
-            select_sql += ' WHERE `{}` >= %(replication_key_value)s ORDER BY `{}` ASC'.format(
-                replication_key_metadata,
-                replication_key_metadata)
+            select_sql += f" WHERE `{replication_key_metadata}` >= %(replication_key_value)s " \
+                          f"ORDER BY `{replication_key_metadata}` ASC"
 
             params['replication_key_value'] = replication_key_value
         elif replication_key_metadata is not None:
diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py
index 4898bc0..63b7400 100644
--- a/tests/test_full_table_interruption.py
+++ b/tests/test_full_table_interruption.py
@@ -1,6 +1,3 @@
-import copy
-import os
-import pymysql
 import unittest
 import singer
 import singer.metadata
@@ -19,12 +16,13 @@
 TABLE_2_RECORD_COUNT = 0
 
 # FOO BAR
-TABLE_1_DATA = [[ 100, 'abc' ],
-                [ 200, 'def' ],
-                [ 300, 'ghi' ]]
+TABLE_1_DATA = [[100, 'abc'],
+                [200, 'def'],
+                [300, 'ghi']]
 
 TABLE_2_DATA = TABLE_1_DATA[::-1]
 
+
 def insert_record(conn, table_name, record):
     value_sql = ",".join(["%s" for i in range(len(record))])
 
@@ -32,9 +30,9 @@
     INSERT INTO {}.{}
            ( `foo`, `bar` )
     VALUES ( {} )""".format(
-        test_utils.DB_NAME,
-        table_name,
-        value_sql)
+        test_utils.DB_NAME,
+        table_name,
+        value_sql)
 
     with connect_with_backoff(conn) as open_conn:
         with open_conn.cursor() as cur:
@@ -44,7 +42,7 @@
 
 def singer_write_message_no_table_2(message):
     global TABLE_2_RECORD_COUNT
 
-    if isinstance(message, singer.RecordMessage) and message.stream == 'table_2':
+    if isinstance(message, singer.RecordMessage) and message.stream == 'tap_mysql_test-table_2':
         TABLE_2_RECORD_COUNT = TABLE_2_RECORD_COUNT + 1
 
         if TABLE_2_RECORD_COUNT > 1:
@@ -56,6 +54,7 @@
 def singer_write_message_ok(message):
     SINGER_MESSAGES.append(message)
 
+
 def init_tables(conn):
     with connect_with_backoff(conn) as open_conn:
         with open_conn.cursor() as cur:
@@ -131,10 +130,10 @@ def test_table_2_interrupted(self):
                              if isinstance(m, singer.RecordMessage)]
 
         self.assertEqual(record_messages_1,
-                         [['table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
-                          ['table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}],
-                          ['table_2', {'id': 1, 'bar': 'ghi', 'foo': 300}]])
+                         [['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
+                          ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
+                          ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}],
+                          ['tap_mysql_test-table_2', {'id': 1, 'bar': 'ghi', 'foo': 300}]])
 
         self.assertEqual(state['currently_syncing'], 'tap_mysql_test-table_2')
@@ -169,11 +168,11 @@ def test_table_2_interrupted(self):
                              if isinstance(m, singer.RecordMessage)]
 
         self.assertEqual(record_messages_2,
-                         [['table_2', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['table_2', {'id': 3, 'bar': 'abc', 'foo': 100}],
-                          ['table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
-                          ['table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}]])
+                         [['tap_mysql_test-table_2', {'id': 2, 'bar': 'def', 'foo': 200}],
+                          ['tap_mysql_test-table_2', {'id': 3, 'bar': 'abc', 'foo': 100}],
+                          ['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
+                          ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
+                          ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}]])
 
         self.assertIsNone(state['currently_syncing'])
@@ -194,14 +193,12 @@ def test_table_2_interrupted(self):
         self.assertIsNotNone(table_2_bookmark.get('log_file'))
         self.assertIsNotNone(table_2_bookmark.get('log_pos'))
 
-
-        new_table_2_records = [[ 400, 'jkl' ],
-                               [ 500, 'mno' ]]
+        new_table_2_records = [[400, 'jkl'],
+                               [500, 'mno']]
 
         for record in new_table_2_records:
             insert_record(self.conn, 'table_2', record)
 
-        TABLE_2_RECORD_COUNT = 0
         SINGER_MESSAGES.clear()
 
         tap_mysql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state)
@@ -212,11 +209,11 @@ def test_table_2_interrupted(self):
                              if isinstance(m, singer.RecordMessage)]
 
         self.assertEqual(record_messages_3,
-                         [['table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
-                          ['table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}],
-                          ['table_2', {'id': 4, 'bar': 'jkl', 'foo': 400}],
-                          ['table_2', {'id': 5, 'bar': 'mno', 'foo': 500}]])
+                         [['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
+                          ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
+                          ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}],
+                          ['tap_mysql_test-table_2', {'id': 4, 'bar': 'jkl', 'foo': 400}],
+                          ['tap_mysql_test-table_2', {'id': 5, 'bar': 'mno', 'foo': 500}]])
 
         self.assertIsNone(state['currently_syncing'])
@@ -274,29 +271,15 @@ def test_table_2_interrupted(self):
                              if isinstance(m, singer.RecordMessage)]
 
         self.assertEqual(record_messages_1,
-                         [['table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
-                          ['table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}],
-                          ['table_2', {'id': 1, 'bar': 'ghi', 'foo': 300}]
-                         ])
-
-        expected_state_1 = {
-            'currently_syncing': 'tap_mysql_test-table_2',
-            'bookmarks': {
-                'tap_mysql_test-table_2': {
-                    'last_pk_fetched': {'id': 1},
-                    'max_pk_values': {'id': 3}
-                },
-                'tap_mysql_test-table_1': {
-                    'initial_full_table_complete': True
-                }
-            }
-        }
+                         [['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
+                          ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
+                          ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}],
+                          ['tap_mysql_test-table_2', {'id': 1, 'bar': 'ghi', 'foo': 300}]
+                          ])
 
         failed_syncing_table_2 = False
         singer.write_message = singer_write_message_ok
 
-        TABLE_2_RECORD_COUNT = 0
         SINGER_MESSAGES.clear()
 
         tap_mysql.do_sync(self.conn, {}, self.catalog, state)
@@ -305,11 +288,11 @@ def test_table_2_interrupted(self):
         record_messages_2 = [[m.stream, m.record] for m in SINGER_MESSAGES
                              if isinstance(m, singer.RecordMessage)]
         self.assertEqual(record_messages_2,
-                         [['table_2', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['table_2', {'id': 3, 'bar': 'abc', 'foo': 100}],
-                          ['table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
-                          ['table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}]])
+                         [['tap_mysql_test-table_2', {'id': 2, 'bar': 'def', 'foo': 200}],
+                          ['tap_mysql_test-table_2', {'id': 3, 'bar': 'abc', 'foo': 100}],
+                          ['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
+                          ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
+                          ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}]])
 
         expected_state_2 = {
             'currently_syncing': None,
@@ -326,7 +309,7 @@ def test_table_2_interrupted(self):
         self.assertEqual(state, expected_state_2)
 
-if __name__== "__main__":
+if __name__ == "__main__":
     test1 = BinlogInterruption()
     test1.setUp()
     test1.test_table_2_interrupted()
diff --git a/tests/test_tap_mysql.py b/tests/test_tap_mysql.py
index 8881623..2704440 100644
--- a/tests/test_tap_mysql.py
+++ b/tests/test_tap_mysql.py
@@ -1,10 +1,9 @@
 import unittest
-import pymysql
-import tap_mysql
-import copy
+
 import singer
-import os
 import singer.metadata
+
+import tap_mysql
 from tap_mysql.connection import connect_with_backoff
 
 try:
@@ -15,25 +14,20 @@
 import tap_mysql.sync_strategies.binlog as binlog
 import tap_mysql.sync_strategies.common as common
 
-from pymysqlreplication import BinLogStreamReader
-from pymysqlreplication.event import RotateEvent
-from pymysqlreplication.row_event import (
-    DeleteRowsEvent,
-    UpdateRowsEvent,
-    WriteRowsEvent,
-    )
-
 from singer.schema import Schema
 
 LOGGER = singer.get_logger()
 
 SINGER_MESSAGES = []
 
+
 def accumulate_singer_messages(message):
     SINGER_MESSAGES.append(message)
 
+
 singer.write_message = accumulate_singer_messages
 
+
 class TestTypeMapping(unittest.TestCase):
 
     @classmethod
@@ -246,6 +240,7 @@ def runTest(self):
                          set(['a', 'c']),
                          'Keep automatic as well as selected, available columns.')
 
+
 class TestSchemaMessages(unittest.TestCase):
 
     def runTest(self):
@@ -281,6 +276,7 @@
         self.assertEqual(schema_message.schema['properties'].keys(), set(expectedKeys))
 
+
 def currently_syncing_seq(messages):
     return ''.join(
         [(m.value.get('currently_syncing', '_') or '_')[-1]
@@ -288,6 +284,7 @@
          if isinstance(m, singer.StateMessage)]
     )
 
+
 class TestCurrentStream(unittest.TestCase):
 
     def setUp(self):
@@ -322,7 +319,7 @@ def test_emit_currently_syncing(self):
         SINGER_MESSAGES.clear()
         tap_mysql.do_sync(self.conn, {}, self.catalog, state)
 
-        self.assertRegexpMatches(currently_syncing_seq(SINGER_MESSAGES), '^a+b+c+_+')
+        self.assertRegex(currently_syncing_seq(SINGER_MESSAGES), '^a+b+c+_+')
 
     def test_start_at_currently_syncing(self):
         state = {
@@ -341,7 +338,8 @@
         SINGER_MESSAGES.clear()
         tap_mysql.do_sync(self.conn, {}, self.catalog, state)
 
-        self.assertRegexpMatches(currently_syncing_seq(SINGER_MESSAGES), '^b+c+a+_+')
+        self.assertRegex(currently_syncing_seq(SINGER_MESSAGES), '^b+c+a+_+')
+
 
 def message_types_and_versions(messages):
     message_types = []
@@ -406,7 +404,6 @@ def test_with_no_initial_full_table_complete_in_state(self):
 
         (message_types, versions) = message_types_and_versions(SINGER_MESSAGES)
 
-
         self.assertEqual(['RecordMessage', 'ActivateVersionMessage'], message_types)
         self.assertEqual(versions, [12345, 12345])
 
@@ -480,10 +477,10 @@ def setUp(self):
         stream.metadata = [
             {'breadcrumb': (),
              'metadata': {
-                 'selected': True,
+                 'selected': True,
                  'table-key-properties': [],
-                 'database-name': 'tap_mysql_test'
-             }},
+                 'database-name': 'tap_mysql_test'
+             }},
             {'breadcrumb': ('properties', 'val'), 'metadata': {'selected': True}}
         ]
 
@@ -513,7 +510,6 @@ def test_with_no_state(self):
         self.assertTrue(isinstance(versions[0], int))
         self.assertEqual(versions[0], versions[1])
 
-
     def test_with_state(self):
         state = {
             'bookmarks': {
@@ -570,6 +566,8 @@ def test_change_replication_key(self):
 
         tap_mysql.do_sync(self.conn, {}, self.catalog, state)
 
+        print(state)
+
         self.assertEqual(state['bookmarks']['tap_mysql_test-incremental']['replication_key'], 'val')
         self.assertEqual(state['bookmarks']['tap_mysql_test-incremental']['replication_key_value'], 3)
         self.assertEqual(state['bookmarks']['tap_mysql_test-incremental']['version'], 1)
@@ -589,6 +587,7 @@ def test_version_not_cleared_from_state_after_incremental_success(self):
 
         self.assertEqual(state['bookmarks']['tap_mysql_test-incremental']['version'], 1)
 
+
 class TestBinlogReplication(unittest.TestCase):
 
     def setUp(self):
@@ -650,7 +649,7 @@
     def test_initial_full_table(self):
         state = {}
-        expected_log_file, expected_log_pos = binlog.fetch_current_log_file_and_pos(self.conn)
+        binlog.fetch_current_log_file_and_pos(self.conn)
 
         global SINGER_MESSAGES
         SINGER_MESSAGES.clear()
@@ -676,15 +675,13 @@ def test_initial_full_table(self):
                           singer.StateMessage])
 
         activate_version_message_1 = list(filter(
-            lambda m: isinstance(m, singer.ActivateVersionMessage) and m.stream == 'binlog_1',
+            lambda m: isinstance(m, singer.ActivateVersionMessage) and m.stream == 'tap_mysql_test-binlog_1',
             SINGER_MESSAGES))[0]
 
         activate_version_message_2 = list(filter(
-            lambda m: isinstance(m, singer.ActivateVersionMessage) and m.stream == 'binlog_2',
+            lambda m: isinstance(m, singer.ActivateVersionMessage) and m.stream == 'tap_mysql_test-binlog_2',
             SINGER_MESSAGES))[0]
 
-        record_messages = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES))
-
         self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_file'))
         self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_pos'))
 
@@ -706,7 +703,8 @@ def test_fail_on_view(self):
         failed = False
         exception_message = None
-        expected_exception_message = "Unable to replicate stream({}) with binlog because it is a view.".format(self.catalog.streams[0].stream)
+        expected_exception_message = "Unable to replicate stream(tap_mysql_test-{}) with binlog because it is a view.".format(
+            self.catalog.streams[0].stream)
 
         try:
             tap_mysql.do_sync(self.conn, {}, self.catalog, state)
@@ -718,7 +716,6 @@
         self.assertTrue(failed)
         self.assertEqual(expected_exception_message, exception_message)
 
-
     def test_fail_if_log_file_does_not_exist(self):
         log_file = 'chicken'
         stream = self.catalog.streams[0]
@@ -737,16 +734,14 @@
         expected_exception_message = "Unable to replicate stream({}) with binlog because log file {} does not exist.".format(
             stream,
             log_file
-            )
+        )
 
         try:
             tap_mysql.do_sync(self.conn, {}, self.catalog, state)
         except Exception as e:
-            failed = True
             exception_message = str(e)
             LOGGER.error(exception_message)
 
-
     def test_binlog_stream(self):
         global SINGER_MESSAGES
         SINGER_MESSAGES.clear()
@@ -774,16 +769,16 @@ def test_binlog_stream(self):
                           singer.RecordMessage,
                           singer.StateMessage])
 
-        self.assertEqual([('binlog_1', 1, '2017-06-01T00:00:00+00:00', False),
-                          ('binlog_1', 2, '2017-06-20T00:00:00+00:00', False),
-                          ('binlog_1', 3, '2017-09-22T00:00:00+00:00', False),
-                          ('binlog_2', 1, '2017-10-22T00:00:00+00:00', False),
-                          ('binlog_2', 2, '2017-11-10T00:00:00+00:00', False),
-                          ('binlog_2', 3, '2017-12-10T00:00:00+00:00', False),
-                          ('binlog_1', 3, '2018-06-18T00:00:00+00:00', False),
-                          ('binlog_2', 2, '2018-06-18T00:00:00+00:00', False),
-                          ('binlog_1', 2, '2017-06-20T00:00:00+00:00', True),
-                          ('binlog_2', 1, '2017-10-22T00:00:00+00:00', True)],
+        self.assertEqual([('tap_mysql_test-binlog_1', 1, '2017-06-01T00:00:00+00:00', False),
+                          ('tap_mysql_test-binlog_1', 2, '2017-06-20T00:00:00+00:00', False),
+                          ('tap_mysql_test-binlog_1', 3, '2017-09-22T00:00:00+00:00', False),
+                          ('tap_mysql_test-binlog_2', 1, '2017-10-22T00:00:00+00:00', False),
+                          ('tap_mysql_test-binlog_2', 2, '2017-11-10T00:00:00+00:00', False),
+                          ('tap_mysql_test-binlog_2', 3, '2017-12-10T00:00:00+00:00', False),
+                          ('tap_mysql_test-binlog_1', 3, '2018-06-18T00:00:00+00:00', False),
+                          ('tap_mysql_test-binlog_2', 2, '2018-06-18T00:00:00+00:00', False),
+                          ('tap_mysql_test-binlog_1', 2, '2017-06-20T00:00:00+00:00', True),
+                          ('tap_mysql_test-binlog_2', 1, '2017-10-22T00:00:00+00:00', True)],
                          [(m.stream,
                            m.record['id'],
                            m.record['updated'],
@@ -840,6 +835,7 @@ def test_do_not_discover_key_properties_for_view(self):
                          {'a_table': ['id'],
                           'a_view': None})
 
+
 class TestEscaping(unittest.TestCase):
 
     def setUp(self):
@@ -876,17 +872,19 @@ def runTest(self):
         self.assertTrue(isinstance(record_message, singer.RecordMessage))
         self.assertEqual(record_message.record, {'b c': 1})
 
-class TestUnsupportedPK(unittest.TestCase):
+
+class TestSupportedPK(unittest.TestCase):
 
     def setUp(self):
         self.conn = test_utils.get_test_connection()
 
         with connect_with_backoff(self.conn) as open_conn:
             with open_conn.cursor() as cursor:
-                cursor.execute('CREATE TABLE bad_pk_tab (bad_pk BINARY, age INT, PRIMARY KEY (bad_pk))') # BINARY not presently supported
-                cursor.execute('CREATE TABLE good_pk_tab (good_pk INT, age INT, PRIMARY KEY (good_pk))')
-                cursor.execute("INSERT INTO bad_pk_tab (bad_pk, age) VALUES ('a', 100)")
-                cursor.execute("INSERT INTO good_pk_tab (good_pk, age) VALUES (1, 100)")
+                cursor.execute(
+                    # BINARY is presently supported
+                    'CREATE TABLE good_pk_tab (good_pk BINARY, age INT, PRIMARY KEY (good_pk))')
+
+                cursor.execute("INSERT INTO good_pk_tab (good_pk, age) VALUES (BINARY('a'), 20), (BINARY('b'), 30)")
 
     def runTest(self):
         catalog = test_utils.discover_catalog(self.conn, {})
@@ -895,10 +893,10 @@ def runTest(self):
         for c in catalog.streams:
             primary_keys[c.table] = singer.metadata.to_map(c.metadata).get((), {}).get('table-key-properties')
 
-        self.assertEqual(primary_keys, {'good_pk_tab': ['good_pk'], 'bad_pk_tab': []})
+        self.assertEqual(primary_keys, {'good_pk_tab': ['good_pk']})
 
-if __name__== "__main__":
+if __name__ == "__main__":
     test1 = TestBinlogReplication()
     test1.setUp()
     test1.test_binlog_stream()
diff --git a/tests/utils.py b/tests/utils.py
index d13a1c2..002bc03 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -9,10 +9,10 @@
 def get_db_config():
     config = {}
-    config['host'] = os.environ.get('TAP_MYSQL_HOST')
-    config['port'] = int(os.environ.get('TAP_MYSQL_PORT'))
-    config['user'] = os.environ.get('TAP_MYSQL_USER')
-    config['password'] = os.environ.get('TAP_MYSQL_PASSWORD')
+    config['host'] = os.environ['TAP_MYSQL_HOST']
+    config['port'] = int(os.environ['TAP_MYSQL_PORT'])
+    config['user'] = os.environ['TAP_MYSQL_USER']
+    config['password'] = os.environ['TAP_MYSQL_PASSWORD']
     config['charset'] = 'utf8'
 
     if not config['password']:
         del config['password']
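Note: the tests/utils.py hunk above deliberately switches from os.environ.get() to direct os.environ[...] lookups, so a missing TAP_MYSQL_* variable now fails fast with a KeyError naming the variable, instead of silently producing a half-populated config. A minimal sketch of the difference (illustration only, not part of the patch):

    import os

    # With .get(), a missing variable silently becomes None and the error
    # only surfaces later, e.g. as a confusing connection failure.
    host_maybe = os.environ.get('TAP_MYSQL_HOST')  # -> None if unset

    # With direct indexing, the missing variable is reported immediately.
    try:
        host = os.environ['TAP_MYSQL_HOST']
    except KeyError as missing:
        print('required environment variable not set:', missing)

The int(...) wrapper around TAP_MYSQL_PORT behaves the same way: under the old .get() call an unset port surfaced as an opaque TypeError from int(None), while the new lookup raises a KeyError that names the missing variable directly.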