-
Notifications
You must be signed in to change notification settings - Fork 30
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* test setup for base suite * Freshdesk check job test creation * Created config.yml file * Freshdesk bookmarks updates started * Updates to bookmarks test, added assertions for 5 streams * Updates from review * Update base test suite phase 3 (base, bookmarks, paginations, start date) * Enhanced failure handling for missing streams, clean up * Updates from review * Add JIRA ticket info Co-authored-by: kspeer <[email protected]> Co-authored-by: btowles <[email protected]>
- Loading branch information
1 parent
3bb1a92
commit dc64f26
Showing
6 changed files
with
529 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,250 @@ | ||
import re | ||
import os | ||
import pytz | ||
import time | ||
import dateutil.parser | ||
|
||
from datetime import timedelta | ||
from datetime import datetime | ||
|
||
from tap_tester import menagerie, connections, runner | ||
|
||
from base import FreshdeskBaseTest | ||
|
||
|
||
class FreshdeskBookmarks(FreshdeskBaseTest): | ||
"""Test incremental replication via bookmarks (without CRUD).""" | ||
|
||
start_date = "" | ||
test_streams = {} | ||
|
||
@staticmethod | ||
def name(): | ||
return "tt_freshdesk_bookmarks" | ||
|
||
def get_properties(self): | ||
return_value = { | ||
'start_date': '2019-01-04T00:00:00Z', # start date includes roles | ||
} | ||
|
||
self.start_date = return_value['start_date'] | ||
return return_value | ||
|
||
def calculated_states_by_stream(self, current_state): | ||
""" | ||
Look at the bookmarks from a previous sync and set a new bookmark | ||
value based off timedelta expectations. This ensures the subsequent sync will replicate | ||
at least 1 record but, fewer records than the previous sync. | ||
Sufficient test data is required for this test to cover a given stream. | ||
An incremental replication stream must have at least two records with | ||
replication keys that differ by some time span. | ||
If the test data is changed in the future this may break expectations for this test. | ||
""" | ||
bookmark_streams = self.test_streams - {'conversations'} | ||
print("bookmark_streams: {}".format(bookmark_streams)) | ||
|
||
timedelta_by_stream = {stream: [0, 12, 0] # {stream_name: [days, hours, minutes], ...} | ||
for stream in bookmark_streams} | ||
#timedelta_by_stream['tickets'] = [698, 17, 26] # original conversations math, must update | ||
# TODO Add time_entries, satisfaction_ratings streams (403) | ||
|
||
# BUG https://jira.talendforge.org/browse/TDL-17559. Redefining state to be closer to | ||
# expected format so the underlying code wont have to change as much after the JIRA fix | ||
current_state = {'bookmarks': current_state} | ||
del current_state['bookmarks']['tickets_deleted'] # Delete unexpected streams | ||
del current_state['bookmarks']['tickets_spam'] # generated by filter? | ||
|
||
# Keep existing format for this method so it will work after bug fix | ||
stream_to_calculated_state = {stream: "" for stream in bookmark_streams} | ||
for stream, state_value in current_state['bookmarks'].items(): | ||
|
||
if stream in bookmark_streams: | ||
state_as_datetime = dateutil.parser.parse(state_value) | ||
|
||
days, hours, minutes = timedelta_by_stream[stream] | ||
calculated_state_as_datetime = state_as_datetime - timedelta(days=days, hours=hours, minutes=minutes) | ||
|
||
state_format = self.BOOKMARK_FORMAT | ||
calculated_state_formatted = datetime.strftime(calculated_state_as_datetime, state_format) | ||
if calculated_state_formatted < self.start_date: | ||
raise RuntimeError("Time delta error for stream {}, sim start_date < start_date!".format(stream)) | ||
stream_to_calculated_state[stream] = calculated_state_formatted | ||
|
||
return stream_to_calculated_state | ||
|
||
def test_run(self): | ||
"""A Bookmarks Test""" | ||
# Since this tap has no table and field selection all streams will sync every time. So define | ||
# the subset of streams to run assertions against until all streams are covered | ||
self.test_streams = {'tickets', 'companies', 'agents', 'groups', 'roles', 'conversations'} | ||
|
||
expected_replication_keys = self.expected_replication_keys() | ||
expected_replication_methods = self.expected_replication_method() | ||
|
||
########################################################################## | ||
### First Sync | ||
########################################################################## | ||
|
||
conn_id = connections.ensure_connection(self) | ||
|
||
# Run in check mode | ||
check_job_name = self.run_and_verify_check_mode(conn_id) | ||
|
||
# Run a sync job using orchestrator | ||
first_sync_record_count = self.run_and_verify_sync(conn_id) | ||
first_sync_messages = runner.get_records_from_target_output() | ||
first_sync_bookmarks = menagerie.get_state(conn_id) | ||
|
||
# Update based on sync data | ||
first_sync_empty = self.test_streams - first_sync_messages.keys() | ||
if len(first_sync_empty) > 0: | ||
print("Missing stream(s): {} in sync 1. Failing test for stream(s)".format(first_sync_empty)) | ||
self.first_sync_empty = first_sync_empty | ||
first_sync_bonus = first_sync_messages.keys() - self.test_streams | ||
if len(first_sync_bonus) > 0: | ||
print("Found stream: {} in first sync. Add to test_streams?".format(first_sync_bonus)) | ||
|
||
########################################################################## | ||
### Update State Between Syncs | ||
########################################################################## | ||
|
||
#new_states = {'bookmarks': dict()} # BUG TDL-17559 | ||
simulated_states = self.calculated_states_by_stream(first_sync_bookmarks) | ||
# for stream, new_state in simulated_states.items(): # BUG TDL-17559 | ||
# new_states['bookmarks'][stream] = new_state # Save expected format | ||
# menagerie.set_state(conn_id, new_states) | ||
menagerie.set_state(conn_id, simulated_states) | ||
|
||
########################################################################## | ||
### Second Sync | ||
########################################################################## | ||
|
||
second_sync_record_count = self.run_and_verify_sync(conn_id) | ||
second_sync_messages = runner.get_records_from_target_output() | ||
second_sync_bookmarks = menagerie.get_state(conn_id) | ||
|
||
# Update based on sync data | ||
second_sync_empty = self.test_streams - second_sync_messages.keys() | ||
if len(second_sync_empty) > 0: | ||
print("Missing stream(s): {} in sync 2. Failing test. Check test data!"\ | ||
.format(second_sync_empty)) | ||
self.second_sync_empty = second_sync_empty | ||
second_sync_bonus = second_sync_messages.keys() - self.test_streams | ||
if len(second_sync_bonus) > 0: | ||
print("Found stream(s): {} in second sync. Add to test_streams?".format(second_sync_bonus)) | ||
|
||
########################################################################## | ||
### Test By Stream | ||
########################################################################## | ||
|
||
for stream in self.test_streams: # Add supported streams 1 by 1 | ||
with self.subTest(stream=stream): | ||
|
||
# Assert failures for streams not present in first sync (loss of coverage) | ||
if stream in self.first_sync_empty: | ||
self.assertTrue(False, msg="Stream: {} no longer in sync 1. Check test data".format(stream)) | ||
|
||
continue | ||
|
||
# Assert failures for streams present in first sync but not second sync | ||
if stream in self.second_sync_empty: | ||
if stream == 'conversations': | ||
print("Commented out failing test case for stream: {}".format(stream)) | ||
print("See https://jira.talendforge.org/browse/TDL-17738 for details") | ||
# conversations is a child of tickets. When the child object (conversation / note) | ||
# is updated, the parent object (ticket) is also updated. The ticket is then being | ||
# sync'd after update but the child conversation of that updated ticket is not | ||
# (at least when the actual note text was updated. There are several ways to update | ||
# the child). | ||
#self.assertTrue(False, msg="Stream: {} present in sync 1, missing in sync 2!".format(stream)) | ||
|
||
continue | ||
|
||
self.assertTrue(False, msg="Stream: {} present in sync 1, missing in sync 2!".format(stream)) | ||
|
||
continue | ||
|
||
# expected values | ||
expected_replication_method = expected_replication_methods[stream] | ||
|
||
# collect information for assertions from syncs 1 & 2 base on expected values | ||
first_sync_count = first_sync_record_count.get(stream, 0) | ||
second_sync_count = second_sync_record_count.get(stream, 0) | ||
first_sync_records = [record.get('data') for record in | ||
first_sync_messages.get(stream).get('messages') | ||
if record.get('action') == 'upsert'] | ||
second_sync_records = [record.get('data') for record in | ||
second_sync_messages.get(stream).get('messages') | ||
if record.get('action') == 'upsert'] | ||
if stream != 'conversations': # conversations has no bookmark | ||
first_bookmark_value = first_sync_bookmarks.get(stream) | ||
second_bookmark_value = second_sync_bookmarks.get(stream) | ||
|
||
if expected_replication_method == self.INCREMENTAL: | ||
|
||
# collect information specific to incremental streams from syncs 1 & 2 | ||
replication_key = next(iter(expected_replication_keys[stream])) | ||
if stream != 'conversations': # conversations has no bookmark | ||
simulated_bookmark_value = simulated_states[stream] | ||
|
||
if stream == 'conversations': | ||
print("*** Only checking sync counts for stream: {}".format(stream)) | ||
# TODO discuss re-factor to use tickets bookmark for conversations assertions | ||
# Verify the number of records in the 2nd sync is less then the first | ||
self.assertLessEqual(second_sync_count, first_sync_count) | ||
if second_sync_count == first_sync_count: | ||
print("WARN: first_sync_count == second_sync_count for stream: {}".format(stream)) | ||
|
||
continue | ||
|
||
# Verify the first sync sets a bookmark of the expected form | ||
self.assertIsNotNone(first_bookmark_value) | ||
|
||
# Verify the second sync sets a bookmark of the expected form | ||
self.assertIsNotNone(second_bookmark_value) | ||
|
||
# Verify the second sync bookmark is Equal to the first sync bookmark | ||
# assumes no changes to data during test | ||
self.assertEqual(second_bookmark_value, first_bookmark_value) | ||
|
||
# Verify the number of records in the 2nd sync is less then the first | ||
if stream == 'roles': | ||
self.assertEqual(second_sync_count, first_sync_count) | ||
print("WARN: Less covereage, unable to update records for stream: {}".format(stream)) | ||
else: | ||
self.assertLess(second_sync_count, first_sync_count) | ||
|
||
# Verify the bookmark is the max value sent to the target for a given replication key. | ||
rec_time = [] | ||
for record in first_sync_records: | ||
rec_time += record['updated_at'], | ||
|
||
rec_time.sort() | ||
self.assertEqual(rec_time[-1], first_bookmark_value) | ||
|
||
rec_time = [] | ||
for record in second_sync_records: | ||
rec_time += record['updated_at'], | ||
|
||
rec_time.sort() | ||
self.assertEqual(rec_time[-1], second_bookmark_value) | ||
|
||
# Verify all replication key values in sync 2 are >= the simulated bookmark value. | ||
for record in second_sync_records: | ||
self.assertTrue(record['updated_at'] >= simulated_states[stream], | ||
msg="record time cannot be less than bookmark time" | ||
) | ||
|
||
|
||
# No full table streams for freshdesk as of Jan 31 2022 | ||
else: | ||
|
||
raise NotImplementedError( | ||
"INVALID EXPECTATIONS\t\tSTREAM: {} REPLICATION_METHOD: {}".format(stream, expected_replication_method) | ||
) | ||
|
||
|
||
# Verify at least 1 record was replicated in the second sync | ||
self.assertGreater(second_sync_count, 0, msg="We are not fully testing bookmarking for {}".format(stream)) |
Oops, something went wrong.