From 954fdf47bfbe14385e9907e1e95109fa758d6797 Mon Sep 17 00:00:00 2001 From: Jun Tang Date: Sun, 31 Oct 2021 15:41:26 +0800 Subject: [PATCH 01/12] Batch process bot messages --- .../Database/Task/BlazeMessageDAO.swift | 20 +- .../Database/Task/MessageBlaze.swift | 2 + .../Database/Task/TaskDatabase.swift | 11 + .../Database/User/DAO/ConversationDAO.swift | 19 ++ .../Database/User/DAO/MessageDAO.swift | 33 ++- .../Database/User/DAO/MessageHistoryDAO.swift | 9 + .../Database/User/DAO/UserDAO.swift | 20 +- .../Database/User/Model/Message.swift | 11 + .../Database/User/UserDatabase.swift | 11 +- .../Service/ReceiveMessageService.swift | 271 ++++++++++++++---- .../Service/SendMessageService.swift | 2 +- 11 files changed, 320 insertions(+), 89 deletions(-) diff --git a/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift b/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift index ab68047940..8430542ec8 100644 --- a/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift +++ b/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift @@ -9,7 +9,7 @@ public final class BlazeMessageDAO { } public func save(messageId: String, conversationId: String, data: Data, createdAt: String) -> Bool { - let msg = MessageBlaze(messageId: messageId, message: data, createdAt: createdAt) + let msg = MessageBlaze(messageId: messageId, message: data, conversationId: conversationId, createdAt: createdAt) return TaskDatabase.current.save(msg) } @@ -45,9 +45,27 @@ public final class BlazeMessageDAO { } } + public func getBlazeMessageData(conversationId: String, limit: Int) -> [BlazeMessageData] { + let condition: SQLSpecificExpressible = MessageBlaze.column(of: .conversationId) == conversationId + let data: [Data] = TaskDatabase.current.select(column: MessageBlaze.column(of: .message), + from: MessageBlaze.self, + where: condition, + order: [MessageBlaze.column(of: .createdAt).asc], + limit: limit) + return data.compactMap { (data) -> BlazeMessageData? in + try? JSONDecoder.default.decode(BlazeMessageData.self, from: data) + } + } + public func delete(data: BlazeMessageData) { TaskDatabase.current.delete(MessageBlaze.self, where: MessageBlaze.column(of: .messageId) == data.messageId) } + public func delete(messageIds: [String]) { + TaskDatabase.current.delete(MessageBlaze.self, + where: messageIds.contains(MessageBlaze.column(of: .messageId))) + } + + } diff --git a/MixinServices/MixinServices/Database/Task/MessageBlaze.swift b/MixinServices/MixinServices/Database/Task/MessageBlaze.swift index 41da31da20..b2158a39b2 100644 --- a/MixinServices/MixinServices/Database/Task/MessageBlaze.swift +++ b/MixinServices/MixinServices/Database/Task/MessageBlaze.swift @@ -5,6 +5,7 @@ struct MessageBlaze { public let messageId: String public let message: Data + public let conversationId: String public let createdAt: String } @@ -14,6 +15,7 @@ extension MessageBlaze: Codable, DatabaseColumnConvertible, MixinFetchableRecord enum CodingKeys: String, CodingKey { case messageId = "_id" case message + case conversationId = "conversation_id" case createdAt = "created_at" } diff --git a/MixinServices/MixinServices/Database/Task/TaskDatabase.swift b/MixinServices/MixinServices/Database/Task/TaskDatabase.swift index a6023b64ab..c6715af454 100644 --- a/MixinServices/MixinServices/Database/Task/TaskDatabase.swift +++ b/MixinServices/MixinServices/Database/Task/TaskDatabase.swift @@ -24,6 +24,7 @@ public class TaskDatabase: Database { let messageBlaze = ColumnMigratableTableDefinition(constraints: nil, columns: [ .init(key: .messageId, constraints: "TEXT PRIMARY KEY"), .init(key: .message, constraints: "BLOB NOT NULL"), + .init(key: .conversationId, constraints: "TEXT NOT NULL"), .init(key: .createdAt, constraints: "TEXT NOT NULL"), ]) try self.migrateTable(with: messageBlaze, into: db) @@ -35,6 +36,16 @@ public class TaskDatabase: Database { try db.execute(sql: "DROP INDEX IF EXISTS messages_blaze_conversation_indexs") } + migrator.registerMigration("batch_process_messages") { (db) in + let infos = try TableInfo.fetchAll(db, sql: "PRAGMA table_info(messages_blaze)") + let columnNames = infos.map(\.name) + if !columnNames.contains("conversation_id") { + try db.execute(sql: "ALTER TABLE messages_blaze ADD COLUMN conversation_id TEXT") + } + + try db.execute(sql: "CREATE INDEX IF NOT EXISTS index_conversation_messages ON messages_blaze(conversation_id, created_at)") + } + return migrator } diff --git a/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift b/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift index eb1e9eeafd..7cf398c26e 100644 --- a/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift @@ -84,6 +84,25 @@ public final class ConversationDAO: UserDatabaseDAO { return db.select(with: sql) } + public func updateUnseenMessageCount(database: GRDB.Database, conversationId: String) throws { + let sql = """ + UPDATE conversations SET unseen_message_count = ( + SELECT count(*) FROM messages + WHERE conversation_id = ? AND status = 'DELIVERED' AND user_id != ? + ) WHERE conversation_id = ? + """ + try database.execute(sql: sql, + arguments: [conversationId, myUserId, conversationId]) + } + + public func updateLastMessage(database: GRDB.Database, conversationId: String, messageId: String, createdAt: String) throws { + let sql = """ + UPDATE conversations SET last_message_id = ?, last_message_created_at = ? WHERE conversation_id = ? + """ + try database.execute(sql: sql, + arguments: [messageId, createdAt, conversationId]) + } + public func isExist(conversationId: String) -> Bool { db.recordExists(in: Conversation.self, where: Conversation.column(of: .conversationId) == conversationId) diff --git a/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift b/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift index ec5e95d344..28db2792eb 100644 --- a/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift @@ -81,12 +81,6 @@ public final class MessageDAO: UserDatabaseDAO { WHERE a.album_id = messages.album_id AND s.name = messages.name ) WHERE category LIKE '%_STICKER' AND ifnull(sticker_id, '') = '' """ - private static let sqlUpdateUnseenMessageCount = """ - UPDATE conversations SET unseen_message_count = ( - SELECT count(*) FROM messages - WHERE conversation_id = ? AND status = 'DELIVERED' AND user_id != ? - ) WHERE conversation_id = ? - """ private let updateMediaStatusQueue = DispatchQueue(label: "one.mixin.services.queue.media.status.queue") @@ -200,6 +194,15 @@ public final class MessageDAO: UserDatabaseDAO { db.recordExists(in: Message.self, where: Message.column(of: .messageId) == messageId) } + public func getExistMessageIds(messageIds: [String]) -> [String] { + guard messageIds.count > 0 else { + return [] + } + return db.select(column: Message.column(of: .messageId), + from: Message.self, + where: messageIds.contains(Message.column(of: .messageId))) + } + public func batchUpdateMessageStatus(readMessageIds: [String], mentionMessageIds: [String]) { var readMessageIds = readMessageIds var readMessages: [Message] = [] @@ -228,7 +231,7 @@ public final class MessageDAO: UserDatabaseDAO { .filter(readMessageIds.contains(Message.column(of: .messageId))) .updateAll(db, [Message.column(of: .status).set(to: MessageStatus.READ.rawValue)]) for conversationId in conversationIds { - try MessageDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) } } @@ -293,7 +296,7 @@ public final class MessageDAO: UserDatabaseDAO { try Message .filter(Message.column(of: .messageId) == messageId) .updateAll(db, [Message.column(of: .status).set(to: status)]) - try updateUnseenMessageCount(database: db, conversationId: conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) if let completion = completion { db.afterNextTransactionCommit(completion) } @@ -308,11 +311,6 @@ public final class MessageDAO: UserDatabaseDAO { return true } - public func updateUnseenMessageCount(database: GRDB.Database, conversationId: String) throws { - try database.execute(sql: Self.sqlUpdateUnseenMessageCount, - arguments: [conversationId, myUserId, conversationId]) - } - @discardableResult public func updateMediaMessage(messageId: String, assignments: [ColumnAssignment], completion: Database.Completion? = nil) -> Bool { let condition = Message.column(of: .messageId) == messageId @@ -585,7 +583,8 @@ public final class MessageDAO: UserDatabaseDAO { if shouldInsertIntoFTSTable { try insertFTSContent(database, message: message, children: children) } - try MessageDAO.shared.updateUnseenMessageCount(database: database, conversationId: message.conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: database, conversationId: message.conversationId) + try ConversationDAO.shared.updateLastMessage(database: database, conversationId: message.conversationId, messageId: message.messageId, createdAt: message.createdAt) database.afterNextTransactionCommit { (_) in // Dispatch to global queue to prevent deadlock @@ -686,7 +685,7 @@ public final class MessageDAO: UserDatabaseDAO { } if status == MessageStatus.FAILED.rawValue { - try MessageDAO.shared.updateUnseenMessageCount(database: database, conversationId: conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: database, conversationId: conversationId) } @@ -793,7 +792,7 @@ extension MessageDAO { && Message.column(of: .category) != MessageCategory.MESSAGE_RECALL.rawValue let changes = try Message.filter(condition).updateAll(db, assignments) if changes > 0 { - try MessageDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) newMessage = try MessageItem.fetchOne(db, sql: MessageDAO.sqlQueryFullMessageById, arguments: [messageId], adapter: nil) } @@ -944,7 +943,7 @@ extension MessageDAO { extension MessageDAO { - private func insertFTSContent(_ db: GRDB.Database, message: Message, children: [TranscriptMessage]?) throws { + public func insertFTSContent(_ db: GRDB.Database, message: Message, children: [TranscriptMessage]?) throws { let shouldInsertIntoFTSTable = AppGroupUserDefaults.Database.isFTSInitialized && message.status != MessageStatus.FAILED.rawValue && MessageCategory.ftsAvailableCategoryStrings.contains(message.category) diff --git a/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift b/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift index dc591d3294..0363de14ea 100644 --- a/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift @@ -9,6 +9,15 @@ public final class MessageHistoryDAO: UserDatabaseDAO { where: MessageHistory.column(of: .messageId) == messageId) } + public func getExistMessageIds(messageIds: [String]) -> [String] { + guard messageIds.count > 0 else { + return [] + } + return db.select(column: MessageHistory.column(of: .messageId), + from: MessageHistory.self, + where: messageIds.contains(MessageHistory.column(of: .messageId))) + } + public func replaceMessageHistory(messageId: String) { let history = MessageHistory(messageId: messageId) db.save(history) diff --git a/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift b/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift index 2bff5780ab..49d8afebb6 100644 --- a/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift @@ -19,6 +19,12 @@ public final class UserDAO: UserDatabaseDAO { LEFT JOIN apps a ON a.app_id = u.app_id """ + public func isBotUser(userId: String) -> Bool { + db.recordExists(in: User.self, where: User.column(of: .userId) == userId + && User.column(of: .identityNumber) > "0" + && User.column(of: .appId) != nil) + } + public func deleteUser(userId: String) { db.delete(User.self, where: User.column(of: .userId) == userId) } @@ -35,6 +41,15 @@ public final class UserDAO: UserDatabaseDAO { db.recordExists(in: User.self, where: User.column(of: .userId) == userId) } + public func getExistUserIds(userIds: [String]) -> [String] { + guard userIds.count > 0 else { + return [] + } + return db.select(column: User.column(of: .userId), + from: User.self, + where: userIds.contains(User.column(of: .userId))) + } + public func getBlockUsers() -> [UserItem] { let sql = "\(Self.sqlQueryColumns) WHERE relationship = 'BLOCKING'" return db.select(with: sql) @@ -165,7 +180,10 @@ public final class UserDAO: UserDatabaseDAO { } public func mentionRepresentation(identityNumbers: [String]) -> [String: String] { - db.select(keyColumn: User.column(of: .identityNumber), + guard identityNumbers.count > 0 else { + return [:] + } + return db.select(keyColumn: User.column(of: .identityNumber), valueColumn: User.column(of: .fullName), from: User.self, where: identityNumbers.contains(User.column(of: .identityNumber))) diff --git a/MixinServices/MixinServices/Database/User/Model/Message.swift b/MixinServices/MixinServices/Database/User/Model/Message.swift index ab7c6ac1c5..29f851d71a 100644 --- a/MixinServices/MixinServices/Database/User/Model/Message.swift +++ b/MixinServices/MixinServices/Database/User/Model/Message.swift @@ -327,6 +327,17 @@ public enum MessageCategory: String, Decodable { public static let allMediaCategoriesString: Set = Set(allMediaCategories.map(\.rawValue)) + public static let allBotCategories: [MessageCategory] = [ + .PLAIN_TEXT, .PLAIN_IMAGE, .PLAIN_VIDEO, .PLAIN_DATA, + .PLAIN_STICKER, .PLAIN_CONTACT, .PLAIN_AUDIO, .PLAIN_LIVE, + .PLAIN_POST, .PLAIN_LOCATION, .PLAIN_TRANSCRIPT, + .ENCRYPTED_TEXT, .ENCRYPTED_IMAGE, .ENCRYPTED_VIDEO, .ENCRYPTED_DATA, + .ENCRYPTED_STICKER, .ENCRYPTED_CONTACT, .ENCRYPTED_AUDIO, .ENCRYPTED_LIVE, + .ENCRYPTED_POST, .ENCRYPTED_LOCATION, .ENCRYPTED_TRANSCRIPT + ] + + public static let allBotCategoriesString: Set = Set(allBotCategories.map(\.rawValue)) + public static let endCallCategories: [MessageCategory] = [ .WEBRTC_AUDIO_END, .WEBRTC_AUDIO_BUSY, diff --git a/MixinServices/MixinServices/Database/User/UserDatabase.swift b/MixinServices/MixinServices/Database/User/UserDatabase.swift index b576ba34cf..01612d355c 100644 --- a/MixinServices/MixinServices/Database/User/UserDatabase.swift +++ b/MixinServices/MixinServices/Database/User/UserDatabase.swift @@ -322,14 +322,7 @@ public final class UserDatabase: Database { UPDATE conversations SET last_message_id = (select id from messages where conversation_id = old.conversation_id order by created_at DESC limit 1) WHERE conversation_id = old.conversation_id; END """ - let lastMessageUpdate = """ - CREATE TRIGGER IF NOT EXISTS conversation_last_message_update AFTER INSERT ON messages - BEGIN - UPDATE conversations SET last_message_id = new.id, last_message_created_at = new.created_at WHERE conversation_id = new.conversation_id; - END - """ try db.execute(sql: lastMessageDelete) - try db.execute(sql: lastMessageUpdate) } migrator.registerMigration("fts5_v3_2") { (db) in @@ -420,6 +413,10 @@ public final class UserDatabase: Database { } } + migrator.registerMigration("batch_process_messages") { (db) in + try db.execute(sql: "DROP INDEX IF EXISTS conversation_last_message_update") + } + /* Remaining works: try db.execute(sql: "DROP INDEX IF EXISTS messages_unread_indexs") try db.execute(sql: "DROP INDEX IF EXISTS messages_user_indexs") diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift index cffa359208..61919f4f04 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift @@ -1,6 +1,7 @@ import Foundation import UIKit import SDWebImage +import GRDB public protocol CallMessageCoordinator: AnyObject { func shouldSendRtcBlazeMessage(with category: MessageCategory) -> Bool @@ -158,8 +159,6 @@ public class ReceiveMessageService: MixinService { } while AppGroupUserDefaults.isProcessingMessagesInAppExtension && !MixinService.isStopProcessMessages } - var finishedJobCount = 0 - repeat { guard LoginManager.shared.isLoggedIn, !MixinService.isStopProcessMessages else { return @@ -169,26 +168,127 @@ public class ReceiveMessageService: MixinService { return } - let remainJobCount = BlazeMessageDAO.shared.getCount() - if remainJobCount + finishedJobCount > 500 { - displaySyncProcess = true - let progress = blazeMessageDatas.count == 0 ? 100 : Int(Float(finishedJobCount) / Float(remainJobCount + finishedJobCount) * 100) - NotificationCenter.default.post(onMainThread: Self.progressNotification, - object: self, - userInfo: [Self.UserInfoKey.progress: progress]) - } - for data in blazeMessageDatas { if MixinService.isStopProcessMessages { return } + if MessageCategory.allBotCategoriesString.contains(data.category) { + ReceiveMessageService.shared.processBotMessages(data: data) + break + } ReceiveMessageService.shared.processReceiveMessage(data: data) } - - finishedJobCount += blazeMessageDatas.count } while true } } + + private func processBotMessages(data: BlazeMessageData) { + ReceiveMessageService.shared.syncConversation(data: data) + ReceiveMessageService.shared.checkSession(data: data) + _ = ReceiveMessageService.shared.syncUser(userId: data.userId) + + guard UserDAO.shared.isBotUser(userId: data.userId) else { + // plain message in group chat + ReceiveMessageService.shared.processReceiveMessage(data: data) + return + } + + let pageCount = 200 + var blazeMessages = [BlazeMessageData]() + repeat { + if MixinService.isStopProcessMessages { + return + } + blazeMessages = BlazeMessageDAO.shared.getBlazeMessageData(conversationId: data.conversationId, limit: pageCount) + guard let lastMessage = blazeMessages.last else { + return + } + + ReceiveMessageService.shared.syncUsers(userIds: blazeMessages.map{ $0.userId }) + + let messageIds = blazeMessages.map{ $0.messageId } + let messageSet = Set(messageIds) + + let existMessageIds = MessageDAO.shared.getExistMessageIds(messageIds: messageIds) + let existHistoryIds = MessageHistoryDAO.shared.getExistMessageIds(messageIds: messageIds) + + let notExistIds = messageSet.subtracting(existMessageIds).union(messageSet.subtracting(existHistoryIds)) + let existIds = messageSet.subtracting(notExistIds) + + var jobs = [Job]() + var pairMessages = [(Message, [TranscriptMessage]?)]() + + for blazeMessage in blazeMessages { + let messageId = blazeMessage.messageId + let ackBlazeMessage = BlazeMessage(ackBlazeMessage: messageId, status: MessageStatus.DELIVERED.rawValue) + jobs.append(Job(jobId: ackBlazeMessage.id, action: .SEND_DELIVERED_ACK_MESSAGE, blazeMessage: ackBlazeMessage)) + + guard !(existMessageIds.contains(messageId) || existHistoryIds.contains(messageId)) else { + continue + } + + var decryptedData: Data? + if blazeMessage.category.hasPrefix("PLAIN_") { + decryptedData = Data(base64Encoded: blazeMessage.data) + } else { + decryptedData = parseEncryptedMessage(data: data) + } + + if let decryptedData = decryptedData, var (message, children) = makeDecryptMessage(data: blazeMessage, decryptedData: decryptedData) { + pairMessages.append((message, children)) + } else { + pairMessages.append((makeUnknownMessage(data: blazeMessage), nil)) + } + } + + let quoteMessages = pairMessages.compactMap { ($0.0.quoteMessageId?.isEmpty ?? true) ? nil : $0.0} + let transcriptMessages = pairMessages.compactMap { $0.1 }.flatMap { $0 } + var ftsMessages = [(Message, [TranscriptMessage]?)]() + if AppGroupUserDefaults.Database.isFTSInitialized { + ftsMessages = pairMessages.filter({ MessageCategory.ftsAvailableCategoryStrings.contains($0.0.category) }) + } + let messages = pairMessages.compactMap { $0.0 } + + UserDatabase.current.write { db in + try messages.save(db) + try transcriptMessages.save(db) + try jobs.save(db) + + for message in messages { + guard message.category.hasSuffix("_TEXT") || message.quoteMessageId != nil else { + continue + } + + var quotedMessage: MessageItem? + if let quoteMessageId = message.quoteMessageId { + quotedMessage = try MessageItem.fetchOne(db, sql: MessageDAO.sqlQueryQuoteMessageById, arguments: [quoteMessageId], adapter: nil) + } + if let quoted = quotedMessage, let quoteContent = try? JSONEncoder.default.encode(quoted) { + let assignments: [ColumnAssignment] = [ + Message.column(of: .quoteContent).set(to: quoteContent) + ] + + try Message + .filter(Message.column(of: .messageId) == message.messageId) + .updateAll(db, assignments) + } + + if let mention = MessageMention(message: message, quotedMessage: quotedMessage) { + try mention.save(db) + } + } + + for (ftsMessage, childrenMessages) in ftsMessages { + try MessageDAO.shared.insertFTSContent(db, message: ftsMessage, children: childrenMessages) + } + + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: data.conversationId) + try ConversationDAO.shared.updateLastMessage(database: db, conversationId: data.conversationId, messageId: lastMessage.messageId, createdAt: lastMessage.createdAt) + } + + BlazeMessageDAO.shared.delete(messageIds: messageIds) + } while LoginManager.shared.isLoggedIn && blazeMessages.count >= pageCount + } private func processReceiveMessage(data: BlazeMessageData) { guard LoginManager.shared.isLoggedIn else { @@ -244,8 +344,8 @@ public class ReceiveMessageService: MixinService { UserDatabase.current.save(session) } } - - private func processUnknownMessage(data: BlazeMessageData) { + + private func makeUnknownMessage(data: BlazeMessageData) -> Message { var unknownMessage = Message.createMessage(messageId: data.messageId, category: data.category, conversationId: data.conversationId, @@ -253,7 +353,11 @@ public class ReceiveMessageService: MixinService { userId: data.getSenderId()) unknownMessage.status = MessageStatus.UNKNOWN.rawValue unknownMessage.content = data.data - MessageDAO.shared.insertMessage(message: unknownMessage, messageSource: data.source, silentNotification: data.silentNotification) + return unknownMessage + } + + private func processUnknownMessage(data: BlazeMessageData) { + MessageDAO.shared.insertMessage(message: makeUnknownMessage(data: data), messageSource: data.source, silentNotification: data.silentNotification) } private func processBadMessage(data: BlazeMessageData) { @@ -520,10 +624,7 @@ public class ReceiveMessageService: MixinService { } } - private func processEncryptedMessage(data: BlazeMessageData) { - guard data.category.hasPrefix("ENCRYPTED_") else { - return - } + private func parseEncryptedMessage(data: BlazeMessageData) -> Data? { guard let cipher = Data(base64Encoded: data.data), let pk = RequestSigning.edDSAPrivateKey, @@ -539,18 +640,28 @@ public class ReceiveMessageService: MixinService { ] Logger.conversation(id: data.conversationId).error(category: "EncryptedBotMessage", message: "Failed to decrypt", userInfo: info) reporter.report(error: MixinServicesError.decryptBotMessage(info)) - updateRemoteMessageStatus(messageId: data.messageId, status: .DELIVERED) - ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } do { - let decryptedData = try EncryptedProtocol.decrypt(cipher: cipher, with: pk, sessionId: mySessionId) - _ = syncUser(userId: data.getSenderId()) - processDecryptSuccess(data: data, decryptedData: decryptedData) + return try EncryptedProtocol.decrypt(cipher: cipher, with: pk, sessionId: mySessionId) } catch { reporter.report(error: error) + return nil + } + } + + private func processEncryptedMessage(data: BlazeMessageData) { + guard data.category.hasPrefix("ENCRYPTED_") else { + return + } + guard let decryptedData = parseEncryptedMessage(data: data) else { + updateRemoteMessageStatus(messageId: data.messageId, status: .DELIVERED) ReceiveMessageService.shared.processUnknownMessage(data: data) + return } + + _ = syncUser(userId: data.getSenderId()) + processDecryptSuccess(data: data, decryptedData: decryptedData) updateRemoteMessageStatus(messageId: data.messageId, status: .DELIVERED) } @@ -578,24 +689,23 @@ public class ReceiveMessageService: MixinService { } } - private func processDecryptSuccess(data: BlazeMessageData, decryptedData: Data) { + private func makeDecryptMessage(data: BlazeMessageData, decryptedData: Data) -> (Message, [TranscriptMessage]?)? { if data.category.hasSuffix("_TEXT") || data.category.hasSuffix("_POST") { guard let content = String(data: decryptedData, encoding: .utf8) else { ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } - let message = Message.createMessage(textMessage: content, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createMessage(textMessage: content, data: data), nil) } else if data.category.hasSuffix("_IMAGE") || data.category.hasSuffix("_VIDEO") { guard let transferMediaData = (try? JSONDecoder.default.decode(TransferAttachmentData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid data for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard let height = transferMediaData.height, let width = transferMediaData.width, height > 0, width > 0 else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferAttachmentData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } if transferMediaData.mimeType?.isEmpty ?? true { @@ -610,86 +720,92 @@ public class ReceiveMessageService: MixinService { reporter.report(error: error) } - let message = Message.createMessage(mediaData: transferMediaData, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createMessage(mediaData: transferMediaData, data: data), nil) } else if data.category.hasSuffix("_LIVE") { guard let live = (try? JSONDecoder.default.decode(TransferLiveData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferLiveData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } - let message = Message.createMessage(liveData: live, + return (Message.createMessage(liveData: live, content: String(data: decryptedData, encoding: .utf8), - data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + data: data), nil) } else if data.category.hasSuffix("_DATA") { guard let transferMediaData = (try? JSONDecoder.default.decode(TransferAttachmentData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferAttachmentData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard transferMediaData.size > 0 else { ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } - let message = Message.createMessage(mediaData: transferMediaData, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createMessage(mediaData: transferMediaData, data: data), nil) } else if data.category.hasSuffix("_AUDIO") { guard let transferMediaData = (try? JSONDecoder.default.decode(TransferAttachmentData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferAttachmentData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } - let message = Message.createMessage(mediaData: transferMediaData, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) - let job = AttachmentDownloadJob(message: message) - ConcurrentJobQueue.shared.addJob(job: job) + return (Message.createMessage(mediaData: transferMediaData, data: data), nil) } else if data.category.hasSuffix("_STICKER") { guard let transferStickerData = parseSticker(data: data, decryptedData: decryptedData) else { - return + return nil } - let message = Message.createMessage(stickerData: transferStickerData, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createMessage(stickerData: transferStickerData, data: data), nil) } else if data.category.hasSuffix("_CONTACT") { guard let transferData = (try? JSONDecoder.default.decode(TransferContactData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferContactData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard !transferData.userId.isEmpty, UUID(uuidString: transferData.userId) != nil else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferContactData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard syncUser(userId: transferData.userId) else { - return + return nil } - let message = Message.createMessage(contactData: transferData, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createMessage(contactData: transferData, data: data), nil) } else if data.category.hasSuffix("_LOCATION") { guard (try? JSONDecoder.default.decode(Location.self, from: decryptedData)) != nil else { ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard let content = String(data: decryptedData, encoding: .utf8) else { ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } - let message = Message.createLocationMessage(content: content, data: data) - MessageDAO.shared.insertMessage(message: message, messageSource: data.source, silentNotification: data.silentNotification) + return (Message.createLocationMessage(content: content, data: data), nil) } else if data.category.hasSuffix("_TRANSCRIPT") { guard let (content, children, hasAttachment) = parseTranscript(decryptedData: decryptedData, transcriptId: data.messageId) else { ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } guard !children.isEmpty else { ReceiveMessageService.shared.processUnknownMessage(data: data) - return + return nil } let message = Message.createTranscriptMessage(content: content, mediaStatus: hasAttachment ? .PENDING : .DONE, data: data) - MessageDAO.shared.insertMessage(message: message, children: children, messageSource: data.source, silentNotification: data.silentNotification) + return (message, children) + } + + return nil + } + + private func processDecryptSuccess(data: BlazeMessageData, decryptedData: Data) { + guard let (message, children) = makeDecryptMessage(data: data, decryptedData: decryptedData) else { + return + } + + MessageDAO.shared.insertMessage(message: message, children: children, messageSource: data.source, silentNotification: data.silentNotification) + + if data.category.hasSuffix("_AUDIO") { + let job = AttachmentDownloadJob(message: message) + ConcurrentJobQueue.shared.addJob(job: job) } } @@ -1017,6 +1133,30 @@ public class ReceiveMessageService: MixinService { return false } + + private func syncUsers(userIds: [String]) { + guard userIds.count > 0 else { + return + } + let ids = userIds.distinct().filter({ $0 != User.systemUser && $0 != currentAccountId && !$0.isEmpty }) + let existUserIds = UserDAO.shared.getExistUserIds(userIds: ids) + let syncUserIds = Array(Set(ids).subtracting(Set(existUserIds))) + guard syncUserIds.count > 0 else { + return + } + + repeat { + switch UserSessionAPI.showUsers(userIds: syncUserIds) { + case let .success(users): + UserDAO.shared.updateUsers(users: users) + return + case .failure(.unauthorized): + return + case .failure: + checkNetworkAndWebSocket() + } + } while LoginManager.shared.isLoggedIn + } private func processPlainMessage(data: BlazeMessageData) { guard data.category.hasPrefix("PLAIN_") else { @@ -1361,3 +1501,10 @@ extension CiphertextMessage.MessageType { } } } + +extension Array where Iterator.Element: Hashable { + + func distinct() -> [Iterator.Element] { + return Array(Set(self)) + } +} diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/SendMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/SendMessageService.swift index 9db2031d11..5250bb4394 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/SendMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/SendMessageService.swift @@ -231,7 +231,7 @@ public class SendMessageService: MixinService { try jobs.insert(db) try db.execute(sql: "UPDATE messages SET status = '\(MessageStatus.READ.rawValue)' WHERE conversation_id = ? AND status = ? AND user_id != ? AND ROWID <= ?", arguments: [conversationId, MessageStatus.DELIVERED.rawValue, myUserId, lastRowID]) - try MessageDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) if isLastLoop { db.afterNextTransactionCommit { (_) in NotificationCenter.default.post(name: MixinService.messageReadStatusDidChangeNotification, object: self) From 33bb77189b8459518f9087dccd8d968dacdc8924 Mon Sep 17 00:00:00 2001 From: Jun Tang Date: Thu, 4 Nov 2021 00:37:25 +0800 Subject: [PATCH 02/12] Process old messages --- .../MixinServices/Database/Database.swift | 7 ++- .../Database/Task/BlazeMessageDAO.swift | 17 +++--- .../Database/Task/MessageBlaze.swift | 4 +- .../Service/ReceiveMessageService.swift | 54 +++++++++++++------ 4 files changed, 51 insertions(+), 31 deletions(-) diff --git a/MixinServices/MixinServices/Database/Database.swift b/MixinServices/MixinServices/Database/Database.swift index f3f55ba896..1e3fb53da6 100644 --- a/MixinServices/MixinServices/Database/Database.swift +++ b/MixinServices/MixinServices/Database/Database.swift @@ -235,12 +235,15 @@ extension Database { } public func select( - where condition: SQLSpecificExpressible, + where condition: SQLSpecificExpressible? = nil, order orderings: [SQLOrderingTerm] = [], limit: Int? = nil ) -> [Record] { try! pool.read { (db) -> [Record] in - var request = Record.filter(condition) + var request = Record.all() + if let condition = condition { + request = request.filter(condition) + } if !orderings.isEmpty { request = request.order(orderings) } diff --git a/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift b/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift index 8430542ec8..aaba89486f 100644 --- a/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift +++ b/MixinServices/MixinServices/Database/Task/BlazeMessageDAO.swift @@ -28,21 +28,16 @@ public final class BlazeMessageDAO { TaskDatabase.current.select(where: MessageBlaze.column(of: .messageId) == messageId) } - public func getBlazeMessageData(createdAt: String? = nil, limit: Int) -> [BlazeMessageData] { + public func getBlazeMessages(createdAt: String? = nil, limit: Int) -> [MessageBlaze] { let condition: SQLSpecificExpressible? if let createdAt = createdAt { condition = MessageBlaze.column(of: .createdAt) <= createdAt } else { condition = nil } - let data: [Data] = TaskDatabase.current.select(column: MessageBlaze.column(of: .message), - from: MessageBlaze.self, - where: condition, - order: [MessageBlaze.column(of: .createdAt).asc], - limit: limit) - return data.compactMap { (data) -> BlazeMessageData? in - try? JSONDecoder.default.decode(BlazeMessageData.self, from: data) - } + return TaskDatabase.current.select(where: condition, + order: [MessageBlaze.column(of: .createdAt).asc], + limit: limit) } public func getBlazeMessageData(conversationId: String, limit: Int) -> [BlazeMessageData] { @@ -57,9 +52,9 @@ public final class BlazeMessageDAO { } } - public func delete(data: BlazeMessageData) { + public func delete(messageId: String) { TaskDatabase.current.delete(MessageBlaze.self, - where: MessageBlaze.column(of: .messageId) == data.messageId) + where: MessageBlaze.column(of: .messageId) == messageId) } public func delete(messageIds: [String]) { diff --git a/MixinServices/MixinServices/Database/Task/MessageBlaze.swift b/MixinServices/MixinServices/Database/Task/MessageBlaze.swift index b2158a39b2..ae66feb783 100644 --- a/MixinServices/MixinServices/Database/Task/MessageBlaze.swift +++ b/MixinServices/MixinServices/Database/Task/MessageBlaze.swift @@ -1,7 +1,7 @@ import Foundation import GRDB -struct MessageBlaze { +public struct MessageBlaze { public let messageId: String public let message: Data @@ -12,7 +12,7 @@ struct MessageBlaze { extension MessageBlaze: Codable, DatabaseColumnConvertible, MixinFetchableRecord, MixinEncodableRecord { - enum CodingKeys: String, CodingKey { + public enum CodingKeys: String, CodingKey { case messageId = "_id" case message case conversationId = "conversation_id" diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift index 61919f4f04..7ed8babc81 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift @@ -92,18 +92,26 @@ public class ReceiveMessageService: MixinService { return } else if let createdAt = BlazeMessageDAO.shared.getMessageBlaze(messageId: messageId)?.createdAt { repeat { - let blazeMessageDatas = BlazeMessageDAO.shared.getBlazeMessageData(createdAt: createdAt, limit: 50) - guard blazeMessageDatas.count > 0 else { + let blazeMessages = BlazeMessageDAO.shared.getBlazeMessages(createdAt: createdAt, limit: 50) + guard blazeMessages.count > 0 else { callback(nil) return } - for data in blazeMessageDatas { + for blazeMessage in blazeMessages { guard !AppGroupUserDefaults.isRunningInMainApp, !extensionTimeWillExpire() else { callback(nil) return } - ReceiveMessageService.shared.processReceiveMessage(data: data) + guard let data = try? JSONDecoder.default.decode(BlazeMessageData.self, from: blazeMessage.message) else { + ReceiveMessageService.shared.processBadMessage(messageId: blazeMessage.messageId) + continue + } + if MessageCategory.allBotCategoriesString.contains(data.category) && !blazeMessage.conversationId.isEmpty { + ReceiveMessageService.shared.processBotMessages(data: data) + } else { + ReceiveMessageService.shared.processReceiveMessage(data: data) + } if data.messageId == messageId { callback(MessageDAO.shared.getFullMessage(messageId: messageId)) return @@ -163,20 +171,25 @@ public class ReceiveMessageService: MixinService { guard LoginManager.shared.isLoggedIn, !MixinService.isStopProcessMessages else { return } - let blazeMessageDatas = BlazeMessageDAO.shared.getBlazeMessageData(limit: 50) - guard blazeMessageDatas.count > 0 else { + let blazeMessages = BlazeMessageDAO.shared.getBlazeMessages(limit: 50) + guard blazeMessages.count > 0 else { return } - for data in blazeMessageDatas { + for blazeMessage in blazeMessages { if MixinService.isStopProcessMessages { return } - if MessageCategory.allBotCategoriesString.contains(data.category) { + guard let data = try? JSONDecoder.default.decode(BlazeMessageData.self, from: blazeMessage.message) else { + ReceiveMessageService.shared.processBadMessage(messageId: blazeMessage.messageId) + continue + } + + if MessageCategory.allBotCategoriesString.contains(data.category) && !blazeMessage.conversationId.isEmpty { ReceiveMessageService.shared.processBotMessages(data: data) - break + } else { + ReceiveMessageService.shared.processReceiveMessage(data: data) } - ReceiveMessageService.shared.processReceiveMessage(data: data) } } while true } @@ -206,6 +219,10 @@ public class ReceiveMessageService: MixinService { ReceiveMessageService.shared.syncUsers(userIds: blazeMessages.map{ $0.userId }) + if MixinService.isStopProcessMessages { + return + } + let messageIds = blazeMessages.map{ $0.messageId } let messageSet = Set(messageIds) @@ -249,6 +266,10 @@ public class ReceiveMessageService: MixinService { } let messages = pairMessages.compactMap { $0.0 } + if MixinService.isStopProcessMessages { + return + } + UserDatabase.current.write { db in try messages.save(db) try transcriptMessages.save(db) @@ -287,7 +308,7 @@ public class ReceiveMessageService: MixinService { } BlazeMessageDAO.shared.delete(messageIds: messageIds) - } while LoginManager.shared.isLoggedIn && blazeMessages.count >= pageCount + } while LoginManager.shared.isLoggedIn && blazeMessages.count >= pageCount && !isAppExtension } private func processReceiveMessage(data: BlazeMessageData) { @@ -296,7 +317,7 @@ public class ReceiveMessageService: MixinService { } if MessageDAO.shared.isExist(messageId: data.messageId) || MessageHistoryDAO.shared.isExist(messageId: data.messageId) { - ReceiveMessageService.shared.processBadMessage(data: data) + ReceiveMessageService.shared.processBadMessage(messageId: data.messageId) return } @@ -323,7 +344,7 @@ public class ReceiveMessageService: MixinService { ReceiveMessageService.shared.processUnknownMessage(data: data) ReceiveMessageService.shared.updateRemoteMessageStatus(messageId: data.messageId, status: .DELIVERED) } - BlazeMessageDAO.shared.delete(data: data) + BlazeMessageDAO.shared.delete(messageId: data.messageId) } private func checkSession(data: BlazeMessageData) { @@ -360,9 +381,10 @@ public class ReceiveMessageService: MixinService { MessageDAO.shared.insertMessage(message: makeUnknownMessage(data: data), messageSource: data.source, silentNotification: data.silentNotification) } - private func processBadMessage(data: BlazeMessageData) { - ReceiveMessageService.shared.updateRemoteMessageStatus(messageId: data.messageId, status: .DELIVERED) - BlazeMessageDAO.shared.delete(data: data) + private func processBadMessage(messageId: String) { + MessageHistoryDAO.shared.replaceMessageHistory(messageId: messageId) + ReceiveMessageService.shared.updateRemoteMessageStatus(messageId: messageId, status: .DELIVERED) + BlazeMessageDAO.shared.delete(messageId: messageId) } private func processCallMessage(data: BlazeMessageData) { From 4cf94b6ec1f3d1ce4b9f63df32a83f4eabfb629c Mon Sep 17 00:00:00 2001 From: Jun Tang Date: Thu, 4 Nov 2021 01:32:39 +0800 Subject: [PATCH 03/12] Update process --- .../MixinServices/Database/Database.swift | 4 +- .../Service/ReceiveMessageService.swift | 55 +++++++++++-------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/MixinServices/MixinServices/Database/Database.swift b/MixinServices/MixinServices/Database/Database.swift index 1e3fb53da6..157a946ec5 100644 --- a/MixinServices/MixinServices/Database/Database.swift +++ b/MixinServices/MixinServices/Database/Database.swift @@ -236,7 +236,7 @@ extension Database { public func select( where condition: SQLSpecificExpressible? = nil, - order orderings: [SQLOrderingTerm] = [], + order orderings: [SQLOrderingTerm]? = nil, limit: Int? = nil ) -> [Record] { try! pool.read { (db) -> [Record] in @@ -244,7 +244,7 @@ extension Database { if let condition = condition { request = request.filter(condition) } - if !orderings.isEmpty { + if let orderings = orderings { request = request.order(orderings) } if let limit = limit { diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift index 7ed8babc81..a052691d6d 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift @@ -167,6 +167,21 @@ public class ReceiveMessageService: MixinService { } while AppGroupUserDefaults.isProcessingMessagesInAppExtension && !MixinService.isStopProcessMessages } + var finishedJobCount = 0 + + func updateProgress(remainJobCount: Int, finishedJobCount: Int) { + guard displaySyncProcess else { + return + } + var progress = Int(Float(finishedJobCount) / Float(remainJobCount + finishedJobCount) * 100) + if progress > 100 { + progress = 100 + } + NotificationCenter.default.post(onMainThread: Self.progressNotification, + object: self, + userInfo: [Self.UserInfoKey.progress: progress]) + } + repeat { guard LoginManager.shared.isLoggedIn, !MixinService.isStopProcessMessages else { return @@ -175,19 +190,30 @@ public class ReceiveMessageService: MixinService { guard blazeMessages.count > 0 else { return } + + let remainJobCount = BlazeMessageDAO.shared.getCount() + if remainJobCount + finishedJobCount > 500 { + displaySyncProcess = true + updateProgress(remainJobCount: remainJobCount, finishedJobCount: finishedJobCount) + } for blazeMessage in blazeMessages { if MixinService.isStopProcessMessages { return } guard let data = try? JSONDecoder.default.decode(BlazeMessageData.self, from: blazeMessage.message) else { + finishedJobCount += 1 ReceiveMessageService.shared.processBadMessage(messageId: blazeMessage.messageId) continue } if MessageCategory.allBotCategoriesString.contains(data.category) && !blazeMessage.conversationId.isEmpty { - ReceiveMessageService.shared.processBotMessages(data: data) + ReceiveMessageService.shared.processBotMessages(data: data) { count in + finishedJobCount += count + updateProgress(remainJobCount: remainJobCount, finishedJobCount: finishedJobCount) + } } else { + finishedJobCount += 1 ReceiveMessageService.shared.processReceiveMessage(data: data) } } @@ -195,7 +221,7 @@ public class ReceiveMessageService: MixinService { } } - private func processBotMessages(data: BlazeMessageData) { + private func processBotMessages(data: BlazeMessageData, finishedBlock: ((Int) -> Void)? = nil) { ReceiveMessageService.shared.syncConversation(data: data) ReceiveMessageService.shared.checkSession(data: data) _ = ReceiveMessageService.shared.syncUser(userId: data.userId) @@ -308,6 +334,8 @@ public class ReceiveMessageService: MixinService { } BlazeMessageDAO.shared.delete(messageIds: messageIds) + + finishedBlock?(messageIds.count) } while LoginManager.shared.isLoggedIn && blazeMessages.count >= pageCount && !isAppExtension } @@ -714,19 +742,16 @@ public class ReceiveMessageService: MixinService { private func makeDecryptMessage(data: BlazeMessageData, decryptedData: Data) -> (Message, [TranscriptMessage]?)? { if data.category.hasSuffix("_TEXT") || data.category.hasSuffix("_POST") { guard let content = String(data: decryptedData, encoding: .utf8) else { - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } return (Message.createMessage(textMessage: content, data: data), nil) } else if data.category.hasSuffix("_IMAGE") || data.category.hasSuffix("_VIDEO") { guard let transferMediaData = (try? JSONDecoder.default.decode(TransferAttachmentData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid data for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } guard let height = transferMediaData.height, let width = transferMediaData.width, height > 0, width > 0 else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferAttachmentData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } @@ -746,7 +771,6 @@ public class ReceiveMessageService: MixinService { } else if data.category.hasSuffix("_LIVE") { guard let live = (try? JSONDecoder.default.decode(TransferLiveData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferLiveData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } return (Message.createMessage(liveData: live, @@ -755,18 +779,15 @@ public class ReceiveMessageService: MixinService { } else if data.category.hasSuffix("_DATA") { guard let transferMediaData = (try? JSONDecoder.default.decode(TransferAttachmentData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferAttachmentData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } guard transferMediaData.size > 0 else { - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } return (Message.createMessage(mediaData: transferMediaData, data: data), nil) } else if data.category.hasSuffix("_AUDIO") { guard let transferMediaData = (try? JSONDecoder.default.decode(TransferAttachmentData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferAttachmentData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } return (Message.createMessage(mediaData: transferMediaData, data: data), nil) @@ -778,12 +799,10 @@ public class ReceiveMessageService: MixinService { } else if data.category.hasSuffix("_CONTACT") { guard let transferData = (try? JSONDecoder.default.decode(TransferContactData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferContactData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } guard !transferData.userId.isEmpty, UUID(uuidString: transferData.userId) != nil else { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferContactData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } guard syncUser(userId: transferData.userId) else { @@ -792,21 +811,17 @@ public class ReceiveMessageService: MixinService { return (Message.createMessage(contactData: transferData, data: data), nil) } else if data.category.hasSuffix("_LOCATION") { guard (try? JSONDecoder.default.decode(Location.self, from: decryptedData)) != nil else { - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } guard let content = String(data: decryptedData, encoding: .utf8) else { - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } return (Message.createLocationMessage(content: content, data: data), nil) } else if data.category.hasSuffix("_TRANSCRIPT") { guard let (content, children, hasAttachment) = parseTranscript(decryptedData: decryptedData, transcriptId: data.messageId) else { - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } guard !children.isEmpty else { - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } let message = Message.createTranscriptMessage(content: content, @@ -820,6 +835,7 @@ public class ReceiveMessageService: MixinService { private func processDecryptSuccess(data: BlazeMessageData, decryptedData: Data) { guard let (message, children) = makeDecryptMessage(data: data, decryptedData: decryptedData) else { + ReceiveMessageService.shared.processUnknownMessage(data: data) return } @@ -1160,7 +1176,7 @@ public class ReceiveMessageService: MixinService { guard userIds.count > 0 else { return } - let ids = userIds.distinct().filter({ $0 != User.systemUser && $0 != currentAccountId && !$0.isEmpty }) + let ids = Array(Set(userIds)).filter({ $0 != User.systemUser && $0 != currentAccountId && !$0.isEmpty }) let existUserIds = UserDAO.shared.getExistUserIds(userIds: ids) let syncUserIds = Array(Set(ids).subtracting(Set(existUserIds))) guard syncUserIds.count > 0 else { @@ -1523,10 +1539,3 @@ extension CiphertextMessage.MessageType { } } } - -extension Array where Iterator.Element: Hashable { - - func distinct() -> [Iterator.Element] { - return Array(Set(self)) - } -} From 0e661bab4d73b0235d269defc56d1a32ad77f3e6 Mon Sep 17 00:00:00 2001 From: Jun Tang Date: Thu, 4 Nov 2021 05:37:58 +0800 Subject: [PATCH 04/12] Notify messages has been inserted --- .../Database/User/DAO/MessageDAO.swift | 9 ++++ .../Service/ReceiveMessageService.swift | 52 +++++++++++++++---- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift b/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift index 28db2792eb..1c16bc26b5 100644 --- a/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift @@ -500,6 +500,15 @@ public final class MessageDAO: UserDatabaseDAO { return db.select(with: sql, arguments: [conversationId, count]) } + public func getMessages(messageIds: [String]) -> [MessageItem] { + let sql = """ + \(Self.sqlQueryFullMessage) + WHERE m.id in ('\(messageIds.joined(separator: "', '"))') + ORDER BY m.created_at DESC + """ + return db.select(with: sql, arguments: []) + } + public func getFirstNMessages(conversationId: String, count: Int) -> [MessageItem] { db.select(with: MessageDAO.sqlQueryFirstNMessages, arguments: [conversationId, count]) } diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift index a052691d6d..07c6c92192 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift @@ -233,12 +233,13 @@ public class ReceiveMessageService: MixinService { } let pageCount = 200 + let conversationId = data.conversationId var blazeMessages = [BlazeMessageData]() repeat { if MixinService.isStopProcessMessages { return } - blazeMessages = BlazeMessageDAO.shared.getBlazeMessageData(conversationId: data.conversationId, limit: pageCount) + blazeMessages = BlazeMessageDAO.shared.getBlazeMessageData(conversationId: conversationId, limit: pageCount) guard let lastMessage = blazeMessages.last else { return } @@ -249,14 +250,11 @@ public class ReceiveMessageService: MixinService { return } + let blazeMessageDict = Dictionary(uniqueKeysWithValues: blazeMessages.map { ($0.messageId, $0) }) let messageIds = blazeMessages.map{ $0.messageId } - let messageSet = Set(messageIds) - let existMessageIds = MessageDAO.shared.getExistMessageIds(messageIds: messageIds) let existHistoryIds = MessageHistoryDAO.shared.getExistMessageIds(messageIds: messageIds) - let notExistIds = messageSet.subtracting(existMessageIds).union(messageSet.subtracting(existHistoryIds)) - let existIds = messageSet.subtracting(notExistIds) var jobs = [Job]() var pairMessages = [(Message, [TranscriptMessage]?)]() @@ -267,6 +265,7 @@ public class ReceiveMessageService: MixinService { jobs.append(Job(jobId: ackBlazeMessage.id, action: .SEND_DELIVERED_ACK_MESSAGE, blazeMessage: ackBlazeMessage)) guard !(existMessageIds.contains(messageId) || existHistoryIds.contains(messageId)) else { + pairMessages.append((makeUnknownMessage(data: blazeMessage), nil)) continue } @@ -277,14 +276,13 @@ public class ReceiveMessageService: MixinService { decryptedData = parseEncryptedMessage(data: data) } - if let decryptedData = decryptedData, var (message, children) = makeDecryptMessage(data: blazeMessage, decryptedData: decryptedData) { + if let decryptedData = decryptedData, let (message, children) = makeDecryptMessage(data: blazeMessage, decryptedData: decryptedData) { pairMessages.append((message, children)) } else { pairMessages.append((makeUnknownMessage(data: blazeMessage), nil)) } } - let quoteMessages = pairMessages.compactMap { ($0.0.quoteMessageId?.isEmpty ?? true) ? nil : $0.0} let transcriptMessages = pairMessages.compactMap { $0.1 }.flatMap { $0 } var ftsMessages = [(Message, [TranscriptMessage]?)]() if AppGroupUserDefaults.Database.isFTSInitialized { @@ -329,8 +327,44 @@ public class ReceiveMessageService: MixinService { try MessageDAO.shared.insertFTSContent(db, message: ftsMessage, children: childrenMessages) } - try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: data.conversationId) - try ConversationDAO.shared.updateLastMessage(database: db, conversationId: data.conversationId, messageId: lastMessage.messageId, createdAt: lastMessage.createdAt) + try ConversationDAO.shared.updateUnseenMessageCount(database: db, conversationId: conversationId) + try ConversationDAO.shared.updateLastMessage(database: db, conversationId: conversationId, messageId: lastMessage.messageId, createdAt: lastMessage.createdAt) + + db.afterNextTransactionCommit { _ in + DispatchQueue.global().async { + if isAppExtension { + if AppGroupUserDefaults.isRunningInMainApp { + DarwinNotificationManager.shared.notifyConversationDidChangeInMainApp() + } + if AppGroupUserDefaults.User.currentConversationId == data.conversationId { + AppGroupUserDefaults.User.reloadConversation = true + } + } else { + guard let conversation = ConversationDAO.shared.getConversation(conversationId: conversationId), conversation.status == ConversationStatus.SUCCESS.rawValue else { + return + } + + if conversation.isMuted && AppGroupUserDefaults.User.currentConversationId != conversationId { + let change = ConversationChange(conversationId: conversationId, action: .reload) + NotificationCenter.default.post(onMainThread: conversationDidChangeNotification, object: change) + } else { + let messageIds = messages.map{ $0.messageId } + let messages = MessageDAO.shared.getMessages(messageIds: messageIds) + for newMessage in messages { + var userInfo: [String: Any] = [ + MessageDAO.UserInfoKey.conversationId: newMessage.conversationId, + MessageDAO.UserInfoKey.message: newMessage, + ] + if let data = blazeMessageDict[newMessage.messageId] { + userInfo[MessageDAO.UserInfoKey.messsageSource] = data.source + userInfo[MessageDAO.UserInfoKey.silentNotification] = data.silentNotification + } + NotificationCenter.default.post(onMainThread: MessageDAO.didInsertMessageNotification, object: self, userInfo: userInfo) + } + } + } + } + } } BlazeMessageDAO.shared.delete(messageIds: messageIds) From 13e18161074d38e20adb0830e221c4e87f42988c Mon Sep 17 00:00:00 2001 From: Jun Tang Date: Thu, 4 Nov 2021 08:34:20 +0800 Subject: [PATCH 05/12] Batch sync shared users --- .../Service/ReceiveMessageService.swift | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift index 07c6c92192..3a2e3318a4 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift @@ -212,6 +212,7 @@ public class ReceiveMessageService: MixinService { finishedJobCount += count updateProgress(remainJobCount: remainJobCount, finishedJobCount: finishedJobCount) } + break } else { finishedJobCount += 1 ReceiveMessageService.shared.processReceiveMessage(data: data) @@ -232,17 +233,18 @@ public class ReceiveMessageService: MixinService { return } - let pageCount = 200 let conversationId = data.conversationId - var blazeMessages = [BlazeMessageData]() + let pageCount = 200 + var loopEnd = false repeat { if MixinService.isStopProcessMessages { return } - blazeMessages = BlazeMessageDAO.shared.getBlazeMessageData(conversationId: conversationId, limit: pageCount) + let blazeMessages = BlazeMessageDAO.shared.getBlazeMessageData(conversationId: conversationId, limit: pageCount) guard let lastMessage = blazeMessages.last else { return } + loopEnd = blazeMessages.count < pageCount ReceiveMessageService.shared.syncUsers(userIds: blazeMessages.map{ $0.userId }) @@ -258,6 +260,7 @@ public class ReceiveMessageService: MixinService { var jobs = [Job]() var pairMessages = [(Message, [TranscriptMessage]?)]() + var sharedContactIds = [String]() for blazeMessage in blazeMessages { let messageId = blazeMessage.messageId @@ -277,16 +280,21 @@ public class ReceiveMessageService: MixinService { } if let decryptedData = decryptedData, let (message, children) = makeDecryptMessage(data: blazeMessage, decryptedData: decryptedData) { + if message.category.hasSuffix("_CONTACT"), let sharedUserId = message.sharedUserId { + sharedContactIds.append(sharedUserId) + } pairMessages.append((message, children)) } else { pairMessages.append((makeUnknownMessage(data: blazeMessage), nil)) } } - + + ReceiveMessageService.shared.syncUsers(userIds: sharedContactIds) + let transcriptMessages = pairMessages.compactMap { $0.1 }.flatMap { $0 } var ftsMessages = [(Message, [TranscriptMessage]?)]() if AppGroupUserDefaults.Database.isFTSInitialized { - ftsMessages = pairMessages.filter({ MessageCategory.ftsAvailableCategoryStrings.contains($0.0.category) }) + ftsMessages = pairMessages.filter{ MessageCategory.ftsAvailableCategoryStrings.contains($0.0.category) } } let messages = pairMessages.compactMap { $0.0 } @@ -370,7 +378,7 @@ public class ReceiveMessageService: MixinService { BlazeMessageDAO.shared.delete(messageIds: messageIds) finishedBlock?(messageIds.count) - } while LoginManager.shared.isLoggedIn && blazeMessages.count >= pageCount && !isAppExtension + } while LoginManager.shared.isLoggedIn && !loopEnd && !isAppExtension } private func processReceiveMessage(data: BlazeMessageData) { @@ -839,9 +847,6 @@ public class ReceiveMessageService: MixinService { Logger.conversation(id: data.conversationId).error(category: "DecryptSuccess", message: "Invalid TransferContactData for category: \(data.category), data: \(String(data: decryptedData, encoding: .utf8))") return nil } - guard syncUser(userId: transferData.userId) else { - return nil - } return (Message.createMessage(contactData: transferData, data: data), nil) } else if data.category.hasSuffix("_LOCATION") { guard (try? JSONDecoder.default.decode(Location.self, from: decryptedData)) != nil else { @@ -873,6 +878,10 @@ public class ReceiveMessageService: MixinService { return } + if data.category.hasSuffix("_CONTACT"), let sharedUserId = message.sharedUserId { + checkUser(userId: sharedUserId, tryAgain: true) + } + MessageDAO.shared.insertMessage(message: message, children: children, messageSource: data.source, silentNotification: data.silentNotification) if data.category.hasSuffix("_AUDIO") { @@ -983,6 +992,7 @@ public class ReceiveMessageService: MixinService { MessageDAO.shared.updateLiveMessage(liveData: liveData, content: plainText.base64Decoded(), status: Message.getStatus(data: data), messageId: messageId, category: data.category, conversationId: data.conversationId, messageSource: data.source, silentNotification: data.silentNotification) case MessageCategory.SIGNAL_STICKER.rawValue: guard let decryptedData = plainText.data(using: .utf8), let transferStickerData = parseSticker(data: data, decryptedData: decryptedData) else { + ReceiveMessageService.shared.processUnknownMessage(data: data) return } MessageDAO.shared.updateStickerMessage(stickerData: transferStickerData, status: Message.getStatus(data: data), messageId: messageId, category: data.category, conversationId: data.conversationId, messageSource: data.source, silentNotification: data.silentNotification) @@ -1028,14 +1038,12 @@ public class ReceiveMessageService: MixinService { private func parseSticker(data: BlazeMessageData, decryptedData: Data) -> TransferStickerData? { guard let transferStickerData = (try? JSONDecoder.default.decode(TransferStickerData.self, from: decryptedData)) else { Logger.conversation(id: data.conversationId).error(category: "ParseSticker", message: "Invalid TransferStickerData: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } if let stickerId = transferStickerData.stickerId { guard !stickerId.isEmpty, UUID(uuidString: stickerId) != nil else { Logger.conversation(id: data.conversationId).error(category: "ParseSticker", message: "Invalid TransferStickerData: \(String(data: decryptedData, encoding: .utf8))") - ReceiveMessageService.shared.processUnknownMessage(data: data) return nil } guard !StickerDAO.shared.isExist(stickerId: stickerId) else { From fd35cff4779856e88b63e39399c4602a31252af3 Mon Sep 17 00:00:00 2001 From: Jun Tang Date: Sat, 6 Nov 2021 04:05:24 +0800 Subject: [PATCH 06/12] Fix conversation type judgment --- .../Database/User/DAO/ConversationDAO.swift | 10 ++++++++++ .../MixinServices/Database/User/DAO/UserDAO.swift | 6 ------ .../WebSocket/Service/ReceiveMessageService.swift | 6 +++--- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift b/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift index 7cf398c26e..4d5980729c 100644 --- a/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/ConversationDAO.swift @@ -46,6 +46,16 @@ public final class ConversationDAO: UserDatabaseDAO { return value > 0 } + public func isBotConversation(conversationId: String) -> Bool { + let sql = """ + SELECT 1 FROM conversations c + INNER JOIN users u ON u.user_id = c.owner_id AND u.app_id IS NOT NULL AND u.identity_number > '0' + WHERE c.conversation_id = ? AND c.category = 'CONTACT' + """ + let value: Int64 = db.select(with: sql, arguments: [conversationId]) ?? 0 + return value > 0 + } + public func getUnreadMessageCount() -> Int { let sql = "SELECT ifnull(SUM(unseen_message_count),0) FROM conversations WHERE category IS NOT NULL" return db.select(with: sql) ?? 0 diff --git a/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift b/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift index 49d8afebb6..fa745e022e 100644 --- a/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift @@ -19,12 +19,6 @@ public final class UserDAO: UserDatabaseDAO { LEFT JOIN apps a ON a.app_id = u.app_id """ - public func isBotUser(userId: String) -> Bool { - db.recordExists(in: User.self, where: User.column(of: .userId) == userId - && User.column(of: .identityNumber) > "0" - && User.column(of: .appId) != nil) - } - public func deleteUser(userId: String) { db.delete(User.self, where: User.column(of: .userId) == userId) } diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift index 3a2e3318a4..e89b2cc09b 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift @@ -223,17 +223,17 @@ public class ReceiveMessageService: MixinService { } private func processBotMessages(data: BlazeMessageData, finishedBlock: ((Int) -> Void)? = nil) { + let conversationId = data.conversationId ReceiveMessageService.shared.syncConversation(data: data) ReceiveMessageService.shared.checkSession(data: data) _ = ReceiveMessageService.shared.syncUser(userId: data.userId) - - guard UserDAO.shared.isBotUser(userId: data.userId) else { + + guard ConversationDAO.shared.isBotConversation(conversationId: conversationId) else { // plain message in group chat ReceiveMessageService.shared.processReceiveMessage(data: data) return } - let conversationId = data.conversationId let pageCount = 200 var loopEnd = false repeat { From 33860c60aab7c5db2335f1a1b1607328b4fa906a Mon Sep 17 00:00:00 2001 From: Jun Tang Date: Sat, 6 Nov 2021 05:06:14 +0800 Subject: [PATCH 07/12] Improve process messages in backgroud --- .../Service/ReceiveMessageService.swift | 85 ++++++++++++++----- 1 file changed, 62 insertions(+), 23 deletions(-) diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift index e89b2cc09b..e5b081f2aa 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift @@ -97,26 +97,50 @@ public class ReceiveMessageService: MixinService { callback(nil) return } - + + var nonBotMessages = [BlazeMessageData]() + var botMessages = [String: BlazeMessageData]() + for blazeMessage in blazeMessages { - guard !AppGroupUserDefaults.isRunningInMainApp, !extensionTimeWillExpire() else { - callback(nil) - return - } guard let data = try? JSONDecoder.default.decode(BlazeMessageData.self, from: blazeMessage.message) else { ReceiveMessageService.shared.processBadMessage(messageId: blazeMessage.messageId) continue } + if MessageCategory.allBotCategoriesString.contains(data.category) && !blazeMessage.conversationId.isEmpty { - ReceiveMessageService.shared.processBotMessages(data: data) + if botMessages[data.conversationId] == nil { + botMessages[data.conversationId] = data + } } else { - ReceiveMessageService.shared.processReceiveMessage(data: data) + nonBotMessages.append(data) + } + } + + for blazeMessage in nonBotMessages { + guard !AppGroupUserDefaults.isRunningInMainApp, !extensionTimeWillExpire() else { + callback(nil) + return } - if data.messageId == messageId { + ReceiveMessageService.shared.processReceiveMessage(data: blazeMessage) + if blazeMessage.messageId == messageId { callback(MessageDAO.shared.getFullMessage(messageId: messageId)) return } } + + for blazeMessage in botMessages.values { + ReceiveMessageService.shared.processBotMessages(data: blazeMessage) { messageIds in + guard !AppGroupUserDefaults.isRunningInMainApp, !extensionTimeWillExpire() else { + callback(nil) + return false + } + guard !messageIds.contains(messageId) else { + callback(MessageDAO.shared.getFullMessage(messageId: messageId)) + return false + } + return true + } + } } while true } else if let message = MessageDAO.shared.getFullMessage(messageId: messageId) { callback(message) @@ -196,11 +220,11 @@ public class ReceiveMessageService: MixinService { displaySyncProcess = true updateProgress(remainJobCount: remainJobCount, finishedJobCount: finishedJobCount) } - + + var nonBotMessages = [BlazeMessageData]() + var botMessages = [String: BlazeMessageData]() + for blazeMessage in blazeMessages { - if MixinService.isStopProcessMessages { - return - } guard let data = try? JSONDecoder.default.decode(BlazeMessageData.self, from: blazeMessage.message) else { finishedJobCount += 1 ReceiveMessageService.shared.processBadMessage(messageId: blazeMessage.messageId) @@ -208,14 +232,27 @@ public class ReceiveMessageService: MixinService { } if MessageCategory.allBotCategoriesString.contains(data.category) && !blazeMessage.conversationId.isEmpty { - ReceiveMessageService.shared.processBotMessages(data: data) { count in - finishedJobCount += count - updateProgress(remainJobCount: remainJobCount, finishedJobCount: finishedJobCount) + if botMessages[data.conversationId] == nil { + botMessages[data.conversationId] = data } - break } else { - finishedJobCount += 1 - ReceiveMessageService.shared.processReceiveMessage(data: data) + nonBotMessages.append(data) + } + } + + for blazeMessage in nonBotMessages { + if MixinService.isStopProcessMessages { + return + } + ReceiveMessageService.shared.processReceiveMessage(data: blazeMessage) + } + finishedJobCount += nonBotMessages.count + + for blazeMessage in botMessages.values { + ReceiveMessageService.shared.processBotMessages(data: blazeMessage) { messageIds in + finishedJobCount += messageIds.count + updateProgress(remainJobCount: remainJobCount, finishedJobCount: finishedJobCount) + return true } } } while true @@ -225,15 +262,15 @@ public class ReceiveMessageService: MixinService { private func processBotMessages(data: BlazeMessageData, finishedBlock: ((Int) -> Void)? = nil) { let conversationId = data.conversationId ReceiveMessageService.shared.syncConversation(data: data) - ReceiveMessageService.shared.checkSession(data: data) - _ = ReceiveMessageService.shared.syncUser(userId: data.userId) - guard ConversationDAO.shared.isBotConversation(conversationId: conversationId) else { // plain message in group chat ReceiveMessageService.shared.processReceiveMessage(data: data) return } + ReceiveMessageService.shared.checkSession(data: data) + _ = ReceiveMessageService.shared.syncUser(userId: data.userId) + let pageCount = 200 var loopEnd = false repeat { @@ -377,8 +414,10 @@ public class ReceiveMessageService: MixinService { BlazeMessageDAO.shared.delete(messageIds: messageIds) - finishedBlock?(messageIds.count) - } while LoginManager.shared.isLoggedIn && !loopEnd && !isAppExtension + if !processBlock(messageIds) { + return + } + } while LoginManager.shared.isLoggedIn && !loopEnd } private func processReceiveMessage(data: BlazeMessageData) { From f6ead881799961c0090b574b5ea9a04d1a8103eb Mon Sep 17 00:00:00 2001 From: Jun Tang Date: Sun, 7 Nov 2021 20:30:03 +0800 Subject: [PATCH 08/12] Remove blank line --- .../Services/WebSocket/Service/ReceiveMessageService.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift index e5b081f2aa..0c742a1f8c 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift @@ -294,7 +294,6 @@ public class ReceiveMessageService: MixinService { let existMessageIds = MessageDAO.shared.getExistMessageIds(messageIds: messageIds) let existHistoryIds = MessageHistoryDAO.shared.getExistMessageIds(messageIds: messageIds) - var jobs = [Job]() var pairMessages = [(Message, [TranscriptMessage]?)]() var sharedContactIds = [String]() From a03389e7f0272a180d25f5a413c059ce77a12618 Mon Sep 17 00:00:00 2001 From: Jun Tang Date: Mon, 8 Nov 2021 18:01:03 +0800 Subject: [PATCH 09/12] Fix database upgrade --- MixinServices/MixinServices/Database/User/UserDatabase.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MixinServices/MixinServices/Database/User/UserDatabase.swift b/MixinServices/MixinServices/Database/User/UserDatabase.swift index 01612d355c..64ca282b5e 100644 --- a/MixinServices/MixinServices/Database/User/UserDatabase.swift +++ b/MixinServices/MixinServices/Database/User/UserDatabase.swift @@ -414,7 +414,7 @@ public final class UserDatabase: Database { } migrator.registerMigration("batch_process_messages") { (db) in - try db.execute(sql: "DROP INDEX IF EXISTS conversation_last_message_update") + try db.execute(sql: "DROP TRIGGER IF EXISTS conversation_last_message_update") } /* Remaining works: From 4d36ebe5b4b52e31991361048b8eb581f3939835 Mon Sep 17 00:00:00 2001 From: over140 Date: Mon, 15 Nov 2021 20:47:49 +0400 Subject: [PATCH 10/12] Auto download audio file --- .../WebSocket/Service/ReceiveMessageService.swift | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift index 0c742a1f8c..a5ddef2343 100644 --- a/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift +++ b/MixinServices/MixinServices/Services/WebSocket/Service/ReceiveMessageService.swift @@ -259,7 +259,7 @@ public class ReceiveMessageService: MixinService { } } - private func processBotMessages(data: BlazeMessageData, finishedBlock: ((Int) -> Void)? = nil) { + private func processBotMessages(data: BlazeMessageData, processBlock: ([String]) -> Bool) { let conversationId = data.conversationId ReceiveMessageService.shared.syncConversation(data: data) guard ConversationDAO.shared.isBotConversation(conversationId: conversationId) else { @@ -295,6 +295,7 @@ public class ReceiveMessageService: MixinService { let existHistoryIds = MessageHistoryDAO.shared.getExistMessageIds(messageIds: messageIds) var jobs = [Job]() + var downloadJobs = [AttachmentDownloadJob]() var pairMessages = [(Message, [TranscriptMessage]?)]() var sharedContactIds = [String]() @@ -319,6 +320,9 @@ public class ReceiveMessageService: MixinService { if message.category.hasSuffix("_CONTACT"), let sharedUserId = message.sharedUserId { sharedContactIds.append(sharedUserId) } + if message.category.hasSuffix("_AUDIO") { + downloadJobs.append(AttachmentDownloadJob(message: message)) + } pairMessages.append((message, children)) } else { pairMessages.append((makeUnknownMessage(data: blazeMessage), nil)) @@ -376,6 +380,10 @@ public class ReceiveMessageService: MixinService { db.afterNextTransactionCommit { _ in DispatchQueue.global().async { + for job in downloadJobs { + ConcurrentJobQueue.shared.addJob(job: job) + } + if isAppExtension { if AppGroupUserDefaults.isRunningInMainApp { DarwinNotificationManager.shared.notifyConversationDidChangeInMainApp() From b4f80f607d9b81e091953cee833570915f1b7afe Mon Sep 17 00:00:00 2001 From: wuyueyang Date: Wed, 17 Nov 2021 17:57:34 +0800 Subject: [PATCH 11/12] Apply code style --- .../Database/User/DAO/MessageDAO.swift | 6 +++--- .../Database/User/DAO/MessageHistoryDAO.swift | 6 +++--- .../MixinServices/Database/User/DAO/UserDAO.swift | 14 +++++++------- .../MixinServices/Database/User/UserDatabase.swift | 8 ++++---- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift b/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift index f48d30dc69..70b2388d53 100644 --- a/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/MessageDAO.swift @@ -195,12 +195,12 @@ public final class MessageDAO: UserDatabaseDAO { } public func getExistMessageIds(messageIds: [String]) -> [String] { - guard messageIds.count > 0 else { + guard !messageIds.isEmpty else { return [] } return db.select(column: Message.column(of: .messageId), - from: Message.self, - where: messageIds.contains(Message.column(of: .messageId))) + from: Message.self, + where: messageIds.contains(Message.column(of: .messageId))) } public func batchUpdateMessageStatus(readMessageIds: [String], mentionMessageIds: [String]) { diff --git a/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift b/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift index 0363de14ea..e070309ffc 100644 --- a/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/MessageHistoryDAO.swift @@ -10,12 +10,12 @@ public final class MessageHistoryDAO: UserDatabaseDAO { } public func getExistMessageIds(messageIds: [String]) -> [String] { - guard messageIds.count > 0 else { + guard !messageIds.isEmpty else { return [] } return db.select(column: MessageHistory.column(of: .messageId), - from: MessageHistory.self, - where: messageIds.contains(MessageHistory.column(of: .messageId))) + from: MessageHistory.self, + where: messageIds.contains(MessageHistory.column(of: .messageId))) } public func replaceMessageHistory(messageId: String) { diff --git a/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift b/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift index b6f7ea1280..4ee43c4ec6 100644 --- a/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift +++ b/MixinServices/MixinServices/Database/User/DAO/UserDAO.swift @@ -36,12 +36,12 @@ public final class UserDAO: UserDatabaseDAO { } public func getExistUserIds(userIds: [String]) -> [String] { - guard userIds.count > 0 else { + guard !userIds.isEmpty else { return [] } return db.select(column: User.column(of: .userId), - from: User.self, - where: userIds.contains(User.column(of: .userId))) + from: User.self, + where: userIds.contains(User.column(of: .userId))) } public func getBlockUsers() -> [UserItem] { @@ -197,13 +197,13 @@ public final class UserDAO: UserDatabaseDAO { } public func mentionRepresentation(identityNumbers: [String]) -> [String: String] { - guard identityNumbers.count > 0 else { + guard !identityNumbers.isEmpty else { return [:] } return db.select(keyColumn: User.column(of: .identityNumber), - valueColumn: User.column(of: .fullName), - from: User.self, - where: identityNumbers.contains(User.column(of: .identityNumber))) + valueColumn: User.column(of: .fullName), + from: User.self, + where: identityNumbers.contains(User.column(of: .identityNumber))) } public func userIds(identityNumbers: [String]) -> [String] { diff --git a/MixinServices/MixinServices/Database/User/UserDatabase.swift b/MixinServices/MixinServices/Database/User/UserDatabase.swift index a476be4b7c..f88b05d3ee 100644 --- a/MixinServices/MixinServices/Database/User/UserDatabase.swift +++ b/MixinServices/MixinServices/Database/User/UserDatabase.swift @@ -413,16 +413,16 @@ public final class UserDatabase: Database { } } - migrator.registerMigration("batch_process_messages") { (db) in - try db.execute(sql: "DROP TRIGGER IF EXISTS conversation_last_message_update") - } - migrator.registerMigration("index_optimization_2") { (db) in try db.execute(sql: "DROP INDEX IF EXISTS messages_unread_indexs") try db.execute(sql: "DROP INDEX IF EXISTS messages_user_indexs") try db.execute(sql: "CREATE INDEX IF NOT EXISTS index_messages_pick ON messages(conversation_id, status, user_id, created_at)") } + migrator.registerMigration("batch_process_messages") { (db) in + try db.execute(sql: "DROP TRIGGER IF EXISTS conversation_last_message_update") + } + return migrator } From e957abe21783d1d8b3260df7405d5bcaffdc8566 Mon Sep 17 00:00:00 2001 From: wuyueyang Date: Wed, 17 Nov 2021 23:23:41 +0800 Subject: [PATCH 12/12] Add column of conversation_id with not null constraint --- MixinServices/MixinServices/Database/Task/TaskDatabase.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MixinServices/MixinServices/Database/Task/TaskDatabase.swift b/MixinServices/MixinServices/Database/Task/TaskDatabase.swift index c6715af454..f42a11bafb 100644 --- a/MixinServices/MixinServices/Database/Task/TaskDatabase.swift +++ b/MixinServices/MixinServices/Database/Task/TaskDatabase.swift @@ -40,7 +40,7 @@ public class TaskDatabase: Database { let infos = try TableInfo.fetchAll(db, sql: "PRAGMA table_info(messages_blaze)") let columnNames = infos.map(\.name) if !columnNames.contains("conversation_id") { - try db.execute(sql: "ALTER TABLE messages_blaze ADD COLUMN conversation_id TEXT") + try db.execute(sql: "ALTER TABLE messages_blaze ADD COLUMN conversation_id TEXT NOT NULL DEFAULT ''") } try db.execute(sql: "CREATE INDEX IF NOT EXISTS index_conversation_messages ON messages_blaze(conversation_id, created_at)")