diff --git a/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift b/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift index ab68047940..aaba89486f 100644 --- a/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift +++ b/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift @@ -9,7 +9,7 @@ public final class BlazeMessageDAO { } public func save(messageId: String, conversationId: String, data: Data, createdAt: String) -> Bool { - let msg = MessageBlaze(messageId: messageId, message: data, createdAt: createdAt) + let msg = MessageBlaze(messageId: messageId, message: data, conversationId: conversationId, createdAt: createdAt) return TaskDatabase.current.save(msg) } @@ -28,13 +28,20 @@ public final class BlazeMessageDAO { TaskDatabase.current.select(where: MessageBlaze.column(of: .messageId) == messageId) } - public func getBlazeMessageData(createdAt: String? = nil, limit: Int) -> [BlazeMessageData] { + public func getBlazeMessages(createdAt: String? = nil, limit: Int) -> [MessageBlaze] { let condition: SQLSpecificExpressible? if let createdAt = createdAt { condition = MessageBlaze.column(of: .createdAt) <= createdAt } else { condition = nil } + return TaskDatabase.current.select(where: condition, + order: [MessageBlaze.column(of: .createdAt).asc], + limit: limit) + } + + public func getBlazeMessageData(conversationId: String, limit: Int) -> [BlazeMessageData] { + let condition: SQLSpecificExpressible = MessageBlaze.column(of: .conversationId) == conversationId let data: [Data] = TaskDatabase.current.select(column: MessageBlaze.column(of: .message), from: MessageBlaze.self, where: condition, @@ -45,9 +52,15 @@ public final class BlazeMessageDAO { } } - public func delete(data: BlazeMessageData) { + public func delete(messageId: String) { + TaskDatabase.current.delete(MessageBlaze.self, + where: MessageBlaze.column(of: .messageId) == messageId) + } + + public func delete(messageIds: [String]) { TaskDatabase.current.delete(MessageBlaze.self, - where: MessageBlaze.column(of: .messageId) == data.messageId) + where: messageIds.contains(MessageBlaze.column(of: .messageId))) } + } diff --git a/MixinServices/MixinServices/Database/Task/MessageBlaze.swift b/MixinServices/MixinServices/Database/Task/MessageBlaze.swift index 41da31da20..ae66feb783 100644 --- a/MixinServices/MixinServices/Database/Task/MessageBlaze.swift +++ b/MixinServices/MixinServices/Database/Task/MessageBlaze.swift @@ -1,19 +1,21 @@ import Foundation import GRDB -struct MessageBlaze { +public struct MessageBlaze { public let messageId: String public let message: Data + public let conversationId: String public let createdAt: String } extension MessageBlaze: Codable, DatabaseColumnConvertible, MixinFetchableRecord, MixinEncodableRecord { - enum CodingKeys: String, CodingKey { + public enum CodingKeys: String, CodingKey { case messageId = "_id" case message + case conversationId = "conversation_id" case createdAt = "created_at" } diff --git a/MixinServices/MixinServices/Database/Task/TaskDatabase.swift b/MixinServices/MixinServices/Database/Task/TaskDatabase.swift index da3d3d8c17..5d27e8067c 100644 --- a/MixinServices/MixinServices/Database/Task/TaskDatabase.swift +++ b/MixinServices/MixinServices/Database/Task/TaskDatabase.swift @@ -24,6 +24,7 @@ public class TaskDatabase: Database { let messageBlaze = ColumnMigratableTableDefinition(constraints: nil, columns: [ .init(key: .messageId, constraints: "TEXT PRIMARY KEY"), .init(key: .message, constraints: "BLOB NOT NULL"), + .init(key: .conversationId, constraints: "TEXT NOT NULL"), .init(key: .createdAt, constraints: "TEXT NOT NULL"), ]) try self.migrateTable(with: messageBlaze, into: db) @@ -35,6 +36,16 @@ public class TaskDatabase: Database { try db.execute(sql: "DROP INDEX IF EXISTS messages_blaze_conversation_indexs") } + migrator.registerMigration("batch_process_messages") { (db) in + let infos = try TableInfo.fetchAll(db, sql: "PRAGMA table_info(messages_blaze)") + let columnNames = infos.map(\.name) + if !columnNames.contains("conversation_id") { + try db.execute(sql: "ALTER TABLE messages_blaze ADD COLUMN conversation_id TEXT NOT NULL DEFAULT ''") + } + + try db.execute(sql: "CREATE INDEX IF NOT EXISTS index_conversation_messages ON messages_blaze(conversation_id, created_at)") + } + return migrator } diff --git a/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift b/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift index ad89bf481e..804a5aefb4 100644 --- a/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift @@ -46,6 +46,16 @@ public final class ConversationDAO: UserDatabaseDAO { return value > 0 } + public func isBotConversation(conversationId: String) -> Bool { + let sql = """ + SELECT 1 FROM conversations c + INNER JOIN users u ON u.user_id = c.owner_id AND u.app_id IS NOT NULL AND u.identity_number > '0' + WHERE c.conversation_id = ? AND c.category = 'CONTACT' + """ + let value: Int64 = db.select(with: sql, arguments: [conversationId]) ?? 0 + return value > 0 + } + public func getUnreadMessageCount() -> Int { let sql = "SELECT ifnull(SUM(unseen_message_count),0) FROM conversations WHERE category IS NOT NULL" return db.select(with: sql) ?? 0 @@ -84,6 +94,25 @@ public final class ConversationDAO: UserDatabaseDAO { return db.select(with: sql) } + public func updateUnseenMessageCount(database: GRDB.Database, conversationId: String) throws { + let sql = """ + UPDATE conversations SET unseen_message_count = ( + SELECT count(*) FROM messages + WHERE conversation_id = ? AND status = 'DELIVERED' AND user_id != ? + ) WHERE conversation_id = ? + """ + try database.execute(sql: sql, + arguments: [conversationId, myUserId, conversationId]) + } + + public func updateLastMessage(database: GRDB.Database, conversationId: String, messageId: String, createdAt: String) throws { + let sql = """ + UPDATE conversations SET last_message_id = ?, last_message_created_at = ? WHERE conversation_id = ? + """ + try database.execute(sql: sql, + arguments: [messageId, createdAt, conversationId]) + } + public func isExist(conversationId: String) -> Bool { db.recordExists(in: Conversation.self, where: Conversation.column(of: .conversationId) == conversationId) diff --git a/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift b/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift index 099d08f9b4..7caa1733c0 100644 --- a/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift @@ -82,12 +82,6 @@ public final class MessageDAO: UserDatabaseDAO { WHERE a.album_id = messages.album_id AND s.name = messages.name ) WHERE category LIKE '%_STICKER' AND ifnull(sticker_id, '') = '' """ - private static let sqlUpdateUnseenMessageCount = """ - UPDATE conversations SET unseen_message_count = ( - SELECT count(*) FROM messages - WHERE conversation_id = ? AND status = 'DELIVERED' AND user_id != ? - ) WHERE conversation_id = ? - """ private let updateMediaStatusQueue = DispatchQueue(label: "one.mixin.services.queue.media.status.queue") @@ -201,6 +195,15 @@ public final class MessageDAO: UserDatabaseDAO { db.recordExists(in: Message.self, where: Message.column(of: .messageId) == messageId) } + public func getExistMessageIds(messageIds: [String]) -> [String] { + guard !messageIds.isEmpty else { + return [] + } + return db.select(column: Message.column(of: .messageId), + from: Message.self, + where: messageIds.contains(Message.column(of: .messageId))) + } + public func batchUpdateMessageStatus(readMessageIds: [String], mentionMessageIds: [String]) { var readMessageIds = readMessageIds var readMessages: [Message] = [] @@ -229,7 +232,7 @@ public final class MessageDAO: UserDatabaseDAO { .filter(readMessageIds.contains(Message.column(of: .messageId))) .updateAll(db, [Message.column(of: .status).set(to: MessageStatus.READ.rawValue)]) for conversationId in conversationIds { - try MessageDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) } } @@ -294,7 +297,7 @@ public final class MessageDAO: UserDatabaseDAO { try Message .filter(Message.column(of: .messageId) == messageId) .updateAll(db, [Message.column(of: .status).set(to: status)]) - try updateUnseenMessageCount(database: db, conversationId: conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) if let completion = completion { db.afterNextTransactionCommit(completion) } @@ -309,11 +312,6 @@ public final class MessageDAO: UserDatabaseDAO { return true } - public func updateUnseenMessageCount(database: GRDB.Database, conversationId: String) throws { - try database.execute(sql: Self.sqlUpdateUnseenMessageCount, - arguments: [conversationId, myUserId, conversationId]) - } - @discardableResult public func updateMediaMessage(messageId: String, assignments: [ColumnAssignment], completion: Database.Completion? = nil) -> Bool { let condition = Message.column(of: .messageId) == messageId @@ -511,6 +509,15 @@ public final class MessageDAO: UserDatabaseDAO { return db.select(with: sql, arguments: [conversationId, count]) } + public func getMessages(messageIds: [String]) -> [MessageItem] { + let sql = """ + \(Self.sqlQueryFullMessage) + WHERE m.id in ('\(messageIds.joined(separator: "', '"))') + ORDER BY m.created_at DESC + """ + return db.select(with: sql, arguments: []) + } + public func getFirstNMessages(conversationId: String, count: Int) -> [MessageItem] { db.select(with: MessageDAO.sqlQueryFirstNMessages, arguments: [conversationId, count]) } @@ -594,7 +601,8 @@ public final class MessageDAO: UserDatabaseDAO { if shouldInsertIntoFTSTable { try insertFTSContent(database, message: message, children: children) } - try MessageDAO.shared.updateUnseenMessageCount(database: database, conversationId: message.conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: database, conversationId: message.conversationId) + try ConversationDAO.shared.updateLastMessage(database: database, conversationId: message.conversationId, messageId: message.messageId, createdAt: message.createdAt) database.afterNextTransactionCommit { (_) in // Dispatch to global queue to prevent deadlock @@ -695,7 +703,7 @@ public final class MessageDAO: UserDatabaseDAO { } if status == MessageStatus.FAILED.rawValue { - try MessageDAO.shared.updateUnseenMessageCount(database: database, conversationId: conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: database, conversationId: conversationId) } @@ -802,7 +810,7 @@ extension MessageDAO { && Message.column(of: .category) != MessageCategory.MESSAGE_RECALL.rawValue let changes = try Message.filter(condition).updateAll(db, assignments) if changes > 0 { - try MessageDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) newMessage = try MessageItem.fetchOne(db, sql: MessageDAO.sqlQueryFullMessageById, arguments: [messageId], adapter: nil) } @@ -954,7 +962,7 @@ extension MessageDAO { extension MessageDAO { - private func insertFTSContent(_ db: GRDB.Database, message: Message, children: [TranscriptMessage]?) throws { + public func insertFTSContent(_ db: GRDB.Database, message: Message, children: [TranscriptMessage]?) throws { let shouldInsertIntoFTSTable = AppGroupUserDefaults.Database.isFTSInitialized && message.status != MessageStatus.FAILED.rawValue && MessageCategory.ftsAvailableCategoryStrings.contains(message.category) diff --git a/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift b/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift index dc591d3294..e070309ffc 100644 --- a/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift @@ -9,6 +9,15 @@ public final class MessageHistoryDAO: UserDatabaseDAO { where: MessageHistory.column(of: .messageId) == messageId) } + public func getExistMessageIds(messageIds: [String]) -> [String] { + guard !messageIds.isEmpty else { + return [] + } + return db.select(column: MessageHistory.column(of: .messageId), + from: MessageHistory.self, + where: messageIds.contains(MessageHistory.column(of: .messageId))) + } + public func replaceMessageHistory(messageId: String) { let history = MessageHistory(messageId: messageId) db.save(history) diff --git a/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift b/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift index d332dcb447..4ee43c4ec6 100644 --- a/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift @@ -35,6 +35,15 @@ public final class UserDAO: UserDatabaseDAO { db.recordExists(in: User.self, where: User.column(of: .userId) == userId) } + public func getExistUserIds(userIds: [String]) -> [String] { + guard !userIds.isEmpty else { + return [] + } + return db.select(column: User.column(of: .userId), + from: User.self, + where: userIds.contains(User.column(of: .userId))) + } + public func getBlockUsers() -> [UserItem] { let sql = "\(Self.sqlQueryColumns) WHERE relationship = 'BLOCKING'" return db.select(with: sql) @@ -188,10 +197,13 @@ public final class UserDAO: UserDatabaseDAO { } public func mentionRepresentation(identityNumbers: [String]) -> [String: String] { - db.select(keyColumn: User.column(of: .identityNumber), - valueColumn: User.column(of: .fullName), - from: User.self, - where: identityNumbers.contains(User.column(of: .identityNumber))) + guard !identityNumbers.isEmpty else { + return [:] + } + return db.select(keyColumn: User.column(of: .identityNumber), + valueColumn: User.column(of: .fullName), + from: User.self, + where: identityNumbers.contains(User.column(of: .identityNumber))) } public func userIds(identityNumbers: [String]) -> [String] { diff --git a/MixinServices/MixinServices/Database/User/Model/Message.swift b/MixinServices/MixinServices/Database/User/Model/Message.swift index a0eaead698..944426257e 100644 --- a/MixinServices/MixinServices/Database/User/Model/Message.swift +++ b/MixinServices/MixinServices/Database/User/Model/Message.swift @@ -329,6 +329,17 @@ public enum MessageCategory: String, Decodable { public static let allMediaCategoriesString: Set = Set(allMediaCategories.map(\.rawValue)) + public static let allBotCategories: [MessageCategory] = [ + .PLAIN_TEXT, .PLAIN_IMAGE, .PLAIN_VIDEO, .PLAIN_DATA, + .PLAIN_STICKER, .PLAIN_CONTACT, .PLAIN_AUDIO, .PLAIN_LIVE, + .PLAIN_POST, .PLAIN_LOCATION, .PLAIN_TRANSCRIPT, + .ENCRYPTED_TEXT, .ENCRYPTED_IMAGE, .ENCRYPTED_VIDEO, .ENCRYPTED_DATA, + .ENCRYPTED_STICKER, .ENCRYPTED_CONTACT, .ENCRYPTED_AUDIO, .ENCRYPTED_LIVE, + .ENCRYPTED_POST, .ENCRYPTED_LOCATION, .ENCRYPTED_TRANSCRIPT + ] + + public static let allBotCategoriesString: Set = Set(allBotCategories.map(\.rawValue)) + public static let endCallCategories: [MessageCategory] = [ .WEBRTC_AUDIO_END, .WEBRTC_AUDIO_BUSY, diff --git a/MixinServices/MixinServices/Database/User/UserDatabase.swift b/MixinServices/MixinServices/Database/User/UserDatabase.swift index 150fc76d73..a78c4f14b8 100644 --- a/MixinServices/MixinServices/Database/User/UserDatabase.swift +++ b/MixinServices/MixinServices/Database/User/UserDatabase.swift @@ -328,14 +328,7 @@ public final class UserDatabase: Database { UPDATE conversations SET last_message_id = (select id from messages where conversation_id = old.conversation_id order by created_at DESC limit 1) WHERE conversation_id = old.conversation_id; END """ - let lastMessageUpdate = """ - CREATE TRIGGER IF NOT EXISTS conversation_last_message_update AFTER INSERT ON messages - BEGIN - UPDATE conversations SET last_message_id = new.id, last_message_created_at = new.created_at WHERE conversation_id = new.conversation_id; - END - """ try db.execute(sql: lastMessageDelete) - try db.execute(sql: lastMessageUpdate) } migrator.registerMigration("fts5_v3_2") { (db) in @@ -475,6 +468,10 @@ public final class UserDatabase: Database { } } + migrator.registerMigration("batch_process_messages") { (db) in + try db.execute(sql: "DROP TRIGGER IF EXISTS conversation_last_message_update") + } + return migrator } diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift index 1e1cf1f0f9..fbe4bae20f 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift @@ -1,6 +1,7 @@ import Foundation import UIKit import SDWebImage +import GRDB public protocol CallMessageCoordinator: AnyObject { func shouldSendRtcBlazeMessage(with category: MessageCategory) -> Bool @@ -91,23 +92,55 @@ public class ReceiveMessageService: MixinService { return } else if let createdAt = BlazeMessageDAO.shared.getMessageBlaze(messageId: messageId)?.createdAt { repeat { - let blazeMessageDatas = BlazeMessageDAO.shared.getBlazeMessageData(createdAt: createdAt, limit: 50) - guard blazeMessageDatas.count > 0 else { + let blazeMessages = BlazeMessageDAO.shared.getBlazeMessages(createdAt: createdAt, limit: 50) + guard blazeMessages.count > 0 else { callback(nil) return } - - for data in blazeMessageDatas { + + var nonBotMessages = [BlazeMessageData]() + var botMessages = [String: BlazeMessageData]() + + for blazeMessage in blazeMessages { + guard let data = try? JSONDecoder.default.decode(BlazeMessageData.self, from: blazeMessage.message) else { + ReceiveMessageService.shared.processBadMessage(messageId: blazeMessage.messageId) + continue + } + + if MessageCategory.allBotCategoriesString.contains(data.category) && !blazeMessage.conversationId.isEmpty { + if botMessages[data.conversationId] == nil { + botMessages[data.conversationId] = data + } + } else { + nonBotMessages.append(data) + } + } + + for blazeMessage in nonBotMessages { guard !AppGroupUserDefaults.isRunningInMainApp, !extensionTimeWillExpire() else { callback(nil) return } - ReceiveMessageService.shared.processReceiveMessage(data: data) - if data.messageId == messageId { + ReceiveMessageService.shared.processReceiveMessage(data: blazeMessage) + if blazeMessage.messageId == messageId { callback(MessageDAO.shared.getFullMessage(messageId: messageId)) return } } + + for blazeMessage in botMessages.values { + ReceiveMessageService.shared.processBotMessages(data: blazeMessage) { messageIds in + guard !AppGroupUserDefaults.isRunningInMainApp, !extensionTimeWillExpire() else { + callback(nil) + return false + } + guard !messageIds.contains(messageId) else { + callback(MessageDAO.shared.getFullMessage(messageId: messageId)) + return false + } + return true + } + } } while true } else if let message = MessageDAO.shared.getFullMessage(messageId: messageId) { callback(message) @@ -159,36 +192,240 @@ public class ReceiveMessageService: MixinService { } var finishedJobCount = 0 - + + func updateProgress(remainJobCount: Int, finishedJobCount: Int) { + guard displaySyncProcess else { + return + } + var progress = Int(Float(finishedJobCount) / Float(remainJobCount + finishedJobCount) * 100) + if progress > 100 { + progress = 100 + } + NotificationCenter.default.post(onMainThread: Self.progressNotification, + object: self, + userInfo: [Self.UserInfoKey.progress: progress]) + } + repeat { guard LoginManager.shared.isLoggedIn, !MixinService.isStopProcessMessages else { return } - let blazeMessageDatas = BlazeMessageDAO.shared.getBlazeMessageData(limit: 50) - guard blazeMessageDatas.count > 0 else { + let blazeMessages = BlazeMessageDAO.shared.getBlazeMessages(limit: 50) + guard blazeMessages.count > 0 else { return } - + let remainJobCount = BlazeMessageDAO.shared.getCount() if remainJobCount + finishedJobCount > 500 { displaySyncProcess = true - let progress = blazeMessageDatas.count == 0 ? 100 : Int(Float(finishedJobCount) / Float(remainJobCount + finishedJobCount) * 100) - NotificationCenter.default.post(onMainThread: Self.progressNotification, - object: self, - userInfo: [Self.UserInfoKey.progress: progress]) + updateProgress(remainJobCount: remainJobCount, finishedJobCount: finishedJobCount) } - - for data in blazeMessageDatas { + + var nonBotMessages = [BlazeMessageData]() + var botMessages = [String: BlazeMessageData]() + + for blazeMessage in blazeMessages { + guard let data = try? JSONDecoder.default.decode(BlazeMessageData.self, from: blazeMessage.message) else { + finishedJobCount += 1 + ReceiveMessageService.shared.processBadMessage(messageId: blazeMessage.messageId) + continue + } + + if MessageCategory.allBotCategoriesString.contains(data.category) && !blazeMessage.conversationId.isEmpty { + if botMessages[data.conversationId] == nil { + botMessages[data.conversationId] = data + } + } else { + nonBotMessages.append(data) + } + } + + for blazeMessage in nonBotMessages { if MixinService.isStopProcessMessages { return } - ReceiveMessageService.shared.processReceiveMessage(data: data) + ReceiveMessageService.shared.processReceiveMessage(data: blazeMessage) + } + finishedJobCount += nonBotMessages.count + + for blazeMessage in botMessages.values { + ReceiveMessageService.shared.processBotMessages(data: blazeMessage) { messageIds in + finishedJobCount += messageIds.count + updateProgress(remainJobCount: remainJobCount, finishedJobCount: finishedJobCount) + return true + } } - - finishedJobCount += blazeMessageDatas.count } while true } } + + private func processBotMessages(data: BlazeMessageData, processBlock: ([String]) -> Bool) { + let conversationId = data.conversationId + ReceiveMessageService.shared.syncConversation(data: data) + guard ConversationDAO.shared.isBotConversation(conversationId: conversationId) else { + // plain message in group chat + ReceiveMessageService.shared.processReceiveMessage(data: data) + return + } + + ReceiveMessageService.shared.checkSession(data: data) + _ = ReceiveMessageService.shared.syncUser(userId: data.userId) + + let pageCount = 200 + var loopEnd = false + repeat { + if MixinService.isStopProcessMessages { + return + } + let blazeMessages = BlazeMessageDAO.shared.getBlazeMessageData(conversationId: conversationId, limit: pageCount) + guard let lastMessage = blazeMessages.last else { + return + } + loopEnd = blazeMessages.count < pageCount + + ReceiveMessageService.shared.syncUsers(userIds: blazeMessages.map{ $0.userId }) + + if MixinService.isStopProcessMessages { + return + } + + let blazeMessageDict = Dictionary(uniqueKeysWithValues: blazeMessages.map { ($0.messageId, $0) }) + let messageIds = blazeMessages.map{ $0.messageId } + let existMessageIds = MessageDAO.shared.getExistMessageIds(messageIds: messageIds) + let existHistoryIds = MessageHistoryDAO.shared.getExistMessageIds(messageIds: messageIds) + + var jobs = [Job]() + var downloadJobs = [AttachmentDownloadJob]() + var pairMessages = [(Message, [TranscriptMessage]?)]() + var sharedContactIds = [String]() + + for blazeMessage in blazeMessages { + let messageId = blazeMessage.messageId + let ackBlazeMessage = BlazeMessage(ackBlazeMessage: messageId, status: MessageStatus.DELIVERED.rawValue) + jobs.append(Job(jobId: ackBlazeMessage.id, action: .SEND_DELIVERED_ACK_MESSAGE, blazeMessage: ackBlazeMessage)) + + guard !(existMessageIds.contains(messageId) || existHistoryIds.contains(messageId)) else { + pairMessages.append((makeUnknownMessage(data: blazeMessage), nil)) + continue + } + + var decryptedData: Data? + if blazeMessage.category.hasPrefix("PLAIN_") { + decryptedData = Data(base64Encoded: blazeMessage.data) + } else { + decryptedData = parseEncryptedMessage(data: data) + } + + if let decryptedData = decryptedData, let (message, children) = makeDecryptMessage(data: blazeMessage, decryptedData: decryptedData) { + if message.category.hasSuffix("_CONTACT"), let sharedUserId = message.sharedUserId { + sharedContactIds.append(sharedUserId) + } + if message.category.hasSuffix("_AUDIO") { + downloadJobs.append(AttachmentDownloadJob(message: message)) + } + pairMessages.append((message, children)) + } else { + pairMessages.append((makeUnknownMessage(data: blazeMessage), nil)) + } + } + + ReceiveMessageService.shared.syncUsers(userIds: sharedContactIds) + + let transcriptMessages = pairMessages.compactMap { $0.1 }.flatMap { $0 } + var ftsMessages = [(Message, [TranscriptMessage]?)]() + if AppGroupUserDefaults.Database.isFTSInitialized { + ftsMessages = pairMessages.filter{ MessageCategory.ftsAvailableCategoryStrings.contains($0.0.category) } + } + let messages = pairMessages.compactMap { $0.0 } + + if MixinService.isStopProcessMessages { + return + } + + UserDatabase.current.write { db in + try messages.save(db) + try transcriptMessages.save(db) + try jobs.save(db) + + for message in messages { + guard message.category.hasSuffix("_TEXT") || message.quoteMessageId != nil else { + continue + } + + var quotedMessage: MessageItem? + if let quoteMessageId = message.quoteMessageId { + quotedMessage = try MessageItem.fetchOne(db, sql: MessageDAO.sqlQueryQuoteMessageById, arguments: [quoteMessageId], adapter: nil) + } + if let quoted = quotedMessage, let quoteContent = try? JSONEncoder.default.encode(quoted) { + let assignments: [ColumnAssignment] = [ + Message.column(of: .quoteContent).set(to: quoteContent) + ] + + try Message + .filter(Message.column(of: .messageId) == message.messageId) + .updateAll(db, assignments) + } + + if let mention = MessageMention(message: message, quotedMessage: quotedMessage) { + try mention.save(db) + } + } + + for (ftsMessage, childrenMessages) in ftsMessages { + try MessageDAO.shared.insertFTSContent(db, message: ftsMessage, children: childrenMessages) + } + + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) + try ConversationDAO.shared.updateLastMessage(database: db, conversationId: conversationId, messageId: lastMessage.messageId, createdAt: lastMessage.createdAt) + + db.afterNextTransactionCommit { _ in + DispatchQueue.global().async { + for job in downloadJobs { + ConcurrentJobQueue.shared.addJob(job: job) + } + + if isAppExtension { + if AppGroupUserDefaults.isRunningInMainApp { + DarwinNotificationManager.shared.notifyConversationDidChangeInMainApp() + } + if AppGroupUserDefaults.User.currentConversationId == data.conversationId { + AppGroupUserDefaults.User.reloadConversation = true + } + } else { + guard let conversation = ConversationDAO.shared.getConversation(conversationId: conversationId), conversation.status == ConversationStatus.SUCCESS.rawValue else { + return + } + + if conversation.isMuted && AppGroupUserDefaults.User.currentConversationId != conversationId { + let change = ConversationChange(conversationId: conversationId, action: .reload) + NotificationCenter.default.post(onMainThread: conversationDidChangeNotification, object: change) + } else { + let messageIds = messages.map{ $0.messageId } + let messages = MessageDAO.shared.getMessages(messageIds: messageIds) + for newMessage in messages { + var userInfo: [String: Any] = [ + MessageDAO.UserInfoKey.conversationId: newMessage.conversationId, + MessageDAO.UserInfoKey.message: newMessage, + ] + if let data = blazeMessageDict[newMessage.messageId] { + userInfo[MessageDAO.UserInfoKey.messsageSource] = data.source + userInfo[MessageDAO.UserInfoKey.silentNotification] = data.silentNotification + } + NotificationCenter.default.post(onMainThread: MessageDAO.didInsertMessageNotification, object: self, userInfo: userInfo) + } + } + } + } + } + } + + BlazeMessageDAO.shared.delete(messageIds: messageIds) + + if !processBlock(messageIds) { + return + } + } while LoginManager.shared.isLoggedIn && !loopEnd + } private func processReceiveMessage(data: BlazeMessageData) { guard LoginManager.shared.isLoggedIn else { @@ -196,7 +433,7 @@ public class ReceiveMessageService: MixinService { } if MessageDAO.shared.isExist(messageId: data.messageId) || MessageHistoryDAO.shared.isExist(messageId: data.messageId) { - ReceiveMessageService.shared.processBadMessage(data: data) + ReceiveMessageService.shared.processBadMessage(messageId: data.messageId) return } @@ -223,7 +460,7 @@ public class ReceiveMessageService: MixinService { ReceiveMessageService.shared.processUnknownMessage(data: data) ReceiveMessageService.shared.updateRemoteMessageStatus(messageId: data.messageId, status: .DELIVERED) } - BlazeMessageDAO.shared.delete(data: data) + BlazeMessageDAO.shared.delete(messageId: data.messageId) } private func checkSession(data: BlazeMessageData) { @@ -244,8 +481,8 @@ public class ReceiveMessageService: MixinService { UserDatabase.current.save(session) } } - - private func processUnknownMessage(data: BlazeMessageData) { + + private func makeUnknownMessage(data: BlazeMessageData) -> Message { var unknownMessage = Message.createMessage(messageId: data.messageId, category: data.category, conversationId: data.conversationId, @@ -253,12 +490,17 @@ public class ReceiveMessageService: MixinService { userId: data.getSenderId()) unknownMessage.status = MessageStatus.UNKNOWN.rawValue unknownMessage.content = data.data - MessageDAO.shared.insertMessage(message: unknownMessage, messageSource: data.source, silentNotification: data.silentNotification) + return unknownMessage + } + + private func processUnknownMessage(data: BlazeMessageData) { + MessageDAO.shared.insertMessage(message: makeUnknownMessage(data: data), messageSource: data.source, silentNotification: data.silentNotification) } - private func processBadMessage(data: BlazeMessageData) { - ReceiveMessageService.shared.updateRemoteMessageStatus(messageId: data.messageId, status: .DELIVERED) - BlazeMessageDAO.shared.delete(data: data) + private func processBadMessage(messageId: String) { + MessageHistoryDAO.shared.replaceMessageHistory(messageId: messageId) + ReceiveMessageService.shared.updateRemoteMessageStatus(messageId: messageId, status: .DELIVERED) + BlazeMessageDAO.shared.delete(messageId: messageId) } private func processCallMessage(data: BlazeMessageData) { @@ -520,10 +762,7 @@ public class ReceiveMessageService: MixinService { } } - private func processEncryptedMessage(data: BlazeMessageData) { - guard data.category.hasPrefix("ENCRYPTED_") else { - return - } + private func parseEncryptedMessage(data: BlazeMessageData) -> Data? { guard let cipher = Data(base64Encoded: data.data), let pk = RequestSigning.edDSAPrivateKey, @@ -539,18 +778,28 @@ public class ReceiveMessageService: MixinService { ] Logger.conversation(id: data.conversationId).error(category: "EncryptedBotMessage", message: "Failed to decrypt", userInfo: info) reporter.report(error: MixinServicesError.decryptBotMessage(info)) - updateRemoteMessageStatus(messageId: data.messageId, status: .DELIVERED) - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } do { - let decryptedData = try EncryptedProtocol.decrypt(cipher: cipher, with: pk, sessionId: mySessionId) - _ = syncUser(userId: data.getSenderId()) - processDecryptSuccess(data: data, decryptedData: decryptedData) + return try EncryptedProtocol.decrypt(cipher: cipher, with: pk, sessionId: mySessionId) } catch { reporter.report(error: error) + return nil + } + } + + private func processEncryptedMessage(data: BlazeMessageData) { + guard data.category.hasPrefix("ENCRYPTED_") else { + return + } + guard let decryptedData = parseEncryptedMessage(data: data) else { + updateRemoteMessageStatus(messageId: data.messageId, status: .DELIVERED) ReceiveMessageService.shared.processUnknownMessage(data: data) + return } + + _ = syncUser(userId: data.getSenderId()) + processDecryptSuccess(data: data, decryptedData: decryptedData) updateRemoteMessageStatus(messageId: data.messageId, status: .DELIVERED) } @@ -578,24 +827,20 @@ public class ReceiveMessageService: MixinService { } } - private func processDecryptSuccess(data: BlazeMessageData, decryptedData: Data) { + private func makeDecryptMessage(data: BlazeMessageData, decryptedData: Data) -> (Message, [TranscriptMessage]?)? { if data.category.hasSuffix("_TEXT") || data.category.hasSuffix("_POST") { guard let content = String(data: decryptedData, encoding: .utf8) else { - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } - let message = Message.createMessage(textMessage: content, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createMessage(textMessage: content, data: data), nil) } else if data.category.hasSuffix("_IMAGE") || data.category.hasSuffix("_VIDEO") { guard let transferMediaData = (try? JSONDecoder.default.decode(TransferAttachmentData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid data for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard let height = transferMediaData.height, let width = transferMediaData.width, height > 0, width > 0 else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferAttachmentData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } if transferMediaData.mimeType?.isEmpty ?? true { @@ -610,86 +855,84 @@ public class ReceiveMessageService: MixinService { reporter.report(error: error) } - let message = Message.createMessage(mediaData: transferMediaData, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createMessage(mediaData: transferMediaData, data: data), nil) } else if data.category.hasSuffix("_LIVE") { guard let live = (try? JSONDecoder.default.decode(TransferLiveData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferLiveData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } - let message = Message.createMessage(liveData: live, + return (Message.createMessage(liveData: live, content: String(data: decryptedData, encoding: .utf8), - data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + data: data), nil) } else if data.category.hasSuffix("_DATA") { guard let transferMediaData = (try? JSONDecoder.default.decode(TransferAttachmentData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferAttachmentData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard transferMediaData.size > 0 else { - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } - let message = Message.createMessage(mediaData: transferMediaData, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createMessage(mediaData: transferMediaData, data: data), nil) } else if data.category.hasSuffix("_AUDIO") { guard let transferMediaData = (try? JSONDecoder.default.decode(TransferAttachmentData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferAttachmentData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } - let message = Message.createMessage(mediaData: transferMediaData, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) - let job = AttachmentDownloadJob(message: message) - ConcurrentJobQueue.shared.addJob(job: job) + return (Message.createMessage(mediaData: transferMediaData, data: data), nil) } else if data.category.hasSuffix("_STICKER") { guard let transferStickerData = parseSticker(data: data, decryptedData: decryptedData) else { - return + return nil } - let message = Message.createMessage(stickerData: transferStickerData, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createMessage(stickerData: transferStickerData, data: data), nil) } else if data.category.hasSuffix("_CONTACT") { guard let transferData = (try? JSONDecoder.default.decode(TransferContactData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferContactData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard !transferData.userId.isEmpty, UUID(uuidString: transferData.userId) != nil else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferContactData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) - return - } - guard syncUser(userId: transferData.userId) else { - return + return nil } - let message = Message.createMessage(contactData: transferData, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createMessage(contactData: transferData, data: data), nil) } else if data.category.hasSuffix("_LOCATION") { guard (try? JSONDecoder.default.decode(Location.self, from: decryptedData)) != nil else { - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard let content = String(data: decryptedData, encoding: .utf8) else { - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } - let message = Message.createLocationMessage(content: content, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createLocationMessage(content: content, data: data), nil) } else if data.category.hasSuffix("_TRANSCRIPT") { guard let (content, children, hasAttachment) = parseTranscript(decryptedData: decryptedData, transcriptId: data.messageId) else { - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard !children.isEmpty else { - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } let message = Message.createTranscriptMessage(content: content, mediaStatus: hasAttachment ? .PENDING : .DONE, data: data) - MessageDAO.shared.insertMessage(message: message, children: children, messageSource: data.source, silentNotification: data.silentNotification) + return (message, children) + } + + return nil + } + + private func processDecryptSuccess(data: BlazeMessageData, decryptedData: Data) { + guard let (message, children) = makeDecryptMessage(data: data, decryptedData: decryptedData) else { + ReceiveMessageService.shared.processUnknownMessage(data: data) + return + } + + if data.category.hasSuffix("_CONTACT"), let sharedUserId = message.sharedUserId { + checkUser(userId: sharedUserId, tryAgain: true) + } + + MessageDAO.shared.insertMessage(message: message, children: children, messageSource: data.source, silentNotification: data.silentNotification) + + if data.category.hasSuffix("_AUDIO") { + let job = AttachmentDownloadJob(message: message) + ConcurrentJobQueue.shared.addJob(job: job) } } @@ -795,6 +1038,7 @@ public class ReceiveMessageService: MixinService { MessageDAO.shared.updateLiveMessage(liveData: liveData, content: plainText.base64Decoded(), status: Message.getStatus(data: data), messageId: messageId, category: data.category, conversationId: data.conversationId, messageSource: data.source, silentNotification: data.silentNotification) case MessageCategory.SIGNAL_STICKER.rawValue: guard let decryptedData = plainText.data(using: .utf8), let transferStickerData = parseSticker(data: data, decryptedData: decryptedData) else { + ReceiveMessageService.shared.processUnknownMessage(data: data) return } MessageDAO.shared.updateStickerMessage(stickerData: transferStickerData, status: Message.getStatus(data: data), messageId: messageId, category: data.category, conversationId: data.conversationId, messageSource: data.source, silentNotification: data.silentNotification) @@ -840,14 +1084,12 @@ public class ReceiveMessageService: MixinService { private func parseSticker(data: BlazeMessageData, decryptedData: Data) -> TransferStickerData? { guard let transferStickerData = (try? JSONDecoder.default.decode(TransferStickerData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "ParseSticker", message: "Invalid TransferStickerData: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } if let stickerId = transferStickerData.stickerId { guard !stickerId.isEmpty, UUID(uuidString: stickerId) != nil else { Logger.conversation(id: data.conversationId).error(category: "ParseSticker", message: "Invalid TransferStickerData: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } guard !StickerDAO.shared.isExist(stickerId: stickerId) else { @@ -1017,6 +1259,30 @@ public class ReceiveMessageService: MixinService { return false } + + private func syncUsers(userIds: [String]) { + guard userIds.count > 0 else { + return + } + let ids = Array(Set(userIds)).filter({ $0 != User.systemUser && $0 != currentAccountId && !$0.isEmpty }) + let existUserIds = UserDAO.shared.getExistUserIds(userIds: ids) + let syncUserIds = Array(Set(ids).subtracting(Set(existUserIds))) + guard syncUserIds.count > 0 else { + return + } + + repeat { + switch UserSessionAPI.showUsers(userIds: syncUserIds) { + case let .success(users): + UserDAO.shared.updateUsers(users: users) + return + case .failure(.unauthorized): + return + case .failure: + checkNetworkAndWebSocket() + } + } while LoginManager.shared.isLoggedIn + } private func processPlainMessage(data: BlazeMessageData) { guard data.category.hasPrefix("PLAIN_") else { diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/SendMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/SendMessageService.swift index 776e3b4368..828927daea 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/SendMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/SendMessageService.swift @@ -229,7 +229,7 @@ public class SendMessageService: MixinService { try jobs.insert(db) try db.execute(sql: "UPDATE messages SET status = '\(MessageStatus.READ.rawValue)' WHERE conversation_id = ? AND status = ? AND user_id != ? AND ROWID <= ?", arguments: [conversationId, MessageStatus.DELIVERED.rawValue, myUserId, lastRowID]) - try MessageDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) if isLastLoop { db.afterNextTransactionCommit { (_) in NotificationCenter.default.post(name: MixinService.messageReadStatusDidChangeNotification, object: self)