Skip to content

Commit

Permalink
[AP-1927] added new argument for selecting replication method to sync…
Browse files Browse the repository at this point in the history
… tables (#1189)

* added new arg for selecting replication method to sync tables

* update document

* fix for publish_docs.sh

* fix doc

* fix doc
  • Loading branch information
amofakhar authored Nov 18, 2024
1 parent 4895d9f commit dbfe2f8
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 9 deletions.
1 change: 1 addition & 0 deletions dev-project/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ services:
--tlsMode requireTLS
--tlsAllowConnectionsWithoutCertificates
--tlsCertificateKeyFile /etc/ssl/mongodb.pem
--tlsAllowInvalidHostnames
--tlsCAFile /etc/ssl/rootCA.pem
--quiet
networks:
Expand Down
2 changes: 1 addition & 1 deletion dev-project/mongo/create-pipelinewise-user.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
echo 'CREATE MONGODB PIPELINEWISE USER'

mongo --tls --tlsAllowInvalidCertificates -u $MONGO_INITDB_ROOT_USERNAME -p $MONGO_INITDB_ROOT_PASSWORD --authenticationDatabase admin admin <<EOF
mongo --tls --tlsAllowInvalidCertificates --tlsAllowInvalidHostnames -u $MONGO_INITDB_ROOT_USERNAME -p $MONGO_INITDB_ROOT_PASSWORD --authenticationDatabase admin admin <<EOF
db.getName();
db.createUser({
user: $(jq --arg 'user' $MONGO_USERNAME --null-input '$user'),
Expand Down
4 changes: 4 additions & 0 deletions docs/user_guide/resync.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ add the ``--tables`` argument:
table_mb: <integer/float>
.. attention::

There is an option to chose tables for re-sync which has a specific replication method by ``--replication_method_only <name of replication method>``

$ pipelinewise sync_tables --target <target_id> --tap <tap_id> --replication_method_only log_based



Expand Down
2 changes: 2 additions & 0 deletions pipelinewise/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ def main():
parser.add_argument('--force', default=False, required=False,
help='Force sync_tables for full sync', action='store_true'
)
parser.add_argument('--replication_method_only', default='*', type=str,
help='Sync only tables which their replication method is as entered value')

args = parser.parse_args()

Expand Down
16 changes: 10 additions & 6 deletions pipelinewise/cli/pipelinewise.py
Original file line number Diff line number Diff line change
Expand Up @@ -1397,7 +1397,9 @@ def do_sync_tables(self, fastsync_stream_ids=None):
else:
tables_to_sync = self.args.tables

selected_tables = self._get_sync_tables_setting_from_selection_file(tables_to_sync)
selected_tables = self._get_sync_tables_setting_from_selection_file(
tables_to_sync, self.args.replication_method_only)

processes_list = []
if selected_tables['partial_sync']:
self._reset_state_file_for_partial_sync(selected_tables)
Expand Down Expand Up @@ -2098,7 +2100,8 @@ def _clean_tables_from_bookmarks_in_state_file(state_file_to_clean: str, tables:
def _get_fixed_name_of_table(stream_id):
return stream_id.replace('-', '.', 1)

def _get_sync_tables_setting_from_selection_file(self, tables):
def _get_sync_tables_setting_from_selection_file(self, tables, replication_method_only='*'):
replication_method = replication_method_only.upper()
selection = utils.load_json(self.tap['files']['selection'])
selection = selection.get('selection')
all_tables = {'full_sync': [], 'partial_sync': {}}
Expand All @@ -2107,10 +2110,11 @@ def _get_sync_tables_setting_from_selection_file(self, tables):
for table in selection:
table_name = self._get_fixed_name_of_table(table['tap_stream_id'])
if tables_list is None or table_name in tables_list:
if table.get('sync_start_from'):
all_tables['partial_sync'][table_name] = table['sync_start_from']
else:
all_tables['full_sync'].append(table_name)
if replication_method in ['*', table.get('replication_method')]:
if table.get('sync_start_from'):
all_tables['partial_sync'][table_name] = table['sync_start_from']
else:
all_tables['full_sync'].append(table_name)
return all_tables

def __check_if_table_is_selected(self, table_in_properties):
Expand Down
2 changes: 1 addition & 1 deletion scripts/publish_docs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ if [ -z "$GH_TOKEN" ]; then
fi

# Install dependencies in a virtual env
python3.8 -m venv ~/venv-doc
python3 -m venv ~/venv-doc
. ~/venv-doc/bin/activate
pip install --upgrade pip
pip install sphinx sphinx-rtd-theme
Expand Down
4 changes: 3 additions & 1 deletion tests/units/cli/cli_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ def __init__(
extra_log=False,
debug=False,
profiler=False,
force=False
force=False,
replication_method_only='*'
):
self.target = target
self.tap = tap
Expand All @@ -36,6 +37,7 @@ def __init__(
self.debug = debug
self.profiler = profiler
self.force = force
self.replication_method_only = replication_method_only

# "log" Getters and setters
@property
Expand Down
30 changes: 30 additions & 0 deletions tests/units/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,3 +1077,33 @@ def test_get_sync_tables_setting_from_selection_file_if_tables_parameter_is_none
'partial_sync': {'baz': 'PARTIAL', 'par': 'PARTIAL_par'}
}
assert actual_selected_tables == expected_selected_tables

def test_get_sync_tables_if_using_replication_method_only(self):
"""Test if the method for getting list of tables for syncing returns only tables with selected
replication method"""
tables = 'foo,foo_bar,foo_bar_baz,bar_baz,bar,par,par_foo'
with patch('pipelinewise.cli.pipelinewise.utils.load_json') as mocked_load_json:
self.pipelinewise.tap = {
'files': {
'selection': 'foo.json'
}
}
mocked_load_json.return_value = {
'selection': [
{'tap_stream_id': 'foo', 'replication_method': 'FULL_TABLE'},
{'tap_stream_id': 'foo_bar', 'replication_method': 'LOG_BASED'},
{'tap_stream_id': 'foo_bar_baz', 'replication_method': 'INCREMENTAL'},
{'tap_stream_id': 'bar_baz', 'replication_method': 'FULL_TABLE'},
{'tap_stream_id': 'bar', 'replication_method': 'LOG_BASED'},
{'tap_stream_id': 'par', 'sync_start_from': 'PARTIAL_par', 'replication_method': 'INCREMENTAL'},
{'tap_stream_id': 'par_foo', 'sync_start_from': 'PARTIAL_par_foo',
'replication_method': 'LOG_BASED'}
]
}
actual_selected_tables = self.pipelinewise._get_sync_tables_setting_from_selection_file(
tables, replication_method_only='log_based')
expected_selected_tables = {
'full_sync': ['foo_bar', 'bar'],
'partial_sync': {'par_foo': 'PARTIAL_par_foo'}
}
assert actual_selected_tables == expected_selected_tables

0 comments on commit dbfe2f8

Please sign in to comment.