From 4de9b6adfa92d3ab7dad62034ad567ccb8517aaa Mon Sep 17 00:00:00 2001 From: Samira El Aabidi <54845154+Samira-El@users.noreply.github.com> Date: Tue, 28 Feb 2023 15:36:50 +0000 Subject: [PATCH] AP-1335 circumvent rate limits in users endpoint (#29) --- .github/workflows/ci.yml | 2 +- setup.py | 2 +- tap_slack/__init__.py | 4 ++-- tap_slack/client.py | 4 ++-- tap_slack/streams.py | 15 +++++++++++---- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c1c9af2..66fd868 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ 3.6, 3.7, 3.8 ] + python-version: [ 3.7, 3.8, 3.9 ] steps: - name: Checkout repository diff --git a/setup.py b/setup.py index 13d7723..10f3027 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ py_modules=['tap_slack'], install_requires=[ 'pipelinewise-singer-python==1.*', - 'slackclient==2.6.0', + 'slack-sdk==3.20.0', ], extras_require={ 'test': [ diff --git a/tap_slack/__init__.py b/tap_slack/__init__.py index 30aec19..ff1513c 100644 --- a/tap_slack/__init__.py +++ b/tap_slack/__init__.py @@ -1,13 +1,13 @@ import sys import json import singer -from slack import WebClient +from slack_sdk import WebClient from tap_slack.client import SlackClient from tap_slack.streams import AVAILABLE_STREAMS from tap_slack.catalog import generate_catalog -LOGGER = singer.get_logger() +LOGGER = singer.get_logger(__name__) def auto_join(client, config): diff --git a/tap_slack/client.py b/tap_slack/client.py index 1c22187..2c990ba 100644 --- a/tap_slack/client.py +++ b/tap_slack/client.py @@ -6,9 +6,9 @@ import backoff import singer -from slack.errors import SlackApiError +from slack_sdk.errors import SlackApiError -LOGGER = singer.get_logger() +LOGGER = singer.get_logger(__name__) class SlackClient(object): diff --git a/tap_slack/streams.py b/tap_slack/streams.py index bb9a1b1..6da6de0 100644 --- a/tap_slack/streams.py +++ b/tap_slack/streams.py @@ -8,7 +8,7 @@ from tap_slack.transform import transform_json -LOGGER = singer.get_logger() +LOGGER = singer.get_logger(__name__) DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S' utc = pytz.UTC @@ -328,11 +328,17 @@ def sync(self, mdata): bookmark = self.config.get('start_date') new_bookmark = bookmark + LOGGER.info('Fetching all users that have been updated since %s', bookmark) + # pylint: disable=unused-variable with singer.metrics.job_timer(job_type='list_users') as timer: with singer.metrics.record_counter(endpoint=self.name) as counter: - users_list = self.client.get_users(limit=100) + # API returns users in no particular order. + # let's fetch 1000 users per page for now, it saves on api requests and avoids running + # into rate limiting soon + users_list = self.client.get_users(limit=1000) + # this will encounter rate limit at some point for page in users_list: users = page.get('members') transformed_users = transform_json(stream=self.name, data=users, @@ -342,8 +348,8 @@ def sync(self, mdata): integer_datetime_fmt="unix-seconds-integer-datetime-parsing") \ as transformer: transformed_record = transformer.transform(data=user, schema=schema, - metadata=metadata.to_map( - mdata)) + metadata=metadata.to_map(mdata)) + new_bookmark = max(new_bookmark, transformed_record.get('updated')) if transformed_record.get('updated') > bookmark: if self.write_to_singer: @@ -352,6 +358,7 @@ def sync(self, mdata): record=transformed_record) counter.increment() + LOGGER.info('Updating users state bookmark to %s', new_bookmark) self.state = singer.write_bookmark(state=self.state, tap_stream_id=self.name, key=self.replication_key, val=new_bookmark)