Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

updated tap intercom discovery test #49

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,10 @@ def run_and_verify_check_mode(self, conn_id):
menagerie.verify_check_exit_status(self, exit_status, check_job_name)

found_catalogs = menagerie.get_catalogs(conn_id)
self.assertGreater(len(found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id))
# self.assertGreater(len(found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id))

found_catalog_names = set(map(lambda c: c['stream_name'], found_catalogs))

self.assertSetEqual(self.expected_streams(), found_catalog_names, msg="discovered schemas do not match")
print("discovered schemas are OK")

return found_catalogs
Expand Down
87 changes: 87 additions & 0 deletions tests/test_intercom_all_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import os

from tap_tester import runner, connections, menagerie

from base import IntercomBaseTest


class AllFields(IntercomBaseTest):
    """Test that with all fields selected for a stream, automatic and available fields are replicated."""

    @staticmethod
    def name():
        """Return the unique tap-tester name for this test run."""
        return "tap_tester_intercom_all_fields"

    def test_run(self):
        """
        Ensure running the tap with all streams and fields selected results in the
        replication of all fields.
        - Verify no unexpected streams were replicated
        - Verify that more than just the automatic fields are replicated for each stream.
        """

        expected_streams = self.expected_streams()

        # instantiate connection
        conn_id = connections.ensure_connection(self)

        # run check mode
        found_catalogs = self.run_and_verify_check_mode(conn_id)

        # table and field selection: select every expected stream with all of its fields
        test_catalogs_all_fields = [catalog for catalog in found_catalogs
                                    if catalog.get('stream_name') in expected_streams]
        self.perform_and_verify_table_and_field_selection(
            conn_id, test_catalogs_all_fields, select_all_fields=True,
        )

        # grab metadata after performing table-and-field selection to set expectations
        stream_to_all_catalog_fields = dict()  # used for asserting all fields are replicated
        for catalog in test_catalogs_all_fields:
            stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
            catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
            # field-level metadata entries carry a non-empty breadcrumb whose
            # second element is the field name; the stream-level entry has breadcrumb []
            fields_from_field_level_md = [md_entry['breadcrumb'][1]
                                          for md_entry in catalog_entry['metadata']
                                          if md_entry['breadcrumb'] != []]
            stream_to_all_catalog_fields[stream_name] = set(fields_from_field_level_md)

        # run initial sync
        record_count_by_stream = self.run_and_verify_sync(conn_id)
        synced_records = runner.get_records_from_target_output()

        # Verify no unexpected streams were replicated
        synced_stream_names = set(synced_records.keys())

        with self.subTest(stream="Validate expected streams are replicated"):
            self.LOGGER.info("(Expected streams) ==> " + str(expected_streams))
            self.assertSetEqual(expected_streams, synced_stream_names)

        for stream in synced_stream_names:
            with self.subTest(stream=stream):
                # expected values
                expected_automatic_keys = self.expected_automatic_fields().get(stream)

                # get all expected keys
                expected_all_keys = stream_to_all_catalog_fields[stream]

                # collect actual values: union of keys across every upsert record
                messages = synced_records.get(stream)
                actual_all_keys = set().union(
                    *[set(message['data'].keys()) for message in messages['messages'] if message['action'] == 'upsert']
                )

                self.LOGGER.info(stream + "(Expected keys) ==> " + str(expected_all_keys))
                self.LOGGER.info(stream + "(Expected automatic keys) ==> " + str(expected_automatic_keys))

                # Verify that you get some records for each stream
                # (default -1 fails the assertion when the stream is absent)
                self.assertGreater(record_count_by_stream.get(stream, -1), 0)

                # BUG FIX: original asserted len(expected_automatic_keys) ==
                # len(expected_automatic_keys) — a tautology that always passed.
                # Verify more than just the automatic fields are selected/replicated.
                self.assertGreater(len(expected_all_keys), len(expected_automatic_keys))

                # automatic fields must be among the selected fields
                self.assertTrue(expected_automatic_keys.issubset(expected_all_keys),
                                msg=f'{expected_automatic_keys - expected_all_keys} is not in "expected_all_keys"')

                # verify all fields for the stream were replicated
                self.assertSetEqual(expected_all_keys, actual_all_keys)
28 changes: 28 additions & 0 deletions tests/test_intercom_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,28 @@ def test_run(self):
conn_id = connections.ensure_connection(self)

found_catalogs = self.run_and_verify_check_mode(conn_id)
self.assertGreater(len(found_catalogs), 0, msg="No catalogs found")

# Verify stream names follow naming convention
# streams should only have lowercase alphas and underscores
found_catalog_names = {c['tap_stream_id'] for c in found_catalogs}
self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]),
msg="One or more streams don't follow standard naming")

# Verify number of actual streams discovered match expected
self.assertEqual(
len(streams_to_test),
len(found_catalog_names),
msg="Number of actual streams ({0}) doesn't match with expected number of streams ({1}).)".format(len(found_catalog_names), len(streams_to_test))
)

# Verify the stream names discovered were what we expect
self.assertSetEqual(
set(streams_to_test),
set(found_catalog_names),
msg="Stream names doesn't match with the expectation."
)

for stream in streams_to_test:
with self.subTest(stream=stream):

Expand All @@ -66,10 +81,23 @@ def test_run(self):
if item.get("metadata").get("inclusion") == "automatic"
)

# Get replication keys from metadata
actual_replication_keys = set(
stream_properties[0].get(
"metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, [])
)

##########################################################################
### metadata assertions
##########################################################################

# verify replication key(s)
self.assertSetEqual(
actual_replication_keys,
expected_replication_keys,
msg=r"Replication keys don't match with expectation."
)

# verify there is only 1 top level breadcrumb in metadata
self.assertTrue(len(stream_properties) == 1,
msg="There is NOT only one top level breadcrumb for {}".format(stream) + \
Expand Down