Skip to content

Commit 8788e76

Browse files
committed
Locks discord events in Redis for replicated shard operation
1 parent ab137ff commit 8788e76

File tree

5 files changed

+66
-27
lines changed

5 files changed

+66
-27
lines changed

discord/bot.go

+7-13
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ func (bot *Bot) GracefulClose() {
144144
func (bot *Bot) Close() {
145145
bot.SessionManager.Close()
146146
bot.RedisInterface.Close()
147+
bot.StorageInterface.Close()
147148
}
148149

149150
func (bot *Bot) PurgeConnection(socketID string) {
@@ -200,15 +201,18 @@ func (bot *Bot) newGuild(emojiGuildID string) func(s *discordgo.Session, m *disc
200201
bot.addAllMissingEmojis(s, m.Guild.ID, false, allEmojis)
201202
}
202203

203-
games := bot.RedisInterface.LoadAllActiveGamesAndDelete(m.Guild.ID)
204+
games := bot.RedisInterface.LoadAllActiveGames(m.Guild.ID)
204205

205206
for _, connCode := range games {
206207
gsr := GameStateRequest{
207208
GuildID: m.Guild.ID,
208209
ConnectCode: connCode,
209210
}
210211
lock, dgs := bot.RedisInterface.GetDiscordGameStateAndLock(gsr)
211-
if lock != nil && dgs != nil && !dgs.Subscribed && dgs.ConnectCode != "" {
212+
for lock == nil {
213+
lock, dgs = bot.RedisInterface.GetDiscordGameStateAndLock(gsr)
214+
}
215+
if dgs != nil && dgs.ConnectCode != "" {
212216
log.Println("Resubscribing to Redis events for an old game: " + connCode)
213217
killChan := make(chan EndGameMessage)
214218
go bot.SubscribeToGameByConnectCode(gsr.GuildID, dgs.ConnectCode, killChan)
@@ -219,19 +223,9 @@ func (bot *Bot) newGuild(emojiGuildID string) func(s *discordgo.Session, m *disc
219223
bot.ChannelsMapLock.Lock()
220224
bot.EndGameChannels[dgs.ConnectCode] = killChan
221225
bot.ChannelsMapLock.Unlock()
222-
} else if lock != nil {
223-
//log.Println("UNLOCKING")
224-
lock.Release(ctx)
225226
}
227+
lock.Release(ctx)
226228
}
227-
228-
//if len(games) == 0 {
229-
// dsg := NewDiscordGameState(m.Guild.ID)
230-
//
231-
// //put an empty entry in Redis
232-
// bot.RedisInterface.SetDiscordGameState(dsg, nil)
233-
//}
234-
235229
}
236230
}
237231

discord/eventHandler.go

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func (bot *Bot) SubscribeToGameByConnectCode(guildID, connectCode string, endGam
5858
}
5959
log.Println("Popped job w/ payload " + job.Payload.(string))
6060
bot.refreshGameLiveness(connectCode)
61+
bot.RedisInterface.RefreshActiveGame(guildID, connectCode)
6162

6263
switch job.JobType {
6364
case broker.Connection:

discord/message_handlers.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ func (bot *Bot) handleMessageCreate(s *discordgo.Session, m *discordgo.MessageCr
2323
return
2424
}
2525

26+
lock := bot.RedisInterface.LockSnowflake(m.ID)
27+
//couldn't obtain lock; bail bail bail!
28+
if lock == nil {
29+
return
30+
}
31+
defer lock.Release(ctx)
32+
2633
g, err := s.State.Guild(m.GuildID)
2734
if err != nil {
2835
log.Println(err)
@@ -89,6 +96,12 @@ func (bot *Bot) handleReactionGameStartAdd(s *discordgo.Session, m *discordgo.Me
8996
if m.UserID == s.State.User.ID {
9097
return
9198
}
99+
lock := bot.RedisInterface.LockSnowflake(m.ChannelID + m.UserID + m.MessageID)
100+
//couldn't obtain lock; bail bail bail!
101+
if lock == nil {
102+
return
103+
}
104+
defer lock.Release(ctx)
92105

93106
g, err := s.State.Guild(m.GuildID)
94107
if err != nil {
@@ -157,6 +170,13 @@ func (bot *Bot) handleReactionGameStartAdd(s *discordgo.Session, m *discordgo.Me
157170
//relevant discord api requests are fully applied successfully. Otherwise, we can issue multiple requests for
158171
//the same mute/unmute, erroneously
159172
func (bot *Bot) handleVoiceStateChange(s *discordgo.Session, m *discordgo.VoiceStateUpdate) {
173+
lock := bot.RedisInterface.LockSnowflake(m.ChannelID + m.UserID + m.SessionID)
174+
//couldn't obtain lock; bail bail bail!
175+
if lock == nil {
176+
return
177+
}
178+
defer lock.Release(ctx)
179+
160180
sett := bot.StorageInterface.GetGuildSettings(m.GuildID)
161181
gsr := GameStateRequest{
162182
GuildID: m.GuildID,
@@ -241,7 +261,7 @@ func (bot *Bot) handleNewGameMessage(s *discordgo.Session, m *discordgo.MessageC
241261

242262
dgs.ConnectCode = connectCode
243263

244-
bot.RedisInterface.AppendToActiveGames(m.GuildID, connectCode)
264+
bot.RedisInterface.RefreshActiveGame(m.GuildID, connectCode)
245265

246266
killChan := make(chan EndGameMessage)
247267

discord/redis.go

+36-11
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@ var ctx = context.Background()
1616
const LockTimeoutSecs = 3
1717
const LinearBackoffMs = 200
1818
const MaxRetries = 10
19+
const SnowflakeLockMs = 100
1920

2021
const SecsPerHour = 3600
2122

23+
//10 minute threshold for games to not be loaded up
24+
const StaleGameThresholdSec = 600
25+
2226
type RedisInterface struct {
2327
client *redis.Client
2428
}
@@ -87,6 +91,10 @@ func matchIDKey() string {
8791
return "automuteus:match:counter"
8892
}
8993

94+
func snowflakeLockID(snowflake string) string {
95+
return "automuteus:snowflake:" + snowflake + ":lock"
96+
}
97+
9098
func (redisInterface *RedisInterface) GetAndIncrementMatchID() int64 {
9199
num, err := redisInterface.client.Incr(ctx, matchIDKey()).Result()
92100
if err != nil {
@@ -297,15 +305,16 @@ func (redisInterface *RedisInterface) SetDiscordGameState(data *DiscordGameState
297305
}
298306
}
299307

300-
func (redisInterface *RedisInterface) AppendToActiveGames(guildID, connectCode string) {
308+
func (redisInterface *RedisInterface) RefreshActiveGame(guildID, connectCode string) {
301309
key := activeGamesKey(guildID)
302-
303-
count, err := redisInterface.client.SAdd(ctx, key, connectCode).Result()
310+
t := time.Now().Unix()
311+
_, err := redisInterface.client.ZAdd(ctx, key, &redis.Z{
312+
Score: float64(t),
313+
Member: connectCode,
314+
}).Result()
304315

305316
if err != nil {
306317
log.Println(err)
307-
} else {
308-
log.Printf("Active games: %d", count)
309318
}
310319
}
311320

@@ -319,19 +328,23 @@ func (redisInterface *RedisInterface) RemoveOldGame(guildID, connectCode string)
319328
}
320329

321330
//only deletes from the guild's responsibility, NOT the entire guild counter!
322-
func (redisInterface *RedisInterface) LoadAllActiveGamesAndDelete(guildID string) []string {
331+
func (redisInterface *RedisInterface) LoadAllActiveGames(guildID string) []string {
323332
hash := activeGamesKey(guildID)
324333

325-
games, err := redisInterface.client.SMembers(ctx, hash).Result()
334+
before := time.Now().Add(-time.Second * StaleGameThresholdSec).Unix()
335+
336+
games, err := redisInterface.client.ZRangeByScore(ctx, hash, &redis.ZRangeBy{
337+
Min: fmt.Sprintf("%d", before),
338+
Max: fmt.Sprintf("%d", time.Now().Unix()),
339+
Offset: 0,
340+
Count: 0,
341+
}).Result()
342+
326343
if err != nil {
327344
log.Println(err)
328345
return []string{}
329346
}
330347

331-
_, err = redisInterface.client.Del(ctx, hash).Result()
332-
if err != nil {
333-
log.Println(err)
334-
}
335348
return games
336349
}
337350

@@ -449,6 +462,18 @@ func (redisInterface *RedisInterface) setUsernameOrUserIDMappings(guildID, key s
449462
return redisInterface.client.HSet(ctx, cacheHash, key, jBytes).Err()
450463
}
451464

465+
func (redisInterface *RedisInterface) LockSnowflake(snowflake string) *redislock.Lock {
466+
locker := redislock.New(redisInterface.client)
467+
lock, err := locker.Obtain(ctx, snowflakeLockID(snowflake), time.Millisecond*SnowflakeLockMs, nil)
468+
if err == redislock.ErrNotObtained {
469+
return nil
470+
} else if err != nil {
471+
log.Println(err)
472+
return nil
473+
}
474+
return lock
475+
}
476+
452477
func (redisInterface *RedisInterface) Close() error {
453478
return redisInterface.client.Close()
454479
}

main.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,10 @@ func discordMainWrapper() error {
170170
bot := discord.MakeAndStartBot(version, commit, discordToken, discordToken2, url, emojiGuildID, numShards, shardID, &redisClient, &storageInterface, logPath, captureTimeout)
171171

172172
<-sc
173-
bot.GracefulClose()
173+
//bot.GracefulClose()
174174
log.Printf("Received Sigterm or Kill signal. Bot will terminate in 1 second")
175175
time.Sleep(time.Second)
176176

177177
bot.Close()
178-
redisClient.Close()
179178
return nil
180179
}

0 commit comments

Comments
 (0)