Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tdl 15987 all non pk changes #72

Merged
merged 13 commits into from
Nov 1, 2021
27 changes: 16 additions & 11 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: 2
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- checkout
- run:
Expand All @@ -11,7 +11,7 @@ jobs:
python3 -mvenv /usr/local/share/virtualenvs/tap-pendo
source /usr/local/share/virtualenvs/tap-pendo/bin/activate
pip install -U pip setuptools
pip install .[dev]
pip install .[test]
- run:
name: 'JSON Validator'
command: |
Expand All @@ -23,20 +23,25 @@ jobs:
source /usr/local/share/virtualenvs/tap-pendo/bin/activate
# TODO: Adjust the pylint disables
pylint tap_pendo --disable 'broad-except,chained-comparison,empty-docstring,fixme,invalid-name,line-too-long,missing-class-docstring,missing-function-docstring,missing-module-docstring,no-else-raise,no-else-return,too-few-public-methods,too-many-arguments,too-many-branches,too-many-lines,too-many-locals,ungrouped-imports,wrong-spelling-in-comment,wrong-spelling-in-docstring,bad-whitespace,missing-class-docstring'
- run:
name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-pendo/bin/activate
pip install coverage
nosetests --with-coverage --cover-erase --cover-package=tap_pendo --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- add_ssh_keys
- run:
name: 'Integration Tests'
command: |
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
source dev_env.sh
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-pendo \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
[email protected] \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tests/tap_tester
run-test --tap=tap-pendo tests/tap_tester
workflows:
version: 2
commit:
Expand All @@ -53,4 +58,4 @@ workflows:
- master
jobs:
- build:
context: circleci-user
context: circleci-user
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,15 @@ Interrupted syncs for Event type stream are resumed via a bookmark placed during
- `x_pendo_integration_key` (string, `ABCdef123`): an integration key from Pendo.
- `period` (string, `ABCdef123`): `dayRange` or `hourRange`
- `lookback_window` (integer): 10 (For event objects. Default: 0)
- `request_timeout` (integer): 300 (For passing timeout to the request. Default: 300)

```json
{
"x_pendo_integration_key": "YOUR_INTEGRATION_KEY",
"start_date": "2020-09-18T00:00:00Z",
"period": "dayRange",
"lookback_window": 10,
"request_timeout": 300,
"include_anonymous_visitors": "true"
}
```
Expand Down
11 changes: 7 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@
classifiers=["Programming Language :: Python :: 3 :: Only"],
py_modules=["tap_pendo"],
install_requires=[
'singer-python==5.2.1',
'singer-python==5.12.2',
"requests",
'pyhumps==1.3.1',
'backoff==1.3.2',
'backoff==1.8.0',
'ijson==3.1.4',
],
extras_require={
'dev': [
'ipdb==0.11',
'test': [
'pylint==2.5.3',
'nose'
],
'dev': [
'ipdb==0.11'
]
},
entry_points="""
Expand Down
18 changes: 14 additions & 4 deletions tap_pendo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@


def do_discover(config):
    """Run discovery mode: write the catalog for all streams to STDOUT.

    Building the schemas requires calling the API, so this also serves to
    validate the credentials provided in config.json.
    """
    LOGGER.info("Starting discover")
    json.dump({"streams": discover_streams(config)}, sys.stdout, indent=2)
    LOGGER.info("Finished discover")


def stream_is_selected(mdata):
    """Return the 'selected' flag from the stream-level (empty breadcrumb) metadata.

    Defaults to False when the breadcrumb or the flag is absent.
    """
    stream_level = mdata.get((), {})
    return stream_level.get('selected', False)


def get_sub_stream_ids():
# Return list of all the sub streams of the streams
sub_stream_ids = []
for _, value in SUB_STREAMS.items():
sub_stream_ids.append(value)
Expand All @@ -39,6 +42,7 @@ class DependencyException(Exception):


def validate_dependencies(selected_stream_ids):
# Validate and raise exceptions if sub-streams are selected but related parents are not selected
errs = []
msg_tmpl = ("Unable to extract {0} data. "
"To receive {0} data, you also need to select {1}.")
Expand All @@ -54,6 +58,7 @@ def validate_dependencies(selected_stream_ids):


def populate_class_schemas(catalog, selected_stream_ids):
# Populate class schemas for all the streams selected in the catalog
for stream in catalog.streams:
if stream.tap_stream_id in selected_stream_ids:
STREAMS[stream.tap_stream_id].stream = stream
Expand All @@ -66,6 +71,7 @@ def get_abs_path(path):


def load_schemas():
# Open and read JSON schemas of streams and return a dictionary of schemas
schemas = {}

for filename in os.listdir(get_abs_path('schemas')):
Expand All @@ -78,6 +84,7 @@ def load_schemas():


def get_selected_streams(catalog):
# Return list of all the selected streams in catalog
selected_stream_ids = []
for stream in catalog.streams:
mdata = metadata.to_map(stream.metadata)
Expand All @@ -90,9 +97,9 @@ def sync(config, state, catalog):
LOGGER.info("Starting with state %s", state)
start_date = config['start_date']

selected_stream_ids = get_selected_streams(catalog)
validate_dependencies(selected_stream_ids)
populate_class_schemas(catalog, selected_stream_ids)
selected_stream_ids = get_selected_streams(catalog) # Get list of selected streams from catalog
validate_dependencies(selected_stream_ids) # Validate parent-child streams dependencies
populate_class_schemas(catalog, selected_stream_ids) # Populate schemas for selected streams
all_sub_stream_ids = get_sub_stream_ids()

# Loop over streams in catalog
Expand All @@ -106,13 +113,16 @@ def sync(config, state, catalog):
LOGGER.info('START Syncing: %s', stream_id)
update_currently_syncing(state, stream_id)

# Write schema of streams to STDOUT
key_properties = metadata.get(mdata, (), 'table-key-properties')
singer.write_schema(
stream_id, stream.schema.to_dict(), key_properties)

sub_stream_ids = SUB_STREAMS.get(stream_id)

# Populate class schemas and write a schema for the selected substreams of the stream
if sub_stream_ids:
# Loop over sub-streams of current stream
for sub_stream_id in sub_stream_ids:
if sub_stream_id not in selected_stream_ids:
continue
Expand All @@ -128,7 +138,7 @@ def sync(config, state, catalog):
continue

LOGGER.info("Stream %s: Starting sync", stream_id)
instance = STREAMS[stream_id](config)
instance = STREAMS[stream_id](config) # Intialize class for selected stream

counter_value = 0
if instance.replication_method == "INCREMENTAL":
Expand Down
10 changes: 8 additions & 2 deletions tap_pendo/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ def get_abs_path(path):


def get_custom_fields(instance):
    """Return the 'custom' field definitions reported by the given stream instance.

    Returns None when the instance reports no custom fields.
    """
    all_fields = instance.get_fields()
    return all_fields.get('custom')


def get_schema_property_type(schema_type):
# Return dictionary of types to add into catalog based on provided data type
if schema_type == 'string':
return {"type": ["null", "string"]}
elif schema_type == 'time':
Expand All @@ -67,12 +69,13 @@ def get_schema_property_type(schema_type):


def build_metadata_metadata(mdata, schema, custom_fields):
# Build metadata with custom fields for streams 'metadata_accounts' and 'metadata_visitors'
if 'custom' not in schema['properties']:
schema['properties']['custom'] = {}
schema['properties']['custom']['type'] = ["null", "object"]
schema['properties']['custom']['additional_properties'] = "false"
for key, _ in custom_fields.items():
schema['properties']['custom']['properties'] = {}
for key, _ in custom_fields.items():
schema['properties']['custom']['properties'][key] = {}
schema['properties']['custom']['properties'][key]['type'] = [
"null", "object"
Expand All @@ -86,6 +89,7 @@ def build_metadata_metadata(mdata, schema, custom_fields):


def build_account_visitor_metadata(mdata, schema, custom_fields):
# Build metadata with custom fields for streams 'accounts' and 'visitors'
if 'metadata_custom' not in schema['properties']:
schema['properties']['metadata_custom'] = {}
schema['properties']['metadata_custom']['type'] = ["null", "object"]
Expand All @@ -102,6 +106,7 @@ def build_account_visitor_metadata(mdata, schema, custom_fields):


def discover_streams(config):
# Discover schemas, build metadata for all the steams and return catalog
streams = []

LOGGER.info("Discovering custom fields for Accounts")
Expand All @@ -116,9 +121,10 @@ def discover_streams(config):

s = s(config)

schema = s.load_schema()
schema = s.load_schema() # load schema for the stream
mdata = metadata.to_map(s.load_metadata())

# additionally, build metadata for custom fields for below streams
if s.name == 'accounts':
build_account_visitor_metadata(mdata, schema,
custom_account_fields)
Expand Down
Loading