Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Graphql implementation of the repo StargazersStream #123

Merged
merged 25 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7d90097
initialization graphql stargazers
ericboucher May 17, 2022
0440ec0
Fix stargazers query
ericboucher May 18, 2022
89bb766
simplify query
ericboucher May 18, 2022
348f303
Update repository_streams.py
ericboucher May 18, 2022
42310c8
Add early exit with fake "since"
ericboucher May 18, 2022
ed77f27
Add new stream and order alphabetically
ericboucher May 18, 2022
3977dee
Add missing user params
ericboucher May 18, 2022
eec8d3d
Test different path for api_calls_tests_cache
ericboucher May 18, 2022
3b8e9b3
Retry pytest once on error
ericboucher May 18, 2022
f46c2e1
Fix push target in github action
ericboucher May 18, 2022
6ee2765
Update test_tap.yml
ericboucher May 18, 2022
7d43ea6
Update test_tap.yml
ericboucher May 18, 2022
2b433c6
Update test_tap.yml
ericboucher May 18, 2022
8ad4f69
Update test_tap.yml
ericboucher May 18, 2022
5b5b64c
Use alternative_sync_chidren in tap_core
ericboucher May 18, 2022
9e0dd6d
Update test_tap.yml
ericboucher May 18, 2022
6c58936
Update test_tap.yml
ericboucher May 18, 2022
0bb3531
Update test_tap.yml
ericboucher May 18, 2022
b58691a
Update tap_github/tests/fixtures.py
ericboucher May 18, 2022
288dd81
Override REST stream
ericboucher May 19, 2022
dc52830
Update repository_streams.py
ericboucher May 19, 2022
79d1970
Update test_tap.yml
ericboucher May 19, 2022
443825b
Revert "Override REST stream"
ericboucher May 19, 2022
1c79c06
Keep both streams but add warning
ericboucher May 19, 2022
862e7ca
Update repository_streams.py
ericboucher May 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions .github/workflows/test_tap.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
name: Test tap-github

on:
# Run on all pull requests and on pushes to master.
# Run on all pull requests and on pushes to main.
pull_request:
push:
branches:
- master
- main

jobs:
tests:
Expand All @@ -26,13 +26,11 @@ jobs:
uses: actions/[email protected]
with:
# must match the path in tests/__init__.py
path: '**/api_calls_tests_cache.sqlite'
path: 'api_calls_tests_cache.sqlite'
# github cache expires after 1wk, and we expire the content after 24h
# this key should not need to change unless we need to clear the cache
key: api-cache-v3
restore-keys: |
api-cache-v3
api-cache-
restore-keys: api-cache-v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
Expand Down Expand Up @@ -61,5 +59,13 @@ jobs:
run: |
poetry run mypy . --ignore-missing-imports
- name: Test with pytest
id: test_pytest
continue-on-error: true
run: |
poetry run pytest --capture=no
- name: Test with pytest (run 2)
id: retry_test_pytest
if: steps.test_pytest.outcome=='failure' # check the step outcome, wait and retry
run: |
sleep 60m
poetry run pytest --capture=no
5 changes: 5 additions & 0 deletions tap_github/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,4 +333,9 @@ def get_url_params(
params["per_page"] = self.MAX_PER_PAGE
if next_page_token:
params.update(next_page_token)

since = self.get_starting_timestamp(context)
if self.replication_key and since:
params["since"] = str(since)

return params
93 changes: 93 additions & 0 deletions tap_github/repository_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from singer_sdk import typing as th # JSON Schema typing helpers
from singer_sdk.helpers.jsonpath import extract_jsonpath

from dateutil.parser import parse
from urllib.parse import parse_qs, urlparse

from tap_github.client import GitHubGraphqlStream, GitHubRestStream
from tap_github.schema_objects import (
user_object,
Expand Down Expand Up @@ -1517,6 +1520,96 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
).to_dict()


class StargazersGraphqlStream(GitHubGraphqlStream):
"""Defines 'UserContributedToStream' stream. Warning: this stream 'only' gets the first 100 projects (by stars)."""

name = "stargazers"
ericboucher marked this conversation as resolved.
Show resolved Hide resolved
query_jsonpath = "$.data.repository.stargazers.edges.[*]"
primary_keys = ["user_id", "repo_id"]
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
replication_key = "starred_at"
parent_stream_type = RepositoryStream
state_partitioning_keys = ["repo_id"]
# The parent repository object changes if the number of stargazers changes.
ignore_parent_replication_key = False

def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
"""
Add a user_id top-level field to be used as state replication key.
"""
row["user_id"] = row["user"]["id"]
if context is not None:
row["repo_id"] = context["repo_id"]
return row

def get_next_page_token(
self, response: requests.Response, previous_token: Optional[Any]
) -> Optional[Any]:
"""
Exit early if a since parameter is provided.
"""
request_parameters = parse_qs(str(urlparse(response.request.url).query))

# parse_qs interprets "+" as a space, revert this to keep an aware datetime
try:
since = (
request_parameters["since"][0].replace(" ", "+")
if "since" in request_parameters
else ""
)
except IndexError:
since = ""

# If since parameter is present, try to exit early by looking at the last "starred_at".
# Noting that we are traversing in DESCENDING order by STARRED_AT.
if since:
results = extract_jsonpath(self.query_jsonpath, input=response.json())
*_, last = results
if parse(last["starred_at"]) < parse(since):
return None
return super().get_next_page_token(response, previous_token)

@property
def query(self) -> str:
"""Return dynamic GraphQL query."""
# Graphql id is equivalent to REST node_id. To keep the tap consistent, we rename "id" to "node_id".
return """
query repositoryStargazers($repo: String! $org: String! $nextPageCursor_0: String) {
repository(name: $repo owner: $org) {
stargazers(first: 100 orderBy: {field: STARRED_AT direction: DESC} after: $nextPageCursor_0) {
pageInfo {
hasNextPage_0: hasNextPage
startCursor_0: startCursor
endCursor_0: endCursor
}
edges {
user: node {
node_id: id
id: databaseId
login
avatar_url: avatarUrl
html_url: url
type: __typename
site_admin: isSiteAdmin
}
starred_at: starredAt
}
}
}
}
"""

schema = th.PropertiesList(
# Parent Keys
th.Property("repo", th.StringType),
th.Property("org", th.StringType),
th.Property("repo_id", th.IntegerType),
# Stargazer Info
th.Property("user_id", th.IntegerType),
th.Property("starred_at", th.DateTimeType),
th.Property("user", user_object),
).to_dict()


class StatsContributorsStream(GitHubRestStream):
"""
Defines 'StatsContributors' stream. Fetching contributors activity.
Expand Down
56 changes: 29 additions & 27 deletions tap_github/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

from tap_github.repository_streams import (
AnonymousContributorsStream,
AssigneesStream,
CollaboratorsStream,
CommitCommentsStream,
CommitsStream,
CommunityProfileStream,
ContributorsStream,
Expand All @@ -13,26 +16,24 @@
IssueEventsStream,
IssuesStream,
LanguagesStream,
MilestonesStream,
ProjectCardsStream,
ProjectColumnsStream,
ProjectsStream,
PullRequestCommits,
PullRequestsStream,
ReadmeHtmlStream,
ReadmeStream,
ReleasesStream,
RepositoryStream,
ReviewCommentsStream,
ReviewsStream,
StargazersGraphqlStream,
StargazersStream,
StatsContributorsStream,
AssigneesStream,
CollaboratorsStream,
ReviewsStream,
ReviewCommentsStream,
ProjectsStream,
ProjectColumnsStream,
ProjectCardsStream,
PullRequestCommits,
MilestonesStream,
CommitCommentsStream,
ReleasesStream,
WorkflowsStream,
WorkflowRunJobsStream,
WorkflowRunsStream,
WorkflowsStream,
)
from tap_github.user_streams import (
StarredStream,
Expand All @@ -41,9 +42,9 @@
)
from tap_github.organization_streams import (
OrganizationStream,
TeamsStream,
TeamMembersStream,
TeamRolesStream,
TeamsStream,
)


Expand All @@ -63,34 +64,35 @@ def __init__(self, valid_queries: Set[str], streams: List[Type[Stream]]):
{"repositories", "organizations", "searches"},
[
AnonymousContributorsStream,
CommitsStream,
AssigneesStream,
CollaboratorsStream,
CommitCommentsStream,
CommitsStream,
CommunityProfileStream,
ContributorsStream,
EventsStream,
MilestonesStream,
ReleasesStream,
CollaboratorsStream,
AssigneesStream,
IssuesStream,
IssueCommentsStream,
IssueEventsStream,
IssuesStream,
LanguagesStream,
PullRequestsStream,
MilestonesStream,
ProjectCardsStream,
ProjectColumnsStream,
ProjectsStream,
PullRequestCommits,
ReviewsStream,
ReviewCommentsStream,
PullRequestsStream,
ReadmeHtmlStream,
ReadmeStream,
ReleasesStream,
RepositoryStream,
ReviewCommentsStream,
ReviewsStream,
StargazersGraphqlStream,
StargazersStream,
StatsContributorsStream,
ProjectsStream,
ProjectColumnsStream,
ProjectCardsStream,
WorkflowsStream,
WorkflowRunJobsStream,
WorkflowRunsStream,
WorkflowsStream,
],
)
USERS = (
Expand All @@ -103,7 +105,7 @@ def __init__(self, valid_queries: Set[str], streams: List[Type[Stream]]):
)
ORGANIZATIONS = (
{"organizations"},
[OrganizationStream, TeamsStream, TeamMembersStream, TeamRolesStream],
[OrganizationStream, TeamMembersStream, TeamRolesStream, TeamsStream],
)

@classmethod
Expand Down
29 changes: 28 additions & 1 deletion tap_github/tests/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import logging
import datetime

import pytest
Expand Down Expand Up @@ -25,7 +27,7 @@ def repo_list_config(request):
"""
marker = request.node.get_closest_marker("repo_list")
if marker is None:
repo_list = ["octocat/hello-world", "mapswipe/mapswipe"]
repo_list = ["MeltanoLabs/tap-github", "mapswipe/mapswipe"]
else:
repo_list = marker.args[0]

Expand Down Expand Up @@ -93,3 +95,28 @@ def organization_list_config(request):
"organizations": organization_list,
"rate_limit_buffer": 100,
}


def alternative_sync_chidren(self, child_context: dict) -> None:
"""
Override for Stream._sync_children.
Enabling us to use an ORG_LEVEL_TOKEN for the collaborators sream.
ericboucher marked this conversation as resolved.
Show resolved Hide resolved
"""
for child_stream in self.child_streams:
# Use org:write access level credentials for collaborators stream
if child_stream.name in ["collaborators"]:
ORG_LEVEL_TOKEN = os.environ.get("ORG_LEVEL_TOKEN")
if not ORG_LEVEL_TOKEN:
logging.warning(
'No "ORG_LEVEL_TOKEN" found. Skipping collaborators stream sync.'
)
continue
SAVED_GTHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
os.environ["GITHUB_TOKEN"] = ORG_LEVEL_TOKEN
child_stream.sync(context=child_context)
os.environ["GITHUB_TOKEN"] = SAVED_GTHUB_TOKEN or ""
continue

# default behavior:
if child_stream.selected or child_stream.has_selected_descendents:
child_stream.sync(context=child_context)
9 changes: 7 additions & 2 deletions tap_github/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import logging

from unittest import mock
from unittest.mock import patch

from .fixtures import alternative_sync_chidren
from singer_sdk.testing import get_standard_tap_tests

from tap_github.tap import TapGitHub
Expand All @@ -27,8 +29,11 @@ def test_standard_tap_tests_for_search_mode(search_config):
def test_standard_tap_tests_for_repo_list_mode(repo_list_config):
"""Run standard tap tests from the SDK."""
tests = get_standard_tap_tests(TapGitHub, config=repo_list_config)
for test in tests:
test()
with patch(
"singer_sdk.streams.core.Stream._sync_children", alternative_sync_chidren
):
for test in tests:
test()


def test_standard_tap_tests_for_username_list_mode(username_list_config):
Expand Down
27 changes: 1 addition & 26 deletions tap_github/tests/test_tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from singer_sdk.helpers import _catalog as cat_helpers
from tap_github.tap import TapGitHub

from .fixtures import repo_list_config, username_list_config
from .fixtures import alternative_sync_chidren, repo_list_config, username_list_config

repo_list_2 = [
"MeltanoLabs/tap-github",
Expand Down Expand Up @@ -49,31 +49,6 @@ def test_validate_repo_list_config(repo_list_config):
assert partitions == repo_list_context


def alternative_sync_chidren(self, child_context: dict) -> None:
"""
Override for Stream._sync_children.
Enabling us to use an ORG_LEVEL_TOKEN for the collaborators sream.
"""
for child_stream in self.child_streams:
# Use org:write access level credentials for collaborators stream
if child_stream.name in ["collaborators"]:
ORG_LEVEL_TOKEN = os.environ.get("ORG_LEVEL_TOKEN")
if not ORG_LEVEL_TOKEN:
logging.warning(
'No "ORG_LEVEL_TOKEN" found. Skipping collaborators stream sync.'
)
continue
SAVED_GTHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
os.environ["GITHUB_TOKEN"] = ORG_LEVEL_TOKEN
child_stream.sync(context=child_context)
os.environ["GITHUB_TOKEN"] = SAVED_GTHUB_TOKEN or ""
continue

# default behavior:
if child_stream.selected or child_stream.has_selected_descendents:
child_stream.sync(context=child_context)


def run_tap_with_config(capsys, config_obj: dict, skip_stream: Optional[str]) -> str:
"""
Run the tap with the given config and capture stdout, optionally
Expand Down