From c216daa7ba010d92ffb9d638ceffd156d65826cd Mon Sep 17 00:00:00 2001 From: Aravind Raveendran Date: Mon, 11 Mar 2024 12:11:45 +1100 Subject: [PATCH] Add 1.8.1 and Use SDK's new swift concurrency API's --- Package.resolved | 14 - Package.swift | 2 +- .../Builder/StreamSourceBuilder.swift | 11 +- .../Manager/SubscriptionManager.swift | 349 ++++++------------ .../DolbyIORTSCore/Model/StreamState.swift | 8 - Sources/DolbyIORTSCore/State/State.swift | 20 +- .../DolbyIORTSCore/State/StateMachine.swift | 41 +- .../DolbyIORTSCore/StreamOrchestrator.swift | 348 ++++++++--------- .../Private/ViewModels/StreamViewModel.swift | 10 +- .../VideoRendererViewModel.swift | 6 +- .../Screens/Media/StreamingScreen.swift | 2 +- 11 files changed, 328 insertions(+), 483 deletions(-) delete mode 100644 Package.resolved diff --git a/Package.resolved b/Package.resolved deleted file mode 100644 index 3af4f76..0000000 --- a/Package.resolved +++ /dev/null @@ -1,14 +0,0 @@ -{ - "pins" : [ - { - "identity" : "millicast-sdk-swift-package", - "kind" : "remoteSourceControl", - "location" : "https://github.com/millicast/millicast-sdk-swift-package", - "state" : { - "revision" : "001f8654ba31461ecf805ae8a15e4d92efa8064e", - "version" : "1.7.0" - } - } - ], - "version" : 2 -} diff --git a/Package.swift b/Package.swift index e5233c2..9858a94 100644 --- a/Package.swift +++ b/Package.swift @@ -19,7 +19,7 @@ let package = Package( targets: ["DolbyIORTSUIKit"]) ], dependencies: [ - .package(url: "https://github.com/millicast/millicast-sdk-swift-package", exact: "1.7.0") + .package(url: "https://github.com/millicast/millicast-sdk-swift-package", exact: "1.8.1") ], targets: [ // Targets are the basic building blocks of a package. A target can define a module or a test suite. diff --git a/Sources/DolbyIORTSCore/Builder/StreamSourceBuilder.swift b/Sources/DolbyIORTSCore/Builder/StreamSourceBuilder.swift index 344aeba..166f6bb 100644 --- a/Sources/DolbyIORTSCore/Builder/StreamSourceBuilder.swift +++ b/Sources/DolbyIORTSCore/Builder/StreamSourceBuilder.swift @@ -18,19 +18,12 @@ final class StreamSourceBuilder { let trackID: String let mediaType: StreamSource.MediaType - /// Initialises the Track Item, if possible from the passed-in String - /// - Parameter track: A string value passed in by the SDK and is expected to be of format `{mediaType}/{trackID}` init?(track: String) { - let trackInfoList = track.split(separator: "/") - - guard - trackInfoList.count == 2, - let mediaType = StreamSource.MediaType(rawValue: String(trackInfoList[0].lowercased())) - else { + guard let mediaType = StreamSource.MediaType(rawValue: track) else { return nil } self.mediaType = mediaType - self.trackID = String(trackInfoList[1]) + self.trackID = track } } diff --git a/Sources/DolbyIORTSCore/Manager/SubscriptionManager.swift b/Sources/DolbyIORTSCore/Manager/SubscriptionManager.swift index 14ae5e0..043e36e 100644 --- a/Sources/DolbyIORTSCore/Manager/SubscriptionManager.swift +++ b/Sources/DolbyIORTSCore/Manager/SubscriptionManager.swift @@ -7,47 +7,25 @@ import MillicastSDK import os import AVFAudio -protocol SubscriptionManagerDelegate: AnyObject { - func onSubscribed() - - func onSubscribedError(_ reason: String) - - func onVideoTrack(_ track: MCVideoTrack, withMid mid: String) - - func onAudioTrack(_ track: MCAudioTrack, withMid mid: String) - - func onActive(_ streamId: String, tracks: [String], sourceId: String?) - - func onInactive(_ streamId: String, sourceId: String?) - - func onStopped() - - func onLayers(_ mid: String, activeLayers: [MCLayerData], inactiveLayers: [String]) - - func onConnected() - - func onConnectionError(_ status: Int32, withReason reason: String) - - func onDisconnected() - - func onSignalingError(_ message: String) - - func onStatsReport(_ report: MCStatsReport) - - func onViewerCount(_ count: Int32) -} - protocol SubscriptionManagerProtocol: AnyObject { - var delegate: SubscriptionManagerDelegate? { get set } - - func connect(streamName: String, accountID: String, configuration: SubscriptionConfiguration) async -> Bool - func startSubscribe(configuration: SubscriptionConfiguration) async -> Bool - func stopSubscribe() async -> Bool - func addRemoteTrack(_ sourceBuilder: StreamSourceBuilder) - func projectVideo(for source: StreamSource, withQuality quality: VideoQuality) - func unprojectVideo(for source: StreamSource) - func projectAudio(for source: StreamSource) - func unprojectAudio(for source: StreamSource) + + var state: AsyncStream { get } + var statsReport: AsyncStream { get } + var activityStream: AsyncStream { get } + var tracks: AsyncStream { get } + var layers: AsyncStream { get } + var viewerCount: AsyncStream { get } + + static func makeSubscriptionManager(configuration: SubscriptionConfiguration) -> Self + + func connect(streamName: String, accountID: String) async throws -> Bool + func subscribe() async throws -> Bool + func unubscribeAndDisconnect() async throws -> Bool + func addRemoteTrack(_ sourceBuilder: StreamSourceBuilder) async + func projectVideo(for source: StreamSource, withQuality quality: VideoQuality) async throws + func unprojectVideo(for source: StreamSource) async throws + func projectAudio(for source: StreamSource) async throws + func unprojectAudio(for source: StreamSource) async throws } final class SubscriptionManager: SubscriptionManagerProtocol { @@ -58,127 +36,118 @@ final class SubscriptionManager: SubscriptionManagerProtocol { } private static let logger = Logger.make(category: String(describing: SubscriptionManager.self)) + + lazy var state: AsyncStream = subscriber.state() + + lazy var statsReport: AsyncStream = subscriber.statsReport() + + lazy var activityStream: AsyncStream = subscriber.activity() + + lazy var tracks: AsyncStream = subscriber.tracks() + + lazy var layers: AsyncStream = subscriber.layers() + + lazy var viewerCount: AsyncStream = subscriber.viewerCount() - private var subscriber: MCSubscriber! - - weak var delegate: SubscriptionManagerDelegate? - - func connect(streamName: String, accountID: String, configuration: SubscriptionConfiguration) async -> Bool { - guard let subscriber = makeSubscriber(with: configuration) else { - Self.logger.error("💼 Failed to initialise subscriber") - return false - } + private let configuration: SubscriptionConfiguration + private let subscriber: MCSubscriber + + static func makeSubscriptionManager(configuration: SubscriptionConfiguration) -> Self { + Self.init(configuration: configuration) + } + init(configuration: SubscriptionConfiguration) { + self.configuration = configuration + self.subscriber = MCSubscriber() + } + + func connect(streamName: String, accountID: String) async throws -> Bool { Self.logger.debug("💼 Connect with streamName & accountID") - - subscriber.setListener(self) - self.subscriber = subscriber - + guard streamName.count > 0, accountID.count > 0 else { Self.logger.error("💼 Invalid credentials passed to connect") return false } - let task = Task { [weak self] () -> Bool in - guard let self = self else { - return false - } - - guard !self.isSubscribed, !self.isConnected else { - Self.logger.error("💼 Subscriber has already connected or subscribed") - return false - } - - let credentials = self.makeCredentials(streamName: streamName, accountID: accountID, useDevelopmentServer: configuration.useDevelopmentServer) - - self.subscriber.setCredentials(credentials) + let isConnected = await subscriber.isConnected + let isSubscribed = await subscriber.isSubscribed + + guard !isSubscribed, !isConnected else { + Self.logger.error("💼 Subscriber has already connected or subscribed") + return false + } - let connectionOptions = MCConnectionOptions() - connectionOptions.autoReconnect = configuration.autoReconnect - - guard self.subscriber.connect(with: connectionOptions) else { - Self.logger.error("💼 Subscriber has failed to connect") - return false - } + let credentials = makeCredentials(streamName: streamName, accountID: accountID, useDevelopmentServer: configuration.useDevelopmentServer) - return true - } + try await subscriber.setCredentials(credentials) - return await task.value + let connectionOptions = MCConnectionOptions() + connectionOptions.autoReconnect = configuration.autoReconnect + + try await subscriber.connect(with: connectionOptions) + + Self.logger.debug("💼 Connect successful") + return true } - func startSubscribe(configuration: SubscriptionConfiguration) async -> Bool { - let task = Task { [weak self] () -> Bool in - Self.logger.debug("💼 Start subscribe") + func subscribe() async throws -> Bool { + Self.logger.debug("💼 Start subscribe") - guard let self = self else { - return false - } + let isConnected = await subscriber.isConnected - guard self.isConnected else { - Self.logger.error("💼 Subscriber hasn't completed connect to start subscribe") - return false - } - - guard !self.isSubscribed else { - Self.logger.error("💼 Subscriber has already subscribed") - return false - } + guard isConnected else { + Self.logger.error("💼 Subscriber hasn't completed connect to start subscribe") + return false + } - let options = MCClientOptions() - options.videoJitterMinimumDelayMs = Int32(configuration.videoJitterMinimumDelayInMs) - options.statsDelayMs = Int32(configuration.statsDelayMs) - if let rtcEventLogOutputPath = configuration.rtcEventLogPath { - options.rtcEventLogOutputPath = rtcEventLogOutputPath - } - options.disableAudio = configuration.disableAudio - options.forcePlayoutDelay = configuration.noPlayoutDelay + let isSubscribed = await subscriber.isSubscribed + guard !isSubscribed else { + Self.logger.error("💼 Subscriber has already subscribed") + return false + } - guard self.subscriber.subscribe(with: options) else { - Self.logger.error("💼 Subscribe call has failed") - return false - } - - return true + let options = MCClientOptions() + options.videoJitterMinimumDelayMs = Int32(configuration.videoJitterMinimumDelayInMs) + options.statsDelayMs = Int32(configuration.statsDelayMs) + if let rtcEventLogOutputPath = configuration.rtcEventLogPath { + options.rtcEventLogOutputPath = rtcEventLogOutputPath } + options.disableAudio = configuration.disableAudio + options.forcePlayoutDelay = configuration.noPlayoutDelay - return await task.value + await subscriber.enableStats(configuration.enableStats) + try await subscriber.subscribe(with: options) + + Self.logger.debug("💼 Subscribe successful") + return true } - func stopSubscribe() async -> Bool { - let task = Task { [weak self] () -> Bool in - Self.logger.debug("💼 Stop subscribe") - - guard let self = self, let subscriber = subscriber else { - return false - } + func unubscribeAndDisconnect() async throws -> Bool { + Self.logger.debug("💼 Stop subscribe") + + await subscriber.enableStats(false) + try await subscriber.unsubscribe() + try await subscriber.disconnect() + + return true + } - defer { - self.subscriber.setListener(nil) - self.subscriber = nil - } - - let unsubscribeResult = subscriber.unsubscribe() - if !unsubscribeResult { - Self.logger.error("💼 Failed to unsubscribe") - } - - let disconnectResult = subscriber.disconnect() - if !disconnectResult { - Self.logger.error("💼 Failed to disconnect") + func addRemoteTrack(_ sourceBuilder: StreamSourceBuilder) async { + Self.logger.debug("💼 Add remote track for source - \(sourceBuilder.sourceId), \(sourceBuilder.supportedTrackItems)") + + await withThrowingTaskGroup( + of: (Void).self + ) { [self] group in + for trackItem in sourceBuilder.supportedTrackItems { + group.addTask { + Self.logger.debug("💼 Add remote track for media type - \(trackItem.mediaType.rawValue)") + try await self.subscriber.addRemoteTrack(trackItem.mediaType.rawValue) + } } - - return disconnectResult && unsubscribeResult } - return await task.value } - func addRemoteTrack(_ sourceBuilder: StreamSourceBuilder) { - Self.logger.debug("💼 Add remote track for source - \(sourceBuilder.sourceId)") - sourceBuilder.supportedTrackItems.forEach { subscriber.addRemoteTrack($0.mediaType.rawValue) } - } - - func projectVideo(for source: StreamSource, withQuality quality: VideoQuality) { + func projectVideo(for source: StreamSource, withQuality quality: VideoQuality) async throws { let videoTrack = source.videoTrack let matchingVideoQuality = source.lowLevelVideoQualityList.matching(videoQuality: quality) @@ -190,16 +159,16 @@ final class SubscriptionManager: SubscriptionManagerProtocol { projectionData.trackId = videoTrack.trackInfo.trackID projectionData.layer = matchingVideoQuality?.layerData - subscriber.project(source.sourceId.value, withData: [projectionData]) + try await subscriber.project(source.sourceId.value ?? "", withData: [projectionData]) } - func unprojectVideo(for source: StreamSource) { + func unprojectVideo(for source: StreamSource) async throws { Self.logger.debug("💼 Unproject video for source \(source.sourceId)") let videoTrack = source.videoTrack - subscriber.unproject([videoTrack.trackInfo.mid]) + try await subscriber.unproject([videoTrack.trackInfo.mid]) } - func projectAudio(for source: StreamSource) { + func projectAudio(for source: StreamSource) async throws { Self.logger.debug("💼 Project audio for source \(source.sourceId)") guard let audioTrack = source.audioTracks.first else { return @@ -212,16 +181,16 @@ final class SubscriptionManager: SubscriptionManagerProtocol { projectionData.mid = audioTrack.trackInfo.mid projectionData.trackId = audioTrack.trackInfo.trackID - subscriber.project(source.sourceId.value, withData: [projectionData]) + try await subscriber.project(source.sourceId.value ?? "", withData: [projectionData]) } - func unprojectAudio(for source: StreamSource) { + func unprojectAudio(for source: StreamSource) async throws { Self.logger.debug("💼 Unproject audio for source \(source.sourceId)") guard let audioTrack = source.audioTracks.first else { return } - subscriber.unproject([audioTrack.trackInfo.mid]) + try await subscriber.unproject([audioTrack.trackInfo.mid]) } } @@ -229,10 +198,8 @@ final class SubscriptionManager: SubscriptionManagerProtocol { private extension SubscriptionManager { - func makeSubscriber(with configuration: SubscriptionConfiguration) -> MCSubscriber? { - let subscriber = MCSubscriber.create() - subscriber?.enableStats(configuration.enableStats) - + static func makeSubscriber(with configuration: SubscriptionConfiguration) -> MCSubscriber? { + let subscriber = MCSubscriber() return subscriber } @@ -246,93 +213,3 @@ private extension SubscriptionManager { return credentials } } - -// MARK: MCSubscriberListener implementation - -extension SubscriptionManager: MCSubscriberListener { - func onDisconnected() { - Self.logger.debug("💼 Delegate - onDisconnected") - delegate?.onDisconnected() - } - - func onSubscribed() { - Self.logger.debug("💼 Delegate - onSubscribed") - delegate?.onSubscribed() - } - - func onSubscribedError(_ reason: String) { - Self.logger.error("💼 Delegate - onSubscribedError \(reason)") - delegate?.onSubscribedError(reason) - } - - func onVideoTrack(_ track: MCVideoTrack, withMid mid: String) { - Self.logger.debug("💼 Delegate - onVideoTrack with mid \(mid)") - delegate?.onVideoTrack(track, withMid: mid) - } - - func onAudioTrack(_ track: MCAudioTrack, withMid mid: String) { - Self.logger.debug("💼 Delegate - onAudioTrack with mid \(mid)") - delegate?.onAudioTrack(track, withMid: mid) - } - - func onActive(_ streamId: String, tracks: [String], sourceId: String) { - Self.logger.debug("💼 Delegate - onActive with sourceId \(sourceId), tracks - \(tracks)") - delegate?.onActive(streamId, tracks: tracks, sourceId: sourceId) - } - - func onInactive(_ streamId: String, sourceId: String) { - Self.logger.debug("💼 Delegate - onInactive with sourceId \(sourceId)") - delegate?.onInactive(streamId, sourceId: sourceId) - } - - func onStopped() { - Self.logger.debug("💼 Delegate - onStopped") - delegate?.onStopped() - } - - func onVad(_ mid: String, sourceId: String) { - Self.logger.debug("💼 Delegate - onVad with mid \(mid), sourceId \(sourceId)") - } - - func onLayers(_ mid: String, activeLayers: [MCLayerData], inactiveLayers: [String]) { - Self.logger.debug("💼 Delegate - onLayers for mid - \(mid) with activeLayers \(activeLayers), inactiveLayers \(inactiveLayers)") - delegate?.onLayers(mid, activeLayers: activeLayers, inactiveLayers: inactiveLayers) - } - - func onConnected() { - Self.logger.debug("💼 Delegate - onConnected") - delegate?.onConnected() - } - - func onConnectionError(_ status: Int32, withReason reason: String) { - Self.logger.error("💼 Delegate - onConnectionError") - delegate?.onConnectionError(status, withReason: reason) - } - - func onSignalingError(_ message: String) { - Self.logger.error("💼 Delegate - onSignalingError") - delegate?.onSignalingError(message) - } - - func onStatsReport(_ report: MCStatsReport) { - Self.logger.debug("💼 Delegate - onStatsReport") - delegate?.onStatsReport(report) - } - - func onViewerCount(_ count: Int32) { - Self.logger.debug("💼 Delegate - onViewerCount") - delegate?.onViewerCount(count) - } -} - -// MARK: Helper functions - -private extension SubscriptionManager { - var isSubscribed: Bool { - subscriber.isSubscribed() - } - - var isConnected: Bool { - subscriber.isConnected() - } -} diff --git a/Sources/DolbyIORTSCore/Model/StreamState.swift b/Sources/DolbyIORTSCore/Model/StreamState.swift index 9730cab..b5fcba9 100644 --- a/Sources/DolbyIORTSCore/Model/StreamState.swift +++ b/Sources/DolbyIORTSCore/Model/StreamState.swift @@ -6,9 +6,7 @@ import Foundation public enum StreamState: Equatable { case disconnected - case connecting case connected - case subscribing case subscribed(sources: [StreamSource], numberOfStreamViewers: Int) case stopped case error(StreamError) @@ -18,9 +16,6 @@ public enum StreamState: Equatable { case .disconnected: self = .disconnected - case .connecting: - self = .connecting - case .connected: self = .connected @@ -40,9 +35,6 @@ public enum StreamState: Equatable { case let .error(state): self = .error(state.error) - - case .subscribing: - self = .subscribing } } } diff --git a/Sources/DolbyIORTSCore/State/State.swift b/Sources/DolbyIORTSCore/State/State.swift index 94c6447..1ed7add 100644 --- a/Sources/DolbyIORTSCore/State/State.swift +++ b/Sources/DolbyIORTSCore/State/State.swift @@ -17,9 +17,7 @@ struct AudioTrackAndMid { enum State: CustomStringConvertible { case disconnected - case connecting case connected - case subscribing case subscribed(SubscribedState) case stopped case error(ErrorState) @@ -28,12 +26,8 @@ enum State: CustomStringConvertible { switch self { case .disconnected: return "disconnected" - case .connecting: - return "connecting" case .connected: return "connected" - case .subscribing: - return "subscribing" case .subscribed: return "subscribed" case .stopped: @@ -49,27 +43,21 @@ struct SubscribedState { private(set) var streamSourceBuilders: [StreamSourceBuilder] private(set) var numberOfStreamViewers: Int private(set) var streamingStats: AllStreamStatistics? - private(set) var cachedSourceZeroVideoTrackAndMid: VideoTrackAndMid? - private(set) var cachedSourceZeroAudioTrackAndMid: AudioTrackAndMid? private(set) var configuration: SubscriptionConfiguration - init(cachedVideoTrackDetail: VideoTrackAndMid?, cachedAudioTrackDetail: AudioTrackAndMid?, configuration: SubscriptionConfiguration) { - cachedSourceZeroVideoTrackAndMid = cachedVideoTrackDetail - cachedSourceZeroAudioTrackAndMid = cachedAudioTrackDetail + init(configuration: SubscriptionConfiguration) { self.configuration = configuration streamSourceBuilders = [] numberOfStreamViewers = 0 } - mutating func add(streamId: String, sourceId: String?, tracks: [String]) { + mutating func add(streamId: String, sourceId: String?, tracks: [String], cachedVideoTrackDetail: VideoTrackAndMid? = nil, cachedAudioTrackDetail: AudioTrackAndMid? = nil) { let streamSourceBuilder = StreamSourceBuilder(streamId: streamId, sourceId: sourceId, tracks: tracks) - if let videoTrackAndMid = cachedSourceZeroVideoTrackAndMid { + if let videoTrackAndMid = cachedVideoTrackDetail { streamSourceBuilder.addVideoTrack(videoTrackAndMid.videoTrack, mid: videoTrackAndMid.mid) - cachedSourceZeroVideoTrackAndMid = nil } - if let audioTrackAndMid = cachedSourceZeroAudioTrackAndMid { + if let audioTrackAndMid = cachedAudioTrackDetail { streamSourceBuilder.addAudioTrack(audioTrackAndMid.audioTrack, mid: audioTrackAndMid.mid) - cachedSourceZeroAudioTrackAndMid = nil } streamSourceBuilders.append(streamSourceBuilder) } diff --git a/Sources/DolbyIORTSCore/State/StateMachine.swift b/Sources/DolbyIORTSCore/State/StateMachine.swift index deee425..e22a198 100644 --- a/Sources/DolbyIORTSCore/State/StateMachine.swift +++ b/Sources/DolbyIORTSCore/State/StateMachine.swift @@ -30,11 +30,6 @@ final class StateMachine { func startConnection(streamName: String, accountID: String, configuration: SubscriptionConfiguration) { self.configuration = configuration - currentState = .connecting - } - - func startSubscribe() { - currentState = .subscribing } func stopSubscribe() { @@ -62,7 +57,12 @@ final class StateMachine { } func onConnected() { - currentState = .connected + switch currentState { + case .connected, .subscribed: + break + default: + currentState = .connected + } } func onConnectionError(_ status: Int32, withReason reason: String) { @@ -77,15 +77,7 @@ final class StateMachine { if case .subscribed = currentState { return } - currentState = .subscribed( - .init( - cachedVideoTrackDetail: cachedSourceZeroVideoTrackAndMid, - cachedAudioTrackDetail: cachedSourceZeroAudioTrackAndMid, - configuration: configuration - ) - ) - cachedSourceZeroAudioTrackAndMid = nil - cachedSourceZeroAudioTrackAndMid = nil + currentState = .subscribed(.init(configuration: configuration)) } func onSubscribedError(_ reason: String) { @@ -98,15 +90,27 @@ final class StateMachine { func onActive(_ streamId: String, tracks: [String], sourceId: String?) { // This is a workaround for an SDK behaviour where the some `onActive` callbacks arrive even before the `onSubscribed` - // In this case it's safe to assume a state change to `.subscribed` provided the current state is `.subscribing` - if case .subscribing = currentState { + // In this case it's safe to assume a state change to `.subscribed` provided the current state is `.connected` + if case .connected = currentState { // Mimic an `onSubscribed` callback onSubscribed() } switch currentState { case var .subscribed(state): - state.add(streamId: streamId, sourceId: sourceId, tracks: tracks) + if let sourceId = sourceId, !sourceId.isEmpty { + state.add(streamId: streamId, sourceId: sourceId, tracks: tracks) + } else { + state.add( + streamId: streamId, + sourceId: sourceId, + tracks: tracks, + cachedVideoTrackDetail: cachedSourceZeroVideoTrackAndMid, + cachedAudioTrackDetail: cachedSourceZeroAudioTrackAndMid + ) + cachedSourceZeroAudioTrackAndMid = nil + cachedSourceZeroAudioTrackAndMid = nil + } self.currentState = .subscribed(state) default: Self.logger.error("🛑 Unexpected state on onActive - \(self.currentState.description)") @@ -177,6 +181,7 @@ final class StateMachine { } } + layersForSelection = Array(layersForSelection.prefix(3)) switch layersForSelection.count { case 2: streamTypes = [ diff --git a/Sources/DolbyIORTSCore/StreamOrchestrator.swift b/Sources/DolbyIORTSCore/StreamOrchestrator.swift index f92d170..82c8492 100644 --- a/Sources/DolbyIORTSCore/StreamOrchestrator.swift +++ b/Sources/DolbyIORTSCore/StreamOrchestrator.swift @@ -19,8 +19,6 @@ public final actor StreamOrchestrator { public static let shared: StreamOrchestrator = StreamOrchestrator() private let stateMachine: StateMachine = StateMachine(initialState: .disconnected) - private let subscriptionManager: SubscriptionManagerProtocol - private let rendererRegistry: RendererRegistryProtocol private var subscriptions: Set = [] private lazy var stateSubject: CurrentValueSubject = CurrentValueSubject(.disconnected) @@ -30,24 +28,18 @@ public final actor StreamOrchestrator { private var activeStreamDetail: StreamDetail? private let logHandler: MillicastLoggerHandler = .init() - private var subscriptionConfiguration: SubscriptionConfiguration = .init() - - private init() { - self.init( - subscriptionManager: SubscriptionManager(), - rendererRegistry: RendererRegistry() - ) - } + private var rendererRegistry: RendererRegistryProtocol? + private var subscriptionManager: SubscriptionManagerProtocol? + private var subscriptionConfiguration: SubscriptionConfiguration? - init( - subscriptionManager: SubscriptionManagerProtocol, - rendererRegistry: RendererRegistryProtocol - ) { - self.subscriptionManager = subscriptionManager - self.rendererRegistry = rendererRegistry - - self.subscriptionManager.delegate = self - + private var stateObservationTask: Task? + private var statsObservationTask: Task? + private var activityObservationTask: Task? + private var tracksObservationTask: Task? + private var layersObservationTask: Task? + private var viewerObservationTask: Task? + + init() { Utils.configureAudioSession() Task { [weak self] in @@ -56,40 +48,55 @@ public final actor StreamOrchestrator { } } - public func connect(streamName: String, accountID: String, configuration: SubscriptionConfiguration = .init()) async -> Bool { + public func connect(streamName: String, accountID: String, configuration: SubscriptionConfiguration = .init()) async throws -> Bool { Self.logger.debug("👮‍♂️ Start subscribe") logHandler.setLogFilePath(filePath: configuration.sdkLogPath) + let subscriptionManager = SubscriptionManager(configuration: configuration) + + self.observeSubscriptionManagerEvents() + self.subscriptionManager = subscriptionManager self.subscriptionConfiguration = configuration - + self.rendererRegistry = RendererRegistry() + async let startConnectionStateUpdate: Void = stateMachine.startConnection( streamName: streamName, accountID: accountID, configuration: configuration ) - async let startConnection = subscriptionManager.connect(streamName: streamName, accountID: accountID, configuration: configuration) + async let startConnection = subscriptionManager.connect(streamName: streamName, accountID: accountID) - let (_, connectionResult) = await (startConnectionStateUpdate, startConnection) + let (_, connectionResult) = try await (startConnectionStateUpdate, startConnection) if connectionResult { activeStreamDetail = StreamDetail(streamName: streamName, accountID: accountID) } else { activeStreamDetail = nil } - return connectionResult + + stateMachine.onConnected() + + let subscribeResult = try await subscriptionManager.subscribe() + + stateMachine.onSubscribed() + + return subscribeResult } - public func stopConnection() async -> Bool { + public func stopConnection() async throws -> Bool { Self.logger.debug("👮‍♂️ Stop subscribe") - reset() async let stopSubscribeOnStateMachine: Void = stateMachine.stopSubscribe() - async let resetRegistry: Void = rendererRegistry.reset() - async let stopSubscription: Bool = await subscriptionManager.stopSubscribe() - let (_, _, stopSubscribeResult) = await (stopSubscribeOnStateMachine, resetRegistry, stopSubscription) - return stopSubscribeResult + async let resetRegistry: Void? = rendererRegistry?.reset() + async let stopSubscription: Bool? = await subscriptionManager?.unubscribeAndDisconnect() + let (_, _, stopSubscribeResult) = try await (stopSubscribeOnStateMachine, resetRegistry, stopSubscription) + + reset() + return stopSubscribeResult ?? false } - public func playAudio(for source: StreamSource) async { + public func playAudio(for source: StreamSource) async throws { Self.logger.debug("👮‍♂️ Play Audio for source - \(String(describing: source.sourceId.value))") + guard let subscriptionManager else { return } + switch stateMachine.currentState { case let .subscribed(subscribedState): guard @@ -98,24 +105,33 @@ public final actor StreamOrchestrator { else { return } - for source in subscribedState.sources { - guard source.isPlayingAudio else { - continue + + await withThrowingTaskGroup(of: Void.self) { [self] group in + for source in subscribedState.sources { + guard source.isPlayingAudio else { + continue + } + + group.addTask { + self.stateMachine.setPlayingAudio(false, for: source) + try await subscriptionManager.unprojectAudio(for: source) + } } - stateMachine.setPlayingAudio(false, for: source) - subscriptionManager.unprojectAudio(for: source) + group.addTask { + try await subscriptionManager.projectAudio(for: source) + self.stateMachine.setPlayingAudio(true, for: source) + } } - subscriptionManager.projectAudio(for: source) - stateMachine.setPlayingAudio(true, for: source) default: return } } - public func stopAudio(for source: StreamSource) async { + public func stopAudio(for source: StreamSource) async throws { Self.logger.debug("👮‍♂️ Stop Audio for source - \(String(describing: source.sourceId.value))") + guard let subscriptionManager else { return } switch stateMachine.currentState { case let .subscribed(subscribedState): @@ -125,7 +141,7 @@ public final actor StreamOrchestrator { else { return } - subscriptionManager.unprojectAudio(for: matchingSource) + try await subscriptionManager.unprojectAudio(for: matchingSource) stateMachine.setPlayingAudio(false, for: matchingSource) default: @@ -133,8 +149,9 @@ public final actor StreamOrchestrator { } } - public func playVideo(for source: StreamSource, on renderer: StreamSourceViewRenderer, with quality: VideoQuality) async { + public func playVideo(for source: StreamSource, on renderer: StreamSourceViewRenderer, with quality: VideoQuality) async throws { Self.logger.debug("👮‍♂️ Play Video for source - \(String(describing: source.sourceId.value)) on renderer - \(renderer.id) with quality - \(quality.description)") + guard let subscriptionManager, let rendererRegistry else { return } switch stateMachine.currentState { case let .subscribed(subscribedState): @@ -146,11 +163,10 @@ public final actor StreamOrchestrator { let videoTrack = matchingSource.videoTrack.track rendererRegistry.registerRenderer(renderer, with: quality) let requestedVideoQuality = rendererRegistry.requestedVideoQuality(for: videoTrack) - let videoQualityToRender = matchingSource.videoQualityList.contains(requestedVideoQuality) ? - requestedVideoQuality : .auto + let videoQualityToRender = matchingSource.videoQualityList.contains(requestedVideoQuality) ? requestedVideoQuality : .auto if !matchingSource.isPlayingVideo || matchingSource.selectedVideoQuality != videoQualityToRender { - subscriptionManager.projectVideo(for: matchingSource, withQuality: videoQualityToRender) + try await subscriptionManager.projectVideo(for: matchingSource, withQuality: videoQualityToRender) stateMachine.setPlayingVideo(true, for: matchingSource) stateMachine.selectVideoQuality(videoQualityToRender, for: matchingSource) } @@ -160,8 +176,9 @@ public final actor StreamOrchestrator { } } - public func stopVideo(for source: StreamSource, on renderer: StreamSourceViewRenderer) async { + public func stopVideo(for source: StreamSource, on renderer: StreamSourceViewRenderer) async throws { Self.logger.debug("👮‍♂️ Stop Video for source - \(String(describing: source.sourceId.value)) on renderer - \(renderer.id)") + guard let subscriptionManager, let rendererRegistry else { return } switch stateMachine.currentState { case let .subscribed(subscribedState): @@ -176,7 +193,7 @@ public final actor StreamOrchestrator { let hasActiveRenderer = rendererRegistry.hasActiveRenderer(for: videoTrack) if !hasActiveRenderer { - subscriptionManager.unprojectVideo(for: source) + try await subscriptionManager.unprojectVideo(for: source) stateMachine.setPlayingVideo(false, for: matchingSource) stateMachine.onLayers( matchingSource.videoTrack.trackInfo.mid, @@ -185,11 +202,10 @@ public final actor StreamOrchestrator { ) } else { let requestedVideoQuality = rendererRegistry.requestedVideoQuality(for: videoTrack) - let videoQualityToRender = matchingSource.videoQualityList.contains(requestedVideoQuality) ? - requestedVideoQuality : .auto + let videoQualityToRender = matchingSource.videoQualityList.contains(requestedVideoQuality) ? requestedVideoQuality : .auto if matchingSource.selectedVideoQuality != videoQualityToRender { - subscriptionManager.projectVideo(for: matchingSource, withQuality: videoQualityToRender) + try await subscriptionManager.projectVideo(for: matchingSource, withQuality: videoQualityToRender) stateMachine.setPlayingVideo(true, for: matchingSource) stateMachine.selectVideoQuality(videoQualityToRender, for: matchingSource) } @@ -210,27 +226,24 @@ private extension StreamOrchestrator { Task { [weak self] in guard let self = self else { return } // Populate updates public facing states - await self.stateSubject.send(StreamState(state: state)) + let streamState = StreamState(state: state) + await self.stateSubject.send(streamState) } } .store(in: &subscriptions) } - func reconnectToStream(streamDetail: StreamDetail) async { + func reconnectToStream(streamDetail: StreamDetail) async throws { Self.logger.debug("👮‍♂️ Attempting a reconnect") - _ = await connect(streamName: streamDetail.streamName, accountID: streamDetail.accountID) - } - - func startSubscribe() async -> Bool { - stateMachine.startSubscribe() - return await subscriptionManager.startSubscribe(configuration: subscriptionConfiguration) + _ = try await connect(streamName: streamDetail.streamName, accountID: streamDetail.accountID) } - func stopAudio(for sourceId: String?) { + func stopAudio(for sourceId: String?) async throws { + guard let subscriptionManager else { return } switch stateSubject.value { case let .subscribed(sources: sources, numberOfStreamViewers: _): if let source = sources.first (where: { $0.sourceId == StreamSource.SourceId(id: sourceId) }), source.isPlayingAudio { - subscriptionManager.unprojectAudio(for: source) + try await subscriptionManager.unprojectAudio(for: source) } default: break } @@ -239,128 +252,119 @@ private extension StreamOrchestrator { func reset() { activeStreamDetail = nil logHandler.setLogFilePath(filePath: nil) - subscriptionConfiguration = .init() + subscriptionConfiguration = nil + subscriptionManager = nil + rendererRegistry = nil + + stateObservationTask?.cancel() + stateObservationTask = nil + + statsObservationTask?.cancel() + statsObservationTask = nil } } // MARK: SubscriptionManagerDelegate implementation -extension StreamOrchestrator: SubscriptionManagerDelegate { - - nonisolated func onDisconnected() { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onDisconnected() - } - } +extension StreamOrchestrator { - nonisolated func onSubscribedError(_ reason: String) { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onSubscribedError(reason) - } - } - - nonisolated func onSignalingError(_ message: String) { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onSignalingError(message) - } - } - - nonisolated func onConnectionError(_ status: Int32, withReason reason: String) { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onConnectionError(status, withReason: reason) - } - } - - nonisolated func onStopped() { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onStopped() - } - } - - nonisolated func onConnected() { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onConnected() - _ = await self.startSubscribe() - } - } - - nonisolated func onSubscribed() { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onSubscribed() - } - } - - nonisolated func onVideoTrack(_ track: MCVideoTrack, withMid mid: String) { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onVideoTrack(track, withMid: mid) - } - } - - nonisolated func onAudioTrack(_ track: MCAudioTrack, withMid mid: String) { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onAudioTrack(track, withMid: mid) - } - } - - nonisolated public func onStatsReport(_ report: MCStatsReport) { - guard let streamingStats = AllStreamStatistics(report) else { - return - } - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onStatsReport(streamingStats) - } - } - - nonisolated func onViewerCount(_ count: Int32) { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.updateNumberOfStreamViewers(count) - } - } - - nonisolated func onLayers(_ mid: String, activeLayers: [MCLayerData], inactiveLayers: [String]) { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onLayers(mid, activeLayers: activeLayers, inactiveLayers: inactiveLayers) - } - } - - nonisolated func onActive(_ streamId: String, tracks: [String], sourceId: String?) { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - self.stateMachine.onActive(streamId, tracks: tracks, sourceId: sourceId) - let stateMachineState = self.stateMachine.currentState - switch stateMachineState { - case let .subscribed(state): - guard let sourceBuilder = state.streamSourceBuilders.first(where: { $0.sourceId == StreamSource.SourceId(id: sourceId) }) else { - return + func observeSubscriptionManagerEvents() { + Task { + self.stateObservationTask = Task { @StreamOrchestrator [weak self] in + guard let self, let subscriptionManager = await self.subscriptionManager else { return } + for await state in subscriptionManager.state { + switch state { + case .connected: + self.stateMachine.onConnected() + + case .disconnected: + self.stateMachine.onDisconnected() + + case let .connectionError(status: status, reason: reason): + self.stateMachine.onConnectionError(status, withReason: reason) + + case .subscribed: + self.stateMachine.onSubscribed() + + case let .signalingError(reason: reason): + self.stateMachine.onSignalingError(reason) + } + } - self.subscriptionManager.addRemoteTrack(sourceBuilder) - - default: - return } - } - } - - nonisolated func onInactive(_ streamId: String, sourceId: String?) { - Task { @StreamOrchestrator [weak self] in - guard let self = self else { return } - // Unproject audio whose source is inactive - await self.stopAudio(for: sourceId) - - self.stateMachine.onInactive(streamId, sourceId: sourceId) + self.viewerObservationTask = Task { @StreamOrchestrator [weak self] in + guard let self, let subscriptionManager = await self.subscriptionManager else { return } + for await viewerCount in subscriptionManager.viewerCount { + self.stateMachine.updateNumberOfStreamViewers(viewerCount) + } + } + + self.activityObservationTask = Task { @StreamOrchestrator [weak self] in + guard let self, let subscriptionManager = await self.subscriptionManager else { return } + for await activity in subscriptionManager.activityStream { + switch activity { + case let .active(streamId: streamId, tracks: tracks, sourceId: sourceId): + self.stateMachine.onActive(streamId, tracks: tracks, sourceId: sourceId) + let stateMachineState = self.stateMachine.currentState + switch stateMachineState { + case let .subscribed(state): + guard let sourceBuilder = state.streamSourceBuilders.first(where: { $0.sourceId == StreamSource.SourceId(id: sourceId) }) else { + return + } + await subscriptionManager.addRemoteTrack(sourceBuilder) + + default: + return + } + + case let .inactive(streamId: streamId, sourceId: sourceId): + // Unproject audio whose source is inactive + try await self.stopAudio(for: sourceId) + + self.stateMachine.onInactive(streamId, sourceId: sourceId) + } + } + } + + self.tracksObservationTask = Task { @StreamOrchestrator [weak self] in + guard let self, let subscriptionManager = await self.subscriptionManager else { return } + + for await trackEvent in subscriptionManager.tracks { + switch trackEvent { + case let .audio(track: track, mid: mid): + self.stateMachine.onAudioTrack(track, withMid: mid) + + case let .video(track: track, mid: mid): + self.stateMachine.onVideoTrack(track, withMid: mid) + } + } + } + + self.layersObservationTask = Task { @StreamOrchestrator [weak self] in + guard let self, let subscriptionManager = await self.subscriptionManager else { return } + + for await layerEvent in subscriptionManager.layers { + self.stateMachine.onLayers(layerEvent.mid, activeLayers: layerEvent.activeLayers, inactiveLayers: layerEvent.inactiveLayers) + } + } + + self.statsObservationTask = Task { @StreamOrchestrator [weak self] in + guard let self, let subscriptionManager = await self.subscriptionManager else { return } + for await stats in subscriptionManager.statsReport { + guard let streamingStats = AllStreamStatistics(stats) else { return } + self.stateMachine.onStatsReport(streamingStats) + } + } + + _ = try await [ + self.stateObservationTask?.value, + self.tracksObservationTask?.value, + self.activityObservationTask?.value, + self.layersObservationTask?.value, + self.statsObservationTask?.value, + self.viewerObservationTask?.value + ] } } } diff --git a/Sources/DolbyIORTSUIKit/Private/ViewModels/StreamViewModel.swift b/Sources/DolbyIORTSUIKit/Private/ViewModels/StreamViewModel.swift index 0d0d046..afcca33 100644 --- a/Sources/DolbyIORTSUIKit/Private/ViewModels/StreamViewModel.swift +++ b/Sources/DolbyIORTSUIKit/Private/ViewModels/StreamViewModel.swift @@ -290,22 +290,22 @@ final class StreamViewModel: ObservableObject { } // swiftlint:enable function_body_length - func endStream() async { - _ = await streamOrchestrator.stopConnection() + func endStream() async throws { + _ = try await streamOrchestrator.stopConnection() subscriptions.removeAll() } func playAudio(for source: StreamSource) { Task { @StreamOrchestrator [weak self] in guard let self = self else { return } - await self.streamOrchestrator.playAudio(for: source) + try await self.streamOrchestrator.playAudio(for: source) } } func stopAudio(for source: StreamSource) { Task { @StreamOrchestrator [weak self] in guard let self = self else { return } - await self.streamOrchestrator.stopAudio(for: source) + try await self.streamOrchestrator.stopAudio(for: source) } } @@ -320,7 +320,7 @@ final class StreamViewModel: ObservableObject { switch state { case let .subscribed(sources: sources, numberOfStreamViewers: _): self.updateState(from: sources, settings: settings) - case .connecting, .subscribing, .connected: + case .connected: self.internalState = .loading case let .error(streamError): self.internalState = .error(ErrorViewModel(error: streamError)) diff --git a/Sources/DolbyIORTSUIKit/Private/Views/VideoRenderer/VideoRendererViewModel.swift b/Sources/DolbyIORTSUIKit/Private/Views/VideoRenderer/VideoRendererViewModel.swift index 75e8643..c04f3ba 100644 --- a/Sources/DolbyIORTSUIKit/Private/Views/VideoRenderer/VideoRendererViewModel.swift +++ b/Sources/DolbyIORTSUIKit/Private/Views/VideoRenderer/VideoRendererViewModel.swift @@ -40,9 +40,9 @@ final class VideoRendererViewModel: ObservableObject { self.streamOrchestrator = streamOrchestrator } - func playVideo(on viewRenderer: StreamSourceViewRenderer, quality: VideoQuality? = nil) { + func playVideo(on viewRenderer: StreamSourceViewRenderer, quality: VideoQuality? = nil) { Task { @StreamOrchestrator in - await self.streamOrchestrator.playVideo( + try await self.streamOrchestrator.playVideo( for: streamSource, on: viewRenderer, with: quality ?? videoQuality @@ -52,7 +52,7 @@ final class VideoRendererViewModel: ObservableObject { func stopVideo(on viewRenderer: StreamSourceViewRenderer) { Task { @StreamOrchestrator in - await self.streamOrchestrator.stopVideo( + try await self.streamOrchestrator.stopVideo( for: streamSource, on: viewRenderer ) diff --git a/Sources/DolbyIORTSUIKit/Public/Screens/Media/StreamingScreen.swift b/Sources/DolbyIORTSUIKit/Public/Screens/Media/StreamingScreen.swift index 72f2fe7..0aaed29 100644 --- a/Sources/DolbyIORTSUIKit/Public/Screens/Media/StreamingScreen.swift +++ b/Sources/DolbyIORTSUIKit/Public/Screens/Media/StreamingScreen.swift @@ -226,7 +226,7 @@ extension StreamingScreen { func endStream() { onClose() Task { - await viewModel.endStream() + try await viewModel.endStream() } } }