Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
AP-1335 circumvent rate limits in users endpoint (#29)
Browse files: browse the repository at this point in the history
  • Loading branch information
Samira-El authored Feb 28, 2023
1 parent 313ce9b commit 4de9b6a
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ 3.6, 3.7, 3.8 ]
python-version: [ 3.7, 3.8, 3.9 ]

steps:
- name: Checkout repository
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
py_modules=['tap_slack'],
install_requires=[
'pipelinewise-singer-python==1.*',
'slackclient==2.6.0',
'slack-sdk==3.20.0',
],
extras_require={
'test': [
Expand Down
4 changes: 2 additions & 2 deletions tap_slack/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import sys
import json
import singer
from slack import WebClient
from slack_sdk import WebClient

from tap_slack.client import SlackClient
from tap_slack.streams import AVAILABLE_STREAMS
from tap_slack.catalog import generate_catalog

LOGGER = singer.get_logger()
LOGGER = singer.get_logger(__name__)


def auto_join(client, config):
Expand Down
4 changes: 2 additions & 2 deletions tap_slack/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

import backoff
import singer
from slack.errors import SlackApiError
from slack_sdk.errors import SlackApiError

LOGGER = singer.get_logger()
LOGGER = singer.get_logger(__name__)


class SlackClient(object):
Expand Down
15 changes: 11 additions & 4 deletions tap_slack/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from tap_slack.transform import transform_json

LOGGER = singer.get_logger()
LOGGER = singer.get_logger(__name__)
DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
utc = pytz.UTC

Expand Down Expand Up @@ -328,11 +328,17 @@ def sync(self, mdata):
bookmark = self.config.get('start_date')
new_bookmark = bookmark

LOGGER.info('Fetching all users that have been updated since %s', bookmark)

# pylint: disable=unused-variable
with singer.metrics.job_timer(job_type='list_users') as timer:
with singer.metrics.record_counter(endpoint=self.name) as counter:
users_list = self.client.get_users(limit=100)
# The API returns users in no particular order.
# Fetch 1000 users per page for now: larger pages mean fewer API requests,
# which delays running into the rate limit.
users_list = self.client.get_users(limit=1000)

# Iterating over the pages will still hit the rate limit at some point.
for page in users_list:
users = page.get('members')
transformed_users = transform_json(stream=self.name, data=users,
Expand All @@ -342,8 +348,8 @@ def sync(self, mdata):
integer_datetime_fmt="unix-seconds-integer-datetime-parsing") \
as transformer:
transformed_record = transformer.transform(data=user, schema=schema,
metadata=metadata.to_map(
mdata))
metadata=metadata.to_map(mdata))

new_bookmark = max(new_bookmark, transformed_record.get('updated'))
if transformed_record.get('updated') > bookmark:
if self.write_to_singer:
Expand All @@ -352,6 +358,7 @@ def sync(self, mdata):
record=transformed_record)
counter.increment()

LOGGER.info('Updating users state bookmark to %s', new_bookmark)
self.state = singer.write_bookmark(state=self.state, tap_stream_id=self.name,
key=self.replication_key, val=new_bookmark)

Expand Down

0 comments on commit 4de9b6a

Please sign in to comment.