-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwordclouds.py
153 lines (128 loc) · 4.64 KB
/
wordclouds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import aiohttp
import asyncio
import discord
import functools
import re
from io import BytesIO
import numpy as np
import os
from PIL import Image
from discord.ext import commands
from wordcloud import WordCloud as WCloud
from wordcloud import ImageColorGenerator
# Special thanks to co-author aikaterna for pressing onward
# with this cog when I had lost motivation!
# Pattern for URL-like tokens: an optional "scheme://" prefix, any number of
# dotted host labels, a final "label.tld" / "host:port" pair, and optional
# path/query fragments. It is used to strip links out of message text before
# the text is handed to the word cloud generator.
# Pattern source:
# https://stackoverflow.com/questions/6038061/regular-expression-to-find-urls-within-a-string
_URL_PATTERN = (
    r"([\w+]+\:\/\/)?([\w\d-]+\.)*[\w-]+[\.\:]\w+([\/\?\=\&\#]?[\w-]+)*\/?"
)
URL_RE = re.compile(_URL_PATTERN, flags=re.IGNORECASE)
class WordClouds(commands.Cog):
    """Word Clouds"""

    # NOTE: the class docstring above and the command docstring below are
    # kept verbatim on purpose -- discord.py shows them as help text.

    # Upper bound (and default) for how many messages one invocation scans.
    _MAX_MESSAGES = 10000
    # Seconds to wait for the rendering thread before reporting a timeout.
    _RENDER_TIMEOUT = 45
    # Reply used whenever there is nothing usable to draw.
    _NO_WORDS_MESSAGE = (
        "Wordcloud creation failed. I couldn't find "
        "any words. You may have entered a very small "
        "message limit, or I may not have permission "
        "to view message history in that channel."
    )

    def __init__(self, bot):
        # bot: the bot instance this cog is attached to; its event loop is
        # used to run the rendering in an executor.
        self.bot = bot
        self.session = aiohttp.ClientSession()

    def cog_unload(self):
        # Release the HTTP session opened in __init__ so that unloading or
        # reloading the cog does not leak an open aiohttp.ClientSession.
        # NOTE(review): assumes the synchronous cog_unload hook of the
        # discord.py version that pairs with the synchronous setup() below.
        self.bot.loop.create_task(self.session.close())

    @commands.guild_only()
    @commands.command(name="wordcloud", aliases=["wc"])
    @commands.cooldown(1, 15, commands.BucketType.guild)
    async def wordcloud(self, ctx, *argv):
        """Generate a wordcloud. Optional arguments are channel, user, and
        message limit (capped at 10,000)."""
        author = ctx.author
        channel, user, limit = await self._parse_arguments(ctx, argv)
        guild = channel.guild

        # Verify that wordcloud requester is not being a sneaky snek:
        # the requester must be able to read the target channel, and the
        # channel must belong to the guild the command was issued in.
        if not channel.permissions_for(author).read_messages or guild != ctx.guild:
            await ctx.send(":smirk: Nice try.")
            return

        # Fixed rendering settings passed straight to wordcloud.WordCloud.
        # stopwords=None, mask=None and color_func=None leave those choices
        # to the library.
        kwargs = {
            "mask": None,
            "color_func": None,
            "mode": "RGB",
            "background_color": "deepskyblue",
            "max_words": 200,
            "stopwords": None,
            "width": 800,
            "height": 600,
        }

        label = self._target_label(guild, channel, user)
        await ctx.send(
            "Generating wordcloud for **"
            + label
            + "** using the last {} messages. (this might take a while)".format(limit)
        )

        # Collect the contents first and join once at the end instead of
        # growing a string message by message.
        contents = []
        try:
            async for message in channel.history(limit=limit):
                if message.author.bot:
                    continue
                if user is None or user == message.author:
                    contents.append(message.clean_content)
        except discord.errors.Forbidden:
            await ctx.send("Wordcloud creation failed. I can't see that channel!")
            return

        # Strip links in a single pass over the combined text.
        text = URL_RE.sub("", " ".join(contents))
        if not text or text.isspace():
            await ctx.send(self._NO_WORDS_MESSAGE)
            return

        # Rendering is CPU-bound, so it runs in the loop's default executor.
        render = functools.partial(self.generate, text, **kwargs)
        future = self.bot.loop.run_in_executor(None, render)
        try:
            image = await asyncio.wait_for(future, timeout=self._RENDER_TIMEOUT)
        except asyncio.TimeoutError:
            await ctx.send("Wordcloud creation timed out.")
            return
        except ValueError:
            # WordCloud raises ValueError when no words are left to plot
            # (for example text made only of stop words or punctuation).
            # Report that the same way as an empty history rather than
            # letting it surface as an unhandled command error.
            await ctx.send(self._NO_WORDS_MESSAGE)
            return

        await ctx.send(
            "Wordcloud for **" + label + "** using the last {} messages.".format(limit),
            file=discord.File(image),
        )

    async def _parse_arguments(self, ctx, argv):
        # Interpret each free-form argument as, in order of preference, a
        # text channel, a member, or a message limit. Arguments matching
        # none of these are ignored. Returns (channel, user, limit), falling
        # back to the invoking channel, no user filter and the maximum limit.
        channel = ctx.channel
        user = None
        limit = self._MAX_MESSAGES
        # a bit clunky, see if Red has already implemented converters
        channel_converter = commands.TextChannelConverter()
        member_converter = commands.MemberConverter()
        for arg in argv:
            try:
                channel = await channel_converter.convert(ctx, arg)
                continue
            except commands.BadArgument:
                pass
            try:
                user = await member_converter.convert(ctx, arg)
                continue
            except commands.BadArgument:
                pass
            # Numbers above the cap are ignored, leaving the previous limit.
            if arg.isdecimal() and int(arg) <= self._MAX_MESSAGES:
                limit = int(arg)
        return channel, user, limit

    @staticmethod
    def _target_label(guild, channel, user):
        # Build the "guild/channel[/user]" label used in status messages.
        label = guild.name + "/" + channel.name
        if user is not None:
            label += "/" + user.display_name
        return label

    @staticmethod
    def generate(text, **kwargs):
        # Designed to be run in executor to avoid blocking.
        # Builds a WordCloud from `text` using `kwargs` as constructor
        # arguments and returns an in-memory PNG rewound to the start.
        wc = WCloud(**kwargs)
        wc.generate(text)
        buffer = BytesIO()
        # The name gives the buffer a ".png" file name for the image writer
        # and for the Discord attachment.
        buffer.name = "wordcloud.png"
        wc.to_file(buffer)
        buffer.seek(0)
        return buffer
def setup(bot):
    # Extension entry point: build the cog and register it on the bot.
    cog = WordClouds(bot)
    bot.add_cog(cog)