Skip to content

Clean PR: Slack member sync implementation #1629

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 113 additions & 88 deletions backend/apps/slack/management/commands/slack_sync_data.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""A command to populate Slack channels and members data based on workspaces's bot tokens."""
"""Management command to sync Slack workspaces, channels, and members."""

import logging
import time
import os
import random
import secrets
import time

from django.core.management.base import BaseCommand
from slack_sdk import WebClient
Expand All @@ -10,25 +14,15 @@
from apps.slack.models import Conversation, Member, Workspace

logger = logging.getLogger(__name__)
MAX_RETRIES = 5


class Command(BaseCommand):
help = "Populate channels and members for all Slack workspaces using their bot tokens"

def add_arguments(self, parser):
"""Define command line arguments."""
parser.add_argument(
"--batch-size",
type=int,
default=1000,
help="Number of conversations to retrieve per request",
)
parser.add_argument(
"--delay",
type=float,
default=0.5,
help="Delay between API requests in seconds",
)
parser.add_argument("--batch-size", type=int, default=1000)
parser.add_argument("--delay", type=float, default=0.5)

def handle(self, *args, **options):
batch_size = options["batch_size"]
Expand All @@ -41,90 +35,121 @@ def handle(self, *args, **options):

for workspace in workspaces:
self.stdout.write(f"\nProcessing workspace: {workspace}")

if not (bot_token := workspace.bot_token):
bot_token = (
(getattr(workspace, "bot_token", "") or "").strip()
or os.environ.get("DJANGO_SLACK_BOT_TOKEN")
)
if not bot_token:
self.stdout.write(self.style.ERROR(f"No bot token found for {workspace}"))
continue

client = WebClient(token=bot_token)

self.stdout.write(f"Fetching conversations for {workspace}...")
conversations = []
total_channels = 0
try:
cursor = None
while True:
response = client.conversations_list(
cursor=cursor,
exclude_archived=False,
limit=batch_size,
timeout=30,
types="public_channel,private_channel",
)
self._handle_slack_response(response, "conversations_list")

conversations.extend(
member
for conversation_data in response["channels"]
if (member := Conversation.update_data(conversation_data, workspace))
)
total_channels += len(response["channels"])
self._fetch_conversations(client, workspace, batch_size, delay)
self._fetch_members(client, workspace, batch_size, delay)

if not (cursor := response.get("response_metadata", {}).get("next_cursor")):
break

if delay:
time.sleep(delay)
except SlackApiError as e:
self.stdout.write(
self.style.ERROR(f"Failed to fetch conversations: {e.response['error']}")
)
if conversations:
Conversation.bulk_save(conversations)
self.stdout.write(self.style.SUCCESS(f"Populated {total_channels} channels"))
self.stdout.write(self.style.SUCCESS("\nFinished processing all workspaces"))

self.stdout.write(f"Fetching members for {workspace}...")
members = []
total_members = 0
def _call_slack_api(self, func, *args, **kwargs):
def _raise_max_retries():
raise RuntimeError("Max retries exceeded while calling Slack API")
retries = 0
while retries < MAX_RETRIES:
try:
cursor = None
while True:
response = client.users_list(
cursor=cursor,
limit=batch_size,
timeout=30,
)
self._handle_slack_response(response, "users_list")

members.extend(
member
for member_data in response["members"]
if (member := Member.update_data(member_data, workspace))
)
total_members += len(response["members"])

cursor = response.get("response_metadata", {}).get("next_cursor")
if not cursor:
break
response = func(*args, **kwargs)
if not response.get("ok", False):
raise RuntimeError(f"{func.__name__} returned ok=False: {response!r}")
return response
except SlackApiError as e:
self.stdout.write(
self.style.ERROR(f"Failed to fetch members: {e.response['error']}")
if e.response.status_code == 429:
retry_after = int(e.response.headers.get("Retry-After", 1))
self.stdout.write(
self.style.WARNING(f"Rate limited. Sleeping {retry_after}s...")
)
time.sleep(retry_after)
else:
raise
retries += 1
backoff = 2**retries + secrets.randbelow(1000) / 1000
time.sleep(backoff)
_raise_max_retries()
return # explicit return to satisfy RET503

def _fetch_conversations(self, client, workspace, batch_size, delay):
self.stdout.write(f"Fetching conversations for {workspace}...")
conversations = []
total_channels = 0
try:
cursor = None
while True:
response = self._call_slack_api(
client.conversations_list,
cursor=cursor,
exclude_archived=False,
limit=batch_size,
timeout=30,
types="public_channel,private_channel",
)

if members:
Member.bulk_save(members)

# Update the workspace with the total members count.
workspace.total_members_count = total_members
workspace.save(update_fields=["total_members_count"])
for conversation_data in response["channels"]:
if "num_members" not in conversation_data:
try:
info = self._call_slack_api(
client.conversations_info,
channel=conversation_data["id"],
include_num_members=True,
)
conversation_data["num_members"] = info["channel"].get("num_members")
except SlackApiError as e:
self.stdout.write(
self.style.WARNING(
f"Failed to get member count for {conversation_data['id']}: {e}"
)
)
conversation_data["num_members"] = None

if (conversation := Conversation.update_data(conversation_data, workspace)):
conversations.append(conversation)

total_channels += len(response["channels"])
cursor = response.get("response_metadata", {}).get("next_cursor")
if not cursor:
break
time.sleep(delay)
except SlackApiError as e:
self.stdout.write(self.style.ERROR(f"Error fetching conversations: {e}"))

if conversations:
Conversation.bulk_save(conversations)
self.stdout.write(self.style.SUCCESS(f"Populated {total_channels} channels"))

def _fetch_members(self, client, workspace, batch_size, delay):
self.stdout.write(f"Fetching members for {workspace}...")
members = []
total_members = 0
try:
cursor = None
while True:
response = self._call_slack_api(
client.users_list,
cursor=cursor,
limit=batch_size,
timeout=30,
)

members.extend(
member
for member_data in response["members"]
if (member := Member.update_data(member_data, workspace))
)
total_members += len(response["members"])
cursor = response.get("response_metadata", {}).get("next_cursor")
if not cursor:
break
time.sleep(delay)
except SlackApiError as e:
self.stdout.write(self.style.ERROR(f"Error fetching members: {e}"))

if members:
Member.bulk_save(members)
self.stdout.write(self.style.SUCCESS(f"Populated {total_members} members"))

self.stdout.write(self.style.SUCCESS("\nFinished processing all workspaces"))

def _handle_slack_response(self, response, api_method):
"""Handle Slack API response and raise exception if needed."""
if not response["ok"]:
error_message = f"{api_method} API call failed"
logger.error(error_message)
self.stdout.write(self.style.ERROR(error_message))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the migration if no changes in the models?

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 5.2.1 on 2025-06-11 08:36

from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
('slack', '0013_alter_conversation_total_members_count_and_more'),
]

operations = [
migrations.RemoveField(
model_name='workspace',
name='total_members_count',
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.2.3 on 2025-06-18 11:51

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('slack', '0016_merge_20250618_1130'),
]

operations = [
migrations.AddField(
model_name='workspace',
name='total_members_count',
field=models.IntegerField(blank=True, null=True),
),
]
Loading
Loading