diff --git a/.github/workflows/test_tap.yml b/.github/workflows/test_tap.yml index f8e119c0..064e718c 100644 --- a/.github/workflows/test_tap.yml +++ b/.github/workflows/test_tap.yml @@ -1,11 +1,15 @@ name: Test tap-github on: - # Run on all pull requests and on pushes to master. + # Run on all pull requests and on pushes to main. pull_request: push: branches: - - master + - main + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true jobs: tests: @@ -26,13 +30,11 @@ jobs: uses: actions/cache@v2.1.7 with: # must match the path in tests/__init__.py - path: '**/api_calls_tests_cache.sqlite' + path: 'api_calls_tests_cache.sqlite' # github cache expires after 1wk, and we expire the content after 24h # this key should not need to change unless we need to clear the cache key: api-cache-v3 - restore-keys: | - api-cache-v3 - api-cache- + restore-keys: api-cache-v3 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v2 with: @@ -61,5 +63,13 @@ jobs: run: | poetry run mypy . --ignore-missing-imports - name: Test with pytest + id: test_pytest + continue-on-error: true + run: | + poetry run pytest --capture=no + - name: Test with pytest (run 2) + id: retry_test_pytest + if: steps.test_pytest.outcome=='failure' # check the step outcome, wait and retry run: | + sleep 60m poetry run pytest --capture=no diff --git a/tap_github/client.py b/tap_github/client.py index 0ae8b878..6715976f 100644 --- a/tap_github/client.py +++ b/tap_github/client.py @@ -333,4 +333,9 @@ def get_url_params( params["per_page"] = self.MAX_PER_PAGE if next_page_token: params.update(next_page_token) + + since = self.get_starting_timestamp(context) + if self.replication_key and since: + params["since"] = str(since) + return params diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py index c05acafa..5946ccab 100644 --- a/tap_github/repository_streams.py +++ b/tap_github/repository_streams.py @@ -6,6 +6,9 @@ from singer_sdk import typing as th # JSON Schema typing helpers from singer_sdk.helpers.jsonpath import extract_jsonpath +from dateutil.parser import parse +from urllib.parse import parse_qs, urlparse + from tap_github.client import GitHubGraphqlStream, GitHubRestStream from tap_github.schema_objects import ( user_object, @@ -1476,7 +1479,7 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: class StargazersStream(GitHubRestStream): """Defines 'Stargazers' stream. Warning: this stream does NOT track star deletions.""" - name = "stargazers" + name = "stargazers_rest" path = "/repos/{org}/{repo}/stargazers" primary_keys = ["user_id", "repo", "org"] parent_stream_type = RepositoryStream @@ -1485,6 +1488,13 @@ class StargazersStream(GitHubRestStream): # GitHub is missing the "since" parameter on this endpoint. missing_since_parameter = True + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # TODO - remove warning with next release. + self.logger.warning( + "The stream 'stargazers_rest' is deprecated. Please use the Graphql version instead: 'stargazers'." + ) + @property def http_headers(self) -> dict: """Return the http headers needed. @@ -1517,6 +1527,104 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: ).to_dict() +class StargazersGraphqlStream(GitHubGraphqlStream): + """Defines 'UserContributedToStream' stream. Warning: this stream 'only' gets the first 100 projects (by stars).""" + + name = "stargazers" + query_jsonpath = "$.data.repository.stargazers.edges.[*]" + primary_keys = ["user_id", "repo_id"] + replication_key = "starred_at" + parent_stream_type = RepositoryStream + state_partitioning_keys = ["repo_id"] + # The parent repository object changes if the number of stargazers changes. + ignore_parent_replication_key = False + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # TODO - remove warning with next release. + self.logger.warning( + "This stream 'stargazers' might conflict with previous implementation. " + "Looking for the older version? Use 'stargazers_rest'." + ) + + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + """ + Add a user_id top-level field to be used as state replication key. + """ + row["user_id"] = row["user"]["id"] + if context is not None: + row["repo_id"] = context["repo_id"] + return row + + def get_next_page_token( + self, response: requests.Response, previous_token: Optional[Any] + ) -> Optional[Any]: + """ + Exit early if a since parameter is provided. + """ + request_parameters = parse_qs(str(urlparse(response.request.url).query)) + + # parse_qs interprets "+" as a space, revert this to keep an aware datetime + try: + since = ( + request_parameters["since"][0].replace(" ", "+") + if "since" in request_parameters + else "" + ) + except IndexError: + since = "" + + # If since parameter is present, try to exit early by looking at the last "starred_at". + # Noting that we are traversing in DESCENDING order by STARRED_AT. + if since: + results = extract_jsonpath(self.query_jsonpath, input=response.json()) + *_, last = results + if parse(last["starred_at"]) < parse(since): + return None + return super().get_next_page_token(response, previous_token) + + @property + def query(self) -> str: + """Return dynamic GraphQL query.""" + # Graphql id is equivalent to REST node_id. To keep the tap consistent, we rename "id" to "node_id". + return """ + query repositoryStargazers($repo: String! $org: String! $nextPageCursor_0: String) { + repository(name: $repo owner: $org) { + stargazers(first: 100 orderBy: {field: STARRED_AT direction: DESC} after: $nextPageCursor_0) { + pageInfo { + hasNextPage_0: hasNextPage + startCursor_0: startCursor + endCursor_0: endCursor + } + edges { + user: node { + node_id: id + id: databaseId + login + avatar_url: avatarUrl + html_url: url + type: __typename + site_admin: isSiteAdmin + } + starred_at: starredAt + } + } + } + } + """ + + schema = th.PropertiesList( + # Parent Keys + th.Property("repo", th.StringType), + th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), + # Stargazer Info + th.Property("user_id", th.IntegerType), + th.Property("starred_at", th.DateTimeType), + th.Property("user", user_object), + ).to_dict() + + class StatsContributorsStream(GitHubRestStream): """ Defines 'StatsContributors' stream. Fetching contributors activity. diff --git a/tap_github/streams.py b/tap_github/streams.py index 2ae5df57..5a265468 100644 --- a/tap_github/streams.py +++ b/tap_github/streams.py @@ -5,6 +5,9 @@ from tap_github.repository_streams import ( AnonymousContributorsStream, + AssigneesStream, + CollaboratorsStream, + CommitCommentsStream, CommitsStream, CommunityProfileStream, ContributorsStream, @@ -13,26 +16,24 @@ IssueEventsStream, IssuesStream, LanguagesStream, + MilestonesStream, + ProjectCardsStream, + ProjectColumnsStream, + ProjectsStream, + PullRequestCommits, PullRequestsStream, ReadmeHtmlStream, ReadmeStream, + ReleasesStream, RepositoryStream, + ReviewCommentsStream, + ReviewsStream, + StargazersGraphqlStream, StargazersStream, StatsContributorsStream, - AssigneesStream, - CollaboratorsStream, - ReviewsStream, - ReviewCommentsStream, - ProjectsStream, - ProjectColumnsStream, - ProjectCardsStream, - PullRequestCommits, - MilestonesStream, - CommitCommentsStream, - ReleasesStream, - WorkflowsStream, WorkflowRunJobsStream, WorkflowRunsStream, + WorkflowsStream, ) from tap_github.user_streams import ( StarredStream, @@ -41,9 +42,9 @@ ) from tap_github.organization_streams import ( OrganizationStream, - TeamsStream, TeamMembersStream, TeamRolesStream, + TeamsStream, ) @@ -63,34 +64,35 @@ def __init__(self, valid_queries: Set[str], streams: List[Type[Stream]]): {"repositories", "organizations", "searches"}, [ AnonymousContributorsStream, - CommitsStream, + AssigneesStream, + CollaboratorsStream, CommitCommentsStream, + CommitsStream, CommunityProfileStream, ContributorsStream, EventsStream, - MilestonesStream, - ReleasesStream, - CollaboratorsStream, - AssigneesStream, - IssuesStream, IssueCommentsStream, IssueEventsStream, + IssuesStream, LanguagesStream, - PullRequestsStream, + MilestonesStream, + ProjectCardsStream, + ProjectColumnsStream, + ProjectsStream, PullRequestCommits, - ReviewsStream, - ReviewCommentsStream, + PullRequestsStream, ReadmeHtmlStream, ReadmeStream, + ReleasesStream, RepositoryStream, + ReviewCommentsStream, + ReviewsStream, + StargazersGraphqlStream, StargazersStream, StatsContributorsStream, - ProjectsStream, - ProjectColumnsStream, - ProjectCardsStream, - WorkflowsStream, WorkflowRunJobsStream, WorkflowRunsStream, + WorkflowsStream, ], ) USERS = ( @@ -103,7 +105,7 @@ def __init__(self, valid_queries: Set[str], streams: List[Type[Stream]]): ) ORGANIZATIONS = ( {"organizations"}, - [OrganizationStream, TeamsStream, TeamMembersStream, TeamRolesStream], + [OrganizationStream, TeamMembersStream, TeamRolesStream, TeamsStream], ) @classmethod diff --git a/tap_github/tests/fixtures.py b/tap_github/tests/fixtures.py index 16bd08ad..ec431e8e 100644 --- a/tap_github/tests/fixtures.py +++ b/tap_github/tests/fixtures.py @@ -1,3 +1,5 @@ +import os +import logging import datetime import pytest @@ -25,7 +27,7 @@ def repo_list_config(request): """ marker = request.node.get_closest_marker("repo_list") if marker is None: - repo_list = ["octocat/hello-world", "mapswipe/mapswipe"] + repo_list = ["MeltanoLabs/tap-github", "mapswipe/mapswipe"] else: repo_list = marker.args[0] @@ -93,3 +95,28 @@ def organization_list_config(request): "organizations": organization_list, "rate_limit_buffer": 100, } + + +def alternative_sync_chidren(self, child_context: dict) -> None: + """ + Override for Stream._sync_children. + Enabling us to use an ORG_LEVEL_TOKEN for the collaborators stream. + """ + for child_stream in self.child_streams: + # Use org:write access level credentials for collaborators stream + if child_stream.name in ["collaborators"]: + ORG_LEVEL_TOKEN = os.environ.get("ORG_LEVEL_TOKEN") + if not ORG_LEVEL_TOKEN: + logging.warning( + 'No "ORG_LEVEL_TOKEN" found. Skipping collaborators stream sync.' + ) + continue + SAVED_GTHUB_TOKEN = os.environ.get("GITHUB_TOKEN") + os.environ["GITHUB_TOKEN"] = ORG_LEVEL_TOKEN + child_stream.sync(context=child_context) + os.environ["GITHUB_TOKEN"] = SAVED_GTHUB_TOKEN or "" + continue + + # default behavior: + if child_stream.selected or child_stream.has_selected_descendents: + child_stream.sync(context=child_context) diff --git a/tap_github/tests/test_core.py b/tap_github/tests/test_core.py index edbc527a..59847341 100644 --- a/tap_github/tests/test_core.py +++ b/tap_github/tests/test_core.py @@ -3,7 +3,9 @@ import logging from unittest import mock +from unittest.mock import patch +from .fixtures import alternative_sync_chidren from singer_sdk.testing import get_standard_tap_tests from tap_github.tap import TapGitHub @@ -27,8 +29,11 @@ def test_standard_tap_tests_for_search_mode(search_config): def test_standard_tap_tests_for_repo_list_mode(repo_list_config): """Run standard tap tests from the SDK.""" tests = get_standard_tap_tests(TapGitHub, config=repo_list_config) - for test in tests: - test() + with patch( + "singer_sdk.streams.core.Stream._sync_children", alternative_sync_chidren + ): + for test in tests: + test() def test_standard_tap_tests_for_username_list_mode(username_list_config): diff --git a/tap_github/tests/test_tap.py b/tap_github/tests/test_tap.py index bdf6af54..c79c938e 100644 --- a/tap_github/tests/test_tap.py +++ b/tap_github/tests/test_tap.py @@ -9,7 +9,7 @@ from singer_sdk.helpers import _catalog as cat_helpers from tap_github.tap import TapGitHub -from .fixtures import repo_list_config, username_list_config +from .fixtures import alternative_sync_chidren, repo_list_config, username_list_config repo_list_2 = [ "MeltanoLabs/tap-github", @@ -49,31 +49,6 @@ def test_validate_repo_list_config(repo_list_config): assert partitions == repo_list_context -def alternative_sync_chidren(self, child_context: dict) -> None: - """ - Override for Stream._sync_children. - Enabling us to use an ORG_LEVEL_TOKEN for the collaborators sream. - """ - for child_stream in self.child_streams: - # Use org:write access level credentials for collaborators stream - if child_stream.name in ["collaborators"]: - ORG_LEVEL_TOKEN = os.environ.get("ORG_LEVEL_TOKEN") - if not ORG_LEVEL_TOKEN: - logging.warning( - 'No "ORG_LEVEL_TOKEN" found. Skipping collaborators stream sync.' - ) - continue - SAVED_GTHUB_TOKEN = os.environ.get("GITHUB_TOKEN") - os.environ["GITHUB_TOKEN"] = ORG_LEVEL_TOKEN - child_stream.sync(context=child_context) - os.environ["GITHUB_TOKEN"] = SAVED_GTHUB_TOKEN or "" - continue - - # default behavior: - if child_stream.selected or child_stream.has_selected_descendents: - child_stream.sync(context=child_context) - - def run_tap_with_config(capsys, config_obj: dict, skip_stream: Optional[str]) -> str: """ Run the tap with the given config and capture stdout, optionally