From 96c5ad0dfe982a6bf55e719890e4a220814d5dca Mon Sep 17 00:00:00 2001 From: Ryan Samarakoon Date: Fri, 8 Apr 2022 08:45:46 +1000 Subject: [PATCH] Add missing endpoints available in singer-io/tap-github (#93) * Add assignees stream * Fix issues with assignee stream * Add collaborators stream * Add review comments and reviews stream * Fix review comments stream and use repo parent instead * Fix mypy issue * Fix tests * add milestone and commit comment streams * Fix mypy * Fix tests * commit wip todo streams * fix formatting * [ci skip] format todo file and fix arraytype usage * [ci skip] more regex magic to convert everything to classes * Add paths [ci skip] * Move all streams to main file * Add replication keys * fix tests (change type to datetime) * introduce streams enum * Fix up organization stream * Reverse order of testing versions * remove unsupported types from class * Fix format * Try use capital types to pass ci * Fix tap not including org streams on organization given * Add test for org stream * Add rest of org streams * [ci skip] Temp changes for testing * Fix parent context being missing * Set ignore parent replication to true for project * fix mypy issue * fix mistyped params * Add parent keys * Fix mistyped params * Fix mistyped ids in events * [ci skip] Remove pointless comment * Change ignore parent key to true * update ignore_parent_replication and remove unneeded import * Simple comment [ci skip] * Work on comments [ci skip] * Work on comments [ci skip] * Fix mistyped stuff (good catch Laurent) and more comment addressing * Update fixture comment [ci skip] * Add bunch of meltano lab sample projects * update state partitioning keys * Fix merge * Add ORG_LEVEL_TOKEN to be used only for specific streams * Add docstring to alternative_sync_chidren Co-authored-by: Eric Boucher --- .github/workflows/test_tap.yml | 7 +- README.md | 1 + tap_github/client.py | 4 +- tap_github/organization_streams.py | 164 ++++++++ tap_github/repository_streams.py | 624 +++++++++++++++++++++++++++-- tap_github/streams.py | 111 +++++ tap_github/tap.py | 71 +--- tap_github/tests/fixtures.py | 41 +- tap_github/tests/test_core.py | 30 +- tap_github/tests/test_tap.py | 52 ++- tap_github/user_streams.py | 10 +- 11 files changed, 989 insertions(+), 126 deletions(-) create mode 100644 tap_github/organization_streams.py create mode 100644 tap_github/streams.py diff --git a/.github/workflows/test_tap.yml b/.github/workflows/test_tap.yml index 8d31fd9a..a47e091f 100644 --- a/.github/workflows/test_tap.yml +++ b/.github/workflows/test_tap.yml @@ -13,9 +13,10 @@ jobs: runs-on: ubuntu-latest env: GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}} + ORG_LEVEL_TOKEN: ${{secrets.ORG_LEVEL_TOKEN}} strategy: matrix: - python-version: [3.7, 3.8, 3.9, "3.10"] + python-version: ["3.10", 3.9, 3.8, 3.7] # run the matrix jobs one after the other so they can benefit from caching max-parallel: 1 @@ -28,7 +29,9 @@ jobs: path: '**/api_calls_tests_cache.sqlite' # github cache expires after 1wk, and we expire the content after 24h # this key should not need to change unless we need to clear the cache - key: api-cache-v2 + key: api-cache-v3 + restore-keys: | + api-cache-v2 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v2 with: diff --git a/README.md b/README.md index 65cfd83d..7c249379 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,7 @@ tap-github --config CONFIG --discover > ./catalog.json ``` ## Contributing +This project uses parent-child streams. Learn more about them [here.](https://gitlab.com/meltano/sdk/-/blob/main/docs/parent_streams.md) ### Initialize your Development Environment diff --git a/tap_github/client.py b/tap_github/client.py index af8ef299..b686119e 100644 --- a/tap_github/client.py +++ b/tap_github/client.py @@ -121,7 +121,7 @@ def get_next_page_token( return (previous_token or 1) + 1 def get_url_params( - self, context: Optional[dict], next_page_token: Optional[Any] + self, context: Optional[Dict], next_page_token: Optional[Any] ) -> Dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" params: dict = {"per_page": self.MAX_PER_PAGE} @@ -261,7 +261,7 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: yield from extract_jsonpath(self.query_jsonpath, input=resp_json) def get_url_params( - self, context: Optional[dict], next_page_token: Optional[Any] + self, context: Optional[Dict], next_page_token: Optional[Any] ) -> Dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" params = context or dict() diff --git a/tap_github/organization_streams.py b/tap_github/organization_streams.py new file mode 100644 index 00000000..37ceee65 --- /dev/null +++ b/tap_github/organization_streams.py @@ -0,0 +1,164 @@ +"""User Stream types classes for tap-github.""" + +from typing import Dict, List, Optional, Iterable, Any + +from singer_sdk import typing as th # JSON Schema typing helpers + +from tap_github.client import GitHubRestStream + + +class OrganizationStream(GitHubRestStream): + """Defines a GitHub Organization Stream. + API Reference: https://docs.github.com/en/rest/reference/orgs#get-an-organization + """ + + name = "organizations" + path = "/orgs/{org}" + + @property + def partitions(self) -> Optional[List[Dict]]: + return [{"org": org} for org in self.config["organizations"]] + + def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: + return { + "org": record["login"], + } + + def get_records(self, context: Optional[Dict]) -> Iterable[Dict[str, Any]]: + """ + Override the parent method to allow skipping API calls + if the stream is deselected and skip_parent_streams is True in config. + This allows running the tap with fewer API calls and preserving + quota when only syncing a child stream. Without this, + the API call is sent but data is discarded. + """ + if ( + not self.selected + and "skip_parent_streams" in self.config + and self.config["skip_parent_streams"] + and context is not None + ): + # build a minimal mock record so that self._sync_records + # can proceed with child streams + yield { + "org": context["org"], + } + else: + yield from super().get_records(context) + + schema = th.PropertiesList( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("url", th.StringType), + th.Property("repos_url", th.StringType), + th.Property("events_url", th.StringType), + th.Property("hooks_url", th.StringType), + th.Property("issues_url", th.StringType), + th.Property("members_url", th.StringType), + th.Property("public_members_url", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("description", th.StringType), + ).to_dict() + + +class TeamsStream(GitHubRestStream): + """ + API Reference: https://docs.github.com/en/rest/reference/teams#list-teams + """ + + name = "teams" + primary_keys = ["id"] + path = "/orgs/{org}/teams" + ignore_parent_replication_key = True + parent_stream_type = OrganizationStream + state_partitioning_keys = ["org"] + + def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: + new_context = {"team_slug": record["slug"]} + if context: + return { + **context, + **new_context, + } + return new_context + + schema = th.PropertiesList( + # Parent Keys + th.Property("org", th.StringType), + # Rest + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("name", th.StringType), + th.Property("slug", th.StringType), + th.Property("description", th.StringType), + th.Property("privacy", th.StringType), + th.Property("permission", th.StringType), + th.Property("members_url", th.StringType), + th.Property("repositories_url", th.StringType), + th.Property("parent", th.StringType), + ).to_dict() + + +class TeamMembersStream(GitHubRestStream): + """ + API Reference: https://docs.github.com/en/rest/reference/teams#list-team-members + """ + + name = "team_members" + primary_keys = ["id"] + path = "/orgs/{org}/teams/{team_slug}/members" + ignore_parent_replication_key = True + parent_stream_type = TeamsStream + state_partitioning_keys = ["team_slug", "org"] + + def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: + new_context = {"username": record["login"]} + if context: + return { + **context, + **new_context, + } + return new_context + + schema = th.PropertiesList( + # Parent keys + th.Property("org", th.StringType), + th.Property("team_slug", th.StringType), + # Rest + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ).to_dict() + + +class TeamRolesStream(GitHubRestStream): + """ + API Reference: https://docs.github.com/en/rest/reference/teams#get-team-membership-for-a-user + """ + + name = "team_roles" + path = "/orgs/{org}/teams/{team_slug}/memberships/{username}" + ignore_parent_replication_key = True + primary_keys = ["url"] + parent_stream_type = TeamMembersStream + state_partitioning_keys = ["username", "team_slug", "org"] + + schema = th.PropertiesList( + # Parent keys + th.Property("org", th.StringType), + th.Property("team_slug", th.StringType), + th.Property("username", th.StringType), + # Rest + th.Property("url", th.StringType), + th.Property("role", th.StringType), + th.Property("state", th.StringType), + ).to_dict() diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py index bfe318ba..4d3235e5 100644 --- a/tap_github/repository_streams.py +++ b/tap_github/repository_streams.py @@ -21,7 +21,7 @@ class RepositoryStream(GitHubRestStream): replication_key = "updated_at" def get_url_params( - self, context: Optional[dict], next_page_token: Optional[Any] + self, context: Optional[Dict], next_page_token: Optional[Any] ) -> Dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" assert context is not None, f"Context cannot be empty for '{self.name}' stream." @@ -66,7 +66,7 @@ def partitions(self) -> Optional[List[Dict]]: return [{"org": org} for org in self.config["organizations"]] return None - def get_child_context(self, record: dict, context: Optional[dict]) -> dict: + def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: """Return a child context object from the record and optional provided context. By default, will return context if provided and otherwise the record dict. @@ -78,7 +78,7 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict: "repo": record["name"], } - def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: + def get_records(self, context: Optional[Dict]) -> Iterable[Dict[str, Any]]: """ Override the parent method to allow skipping API calls if the stream is deselected and skip_parent_streams is True in config. @@ -373,7 +373,7 @@ class EventsStream(GitHubRestStream): # GitHub is missing the "since" parameter on this endpoint. missing_since_parameter = True - def get_records(self, context: Optional[dict] = None) -> Iterable[Dict[str, Any]]: + def get_records(self, context: Optional[Dict] = None) -> Iterable[Dict[str, Any]]: """Return a generator of row-type dictionary objects. Each row emitted should be a dictionary of property names to their values. """ @@ -383,7 +383,7 @@ def get_records(self, context: Optional[dict] = None) -> Iterable[Dict[str, Any] return super().get_records(context) - def post_process(self, row: dict, context: Optional[dict] = None) -> dict: + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: # TODO - We should think about the best approach to handle this. An alternative would be to # do a 'dumb' tap that just keeps the same schemas as GitHub without renaming these # objects to "target_". They are worth keeping, however, as they can be different from @@ -393,7 +393,7 @@ def post_process(self, row: dict, context: Optional[dict] = None) -> dict: return row schema = th.PropertiesList( - th.Property("id", th.IntegerType), + th.Property("id", th.StringType), th.Property("type", th.StringType), th.Property("repo", th.StringType), th.Property("org", th.StringType), @@ -408,14 +408,14 @@ def post_process(self, row: dict, context: Optional[dict] = None) -> dict: th.Property( "target_repo", th.ObjectType( - th.Property("id", th.StringType), + th.Property("id", th.IntegerType), th.Property("name", th.StringType), ), ), th.Property( "target_org", th.ObjectType( - th.Property("id", th.StringType), + th.Property("id", th.IntegerType), th.Property("login", th.StringType), ), ), @@ -523,6 +523,132 @@ def post_process(self, row: dict, context: Optional[dict] = None) -> dict: ).to_dict() +class MilestonesStream(GitHubRestStream): + name = "milestones" + path = "/repos/{org}/{repo}/milestones" + primary_keys = ["id"] + replication_key = "updated_at" + parent_stream_type = RepositoryStream + state_partitioning_keys = ["repo", "org"] + ignore_parent_replication_key = True + + schema = th.PropertiesList( + # Parent Keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + # Rest + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("labels_url", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("number", th.IntegerType), + th.Property("state", th.StringType), + th.Property("title", th.StringType), + th.Property("description", th.StringType), + th.Property( + "creator", + th.ObjectType( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ), + ), + th.Property("open_issues", th.IntegerType), + th.Property("closed_issues", th.IntegerType), + th.Property("created_at", th.DateTimeType), + th.Property("updated_at", th.DateTimeType), + th.Property("closed_at", th.DateTimeType), + th.Property("due_on", th.StringType), + ).to_dict() + + +class ReleasesStream(GitHubRestStream): + name = "releases" + path = "/repos/{org}/{repo}/releases" + ignore_parent_replication_key = True + primary_keys = ["id"] + parent_stream_type = RepositoryStream + state_partitioning_keys = ["repo", "org"] + replication_key = "published_at" + + schema = th.PropertiesList( + # Parent keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + # Rest + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("assets_url", th.StringType), + th.Property("upload_url", th.StringType), + th.Property("tarball_url", th.StringType), + th.Property("zipball_url", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("tag_name", th.StringType), + th.Property("target_commitish", th.StringType), + th.Property("name", th.StringType), + th.Property("body", th.StringType), + th.Property("draft", th.BooleanType), + th.Property("prerelease", th.BooleanType), + th.Property("created_at", th.DateTimeType), + th.Property("published_at", th.DateTimeType), + th.Property( + "author", + th.ObjectType( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ), + ), + th.Property( + "assets", + th.ArrayType( + th.ObjectType( + th.Property("url", th.StringType), + th.Property("browser_download_url", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("name", th.StringType), + th.Property("label", th.StringType), + th.Property("state", th.StringType), + th.Property("content_type", th.StringType), + th.Property("size", th.IntegerType), + th.Property("download_count", th.IntegerType), + th.Property("created_at", th.DateTimeType), + th.Property("updated_at", th.DateTimeType), + th.Property( + "uploader", + th.ObjectType( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ), + ), + ) + ), + ), + ).to_dict() + + class LanguagesStream(GitHubRestStream): name = "languages" path = "/repos/{org}/{repo}/languages" @@ -551,6 +677,69 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: ).to_dict() +class CollaboratorsStream(GitHubRestStream): + name = "collaborators" + path = "/repos/{org}/{repo}/collaborators" + primary_keys = ["id"] + parent_stream_type = RepositoryStream + ignore_parent_replication_key = True + state_partitioning_keys = ["repo", "org"] + + schema = th.PropertiesList( + # Parent Keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + # Rest + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + th.Property( + "permissions", + th.ObjectType( + th.Property("pull", th.BooleanType), + th.Property("triage", th.BooleanType), + th.Property("push", th.BooleanType), + th.Property("maintain", th.BooleanType), + th.Property("admin", th.BooleanType), + ), + ), + th.Property("role_name", th.StringType), + ).to_dict() + + +class AssigneesStream(GitHubRestStream): + """Defines 'Assignees' stream which returns possible assignees for issues/prs following GitHub's API convention.""" + + name = "assignees" + path = "/repos/{org}/{repo}/assignees" + primary_keys = ["id"] + parent_stream_type = RepositoryStream + ignore_parent_replication_key = True + state_partitioning_keys = ["repo", "org"] + + schema = th.PropertiesList( + # Parent keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + # Rest + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ).to_dict() + + class IssuesStream(GitHubRestStream): """Defines 'Issues' stream which returns Issues and PRs following GitHub's API convention.""" @@ -563,7 +752,7 @@ class IssuesStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] def get_url_params( - self, context: Optional[dict], next_page_token: Optional[Any] + self, context: Optional[Dict], next_page_token: Optional[Any] ) -> Dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" assert context is not None, f"Context cannot be empty for '{self.name}' stream." @@ -592,7 +781,7 @@ def http_headers(self) -> dict: headers["Accept"] = "application/vnd.github.squirrel-girl-preview" return headers - def post_process(self, row: dict, context: Optional[dict] = None) -> dict: + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: row["type"] = "pull_request" if "pull_request" in row else "issue" if row["body"] is not None: # some issue bodies include control characters such as \x00 @@ -755,7 +944,7 @@ class IssueCommentsStream(GitHubRestStream): # we have gaps in our data tolerated_http_errors = [502] - def get_records(self, context: Optional[dict] = None) -> Iterable[Dict[str, Any]]: + def get_records(self, context: Optional[Dict] = None) -> Iterable[Dict[str, Any]]: """Return a generator of row-type dictionary objects. Each row emitted should be a dictionary of property names to their values. @@ -766,7 +955,7 @@ def get_records(self, context: Optional[dict] = None) -> Iterable[Dict[str, Any] return super().get_records(context) - def post_process(self, row: dict, context: Optional[dict] = None) -> dict: + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: row["issue_number"] = int(row["issue_url"].split("/")[-1]) if row["body"] is not None: # some comment bodies include control characters such as \x00 @@ -777,6 +966,10 @@ def post_process(self, row: dict, context: Optional[dict] = None) -> dict: return row schema = th.PropertiesList( + # Parent keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + # Rest th.Property("id", th.IntegerType), th.Property("node_id", th.StringType), th.Property("issue_number", th.IntegerType), @@ -820,7 +1013,7 @@ class IssueEventsStream(GitHubRestStream): # GitHub is missing the "since" parameter on this endpoint. missing_since_parameter = True - def get_records(self, context: Optional[dict] = None) -> Iterable[Dict[str, Any]]: + def get_records(self, context: Optional[Dict] = None) -> Iterable[Dict[str, Any]]: """Return a generator of row-type dictionary objects. Each row emitted should be a dictionary of property names to their values. @@ -831,7 +1024,7 @@ def get_records(self, context: Optional[dict] = None) -> Iterable[Dict[str, Any] return super().get_records(context) - def post_process(self, row: dict, context: Optional[dict] = None) -> dict: + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: row["issue_number"] = int(row["issue"].pop("number")) row["issue_url"] = row["issue"].pop("url") return row @@ -877,7 +1070,7 @@ class CommitsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] ignore_parent_replication_key = True - def post_process(self, row: dict, context: Optional[dict] = None) -> dict: + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: """ Add a timestamp top-level field to be used as state replication key. It's not clear from github's API docs which time (author or committer) @@ -965,6 +1158,49 @@ def post_process(self, row: dict, context: Optional[dict] = None) -> dict: ).to_dict() +class CommitCommentsStream(GitHubRestStream): + name = "commit_comments" + path = "/repos/{org}/{repo}/comments" + primary_keys = ["id"] + replication_key = "updated_at" + parent_stream_type = RepositoryStream + state_partitioning_keys = ["repo", "org"] + ignore_parent_replication_key = True + + schema = th.PropertiesList( + # Parent keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + # Rest + th.Property("html_url", th.StringType), + th.Property("url", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("body", th.StringType), + th.Property("path", th.StringType), + th.Property("position", th.IntegerType), + th.Property("line", th.IntegerType), + th.Property("commit_id", th.StringType), + th.Property( + "user", + th.ObjectType( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ), + ), + th.Property("created_at", th.DateTimeType), + th.Property("updated_at", th.DateTimeType), + th.Property("author_association", th.StringType), + ).to_dict() + + class PullRequestsStream(GitHubRestStream): """Defines 'PullRequests' stream.""" @@ -979,7 +1215,7 @@ class PullRequestsStream(GitHubRestStream): missing_since_parameter = True def get_url_params( - self, context: Optional[dict], next_page_token: Optional[Any] + self, context: Optional[Dict], next_page_token: Optional[Any] ) -> Dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" assert context is not None, f"Context cannot be empty for '{self.name}' stream." @@ -999,7 +1235,7 @@ def http_headers(self) -> dict: headers["Accept"] = "application/vnd.github.squirrel-girl-preview" return headers - def post_process(self, row: dict, context: Optional[dict] = None) -> dict: + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: if row["body"] is not None: # some pr bodies include control characters such as \x00 # that some targets (such as postgresql) choke on. This ensures @@ -1012,6 +1248,19 @@ def post_process(self, row: dict, context: Optional[dict] = None) -> dict: row["minus_one"] = row.pop("-1", None) return row + def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: + if context: + return { + "org": context["org"], + "repo": context["repo"], + "pull_number": record["number"], + } + return { + "pull_number": record["number"], + "org": record["base"]["user"]["login"], + "repo": record["base"]["repo"]["name"], + } + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), @@ -1232,6 +1481,216 @@ def post_process(self, row: dict, context: Optional[dict] = None) -> dict: ).to_dict() +class PullRequestCommits(GitHubRestStream): + name = "pull_request_commits" + path = "/repos/{org}/{repo}/pulls/{pull_number}/commits" + ignore_parent_replication_key = False + primary_keys = ["node_id"] + parent_stream_type = PullRequestsStream + state_partitioning_keys = ["repo", "org"] + + schema = th.PropertiesList( + # Parent keys + th.Property("org", th.StringType), + th.Property("repo", th.StringType), + th.Property("pull_number", th.IntegerType), + # Rest + th.Property("url", th.StringType), + th.Property("sha", th.StringType), + th.Property("node_id", th.StringType), + th.Property("html_url", th.StringType), + th.Property("comments_url", th.StringType), + th.Property( + "commit", + th.ObjectType( + th.Property("url", th.StringType), + th.Property( + "author", + th.ObjectType( + th.Property("name", th.StringType), + th.Property("email", th.StringType), + th.Property("date", th.StringType), + ), + ), + th.Property( + "committer", + th.ObjectType( + th.Property("name", th.StringType), + th.Property("email", th.StringType), + th.Property("date", th.StringType), + ), + ), + th.Property("message", th.StringType), + th.Property( + "tree", + th.ObjectType( + th.Property("url", th.StringType), + th.Property("sha", th.StringType), + ), + ), + th.Property("comment_count", th.IntegerType), + th.Property( + "verification", + th.ObjectType( + th.Property("verified", th.BooleanType), + th.Property("reason", th.StringType), + th.Property("signature", th.StringType), + th.Property("payload", th.StringType), + ), + ), + ), + ), + th.Property( + "author", + th.ObjectType( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ), + ), + th.Property( + "committer", + th.ObjectType( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ), + ), + th.Property( + "parents", + th.ArrayType( + th.ObjectType( + th.Property("url", th.StringType), th.Property("sha", th.StringType) + ) + ), + ), + ).to_dict() + + +class ReviewsStream(GitHubRestStream): + name = "reviews" + path = "/repos/{org}/{repo}/pulls/{pull_number}/reviews" + primary_keys = ["id"] + parent_stream_type = PullRequestsStream + ignore_parent_replication_key = False + state_partitioning_keys = ["repo", "org"] + + schema = th.PropertiesList( + # Parent keys + th.Property("pull_number", th.IntegerType), + th.Property("org", th.StringType), + th.Property("repo", th.StringType), + # Rest + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property( + "user", + th.ObjectType( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ), + ), + th.Property("body", th.StringType), + th.Property("state", th.StringType), + th.Property("html_url", th.StringType), + th.Property("pull_request_url", th.StringType), + th.Property( + "_links", + th.ObjectType( + th.Property("html", th.ObjectType(th.Property("href", th.StringType))), + th.Property( + "pull_request", th.ObjectType(th.Property("href", th.StringType)) + ), + ), + ), + th.Property("submitted_at", th.DateTimeType), + th.Property("commit_id", th.StringType), + th.Property("author_association", th.StringType), + ).to_dict() + + +class ReviewCommentsStream(GitHubRestStream): + name = "review_comments" + path = "/repos/{org}/{repo}/pulls/comments" + primary_keys = ["id"] + parent_stream_type = RepositoryStream + ignore_parent_replication_key = True + state_partitioning_keys = ["repo", "org"] + + schema = th.PropertiesList( + # Parent keys + th.Property("org", th.StringType), + th.Property("repo", th.StringType), + # Rest + th.Property("url", th.StringType), + th.Property("pull_request_review_id", th.IntegerType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("diff_hunk", th.StringType), + th.Property("path", th.StringType), + th.Property("position", th.IntegerType), + th.Property("original_position", th.IntegerType), + th.Property("commit_id", th.StringType), + th.Property("original_commit_id", th.StringType), + th.Property("in_reply_to_id", th.IntegerType), + th.Property( + "user", + th.ObjectType( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ), + ), + th.Property("body", th.StringType), + th.Property("created_at", th.DateTimeType), + th.Property("updated_at", th.DateTimeType), + th.Property("html_url", th.StringType), + th.Property("pull_request_url", th.StringType), + th.Property("author_association", th.StringType), + th.Property( + "_links", + th.ObjectType( + th.Property("self", th.ObjectType(th.Property("href", th.StringType))), + th.Property("html", th.ObjectType(th.Property("href", th.StringType))), + th.Property( + "pull_request", th.ObjectType(th.Property("href", th.StringType)) + ), + ), + ), + th.Property("start_line", th.IntegerType), + th.Property("original_start_line", th.IntegerType), + th.Property("start_side", th.StringType), + th.Property("line", th.IntegerType), + th.Property("original_line", th.IntegerType), + th.Property("side", th.StringType), + ).to_dict() + + class ContributorsStream(GitHubRestStream): """Defines 'Contributors' stream. Fetching User & Bot contributors.""" @@ -1254,15 +1713,6 @@ class ContributorsStream(GitHubRestStream): th.Property("gravatar_id", th.StringType), th.Property("url", th.StringType), th.Property("html_url", th.StringType), - th.Property("followers_url", th.StringType), - th.Property("following_url", th.StringType), - th.Property("gists_url", th.StringType), - th.Property("starred_url", th.StringType), - th.Property("subscriptions_url", th.StringType), - th.Property("organizations_url", th.StringType), - th.Property("repos_url", th.StringType), - th.Property("events_url", th.StringType), - th.Property("received_events_url", th.StringType), th.Property("type", th.StringType), th.Property("site_admin", th.BooleanType), th.Property("contributions", th.IntegerType), @@ -1280,7 +1730,7 @@ class AnonymousContributorsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] def get_url_params( - self, context: Optional[dict], next_page_token: Optional[Any] + self, context: Optional[Dict], next_page_token: Optional[Any] ) -> Dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" assert context is not None, f"Context cannot be empty for '{self.name}' stream." @@ -1329,7 +1779,7 @@ def http_headers(self) -> dict: headers["Accept"] = "application/vnd.github.v3.star+json" return headers - def post_process(self, row: dict, context: Optional[dict] = None) -> dict: + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: """ Add a user_id top-level field to be used as state replication key. """ @@ -1419,6 +1869,124 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: ).to_dict() +class ProjectsStream(GitHubRestStream): + name = "projects" + path = "/repos/{org}/{repo}/projects" + ignore_parent_replication_key = True + replication_key = "updated_at" + primary_keys = ["id"] + parent_stream_type = RepositoryStream + state_partitioning_keys = ["repo", "org"] + + def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: + return {"project_id": record["id"]} + + schema = th.PropertiesList( + # Parent keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + # Rest + th.Property("owner_url", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("columns_url", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("name", th.StringType), + th.Property("body", th.StringType), + th.Property("number", th.IntegerType), + th.Property("state", th.StringType), + th.Property( + "creator", + th.ObjectType( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ), + ), + th.Property("created_at", th.DateTimeType), + th.Property("updated_at", th.DateTimeType), + ).to_dict() + + +class ProjectColumnsStream(GitHubRestStream): + name = "project_columns" + path = "/projects/{project_id}/columns" + ignore_parent_replication_key = True + replication_key = "updated_at" + primary_keys = ["id"] + parent_stream_type = ProjectsStream + state_partitioning_keys = ["project_id", "repo", "org"] + + def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: + return {"column_id": record["id"]} + + schema = th.PropertiesList( + # Parent Keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + th.Property("project_id", th.IntegerType), + # Rest + th.Property("url", th.StringType), + th.Property("project_url", th.StringType), + th.Property("cards_url", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("name", th.StringType), + th.Property("created_at", th.DateTimeType), + th.Property("updated_at", th.DateTimeType), + ).to_dict() + + +class ProjectCardsStream(GitHubRestStream): + name = "project_cards" + path = "/projects/columns/{column_id}/cards" + ignore_parent_replication_key = True + replication_key = "updated_at" + primary_keys = ["id"] + parent_stream_type = ProjectColumnsStream + state_partitioning_keys = ["project_id", "repo", "org"] + + schema = th.PropertiesList( + # Parent Keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + th.Property("project_id", th.IntegerType), + th.Property("column_id", th.IntegerType), + # Properties + th.Property("url", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("note", th.StringType), + th.Property( + "creator", + th.ObjectType( + th.Property("login", th.StringType), + th.Property("id", th.IntegerType), + th.Property("node_id", th.StringType), + th.Property("avatar_url", th.StringType), + th.Property("gravatar_id", th.StringType), + th.Property("url", th.StringType), + th.Property("html_url", th.StringType), + th.Property("type", th.StringType), + th.Property("site_admin", th.BooleanType), + ), + ), + th.Property("created_at", th.DateTimeType), + th.Property("updated_at", th.DateTimeType), + th.Property("archived", th.BooleanType), + th.Property("column_url", th.StringType), + th.Property("content_url", th.StringType), + th.Property("project_url", th.StringType), + ).to_dict() + + class WorkflowsStream(GitHubRestStream): """Defines 'workflows' stream.""" diff --git a/tap_github/streams.py b/tap_github/streams.py new file mode 100644 index 00000000..2ae5df57 --- /dev/null +++ b/tap_github/streams.py @@ -0,0 +1,111 @@ +from enum import Enum +from typing import Type, Set, List + +from singer_sdk.streams.core import Stream + +from tap_github.repository_streams import ( + AnonymousContributorsStream, + CommitsStream, + CommunityProfileStream, + ContributorsStream, + EventsStream, + IssueCommentsStream, + IssueEventsStream, + IssuesStream, + LanguagesStream, + PullRequestsStream, + ReadmeHtmlStream, + ReadmeStream, + RepositoryStream, + StargazersStream, + StatsContributorsStream, + AssigneesStream, + CollaboratorsStream, + ReviewsStream, + ReviewCommentsStream, + ProjectsStream, + ProjectColumnsStream, + ProjectCardsStream, + PullRequestCommits, + MilestonesStream, + CommitCommentsStream, + ReleasesStream, + WorkflowsStream, + WorkflowRunJobsStream, + WorkflowRunsStream, +) +from tap_github.user_streams import ( + StarredStream, + UserContributedToStream, + UserStream, +) +from tap_github.organization_streams import ( + OrganizationStream, + TeamsStream, + TeamMembersStream, + TeamRolesStream, +) + + +class Streams(Enum): + """ + Represents all streams our tap supports, and which queries (by username, by organization, etc.) you can use. + """ + + valid_queries: Set[str] + streams: List[Type[Stream]] + + def __init__(self, valid_queries: Set[str], streams: List[Type[Stream]]): + self.valid_queries = valid_queries + self.streams = streams + + REPOSITORY = ( + {"repositories", "organizations", "searches"}, + [ + AnonymousContributorsStream, + CommitsStream, + CommitCommentsStream, + CommunityProfileStream, + ContributorsStream, + EventsStream, + MilestonesStream, + ReleasesStream, + CollaboratorsStream, + AssigneesStream, + IssuesStream, + IssueCommentsStream, + IssueEventsStream, + LanguagesStream, + PullRequestsStream, + PullRequestCommits, + ReviewsStream, + ReviewCommentsStream, + ReadmeHtmlStream, + ReadmeStream, + RepositoryStream, + StargazersStream, + StatsContributorsStream, + ProjectsStream, + ProjectColumnsStream, + ProjectCardsStream, + WorkflowsStream, + WorkflowRunJobsStream, + WorkflowRunsStream, + ], + ) + USERS = ( + {"user_usernames", "user_ids"}, + [ + StarredStream, + UserContributedToStream, + UserStream, + ], + ) + ORGANIZATIONS = ( + {"organizations"}, + [OrganizationStream, TeamsStream, TeamMembersStream, TeamRolesStream], + ) + + @classmethod + def all_valid_queries(cls): + return set.union(*[stream.valid_queries for stream in Streams]) diff --git a/tap_github/tap.py b/tap_github/tap.py index b0eefb35..ed8ab64a 100644 --- a/tap_github/tap.py +++ b/tap_github/tap.py @@ -5,31 +5,7 @@ from singer_sdk import Stream, Tap from singer_sdk import typing as th # JSON schema typing helpers -from tap_github.repository_streams import ( - AnonymousContributorsStream, - CommitsStream, - CommunityProfileStream, - ContributorsStream, - EventsStream, - IssueCommentsStream, - IssueEventsStream, - IssuesStream, - LanguagesStream, - PullRequestsStream, - ReadmeHtmlStream, - ReadmeStream, - RepositoryStream, - StargazersStream, - StatsContributorsStream, - WorkflowsStream, - WorkflowRunJobsStream, - WorkflowRunsStream, -) -from tap_github.user_streams import ( - StarredStream, - UserContributedToStream, - UserStream, -) +from tap_github.streams import Streams class TapGitHub(Tap): @@ -84,43 +60,22 @@ class TapGitHub(Tap): def discover_streams(self) -> List[Stream]: """Return a list of discovered streams for each query.""" - VALID_USER_QUERIES = {"user_usernames", "user_ids"} - VALID_REPO_QUERIES = {"repositories", "organizations", "searches"} - VALID_QUERIES = VALID_REPO_QUERIES.union(VALID_USER_QUERIES) - if len(VALID_QUERIES.intersection(self.config)) != 1: + if len(Streams.all_valid_queries().intersection(self.config)) != 1: raise ValueError( "This tap requires one and only one of the following path options: " - f"{VALID_QUERIES}." + f"{Streams.all_valid_queries()}." ) - is_user_query = len(VALID_USER_QUERIES.intersection(self.config)) > 0 - if is_user_query: - return [ - StarredStream(tap=self), - UserContributedToStream(tap=self), - UserStream(tap=self), - ] - else: - return [ - AnonymousContributorsStream(tap=self), - CommitsStream(tap=self), - CommunityProfileStream(tap=self), - ContributorsStream(tap=self), - EventsStream(tap=self), - IssueCommentsStream(tap=self), - IssueEventsStream(tap=self), - IssuesStream(tap=self), - LanguagesStream(tap=self), - PullRequestsStream(tap=self), - ReadmeHtmlStream(tap=self), - ReadmeStream(tap=self), - RepositoryStream(tap=self), - StargazersStream(tap=self), - StatsContributorsStream(tap=self), - WorkflowsStream(tap=self), - WorkflowRunJobsStream(tap=self), - WorkflowRunsStream(tap=self), - ] + streams = [] + for stream_type in Streams: + if len(stream_type.valid_queries.intersection(self.config)) > 0: + streams += [ + StreamClass(tap=self) for StreamClass in stream_type.streams + ] + + if not streams: + raise ValueError("No valid streams found.") + return streams # CLI Execution: diff --git a/tap_github/tests/fixtures.py b/tap_github/tests/fixtures.py index 4aecd624..1fb7b8b5 100644 --- a/tap_github/tests/fixtures.py +++ b/tap_github/tests/fixtures.py @@ -37,38 +37,55 @@ def repo_list_config(request): @pytest.fixture -def usernames_list_config(request): +def username_list_config(request): """ Get a default list of usernames or pass your own by decorating your test with - @pytest.mark.usernames_list(['ericboucher', 'aaronsteers']) + @pytest.mark.username_list(['ericboucher', 'aaronsteers']) """ - marker = request.node.get_closest_marker("usernames_list") + marker = request.node.get_closest_marker("username_list") if marker is None: - usernames_list = ["ericboucher", "aaronsteers"] + username_list = ["ericboucher", "aaronsteers"] else: - usernames_list = marker.args[0] + username_list = marker.args[0] return { "metrics_log_level": "none", "start_date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d"), - "user_usernames": usernames_list, + "user_usernames": username_list, } @pytest.fixture -def user_ids_list_config(request): +def user_id_list_config(request): """ Get a default list of usernames or pass your own by decorating your test with - @pytest.mark.user_ids_list(['ericboucher', 'aaronsteers']) + @pytest.mark.user_id_list(['ericboucher', 'aaronsteers']) """ - marker = request.node.get_closest_marker("user_ids_list") + marker = request.node.get_closest_marker("user_id_list") if marker is None: - user_ids_list = [1, 2] + user_id_list = [1, 2] else: - user_ids_list = marker.args[0] + user_id_list = marker.args[0] return { "metrics_log_level": "none", "start_date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d"), - "user_ids": user_ids_list, + "user_ids": user_id_list, + } + + +@pytest.fixture +def organization_list_config(request): + """ + Get a default list of organizations or pass your own by decorating your test with + @pytest.mark.organization_list(['MeltanoLabs', 'oviohub']) + """ + marker = request.node.get_closest_marker("organization_list") + + organization_list = ["MeltanoLabs"] if marker is None else marker.args[0] + + return { + "metrics_log_level": "none", + "start_date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d"), + "organizations": organization_list, } diff --git a/tap_github/tests/test_core.py b/tap_github/tests/test_core.py index c86c40e8..edbc527a 100644 --- a/tap_github/tests/test_core.py +++ b/tap_github/tests/test_core.py @@ -1,10 +1,19 @@ """Tests standard tap features using the built-in SDK tests library.""" +import os +import logging + +from unittest import mock from singer_sdk.testing import get_standard_tap_tests from tap_github.tap import TapGitHub -from .fixtures import repo_list_config, search_config, usernames_list_config +from .fixtures import ( + repo_list_config, + search_config, + username_list_config, + organization_list_config, +) # Run standard built-in tap tests from the SDK: @@ -22,11 +31,24 @@ def test_standard_tap_tests_for_repo_list_mode(repo_list_config): test() -def test_standard_tap_tests_for_usernames_list_mode(usernames_list_config): +def test_standard_tap_tests_for_username_list_mode(username_list_config): """Run standard tap tests from the SDK.""" - tests = get_standard_tap_tests(TapGitHub, config=usernames_list_config) + tests = get_standard_tap_tests(TapGitHub, config=username_list_config) for test in tests: test() -# TODO: Create additional tests as appropriate for your tap. +# This token needs to have read:org access for the organization listed in fixtures.py +# Default is "MeltanoLabs" +ORG_LEVEL_TOKEN = os.environ.get("ORG_LEVEL_TOKEN") + + +@mock.patch.dict(os.environ, {"GITHUB_TOKEN": ORG_LEVEL_TOKEN or ""}) +def test_standard_tap_tests_for_organization_list_mode(organization_list_config): + """Run standard tap tests from the SDK.""" + if not ORG_LEVEL_TOKEN: + logging.warning('No "ORG_LEVEL_TOKEN" found. Skipping organization tap tests.') + return + tests = get_standard_tap_tests(TapGitHub, config=organization_list_config) + for test in tests: + test() diff --git a/tap_github/tests/test_tap.py b/tap_github/tests/test_tap.py index 1d29ab62..6ad8e852 100644 --- a/tap_github/tests/test_tap.py +++ b/tap_github/tests/test_tap.py @@ -1,31 +1,56 @@ -import datetime - +import os +import logging import pytest -from singer_sdk.helpers._catalog import ( - deselect_all_streams, - set_catalog_stream_selected, -) + +from unittest.mock import patch from tap_github.tap import TapGitHub from .fixtures import repo_list_config -repo_list_2 = ["octocat/hello-world", "MeltanoLabs/tap-github", "mapswipe/mapswipe"] +repo_list_2 = [ + "MeltanoLabs/tap-github", + "MeltanoLabs/tap-gitlab", + "MeltanoLabs/target-athena", +] @pytest.mark.repo_list(repo_list_2) def test_validate_repo_list_config(repo_list_config): """Verify that the repositories list is parsed correctly""" repo_list_context = [ - {"org": "octocat", "repo": "hello-world"}, - {"org": "MeltanoLabs", "repo": "tap-github"}, - {"org": "mapswipe", "repo": "mapswipe"}, + {"org": repo.split("/")[0], "repo": repo.split("/")[1]} for repo in repo_list_2 ] tap = TapGitHub(config=repo_list_config) partitions = tap.streams["repositories"].partitions assert partitions == repo_list_context +def alternative_sync_chidren(self, child_context: dict) -> None: + """ + Override for Stream._sync_children. + Enabling us to use an ORG_LEVEL_TOKEN for the collaborators sream. + """ + for child_stream in self.child_streams: + # Use org:write access level credentials for collaborators stream + if child_stream.name in ["collaborators"]: + ORG_LEVEL_TOKEN = os.environ.get("ORG_LEVEL_TOKEN") + if not ORG_LEVEL_TOKEN: + logging.warning( + 'No "ORG_LEVEL_TOKEN" found. Skipping collaborators stream sync.' + ) + continue + SAVED_GTHUB_TOKEN = os.environ.get("GITHUB_TOKEN") + os.environ["GITHUB_TOKEN"] = ORG_LEVEL_TOKEN + child_stream.sync(context=child_context) + os.environ["GITHUB_TOKEN"] = SAVED_GTHUB_TOKEN or "" + continue + + # default behavior: + if child_stream.selected or child_stream.has_selected_descendents: + child_stream.sync(context=child_context) + + @pytest.mark.repo_list(repo_list_2) def test_get_a_repository_in_repo_list_mode(capsys, repo_list_config): """ @@ -42,8 +67,11 @@ def test_get_a_repository_in_repo_list_mode(capsys, repo_list_config): # ) # discard previous output to stdout (potentially from other tests) capsys.readouterr() - tap2 = TapGitHub(config=repo_list_config, catalog=catalog) - tap2.sync_all() + with patch( + "singer_sdk.streams.core.Stream._sync_children", alternative_sync_chidren + ): + tap2 = TapGitHub(config=repo_list_config, catalog=catalog) + tap2.sync_all() captured = capsys.readouterr() # Verify we got the right number of records (one per repo in the list) assert captured.out.count('{"type": "RECORD", "stream": "repositories"') == len( diff --git a/tap_github/user_streams.py b/tap_github/user_streams.py index d202f46d..d3fcc1fc 100644 --- a/tap_github/user_streams.py +++ b/tap_github/user_streams.py @@ -30,13 +30,7 @@ def partitions(self) -> Optional[List[Dict]]: return [{"id": id} for id in self.config["user_ids"]] return None - def get_child_context(self, record: dict, context: Optional[dict]) -> dict: - """Return a child context object from the record and optional provided context. - - By default, will return context if provided and otherwise the record dict. - Developers may override this behavior to send specific information to child - streams for context. - """ + def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: return { "username": record["login"], } @@ -102,7 +96,7 @@ def http_headers(self) -> dict: headers["Accept"] = "application/vnd.github.v3.star+json" return headers - def post_process(self, row: dict, context: Optional[dict] = None) -> dict: + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: """ Add a repo_id top-level field to be used as state replication key. """