Skip to content

Commit

Permalink
Add a Graphql implementation of the repo StargazersStream (#123)
Browse files Browse the repository at this point in the history
Co-authored-by: Laurent Savaete <[email protected]>
  • Loading branch information
ericboucher and laurentS authored May 19, 2022
1 parent 38215c7 commit 44564b2
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 63 deletions.
22 changes: 16 additions & 6 deletions .github/workflows/test_tap.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
name: Test tap-github

on:
# Run on all pull requests and on pushes to master.
# Run on all pull requests and on pushes to main.
pull_request:
push:
branches:
- master
- main

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
tests:
Expand All @@ -26,13 +30,11 @@ jobs:
uses: actions/[email protected]
with:
# must match the path in tests/__init__.py
path: '**/api_calls_tests_cache.sqlite'
path: 'api_calls_tests_cache.sqlite'
# github cache expires after 1wk, and we expire the content after 24h
# this key should not need to change unless we need to clear the cache
key: api-cache-v3
restore-keys: |
api-cache-v3
api-cache-
restore-keys: api-cache-v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
Expand Down Expand Up @@ -61,5 +63,13 @@ jobs:
run: |
poetry run mypy . --ignore-missing-imports
- name: Test with pytest
id: test_pytest
continue-on-error: true
run: |
poetry run pytest --capture=no
- name: Test with pytest (run 2)
id: retry_test_pytest
if: steps.test_pytest.outcome=='failure' # check the step outcome, wait and retry
run: |
sleep 60m
poetry run pytest --capture=no
5 changes: 5 additions & 0 deletions tap_github/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,4 +333,9 @@ def get_url_params(
params["per_page"] = self.MAX_PER_PAGE
if next_page_token:
params.update(next_page_token)

since = self.get_starting_timestamp(context)
if self.replication_key and since:
params["since"] = str(since)

return params
110 changes: 109 additions & 1 deletion tap_github/repository_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from singer_sdk import typing as th # JSON Schema typing helpers
from singer_sdk.helpers.jsonpath import extract_jsonpath

from dateutil.parser import parse
from urllib.parse import parse_qs, urlparse

from tap_github.client import GitHubGraphqlStream, GitHubRestStream
from tap_github.schema_objects import (
user_object,
Expand Down Expand Up @@ -1476,7 +1479,7 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]:
class StargazersStream(GitHubRestStream):
"""Defines 'Stargazers' stream. Warning: this stream does NOT track star deletions."""

name = "stargazers"
name = "stargazers_rest"
path = "/repos/{org}/{repo}/stargazers"
primary_keys = ["user_id", "repo", "org"]
parent_stream_type = RepositoryStream
Expand All @@ -1485,6 +1488,13 @@ class StargazersStream(GitHubRestStream):
# GitHub is missing the "since" parameter on this endpoint.
missing_since_parameter = True

def __init__(self, *args, **kwargs):
    """Initialise the stream, then log a deprecation warning for this REST variant."""
    super().__init__(*args, **kwargs)
    # TODO - remove warning with next release.
    deprecation_notice = (
        "The stream 'stargazers_rest' is deprecated. Please use the Graphql version instead: 'stargazers'."
    )
    self.logger.warning(deprecation_notice)

@property
def http_headers(self) -> dict:
"""Return the http headers needed.
Expand Down Expand Up @@ -1517,6 +1527,104 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
).to_dict()


class StargazersGraphqlStream(GitHubGraphqlStream):
    """Defines the 'stargazers' stream, fetched through the GraphQL API.

    Stargazers are requested 100 per page, ordered by STARRED_AT descending
    (newest first), so that pagination can stop as soon as a page reaches
    records older than the ``since`` value of the request.
    """

    name = "stargazers"
    query_jsonpath = "$.data.repository.stargazers.edges.[*]"
    primary_keys = ["user_id", "repo_id"]
    replication_key = "starred_at"
    parent_stream_type = RepositoryStream
    state_partitioning_keys = ["repo_id"]
    # The parent repository object changes if the number of stargazers changes.
    ignore_parent_replication_key = False

    def __init__(self, *args, **kwargs):
        """Initialise the stream and warn about the name clash with the REST stream."""
        super().__init__(*args, **kwargs)
        # TODO - remove warning with next release.
        self.logger.warning(
            "This stream 'stargazers' might conflict with previous implementation. "
            "Looking for the older version? Use 'stargazers_rest'."
        )

    def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
        """
        Add the top-level fields used as primary keys.

        ``user_id`` is copied from the nested ``user`` object and ``repo_id``
        is taken from the parent context when one is provided.
        """
        row["user_id"] = row["user"]["id"]
        if context is not None:
            row["repo_id"] = context["repo_id"]
        return row

    def get_next_page_token(
        self, response: requests.Response, previous_token: Optional[Any]
    ) -> Optional[Any]:
        """
        Exit early if a since parameter is provided.

        Returns None (stop paginating) when the request carried a ``since``
        parameter and the last record of this page was starred before it.
        Otherwise defers to the parent implementation.
        """
        request_parameters = parse_qs(str(urlparse(response.request.url).query))

        # parse_qs interprets "+" as a space, revert this to keep an aware datetime
        since_values = request_parameters.get("since") or [""]
        since = since_values[0].replace(" ", "+")

        # If since parameter is present, try to exit early by looking at the last "starred_at".
        # Noting that we are traversing in DESCENDING order by STARRED_AT.
        if since:
            # Materialise the matches so that an empty page can be detected:
            # unpacking the last element of an empty result set would raise
            # ValueError (e.g. for a repository without any stargazers).
            results = list(
                extract_jsonpath(self.query_jsonpath, input=response.json())
            )
            if results and parse(results[-1]["starred_at"]) < parse(since):
                return None
        return super().get_next_page_token(response, previous_token)

    @property
    def query(self) -> str:
        """Return dynamic GraphQL query."""
        # Graphql id is equivalent to REST node_id. To keep the tap consistent, we rename "id" to "node_id".
        return """
          query repositoryStargazers($repo: String! $org: String! $nextPageCursor_0: String) {
            repository(name: $repo owner: $org) {
              stargazers(first: 100 orderBy: {field: STARRED_AT direction: DESC} after: $nextPageCursor_0) {
                pageInfo {
                  hasNextPage_0: hasNextPage
                  startCursor_0: startCursor
                  endCursor_0: endCursor
                }
                edges {
                  user: node {
                    node_id: id
                    id: databaseId
                    login
                    avatar_url: avatarUrl
                    html_url: url
                    type: __typename
                    site_admin: isSiteAdmin
                  }
                  starred_at: starredAt
                }
              }
            }
          }
        """

    schema = th.PropertiesList(
        # Parent Keys
        th.Property("repo", th.StringType),
        th.Property("org", th.StringType),
        th.Property("repo_id", th.IntegerType),
        # Stargazer Info
        th.Property("user_id", th.IntegerType),
        th.Property("starred_at", th.DateTimeType),
        th.Property("user", user_object),
    ).to_dict()


class StatsContributorsStream(GitHubRestStream):
"""
Defines 'StatsContributors' stream. Fetching contributors activity.
Expand Down
56 changes: 29 additions & 27 deletions tap_github/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

from tap_github.repository_streams import (
AnonymousContributorsStream,
AssigneesStream,
CollaboratorsStream,
CommitCommentsStream,
CommitsStream,
CommunityProfileStream,
ContributorsStream,
Expand All @@ -13,26 +16,24 @@
IssueEventsStream,
IssuesStream,
LanguagesStream,
MilestonesStream,
ProjectCardsStream,
ProjectColumnsStream,
ProjectsStream,
PullRequestCommits,
PullRequestsStream,
ReadmeHtmlStream,
ReadmeStream,
ReleasesStream,
RepositoryStream,
ReviewCommentsStream,
ReviewsStream,
StargazersGraphqlStream,
StargazersStream,
StatsContributorsStream,
AssigneesStream,
CollaboratorsStream,
ReviewsStream,
ReviewCommentsStream,
ProjectsStream,
ProjectColumnsStream,
ProjectCardsStream,
PullRequestCommits,
MilestonesStream,
CommitCommentsStream,
ReleasesStream,
WorkflowsStream,
WorkflowRunJobsStream,
WorkflowRunsStream,
WorkflowsStream,
)
from tap_github.user_streams import (
StarredStream,
Expand All @@ -41,9 +42,9 @@
)
from tap_github.organization_streams import (
OrganizationStream,
TeamsStream,
TeamMembersStream,
TeamRolesStream,
TeamsStream,
)


Expand All @@ -63,34 +64,35 @@ def __init__(self, valid_queries: Set[str], streams: List[Type[Stream]]):
{"repositories", "organizations", "searches"},
[
AnonymousContributorsStream,
CommitsStream,
AssigneesStream,
CollaboratorsStream,
CommitCommentsStream,
CommitsStream,
CommunityProfileStream,
ContributorsStream,
EventsStream,
MilestonesStream,
ReleasesStream,
CollaboratorsStream,
AssigneesStream,
IssuesStream,
IssueCommentsStream,
IssueEventsStream,
IssuesStream,
LanguagesStream,
PullRequestsStream,
MilestonesStream,
ProjectCardsStream,
ProjectColumnsStream,
ProjectsStream,
PullRequestCommits,
ReviewsStream,
ReviewCommentsStream,
PullRequestsStream,
ReadmeHtmlStream,
ReadmeStream,
ReleasesStream,
RepositoryStream,
ReviewCommentsStream,
ReviewsStream,
StargazersGraphqlStream,
StargazersStream,
StatsContributorsStream,
ProjectsStream,
ProjectColumnsStream,
ProjectCardsStream,
WorkflowsStream,
WorkflowRunJobsStream,
WorkflowRunsStream,
WorkflowsStream,
],
)
USERS = (
Expand All @@ -103,7 +105,7 @@ def __init__(self, valid_queries: Set[str], streams: List[Type[Stream]]):
)
ORGANIZATIONS = (
{"organizations"},
[OrganizationStream, TeamsStream, TeamMembersStream, TeamRolesStream],
[OrganizationStream, TeamMembersStream, TeamRolesStream, TeamsStream],
)

@classmethod
Expand Down
29 changes: 28 additions & 1 deletion tap_github/tests/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import logging
import datetime

import pytest
Expand Down Expand Up @@ -25,7 +27,7 @@ def repo_list_config(request):
"""
marker = request.node.get_closest_marker("repo_list")
if marker is None:
repo_list = ["octocat/hello-world", "mapswipe/mapswipe"]
repo_list = ["MeltanoLabs/tap-github", "mapswipe/mapswipe"]
else:
repo_list = marker.args[0]

Expand Down Expand Up @@ -93,3 +95,28 @@ def organization_list_config(request):
"organizations": organization_list,
"rate_limit_buffer": 100,
}


def alternative_sync_chidren(self, child_context: dict) -> None:
    """
    Override for Stream._sync_children.
    Enabling us to use an ORG_LEVEL_TOKEN for the collaborators stream.

    For the "collaborators" child stream, the GITHUB_TOKEN environment
    variable is temporarily replaced by ORG_LEVEL_TOKEN while the stream
    syncs; the stream is skipped (with a warning) when ORG_LEVEL_TOKEN is not
    set. Every other child stream is synced only if it, or one of its
    descendants, is selected.

    Args:
        child_context: Context dict passed on to each child stream's ``sync``.
    """
    for child_stream in self.child_streams:
        # Use org:write access level credentials for collaborators stream
        if child_stream.name in ["collaborators"]:
            org_level_token = os.environ.get("ORG_LEVEL_TOKEN")
            if not org_level_token:
                logging.warning(
                    'No "ORG_LEVEL_TOKEN" found. Skipping collaborators stream sync.'
                )
                continue
            saved_github_token = os.environ.get("GITHUB_TOKEN")
            os.environ["GITHUB_TOKEN"] = org_level_token
            try:
                child_stream.sync(context=child_context)
            finally:
                # Always put the environment back, even if the sync raised, so
                # the org-level token cannot leak into later streams or tests.
                if saved_github_token is None:
                    os.environ.pop("GITHUB_TOKEN", None)
                else:
                    os.environ["GITHUB_TOKEN"] = saved_github_token
            continue

        # default behavior:
        if child_stream.selected or child_stream.has_selected_descendents:
            child_stream.sync(context=child_context)
9 changes: 7 additions & 2 deletions tap_github/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import logging

from unittest import mock
from unittest.mock import patch

from .fixtures import alternative_sync_chidren
from singer_sdk.testing import get_standard_tap_tests

from tap_github.tap import TapGitHub
Expand All @@ -27,8 +29,11 @@ def test_standard_tap_tests_for_search_mode(search_config):
def test_standard_tap_tests_for_repo_list_mode(repo_list_config):
"""Run standard tap tests from the SDK."""
tests = get_standard_tap_tests(TapGitHub, config=repo_list_config)
for test in tests:
test()
with patch(
"singer_sdk.streams.core.Stream._sync_children", alternative_sync_chidren
):
for test in tests:
test()


def test_standard_tap_tests_for_username_list_mode(username_list_config):
Expand Down
Loading

0 comments on commit 44564b2

Please sign in to comment.