Skip to content

Commit

Permalink
config.json format changed to obj to work better with meltano
Browse files Browse the repository at this point in the history
  • Loading branch information
Alban King committed Nov 13, 2023
1 parent 6d35b32 commit 8a94a30
Show file tree
Hide file tree
Showing 9 changed files with 459 additions and 9 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ repos:
rev: v1.4.1
hooks:
- id: mypy
exclude: "dev/.*\\.py"
additional_dependencies:
- types-requests
114 changes: 114 additions & 0 deletions dev/infer-schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#! /usr/bin/env python
"""Naively infer the top-level schema of an Inventio stream.

Inventio records are commonly flat objects with string values.
Any item which appears to only be null is assumed to possibly
be a string.
"""

from __future__ import annotations

import sys
import json
import argparse
from collections import defaultdict


def log(*args, **kwargs):
    """Print a diagnostic message to stderr.

    Takes the same positional and keyword arguments as ``print``; only the
    destination stream is fixed.
    """
    print(*args, file=sys.stderr, **kwargs)


def serialize(obj):
    """Convert values ``json.dumps`` cannot encode itself (``default=`` hook).

    Sets are turned into lists. The members are sorted when they are mutually
    comparable so that the emitted schema is identical from run to run (the
    iteration order of a set of strings varies with hash randomisation).

    A list consisting solely of ``"null"`` is widened to
    ``["null", "string"]``, because null by itself is not a useful type.

    Any other value is returned unchanged.
    """
    if isinstance(obj, set):
        try:
            obj = sorted(obj)
        except TypeError:
            # members cannot be ordered against each other; keep set order
            obj = list(obj)

    if obj == ["null"]:
        # null by itself is not useful, add str by default
        return ["null", "string"]

    return obj


def get_record(line: str, *, is_singer_format=False) -> dict | None:
    """Decode one line of input into a record dict.

    Args:
        line: A single line of text expected to hold one JSON object.
        is_singer_format: When true, the line is treated as a Singer message
            and only the ``record`` payload of ``RECORD`` messages is
            returned; messages of any other type are skipped.

    Returns:
        The record as a dict, or ``None`` when the line is not valid JSON,
        is not a JSON object, or (in singer mode) is not a ``RECORD``
        message carrying an object payload. Problems are reported via
        ``log``.
    """
    # Only the decode can raise JSONDecodeError, so keep the try minimal.
    try:
        record = json.loads(line)
    except json.JSONDecodeError as err:
        log(f"failed to parse {line!r}, error: {err}")
        return None

    if not isinstance(record, dict):
        log(f"row doesn't look like a record: {line!r}")
        return None

    if not is_singer_format:
        return record

    if record.get("type") != "RECORD":
        return None

    payload = record.get("record")
    if payload is not None and not isinstance(payload, dict):
        # honour the dict | None contract; callers iterate over .items()
        log(f"singer record payload is not an object: {line!r}")
        return None

    return payload


def _json_type(value) -> str:
    """Return the JSON Schema type name for a decoded JSON value."""
    # bool must be tested before int/float because bool is a subclass of int;
    # otherwise True/False would be reported as "number".
    if isinstance(value, bool):
        return "boolean"

    if value is None:
        return "null"

    if isinstance(value, dict):
        return "object"

    if isinstance(value, list):
        return "array"

    if isinstance(value, (int, float)):
        return "number"

    # string is the default type
    return "string"


def main(argv: list[str] | None = None) -> int:
    """Read newline-delimited JSON records from stdin and print a schema.

    Every key seen at the top level of a record becomes a property whose
    ``type`` is the set of JSON types observed for it; ``"null"`` is always
    included. The resulting schema is printed to stdout as JSON.

    Args:
        argv: Command-line arguments; defaults to ``sys.argv[1:]``.

    Returns:
        The process exit code (always 0).

    Raises:
        ValueError: If ``--required`` is given and a required key (which
            always includes ``company_name``) was not seen in any record.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-p", "--pretty", action="store_true", help="pretty print schema"
    )
    parser.add_argument("-r", "--required", nargs="+", help="required keys")
    parser.add_argument(
        "-s",
        "--singer-style",
        action="store_true",
        help="are records coming straight from the tap? extract only record content",
    )

    args = parser.parse_args(argv)

    # Each property starts out nullable; observed types are added to the set.
    properties = defaultdict(lambda: {"type": {"null"}})

    schema = {"type": "object", "properties": properties}

    for raw_line in sys.stdin:
        line = raw_line.strip()
        if not line:
            continue

        record = get_record(line, is_singer_format=args.singer_style)
        if not record:
            continue

        for key, value in record.items():
            properties[key]["type"].add(_json_type(value))

    if args.required:
        # sorted so both the output and the first reported error are stable
        required = sorted({"company_name"} | set(args.required))
        for key in required:
            if key not in properties:
                raise ValueError(
                    f"required property {key!r} was not found in object keys {list(properties)}"
                )
        schema["required"] = required

    print(json.dumps(schema, default=serialize, indent=(2 if args.pretty else None)))

    return 0


# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())
80 changes: 80 additions & 0 deletions dev/inventio_get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#! /usr/bin/env python
"""Get response from Inventio API and format as json."""

from __future__ import annotations

import argparse
import json
import sys

import requests
import xmltodict


def log(*args, **kwargs):
    """Write a message to stderr, forwarding every argument to ``print``."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)


def get(url, timeout: float = 60.0) -> dict:
    """Fetch *url* and parse the XML response body into a dict.

    Args:
        url: The complete URL to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        The response body as parsed by ``xmltodict.parse``.
    """
    # The HTTP status is deliberately not checked here: the caller inspects
    # the parsed body for an "error" key instead.
    response = requests.get(url, timeout=timeout)
    return xmltodict.parse(response.content)


def main(argv: list[str] | None = None) -> int:
    """Query the Inventio API and print the response as JSON on stdout.

    The request is described either by a complete ``--url`` or by all of
    ``--company``, ``--type`` and ``--token`` (optionally with ``--limit``),
    from which the URL is built.

    Args:
        argv: Command-line arguments; defaults to ``sys.argv[1:]``.

    Returns:
        0 on success, 1 when the response contains an ``error`` key or the
        result cannot be serialised to JSON.

    Raises:
        ValueError: If neither or both of the two ways of describing the
            request were supplied.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", help="complete url query")
    parser.add_argument("-c", "--company", help="company")
    parser.add_argument("-t", "--type", help="the 'type' of endpoint")
    parser.add_argument("-k", "--token", help="endpoint token")
    parser.add_argument(
        "-p",
        "--pretty",
        action="store_true",
        help="pretty format as indented json",
    )
    parser.add_argument(
        "-l",
        "--limit",
        type=int,
        help="limit the response from the API",
    )

    args = parser.parse_args(argv)
    exit_code = 0

    # Exactly one of the two input styles must be present.
    has_url = bool(args.url)
    has_parts = bool(args.company and args.type and args.token)
    if has_url == has_parts:
        msg = (
            "Supply either --url, or all of --company, --type, and --token "
            "(but not both)"
        )
        raise ValueError(msg)

    limit_str = f"&limit={args.limit}" if args.limit else ""

    url = (
        args.url
        or f"https://app.cloud.inventio.it/{args.company}/smartapi/?type={args.type}&token={args.token}{limit_str}"
    )
    log(f"getting from {url!r}")

    content_json = get(url)

    if "error" in content_json:
        log(f"error: {content_json}")
        exit_code = 1

    dump_options = {"indent": 2, "default": str} if args.pretty else {}

    # json.dumps signals unserialisable input with TypeError/ValueError;
    # it never raises JSONDecodeError (that is a decoding error).
    try:
        rendered = json.dumps(content_json, **dump_options)
    except (TypeError, ValueError) as e:
        log(f"failed to serialise result as json: {e}")
        exit_code = 1
    else:
        print(rendered)

    return exit_code


# Run as a script: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ignore = [
select = ["ALL"]
src = ["tap_inventio"]
target-version = "py37"
exclude = ["dev"]


[tool.ruff.flake8-annotations]
Expand Down
5 changes: 4 additions & 1 deletion tap_inventio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ class InventioStream(RESTStream):

# path is required by design of the RESTStream. it is not used
path = None
records_jsonpath = "$.entries.entry[*]" # .entries.entry[*]

_current_company_name: str | None = None

Expand Down Expand Up @@ -249,6 +248,10 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]:
self.path = self.name # So the endpoint will be printed in the error
raise FatalAPIError(self.response_error_message(response))

if not self.records_jsonpath:
msg = "'records_jsonpath' must be specified"
raise NotImplementedError(msg)

yield from extract_jsonpath(self.records_jsonpath, input=json_response)

def post_process(
Expand Down
Loading

0 comments on commit 8a94a30

Please sign in to comment.