Skip to content

Batch process bot messages #842

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
21 changes: 17 additions & 4 deletions MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public final class BlazeMessageDAO {
}

public func save(messageId: String, conversationId: String, data: Data, createdAt: String) -> Bool {
let msg = MessageBlaze(messageId: messageId, message: data, createdAt: createdAt)
let msg = MessageBlaze(messageId: messageId, message: data, conversationId: conversationId, createdAt: createdAt)
return TaskDatabase.current.save(msg)
}

Expand All @@ -28,13 +28,20 @@ public final class BlazeMessageDAO {
TaskDatabase.current.select(where: MessageBlaze.column(of: .messageId) == messageId)
}

public func getBlazeMessageData(createdAt: String? = nil, limit: Int) -> [BlazeMessageData] {
public func getBlazeMessages(createdAt: String? = nil, limit: Int) -> [MessageBlaze] {
let condition: SQLSpecificExpressible?
if let createdAt = createdAt {
condition = MessageBlaze.column(of: .createdAt) <= createdAt
} else {
condition = nil
}
return TaskDatabase.current.select(where: condition,
order: [MessageBlaze.column(of: .createdAt).asc],
limit: limit)
}

public func getBlazeMessageData(conversationId: String, limit: Int) -> [BlazeMessageData] {
let condition: SQLSpecificExpressible = MessageBlaze.column(of: .conversationId) == conversationId
let data: [Data] = TaskDatabase.current.select(column: MessageBlaze.column(of: .message),
from: MessageBlaze.self,
where: condition,
Expand All @@ -45,9 +52,15 @@ public final class BlazeMessageDAO {
}
}

public func delete(data: BlazeMessageData) {
public func delete(messageId: String) {
TaskDatabase.current.delete(MessageBlaze.self,
where: MessageBlaze.column(of: .messageId) == messageId)
}

public func delete(messageIds: [String]) {
TaskDatabase.current.delete(MessageBlaze.self,
where: MessageBlaze.column(of: .messageId) == data.messageId)
where: messageIds.contains(MessageBlaze.column(of: .messageId)))
}


}
6 changes: 4 additions & 2 deletions MixinServices/MixinServices/Database/Task/MessageBlaze.swift
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import Foundation
import GRDB

struct MessageBlaze {
public struct MessageBlaze {

public let messageId: String
public let message: Data
public let conversationId: String
public let createdAt: String

}

extension MessageBlaze: Codable, DatabaseColumnConvertible, MixinFetchableRecord, MixinEncodableRecord {

enum CodingKeys: String, CodingKey {
public enum CodingKeys: String, CodingKey {
case messageId = "_id"
case message
case conversationId = "conversation_id"
case createdAt = "created_at"
}

Expand Down
11 changes: 11 additions & 0 deletions MixinServices/MixinServices/Database/Task/TaskDatabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class TaskDatabase: Database {
let messageBlaze = ColumnMigratableTableDefinition<MessageBlaze>(constraints: nil, columns: [
.init(key: .messageId, constraints: "TEXT PRIMARY KEY"),
.init(key: .message, constraints: "BLOB NOT NULL"),
.init(key: .conversationId, constraints: "TEXT NOT NULL"),
.init(key: .createdAt, constraints: "TEXT NOT NULL"),
])
try self.migrateTable(with: messageBlaze, into: db)
Expand All @@ -35,6 +36,16 @@ public class TaskDatabase: Database {
try db.execute(sql: "DROP INDEX IF EXISTS messages_blaze_conversation_indexs")
}

migrator.registerMigration("batch_process_messages") { (db) in
let infos = try TableInfo.fetchAll(db, sql: "PRAGMA table_info(messages_blaze)")
let columnNames = infos.map(\.name)
if !columnNames.contains("conversation_id") {
try db.execute(sql: "ALTER TABLE messages_blaze ADD COLUMN conversation_id TEXT NOT NULL DEFAULT ''")
}

try db.execute(sql: "CREATE INDEX IF NOT EXISTS index_conversation_messages ON messages_blaze(conversation_id, created_at)")
}

return migrator
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ public final class ConversationDAO: UserDatabaseDAO {
return value > 0
}

public func isBotConversation(conversationId: String) -> Bool {
let sql = """
SELECT 1 FROM conversations c
INNER JOIN users u ON u.user_id = c.owner_id AND u.app_id IS NOT NULL AND u.identity_number > '0'
WHERE c.conversation_id = ? AND c.category = 'CONTACT'
"""
let value: Int64 = db.select(with: sql, arguments: [conversationId]) ?? 0
return value > 0
}

public func getUnreadMessageCount() -> Int {
let sql = "SELECT ifnull(SUM(unseen_message_count),0) FROM conversations WHERE category IS NOT NULL"
return db.select(with: sql) ?? 0
Expand Down Expand Up @@ -84,6 +94,25 @@ public final class ConversationDAO: UserDatabaseDAO {
return db.select(with: sql)
}

public func updateUnseenMessageCount(database: GRDB.Database, conversationId: String) throws {
let sql = """
UPDATE conversations SET unseen_message_count = (
SELECT count(*) FROM messages
WHERE conversation_id = ? AND status = 'DELIVERED' AND user_id != ?
) WHERE conversation_id = ?
"""
try database.execute(sql: sql,
arguments: [conversationId, myUserId, conversationId])
}

public func updateLastMessage(database: GRDB.Database, conversationId: String, messageId: String, createdAt: String) throws {
let sql = """
UPDATE conversations SET last_message_id = ?, last_message_created_at = ? WHERE conversation_id = ?
"""
try database.execute(sql: sql,
arguments: [messageId, createdAt, conversationId])
}

public func isExist(conversationId: String) -> Bool {
db.recordExists(in: Conversation.self,
where: Conversation.column(of: .conversationId) == conversationId)
Expand Down
42 changes: 25 additions & 17 deletions MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,6 @@ public final class MessageDAO: UserDatabaseDAO {
WHERE a.album_id = messages.album_id AND s.name = messages.name
) WHERE category LIKE '%_STICKER' AND ifnull(sticker_id, '') = ''
"""
private static let sqlUpdateUnseenMessageCount = """
UPDATE conversations SET unseen_message_count = (
SELECT count(*) FROM messages
WHERE conversation_id = ? AND status = 'DELIVERED' AND user_id != ?
) WHERE conversation_id = ?
"""

private let updateMediaStatusQueue = DispatchQueue(label: "one.mixin.services.queue.media.status.queue")

Expand Down Expand Up @@ -201,6 +195,15 @@ public final class MessageDAO: UserDatabaseDAO {
db.recordExists(in: Message.self, where: Message.column(of: .messageId) == messageId)
}

public func getExistMessageIds(messageIds: [String]) -> [String] {
guard !messageIds.isEmpty else {
return []
}
return db.select(column: Message.column(of: .messageId),
from: Message.self,
where: messageIds.contains(Message.column(of: .messageId)))
}

public func batchUpdateMessageStatus(readMessageIds: [String], mentionMessageIds: [String]) {
var readMessageIds = readMessageIds
var readMessages: [Message] = []
Expand Down Expand Up @@ -229,7 +232,7 @@ public final class MessageDAO: UserDatabaseDAO {
.filter(readMessageIds.contains(Message.column(of: .messageId)))
.updateAll(db, [Message.column(of: .status).set(to: MessageStatus.READ.rawValue)])
for conversationId in conversationIds {
try MessageDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId)
try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId)
}
}

Expand Down Expand Up @@ -294,7 +297,7 @@ public final class MessageDAO: UserDatabaseDAO {
try Message
.filter(Message.column(of: .messageId) == messageId)
.updateAll(db, [Message.column(of: .status).set(to: status)])
try updateUnseenMessageCount(database: db, conversationId: conversationId)
try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId)
if let completion = completion {
db.afterNextTransactionCommit(completion)
}
Expand All @@ -309,11 +312,6 @@ public final class MessageDAO: UserDatabaseDAO {
return true
}

public func updateUnseenMessageCount(database: GRDB.Database, conversationId: String) throws {
try database.execute(sql: Self.sqlUpdateUnseenMessageCount,
arguments: [conversationId, myUserId, conversationId])
}

@discardableResult
public func updateMediaMessage(messageId: String, assignments: [ColumnAssignment], completion: Database.Completion? = nil) -> Bool {
let condition = Message.column(of: .messageId) == messageId
Expand Down Expand Up @@ -511,6 +509,15 @@ public final class MessageDAO: UserDatabaseDAO {
return db.select(with: sql, arguments: [conversationId, count])
}

public func getMessages(messageIds: [String]) -> [MessageItem] {
let sql = """
\(Self.sqlQueryFullMessage)
WHERE m.id in ('\(messageIds.joined(separator: "', '"))')
ORDER BY m.created_at DESC
"""
return db.select(with: sql, arguments: [])
}

public func getFirstNMessages(conversationId: String, count: Int) -> [MessageItem] {
db.select(with: MessageDAO.sqlQueryFirstNMessages, arguments: [conversationId, count])
}
Expand Down Expand Up @@ -594,7 +601,8 @@ public final class MessageDAO: UserDatabaseDAO {
if shouldInsertIntoFTSTable {
try insertFTSContent(database, message: message, children: children)
}
try MessageDAO.shared.updateUnseenMessageCount(database: database, conversationId: message.conversationId)
try ConversationDAO.shared.updateUnseenMessageCount(database: database, conversationId: message.conversationId)
try ConversationDAO.shared.updateLastMessage(database: database, conversationId: message.conversationId, messageId: message.messageId, createdAt: message.createdAt)

database.afterNextTransactionCommit { (_) in
// Dispatch to global queue to prevent deadlock
Expand Down Expand Up @@ -695,7 +703,7 @@ public final class MessageDAO: UserDatabaseDAO {
}

if status == MessageStatus.FAILED.rawValue {
try MessageDAO.shared.updateUnseenMessageCount(database: database, conversationId: conversationId)
try ConversationDAO.shared.updateUnseenMessageCount(database: database, conversationId: conversationId)
}


Expand Down Expand Up @@ -802,7 +810,7 @@ extension MessageDAO {
&& Message.column(of: .category) != MessageCategory.MESSAGE_RECALL.rawValue
let changes = try Message.filter(condition).updateAll(db, assignments)
if changes > 0 {
try MessageDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId)
try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId)
newMessage = try MessageItem.fetchOne(db, sql: MessageDAO.sqlQueryFullMessageById, arguments: [messageId], adapter: nil)
}

Expand Down Expand Up @@ -954,7 +962,7 @@ extension MessageDAO {

extension MessageDAO {

private func insertFTSContent(_ db: GRDB.Database, message: Message, children: [TranscriptMessage]?) throws {
public func insertFTSContent(_ db: GRDB.Database, message: Message, children: [TranscriptMessage]?) throws {
let shouldInsertIntoFTSTable = AppGroupUserDefaults.Database.isFTSInitialized
&& message.status != MessageStatus.FAILED.rawValue
&& MessageCategory.ftsAvailableCategoryStrings.contains(message.category)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ public final class MessageHistoryDAO: UserDatabaseDAO {
where: MessageHistory.column(of: .messageId) == messageId)
}

public func getExistMessageIds(messageIds: [String]) -> [String] {
guard !messageIds.isEmpty else {
return []
}
return db.select(column: MessageHistory.column(of: .messageId),
from: MessageHistory.self,
where: messageIds.contains(MessageHistory.column(of: .messageId)))
}

public func replaceMessageHistory(messageId: String) {
let history = MessageHistory(messageId: messageId)
db.save(history)
Expand Down
20 changes: 16 additions & 4 deletions MixinServices/MixinServices/Database/User/DAO/UserDAO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ public final class UserDAO: UserDatabaseDAO {
db.recordExists(in: User.self, where: User.column(of: .userId) == userId)
}

public func getExistUserIds(userIds: [String]) -> [String] {
guard !userIds.isEmpty else {
return []
}
return db.select(column: User.column(of: .userId),
from: User.self,
where: userIds.contains(User.column(of: .userId)))
}

public func getBlockUsers() -> [UserItem] {
let sql = "\(Self.sqlQueryColumns) WHERE relationship = 'BLOCKING'"
return db.select(with: sql)
Expand Down Expand Up @@ -188,10 +197,13 @@ public final class UserDAO: UserDatabaseDAO {
}

public func mentionRepresentation(identityNumbers: [String]) -> [String: String] {
db.select(keyColumn: User.column(of: .identityNumber),
valueColumn: User.column(of: .fullName),
from: User.self,
where: identityNumbers.contains(User.column(of: .identityNumber)))
guard !identityNumbers.isEmpty else {
return [:]
}
return db.select(keyColumn: User.column(of: .identityNumber),
valueColumn: User.column(of: .fullName),
from: User.self,
where: identityNumbers.contains(User.column(of: .identityNumber)))
}

public func userIds(identityNumbers: [String]) -> [String] {
Expand Down
11 changes: 11 additions & 0 deletions MixinServices/MixinServices/Database/User/Model/Message.swift
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,17 @@ public enum MessageCategory: String, Decodable {

public static let allMediaCategoriesString: Set<String> = Set(allMediaCategories.map(\.rawValue))

public static let allBotCategories: [MessageCategory] = [
.PLAIN_TEXT, .PLAIN_IMAGE, .PLAIN_VIDEO, .PLAIN_DATA,
.PLAIN_STICKER, .PLAIN_CONTACT, .PLAIN_AUDIO, .PLAIN_LIVE,
.PLAIN_POST, .PLAIN_LOCATION, .PLAIN_TRANSCRIPT,
.ENCRYPTED_TEXT, .ENCRYPTED_IMAGE, .ENCRYPTED_VIDEO, .ENCRYPTED_DATA,
.ENCRYPTED_STICKER, .ENCRYPTED_CONTACT, .ENCRYPTED_AUDIO, .ENCRYPTED_LIVE,
.ENCRYPTED_POST, .ENCRYPTED_LOCATION, .ENCRYPTED_TRANSCRIPT
]

public static let allBotCategoriesString: Set<String> = Set(allBotCategories.map(\.rawValue))

public static let endCallCategories: [MessageCategory] = [
.WEBRTC_AUDIO_END,
.WEBRTC_AUDIO_BUSY,
Expand Down
11 changes: 4 additions & 7 deletions MixinServices/MixinServices/Database/User/UserDatabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,7 @@ public final class UserDatabase: Database {
UPDATE conversations SET last_message_id = (select id from messages where conversation_id = old.conversation_id order by created_at DESC limit 1) WHERE conversation_id = old.conversation_id;
END
"""
let lastMessageUpdate = """
CREATE TRIGGER IF NOT EXISTS conversation_last_message_update AFTER INSERT ON messages
BEGIN
UPDATE conversations SET last_message_id = new.id, last_message_created_at = new.created_at WHERE conversation_id = new.conversation_id;
END
"""
try db.execute(sql: lastMessageDelete)
try db.execute(sql: lastMessageUpdate)
}

migrator.registerMigration("fts5_v3_2") { (db) in
Expand Down Expand Up @@ -475,6 +468,10 @@ public final class UserDatabase: Database {
}
}

migrator.registerMigration("batch_process_messages") { (db) in
try db.execute(sql: "DROP TRIGGER IF EXISTS conversation_last_message_update")
}

return migrator
}

Expand Down
Loading