diff --git a/.circleci/config.yml b/.circleci/config.yml
index b7e033f..79c785a 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -2,7 +2,7 @@ version: 2
 jobs:
   build:
     docker:
-      - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
+      - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
     steps:
       - checkout
       - run:
@@ -11,7 +11,7 @@ jobs:
             python3 -mvenv /usr/local/share/virtualenvs/tap-pendo
             source /usr/local/share/virtualenvs/tap-pendo/bin/activate
             pip install -U pip setuptools
-            pip install .[dev]
+            pip install .[test]
       - run:
           name: 'JSON Validator'
           command: |
@@ -23,20 +23,25 @@ jobs:
             source /usr/local/share/virtualenvs/tap-pendo/bin/activate
             # TODO: Adjust the pylint disables
            pylint tap_pendo --disable 'broad-except,chained-comparison,empty-docstring,fixme,invalid-name,line-too-long,missing-class-docstring,missing-function-docstring,missing-module-docstring,no-else-raise,no-else-return,too-few-public-methods,too-many-arguments,too-many-branches,too-many-lines,too-many-locals,ungrouped-imports,wrong-spelling-in-comment,wrong-spelling-in-docstring,bad-whitespace,missing-class-docstring'
+      - run:
+          name: 'Unit Tests'
+          command: |
+            source /usr/local/share/virtualenvs/tap-pendo/bin/activate
+            pip install coverage
+            nosetests --with-coverage --cover-erase --cover-package=tap_pendo --cover-html-dir=htmlcov tests/unittests
+            coverage html
+      - store_test_results:
+          path: test_output/report.xml
+      - store_artifacts:
+          path: htmlcov
       - add_ssh_keys
       - run:
           name: 'Integration Tests'
           command: |
-            aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
+            aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
             source dev_env.sh
             source /usr/local/share/virtualenvs/tap-tester/bin/activate
-            run-test --tap=tap-pendo \
-                     --target=target-stitch \
-                     --orchestrator=stitch-orchestrator \
-                     --email=harrison+sandboxtest@stitchdata.com \
-                     --password=$SANDBOX_PASSWORD \
-                     --client-id=50 \
-                     tests/tap_tester
+            run-test --tap=tap-pendo tests/tap_tester
 workflows:
   version: 2
   commit:
@@ -53,4 +58,4 @@ workflows:
                 - master
     jobs:
       - build:
-          context: circleci-user
+          context: circleci-user
\ No newline at end of file
diff --git a/README.md b/README.md
index e50c118..862fe51 100644
--- a/README.md
+++ b/README.md
@@ -245,6 +245,7 @@ Interrupted syncs for Event type stream are resumed via a bookmark placed during
     - `x_pendo_integration_key` (string, `ABCdef123`): an integration key from Pendo.
     - `period` (string, `ABCdef123`): `dayRange` or `hourRange`
     - `lookback_window` (integer): 10 (For event objects. Default: 0)
+    - `request_timeout` (integer): 300 (For passing timeout to the request. Default: 300)
 
 ```json
 {
@@ -252,6 +253,7 @@ Interrupted syncs for Event type stream are resumed via a bookmark placed during
   "start_date": "2020-09-18T00:00:00Z",
   "period": "dayRange",
   "lookback_window": 10,
+  "request_timeout": 300,
   "include_anonymous_visitors: "true"
 }
 ```
diff --git a/setup.py b/setup.py
index dcf47d4..b206e74 100755
--- a/setup.py
+++ b/setup.py
@@ -10,16 +10,19 @@
     classifiers=["Programming Language :: Python :: 3 :: Only"],
     py_modules=["tap_pendo"],
     install_requires=[
-        'singer-python==5.2.1',
+        'singer-python==5.12.2',
         "requests",
         'pyhumps==1.3.1',
-        'backoff==1.3.2',
+        'backoff==1.8.0',
         'ijson==3.1.4',
     ],
     extras_require={
-        'dev': [
-            'ipdb==0.11',
+        'test': [
             'pylint==2.5.3',
+            'nose'
+        ],
+        'dev': [
+            'ipdb==0.11'
         ]
     },
     entry_points="""
diff --git a/tap_pendo/__init__.py b/tap_pendo/__init__.py
index e10080c..3badb0b 100644
--- a/tap_pendo/__init__.py
+++ b/tap_pendo/__init__.py
@@ -17,6 +17,7 @@
 
 
 def do_discover(config):
+    # Discover schemas for all streams and dump catalog. Also validate the credentials provided in config.json
     LOGGER.info("Starting discover")
     catalog = {"streams": discover_streams(config)}
     json.dump(catalog, sys.stdout, indent=2)
@@ -24,10 +25,12 @@ def do_discover(config):
 
 
 def stream_is_selected(mdata):
+    # Check stream selected or not from metadata
     return mdata.get((), {}).get('selected', False)
 
 
 def get_sub_stream_ids():
+    # Return list of all the sub streams of the streams
     sub_stream_ids = []
     for _, value in SUB_STREAMS.items():
         sub_stream_ids.append(value)
@@ -39,6 +42,7 @@ class DependencyException(Exception):
 
 
 def validate_dependencies(selected_stream_ids):
+    # Validate and raise exceptions if sub-streams are selected but related parents are not selected
     errs = []
     msg_tmpl = ("Unable to extract {0} data.
" "To receive {0} data, you also need to select {1}.") @@ -54,6 +58,7 @@ def validate_dependencies(selected_stream_ids): def populate_class_schemas(catalog, selected_stream_ids): + # Populate class schemas for all the streams selected in the catalog for stream in catalog.streams: if stream.tap_stream_id in selected_stream_ids: STREAMS[stream.tap_stream_id].stream = stream @@ -66,6 +71,7 @@ def get_abs_path(path): def load_schemas(): + # Open and read JSON schemas of streams and return a dictionary of schemas schemas = {} for filename in os.listdir(get_abs_path('schemas')): @@ -78,6 +84,7 @@ def load_schemas(): def get_selected_streams(catalog): + # Return list of all the selected streams in catalog selected_stream_ids = [] for stream in catalog.streams: mdata = metadata.to_map(stream.metadata) @@ -90,9 +97,9 @@ def sync(config, state, catalog): LOGGER.info("Starting with state %s", state) start_date = config['start_date'] - selected_stream_ids = get_selected_streams(catalog) - validate_dependencies(selected_stream_ids) - populate_class_schemas(catalog, selected_stream_ids) + selected_stream_ids = get_selected_streams(catalog) # Get list of selected streams from catalog + validate_dependencies(selected_stream_ids) # Validate parent-child streams dependencies + populate_class_schemas(catalog, selected_stream_ids) # Populate schemas for selected streams all_sub_stream_ids = get_sub_stream_ids() # Loop over streams in catalog @@ -106,13 +113,16 @@ def sync(config, state, catalog): LOGGER.info('START Syncing: %s', stream_id) update_currently_syncing(state, stream_id) + # Write schema of streams to STDOUT key_properties = metadata.get(mdata, (), 'table-key-properties') singer.write_schema( stream_id, stream.schema.to_dict(), key_properties) sub_stream_ids = SUB_STREAMS.get(stream_id) + # Populate class schemas and write a schema for the selected substreams of the stream if sub_stream_ids: + # Loop over sub-streams of current stream for sub_stream_id in sub_stream_ids: if sub_stream_id not in selected_stream_ids: continue @@ -128,7 +138,7 @@ def sync(config, state, catalog): continue LOGGER.info("Stream %s: Starting sync", stream_id) - instance = STREAMS[stream_id](config) + instance = STREAMS[stream_id](config) # Intialize class for selected stream counter_value = 0 if instance.replication_method == "INCREMENTAL": diff --git a/tap_pendo/discover.py b/tap_pendo/discover.py index 8c45dc7..61ae1f5 100644 --- a/tap_pendo/discover.py +++ b/tap_pendo/discover.py @@ -46,10 +46,12 @@ def get_abs_path(path): def get_custom_fields(instance): + # Return custom fields to add into a catalog for provided instance(stream) return instance.get_fields().get('custom') def get_schema_property_type(schema_type): + # Return dictionary of types to add into catalog based on provided data type if schema_type == 'string': return {"type": ["null", "string"]} elif schema_type == 'time': @@ -67,12 +69,13 @@ def get_schema_property_type(schema_type): def build_metadata_metadata(mdata, schema, custom_fields): + # Build metadata with custom fields for streams 'metadata_accounts' and 'metadata_visitors' if 'custom' not in schema['properties']: schema['properties']['custom'] = {} schema['properties']['custom']['type'] = ["null", "object"] schema['properties']['custom']['additional_properties'] = "false" - for key, _ in custom_fields.items(): schema['properties']['custom']['properties'] = {} + for key, _ in custom_fields.items(): schema['properties']['custom']['properties'][key] = {} 
schema['properties']['custom']['properties'][key]['type'] = [ "null", "object" @@ -86,6 +89,7 @@ def build_metadata_metadata(mdata, schema, custom_fields): def build_account_visitor_metadata(mdata, schema, custom_fields): + # Build metadata with custom fields for streams 'accounts' and 'visitors' if 'metadata_custom' not in schema['properties']: schema['properties']['metadata_custom'] = {} schema['properties']['metadata_custom']['type'] = ["null", "object"] @@ -102,6 +106,7 @@ def build_account_visitor_metadata(mdata, schema, custom_fields): def discover_streams(config): + # Discover schemas, build metadata for all the steams and return catalog streams = [] LOGGER.info("Discovering custom fields for Accounts") @@ -116,9 +121,10 @@ def discover_streams(config): s = s(config) - schema = s.load_schema() + schema = s.load_schema() # load schema for the stream mdata = metadata.to_map(s.load_metadata()) + # additionally, build metadata for custom fields for below streams if s.name == 'accounts': build_account_visitor_metadata(mdata, schema, custom_account_fields) diff --git a/tap_pendo/streams.py b/tap_pendo/streams.py index fea0a73..e74165c 100644 --- a/tap_pendo/streams.py +++ b/tap_pendo/streams.py @@ -14,6 +14,7 @@ import singer import singer.metrics as metrics from requests.exceptions import HTTPError +from requests.models import ProtocolError from singer import Transformer, metadata from singer.utils import now, strftime, strptime_to_utc from tap_pendo import utils as tap_pendo_utils @@ -21,144 +22,14 @@ KEY_PROPERTIES = ['id'] BASE_URL = "https://app.pendo.io" -endpoints = { - "account": { - "method": "GET", - "endpoint": "/api/v1/account/{accountId}" - }, - "accounts": { - "method": "POST", - "endpoint": "/api/v1/aggregation", - "data": { - "response": { - "mimeType": "application/json" - }, - "request": { - "name": "all-accounts", - "pipeline": [{ - "source": { - "accounts": "null" - } - }], - "requestId": "all-accounts" - } - } - }, - "features": { - "method": "POST", - "endpoint": "/api/v1/aggregation", - }, - "guide_events": { - "method": "POST", - "endpoint": "/api/v1/aggregation", - }, - "feature_events": { - "method": "POST", - "endpoint": "/api/v1/aggregation", - "data": { - "response": { - "mimeType": "application/json" - }, - "request": { - "pipeline": [{ - "source": { - "featureEvents": { - "featureId": "{featureId}" - }, - "timeSeries": { - "period": "dayRange", - "first": 1598920967000, - "last": "now()" - } - } - }] - } - } - }, - "guides": { - "method": "POST", - "endpoint": "/api/v1/aggregation", - }, - "metadata_accounts": { - "method": "GET", - "endpoint": "/api/v1/metadata/schema/account" - }, - "metadata_visitors": { - "method": "GET", - "endpoint": "/api/v1/metadata/schema/visitor" - }, - "events": { - "method": "POST", - "endpoint": "/api/v1/aggregation", - }, - "pages": { - "method": "POST", - "endpoint": "/api/v1/aggregation", - }, - "page_events": { - "method": "POST", - "endpoint": "/api/v1/aggregation", - }, - "poll_events": { - "method": "POST", - "endpoint": "/api/v1/aggregation", - }, - "reports": { - "method": "GET", - "endpoint": "/api/v1/report" - }, - "visitor": { - "method": "GET", - "endpoint": "/api/v1/visitor/{visitorId}" - }, - "visitors": { - "method": "POST", - "endpoint": "/api/v1/aggregation" - - }, - "visitor_history": { - "method": "GET", - "endpoint": "/api/v1/visitor/{visitorId}/history", - "headers": { - 'content-type': 'application/x-www-form-urlencoded' - }, - "params": { - "starttime": "start_time" - } - }, - "track_types": { - "method": 
"POST", - "endpoint": "/api/v1/aggregation" - }, - "track_events": { - "method": "POST", - "endpoint": "/api/v1/aggregation" - } -} - LOGGER = singer.get_logger() session = requests.Session() - +# timeout request after 300 seconds +REQUEST_TIMEOUT = 300 def get_abs_path(path): return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) - -def get_url(endpoint, **kwargs): - return BASE_URL + endpoints[endpoint]['endpoint'].format(**kwargs) - - -def get_method(endpoint): - return endpoints[endpoint]['method'] - - -def get_headers(endpoint): - return endpoints[endpoint].get('headers', {}) - - -def get_params(endpoint): - return endpoints[endpoint].get('params', {}) - # Determine absolute start and end times w/ attribution_window constraint # abs_start/end and window_start/end must be rounded to nearest hour or day (granularity) # Graph API enforces max history of 28 days @@ -177,6 +48,7 @@ def get_absolute_start_end_time(last_dttm): def round_times(start=None, end=None): + # Return start and end time with rounded time(removed hours local) start_rounded = None end_rounded = None # Round min_start, max_end to hours or dates @@ -186,6 +58,7 @@ def round_times(start=None, end=None): def remove_hours_local(dttm): + # Remove hours local from provided datetime new_dttm = dttm.replace(hour=0, minute=0, second=0, microsecond=0) return new_dttm @@ -206,21 +79,51 @@ def update_currently_syncing(state, stream_name): class Server42xRateLimitError(Exception): pass + +class Endpoints(): + endpoint = "" + method = "" + headers = {} + params = {} + + def __init__(self, endpoint, method, headers=None, params=None): + self.endpoint = endpoint + self.method = method + self.headers = headers + self.params = params + + def get_url(self, **kwargs): + """ + Concatenate and format the dynamic values to the BASE_URL + """ + return BASE_URL + self.endpoint.format(**kwargs) + + class Stream(): + """ + Base Stream class that works as a parent for child stream classes. + """ name = None replication_method = None replication_key = None key_properties = KEY_PROPERTIES stream = None - method = "GET" period = None + # initialized the endpoint attribute which can be overriden by child streams based on + # the different parameters used by the stream. + endpoint = Endpoints("/api/v1/aggregation", "POST") def __init__(self, config=None): self.config = config def send_request_get_results(self, req): - resp = session.send(req) + # Set request timeout to config param `request_timeout` value. + # If value is 0,"0", "" or None then it will set default to default to 300.0 seconds if not passed in config. + config_request_timeout = self.config.get('request_timeout') + request_timeout = config_request_timeout and float(config_request_timeout) or REQUEST_TIMEOUT # pylint: disable=consider-using-ternary + resp = session.send(req, timeout=request_timeout) + # Sleep for provided time and retry if rate limit exceeded if 'Too Many Requests' in resp.reason: retry_after = 30 LOGGER.info("Rate limit reached. 
Sleeping for %s seconds", @@ -228,18 +131,24 @@ def send_request_get_results(self, req): time.sleep(retry_after) raise Server42xRateLimitError(resp.reason) - resp.raise_for_status() + resp.raise_for_status() # Check for requests status and raise exception in failure dec = humps.decamelize(resp.json()) return dec + # backoff for Timeout error is already included in "requests.exceptions.RequestException" + # as it is the parent class of "Timeout" error @backoff.on_exception(backoff.expo, (requests.exceptions.RequestException, Server42xRateLimitError), max_tries=5, giveup=lambda e: e.response is not None and 400 <= e. response.status_code < 500, factor=2) + @backoff.on_exception(backoff.expo, (ConnectionError, ProtocolError), # backoff error + max_tries=5, + factor=2) @tap_pendo_utils.ratelimit(1, 2) def request(self, endpoint, params=None, **kwargs): + # Set requests headers, url, methods, params and extra provided arguments # params = params or {} headers = { 'x-pendo-integration-key': self.config['x_pendo_integration_key'], @@ -247,13 +156,13 @@ def request(self, endpoint, params=None, **kwargs): } request_kwargs = { - 'url': get_url(endpoint, **kwargs), - 'method': get_method(endpoint), + 'url': self.endpoint.get_url(**kwargs), + 'method': self.endpoint.method, 'headers': headers, 'params': params } - headers = get_headers(endpoint) + headers = self.endpoint.headers if headers: request_kwargs['headers'].update(headers) @@ -263,7 +172,7 @@ def request(self, endpoint, params=None, **kwargs): if kwargs.get('json'): request_kwargs['json'] = kwargs.get('json') - req = requests.Request(**request_kwargs).prepare() + req = requests.Request(**request_kwargs).prepare() # Prepare request LOGGER.info("%s %s %s", request_kwargs['method'], request_kwargs['url'], request_kwargs['params']) @@ -271,10 +180,13 @@ def request(self, endpoint, params=None, **kwargs): return self.send_request_get_results(req) def get_bookmark(self, state, stream, default, key=None): + # Return default value if no bookmark present in state for provided stream if (state is None) or ('bookmarks' not in state): return default + # Initialize bookmark if not present if not state.get('bookmarks').get(stream): state['bookmarks'][stream] = {} + # Look for bookmark with key for stream if key: return (state.get('bookmarks', {}).get(stream, {}).get(key, default)) @@ -298,6 +210,7 @@ def update_bookmark(self, def load_shared_schema_refs(self): + # Load and return dictionary of referenced schemas from 'schemas/shared' shared_schemas_path = get_abs_path('schemas/shared') shared_file_names = [ @@ -313,6 +226,7 @@ def load_shared_schema_refs(self): return shared_schema_refs def resolve_schema_references(self, schema, key, refs): + # Resolve $ref of schemas with provided referenced schema if isinstance(schema, dict): for k, v in schema.items(): if isinstance(v, (dict, list)): @@ -323,7 +237,7 @@ def resolve_schema_references(self, schema, key, refs): def load_schema(self): - refs = self.load_shared_schema_refs() + refs = self.load_shared_schema_refs() # Load references scheamas schema_file = "schemas/{}.json".format(self.name) with open(get_abs_path(schema_file)) as f: @@ -338,17 +252,30 @@ def load_metadata(self): schema = self.load_schema() mdata = metadata.new() + # Write key properties and replication method to metadata mdata = metadata.write(mdata, (), 'table-key-properties', self.key_properties) mdata = metadata.write(mdata, (), 'forced-replication-method', self.replication_method) + # Write replication key to metadata if 
self.replication_key: + # for a certain stream like "features, track_types, pages, guides" + # the replication key in schema is "last_updated_at" and in class variable + # of stream it is "lastUpdatedAt" so rather than updating the replication key + # value in the class variable used "humps.decamelize" for backward compatibility + # as for previous syncs the value in the bookmark will contain "lastUpdatedAt" mdata = metadata.write(mdata, (), 'valid-replication-keys', - [self.replication_key]) + [humps.decamelize(self.replication_key)]) + # Make inclusion automatic for the key properties and replication keys for field_name in schema['properties'].keys(): - if field_name in self.key_properties or field_name == self.replication_key: + # for a certain stream like "features, track_types, pages, guides" + # the replication key in schema is "last_updated_at" and in class variable + # of stream it is "lastUpdatedAt" so rather than updating the replication key + # value in the class variable used "humps.decamelize" for backward compatibility + # as for previous syncs the value in the bookmark will contain "lastUpdatedAt" + if field_name in self.key_properties or field_name == humps.decamelize(self.replication_key): mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'automatic') else: @@ -356,7 +283,7 @@ def load_metadata(self): 'inclusion', 'available') # For period stream adjust schema for time period - if self.replication_key == 'day' or self.replication_key == 'hour': + if self.replication_key in ('day', 'hour'): if hasattr(self, 'period') and self.period == 'hourRange': mdata.pop(('properties', 'day')) elif hasattr(self, 'period') and self.period == 'dayRange': @@ -371,6 +298,7 @@ def transform(self, record): return humps.decamelize(record) def sync_substream(self, state, parent, sub_stream, parent_response): + # Get bookmark from state or start date for the stream bookmark_date = self.get_bookmark(state, sub_stream.name, self.config.get('start_date'), sub_stream.replication_key) @@ -399,24 +327,30 @@ def sync_substream(self, state, parent, sub_stream, parent_response): break i += 1 + # Loop over records of parent stream for record in parent_response: try: + # Initialize counter and transformer with specific datetime format with metrics.record_counter( sub_stream.name) as counter, Transformer( integer_datetime_fmt= "unix-milliseconds-integer-datetime-parsing" ) as transformer: - stream_events = sub_stream.sync(state, new_bookmark, + # syncing child streams from start date or state file date + stream_events = sub_stream.sync(state, bookmark_dttm, record.get(parent.key_properties[0])) + # Loop over data of sub-stream for event in stream_events: counter.increment() + # Get metadata for the stream to use in transform schema_dict = sub_stream.stream.schema.to_dict() stream_metadata = metadata.to_map( sub_stream.stream.metadata) transformed_event = sub_stream.transform(event) + # Transform record as per field selection in metadata try: transformed_record = transformer.transform( transformed_event, schema_dict, @@ -430,10 +364,12 @@ def sync_substream(self, state, parent, sub_stream, parent_response): indent=2)) raise err - event_time = strptime_to_utc( - transformed_record.get(sub_stream.replication_key)) + # Check for replication_value from record and if value found then use it for updating bookmark + replication_value = transformed_record.get(sub_stream.replication_key) + if replication_value: + event_time = strptime_to_utc(replication_value) + new_bookmark = max(new_bookmark, 
event_time) - new_bookmark = max(new_bookmark, event_time) singer.write_record(sub_stream.stream.tap_stream_id, transformed_record) @@ -454,11 +390,13 @@ def sync_substream(self, state, parent, sub_stream, parent_response): def sync(self, state, start_date=None, key_id=None): stream_response = self.request(self.name, json=self.get_body())['results'] or [] + # Get and intialize sub-stream for the current stream if STREAMS.get(SUB_STREAMS.get(self.name)): sub_stream = STREAMS.get(SUB_STREAMS.get(self.name))(self.config) else: sub_stream = None + # Sync substream if the current stream has sub-stream and selected in the catalog if stream_response and sub_stream and sub_stream.is_selected(): self.sync_substream(state, self, sub_stream, stream_response) @@ -466,6 +404,7 @@ def sync(self, state, start_date=None, key_id=None): return (self.stream, stream_response) def lookback_window(self): + # Get lookback window from config and verify value lookback_window = self.config.get('lookback_window') or '0' if not lookback_window.isdigit(): raise TypeError("lookback_window '{}' is not numeric. Check your configuration".format(lookback_window)) @@ -473,7 +412,11 @@ def lookback_window(self): class LazyAggregationStream(Stream): def send_request_get_results(self, req): - with session.send(req, stream=True) as resp: + # Set request timeout to config param `request_timeout` value. + # If value is 0,"0", "" or None then it will set default to default to 300.0 seconds if not passed in config. + config_request_timeout = self.config.get('request_timeout') + request_timeout = config_request_timeout and float(config_request_timeout) or REQUEST_TIMEOUT # pylint: disable=consider-using-ternary + with session.send(req, stream=True, timeout=request_timeout) as resp: if 'Too Many Requests' in resp.reason: retry_after = 30 LOGGER.info("Rate limit reached. Sleeping for %s seconds", @@ -481,19 +424,25 @@ def send_request_get_results(self, req): time.sleep(retry_after) raise Server42xRateLimitError(resp.reason) - resp.raise_for_status() + resp.raise_for_status() # Check for requests status and raise exception in failure + # Return list of records instead of yielding because more than one iteration occur over data in tap flow + # and yield will return generator which flushes out after one iteration. 
+ to_return = [] for item in ijson.items(resp.raw, 'results.item'): - yield humps.decamelize(item) + to_return.append(humps.decamelize(item)) + return to_return def sync(self, state, start_date=None, key_id=None): stream_response = self.request(self.name, json=self.get_body()) or [] + # Get and intialize sub-stream for the current stream if STREAMS.get(SUB_STREAMS.get(self.name)): sub_stream = STREAMS.get(SUB_STREAMS.get(self.name))(self.config) else: sub_stream = None + # Sync substream if the current stream has sub-stream and selected in the catalog if stream_response and sub_stream and sub_stream.is_selected(): self.sync_substream(state, self, sub_stream, stream_response) @@ -514,6 +463,7 @@ def __init__(self, config): def sync(self, state, start_date=None, key_id=None): update_currently_syncing(state, self.name) + # Calculate lookback window lookback = start_date - timedelta( days=self.lookback_window()) ts = int(lookback.timestamp()) * 1000 @@ -531,7 +481,6 @@ class Accounts(Stream): replication_method = "INCREMENTAL" replication_key = "lastupdated" key_properties = ["account_id"] - method = "POST" def get_body(self): return { @@ -551,6 +500,7 @@ def get_body(self): } def transform(self, record): + # Transform data of accounts into one level dictionary with following transformation record['lastupdated'] = record.get('metadata').get('auto').get( 'lastupdated') transformed = record @@ -637,10 +587,12 @@ def sync(self, state, start_date=None, key_id=None): self.replication_key) bookmark_dttm = strptime_to_utc(bookmark_date) + # Set lookback window lookback = bookmark_dttm - timedelta( days=self.lookback_window()) ts = int(lookback.timestamp()) * 1000 + # Get period type from config and make request for event's data period = self.config.get('period') body = self.get_body(period, ts) events = self.request(self.name, json=body) or [] @@ -707,15 +659,18 @@ def get_body(self, period, first): def sync(self, state, start_date=None, key_id=None): update_currently_syncing(state, self.name) + # Get bookmark from state or start date for the stream bookmark_date = self.get_bookmark(state, self.name, self.config.get('start_date'), self.replication_key) bookmark_dttm = strptime_to_utc(bookmark_date) + # Set lookback window lookback = bookmark_dttm - timedelta( days=self.lookback_window()) ts = int(lookback.timestamp()) * 1000 + # Get period type from config and make request for event's data period = self.config.get('period') body = self.get_body(period, ts) events = self.request(self.name, json=body).get('results') or [] @@ -778,7 +733,7 @@ def get_body(self, key_id, period, first): "period": period, "first": first, "last": "now()" - } + } } }, { @@ -898,6 +853,8 @@ class Reports(Stream): name = "reports" replication_method = "INCREMENTAL" replication_key = "lastUpdatedAt" + # the endpoint attribute overriden and re-initialized with different endpoint URL and method + endpoint = Endpoints("/api/v1/report", "GET") def sync(self, state, start_date=None, key_id=None): reports = self.request(self.name) @@ -908,6 +865,8 @@ def sync(self, state, start_date=None, key_id=None): class MetadataVisitor(Stream): name = "metadata_visitor" replication_method = "FULL_TABLE" + # the endpoint attribute overriden and re-initialized with different endpoint URL and method + endpoint = Endpoints("/api/v1/metadata/schema/visitor", "GET") def sync(self, state, start_date=None, key_id=None): reports = self.request(self.name) @@ -921,6 +880,16 @@ class VisitorHistory(Stream): replication_key = "modified_ts" key_properties = 
['visitor_id'] DATE_WINDOW_SIZE = 1 + headers = { + 'content-type': 'application/x-www-form-urlencoded' + } + params = { + "starttime": "start_time" + } + # the endpoint attribute overriden and re-initialized with different endpoint URL, method, headers and params + # the visitorId parameter will be formatted in the get_url() function of the endpoints class + endpoint = Endpoints( + "/api/v1/visitor/{visitorId}/history", "GET", headers, params) def get_params(self, start_time): return {"starttime": start_time} @@ -928,15 +897,14 @@ def get_params(self, start_time): def sync(self, state, start_date=None, key_id=None): update_currently_syncing(state, self.name) - bookmark_date = self.get_bookmark(state, self.name, - self.config.get('start_date'), - self.replication_key) - bookmark_dttm = strptime_to_utc(bookmark_date) - - abs_start, abs_end = get_absolute_start_end_time(bookmark_dttm) + # using "start_date" that is passed and not using the bookmark + # value stored in the state file, as it will be updated after + # every sync of child stream for parent stream + abs_start, abs_end = get_absolute_start_end_time(start_date) lookback = abs_start - timedelta(days=self.lookback_window()) window_next = lookback + # Get data with sliding window upto abs_end while window_next <= abs_end: ts = int(window_next.timestamp()) * 1000 params = self.get_params(start_time=ts) @@ -946,7 +914,7 @@ def sync(self, state, start_date=None, key_id=None): for visitor in visitor_history: visitor['visitorId'] = key_id yield visitor - window_next = window_next + timedelta(days=self.DATE_WINDOW_SIZE) + window_next = window_next + timedelta(days=self.DATE_WINDOW_SIZE) # Update window for next call def transform(self, record): max_value = max(record.get('ts', 0), record.get('last_ts', 0)) @@ -959,10 +927,6 @@ class Visitors(LazyAggregationStream): replication_method = "INCREMENTAL" replication_key = "lastupdated" key_properties = ["visitor_id"] - method = "POST" - - def get_endpoint(self): - return "/api/v1/aggregation" def get_body(self): include_anonymous_visitors = bool(self.config.get('include_anonymous_visitors', 'false').lower() == 'true') @@ -988,6 +952,7 @@ def get_body(self): } def transform(self, record): + # Transform data of accounts into one level dictionary with following transformation record['lastupdated'] = record.get('metadata').get('auto').get( 'lastupdated') transformed = record @@ -1003,6 +968,8 @@ class MetadataAccounts(Stream): name = "metadata_accounts" replication_method = "FULL_TABLE" key_properties = [] + # the endpoint attribute overriden and re-initialized with different endpoint URL and method + endpoint = Endpoints("/api/v1/metadata/schema/account", "GET") def get_body(self): return None @@ -1010,11 +977,13 @@ def get_body(self): def sync(self, state, start_date=None, key_id=None): stream_response = self.request(self.name, json=self.get_body()) + # Get and intialize sub-stream for the current stream if STREAMS.get(SUB_STREAMS.get(self.name)): sub_stream = STREAMS.get(SUB_STREAMS.get(self.name))(self.config) else: sub_stream = None + # Sync substream if the current stream has sub-stream and selected in the catalog if stream_response and sub_stream and sub_stream.is_selected(): self.sync_substream(state, self, sub_stream, stream_response) @@ -1024,10 +993,13 @@ def sync(self, state, start_date=None, key_id=None): def get_fields(self): return self.request(self.name, json=self.get_body()) + class MetadataVisitors(Stream): name = "metadata_visitors" replication_method = "FULL_TABLE" key_properties 
= [] + # the endpoint attribute overriden and re-initialized with different endpoint URL and method + endpoint = Endpoints("/api/v1/metadata/schema/visitor", "GET") def get_body(self): return None @@ -1035,11 +1007,13 @@ def get_body(self): def sync(self, state, start_date=None, key_id=None): stream_response = self.request(self.name, json=self.get_body()) + # Get and intialize sub-stream for the current stream if STREAMS.get(SUB_STREAMS.get(self.name)): sub_stream = STREAMS.get(SUB_STREAMS.get(self.name))(self.config) else: sub_stream = None + # Sync substream if the current stream has sub-stream and selected in the catalog if stream_response and sub_stream and sub_stream.is_selected(): self.sync_substream(state, self, sub_stream, stream_response) diff --git a/tap_pendo/sync.py b/tap_pendo/sync.py index a772fbf..818b702 100644 --- a/tap_pendo/sync.py +++ b/tap_pendo/sync.py @@ -13,6 +13,7 @@ def sync_stream(state, start_date, instance): stream = instance.stream + # Get bookmark from state or start date for the stream bookmark_date = instance.get_bookmark(state, instance.name, start_date, instance.replication_key) bookmark_dttm = strptime_to_utc(bookmark_date) @@ -21,6 +22,7 @@ def sync_stream(state, start_date, instance): with metrics.record_counter(stream.tap_stream_id) as counter, Transformer( integer_datetime_fmt="unix-milliseconds-integer-datetime-parsing" ) as transformer: + # Get records for the stream (stream, records) = instance.sync(state) for record in records: schema_dict = stream.schema.to_dict() @@ -28,6 +30,7 @@ def sync_stream(state, start_date, instance): transformed_record = instance.transform(record) + # Transform record as per field selection in metadata try: transformed_record = transformer.transform( transformed_record, schema_dict, stream_metadata) @@ -38,18 +41,23 @@ def sync_stream(state, start_date, instance): LOGGER.error('Transform failed for %s', record) raise err - record_timestamp = strptime_to_utc( - transformed_record.get( - humps.decamelize(instance.replication_key))) - new_bookmark = max(new_bookmark, record_timestamp) + # Check for replication_value from record and if value found then use it for updating bookmark + replication_value = transformed_record.get( + humps.decamelize(instance.replication_key)) + if replication_value: + record_timestamp = strptime_to_utc(replication_value) + new_bookmark = max(new_bookmark, record_timestamp) - if record_timestamp > bookmark_dttm: - singer.write_record(stream.tap_stream_id, transformed_record) - counter.increment() - else: + if record_timestamp > bookmark_dttm: + singer.write_record(stream.tap_stream_id, transformed_record) + counter.increment() + + else: # No replication_value found then write record without considering for bookmark singer.write_record(stream.tap_stream_id, transformed_record) + LOGGER.info('Replication Value NULL for tap_stream_id: %s', stream.tap_stream_id) counter.increment() + # Update bookmark and write state for the stream with new_bookmark instance.update_bookmark(state, instance.name, strftime(new_bookmark), instance.replication_key) singer.write_state(state) @@ -63,6 +71,7 @@ def sync_full_table(state, instance): with metrics.record_counter(stream.tap_stream_id) as counter, Transformer( integer_datetime_fmt="unix-milliseconds-integer-datetime-parsing" ) as transformer: + # Get records for the stream (stream, records) = instance.sync(state) for record in records: schema_dict = stream.schema.to_dict() @@ -70,6 +79,7 @@ def sync_full_table(state, instance): transformed_record = 
instance.transform(record) + # Transform record as per field selection in metadata try: transformed_record = transformer.transform( transformed_record, schema_dict, stream_metadata) @@ -82,4 +92,5 @@ def sync_full_table(state, instance): singer.write_record(stream.tap_stream_id, transformed_record) counter.increment() - return counter.value + # return the count of records synced + return counter.value diff --git a/tap_pendo/utils.py b/tap_pendo/utils.py index bc3bb9e..0923390 100644 --- a/tap_pendo/utils.py +++ b/tap_pendo/utils.py @@ -18,6 +18,7 @@ def strftime(dt): def ratelimit(limit, every): + # Limit on numbers of call in 'limit' time by sleeping for required time def limitdecorator(fn): times = collections.deque() @@ -39,6 +40,7 @@ def wrapper(*args, **kwargs): def chunk(l, n): + # Return provided list into chunk of size n for i in range(0, len(l), n): yield l[i:i + n] @@ -63,21 +65,24 @@ def update_state(state, entity, dt): if isinstance(dt, datetime.datetime): dt = strftime(dt) + # Add entity in state if not found if entity not in state: state[entity] = dt + # Update state if provided datetime is greater than existing one if dt >= state[entity]: state[entity] = dt def parse_args(required_config_keys): + # Parse command line arguments parser = argparse.ArgumentParser() parser.add_argument('-c', '--config', help='Config file', required=True) parser.add_argument('-s', '--state', help='State file') args = parser.parse_args() config = load_json(args.config) - check_config(config, required_config_keys) + check_config(config, required_config_keys) # Check config for missing fields if args.state: state = load_json(args.state) @@ -88,6 +93,7 @@ def parse_args(required_config_keys): def check_config(config, required_keys): + # Verify that all the required keys are present in config missing_keys = [key for key in required_keys if key not in config] if missing_keys: raise Exception( diff --git a/tests/tap_tester/base.py b/tests/tap_tester/base.py new file mode 100644 index 0000000..5d65360 --- /dev/null +++ b/tests/tap_tester/base.py @@ -0,0 +1,387 @@ +import os +import unittest +from datetime import datetime as dt +from datetime import timedelta + +import dateutil.parser +import pytz + +import tap_tester.connections as connections +import tap_tester.runner as runner +from tap_tester import menagerie + + +class TestPendoBase(unittest.TestCase): + + REPLICATION_KEYS = "valid-replication-keys" + PRIMARY_KEYS = "table-key-properties" + FOREIGN_KEYS = "table-foreign-key-properties" + REPLICATION_METHOD = "forced-replication-method" + INCREMENTAL = "INCREMENTAL" + FULL_TABLE = "FULL_TABLE" + START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" + BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT%H:%M%S%z" + start_date = "" + + @staticmethod + def name(): + return "test_sync" + + @staticmethod + def tap_name(): + """The name of the tap""" + return "tap-pendo" + + @staticmethod + def get_type(): + """the expected url route ending""" + return "platform.pendo" + + def expected_metadata(self): + """The expected streams and metadata about the streams""" + return { + "accounts": { + self.PRIMARY_KEYS: {'account_id'}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'lastupdated'} + }, + "features": { + self.PRIMARY_KEYS: {'id'}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'last_updated_at'} + }, + "guides": { + self.PRIMARY_KEYS: {'id'}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'last_updated_at'} + }, + "pages": { + self.PRIMARY_KEYS: {'id'}, + 
self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'last_updated_at'} + }, + # Add back when visitor_history stream causing this test to take + # 4+ hours is solved, tracked in this JIRA: + # https://stitchdata.atlassian.net/browse/SRCE-4755 + # "visitor_history": { + # self.PRIMARY_KEYS: {'visitor_id'}, + # self.REPLICATION_METHOD: self.INCREMENTAL, + # self.REPLICATION_KEYS: {'modified_ts'} + # }, + "visitors": { + self.PRIMARY_KEYS: {'visitor_id'}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'lastupdated'} + }, + "track_types": { + self.PRIMARY_KEYS: {'id'}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'last_updated_at'} + }, + "feature_events":{ + self.PRIMARY_KEYS: {"visitor_id", "account_id", "server", "remote_ip"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'day'} + }, + "events": { + self.PRIMARY_KEYS: {"visitor_id", "account_id", "server", "remote_ip"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'day'} + }, + "page_events": { + self.PRIMARY_KEYS: {"visitor_id", "account_id", "server", "remote_ip"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'day'} + }, + "guide_events": { + self.PRIMARY_KEYS: {"visitor_id", "account_id", "server_name", "remote_ip"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'browser_time'} + }, + "poll_events":{ + self.PRIMARY_KEYS: {"visitor_id", "account_id", "server_name", "remote_ip"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'browser_time'} + }, + "track_events": { + self.PRIMARY_KEYS: {"visitor_id", "account_id", "server", "remote_ip"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {'day'} + }, + "metadata_accounts": { + self.REPLICATION_METHOD: self.FULL_TABLE, + }, + "metadata_visitors": { + self.REPLICATION_METHOD: self.FULL_TABLE, + }, + } + + def setUp(self): + missing_envs = [x for x in [ + "TAP_PENDO_INTEGRATION_KEY", + ] if os.getenv(x) is None] + + if missing_envs: + raise Exception("Missing environment variables: {}".format(missing_envs)) + + @staticmethod + def get_credentials(): + """Authentication information for the test account""" + return { + "x_pendo_integration_key": os.getenv("TAP_PENDO_INTEGRATION_KEY") + } + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + return_value = { + "start_date": "2020-09-10T00:00:00Z", + "lookback_window": "1", + "period": "dayRange", + } + if original: + return return_value + + return_value["start_date"] = self.start_date + return return_value + + + def expected_streams(self): + """A set of expected stream names""" + + return set(self.expected_metadata().keys()) + + def expected_pks(self): + """return a dictionary with key of table name and value as a set of primary key fields""" + return {table: properties.get(self.PRIMARY_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_replication_keys(self): + """return a dictionary with key of table name and value as a set of replication key fields""" + return {table: properties.get(self.REPLICATION_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_replication_method(self): + """return a dictionary with key of table name nd value of replication method""" + return {table: properties.get(self.REPLICATION_METHOD, None) + for table, properties + in self.expected_metadata().items()} + + def 
expected_automatic_fields(self): + """return a dictionary with key of table name and value as a set of automatic key fields""" + auto_fields = {} + for k, v in self.expected_metadata().items(): + + auto_fields[k] = v.get(self.PRIMARY_KEYS, set()) | v.get(self.REPLICATION_KEYS, set()) \ + | v.get(self.FOREIGN_KEYS, set()) + return auto_fields + + + ######################### + # Helper Methods # + ######################### + + def run_and_verify_check_mode(self, conn_id): + """ + Run the tap in check mode and verify it succeeds. + This should be ran prior to field selection and initial sync. + Return the connection id and found catalogs from menagerie. + """ + # run in check mode + check_job_name = runner.run_check_mode(self, conn_id) + + # verify check exit codes + exit_status = menagerie.get_exit_status(conn_id, check_job_name) + menagerie.verify_check_exit_status(self, exit_status, check_job_name) + + found_catalogs = menagerie.get_catalogs(conn_id) + self.assertGreater(len( + found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id)) + + found_catalog_names = set( + map(lambda c: c['stream_name'], found_catalogs)) + + subset = self.expected_streams().issubset(found_catalog_names) + self.assertTrue( + subset, msg="Expected check streams are not subset of discovered catalog") + print("discovered schemas are OK") + + return found_catalogs + + def run_and_verify_sync(self, conn_id): + """ + Run a sync job and make sure it exited properly. + Return a dictionary with keys of streams synced + and values of records synced for each stream + """ + + # Run a sync job using orchestrator + sync_job_name = runner.run_sync_mode(self, conn_id) + + # Verify tap and target exit codes + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + # Verify actual rows were synced + sync_record_count = runner.examine_target_output_file( + self, conn_id, self.expected_streams(), self.expected_pks()) + self.assertGreater( + sum(sync_record_count.values()), 0, + msg="failed to replicate any data: {}".format(sync_record_count) + ) + print("total replicated row count: {}".format( + sum(sync_record_count.values()))) + + return sync_record_count + + def perform_and_verify_table_and_field_selection(self, conn_id, test_catalogs, select_all_fields=True): + """ + Perform table and field selection based off of the streams to select + set and field selection parameters. + Verify this results in the expected streams selected and all or no + fields selected for those streams. 
+ """ + + # Select all available fields or select no fields from all testable streams + self.select_all_streams_and_fields( + conn_id, test_catalogs, select_all_fields) + + catalogs = menagerie.get_catalogs(conn_id) + + # Ensure our selection affects the catalog + expected_selected = [tc.get('stream_name') for tc in test_catalogs] + + for cat in catalogs: + catalog_entry = menagerie.get_annotated_schema( + conn_id, cat['stream_id']) + + # Verify all testable streams are selected + selected = catalog_entry.get('annotated-schema').get('selected') + print("Validating selection on {}: {}".format( + cat['stream_name'], selected)) + if cat['stream_name'] not in expected_selected: + self.assertFalse( + selected, msg="Stream selected, but not testable.") + continue # Skip remaining assertions if we aren't selecting this stream + self.assertTrue(selected, msg="Stream not selected.") + + if select_all_fields: + # Verify all fields within each selected stream are selected + for field, field_props in catalog_entry.get('annotated-schema').get('properties').items(): + field_selected = field_props.get('selected') + print("\tValidating selection on {}.{}: {}".format( + cat['stream_name'], field, field_selected)) + self.assertTrue(field_selected, msg="Field not selected.") + else: + # Verify only automatic fields are selected + expected_automatic_fields = self.expected_automatic_fields().get( + cat['stream_name']) + selected_fields = self.get_selected_fields_from_metadata( + catalog_entry['metadata']) + self.assertEqual(expected_automatic_fields, selected_fields) + + def get_selected_fields_from_metadata(self, metadata): + selected_fields = set() + for field in metadata: + is_field_metadata = len(field['breadcrumb']) > 1 + + inclusion_automatic_or_selected = ( + field['metadata'].get('selected') is True or + field['metadata'].get('inclusion') == 'automatic' + ) + if is_field_metadata and inclusion_automatic_or_selected: + selected_fields.add(field['breadcrumb'][1]) + return selected_fields + + def select_all_streams_and_fields(self, conn_id, catalogs, select_all_fields: bool = True): + """Select all streams and all fields within streams""" + for catalog in catalogs: + schema = menagerie.get_annotated_schema( + conn_id, catalog['stream_id']) + + non_selected_properties = [] + if not select_all_fields: + # get a list of all properties so that none are selected + non_selected_properties = schema.get('annotated-schema', {}).get( + 'properties', {}).keys() + + connections.select_catalog_and_fields_via_metadata( + conn_id, catalog, schema, [], non_selected_properties) + + def calculated_states_by_stream(self, current_state): + timedelta_by_stream = {stream: [0,0,0,5] # {stream_name: [days, hours, minutes, seconds], ...} + for stream in self.expected_streams()} + + stream_to_calculated_state = {stream: "" for stream in current_state['bookmarks'].keys()} + for stream, state in current_state['bookmarks'].items(): + state_key, state_value = next(iter(state.keys())), next(iter(state.values())) + state_as_datetime = dateutil.parser.parse(state_value) + + days, hours, minutes, seconds = timedelta_by_stream[stream] + calculated_state_as_datetime = state_as_datetime - timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds) + + state_format = '%Y-%m-%dT%H:%M:%S-00:00' + calculated_state_formatted = dt.strftime(calculated_state_as_datetime, state_format) + + stream_to_calculated_state[stream] = {state_key: calculated_state_formatted} + + return stream_to_calculated_state + + def parse_date(self, date_value): + 
""" + Pass in string-formatted-datetime, parse the value, and return it as an unformatted datetime object. + """ + date_formats = { + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S.%f+00:00", + "%Y-%m-%dT%H:%M:%S+00:00", + "%Y-%m-%d" + } + for date_format in date_formats: + try: + date_stripped = dt.strptime(date_value, date_format) + return date_stripped + except ValueError: + continue + + raise NotImplementedError( + "Tests do not account for dates of this format: {}".format(date_value)) + + ########################################################################## + # Tap Specific Methods + ########################################################################## + + def convert_state_to_utc(self, date_str): + """ + Convert a saved bookmark value of the form '2020-08-25T13:17:36-07:00' to + a string formatted utc datetime, + in order to compare aginast json formatted datetime values + """ + date_object = dateutil.parser.parse(date_str) + date_object_utc = date_object.astimezone(tz=pytz.UTC) + return dt.strftime(date_object_utc, "%Y-%m-%dT%H:%M:%SZ") + + def timedelta_formatted(self, dtime, days=0): + try: + date_stripped = dt.strptime(dtime, "%Y-%m-%dT%H:%M:%SZ") + return_date = date_stripped + timedelta(days=days) + + return dt.strftime(return_date, "%Y-%m-%dT%H:%M:%SZ") + + except ValueError: + try: + date_stripped = dt.strptime(dtime, self.BOOKMARK_COMPARISON_FORMAT) + return_date = date_stripped + timedelta(days=days) + + return dt.strftime(return_date, self.BOOKMARK_COMPARISON_FORMAT) + + except ValueError: + return Exception("Datetime object is not of the format: {}".format(self.START_DATE_FORMAT)) + + def is_incremental(self, stream): + return self.expected_metadata().get(stream).get(self.REPLICATION_METHOD) == self.INCREMENTAL + + def is_event(self, stream): + return stream.endswith('events') diff --git a/tests/tap_tester/test_all_fields.py b/tests/tap_tester/test_all_fields.py new file mode 100644 index 0000000..c536b50 --- /dev/null +++ b/tests/tap_tester/test_all_fields.py @@ -0,0 +1,93 @@ +import tap_tester.connections as connections +import tap_tester.runner as runner +import tap_tester.menagerie as menagerie +from base import TestPendoBase + +class PendoAllFieldsTest(TestPendoBase): + def name(self): + return "pendo_all_fields_test" + + def test_run(self): + """ + • Verify no unexpected streams were replicated + • Verify that more than just the automatic fields are replicated for each stream. 
+ • verify all fields for each stream are replicated + """ + + # Streams to verify all fields tests + expected_streams = self.expected_streams() + + expected_automatic_fields = self.expected_automatic_fields() + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_all_fields = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_all_fields) + + # grab metadata after performing table-and-field selection to set expectations + # used for asserting all fields are replicated + stream_to_all_catalog_fields = dict() + for catalog in test_catalogs_all_fields: + stream_id, stream_name = catalog['stream_id'], catalog['stream_name'] + catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id) + fields_from_field_level_md = [md_entry['breadcrumb'][1] + for md_entry in catalog_entry['metadata'] + if md_entry['breadcrumb'] != []] + stream_to_all_catalog_fields[stream_name] = set( + fields_from_field_level_md) + + self.run_and_verify_sync(conn_id) + + actual_fields_by_stream = runner.examine_target_output_for_fields() + + synced_records = runner.get_records_from_target_output() + + # Verify no unexpected streams were replicated + synced_stream_names = set(synced_records.keys()) + self.assertSetEqual(expected_streams, synced_stream_names) + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected values + expected_all_keys = stream_to_all_catalog_fields[stream] + expected_automatic_keys = expected_automatic_fields.get( + stream, set()) + + # collect actual values + messages = synced_records.get(stream) + actual_all_keys = [set(message['data'].keys()) for message in messages['messages'] + if message['action'] == 'upsert'][0] + + # Verify that more than just the automatic fields are replicated for each stream. + self.assertTrue(expected_automatic_keys.issubset( + expected_all_keys), msg='{} is not in "expected_all_keys"'.format(expected_automatic_keys-expected_all_keys)) + + # As we can't find the below fields in the docs and also + # it won't be generated by pendo APIs now so expected. 
+ if stream == "visitors": + expected_all_keys = expected_all_keys - {'metadata_custom'} + elif stream == "feature_events" or stream == "page_events": + expected_all_keys = expected_all_keys - {'hour'} + elif stream == "events": + expected_all_keys = expected_all_keys - {'hour', "feature_id"} + elif stream == "track_events": + expected_all_keys = expected_all_keys - {'hour', "properties"} + elif stream == "guide_events": + expected_all_keys = expected_all_keys - {'poll_response', "poll_id"} + elif stream == "features": + expected_all_keys = expected_all_keys - {'page_id'} + elif stream == "guides": + expected_all_keys = expected_all_keys - {'audience'} + + # verify all fields for each stream are replicated + self.assertSetEqual(expected_all_keys, actual_all_keys) + + + + \ No newline at end of file diff --git a/tests/tap_tester/test_automatic_fields.py b/tests/tap_tester/test_automatic_fields.py new file mode 100644 index 0000000..77ae5b3 --- /dev/null +++ b/tests/tap_tester/test_automatic_fields.py @@ -0,0 +1,67 @@ +import tap_tester.connections as connections +import tap_tester.runner as runner +from base import TestPendoBase + +class PendoAutomaticFieldsTest(TestPendoBase): + """ + Ensure running the tap with all streams selected and all fields deselected results in the replication of just the + primary keys and replication keys (automatic fields). + """ + + def name(self): + return "pendo_automatic_fields_test" + + def test_run(self): + """ + Verify we can deselect all fields except when inclusion=automatic, which is handled by base.py methods + Verify that only the automatic fields are sent to the target. + Verify that all replicated records have unique primary key values. + """ + + streams_to_test = self.expected_streams() + streams_to_test = streams_to_test - {'features', 'guides', 'pages', 'track_types', 'feature_events', + 'page_events', 'guide_events', 'track_events', 'poll_events', 'events'} + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_automatic_fields = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in streams_to_test] + + # Select all streams and no fields within streams + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_automatic_fields, select_all_fields=False) + + record_count_by_stream = self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # expected values + expected_keys = self.expected_automatic_fields().get(stream) + expected_primary_keys = self.expected_pks()[stream] + + # collect actual values + data = synced_records.get(stream, {}) + record_messages_keys = [set(row['data'].keys()) + for row in data.get('messages', [])] + primary_keys_list = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys) + for message in data.get('messages') + if message.get('action') == 'upsert'] + unique_primary_keys_list = set(primary_keys_list) + + # Verify that you get some records for each stream + self.assertGreater( + record_count_by_stream.get(stream, -1), 0, + msg="The number of records is not over the stream min limit") + + # Verify that only the automatic fields are sent to the target + for actual_keys in record_messages_keys: + self.assertSetEqual(expected_keys, actual_keys) + + #Verify that all replicated records have unique primary key values. 
+ self.assertEqual(len(primary_keys_list), + len(unique_primary_keys_list), + msg="Replicated record does not have unique primary key values.") \ No newline at end of file diff --git a/tests/tap_tester/test_bookmark.py b/tests/tap_tester/test_bookmark.py new file mode 100644 index 0000000..d86a7e8 --- /dev/null +++ b/tests/tap_tester/test_bookmark.py @@ -0,0 +1,194 @@ +import tap_tester.connections as connections +import tap_tester.runner as runner +from base import TestPendoBase +from tap_tester import menagerie + +class PendoBookMarkTest(TestPendoBase): + """Test tap sets a bookmark and respects it for the next sync of a stream""" + + def name(self): + return "pendo_bookmark_test" + + def test_run(self): + """ + Verify that for each stream you can do a sync which records bookmarks. + That the bookmark is the maximum value sent to the target for the replication key. + That a second sync respects the bookmark + All data of the second sync is >= the bookmark from the first sync + The number of records in the 2nd sync is less then the first (This assumes that + new data added to the stream is done at a rate slow enough that you haven't + doubled the amount of data from the start date to the first sync between + the first sync and second sync run in this test) + + Verify that for full table stream, all data replicated in sync 1 is replicated again in sync 2. + + PREREQUISITE + For EACH stream that is incrementally replicated there are multiple rows of data with + different values for the replication key + """ + + expected_streams = self.expected_streams() + expected_replication_keys = self.expected_replication_keys() + expected_replication_methods = self.expected_replication_method() + expected_lookback_window = -1 * int(self.get_properties()['lookback_window']) # lookback window + + ########################################################################## + # First Sync + ########################################################################## + conn_id = connections.ensure_connection(self) + + # Run in check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + catalog_entries = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, catalog_entries) + + # Run a first sync job using orchestrator + first_sync_record_count = self.run_and_verify_sync(conn_id) + first_sync_records = runner.get_records_from_target_output() + first_sync_bookmarks = menagerie.get_state(conn_id) + + ########################################################################## + # Update State Between Syncs + ########################################################################## + + new_states = {'bookmarks': dict()} + simulated_states = self.calculated_states_by_stream( + first_sync_bookmarks) + for stream, new_state in simulated_states.items(): + new_states['bookmarks'][stream] = new_state + menagerie.set_state(conn_id, new_states) + + ########################################################################## + # Second Sync + ########################################################################## + + second_sync_record_count = self.run_and_verify_sync(conn_id) + second_sync_records = runner.get_records_from_target_output() + second_sync_bookmarks = menagerie.get_state(conn_id) + + ########################################################################## + # Test By Stream + ########################################################################## + + + for 
+            with self.subTest(stream=stream):
+
+                # expected values
+                expected_replication_method = expected_replication_methods[stream]
+
+                # collect information for assertions from syncs 1 & 2 based on expected values
+                first_sync_count = first_sync_record_count.get(stream, 0)
+                second_sync_count = second_sync_record_count.get(stream, 0)
+                first_sync_messages = [record.get('data') for record in
+                                       first_sync_records.get(
+                                           stream, {}).get('messages', [])
+                                       if record.get('action') == 'upsert']
+                second_sync_messages = [record.get('data') for record in
+                                        second_sync_records.get(
+                                            stream, {}).get('messages', [])
+                                        if record.get('action') == 'upsert']
+                first_bookmark_key_value = first_sync_bookmarks.get('bookmarks', {stream: None}).get(stream)
+                second_bookmark_key_value = second_sync_bookmarks.get('bookmarks', {stream: None}).get(stream)
+
+                if expected_replication_method == self.INCREMENTAL:
+
+                    # collect information specific to incremental streams from syncs 1 & 2
+                    replication_key = next(
+                        iter(expected_replication_keys[stream]))
+
+                    # For the four streams below the API returns records with a last_updated_at key,
+                    # while the state file stores the bookmark under lastUpdatedAt. So, to fetch the
+                    # bookmark from the state file, set the key to lastUpdatedAt.
+                    if stream in ["features", "guides", "pages", "track_types"]:
+                        replication_key = "lastUpdatedAt"
+
+                    first_bookmark_value = first_bookmark_key_value.get(replication_key)
+                    second_bookmark_value = second_bookmark_key_value.get(replication_key)
+                    first_bookmark_value_utc = self.convert_state_to_utc(
+                        first_bookmark_value)
+                    second_bookmark_value_utc = self.convert_state_to_utc(
+                        second_bookmark_value)
+
+                    simulated_bookmark_value = self.convert_state_to_utc(new_states['bookmarks'][stream][replication_key])
+                    simulated_bookmark_minus_lookback = self.timedelta_formatted(
+                        simulated_bookmark_value, days=expected_lookback_window
+                    ) if self.is_event(stream) else simulated_bookmark_value
+
+                    # For track_events we have data within 2 days. As per the Pendo documentation, a dayRange
+                    # period may sometimes include 23 or 25 hours of data before the bookmark.
+                    # So, we subtract 1 day from the last saved bookmark.
+                    # More details can be found at https://developers.pendo.io/docs/?bash#time-series.
+                    simulated_bookmark_minus_lookback = self.timedelta_formatted(simulated_bookmark_minus_lookback, -1)
+
+                    # Verify the first sync sets a bookmark of the expected form
+                    self.assertIsNotNone(first_bookmark_key_value)
+                    self.assertIsNotNone(first_bookmark_value)
+
+                    # Verify the second sync sets a bookmark of the expected form
+                    self.assertIsNotNone(second_bookmark_key_value)
+                    self.assertIsNotNone(second_bookmark_value)
+
+                    # Verify the second sync bookmark is equal to the first sync bookmark
+                    # assumes no changes to data during test
+                    self.assertEqual(second_bookmark_value,
+                                     first_bookmark_value)
+
+                    # Records for these four streams come back with a last_updated_at key, while the
+                    # state file stores the bookmark under lastUpdatedAt.
+                    # We set replication_key to lastUpdatedAt for these streams above,
+                    # so revert it here to fetch records by their replication key.
+ if stream in ["features", "guides", "pages", "track_types"]: + replication_key = "last_updated_at" + + for record in first_sync_messages: + # Verify the first sync bookmark value is the max replication key value for a given stream + replication_key_value = record.get(replication_key) + self.assertLessEqual( + replication_key_value, first_bookmark_value_utc, + msg="First sync bookmark was set incorrectly, a record with a greater replication-key value was synced." + ) + + for record in second_sync_messages: + # Verify the second sync replication key value is Greater or Equal to the first sync bookmark + replication_key_value = record.get(replication_key) + self.assertGreaterEqual(replication_key_value, simulated_bookmark_minus_lookback, + msg="Second sync records do not repect the previous bookmark.") + + # Verify the second sync bookmark value is the max replication key value for a given stream + self.assertLessEqual( + replication_key_value, second_bookmark_value_utc, + msg="Second sync bookmark was set incorrectly, a record with a greater replication-key value was synced." + ) + + # verify that you get less data the 2nd time around + self.assertLess( + second_sync_count, + first_sync_count, + msg="second sync didn't have less records, bookmark usage not verified") + + elif expected_replication_method == self.FULL_TABLE: + + # Verify the syncs do not set a bookmark for full table streams + self.assertIsNone(first_bookmark_key_value) + self.assertIsNone(second_bookmark_key_value) + + # Verify the number of records in the second sync is the same as the first + self.assertEqual(second_sync_count, first_sync_count) + + else: + + raise NotImplementedError( + "INVALID EXPECTATIONS\t\tSTREAM: {} REPLICATION_METHOD: {}".format( + stream, expected_replication_method) + ) + + # Verify at least 1 record was replicated in the second sync + self.assertGreater( + second_sync_count, 0, msg="We are not fully testing bookmarking for {}".format(stream)) \ No newline at end of file diff --git a/tests/tap_tester/test_child_stream_start_date.py b/tests/tap_tester/test_child_stream_start_date.py new file mode 100644 index 0000000..2a86bf0 --- /dev/null +++ b/tests/tap_tester/test_child_stream_start_date.py @@ -0,0 +1,65 @@ +from tap_tester import connections, runner +from base import TestPendoBase +from datetime import datetime + +class PendoChildStreamStartDateTest(TestPendoBase): + + def name(self): + return "pendo_child_stream_start_date_test" + + def test_run(self): + + streams_to_test = {"guides", "guide_events"} + + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_all_fields = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in streams_to_test] + + self.perform_and_verify_table_and_field_selection(conn_id,test_catalogs_all_fields) + + record_count_by_stream = self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + # check if all streams have collected records + for stream in streams_to_test: + self.assertGreater(record_count_by_stream.get(stream, -1), 0, + msg="failed to replicate any data for stream : {}".format(stream)) + + # collect "guide" and "guide_events" data + guides = synced_records.get("guides") + guide_events = synced_records.get("guide_events") + + # find the first guide's id + first_guide_id = guides.get("messages")[0].get("data").get("id") + + first_guide_ids_events = [] + rest_guide_events = [] + + # seperate guide 
+        for guide_event in guide_events.get("messages"):
+            if guide_event.get("data").get("guide_id") == first_guide_id:
+                first_guide_ids_events.append(guide_event.get("data"))
+            else:
+                rest_guide_events.append(guide_event.get("data"))
+
+        replication_key_for_guide_events = next(iter(self.expected_replication_keys().get("guide_events")))
+
+        # find the maximum bookmark date for the first guide's events
+        sorted_first_guide_ids_events = sorted(first_guide_ids_events, key=lambda i: i[replication_key_for_guide_events], reverse=True)
+        max_bookmark = sorted_first_guide_ids_events[0].get(replication_key_for_guide_events)
+
+        # used for verifying whether we synced guide events earlier
+        # than the maximum bookmark of the first guide's events
+        synced_older_data = False
+        for rest_guide_event in rest_guide_events:
+            event_time = datetime.strptime(rest_guide_event.get(replication_key_for_guide_events), "%Y-%m-%dT%H:%M:%S.%fZ")
+            max_bookmark_time = datetime.strptime(max_bookmark, "%Y-%m-%dT%H:%M:%S.%fZ")
+            if event_time < max_bookmark_time:
+                synced_older_data = True
+                break
+
+        self.assertTrue(synced_older_data)
diff --git a/tests/tap_tester/test_discovery.py b/tests/tap_tester/test_discovery.py
new file mode 100644
index 0000000..dd43867
--- /dev/null
+++ b/tests/tap_tester/test_discovery.py
@@ -0,0 +1,125 @@
+import re
+
+import tap_tester.connections as connections
+from base import TestPendoBase
+from tap_tester import menagerie
+
+class PendoDiscoverTest(TestPendoBase):
+    """
+    Testing that discovery creates the appropriate catalog with valid metadata.
+    • Verify the number of actual streams discovered matches expected
+    • Verify the stream names discovered were what we expect
+    • Verify stream names follow naming convention
+      streams should only have lowercase alphas and underscores
+    • verify there is only 1 top level breadcrumb
+    • verify replication key(s)
+    • verify primary key(s)
+    • verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
+    • verify the actual replication matches our expected replication method
+    • verify that primary, replication keys are given the inclusion of automatic.
+    • verify that all other fields have inclusion of available metadata.
+    """
+
+    def name(self):
+        return "pendo_discover_test"
+
+    def test_run(self):
+        streams_to_test = self.expected_streams()
+
+        conn_id = connections.ensure_connection(self, payload_hook=None)
+
+        # Verify that there are catalogs found
+        found_catalogs = self.run_and_verify_check_mode(
+            conn_id)
+
+        # Verify stream names follow naming convention
+        # streams should only have lowercase alphas and underscores
+        found_catalog_names = {c['tap_stream_id'] for c in found_catalogs}
+        self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]),
+                        msg="One or more streams don't follow standard naming")
+
+        for stream in streams_to_test:
+            with self.subTest(stream=stream):
+
+                # Verify the catalog is found for a given stream
+                catalog = next(iter([catalog for catalog in found_catalogs
+                                     if catalog["stream_name"] == stream]))
+                self.assertIsNotNone(catalog)
+
+                # collecting expected values
+                expected_primary_keys = self.expected_pks()[stream]
+                expected_replication_keys = self.expected_replication_keys()[
+                    stream]
+                expected_automatic_fields = self.expected_automatic_fields().get(stream)
+                expected_replication_method = self.expected_replication_method()[
+                    stream]
+
+                # collecting actual values...
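+                # (primary keys, replication keys and the replication method come from the stream-level
+                # metadata entry; automatic fields are gathered from the field-level inclusion metadata)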
+ schema_and_metadata = menagerie.get_annotated_schema( + conn_id, catalog['stream_id']) + metadata = schema_and_metadata["metadata"] + stream_properties = [ + item for item in metadata if item.get("breadcrumb") == []] + actual_primary_keys = set( + stream_properties[0].get( + "metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, []) + ) + actual_replication_keys = set( + stream_properties[0].get( + "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []) + ) + actual_replication_method = stream_properties[0].get( + "metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD) + actual_automatic_fields = set( + item.get("breadcrumb", ["properties", None])[1] for item in metadata + if item.get("metadata").get("inclusion") == "automatic" + ) + + ########################################################################## + # metadata assertions + ########################################################################## + + # verify there is only 1 top level breadcrumb in metadata + self.assertTrue(len(stream_properties) == 1, + msg="There is NOT only one top level breadcrumb for {}".format(stream) + + "\nstream_properties | {}".format(stream_properties)) + + # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL + if actual_replication_keys: + self.assertTrue(actual_replication_method == self.INCREMENTAL, + msg="Expected INCREMENTAL replication " + "since there is a replication key") + else: + self.assertTrue(actual_replication_method == self.FULL_TABLE, + msg="Expected FULL replication " + "since there is no replication key") + + # verify the actual replication matches our expected replication method + self.assertEqual(expected_replication_method, actual_replication_method, + msg="The actual replication method {} doesn't match the expected {}".format( + actual_replication_method, expected_replication_method)) + + # verify replication key(s) + self.assertEqual(expected_replication_keys, actual_replication_keys, + msg="expected replication key {} but actual is {}".format( + expected_replication_keys, actual_replication_keys)) + + # verify primary key(s) match expectations + self.assertSetEqual( + expected_primary_keys, actual_primary_keys, + ) + + # verify that primary keys and replication keys + # are given the inclusion of automatic in metadata. 
+                self.assertSetEqual(expected_automatic_fields,
+                                    actual_automatic_fields)
+
+                # verify that all other fields have inclusion of available
+                # This assumes there are no unsupported fields for SaaS sources
+                self.assertTrue(
+                    all({item.get("metadata").get("inclusion") == "available"
+                         for item in metadata
+                         if item.get("breadcrumb", []) != []
+                         and item.get("breadcrumb", ["properties", None])[1]
+                         not in actual_automatic_fields}),
+                    msg="Not all non key properties are set to available in metadata")
diff --git a/tests/tap_tester/test_start_date.py b/tests/tap_tester/test_start_date.py
new file mode 100644
index 0000000..9bb5f8b
--- /dev/null
+++ b/tests/tap_tester/test_start_date.py
@@ -0,0 +1,163 @@
+import tap_tester.connections as connections
+import tap_tester.runner as runner
+from base import TestPendoBase
+
+class PendoStartDateTest(TestPendoBase):
+    """Instantiate start date according to the desired data set and run the test"""
+
+    def get_properties(self, *args, **kwargs):
+        props = super().get_properties(*args, **kwargs)
+        props.pop('lookback_window')
+        return props
+
+    start_date_1 = ""
+    start_date_2 = ""
+
+    def name(self):
+        return "pendo_start_date_test"
+
+    def test_run(self):
+        self.run_test("2021-09-09T00:00:00Z", "2021-09-13T00:00:00Z", {"accounts", "visitors", "metadata_visitors", "metadata_accounts"})
+        self.run_test("2020-09-01T00:00:00Z", "2021-03-01T00:00:00Z", {"features", "feature_events", "pages", "page_events", "events"})
+        self.run_test("2021-09-09T00:00:00Z", "2021-09-16T00:00:00Z", {"guides", "guide_events"})
+        self.run_test("2021-09-13T00:00:00Z", "2021-09-15T00:00:00Z", {"track_types", "track_events"})
+
+    def run_test(self, start_date_1, start_date_2, streams):
+        """
+        Test that the start_date configuration is respected
+        • verify that a sync with a later start date has at least one record synced
+          and fewer records than the 1st sync with a previous start date
+        • verify that each stream has fewer records than the earlier start date sync
+        • verify all data from the later start date has bookmark values >= start_date
+        """
+
+        self.start_date_1 = start_date_1
+        self.start_date_2 = start_date_2
+
+        self.start_date = self.start_date_1
+
+        expected_streams = streams
+
+        ##########################################################################
+        # First Sync
+        ##########################################################################
+
+        # instantiate connection
+        conn_id_1 = connections.ensure_connection(self, original_properties=False)
+
+        # run check mode
+        found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1)
+
+        # table and field selection
+        test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1
+                                      if catalog.get('tap_stream_id') in expected_streams]
+        self.perform_and_verify_table_and_field_selection(
+            conn_id_1, test_catalogs_1_all_fields, select_all_fields=True)
+
+        # run initial sync
+        record_count_by_stream_1 = self.run_and_verify_sync(conn_id_1)
+        synced_records_1 = runner.get_records_from_target_output()
+
+        ##########################################################################
+        # Update START DATE Between Syncs
+        ##########################################################################
+
+        print("REPLICATION START DATE CHANGE: {} ===>>> {} ".format(
+            self.start_date, self.start_date_2))
+        self.start_date = self.start_date_2
+
+        ##########################################################################
+        # Second Sync
+        ##########################################################################
+
+        # create a new connection
with the new start_date + conn_id_2 = connections.ensure_connection( + self, original_properties=False) + + # run check mode + found_catalogs_2 = self.run_and_verify_check_mode(conn_id_2) + + # table and field selection + test_catalogs_2_all_fields = [catalog for catalog in found_catalogs_2 + if catalog.get('tap_stream_id') in expected_streams] + self.perform_and_verify_table_and_field_selection( + conn_id_2, test_catalogs_2_all_fields, select_all_fields=True) + + # run sync + record_count_by_stream_2 = self.run_and_verify_sync(conn_id_2) + synced_records_2 = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected values + expected_primary_keys = self.expected_pks()[stream] + expected_start_date_1 = self.timedelta_formatted( + self.start_date_1, -1) + expected_start_date_2 = self.timedelta_formatted( + self.start_date_2, -1) + + # collect information for assertions from syncs 1 & 2 base on expected values + record_count_sync_1 = record_count_by_stream_1.get(stream, 0) + record_count_sync_2 = record_count_by_stream_2.get(stream, 0) + + primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys) + for message in synced_records_1.get(stream, {}).get('messages', []) + if message.get('action') == 'upsert'] + primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys) + for message in synced_records_2.get(stream, {}).get('messages', []) + if message.get('action') == 'upsert'] + + primary_keys_sync_1 = set(primary_keys_list_1) + primary_keys_sync_2 = set(primary_keys_list_2) + + if self.is_incremental(stream): + + # collect information specific to incremental streams from syncs 1 & 2 + expected_replication_key = next( + iter(self.expected_replication_keys().get(stream, []))) + replication_dates_1 = [row.get('data').get(expected_replication_key) for row in + synced_records_1.get(stream, {'messages': []}).get('messages', []) + if row.get('data')] + replication_dates_2 = [row.get('data').get(expected_replication_key) for row in + synced_records_2.get(stream, {'messages': []}).get('messages', []) + if row.get('data')] + + # Verify replication key is greater or equal to start_date for sync 1 + for replication_date in replication_dates_1: + self.assertGreaterEqual( + self.parse_date(replication_date), self.parse_date( + expected_start_date_1), + msg="Report pertains to a date prior to our start date.\n" + + "Sync start_date: {}\n".format(expected_start_date_1) + + "Record date: {} ".format(replication_date) + ) + + # Verify replication key is greater or equal to start_date for sync 2 + for replication_date in replication_dates_2: + self.assertGreaterEqual( + self.parse_date(replication_date), self.parse_date( + expected_start_date_2), + msg="Report pertains to a date prior to our start date.\n" + + "Sync start_date: {}\n".format(expected_start_date_2) + + "Record date: {} ".format(replication_date) + ) + + # Verify the number of records replicated in sync 1 is greater than the number + # of records replicated in sync 2 + self.assertGreater(record_count_sync_1, + record_count_sync_2) + + # Verify the records replicated in sync 2 were also replicated in sync 1 + self.assertTrue( + primary_keys_sync_2.issubset(primary_keys_sync_1)) + + else: + + # Verify that the 2nd sync with a later start date replicates the same number of + # records as the 1st sync. 
+ self.assertEqual(record_count_sync_2, record_count_sync_1) + + # Verify by primary key the same records are replicated in the 1st and 2nd syncs + self.assertSetEqual(primary_keys_sync_1, + primary_keys_sync_2) diff --git a/tests/tap_tester/test_sync.py b/tests/tap_tester/test_sync.py index 9e368ce..449cb5d 100644 --- a/tests/tap_tester/test_sync.py +++ b/tests/tap_tester/test_sync.py @@ -1,150 +1,34 @@ -import unittest -from datetime import datetime as dt - -from datetime import timedelta -import os -from tap_tester import menagerie -import tap_tester.runner as runner import tap_tester.connections as connections - - -class TestSyncNonReportStreams(unittest.TestCase): - START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" - - """ Test the non-report streams """ - - @staticmethod - def name(): - return "test_sync" - - @staticmethod - def tap_name(): - """The name of the tap""" - return "tap-pendo" - - @staticmethod - def get_type(): - """the expected url route ending""" - return "platform.pendo" - - def expected_check_streams(self): - return set(self.expected_pks().keys()) - - def expected_sync_streams(self): - return set(self.expected_pks().keys()) - - @staticmethod - def expected_pks(): - return { - "accounts": {"account_id"}, - "features": {"id"}, - "guides": {"id"}, - "pages": {"id"}, - # Add back when visitor_history stream causing this test to take - # 4+ hours is solved, tracked in this JIRA: - # https://stitchdata.atlassian.net/browse/SRCE-4755 - # "visitor_history": {"visitor_id"}, - - "visitors": {"visitor_id"}, - "track_types": {"id"}, - "feature_events": {"visitor_id", "account_id", "server", "remote_ip"}, - "events": {"visitor_id", "account_id", "server", "remote_ip"}, - "page_events": {"visitor_id", "account_id", "server", "remote_ip"}, - "guide_events": {"visitor_id", "account_id", "server_name", "remote_ip"}, - "poll_events": {"visitor_id", "account_id", "server_name", "remote_ip"}, - "track_events": {"visitor_id", "account_id", "server", "remote_ip"}, - "metadata_accounts": {}, - "metadata_visitors": {}, - } - - def get_properties(self): - return { - "start_date": self.get_start_date(), - "lookback_window": "1", - "period": "dayRange", - } - - def get_start_date(self): - if not hasattr(self, 'start_date'): - self.start_date = dt.strftime(dt.utcnow() - timedelta(days=2), self.START_DATE_FORMAT) - - return self.start_date - - @staticmethod - def get_credentials(): - return { - "x_pendo_integration_key": os.getenv("TAP_PENDO_INTEGRATION_KEY") - } - - def setUp(self): - missing_envs = [x for x in [ - "TAP_PENDO_INTEGRATION_KEY", - ] if os.getenv(x) is None] - - if missing_envs: - raise Exception("Missing environment variables: {}".format(missing_envs)) - - def test_run(self): - - conn_id = connections.ensure_connection(self, payload_hook=None) - - # Run the tap in check mode - check_job_name = runner.run_check_mode(self, conn_id) - - # Verify the check's exit status - exit_status = menagerie.get_exit_status(conn_id, check_job_name) - menagerie.verify_check_exit_status(self, exit_status, check_job_name) - - # Verify that there are catalogs found - found_catalogs = menagerie.get_catalogs(conn_id) - self.assertGreater(len(found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id)) - - found_catalog_names = set(map(lambda c: c['tap_stream_id'], found_catalogs)) - subset = self.expected_check_streams().issubset(found_catalog_names) - self.assertTrue(subset, msg="Expected check streams are not subset of discovered catalog, extra 
streams={}".format(self.expected_check_streams().difference(found_catalog_names))) - # - # # Select some catalogs - our_catalogs = [c for c in found_catalogs if c.get('tap_stream_id') in self.expected_sync_streams()] - for catalog in our_catalogs: - schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) - connections.select_catalog_and_fields_via_metadata(conn_id, catalog, schema, [], []) - - # # Verify that all streams sync at least one row for initial sync - # # This test is also verifying access token expiration handling. If test fails with - # # authentication error, refresh token was not replaced after expiring. - menagerie.set_state(conn_id, {}) - sync_job_name = runner.run_sync_mode(self, conn_id) - - # # Verify tap and target exit codes - exit_status = menagerie.get_exit_status(conn_id, sync_job_name) - menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) - record_count_by_stream = runner.examine_target_output_file(self, conn_id, self.expected_sync_streams(), - self.expected_pks()) - - # Verify that all streams sync at least one row for initial sync - for stream in self.expected_sync_streams().difference({ - 'feature_events', - 'events', - 'page_events', - 'guide_events', - 'poll_events', - 'track_events', - 'track_types', - }): - with self.subTest(stream=stream): - self.assertLess(0, record_count_by_stream[stream]) - - # TODO run the remaining assertions against all incremental streams - - # Verify that bookmark values are correct after incremental sync - start_date = self.get_properties()['start_date'] - current_state = menagerie.get_state(conn_id) - test_bookmark = current_state['bookmarks']['accounts'] - - # Verify a bookmark is present for accounts - self.assertIn('bookmarks', current_state.keys()) - self.assertIn('accounts', current_state['bookmarks'].keys()) - - # # BUG | https://jira.talendforge.org/browse/TDL-13470 - # # Verify the bookmarked value is correct after incremental sync for accounts - # self.assertGreater(test_bookmark['lastupdated'], start_date) +from base import TestPendoBase + +class PendoSyncTest(TestPendoBase): + def name(self): + return "pendo_sync_test" + + def sync_test_run(self): + """ + Testing that sync creates the appropriate catalog with valid metadata. 
+ • Verify that all fields and all streams have selected set to True in the metadata + """ + streams_to_test = self.expected_streams() + + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + + # table and field selection + test_catalogs_all_fields = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in streams_to_test] + + self.perform_and_verify_table_and_field_selection(conn_id,test_catalogs_all_fields) + + record_count_by_stream = self.run_and_verify_sync(conn_id) + + # check if all streams have collected records + for stream in streams_to_test: + self.assertGreater( + record_count_by_stream.get(stream, -1), 0, + msg="failed to replicate any data for stream : {}".format(stream) + ) + diff --git a/tests/unittests/test_backoff.py b/tests/unittests/test_backoff.py new file mode 100644 index 0000000..f4859bb --- /dev/null +++ b/tests/unittests/test_backoff.py @@ -0,0 +1,918 @@ +import unittest +import requests +import socket +from unittest import mock +import tap_pendo.streams as streams +from requests.models import ProtocolError + +class Mockresponse: + def __init__(self, status_code, json, raise_error, headers=None): + self.status_code = status_code + self.raise_error = raise_error + self.text = json + self.headers = headers + self.reason = "test" + self.raw = '{"results": [{"key1": "value1", "key2": "value2"}]}' + + def __enter__(self): + return self + + def __exit__(self, *args): + return True + + def raise_for_status(self): + if not self.raise_error: + return self.status_code + + raise requests.HTTPError("Sample message") + + def json(self): + return self.text + +def get_response(json={}): + return Mockresponse(200, json, False) + +@mock.patch("time.sleep") +@mock.patch('requests.Session.send') +class TestTimeOut(unittest.TestCase): + + def test_timeout__accounts(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'accounts' stream class + accounts = streams.Accounts({'x_pendo_integration_key': 'test'}) + + try: + accounts.request('accounts') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__features(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'features' stream class + features = streams.Features({'x_pendo_integration_key': 'test'}) + + try: + features.request('features') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__guides(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'guides' stream class + guides = streams.Guides({'x_pendo_integration_key': 'test'}) + + try: + guides.request('guides') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__pages(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'pages' stream class + pages = streams.Pages({'x_pendo_integration_key': 'test'}) + + try: + pages.request('pages') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + 
self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__feature_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'feature_events' stream class + feature_events = streams.FeatureEvents({'x_pendo_integration_key': 'test'}) + + try: + feature_events.request('feature_events') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__page_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'page_events' stream class + page_events = streams.PageEvents({'x_pendo_integration_key': 'test'}) + + try: + page_events.request('page_events') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__guide_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'guide_events' stream class + guide_events = streams.GuideEvents({'x_pendo_integration_key': 'test'}) + + try: + guide_events.request('guide_events') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__poll_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'poll_events' stream class + poll_events = streams.PollEvents({'x_pendo_integration_key': 'test'}) + + try: + poll_events.request('poll_events') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__track_types(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'track_types' stream class + track_types = streams.TrackTypes({'x_pendo_integration_key': 'test'}) + + try: + track_types.request('track_types') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__track_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'track_events' stream class + track_events = streams.TrackEvents({'x_pendo_integration_key': 'test'}) + + try: + track_events.request('track_events') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__metadata_accounts(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'metadata_accounts' stream class + metadata_accounts = streams.MetadataAccounts({'x_pendo_integration_key': 'test'}) + + try: + metadata_accounts.request('metadata_accounts') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__metadata_visitors(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'metadata_visitors' stream class + metadata_visitors = 
streams.MetadataVisitors({'x_pendo_integration_key': 'test'}) + + try: + metadata_visitors.request('metadata_visitors') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__visitor_history(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'visitor_history' stream class + visitor_history = streams.VisitorHistory({'x_pendo_integration_key': 'test'}) + + try: + visitor_history.request('visitor_history', visitorId=1) + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__visitors(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'visitors' stream class + visitors = streams.Visitors({'x_pendo_integration_key': 'test'}) + + try: + visitors.request('visitors') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_timeout__events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = requests.exceptions.Timeout + + # initialize 'events' stream class + events = streams.Events({'x_pendo_integration_key': 'test'}) + + try: + events.request('events') + except requests.exceptions.Timeout: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + +@mock.patch("time.sleep") +@mock.patch('requests.Session.send') +class TestConnectionResetError(unittest.TestCase): + + def test_connection_reset_error__accounts(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'accounts' stream class + accounts = streams.Accounts({'x_pendo_integration_key': 'test'}) + + try: + accounts.request('accounts') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__features(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'features' stream class + features = streams.Features({'x_pendo_integration_key': 'test'}) + + try: + features.request('features') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__guides(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'guides' stream class + guides = streams.Guides({'x_pendo_integration_key': 'test'}) + + try: + guides.request('guides') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__pages(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'pages' stream class + pages = streams.Pages({'x_pendo_integration_key': 'test'}) + + try: + pages.request('pages') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + 
self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__feature_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'feature_events' stream class + feature_events = streams.FeatureEvents({'x_pendo_integration_key': 'test'}) + + try: + feature_events.request('feature_events') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__page_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'page_events' stream class + page_events = streams.PageEvents({'x_pendo_integration_key': 'test'}) + + try: + page_events.request('page_events') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__guide_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'guide_events' stream class + guide_events = streams.GuideEvents({'x_pendo_integration_key': 'test'}) + + try: + guide_events.request('guide_events') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__poll_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'poll_events' stream class + poll_events = streams.PollEvents({'x_pendo_integration_key': 'test'}) + + try: + poll_events.request('poll_events') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__track_types(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'track_types' stream class + track_types = streams.TrackTypes({'x_pendo_integration_key': 'test'}) + + try: + track_types.request('track_types') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__track_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'track_events' stream class + track_events = streams.TrackEvents({'x_pendo_integration_key': 'test'}) + + try: + track_events.request('track_events') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__metadata_accounts(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'metadata_accounts' stream class + metadata_accounts = streams.MetadataAccounts({'x_pendo_integration_key': 'test'}) + + try: + metadata_accounts.request('metadata_accounts') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def 
test_connection_reset_error__metadata_visitors(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'metadata_visitors' stream class + metadata_visitors = streams.MetadataVisitors({'x_pendo_integration_key': 'test'}) + + try: + metadata_visitors.request('metadata_visitors') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__visitor_history(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'visitor_history' stream class + visitor_history = streams.VisitorHistory({'x_pendo_integration_key': 'test'}) + + try: + visitor_history.request('visitor_history', visitorId=1) + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__visitors(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'visitors' stream class + visitors = streams.Visitors({'x_pendo_integration_key': 'test'}) + + try: + visitors.request('visitors') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_connection_reset_error__events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + # initialize 'events' stream class + events = streams.Events({'x_pendo_integration_key': 'test'}) + + try: + events.request('events') + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + +@mock.patch("time.sleep") +@mock.patch('requests.Session.send') +class TestProtocolError(unittest.TestCase): + + def test_protocol_error__accounts(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'accounts' stream class + accounts = streams.Accounts({'x_pendo_integration_key': 'test'}) + + try: + accounts.request('accounts') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__features(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'features' stream class + features = streams.Features({'x_pendo_integration_key': 'test'}) + + try: + features.request('features') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__guides(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'guides' stream class + guides = streams.Guides({'x_pendo_integration_key': 'test'}) + + 
try: + guides.request('guides') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__pages(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'pages' stream class + pages = streams.Pages({'x_pendo_integration_key': 'test'}) + + try: + pages.request('pages') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__feature_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'feature_events' stream class + feature_events = streams.FeatureEvents({'x_pendo_integration_key': 'test'}) + + try: + feature_events.request('feature_events') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__page_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'page_events' stream class + page_events = streams.PageEvents({'x_pendo_integration_key': 'test'}) + + try: + page_events.request('page_events') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__guide_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'guide_events' stream class + guide_events = streams.GuideEvents({'x_pendo_integration_key': 'test'}) + + try: + guide_events.request('guide_events') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__poll_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'poll_events' stream class + poll_events = streams.PollEvents({'x_pendo_integration_key': 'test'}) + + try: + poll_events.request('poll_events') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__track_types(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'track_types' stream class + track_types = streams.TrackTypes({'x_pendo_integration_key': 'test'}) + + try: + track_types.request('track_types') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) 
+ + def test_protocol_error__track_events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'track_events' stream class + track_events = streams.TrackEvents({'x_pendo_integration_key': 'test'}) + + try: + track_events.request('track_events') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__metadata_accounts(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'metadata_accounts' stream class + metadata_accounts = streams.MetadataAccounts({'x_pendo_integration_key': 'test'}) + + try: + metadata_accounts.request('metadata_accounts') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__metadata_visitors(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'metadata_visitors' stream class + metadata_visitors = streams.MetadataVisitors({'x_pendo_integration_key': 'test'}) + + try: + metadata_visitors.request('metadata_visitors') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__visitor_history(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'visitor_history' stream class + visitor_history = streams.VisitorHistory({'x_pendo_integration_key': 'test'}) + + try: + visitor_history.request('visitor_history', visitorId=1) + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__visitors(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'visitors' stream class + visitors = streams.Visitors({'x_pendo_integration_key': 'test'}) + + try: + visitors.request('visitors') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + + def test_protocol_error__events(self, mocked_send, mocked_sleep): + # mock request and raise error + mocked_send.side_effect = ProtocolError("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer')) + + # initialize 'events' stream class + events = streams.Events({'x_pendo_integration_key': 'test'}) + + try: + events.request('events') + except ProtocolError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) + +@mock.patch('requests.Session.send') +class Positive(unittest.TestCase): + 
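+    # Happy-path tests: each test mocks a 200 response and verifies that the stream's request()
+    # returns the parsed JSON payload (the 'visitors' and 'events' streams wrap the payload in a list).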
+ def test_positive__accounts(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'accounts' stream class + accounts = streams.Accounts({'x_pendo_integration_key': 'test'}) + + resp = accounts.request('accounts') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__features(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'features' stream class + features = streams.Features({'x_pendo_integration_key': 'test'}) + + resp = features.request('features') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__guides(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'guides' stream class + guides = streams.Guides({'x_pendo_integration_key': 'test'}) + + resp = guides.request('guides') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__pages(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'pages' stream class + pages = streams.Pages({'x_pendo_integration_key': 'test'}) + + resp = pages.request('pages') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__feature_events(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'feature_events' stream class + feature_events = streams.FeatureEvents({'x_pendo_integration_key': 'test'}) + + resp = feature_events.request('feature_events') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__page_events(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'page_events' stream class + page_events = streams.PageEvents({'x_pendo_integration_key': 'test'}) + + resp = page_events.request('page_events') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__guide_events(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'guide_events' stream class + guide_events = streams.GuideEvents({'x_pendo_integration_key': 'test'}) + + resp = guide_events.request('guide_events') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__poll_events(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'poll_events' stream class + poll_events = streams.PollEvents({'x_pendo_integration_key': 'test'}) + + resp = poll_events.request('poll_events') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__track_types(self, mocked_send): + json = 
{"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'track_types' stream class + track_types = streams.TrackTypes({'x_pendo_integration_key': 'test'}) + + resp = track_types.request('track_types') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__track_events(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'track_events' stream class + track_events = streams.TrackEvents({'x_pendo_integration_key': 'test'}) + + resp = track_events.request('track_events') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__metadata_accounts(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'metadata_accounts' stream class + metadata_accounts = streams.MetadataAccounts({'x_pendo_integration_key': 'test'}) + + resp = metadata_accounts.request('metadata_accounts') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__metadata_visitors(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'metadata_visitors' stream class + metadata_visitors = streams.MetadataVisitors({'x_pendo_integration_key': 'test'}) + + resp = metadata_visitors.request('metadata_visitors') + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__visitor_history(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'visitor_history' stream class + visitor_history = streams.VisitorHistory({'x_pendo_integration_key': 'test'}) + + resp = visitor_history.request('visitor_history', visitorId=1) + + # verify if the desired data was returned from the request + self.assertEquals(resp, json) + + def test_positive__visitors(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'visitors' stream class + visitors = streams.Visitors({'x_pendo_integration_key': 'test'}) + + resp = visitors.request('visitors') + + # verify if the desired data was returned from the request + self.assertEquals(resp, [json]) + + def test_positive__events(self, mocked_send): + json = {"key1": "value1", "key2": "value2"} + # mock request and return dummy data + mocked_send.return_value = get_response(json) + + # initialize 'events' stream class + events = streams.Events({'x_pendo_integration_key': 'test'}) + + resp = events.request('events') + + # verify if the desired data was returned from the request + self.assertEquals(resp, [json]) diff --git a/tests/unittests/test_backoff_for_conn_reset.py b/tests/unittests/test_backoff_for_conn_reset.py new file mode 100644 index 0000000..1b88603 --- /dev/null +++ b/tests/unittests/test_backoff_for_conn_reset.py @@ -0,0 +1,31 @@ +from unittest import mock +from tap_pendo.streams import Endpoints, Visitors +import unittest +import socket +from requests.models import ProtocolError + +config = {'x_pendo_integration_key': 
"TEST_KEY"} +stream = Visitors(config=config) +stream.endpoint = Endpoints('', 'GET') + +@mock.patch("time.sleep") +@mock.patch('requests.Session.send') +class TestConnectionResetError(unittest.TestCase): + + def test_connection_reset_error__accounts(self, mocked_send, mocked_sleep): + # mock request and raise error + + config = {'x_pendo_integration_key': "TEST_KEY"} + # initialize 'visitors' stream class + visitors = Visitors(config=config) + stream.endpoint = Endpoints('', 'GET') + + mocked_send.side_effect = socket.error(104, 'Connection reset by peer') + + try: + visitors.request(endpoint=None) + except ConnectionResetError: + pass + + # verify if the request was called 5 times + self.assertEquals(mocked_send.call_count, 5) \ No newline at end of file diff --git a/tests/unittests/test_child_stream_start_date.py b/tests/unittests/test_child_stream_start_date.py new file mode 100644 index 0000000..acf9f8b --- /dev/null +++ b/tests/unittests/test_child_stream_start_date.py @@ -0,0 +1,105 @@ +import unittest +import tap_pendo.streams as streams +from unittest import mock +from singer.utils import strftime +from dateutil.parser import parse + +# stores the arguments that are passed in the 'sync' +# function of child stream for assertion +TEST = [] + +class Schema: + schema = None + + def __init__(self, schema): + self.schema = schema + + def to_dict(self): + return self.schema + +class Test: + schema = Schema({}) + metadata = {} + tap_stream_id = "test" + +# dummy child stream class +class ChildStream: + schema = None + stream = Test() + config = None + name = "test_stream" + replication_key = "date" + key_properties = ["id"] + + # return the data which was passed as argument for transformation in the argument + def transform(*args, **kwargs): + return args[1] + + def sync(*args, **kwargs): + # append 'args' in the TEST variable for assertion + TEST.append(args) + # return dummy data + return [{"id": 1, "date": "2021-02-01T00:00:00Z"}, + {"id": 2, "date": "2021-03-01T00:00:00Z"}] + + def __init__(self, config): + self.config = config + +# dummy parent stream class +class ParentStream: + schema = None + name = "test_stream" + key_properties = ["id"] + + def transform(*args, **kwargs): + return {} + + def sync(*args, **kwargs): + return [] + +def update_bookmark(state, stream, bookmark_value, bookmark_key): + if not state.get("bookmarks").get(stream): + state["bookmarks"][stream] = {} + state["bookmarks"][stream][bookmark_key] = bookmark_value + +def transform(*args, **kwargs): + # return the data with was passed for transformation in the argument + return args[0] + +class TestStartDateOfChildStream(unittest.TestCase): + + @mock.patch("singer.write_schema") + @mock.patch("tap_pendo.streams.Stream.update_bookmark") + @mock.patch("tap_pendo.streams.update_currently_syncing") + @mock.patch("singer.metadata.to_map") + @mock.patch("singer.Transformer.transform") + @mock.patch("singer.write_records") + def test_run(self, mocked_write_records, mocked_transform, mocked_metadata_to_map, mocked_update_currently_syncing, mocked_update_bookmark, mocked_write_schema): + """ + Test case for verifying if the start date / bookmark is used for fetching records + of child stream rather than the updated bookmark from previous child stream sync + """ + # config file + config = {"start_date": "2021-01-01T00:00:00Z"} + + # create dummy parent records + mock_records = [{"id":1}, {"id":2}, {"id":3}] + + # mock update bookmark + mocked_update_bookmark.side_effect = update_bookmark + # mock singer transform + 
mocked_transform.side_effect = transform + + stream_instance = streams.Stream(config) + + # call function + stream_instance.sync_substream({"bookmarks": {}}, ParentStream(), ChildStream(config), mock_records) + + # iterate over 'TEST' and verify if the start date was passed as argument rather than the updated bookmark + for test in TEST: + # get start date from TEST + start_date = test[2] + # parse start date as it is in the format: 2021-01-01T00:00:00.000000Z + parsed_start_date = parse(strftime(start_date)).strftime("%Y-%m-%dT%H:%M:%SZ") + # verify if the 'parsed_start_date' is same as the start date from config file + self.assertEquals(parsed_start_date, config.get("start_date")) diff --git a/tests/unittests/test_custom_fields.py b/tests/unittests/test_custom_fields.py new file mode 100644 index 0000000..f134b11 --- /dev/null +++ b/tests/unittests/test_custom_fields.py @@ -0,0 +1,303 @@ +import unittest +from unittest import mock +from singer import utils, metadata +from singer.utils import strptime_to_utc, strftime +from tap_pendo.discover import LOGGER, build_metadata_metadata, discover_streams +from tap_pendo.streams import Stream + + +class TestCustomFields(unittest.TestCase): + def test_build_account_visitor_metadata_for_accounts(self): + """ + This function tests that the build_account_visitor_metadata() correctly takes the data from the accounts API + and appends all the custom fields to the custom metadata in the schema. + """ + custom_account_fields = { + "testaccountcfield1": { + "type": "boolean", + "display_name": "testAccountCField1", + "element_type": "", + "element_format": "", + "dirty": True, + "is_hidden": False, + "is_deleted": False, + "is_calculated": False, + "is_per_app": False, + "never_index": False + }, + "testaccountcustomfield": { + "type": "string", + "display_name": "test account custom field", + "element_type": "", + "element_format": "", + "dirty": True, + "is_hidden": False, + "is_deleted": False, + "is_calculated": False, + "is_per_app": False, + "never_index": False + } + } + # the expected schema contains all the custom fields + expected_schema = { + "properties":{ + "custom":{ + "type":[ + "null", + "object" + ], + "additional_properties":"false", + "properties":{ + "testaccountcfield1":{ + "type":[ + "null", + "object" + ], + "additional_properties":"false", + "properties":{ + "type":{ + "type":[ + "null", + "string" + ] + }, + "display_name":{ + "type":[ + "null", + "string" + ] + }, + "element_type":{ + "type":[ + "null", + "string" + ] + }, + "element_format":{ + "type":[ + "null", + "string" + ] + }, + "dirty":{ + "type":[ + "null", + "boolean" + ] + }, + "is_hidden":{ + "type":[ + "null", + "boolean" + ] + }, + "is_deleted":{ + "type":[ + "null", + "boolean" + ] + }, + "is_calculated":{ + "type":[ + "null", + "boolean" + ] + }, + "is_per_app":{ + "type":[ + "null", + "boolean" + ] + }, + "never_index":{ + "type":[ + "null", + "boolean" + ] + } + } + }, + "testaccountcustomfield":{ + "type":[ + "null", + "object" + ], + "additional_properties":"false", + "properties":{ + "type":{ + "type":[ + "null", + "string" + ] + }, + "display_name":{ + "type":[ + "null", + "string" + ] + }, + "element_type":{ + "type":[ + "null", + "string" + ] + }, + "element_format":{ + "type":[ + "null", + "string" + ] + }, + "dirty":{ + "type":[ + "null", + "boolean" + ] + }, + "is_hidden":{ + "type":[ + "null", + "boolean" + ] + }, + "is_deleted":{ + "type":[ + "null", + "boolean" + ] + }, + "is_calculated":{ + "type":[ + "null", + "boolean" + ] + }, + "is_per_app":{ + 
"type":[ + "null", + "boolean" + ] + }, + "never_index":{ + "type":[ + "null", + "boolean" + ] + } + } + } + } + } + } + } + mdata = {} + schema = {'properties': {}} + build_metadata_metadata(mdata, schema, custom_account_fields) + self.assertEqual(schema, expected_schema) + + def test_build_account_visitor_metadata_for_visitors(self): + """ + This function tests that the build_account_visitor_metadata() correctly takes the data from the accounts API + and appends all the custom fields to the custom metadata in the schema. + """ + custom_visitor_fields = { + "testcustomfield": { + "type": "string", + "display_name": "testCustomField", + "element_type": "", + "element_format": "", + "dirty": True, + "is_hidden": False, + "is_deleted": False, + "is_calculated": False, + "is_per_app": False, + "never_index": False + } + } + # the expected schema contains all the custom fields + expected_schema = { + "properties":{ + "custom":{ + "type":[ + "null", + "object" + ], + "additional_properties":"false", + "properties":{ + "testcustomfield":{ + "type":[ + "null", + "object" + ], + "additional_properties":"false", + "properties":{ + "type":{ + "type":[ + "null", + "string" + ] + }, + "display_name":{ + "type":[ + "null", + "string" + ] + }, + "element_type":{ + "type":[ + "null", + "string" + ] + }, + "element_format":{ + "type":[ + "null", + "string" + ] + }, + "dirty":{ + "type":[ + "null", + "boolean" + ] + }, + "is_hidden":{ + "type":[ + "null", + "boolean" + ] + }, + "is_deleted":{ + "type":[ + "null", + "boolean" + ] + }, + "is_calculated":{ + "type":[ + "null", + "boolean" + ] + }, + "is_per_app":{ + "type":[ + "null", + "boolean" + ] + }, + "never_index":{ + "type":[ + "null", + "boolean" + ] + } + } + } + } + } + } + } + mdata = {} + schema = {'properties': {}} + build_metadata_metadata(mdata, schema, custom_visitor_fields) + self.assertEqual(schema, expected_schema) diff --git a/tests/unittests/test_endpoints_dict.py b/tests/unittests/test_endpoints_dict.py new file mode 100644 index 0000000..0068da6 --- /dev/null +++ b/tests/unittests/test_endpoints_dict.py @@ -0,0 +1,18 @@ +from tap_pendo.streams import Endpoints, Stream + +config = {'x_pendo_integration_key': "TEST_KEY"} +stream = Stream(config=config) +stream.endpoint = Endpoints("/api/v1/aggregation", "POST", {"headers": "headers"}, {"params": "params"}) + + +def test_correct_values_passed_in_endpoint_object(): + assert stream.endpoint.endpoint == "/api/v1/aggregation" + assert stream.endpoint.method == "POST" + assert stream.endpoint.headers == {"headers": "headers"} + assert stream.endpoint.params == {"params": "params"} + +def test_correct_endpoint_url(): + stream.endpoint = Endpoints( + "/api/v1/visitor/{visitorID}/history", "GET") + formatted_url = stream.endpoint.get_url(visitorID='abc') + assert formatted_url == 'https://app.pendo.io/api/v1/visitor/abc/history' \ No newline at end of file diff --git a/tests/unittests/test_full_table_sync_record_count.py b/tests/unittests/test_full_table_sync_record_count.py new file mode 100644 index 0000000..b5da286 --- /dev/null +++ b/tests/unittests/test_full_table_sync_record_count.py @@ -0,0 +1,51 @@ +import unittest +from unittest import mock +from tap_pendo.sync import sync_full_table +import tap_pendo.streams as streams + +class Schema: + schema = None + + def __init__(self, schema): + self.schema = schema + + def to_dict(self): + return self.schema + +class MockStream: + tap_stream_id = None + schema = None + metadata = {} + + def __init__(self, id): + self.tap_stream_id = id + 
self.schema = Schema({}) + +class TestFullTableSyncRecordCount(unittest.TestCase): + + @mock.patch("tap_pendo.streams.Stream.sync") + @mock.patch("singer.write_record") + def test_valid_value_for_replication_key(self, mocked_write, mocked_sync): + """ + Verify that 'counter.value' ie. number of records returned from + 'sync_full_table' is same as the number of records + """ + + mock_config = mock_state = {} + + # create dummy records + mock_records = [{"id":1, "name": "test1"}, + {"id":2, "name": "test2"}, + {"id":2, "name": "test3"}] + + # 'sync' returns Stream class and records + mocked_sync.return_value = MockStream('test'), mock_records + + stream_instance = streams.Stream(mock_config) + stream_instance.stream = MockStream('test') + + # call the full table sync function + counter = sync_full_table(mock_state, stream_instance) + + # verify that the counter is same as the number of dummy records + self.assertEqual(counter, len(mock_records)) diff --git a/tests/unittests/test_incremental.py b/tests/unittests/test_incremental.py new file mode 100644 index 0000000..2cf02ab --- /dev/null +++ b/tests/unittests/test_incremental.py @@ -0,0 +1,87 @@ +import unittest +from unittest import mock + +import singer +import tap_pendo.streams as streams +from tap_pendo.sync import sync_stream + +class Schema: + schema = None + + def __init__(self, schema): + self.schema = schema + + def to_dict(self): + return self.schema + +class MockStream: + tap_stream_id = None + schema = None + metadata = {} + + def __init__(self, id): + self.tap_stream_id = id + self.schema = Schema({}) + +@mock.patch("requests.Session.send") +@mock.patch("tap_pendo.streams.Stream.sync") +@mock.patch("singer.write_record") +@mock.patch("singer.write_state") +class TestIncremental(unittest.TestCase): + + def test_scenario_1(self, mocked_state, mocked_write, mocked_sync, mocked_request): + ''' + Verify that all records are written as both are as + the replication key is later than start date + ''' + mock_config = mock_state = {} + mock_start_date = "2021-01-10T00:00:00Z" + mock_records = [{"id":1, "lastupdated": "2021-01-12T00:00:00Z"}, + {"id":2, "lastupdated": "2021-01-15T00:00:00Z"}] + mocked_sync.return_value = MockStream('test'), mock_records + + stream_instance = streams.Stream(mock_config) + stream_instance.replication_key = 'lastupdated' + stream_instance.stream = MockStream('test') + sync_stream(mock_state, mock_start_date, stream_instance) + + # Verify that write record is called for 2 records + self.assertEqual(mocked_write.call_count, 2) + + def test_scenario_2(self, mocked_state, mocked_write, mocked_sync, mocked_request): + ''' + Verify that only 1 record is written as + it is updated later than the start date + ''' + mock_config = mock_state = {} + mock_start_date = "2021-01-10T00:00:00Z" + mock_records = [{"id":1, "lastupdated": "2021-01-12T00:00:00Z"}, + {"id":2, "lastupdated": "2021-01-08T00:00:00Z"}] + mocked_sync.return_value = MockStream('test'), mock_records + + stream_instance = streams.Stream(mock_config) + stream_instance.replication_key = 'lastupdated' + stream_instance.stream = MockStream('test') + sync_stream(mock_state, mock_start_date, stream_instance) + + # Verify that write record is called for 1 records + self.assertEqual(mocked_write.call_count, 1) + + def test_scenario_3(self, mocked_state, mocked_write, mocked_sync, mocked_request): + ''' + Verify that none of the records were written + as both were updated before the start date + ''' + mock_config = mock_state = {} + mock_start_date = 
"2021-01-10T00:00:00Z" + mock_records = [{"id":1, "lastupdated": "2021-01-01T00:00:00Z"}, + {"id":2, "lastupdated": "2021-01-08T00:00:00Z"}] + mocked_sync.return_value = MockStream('test'), mock_records + + stream_instance = streams.Stream(mock_config) + stream_instance.replication_key = 'lastupdated' + stream_instance.stream = MockStream('test') + sync_stream(mock_state, mock_start_date, stream_instance) + + # Verify that write record is called for 0 records + self.assertEqual(mocked_write.call_count, 0) diff --git a/tests/unittests/test_lazy_aggregation_sync.py b/tests/unittests/test_lazy_aggregation_sync.py new file mode 100644 index 0000000..f5db0ec --- /dev/null +++ b/tests/unittests/test_lazy_aggregation_sync.py @@ -0,0 +1,76 @@ +import unittest +import requests +from unittest import mock +from tap_pendo.streams import Visitors + +class Mockresponse: + def __init__(self, resp, status_code, headers=None, raise_error=False): + self.status_code = status_code + self.raw = resp + self.headers = headers + self.raise_error = raise_error + self.reason = "error" + + def raise_for_status(self): + if not self.raise_error: + return self.status_code + + raise requests.HTTPError("mock sample message") + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + return True + +# Mocking sync of substream +def mocked_substream(state, parent, sub_stream, parent_response): + for record in parent_response: + pass + +class TestLazyAggregationSync(unittest.TestCase): + ''' + Verify that sync of LazzyAggregation is return all the data + ''' + + @mock.patch("requests.Session.send") + @mock.patch("tap_pendo.streams.Stream.is_selected") + @mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream) + def test_lazzy_aggregation_with_sub_stream(self, mocked_substream, mocked_selected, mocked_request): + ''' + Verify that if sub stream is present then also all data should be return for super stream + and sync_substream should be called + ''' + expected_data = [{"id":1}, {"id":2}, {"id":3}] + records = '{"results": [{"id":1}, {"id":2}, {"id":3}]}' + mocked_selected.return_value = True # Sub stream is selected + mocked_request.return_value = Mockresponse(records, 200, raise_error=False) + config = {'start_date': '2021-01-01T00:00:00Z', + 'x_pendo_integration_key': 'test'} + + lazzy_aggr = Visitors(config) + stream, stream_response = lazzy_aggr.sync({}) + + self.assertEqual(stream_response, expected_data) # parent stream get all expected data + self.assertEqual(mocked_substream.call_count, 1) + + @mock.patch("requests.Session.send") + @mock.patch("tap_pendo.streams.Stream.is_selected") + @mock.patch("tap_pendo.streams.Stream.sync_substream", side_effect=mocked_substream) + def test_lazzy_aggregation_without_sub_stream(self, mocked_substream, mocked_selected, mocked_request): + ''' + Verify that if sub stream is not selected then also all data should be return for super stream + and sync_substream should not be called + ''' + expected_data = [{"id":1}, {"id":2}, {"id":3}] + records = '{"results": [{"id":1}, {"id":2}, {"id":3}]}' + mocked_selected.return_value = False # Sub stream is not selected + mocked_request.return_value = Mockresponse(records, 200, raise_error=False) + config = {'start_date': '2021-01-01T00:00:00Z', + 'x_pendo_integration_key': 'test'} + + lazzy_aggr = Visitors(config) + stream, stream_response = lazzy_aggr.sync({}) + + self.assertEqual(stream_response, expected_data) + self.assertEqual(mocked_substream.call_count, 0) diff --git 
a/tests/unittests/test_none_date.py b/tests/unittests/test_none_date.py new file mode 100644 index 0000000..6330805 --- /dev/null +++ b/tests/unittests/test_none_date.py @@ -0,0 +1,146 @@ +import unittest +from unittest import mock + +import singer +import tap_pendo.streams as streams +from tap_pendo.sync import sync_stream + +class Schema: + schema = None + + def __init__(self, schema): + self.schema = schema + + def to_dict(self): + return self.schema + +class MockStream: + tap_stream_id = None + schema = None + metadata = {} + + def __init__(self, id): + self.tap_stream_id = id + self.schema = Schema({}) + +class TestNoneReplicatioKeys(unittest.TestCase): + ''' + Verify that none value for replication key in data is handled properly + ''' + + @mock.patch("requests.Session.send") + @mock.patch("tap_pendo.streams.Stream.sync") + @mock.patch("singer.write_record") + def test_valid_value_for_replication_key(self, mocked_write, mocked_sync, mocked_request): + ''' + Verify that if replication key value are present in valid form then tap + write all valid records in tap output + ''' + mock_config = {} + mock_state = {} + mock_start_date = "2021-01-01T00:00:00Z" + mock_records = [{"id":1, "lastupdated": "2021-09-01T00:00:00Z"}, + {"id":2, "lastupdated": "2021-09-02T00:00:00Z"}] + mocked_sync.return_value = MockStream('test'), mock_records + + stream_instance = streams.Stream(mock_config) + stream_instance.name = 'test' + stream_instance.replication_key = 'lastupdated'# set replication ley + stream_instance.stream = MockStream('test') + no_of_record = sync_stream(mock_state, mock_start_date, stream_instance) + + # Verify that write record is called for 2 records + self.assertEqual(mocked_write.call_count, 2) + self.assertEqual(no_of_record, 2) + # Verify state should be updated with expected bookmark + self.assertEqual(mock_state, {'bookmarks': {'test': {'lastupdated': '2021-09-02T00:00:00.000000Z'}}}) + + @mock.patch("requests.Session.send") + @mock.patch("tap_pendo.streams.Stream.sync") + @mock.patch("singer.write_record") + def test_none_or_no_value_for_replication_key(self, mocked_write, mocked_sync, mocked_request): + ''' + Verify that if replication key not present or null value in data then tap should not break and + write all such records in tap output + ''' + mock_config = {} + mock_state = {} + mock_start_date = "2021-01-01T00:00:00Z" + mock_records = [{"id":1},# No replication key present + {"id":2, "lastupdated": "2021-09-01T00:00:00Z"}, + {"id":3, "lastupdated": None}] # Replication key with None value + mocked_sync.return_value = MockStream('test'), mock_records + + stream_instance = streams.Stream(mock_config) + stream_instance.name = 'test' + stream_instance.replication_key = 'lastupdated'# set replication ley + stream_instance.stream = MockStream('test') + no_of_record = sync_stream(mock_state, mock_start_date, stream_instance) + + # Verify that write record is called for 3 records + self.assertEqual(mocked_write.call_count, 3) + self.assertEqual(no_of_record, 3) + # Verify state should be updated with expected bookmark + self.assertEqual(mock_state, {'bookmarks': {'test': {'lastupdated': '2021-09-01T00:00:00.000000Z'}}}) + + + +class TestNoneReplicatioKeysInSubStreams(unittest.TestCase): + ''' + Verify that none value for replication key in data is handled properly + ''' + + @mock.patch("requests.Session.send") + @mock.patch("tap_pendo.streams.Stream.sync") + @mock.patch("singer.write_record") + def test_valid_value_for_replication_key_sub_stream(self, mocked_write, 
mocked_sync, mocked_request): + ''' + Verify that if replication key value are present in valid form then tap + write all valid records in tap output for sub stream + ''' + mock_config = {"start_date": "2021-01-01T00:00:00Z"} + mock_state = {} + mock_parent_data = [{"id": 1}] + mock_records = [{"id":1, "lastupdated": "2021-09-01T00:00:00Z"}, + {"id":2, "lastupdated": "2021-09-02T00:00:00Z"}] + mocked_sync.return_value = mock_records + + parent_instance = streams.Stream(mock_config) + sub_stream = streams.Stream(mock_config) + sub_stream.name = 'test' + sub_stream.replication_key = 'lastupdated'# set replication ley + sub_stream.stream = MockStream('test') + parent_instance.sync_substream(mock_state, parent_instance, sub_stream, mock_parent_data) + + # Verify that write record is called for 2 records + self.assertEqual(mocked_write.call_count, 2) + # Verify state should be updated with expected bookmark + self.assertEqual(mock_state, {'bookmarks': {'test': {'lastupdated': '2021-09-02T00:00:00.000000Z'}}, 'currently_syncing': None}) + + @mock.patch("requests.Session.send") + @mock.patch("tap_pendo.streams.Stream.sync") + @mock.patch("singer.write_record") + def test_none_or_no_value_for_replication_key_sub_stream(self, mocked_write, mocked_sync, mocked_request): + ''' + Verify that if replication key not present or null value in data then tap should not break and + write all such records in tap output for sub stream + ''' + mock_config = {"start_date": "2021-01-01T00:00:00Z"} + mock_state = {} + mock_parent_data = [{"id": 1}] + mock_records = [{"id":1},# No replication key present + {"id":2, "lastupdated": "2021-09-01T00:00:00Z"}, + {"id":3, "lastupdated": None}] # Replication key with None value + mocked_sync.return_value = mock_records + + parent_instance = streams.Stream(mock_config) + sub_stream = streams.Stream(mock_config) + sub_stream.name = 'test' + sub_stream.replication_key = 'lastupdated'# set replication ley + sub_stream.stream = MockStream('test') + parent_instance.sync_substream(mock_state, parent_instance, sub_stream, mock_parent_data) + + # Verify that write record is called for 3 records + self.assertEqual(mocked_write.call_count, 3) + # Verify state should be updated with expected bookmark + self.assertEqual(mock_state, {'bookmarks': {'test': {'lastupdated': '2021-09-01T00:00:00.000000Z'}}, 'currently_syncing': None}) diff --git a/tests/unittests/test_timeout_value.py b/tests/unittests/test_timeout_value.py new file mode 100644 index 0000000..c2d422e --- /dev/null +++ b/tests/unittests/test_timeout_value.py @@ -0,0 +1,225 @@ +import unittest +import requests +from unittest import mock +import tap_pendo.streams as streams + +class Mockresponse: + def __init__(self, status_code, json, raise_error, headers=None): + self.status_code = status_code + self.raise_error = raise_error + self.text = json + self.headers = headers + self.reason = "test" + self.raw = '{"results": [{"key1": "value1", "key2": "value2"}]}' + + def __enter__(self): + return self + + def __exit__(self, *args): + return True + + def raise_for_status(self): + if not self.raise_error: + return self.status_code + + raise requests.HTTPError("Sample message") + + def json(self): + return self.text + +def get_response(json={}): + return Mockresponse(200, json, False) + +@mock.patch("time.sleep") +@mock.patch('requests.Session.send') +class TestTimeOutValue(unittest.TestCase): + + def test_timeout_value_in_config__Stream(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with timeout value 
param passed in the config file + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # pass 'request_timeout' param in the config + stream = streams.Stream({'x_pendo_integration_key': 'test', 'request_timeout': 100}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with the desired timeout + mocked_send.assert_called_with('test_req', timeout=100.0) + + def test_timeout_value_not_in_config__Stream(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with default timeout value + as the timeout param is not passed in the config file + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # not pass 'request_timeout' param in the config + stream = streams.Stream({'x_pendo_integration_key': 'test'}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with default timeout + mocked_send.assert_called_with('test_req', timeout=300.0) + + def test_timeout_string__Stream(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with integer timeout + as param passed in the config file is in string + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # pass string value of 'request_timeout' in the config + stream = streams.Stream({'x_pendo_integration_key': 'test', 'request_timeout': "100"}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with passed timeout param + mocked_send.assert_called_with('test_req', timeout=100.0) + + def test_timeout_empty__Stream(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with default timeout + as param passed in the config file is empty string + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # pass empty string value of 'request_timeout' in the config + stream = streams.Stream({'x_pendo_integration_key': 'test', 'request_timeout': ""}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with passed timeout param + mocked_send.assert_called_with('test_req', timeout=300.0) + + def test_timeout_0__Stream(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with default timeout + as param passed in the config file is 0 + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # pass empty string value of 'request_timeout' in the config + stream = streams.Stream({'x_pendo_integration_key': 'test', 'request_timeout': 0.0}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with passed timeout param + mocked_send.assert_called_with('test_req', timeout=300.0) + + def test_timeout_string_0__Stream(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with default timeout + as param passed in the config file is string 0 + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # pass empty string value of 'request_timeout' in the config + stream = streams.Stream({'x_pendo_integration_key': 'test', 'request_timeout': "0.0"}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with passed timeout param + mocked_send.assert_called_with('test_req', timeout=300.0) + + def test_timeout_value_in_config__LazyAggregation(self, mocked_send, mocked_sleep): + """ + Verify if 
the request was called with timeout value param passed in the config file + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # pass 'request_timeout' param in the config + stream = streams.LazyAggregationStream({'x_pendo_integration_key': 'test', 'request_timeout': 100}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with the desired timeout + mocked_send.assert_called_with('test_req', stream=True, timeout=100.0) + + def test_timeout_value_not_in_config__LazyAggregation(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with default timeout value + as the timeout param is not passed in the config file + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # not pass 'request_timeout' param in the config + stream = streams.LazyAggregationStream({'x_pendo_integration_key': 'test'}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with default timeout + mocked_send.assert_called_with('test_req', stream=True, timeout=300.0) + + def test_timeout_string__LazyAggregation(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with integer timeout + as param passed in the config file is in string + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # pass string value of 'request_timeout' in the config + stream = streams.LazyAggregationStream({'x_pendo_integration_key': 'test', 'request_timeout': "100"}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with passed timeout param + mocked_send.assert_called_with('test_req', stream=True, timeout=100.0) + + def test_timeout_empty__LazyAggregation(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with default timeout + as param passed in the config file is empty string + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # pass string value of 'request_timeout' in the config + stream = streams.LazyAggregationStream({'x_pendo_integration_key': 'test', 'request_timeout': ""}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with passed timeout param + mocked_send.assert_called_with('test_req', stream=True, timeout=300.0) + + def test_timeout_0__LazyAggregation(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with default timeout + as param passed in the config file is 0 + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # pass string value of 'request_timeout' in the config + stream = streams.LazyAggregationStream({'x_pendo_integration_key': 'test', 'request_timeout': 0.0}) + + stream.send_request_get_results('test_req') + + # verify if the request was called with passed timeout param + mocked_send.assert_called_with('test_req', stream=True, timeout=300.0) + + def test_timeout_string_0__LazyAggregation(self, mocked_send, mocked_sleep): + """ + Verify if the request was called with default timeout + as param passed in the config file is string 0 + """ + json = {"key1": "value1", "key2": "value2"} + mocked_send.return_value = get_response(json) + + # pass string value of 'request_timeout' in the config + stream = streams.LazyAggregationStream({'x_pendo_integration_key': 'test', 'request_timeout': "0.0"}) + + stream.send_request_get_results('test_req') + + # 
verify if the request was called with the default timeout, since the string "0.0" falls back to the default + mocked_send.assert_called_with('test_req', stream=True, timeout=300.0)
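
Note: taken together, the timeout tests above pin down how the tap is expected to coerce `request_timeout` from config — an integer, float, or numeric string is used as-is (as a float), while a missing value, an empty string, or any zero value falls back to the 300-second default. The snippet below is a minimal sketch of logic that would satisfy those assertions; the helper name `get_request_timeout` and the constant name are hypothetical and only the asserted behavior is taken from the tests.

```python
REQUEST_TIMEOUT = 300  # assumed default, in seconds, matching the 300.0 asserted above

def get_request_timeout(config):
    # Treat None, "", 0, 0.0 and "0.0" as "not set" and fall back to the default;
    # otherwise coerce the configured value to a float.
    raw = config.get('request_timeout')
    if raw and float(raw):
        return float(raw)
    return float(REQUEST_TIMEOUT)

# Examples mirroring the test cases:
#   get_request_timeout({'request_timeout': 100})   -> 100.0
#   get_request_timeout({'request_timeout': "100"}) -> 100.0
#   get_request_timeout({})                          -> 300.0
#   get_request_timeout({'request_timeout': ""})    -> 300.0
#   get_request_timeout({'request_timeout': "0.0"}) -> 300.0
```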