From 9623b18bcd1e978f72c264be941a55b2358e15ee Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Fri, 14 Jun 2024 16:35:15 +0900 Subject: [PATCH 1/4] ClusterD PoC which serves as seed node --- Package.swift | 9 +- Sources/Clusterd/boot+ClusterD.swift | 46 ++++++++++ .../TestProbes+Receptionist.swift | 2 +- .../Cluster/DiscoveryShell.swift | 6 ++ .../ClusterSystem+Clusterd.swift | 90 +++++++++++++++++++ .../DistributedCluster/ClusterSystem.swift | 5 ++ .../ClusterSystemSettings.swift | 42 ++++++++- .../MultiNodeTestConductor.swift | 1 - .../MultiNodeTestKit/MultiNodeTestKit.swift | 7 +- .../boot+MultiNodeTestKitRunner+Exec.swift | 44 ++++----- .../Daemon/DaemonJoiningClusteredTests.swift | 58 ++++++++++++ 11 files changed, 282 insertions(+), 28 deletions(-) create mode 100644 Sources/Clusterd/boot+ClusterD.swift create mode 100644 Sources/DistributedCluster/ClusterSystem+Clusterd.swift create mode 100644 Tests/DistributedClusterTests/Cluster/Daemon/DaemonJoiningClusteredTests.swift diff --git a/Package.swift b/Package.swift index 901744248..4faa4b4f7 100644 --- a/Package.swift +++ b/Package.swift @@ -109,12 +109,19 @@ var targets: [PackageDescription.Target] = [ // Depend on tests to run: "DistributedActorsMultiNodeTests", - // Dependencies: "MultiNodeTestKit", .product(name: "ArgumentParser", package: "swift-argument-parser"), ] ), + .executableTarget( + name: "Clusterd", + dependencies: [ + "DistributedCluster", + .product(name: "ArgumentParser", package: "swift-argument-parser"), + ] + ), + // ==== ------------------------------------------------------------------------------------------------------------ // MARK: Multi Node Tests diff --git a/Sources/Clusterd/boot+ClusterD.swift b/Sources/Clusterd/boot+ClusterD.swift new file mode 100644 index 000000000..c589ba989 --- /dev/null +++ b/Sources/Clusterd/boot+ClusterD.swift @@ -0,0 +1,46 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2020-2024 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster + +import ArgumentParser + +@main +struct ClusterDBoot: AsyncParsableCommand { + @Option(name: .shortAndLong, help: "The port to bind the cluster daemon on.") + var port: Int = ClusterDaemon.defaultEndpoint.port + + @Option(help: "The host address to bind the cluster daemon on.") + var host: String = ClusterDaemon.defaultEndpoint.host + + mutating func run() async throws { + let daemon = await ClusterSystem.startClusterDaemon(configuredWith: self.configure) + + #if DEBUG + daemon.system.log.warning("RUNNING ClusterD DEBUG binary, operation is likely to be negatively affected.
Please build/run the ClusterD process using '-c release' configuration!") + #endif + + try await daemon.system.park() + } + + func configure(_ settings: inout ClusterSystemSettings) { + // other nodes will be discovering us, not the opposite + settings.discovery = .init(static: []) + + settings.endpoint = Cluster.Endpoint( + systemName: "clusterd", + host: host, + port: port) + } +} \ No newline at end of file diff --git a/Sources/DistributedActorsTestKit/TestProbes+Receptionist.swift b/Sources/DistributedActorsTestKit/TestProbes+Receptionist.swift index 055e4b1db..75f96950a 100644 --- a/Sources/DistributedActorsTestKit/TestProbes+Receptionist.swift +++ b/Sources/DistributedActorsTestKit/TestProbes+Receptionist.swift @@ -25,7 +25,7 @@ extension ActorTestProbe where Message == _Reception.Listing<_ActorRef> public func eventuallyExpectListing( expected: Set<_ActorRef>, within timeout: Duration, verbose: Bool = false, - file: StaticString = #filePath, line: UInt = #line, column: UInt = #column + file: StaticString = #fileID, line: UInt = #line, column: UInt = #column ) throws { do { let listing = try self.fishForMessages(within: timeout, file: file, line: line) { diff --git a/Sources/DistributedCluster/Cluster/DiscoveryShell.swift b/Sources/DistributedCluster/Cluster/DiscoveryShell.swift index b57687545..9b535f996 100644 --- a/Sources/DistributedCluster/Cluster/DiscoveryShell.swift +++ b/Sources/DistributedCluster/Cluster/DiscoveryShell.swift @@ -34,6 +34,12 @@ final class DiscoveryShell { var behavior: _Behavior { .setup { context in + + // FIXME: should have a behavior to bridge the async world... + context.log.info("Initializing discovery: \(self.settings.implementation)") + self.settings.initialize(context.system) + context.log.info("Initializing discovery, done.") + self.subscription = self.settings.subscribe(onNext: { result in switch result { case .success(let instances): diff --git a/Sources/DistributedCluster/ClusterSystem+Clusterd.swift b/Sources/DistributedCluster/ClusterSystem+Clusterd.swift new file mode 100644 index 000000000..6af20d533 --- /dev/null +++ b/Sources/DistributedCluster/ClusterSystem+Clusterd.swift @@ -0,0 +1,90 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Atomics +import Backtrace +import CDistributedActorsMailbox +import Dispatch +@_exported import Distributed +import DistributedActorsConcurrencyHelpers +import Foundation // for UUID +import Logging +import NIO + +extension ClusterSystem { + public static func startClusterDaemon(configuredWith configureSettings: (inout ClusterSystemSettings) -> Void = { _ in () }) async -> ClusterDaemon { + let system = await ClusterSystem("clusterd") { settings in + settings.endpoint = ClusterDaemon.defaultEndpoint + configureSettings(&settings) + } + + return ClusterDaemon(system: system) + } +} + +public struct ClusterDaemon { + public let system: ClusterSystem + public var settings: ClusterSystemSettings { + system.settings + } + + public init(system: ClusterSystem) { + self.system = system + } +} + +extension ClusterDaemon { + + /// Suspends until the ``ClusterSystem`` is terminated by a call to ``shutdown()``. + public var terminated: Void { + get async throws { + try await self.system.terminated + } + } + + /// Returns `true` if the system was already successfully terminated (i.e. awaiting ``terminated`` would resume immediately). + public var isTerminated: Bool { + self.system.isTerminated + } + + /// Forcefully stops this actor system and all actors that live within it. + /// This is an asynchronous operation and will be executed on a separate thread. + /// + /// You can use `shutdown().wait()` to synchronously await on the system's termination, + /// or provide a callback to be executed after the system has completed it's shutdown. + /// + /// - Returns: A `Shutdown` value that can be waited upon until the system has completed the shutdown. + @discardableResult + public func shutdown() throws -> ClusterSystem.Shutdown { + try self.system.shutdown() + } +} + +extension ClusterDaemon { + /// The default endpoint + public static let defaultEndpoint = Cluster.Endpoint(host: "127.0.0.1", port: 3137) +} + +internal distributed actor ClusterDaemonServant { + typealias ActorSystem = ClusterSystem + + @ActorID.Metadata(\.wellKnown) + public var wellKnownName: String + + init(system: ClusterSystem) async { + self.actorSystem = system + self.wellKnownName = "$cluster-daemon-servant" + } + +} \ No newline at end of file diff --git a/Sources/DistributedCluster/ClusterSystem.swift b/Sources/DistributedCluster/ClusterSystem.swift index 7f9429a4d..33d49f353 100644 --- a/Sources/DistributedCluster/ClusterSystem.swift +++ b/Sources/DistributedCluster/ClusterSystem.swift @@ -464,6 +464,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { public func wait() async throws { // TODO: implement without blocking the internal task; try await Task.detached { + print("BLOCKING ON receptacle") if let error = self.receptacle.wait() { throw error } @@ -1049,6 +1050,10 @@ extension ClusterSystem { if let wellKnownName = actor.id.metadata.wellKnown { self._managedWellKnownDistributedActors[wellKnownName] = actor } + +// if let receptionID = actor.id.metadata.receptionID { +// self.receptionist.checkIn(actor) +// } } /// Advertise to the cluster system that a "well known" distributed actor has become ready. 
diff --git a/Sources/DistributedCluster/ClusterSystemSettings.swift b/Sources/DistributedCluster/ClusterSystemSettings.swift index 330f3229d..414fff58e 100644 --- a/Sources/DistributedCluster/ClusterSystemSettings.swift +++ b/Sources/DistributedCluster/ClusterSystemSettings.swift @@ -404,6 +404,7 @@ protocol ClusterSystemInstrumentationProvider { /// all the nodes of an existing cluster. public struct ServiceDiscoverySettings { let implementation: ServiceDiscoveryImplementation + private let _initialize: (ClusterSystem) -> Void private let _subscribe: (@escaping (Result<[Cluster.Endpoint], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken? public init(_ implementation: Discovery, service: S) @@ -411,24 +412,57 @@ public struct ServiceDiscoverySettings { S == Discovery.Service { self.implementation = .dynamic(AnyServiceDiscovery(implementation)) + self._initialize = { _ in } self._subscribe = { onNext, onComplete in implementation.subscribe(to: service, onNext: onNext, onComplete: onComplete) } } + init(clusterdEndpoint: Cluster.Endpoint) { + self.implementation = .clusterDaemon(clusterdEndpoint) + + self._initialize = { system in + system.log.info("Joining [clusterd] at \(clusterdEndpoint)") + system.cluster.join(endpoint: clusterdEndpoint) + } + self._subscribe = { onNext, onComplete in + return nil + } + } + + /// Locate the default `ClusterD` process and use it for discovering cluster nodes. + /// + /// + public static var clusterd: Self { + get { + Self.clusterd(endpoint: nil) + } + } + + /// Locate the default `ClusterD` process and use it for discovering cluster nodes. + public static func clusterd(endpoint: Cluster.Endpoint?) -> Self { + return ServiceDiscoverySettings(clusterdEndpoint: endpoint ?? ClusterDaemon.defaultEndpoint) + } + public init(_ implementation: Discovery, service: S, mapInstanceToNode transformer: @escaping (Discovery.Instance) throws -> Cluster.Endpoint) where Discovery: ServiceDiscovery, S == Discovery.Service { let mappedDiscovery: MapInstanceServiceDiscovery = implementation.mapInstance(transformer) self.implementation = .dynamic(AnyServiceDiscovery(mappedDiscovery)) + self._initialize = { _ in } self._subscribe = { onNext, onComplete in mappedDiscovery.subscribe(to: service, onNext: onNext, onComplete: onComplete) } } + public static func `seed`(nodes: Set) -> Self { + .init(static: nodes) + } + public init(static nodes: Set) { self.implementation = .static(nodes) + self._initialize = { _ in } self._subscribe = { onNext, _ in // Call onNext once and never again since the list of nodes doesn't change onNext(.success(Array(nodes))) @@ -441,12 +475,18 @@ public struct ServiceDiscoverySettings { /// Similar to `ServiceDiscovery.subscribe` however it allows the handling of the listings to be generic and handled by the cluster system. /// This function is only intended for internal use by the `DiscoveryShell`. - func subscribe(onNext nextResultHandler: @escaping (Result<[Cluster.Endpoint], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken? { + func subscribe(onNext nextResultHandler: @escaping (Result<[Cluster.Endpoint], Error>) -> Void, + onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken? 
{ self._subscribe(nextResultHandler, completionHandler) } + func initialize(_ system: ClusterSystem) -> Void { + self._initialize(system) + } + enum ServiceDiscoveryImplementation { case `static`(Set) + case clusterDaemon(Cluster.Endpoint) case dynamic(AnyServiceDiscovery) } } diff --git a/Sources/MultiNodeTestKit/MultiNodeTestConductor.swift b/Sources/MultiNodeTestKit/MultiNodeTestConductor.swift index 0dc94a97c..bba4009fb 100644 --- a/Sources/MultiNodeTestKit/MultiNodeTestConductor.swift +++ b/Sources/MultiNodeTestKit/MultiNodeTestConductor.swift @@ -159,7 +159,6 @@ extension MultiNodeTestConductor { self.log.warning("Checkpoint failed, informing node [\(node)]", metadata: [ "checkPoint/node": "\(node)", "checkPoint/error": "\(checkPointError)", - "checkPoint/error": "\(checkPointError)", ]) cc.resume(throwing: checkPointError) diff --git a/Sources/MultiNodeTestKit/MultiNodeTestKit.swift b/Sources/MultiNodeTestKit/MultiNodeTestKit.swift index 5b5385c56..83806406c 100644 --- a/Sources/MultiNodeTestKit/MultiNodeTestKit.swift +++ b/Sources/MultiNodeTestKit/MultiNodeTestKit.swift @@ -36,6 +36,7 @@ public struct MultiNodeTest { public let crashRegex: String? public let runTest: (any MultiNodeTestControlProtocol) async throws -> Void public let configureActorSystem: (inout ClusterSystemSettings) -> Void + public let startNode: (ClusterSystemSettings) async throws -> ClusterSystem public let configureMultiNodeTest: (inout MultiNodeTestSettings) -> Void public let makeControl: (String) -> any MultiNodeTestControlProtocol @@ -51,6 +52,7 @@ public struct MultiNodeTest { } self.configureActorSystem = TestSuite.configureActorSystem + self.startNode = TestSuite.startNode self.configureMultiNodeTest = TestSuite.configureMultiNodeTest self.makeControl = { nodeName -> Control in @@ -80,6 +82,7 @@ public protocol MultiNodeTestSuite { init() associatedtype Nodes: MultiNodeNodes static func configureActorSystem(settings: inout ClusterSystemSettings) +// static func startNode(settings: ClusterSystemSettings) -> ClusterSystem static func configureMultiNodeTest(settings: inout MultiNodeTestSettings) } @@ -88,8 +91,8 @@ extension MultiNodeTestSuite { "\(Self.self)".split(separator: ".").last.map(String.init) ?? 
"" } - public func configureActorSystem(settings: inout ClusterSystemSettings) { - // do nothing by default + public static func startNode(settings: ClusterSystemSettings) async throws -> ClusterSystem { + await ClusterSystem(settings: settings) } var nodeNames: [String] { diff --git a/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner+Exec.swift b/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner+Exec.swift index 85867bc25..9536193e6 100644 --- a/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner+Exec.swift +++ b/Sources/MultiNodeTestKitRunner/boot+MultiNodeTestKitRunner+Exec.swift @@ -48,28 +48,28 @@ extension MultiNodeTestKitRunnerBoot { ) } - let actorSystem = await ClusterSystem(nodeName) { settings in - settings.bindHost = myNode.host - settings.bindPort = myNode.port - - /// By default get better backtraces in case we crash: - settings.installSwiftBacktrace = true - - /// Configure a nicer logger, that pretty prints metadata and also includes source location of logs - if multiNodeSettings.installPrettyLogger { - settings.logging.baseLogger = Logger(label: nodeName, factory: { label in - PrettyMultiNodeLogHandler(nodeName: label, settings: multiNodeSettings.logCapture) - }) - } - - // we use the singleton to implement a simple Coordinator - // TODO: if the node hosting the coordinator dies we'd potentially have some races at hand - // there's a few ways to solve this... but for now this is good enough. - settings += ClusterSingletonPlugin() - - multiNodeTest.configureActorSystem(&settings) - } - control._actorSystem = actorSystem + var settings = ClusterSystemSettings(name: nodeName) + settings.bindHost = myNode.host + settings.bindPort = myNode.port + + /// By default get better backtraces in case we crash: + settings.installSwiftBacktrace = true + + /// Configure a nicer logger, that pretty prints metadata and also includes source location of logs + if multiNodeSettings.installPrettyLogger { + settings.logging.baseLogger = Logger(label: nodeName, factory: { label in + PrettyMultiNodeLogHandler(nodeName: label, settings: multiNodeSettings.logCapture) + }) + } + + // we use the singleton to implement a simple Coordinator + // TODO: if the node hosting the coordinator dies we'd potentially have some races at hand + // there's a few ways to solve this... but for now this is good enough. + settings += ClusterSingletonPlugin() + multiNodeTest.configureActorSystem(&settings) + + let actorSystem = try await multiNodeTest.startNode(settings) + control._actorSystem = actorSystem let signalQueue = DispatchQueue(label: "multi.node.\(multiNodeTest.testSuiteName).\(multiNodeTest.testName).\(nodeName).SignalHandlerQueue") let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: signalQueue) diff --git a/Tests/DistributedClusterTests/Cluster/Daemon/DaemonJoiningClusteredTests.swift b/Tests/DistributedClusterTests/Cluster/Daemon/DaemonJoiningClusteredTests.swift new file mode 100644 index 000000000..75a441ece --- /dev/null +++ b/Tests/DistributedClusterTests/Cluster/Daemon/DaemonJoiningClusteredTests.swift @@ -0,0 +1,58 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2020-2024 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +import DistributedActorsTestKit +@testable import DistributedCluster +import XCTest + +final class DaemonJoiningClusteredTests: ClusteredActorSystemsXCTestCase { + override func configureLogCapture(settings: inout LogCapture.Settings) { + settings.excludeActorPaths = [ + "/system/cluster/swim", + "/system/cluster/gossip", + "/system/replicator", + "/system/cluster", + "/system/clusterEvents", + "/system/cluster/leadership", + "/system/nodeDeathWatcher", + + "/dead/system/receptionist-ref", // FIXME(distributed): it should simply be quiet + ] + settings.excludeGrep = [ + "timer", + ] + } + + var daemon: ClusterDaemon? + + override func tearDown() async throws { + try await super.tearDown() + try await self.daemon?.shutdown().wait() + } + + func test_shouldPerformLikeASeedNode() async throws { + self.daemon = await ClusterSystem.startClusterDaemon() + + let first = await self.setUpNode("first") { settings in + settings.discovery = .clusterd + } + let second = await self.setUpNode("second") { settings in + settings.discovery = .clusterd + } + + try await ensureNodes(atLeast: .up, nodes: [first.cluster.node, second.cluster.node]) + } + +} From 8e0cdd8748da28699900e493e1c0033a52e8f670 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Fri, 19 Jul 2024 17:43:27 +0900 Subject: [PATCH 2/4] poc of distributed actor gossiper; the algorithm can be improved independent of user code --- .../MultiNode+SimpleGossipTests.swift | 186 ++++++++++++++++++ Samples/Package.swift | 8 + Samples/Sources/SimpleGossip/Gossiper.swift | 112 +++++++++++ .../GreetingGossip+Messages.swift | 38 ++++ .../Sources/SimpleGossip/GreetingGossip.swift | 79 ++++++++ .../SimpleGossip/SamplePrettyLogHandler.swift | 150 ++++++++++++++ .../Sources/SimpleGossip/SimpleGossip.swift | 48 +++++ Samples/Sources/SimpleGossip/boot.swift | 52 +++++ .../DistributedCluster/ActorMetadata.swift | 4 +- .../OperationLogDistributedReceptionist.swift | 28 +-- .../DistributedReceptionist.swift | 20 +- .../MultiNode+TestSuites.swift | 1 + 12 files changed, 708 insertions(+), 18 deletions(-) create mode 100644 MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+SimpleGossipTests.swift create mode 100644 Samples/Sources/SimpleGossip/Gossiper.swift create mode 100644 Samples/Sources/SimpleGossip/GreetingGossip+Messages.swift create mode 100644 Samples/Sources/SimpleGossip/GreetingGossip.swift create mode 100644 Samples/Sources/SimpleGossip/SamplePrettyLogHandler.swift create mode 100644 Samples/Sources/SimpleGossip/SimpleGossip.swift create mode 100644 Samples/Sources/SimpleGossip/boot.swift diff --git a/MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+SimpleGossipTests.swift b/MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+SimpleGossipTests.swift new file mode 100644 index 000000000..4700fb915 --- /dev/null +++ b/MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+SimpleGossipTests.swift @@ -0,0 +1,186 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2022-2024 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster +import MultiNodeTestKit + +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster +import MultiNodeTestKit + +public final class MultiNodeSimpleGossipTests: MultiNodeTestSuite { + public init() { + } + + public enum Nodes: String, MultiNodeNodes { + case first + case second + case third + } + + public static func configureMultiNodeTest(settings: inout MultiNodeTestSettings) { + settings.dumpNodeLogs = .always + + settings.logCapture.excludeGrep = [ + "SWIMActor.swift", + "SWIMInstance.swift", + "Gossiper+Shell.swift", + ] + + settings.installPrettyLogger = true + } + + public static func configureActorSystem(settings: inout ClusterSystemSettings) { + settings.logging.logLevel = .notice + + settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) + } + + public let test_receptionist_checkIn = MultiNodeTest(MultiNodeSimpleGossipTests.self) { multiNode in + // *All* nodes spawn a gossip participant + let localEcho = await GreetingGossipPeer(actorSystem: multiNode.system) + + try await multiNode.checkPoint("Spawned actors") // ------------------------------------------------------------ + + let expectedCount = Nodes.allCases.count + var discovered: Set = [] + for try await actor in await multiNode.system.receptionist.listing(of: .init(GreetingGossipPeer.self)) { + multiNode.log.notice("Discovered \(actor.id) from \(actor.id.node)") + discovered.insert(actor) + + if discovered.count == expectedCount { + break + } + } + + try await multiNode.checkPoint("All members found \(expectedCount) actors") // --------------------------------- + } + + struct GreetingGossip: Codable { + var sequenceNumber: UInt + var text: String + } + + struct GreetingGossipAck: Codable { + var sequenceNumber: UInt + + // push-pull gossip; when we receive an "old" version, we can immediately reply with a newer one + var supersededBy: GreetingGossip + } + + distributed actor GreetingGossipPeer { + typealias ActorSystem = ClusterSystem + + typealias Gossip = GreetingGossip + typealias Acknowledgement = GreetingGossipAck + + // This gossip peer belongs to some group identifier by this + @ActorID.Metadata(\.receptionID) + var gossipGroupID: String + + private var activeGreeting = GreetingGossip(sequenceNumber: 0, text: "") + + private var peers: Set = [] + private var peerDiscoveryTask: Task? 
+ + init(actorSystem: ActorSystem) async { + self.actorSystem = actorSystem + self.gossipGroupID = "greeting-gossip" + + await actorSystem.receptionist.checkIn(self) + peerDiscoveryTask = Task { + for try await peer in actorSystem.receptionist.listing(of: .init(Self.self, id: self.gossipGroupID)) { + self.peers.insert(peer) + } + } + let gossipTask = Task { + while !Task.isCancelled { + try await Task.sleep(for: .seconds(15)) // TODO: jitter + + try await gossipRound() + } + } + } + + distributed func greet(name: String) -> String { + return "\(self.activeGreeting.text), \(name)!" + } + + // Call these e.g. locally to set a "new value"; you could set some date or order here as well + distributed func setGreeting(_ text: String) { + // Some way to come up with "next" value + let number = self.activeGreeting.sequenceNumber + self.activeGreeting = .init(sequenceNumber: number + 1, text: text) + } + + func makePayload() async -> Gossip { + self.activeGreeting + } + + distributed func gossip(_ gossip: Gossip, from peer: ID) async -> Acknowledgement { + guard activeGreeting.sequenceNumber < gossip.sequenceNumber else { + // Tell the caller immediately that we actually have a more up-to-date value + return .init(sequenceNumber: gossip.sequenceNumber, supersededBy: self.activeGreeting) + } + } + +// distributed func acknowledgement(_ acknowledgement: Acknowledgement, +// from peer: ID, +// confirming gossip: Gossip) { +// +// } + } +} + +protocol Gossiper: DistributedActor where ActorSystem == ClusterSystem { + associatedtype Peer: Gossiper + associatedtype Gossip: Codable + associatedtype Acknowledgement: Codable + + var peers: Set { get } + + + func makePayload() async -> Gossip + + /// Receive a gossip from some peer. + /// + /// Missing an acknowlagement will cause another delivery attempt eventually + distributed func gossip(_ gossip: Gossip, from peer: ID) async -> Acknowledgement + + // ==== Default impls -------------- + func gossipRound() async throws +} + +extension Gossiper { + func gossipRound() async { + let target = self.peers.shuffled().first + + let gossip = self.makePayload() + target?.gossip(gossip, from: self.id) + } + +} \ No newline at end of file diff --git a/Samples/Package.swift b/Samples/Package.swift index 3b8decbc1..ff46f93f0 100644 --- a/Samples/Package.swift +++ b/Samples/Package.swift @@ -29,6 +29,14 @@ var targets: [PackageDescription.Target] = [ ] ), + .executableTarget( + name: "SimpleGossip", + dependencies: [ + .product(name: "DistributedCluster", package: "swift-distributed-actors"), + ], + path: "Sources/SimpleGossip" + ), + /* --- tests --- */ // no-tests placeholder project to not have `swift test` fail on Samples/ diff --git a/Samples/Sources/SimpleGossip/Gossiper.swift b/Samples/Sources/SimpleGossip/Gossiper.swift new file mode 100644 index 000000000..55a573fd9 --- /dev/null +++ b/Samples/Sources/SimpleGossip/Gossiper.swift @@ -0,0 +1,112 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster +import Logging + +protocol GossipAcknowledgement: Sendable, Codable{ + associatedtype Payload where Payload: Sendable & Codable + + var sequenceNumber: UInt { get } + + // Optional support for simple push-pull gossip; + // when we receive an "old" version, we can immediately reply with a newer one + var supersededBy: Payload? { get } +} + +protocol Gossiper: DistributedActor where ActorSystem == ClusterSystem { + associatedtype Gossip: Codable + associatedtype Acknowledgement: GossipAcknowledgement + + var gossipGroupID: String { get } + + var gossipPeers: Set { get set } + + func makePayload() async -> Gossip + + var gossipBaseInterval: Duration { get } + + /// Receive a gossip from some peer. + /// + /// Missing an acknowledgement will cause another delivery attempt eventually + distributed func gossip(_ gossip: Gossip, from peer: ID) async -> Acknowledgement + + // ==== Default impls -------------- + func startGossip() async throws + + func gossipRound() async throws +} + +extension Gossiper { + + var log: Logger { + Logger(actor: self) + } + + var gossipBaseInterval: Duration { + .seconds(1) + } + + func startGossip() async throws { + var sleepIntervalBackoff = Backoff.exponential(initialInterval: self.gossipBaseInterval, randomFactor: 0.50) + + log.warning("Start gossip: \(self.id)") + + await actorSystem.receptionist.checkIn(self) + + let listingTask = Task { + for try await peer in await actorSystem.receptionist.listing(of: .init(Self.self, id: self.gossipGroupID)) { + self.gossipPeers.insert(peer) + log.warning("\(self.id) discovered [\(peer.id)]", metadata: [ + "gossipPeers/count": "\(gossipPeers.count)" + ]) + } + } + defer { + listingTask.cancel() + } + + while !Task.isCancelled { + let duration = sleepIntervalBackoff.next()! +// log.notice("Gossip sleep: \(duration.prettyDescription)") + try await Task.sleep(for: duration) + + do { + try await self.gossipRound() + } catch { + log.warning("Gossip round failed: \(error)") // TODO: log target and more data + } + } + + log.notice("Gossip terminated...") + } + + func gossipRound() async throws { + guard let target = self.gossipPeers.shuffled().first else { + log.info("No peer to gossip with...") + return + } + + guard target.id != self.id else { + return try await gossipRound() // try again + } + + log.debug("Select peer: \(target.id)") + + let gossip = await makePayload() + let ack = try await target.gossip(gossip, from: self.id) + log.notice("Ack: \(ack)") + } + +} \ No newline at end of file diff --git a/Samples/Sources/SimpleGossip/GreetingGossip+Messages.swift b/Samples/Sources/SimpleGossip/GreetingGossip+Messages.swift new file mode 100644 index 000000000..f2dc34839 --- /dev/null +++ b/Samples/Sources/SimpleGossip/GreetingGossip+Messages.swift @@ -0,0 +1,38 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster + +struct GreetingGossip: Codable { + var sequenceNumber: UInt + var text: String +} + +struct GreetingGossipAck: GossipAcknowledgement { + var sequenceNumber: UInt + + // push-pull gossip; when we receive an "old" version, we can immediately reply with a newer one + var supersededBy: GreetingGossip? + + init(sequenceNumber: UInt) { + self.sequenceNumber = sequenceNumber + self.supersededBy = nil + } + + init(sequenceNumber: UInt, supersededBy: GreetingGossip) { + self.sequenceNumber = sequenceNumber + self.supersededBy = supersededBy + } +} + diff --git a/Samples/Sources/SimpleGossip/GreetingGossip.swift b/Samples/Sources/SimpleGossip/GreetingGossip.swift new file mode 100644 index 000000000..d39ca58f1 --- /dev/null +++ b/Samples/Sources/SimpleGossip/GreetingGossip.swift @@ -0,0 +1,79 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster + +distributed actor GreetingGossipPeer: Gossiper { + typealias ActorSystem = ClusterSystem + + typealias Gossip = GreetingGossip + typealias Acknowledgement = GreetingGossipAck + + // This gossip peer belongs to some group identified by this + @ActorID.Metadata(\.receptionID) + var gossipGroupID: String + + private var activeGreeting = GreetingGossip(sequenceNumber: 0, text: "") + + var gossipPeers: Set = [] + private var peerDiscoveryTask: Task? + + init(actorSystem: ActorSystem) async { + self.actorSystem = actorSystem + self.gossipGroupID = "greeting-gossip" + + // FIXME: using Self.self in the init here will crash +// peerDiscoveryTask = Task { +// for try await peer in await actorSystem.receptionist.listing(of: .init(GreetingGossipPeer.self, id: self.gossipGroupID)) { +// self.peers.insert(peer) +// } +// } + + let gossipTask = Task { + try await startGossip() + } + } + + distributed func greet(name: String) -> String { + return "\(self.activeGreeting.text), \(name)!" + } + + // Call these e.g.
locally to set a "new value"; you could set some date or order here as well + distributed func setGreeting(_ text: String) { + // Some way to come up with "next" value + let number = self.activeGreeting.sequenceNumber + self.activeGreeting = .init(sequenceNumber: number + 1, text: text) + } + + func makePayload() async -> Gossip { + self.activeGreeting + } + + distributed func gossip(_ gossip: Gossip, from peer: ID) async -> Acknowledgement { + guard activeGreeting.sequenceNumber < gossip.sequenceNumber else { + // Tell the caller immediately that we actually have a more up-to-date value + return .init(sequenceNumber: gossip.sequenceNumber, supersededBy: self.activeGreeting) + } + + self.activeGreeting = gossip + + return .init(sequenceNumber: gossip.sequenceNumber) + } + +// distributed func acknowledgement(_ acknowledgement: Acknowledgement, +// from peer: ID, +// confirming gossip: Gossip) { +// +// } +} diff --git a/Samples/Sources/SimpleGossip/SamplePrettyLogHandler.swift b/Samples/Sources/SimpleGossip/SamplePrettyLogHandler.swift new file mode 100644 index 000000000..fc9ce8142 --- /dev/null +++ b/Samples/Sources/SimpleGossip/SamplePrettyLogHandler.swift @@ -0,0 +1,150 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2021 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +@testable import DistributedCluster +import Logging + +#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) +import Darwin +#elseif os(Windows) +import CRT +#elseif canImport(Glibc) +import Glibc +#elseif canImport(WASILibc) +import WASILibc +#else +#error("Unsupported runtime") +#endif + +/// Logger that prints "pretty" for showcasing the cluster nicely in sample applications. +struct SamplePrettyLogHandler: LogHandler { + static let CONSOLE_RESET = "\u{001B}[0;0m" + static let CONSOLE_BOLD = "\u{001B}[1m" + + public static func make(label: String) -> SamplePrettyLogHandler { + return SamplePrettyLogHandler(label: label) + } + + private let label: String + + public var logLevel: Logger.Level = .info + + public var metadata = Logger.Metadata() + + public subscript(metadataKey metadataKey: String) -> Logger.Metadata.Value? { + get { + return self.metadata[metadataKey] + } + set { + self.metadata[metadataKey] = newValue + } + } + + // internal for testing only + internal init(label: String) { + self.label = label + } + + // TODO: this implementation of getting a nice printout is a bit messy, but good enough for our sample apps + public func log( + level: Logger.Level, + message: Logger.Message, + metadata: Logger.Metadata?, + source: String, + file: String, + function: String, + line: UInt + ) { + var metadataString = "" + var nodeInfo = "" + + var effectiveMetadata = (metadata ?? 
[:]).merging(self.metadata, uniquingKeysWith: { _, new in new }) + if let node = effectiveMetadata.removeValue(forKey: "cluster/node") { + nodeInfo += "\(node)" + } + let label: String + if let path = effectiveMetadata.removeValue(forKey: "actor/path")?.description { + label = path + } else { + label = "" + } + + if !effectiveMetadata.isEmpty { + metadataString = "\n// metadata:\n" + for key in effectiveMetadata.keys.sorted() { + let value: Logger.MetadataValue = effectiveMetadata[key]! + let valueDescription = self.prettyPrint(metadata: value) + + var allString = "\n// \"\(key)\": \(valueDescription)" + if allString.contains("\n") { + allString = String( + allString.split(separator: "\n").map { valueLine in + if valueLine.starts(with: "// ") { + return "\(valueLine)\n" + } else { + return "// \(valueLine)\n" + } + }.joined(separator: "") + ) + } + metadataString.append(allString) + } + metadataString = String(metadataString.dropLast(1)) + } + + let file = file.split(separator: "/").last ?? "" + let line = line + print("\(self.timestamp()) \(level) [\(nodeInfo)\(Self.CONSOLE_BOLD)\(label)\(Self.CONSOLE_RESET)][\(file):\(line)] \(message)\(metadataString)") + } + + internal func prettyPrint(metadata: Logger.MetadataValue) -> String { + var valueDescription = "" + switch metadata { + case .string(let string): + valueDescription = string + case .stringConvertible(let convertible): + valueDescription = convertible.description + case .array(let array): + valueDescription = "\n \(array.map { "\($0)" }.joined(separator: "\n "))" + case .dictionary(let metadata): + for k in metadata.keys { + valueDescription += "\(Self.CONSOLE_BOLD)\(k)\(Self.CONSOLE_RESET): \(self.prettyPrint(metadata: metadata[k]!))" + } + } + + return valueDescription + } + + private func timestamp() -> String { + var buffer = [Int8](repeating: 0, count: 255) + var timestamp = time(nil) + let localTime = localtime(×tamp) + // This format is pleasant to read in local sample apps: + strftime(&buffer, buffer.count, "%H:%M:%S", localTime) + // The usual full format is: + // strftime(&buffer, buffer.count, "%Y-%m-%dT%H:%M:%S%z", localTime) + return buffer.withUnsafeBufferPointer { + $0.withMemoryRebound(to: CChar.self) { + String(cString: $0.baseAddress!) + } + } + } +} + +extension DistributedActor where Self: CustomStringConvertible { + public nonisolated var description: String { + "\(Self.self)(\(self.id))" + } +} diff --git a/Samples/Sources/SimpleGossip/SimpleGossip.swift b/Samples/Sources/SimpleGossip/SimpleGossip.swift new file mode 100644 index 000000000..0cca617b0 --- /dev/null +++ b/Samples/Sources/SimpleGossip/SimpleGossip.swift @@ -0,0 +1,48 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster + +final class SimpleGossip { + private var forks: [GreetingGossipPeer] = [] + + func run(for duration: Duration) async throws { + let system = await ClusterSystem("GossipSystem") { settings in +// settings.logging.logLevel = .debug + } + + // prepare 5 forks, the resources, that the philosophers will compete for: + let fork1 = await GreetingGossipPeer(actorSystem: system) + let fork2 = await GreetingGossipPeer(actorSystem: system) +// let fork3 = await GreetingGossipPeer(actorSystem: system) +// let fork4 = await GreetingGossipPeer(actorSystem: system) +// let fork5 = await GreetingGossipPeer(actorSystem: system) + self.forks = [fork1, fork2, +// fork3, fork4, fork5 + ] + + try await fork1.setGreeting("Hello") + + print("Sleeping...") + _Thread.sleep(duration) + + for p in forks { + print("Greet: \(p.id)") + let greeting = try await p.greet(name: "Caplin") + print(" >>> \(greeting)") + } + } +} + + diff --git a/Samples/Sources/SimpleGossip/boot.swift b/Samples/Sources/SimpleGossip/boot.swift new file mode 100644 index 000000000..f3311989c --- /dev/null +++ b/Samples/Sources/SimpleGossip/boot.swift @@ -0,0 +1,52 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +import DistributedCluster +import Logging +import NIO + +/* + * Swift Distributed Actors implementation of the classic "Dining Philosophers" problem. + * + * The goal of this implementation is not to be efficient or solve the live-lock, + * but rather to be a nice application that continuously "does something" with + * messaging between various actors. + * + * The implementation is based on the following take on the problem: + * http://www.dalnefre.com/wp/2010/08/dining-philosophers-in-humus + */ + +typealias DefaultDistributedActorSystem = ClusterSystem + +@main enum Main { + static func main() async { + print("===-----------------------------------------------------===") + print("| Gossip Sample App |") + print("| |") + print("| USAGE: swift run SimpleGossip [dist] |") + print("===-----------------------------------------------------===") + + LoggingSystem.bootstrap(SamplePrettyLogHandler.init) + + let duration = Duration.seconds(10) + +// switch CommandLine.arguments.dropFirst().first { +// case "dist", "distributed": +// try! await DistributedDiningPhilosophers().run(for: duration) +// default: + try! 
await SimpleGossip().run(for: duration) +// } + } +} diff --git a/Sources/DistributedCluster/ActorMetadata.swift b/Sources/DistributedCluster/ActorMetadata.swift index e60d1f1df..f323161ac 100644 --- a/Sources/DistributedCluster/ActorMetadata.swift +++ b/Sources/DistributedCluster/ActorMetadata.swift @@ -183,14 +183,14 @@ public final class ActorMetadata: CustomStringConvertible, CustomDebugStringConv self.lock.wait() let copy = self._storage self.lock.signal() - return "\(copy)" + return "\(Array(copy).sorted(by: { $0.0 < $1.0 }))" } public var debugDescription: String { self.lock.wait() let copy = self._storage self.lock.signal() - return "\(Self.self)(\(copy))" + return "\(Array(copy).sorted(by: { $0.0 < $1.0 }))" } } diff --git a/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift index 99ad6a5c5..8f822667a 100644 --- a/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift +++ b/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift @@ -400,15 +400,15 @@ extension OpLogDistributedReceptionist: LifecycleWatch { "subscription/callSite": "\(file):\(line)", ]) - // We immediately flush all already-known registrations; - // as new ones come in, they will be reported to this subscription later on - for alreadyRegisteredAtSubscriptionTime in self.storage.registrations(forKey: subscription.key) ?? [] { - if subscription.tryOffer(registration: alreadyRegisteredAtSubscriptionTime) { - self.log.debug("Offered \(alreadyRegisteredAtSubscriptionTime.actorID) to subscription \(subscription)") - } else { - self.log.warning("Dropped \(alreadyRegisteredAtSubscriptionTime.actorID) on subscription \(subscription)") - } - } +// // We immediately flush all already-known registrations; +// // as new ones come in, they will be reported to this subscription later on +// for alreadyRegisteredAtSubscriptionTime in self.storage.registrations(forKey: subscription.key) ?? [] { +// if subscription.tryOffer(registration: alreadyRegisteredAtSubscriptionTime) { +// self.log.debug("Offered \(alreadyRegisteredAtSubscriptionTime.actorID) to subscription \(subscription)") +// } else { +// self.log.warning("Dropped \(alreadyRegisteredAtSubscriptionTime.actorID) on subscription \(subscription)") +// } +// } } } @@ -510,7 +510,7 @@ extension OpLogDistributedReceptionist { // Otherwise, if we process a newer version (i.e., with bigger sequence number) first, older // versions will be dropped because they are considered "seen". let registrations = (self.storage.registrations(forKey: key) ?? 
[]).sorted { l, r in l.version < r.version } // FIXME: use ordered set or Deque now that we have them - self.log.notice( + self.log.debug( "Registrations to flush: \(registrations.count)", metadata: [ "registrations": Logger.MetadataValue.array( @@ -521,15 +521,15 @@ extension OpLogDistributedReceptionist { // self.instrumentation.listingPublished(key: key, subscribers: subscriptions.count, registrations: registrations.count) // TODO(distributed): make the instrumentation calls compatible with distributed actor based types for subscription in subscriptions { - self.log.notice("Offering registrations to subscription: \(subscription))", metadata: [ + self.log.debug("Offering registrations to subscription: \(subscription))", metadata: [ "registrations": "\(subscription.seenActorRegistrations)", ]) for registration in registrations { - if subscription.tryOffer(registration: registration) { - self.log.notice("OFFERED \(registration.actorID) TO \(subscription)") + if subscription.tryOffer(registration: registration, log: self.log) { + self.log.debug("Applied registration \(registration.actorID) to subscription \(subscription)") } else { - self.log.notice("DROPPED \(registration.actorID) TO \(subscription)") + self.log.debug("Dropped registration \(registration.actorID) to subscription \(subscription)") } } } diff --git a/Sources/DistributedCluster/Receptionist/DistributedReceptionist.swift b/Sources/DistributedCluster/Receptionist/DistributedReceptionist.swift index 486aa0470..ece297095 100644 --- a/Sources/DistributedCluster/Receptionist/DistributedReceptionist.swift +++ b/Sources/DistributedCluster/Receptionist/DistributedReceptionist.swift @@ -393,6 +393,13 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch /// We very carefully only modify this from the owning actor (receptionist). // TODO: It would be lovely to be able to express this in the type system as "actor owned" or "actor local" to some actor instance. var seenActorRegistrations: VersionVector +// { +// willSet { +// print("SETTING seenActorRegistrations = \(newValue)") +// print(" WAS = \(seenActorRegistrations)") +// if seenActorRegistrations.isEmpty { fatalError("WHERE") } +// } +// } init( subscriptionID: ObjectIdentifier, @@ -425,21 +432,28 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch /// seen this actor in this specific stream, and don't need to emit it again. /// /// - Returns: true if the value was successfully offered - func tryOffer(registration: VersionedRegistration) -> Bool { + func tryOffer(registration: VersionedRegistration, log: Logger? = nil) -> Bool { let oldSeenRegistrations = self.seenActorRegistrations + log?.debug("OLD SEEN: \(oldSeenRegistrations)") + log?.debug("registration: \(registration)") + log?.debug("registration version: \(registration.version)") self.seenActorRegistrations.merge(other: registration.version) + log?.debug("SEEN NOW: \(self.seenActorRegistrations)") switch self.seenActorRegistrations.compareTo(oldSeenRegistrations) { case .same: + log?.debug("->>> SAME") // the seen vector was unaffected by the merge, which means that the // incoming registration version was already seen, and thus we don't need to emit it again return false case .happenedAfter, .concurrent: + log?.debug("->>> happenedAfter || concurrent") // the incoming registration has not yet been seen before, // which means that we should emit the actor to the stream. 
self.onNext(registration.actorID) return true case .happenedBefore: + log?.debug("->>> happenedBefore") fatalError(""" It should not be possible for a *merged* version vector to be in the PAST as compared with itself before the merge Previously: \(oldSeenRegistrations) @@ -460,7 +474,9 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch } var description: String { - var string = "AnyDistributedReceptionListingSubscription(subscriptionID: \(subscriptionID), key: \(key), , seenActorRegistrations: \(seenActorRegistrations)" + var string = "\(Self.self)(" + // string += "subscriptionID: \(subscriptionID), key: \(key), seenActorRegistrations: \(seenActorRegistrations)" + string += "type: \(key.guestType), id: \(key.id)" #if DEBUG string += ", at: \(self.file):\(self.line)" #endif diff --git a/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift b/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift index e60a62259..c24ecf587 100644 --- a/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift +++ b/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift @@ -19,4 +19,5 @@ let MultiNodeTestSuites: [any MultiNodeTestSuite.Type] = [ MultiNodeConductorTests.self, MultiNodeClusterSingletonTests.self, MultiNodeReceptionistTests.self, + MultiNodeSimpleGossipTests.self, ] From 7f1586b091a1855418cffb8d849722ad8c57defb Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Wed, 14 Aug 2024 16:01:03 +0900 Subject: [PATCH 3/4] add WorkerPool example and fix some of workerpool logic with remote DAs --- .../MultiNode+SimpleGossipTests.swift | 186 ------------------ Package.swift | 2 +- Samples/.gitignore | 2 + Samples/Package.swift | 11 +- Samples/Sources/SimpleGossip/App.swift | 41 ++++ .../Sources/SimpleGossip/SimpleGossip.swift | 48 ----- Samples/Sources/SimpleGossip/boot.swift | 38 ++-- Samples/Sources/SimpleWorkerPool/App.swift | 101 ++++++++++ .../SamplePrettyLogHandler.swift | 151 ++++++++++++++ Samples/Sources/SimpleWorkerPool/boot.swift | 38 ++++ Sources/DistributedCluster/ActorLogging.swift | 4 +- .../Cluster/ClusterShell.swift | 4 +- .../OperationLogDistributedReceptionist.swift | 18 +- .../DistributedCluster/ClusterSystem.swift | 34 ++-- .../_CancellableCheckedContinuation.swift | 135 +++++++++++++ .../Docs.docc/Receptionist.md | 2 +- .../InvocationBehavior.swift | 2 +- .../Pattern/WorkerPool.swift | 176 ++++++++++++----- .../DistributedReceptionist.swift | 7 - .../Serialization/Serialization.swift | 9 +- .../WeakActorDictionary.swift | 93 ++++++--- .../MultiNode+TestSuites.swift | 1 - 22 files changed, 733 insertions(+), 370 deletions(-) delete mode 100644 MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+SimpleGossipTests.swift create mode 100644 Samples/.gitignore create mode 100644 Samples/Sources/SimpleGossip/App.swift delete mode 100644 Samples/Sources/SimpleGossip/SimpleGossip.swift create mode 100644 Samples/Sources/SimpleWorkerPool/App.swift create mode 100644 Samples/Sources/SimpleWorkerPool/SamplePrettyLogHandler.swift create mode 100644 Samples/Sources/SimpleWorkerPool/boot.swift create mode 100644 Sources/DistributedCluster/Concurrency/_CancellableCheckedContinuation.swift diff --git a/MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+SimpleGossipTests.swift b/MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+SimpleGossipTests.swift deleted file mode 100644 index 4700fb915..000000000 --- a/MultiNodeTests/DistributedActorsMultiNodeTests/MultiNode+SimpleGossipTests.swift +++ /dev/null @@ -1,186 +0,0 @@ 
-//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Distributed Actors open source project -// -// Copyright (c) 2022-2024 Apple Inc. and the Swift Distributed Actors project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import DistributedCluster -import MultiNodeTestKit - -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Distributed Actors open source project -// -// Copyright (c) 2022 Apple Inc. and the Swift Distributed Actors project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import DistributedCluster -import MultiNodeTestKit - -public final class MultiNodeSimpleGossipTests: MultiNodeTestSuite { - public init() { - } - - public enum Nodes: String, MultiNodeNodes { - case first - case second - case third - } - - public static func configureMultiNodeTest(settings: inout MultiNodeTestSettings) { - settings.dumpNodeLogs = .always - - settings.logCapture.excludeGrep = [ - "SWIMActor.swift", - "SWIMInstance.swift", - "Gossiper+Shell.swift", - ] - - settings.installPrettyLogger = true - } - - public static func configureActorSystem(settings: inout ClusterSystemSettings) { - settings.logging.logLevel = .notice - - settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3) - } - - public let test_receptionist_checkIn = MultiNodeTest(MultiNodeSimpleGossipTests.self) { multiNode in - // *All* nodes spawn a gossip participant - let localEcho = await GreetingGossipPeer(actorSystem: multiNode.system) - - try await multiNode.checkPoint("Spawned actors") // ------------------------------------------------------------ - - let expectedCount = Nodes.allCases.count - var discovered: Set = [] - for try await actor in await multiNode.system.receptionist.listing(of: .init(GreetingGossipPeer.self)) { - multiNode.log.notice("Discovered \(actor.id) from \(actor.id.node)") - discovered.insert(actor) - - if discovered.count == expectedCount { - break - } - } - - try await multiNode.checkPoint("All members found \(expectedCount) actors") // --------------------------------- - } - - struct GreetingGossip: Codable { - var sequenceNumber: UInt - var text: String - } - - struct GreetingGossipAck: Codable { - var sequenceNumber: UInt - - // push-pull gossip; when we receive an "old" version, we can immediately reply with a newer one - var supersededBy: GreetingGossip - } - - distributed actor GreetingGossipPeer { - typealias ActorSystem = ClusterSystem - - typealias Gossip = GreetingGossip - typealias Acknowledgement = GreetingGossipAck - - // This gossip peer belongs to some group identifier by this - @ActorID.Metadata(\.receptionID) - var gossipGroupID: String - - private var activeGreeting = GreetingGossip(sequenceNumber: 0, text: "") - - private var peers: Set = [] - private var peerDiscoveryTask: Task? 
- - init(actorSystem: ActorSystem) async { - self.actorSystem = actorSystem - self.gossipGroupID = "greeting-gossip" - - await actorSystem.receptionist.checkIn(self) - peerDiscoveryTask = Task { - for try await peer in actorSystem.receptionist.listing(of: .init(Self.self, id: self.gossipGroupID)) { - self.peers.insert(peer) - } - } - let gossipTask = Task { - while !Task.isCancelled { - try await Task.sleep(for: .seconds(15)) // TODO: jitter - - try await gossipRound() - } - } - } - - distributed func greet(name: String) -> String { - return "\(self.activeGreeting.text), \(name)!" - } - - // Call these e.g. locally to set a "new value"; you could set some date or order here as well - distributed func setGreeting(_ text: String) { - // Some way to come up with "next" value - let number = self.activeGreeting.sequenceNumber - self.activeGreeting = .init(sequenceNumber: number + 1, text: text) - } - - func makePayload() async -> Gossip { - self.activeGreeting - } - - distributed func gossip(_ gossip: Gossip, from peer: ID) async -> Acknowledgement { - guard activeGreeting.sequenceNumber < gossip.sequenceNumber else { - // Tell the caller immediately that we actually have a more up-to-date value - return .init(sequenceNumber: gossip.sequenceNumber, supersededBy: self.activeGreeting) - } - } - -// distributed func acknowledgement(_ acknowledgement: Acknowledgement, -// from peer: ID, -// confirming gossip: Gossip) { -// -// } - } -} - -protocol Gossiper: DistributedActor where ActorSystem == ClusterSystem { - associatedtype Peer: Gossiper - associatedtype Gossip: Codable - associatedtype Acknowledgement: Codable - - var peers: Set { get } - - - func makePayload() async -> Gossip - - /// Receive a gossip from some peer. - /// - /// Missing an acknowlagement will cause another delivery attempt eventually - distributed func gossip(_ gossip: Gossip, from peer: ID) async -> Acknowledgement - - // ==== Default impls -------------- - func gossipRound() async throws -} - -extension Gossiper { - func gossipRound() async { - let target = self.peers.shuffled().first - - let gossip = self.makePayload() - target?.gossip(gossip, from: self.id) - } - -} \ No newline at end of file diff --git a/Package.swift b/Package.swift index 4faa4b4f7..f23f197ff 100644 --- a/Package.swift +++ b/Package.swift @@ -186,7 +186,7 @@ var dependencies: [Package.Dependency] = [ // ~~~ Swift libraries ~~~ .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0-beta"), - .package(url: "https://github.com/apple/swift-collections", from: "1.0.5"), + .package(url: "https://github.com/apple/swift-collections", from: "1.1.0"), // ~~~ Observability ~~~ .package(url: "https://github.com/apple/swift-log", from: "1.0.0"), diff --git a/Samples/.gitignore b/Samples/.gitignore new file mode 100644 index 000000000..af42b37c6 --- /dev/null +++ b/Samples/.gitignore @@ -0,0 +1,2 @@ +.idea +.build diff --git a/Samples/Package.swift b/Samples/Package.swift index ff46f93f0..d8ed17b47 100644 --- a/Samples/Package.swift +++ b/Samples/Package.swift @@ -22,7 +22,6 @@ var targets: [PackageDescription.Target] = [ dependencies: [ .product(name: "DistributedCluster", package: "swift-distributed-actors"), ], - path: "Sources/SampleDiningPhilosophers", exclude: [ "dining-philosopher-fsm.graffle", "dining-philosopher-fsm.svg", @@ -33,8 +32,14 @@ var targets: [PackageDescription.Target] = [ name: "SimpleGossip", dependencies: [ .product(name: "DistributedCluster", package: "swift-distributed-actors"), - ], - path: "Sources/SimpleGossip" + ] 
+ ), + + .executableTarget( + name: "SimpleWorkerPool", + dependencies: [ + .product(name: "DistributedCluster", package: "swift-distributed-actors"), + ] ), /* --- tests --- */ diff --git a/Samples/Sources/SimpleGossip/App.swift b/Samples/Sources/SimpleGossip/App.swift new file mode 100644 index 000000000..d26c25bb2 --- /dev/null +++ b/Samples/Sources/SimpleGossip/App.swift @@ -0,0 +1,41 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster + +struct App { + + let port: Int + + init(port: Int) { + self.port = port + } + + func run(greeting: String?, for duration: Duration) async throws { + let system = await ClusterSystem("GossipSystem") { settings in + settings.endpoint.port = .random(in: 7000...9999) + settings.discovery = .clusterd + } + let peer = await GreetingGossipPeer(actorSystem: system) + + if let greeting { + try await peer.setGreeting(greeting) + } + + print("Sleeping...") + _Thread.sleep(duration) + } +} + + diff --git a/Samples/Sources/SimpleGossip/SimpleGossip.swift b/Samples/Sources/SimpleGossip/SimpleGossip.swift deleted file mode 100644 index 0cca617b0..000000000 --- a/Samples/Sources/SimpleGossip/SimpleGossip.swift +++ /dev/null @@ -1,48 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Distributed Actors open source project -// -// Copyright (c) 2018-2022 Apple Inc. 
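For reference, a stripped-down boot sketch of a peer relying on the `.clusterd` discovery mode used in the new SimpleGossip `App.swift` above; the `PeerBoot` type is illustrative, and the system name and port range simply mirror the sample.

```swift
import DistributedCluster

// Sketch: let a locally running clusterd daemon introduce this node to its peers,
// as the SimpleGossip sample does. Keeps running until the system terminates.
@main
enum PeerBoot {
    static func main() async throws {
        let system = await ClusterSystem("GossipSystem") { settings in
            settings.endpoint.port = .random(in: 7000...9999) // avoid clashes between local peers
            settings.discovery = .clusterd                    // discover peers via the local daemon
        }

        try await system.terminated
    }
}
```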
and the Swift Distributed Actors project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import DistributedCluster - -final class SimpleGossip { - private var forks: [GreetingGossipPeer] = [] - - func run(for duration: Duration) async throws { - let system = await ClusterSystem("GossipSystem") { settings in -// settings.logging.logLevel = .debug - } - - // prepare 5 forks, the resources, that the philosophers will compete for: - let fork1 = await GreetingGossipPeer(actorSystem: system) - let fork2 = await GreetingGossipPeer(actorSystem: system) -// let fork3 = await GreetingGossipPeer(actorSystem: system) -// let fork4 = await GreetingGossipPeer(actorSystem: system) -// let fork5 = await GreetingGossipPeer(actorSystem: system) - self.forks = [fork1, fork2, -// fork3, fork4, fork5 - ] - - try await fork1.setGreeting("Hello") - - print("Sleeping...") - _Thread.sleep(duration) - - for p in forks { - print("Greet: \(p.id)") - let greeting = try await p.greet(name: "Caplin") - print(" >>> \(greeting)") - } - } -} - - diff --git a/Samples/Sources/SimpleGossip/boot.swift b/Samples/Sources/SimpleGossip/boot.swift index f3311989c..c701464a6 100644 --- a/Samples/Sources/SimpleGossip/boot.swift +++ b/Samples/Sources/SimpleGossip/boot.swift @@ -17,36 +17,22 @@ import DistributedCluster import Logging import NIO -/* - * Swift Distributed Actors implementation of the classic "Dining Philosophers" problem. - * - * The goal of this implementation is not to be efficient or solve the live-lock, - * but rather to be a nice application that continuously "does something" with - * messaging between various actors. - * - * The implementation is based on the following take on the problem: - * http://www.dalnefre.com/wp/2010/08/dining-philosophers-in-humus - */ - typealias DefaultDistributedActorSystem = ClusterSystem @main enum Main { - static func main() async { - print("===-----------------------------------------------------===") - print("| Gossip Sample App |") - print("| |") - print("| USAGE: swift run SimpleGossip [dist] |") - print("===-----------------------------------------------------===") + static func main() async { + print("===-----------------------------------------------------===") + print("| Gossip Sample App |") + print("| |") + print("| USAGE: swift run SimpleGossip ? |") + print("===-----------------------------------------------------===") + + LoggingSystem.bootstrap(SamplePrettyLogHandler.init) - LoggingSystem.bootstrap(SamplePrettyLogHandler.init) + let duration = Duration.seconds(60) - let duration = Duration.seconds(10) + let greeting = CommandLine.arguments.dropFirst().first -// switch CommandLine.arguments.dropFirst().first { -// case "dist", "distributed": -// try! await DistributedDiningPhilosophers().run(for: duration) -// default: - try! await SimpleGossip().run(for: duration) -// } - } + try! 
await App(port: 0).run(greeting: greeting, for: duration) + } } diff --git a/Samples/Sources/SimpleWorkerPool/App.swift b/Samples/Sources/SimpleWorkerPool/App.swift new file mode 100644 index 000000000..aa1f3da7d --- /dev/null +++ b/Samples/Sources/SimpleWorkerPool/App.swift @@ -0,0 +1,101 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster +import Logging + +struct App { + + init() {} + + func run(greeting: String?, for duration: Duration) async throws { + let leaderSystem = await ClusterSystem("leader") { settings in + // ... + } + let workerSystem = await ClusterSystem("worker") { settings in + settings.bindPort = 9999 + } + let anotherWorkerSystem = await ClusterSystem("worker") { settings in + settings.bindPort = 9999 + } + + workerSystem.log.warning("Joining...") + try await leaderSystem.cluster.joined(endpoint: workerSystem.settings.endpoint, within: .seconds(10)) + try await workerSystem.cluster.joined(endpoint: leaderSystem.settings.endpoint, within: .seconds(10)) + workerSystem.log.warning("Joined!") + + var workers: [Worker] = [] + for _ in 0..<9 { + await workers.append(Worker(actorSystem: workerSystem)) + } + for _ in 0..<9 { + await workers.append(Worker(actorSystem: anotherWorkerSystem)) + } + + let LeaderActor = try await Leader(actorSystem: leaderSystem) + + Task { + try await LeaderActor.work() + } + + try await leaderSystem.terminated + + + print("Sleeping...") + _Thread.sleep(duration) + } +} + + + +extension DistributedReception.Key { + static var workers: DistributedReception.Key { "workers" } +} + +distributed actor Leader { + + let workerPool: DistributedCluster.WorkerPool + + init(actorSystem: ClusterSystem) async throws { + self.actorSystem = actorSystem + self.workerPool = try await .init( + selector: .dynamic(.workers), + actorSystem: actorSystem + ) + } + + distributed func work() async throws { + try? await self.workerPool.submit(work: "my_job") + try await Task.sleep(for: .seconds(1)) + try await self.work() + } +} + + +distributed actor Worker: DistributedWorker { + + lazy var log: Logger = Logger(actor: self) + + init(actorSystem: ClusterSystem) async { + self.actorSystem = actorSystem + await actorSystem.receptionist.checkIn(self, with: .workers) + } + + distributed func submit(work: String) async throws -> String { + try await Task.sleep(for: .seconds(1)) + log.info("Done \(work)") + return "Done" + } +} + diff --git a/Samples/Sources/SimpleWorkerPool/SamplePrettyLogHandler.swift b/Samples/Sources/SimpleWorkerPool/SamplePrettyLogHandler.swift new file mode 100644 index 000000000..212e74477 --- /dev/null +++ b/Samples/Sources/SimpleWorkerPool/SamplePrettyLogHandler.swift @@ -0,0 +1,151 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2021 Apple Inc. 
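The SimpleWorkerPool sample above spreads a Leader, a `WorkerPool`, and workers across several systems. A more compact sketch of the same pattern on a single system follows; `EchoWorker`, the `echo-workers` reception key, and `runEchoPool` are illustrative names, not library API.

```swift
import DistributedCluster

// A worker checks in with the receptionist so dynamic pools can discover it.
extension DistributedReception.Key {
    static var echoWorkers: DistributedReception.Key<EchoWorker> { "echo-workers" }
}

distributed actor EchoWorker: DistributedWorker {
    typealias ActorSystem = ClusterSystem

    init(actorSystem: ClusterSystem) async {
        self.actorSystem = actorSystem
        await actorSystem.receptionist.checkIn(self, with: .echoWorkers)
    }

    distributed func submit(work: String) async throws -> String {
        "echo: \(work)"
    }
}

func runEchoPool(on system: ClusterSystem) async throws {
    // The pool keeps discovering workers as they check in with the receptionist.
    let pool: WorkerPool<EchoWorker> = try await WorkerPool(
        selector: .dynamic(.echoWorkers),
        actorSystem: system
    )

    let worker = await EchoWorker(actorSystem: system)

    // submit() waits for a worker if none has been offered to the pool yet.
    let reply = try await pool.submit(work: "ping")
    print("pool replied: \(reply), local worker: \(worker.id)")
}
```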
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +@testable import DistributedCluster +import Logging + +#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) +import Darwin +#elseif os(Windows) +import CRT +#elseif canImport(Glibc) +import Glibc +#elseif canImport(WASILibc) +import WASILibc +#else +#error("Unsupported runtime") +#endif + +/// Logger that prints "pretty" for showcasing the cluster nicely in sample applications. +struct SamplePrettyLogHandler: LogHandler { + static let CONSOLE_RESET = "\u{001B}[0;0m" + static let CONSOLE_BOLD = "\u{001B}[1m" + + public static func make(label: String) -> SamplePrettyLogHandler { + return SamplePrettyLogHandler(label: label) + } + + private let label: String + + public var logLevel: Logger.Level = .info + + public var metadata = Logger.Metadata() + + public subscript(metadataKey metadataKey: String) -> Logger.Metadata.Value? { + get { + return self.metadata[metadataKey] + } + set { + self.metadata[metadataKey] = newValue + } + } + + // internal for testing only + internal init(label: String) { + self.label = label + } + + // TODO: this implementation of getting a nice printout is a bit messy, but good enough for our sample apps + public func log( + level: Logger.Level, + message: Logger.Message, + metadata: Logger.Metadata?, + source: String, + file: String, + function: String, + line: UInt + ) { + var metadataString = "" + var nodeInfo = "" + + var effectiveMetadata = (metadata ?? [:]).merging(self.metadata, uniquingKeysWith: { _, new in new }) + if let node = effectiveMetadata.removeValue(forKey: "cluster/node") { + nodeInfo += "\(node)" + } + let label: String + if let path = effectiveMetadata.removeValue(forKey: "actor/path")?.description { + effectiveMetadata.removeValue(forKey: "actor/id") // duplicate information generally as path contains ID + label = path + } else { + label = "" + } + + if !effectiveMetadata.isEmpty { + metadataString = "\n// metadata:\n" + for key in effectiveMetadata.keys.sorted() { + let value: Logger.MetadataValue = effectiveMetadata[key]! + let valueDescription = self.prettyPrint(metadata: value) + + var allString = "\n// \"\(key)\": \(valueDescription)" + if allString.contains("\n") { + allString = String( + allString.split(separator: "\n").map { valueLine in + if valueLine.starts(with: "// ") { + return "\(valueLine)\n" + } else { + return "// \(valueLine)\n" + } + }.joined(separator: "") + ) + } + metadataString.append(allString) + } + metadataString = String(metadataString.dropLast(1)) + } + + let file = file.split(separator: "/").last ?? 
"" + let line = line + print("\(self.timestamp()) \(level) [\(nodeInfo)\(Self.CONSOLE_BOLD)\(label)\(Self.CONSOLE_RESET)][\(file):\(line)] \(message)\(metadataString)") + } + + internal func prettyPrint(metadata: Logger.MetadataValue) -> String { + var valueDescription = "" + switch metadata { + case .string(let string): + valueDescription = string + case .stringConvertible(let convertible): + valueDescription = convertible.description + case .array(let array): + valueDescription = "\n \(array.map { "\($0)" }.joined(separator: "\n "))" + case .dictionary(let metadata): + for k in metadata.keys { + valueDescription += "\(Self.CONSOLE_BOLD)\(k)\(Self.CONSOLE_RESET): \(self.prettyPrint(metadata: metadata[k]!))" + } + } + + return valueDescription + } + + private func timestamp() -> String { + var buffer = [Int8](repeating: 0, count: 255) + var timestamp = time(nil) + let localTime = localtime(×tamp) + // This format is pleasant to read in local sample apps: + strftime(&buffer, buffer.count, "%H:%M:%S", localTime) + // The usual full format is: + // strftime(&buffer, buffer.count, "%Y-%m-%dT%H:%M:%S%z", localTime) + return buffer.withUnsafeBufferPointer { + $0.withMemoryRebound(to: CChar.self) { + String(cString: $0.baseAddress!) + } + } + } +} + +extension DistributedActor where Self: CustomStringConvertible { + public nonisolated var description: String { + "\(Self.self)(\(self.id))" + } +} diff --git a/Samples/Sources/SimpleWorkerPool/boot.swift b/Samples/Sources/SimpleWorkerPool/boot.swift new file mode 100644 index 000000000..f420c3aac --- /dev/null +++ b/Samples/Sources/SimpleWorkerPool/boot.swift @@ -0,0 +1,38 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +import DistributedCluster +import Logging +import NIO + +typealias DefaultDistributedActorSystem = ClusterSystem + +@main enum Main { + static func main() async { + print("===-----------------------------------------------------===") + print("| Gossip Sample App |") + print("| |") + print("| USAGE: swift run SimpleWorkerPool |") + print("===-----------------------------------------------------===") + + LoggingSystem.bootstrap(SamplePrettyLogHandler.init) + + let duration = Duration.seconds(60) + + let greeting = CommandLine.arguments.dropFirst().first + + try! await App().run(greeting: greeting, for: duration) + } +} diff --git a/Sources/DistributedCluster/ActorLogging.swift b/Sources/DistributedCluster/ActorLogging.swift index ce0360033..7cff50295 100644 --- a/Sources/DistributedCluster/ActorLogging.swift +++ b/Sources/DistributedCluster/ActorLogging.swift @@ -74,7 +74,8 @@ internal final class LoggingContext { /// /// The preferred way of obtaining a logger for an actor or system is `context.log` or `system.log`, rather than creating new ones. extension Logger { - /// Create a logger specific to this actor. 
+ /// Create a logger specific to this distributed actor, it will contain metadata for the actor's ID, + /// and default log level as configured by the actor's default log level (see ``LoggingSettings/baseLogger``). public init(actor: Act) where Act.ActorSystem == ClusterSystem { var log = actor.actorSystem.settings.logging.baseLogger log[metadataKey: "actor/path"] = "\(actor.id.path)" @@ -93,7 +94,6 @@ extension Logger { } } -// TODO: implement logging infrastructure - pipe as messages to dedicated logging actor struct ActorOriginLogHandler: LogHandler { public static func _createFormatter() -> DateFormatter { let formatter = DateFormatter() diff --git a/Sources/DistributedCluster/Cluster/ClusterShell.swift b/Sources/DistributedCluster/Cluster/ClusterShell.swift index 7137b277b..e42116c26 100644 --- a/Sources/DistributedCluster/Cluster/ClusterShell.swift +++ b/Sources/DistributedCluster/Cluster/ClusterShell.swift @@ -148,12 +148,12 @@ internal class ClusterShell { return } - system.log.warning("Terminate existing association [\(reflecting: remoteNode)].") + system.log.notice("Terminate existing association [\(reflecting: remoteNode)].") // notify the failure detector, that we shall assume this node as dead from here on. // it's gossip will also propagate the information through the cluster traceLog_Remote(system.cluster.node, "Finish terminate association [\(remoteNode)]: Notifying SWIM, .confirmDead") - system.log.warning("Confirm .dead to underlying SWIM, node: \(reflecting: remoteNode)") + system.log.debug("Confirm .dead to underlying SWIM, node: \(reflecting: remoteNode)") self._swimShell.confirmDead(node: remoteNode) // it is important that we first check the contains; as otherwise we'd re-add a .down member for what was already removed (!) diff --git a/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift index 8f822667a..83318963d 100644 --- a/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift +++ b/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift @@ -749,10 +749,16 @@ extension OpLogDistributedReceptionist { for subscription in subscriptions { for registration in registrations { - if subscription.tryOffer(registration: registration) { - self.log.notice("OFFERED \(registration.actorID) TO \(subscription)") + if subscription.tryOffer(registration: registration, log: log) { +// self.log.notice("OFFERED \(registration.actorID) subscription", metadata: [ +// "peer": "\(registration.actorID)", +// "subscription": "\(subscription)", +// ]) } else { - self.log.notice("DROPPED \(registration.actorID) TO \(subscription)") +// self.log.notice("DROPPED \(registration.actorID) from subscription", metadata: [ +// "peer": "\(registration.actorID)", +// "subscription": "\(subscription)", +// ]) } } } @@ -943,6 +949,12 @@ extension OpLogDistributedReceptionist { } case .membershipChange(let change): + // FIXME: more proper ignoring the 'clusterd' node + if change.node.endpoint.host == "127.0.0.1" && + change.node.endpoint.port == 3137 { + return // skip the Clusterd node + } + guard let effectiveChange = self.membership.applyMembershipChange(change) else { return } diff --git a/Sources/DistributedCluster/ClusterSystem.swift b/Sources/DistributedCluster/ClusterSystem.swift index 33d49f353..205ffad63 100644 --- a/Sources/DistributedCluster/ClusterSystem.swift +++ 
b/Sources/DistributedCluster/ClusterSystem.swift @@ -348,12 +348,14 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { let lazyReceptionist = try! self._prepareSystemActor(Receptionist.naming, receptionistBehavior, props: ._wellKnown) _ = self._receptionistRef.storeIfNilThenLoad(Box(lazyReceptionist.ref)) - await _Props.$forSpawn.withValue(OpLogDistributedReceptionist.props) { - let receptionist = await OpLogDistributedReceptionist( - settings: self.settings.receptionist, - system: self - ) - self._receptionistStore = receptionist + if !self._isClusterd { + await _Props.$forSpawn.withValue(OpLogDistributedReceptionist.props) { + let receptionist = await OpLogDistributedReceptionist( + settings: self.settings.receptionist, + system: self + ) + self._receptionistStore = receptionist + } } // downing strategy (automatic downing) @@ -384,13 +386,13 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { await self.settings.plugins.startAll(self) if settings.enabled { - self.log.info("ClusterSystem [\(self.name)] initialized, listening on: \(self.settings.bindNode): \(self.cluster.ref)") + self.log.notice("ClusterSystem [\(self.name)] initialized, listening on: \(self.settings.bindNode): \(self.cluster.ref)") - self.log.info("Setting in effect: .autoLeaderElection: \(self.settings.autoLeaderElection)") - self.log.info("Setting in effect: .downingStrategy: \(self.settings.downingStrategy)") - self.log.info("Setting in effect: .onDownAction: \(self.settings.onDownAction)") + self.log.notice("Setting in effect: .autoLeaderElection: \(self.settings.autoLeaderElection)") + self.log.notice("Setting in effect: .downingStrategy: \(self.settings.downingStrategy)") + self.log.notice("Setting in effect: .onDownAction: \(self.settings.onDownAction)") } else { - self.log.info("ClusterSystem [\(self.name)] initialized; Cluster disabled, not listening for connections.") + self.log.notice("ClusterSystem [\(self.name)] initialized; Cluster disabled, not listening for connections.") } } @@ -464,7 +466,6 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { public func wait() async throws { // TODO: implement without blocking the internal task; try await Task.detached { - print("BLOCKING ON receptacle") if let error = self.receptacle.wait() { throw error } @@ -484,6 +485,11 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable { self.shutdownFlag.load(ordering: .relaxed) > 0 } + internal var _isClusterd: Bool { + self.cluster.endpoint.host == "127.0.0.1" && + self.cluster.endpoint.port == 3137 + } + /// Forcefully stops this actor system and all actors that live within it. /// This is an asynchronous operation and will be executed on a separate thread. 
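Both the receptionist's membership-change filter and `_isClusterd` above hard-code `127.0.0.1:3137`. A possible follow-up, sketched here and not part of this patch, is to centralize that literal as a single predicate on the endpoint type; the `isClusterdEndpoint` helper name is hypothetical.

```swift
extension Cluster.Endpoint {
    /// Sketch only: the well-known clusterd host/port are the literals already used in
    /// this patch; a settings-driven daemon endpoint would be preferable to a constant.
    internal var isClusterdEndpoint: Bool {
        self.host == "127.0.0.1" && self.port == 3137
    }
}
```

With such a helper, `_isClusterd` could read `self.cluster.endpoint.isClusterdEndpoint`, and the receptionist check `change.node.endpoint.isClusterdEndpoint`.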
/// @@ -1040,7 +1046,7 @@ extension ClusterSystem { } // Spawn a behavior actor for it: - let behavior = InvocationBehavior.behavior(instance: Weak(actor)) + let behavior = InvocationBehavior.behavior(instance: WeakLocalRef(actor)) let ref = self._spawnDistributedActor(behavior, identifiedBy: actor.id) // Store references @@ -1619,9 +1625,11 @@ public struct ClusterInvocationResultHandler: DistributedTargetInvocationResultH case .all: // compiler gets confused if this is grouped together with above reply = .init(callID: callID, error: codableError) default: + print("ERROR: \(error)") reply = .init(callID: callID, error: GenericRemoteCallError(errorType: errorType)) } } else { + print("ERROR: \(error)") reply = .init(callID: callID, error: GenericRemoteCallError(errorType: errorType)) } try await channel.writeAndFlush(TransportEnvelope(envelope: Payload(payload: .message(reply)), recipient: recipient)) diff --git a/Sources/DistributedCluster/Concurrency/_CancellableCheckedContinuation.swift b/Sources/DistributedCluster/Concurrency/_CancellableCheckedContinuation.swift new file mode 100644 index 000000000..fd1acfe4c --- /dev/null +++ b/Sources/DistributedCluster/Concurrency/_CancellableCheckedContinuation.swift @@ -0,0 +1,135 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2024 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedActorsConcurrencyHelpers +import NIOConcurrencyHelpers + +/// A checked continuation that offers easier APIs for working with cancellation, +/// as well as has its unique identity. +internal final class ClusterCancellableCheckedContinuation: + Hashable, @unchecked Sendable where Success: Sendable { + + private struct _State: Sendable { + var cancelled: Bool = false + var onCancel: (@Sendable (ClusterCancellableCheckedContinuation) -> Void)? + var continuation: CheckedContinuation? + } + + private let state: NIOLockedValueBox<_State> = .init(_State()) + + fileprivate init() {} + + func setContinuation(_ continuation: CheckedContinuation) -> Bool { + var alreadyCancelled = false + state.withLockedValue { state in + if state.cancelled { + alreadyCancelled = true + } else { + state.continuation = continuation + } + } + if alreadyCancelled { + continuation.resume(throwing: CancellationError()) + } + return !alreadyCancelled + } + + /// Register a cancellation handler, or call it immediately if the continuation was already cancelled. + @Sendable + func onCancel(handler: @Sendable @escaping (ClusterCancellableCheckedContinuation) -> Void) { + var alreadyCancelled: Bool = state.withLockedValue { state in + if state.cancelled { + return true + } + + state.onCancel = handler + return false + } + if alreadyCancelled { + handler(self) + } + } + + private func withContinuation(cancelled: Bool = false, _ operation: (CheckedContinuation) -> Void) { + var safeContinuation: CheckedContinuation? + var safeOnCancel: (@Sendable (ClusterCancellableCheckedContinuation) -> Void)? 
+ state.withLockedValue { (state: inout _State) -> Void in + state.cancelled = state.cancelled || cancelled + safeContinuation = state.continuation + safeOnCancel = state.onCancel + state.continuation = nil + state.onCancel = nil + } + if let safeContinuation { + operation(safeContinuation) + } + if cancelled { + safeOnCancel?(self) + } + } + + func resume(returning value: Success) { + withContinuation { + $0.resume(returning: value) + } + } + + func resume(throwing error: any Error) { + withContinuation { + $0.resume(throwing: error) + } + } + + var isCancelled: Bool { + state.withLockedValue { $0.cancelled } + } + + func cancel() { + withContinuation(cancelled: true) { + $0.resume(throwing: CancellationError()) + } + } +} + +extension ClusterCancellableCheckedContinuation where Success == Void { + func resume() { + self.resume(returning: ()) + } +} +extension ClusterCancellableCheckedContinuation { + static func == (lhs: ClusterCancellableCheckedContinuation, rhs: ClusterCancellableCheckedContinuation) -> Bool { + return ObjectIdentifier(lhs) == ObjectIdentifier(rhs) + } + func hash(into hasher: inout Hasher) { + hasher.combine(ObjectIdentifier(self)) + } +} + + +func _withClusterCancellableCheckedContinuation( + of successType: Success.Type = Success.self, + _ body: @escaping (ClusterCancellableCheckedContinuation) -> Void, + function: String = #function) async throws -> Success + where Success: Sendable { + let cccc = ClusterCancellableCheckedContinuation() + return try await withTaskCancellationHandler { + return try await withCheckedThrowingContinuation(function: function) { continuation in + if cccc.setContinuation(continuation) { + body(cccc) + } + } + } onCancel: { + cccc.cancel() + } +} \ No newline at end of file diff --git a/Sources/DistributedCluster/Docs.docc/Receptionist.md b/Sources/DistributedCluster/Docs.docc/Receptionist.md index 8dd243454..60adc5453 100644 --- a/Sources/DistributedCluster/Docs.docc/Receptionist.md +++ b/Sources/DistributedCluster/Docs.docc/Receptionist.md @@ -74,7 +74,7 @@ Once that actor is deinitialized, that task should be cancelled as well, which w ```swift distributed actor Boss: LifecycleWatch { - var workers: WeakActorDictionary = [:] + var workers: WeakLocalRefDictionary = [:] var listingTask: Task? 
diff --git a/Sources/DistributedCluster/InvocationBehavior.swift b/Sources/DistributedCluster/InvocationBehavior.swift index f641f65d2..65e58c4f5 100644 --- a/Sources/DistributedCluster/InvocationBehavior.swift +++ b/Sources/DistributedCluster/InvocationBehavior.swift @@ -34,7 +34,7 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible { // FIXME(distributed): remove [#957](https://github.com/apple/swift-distributed-actors/issues/957) enum InvocationBehavior { - static func behavior(instance weakInstance: Weak) -> _Behavior { + static func behavior(instance weakInstance: WeakLocalRef) -> _Behavior { return _Behavior.setup { context in return ._receiveMessageAsync { (message) async throws -> _Behavior in guard let instance = weakInstance.actor else { diff --git a/Sources/DistributedCluster/Pattern/WorkerPool.swift b/Sources/DistributedCluster/Pattern/WorkerPool.swift index 166d6a5e0..e91067563 100644 --- a/Sources/DistributedCluster/Pattern/WorkerPool.swift +++ b/Sources/DistributedCluster/Pattern/WorkerPool.swift @@ -14,13 +14,15 @@ import Distributed import Logging +import OrderedCollections // ==== ---------------------------------------------------------------------------------------------------------------- // MARK: Worker -public protocol DistributedWorker: DistributedActor { - associatedtype WorkItem: Codable - associatedtype WorkResult: Codable +/// Protocol to be implemented by workers participating in a simple ``WorkerPool``. +public protocol DistributedWorker: DistributedActor where ActorSystem == ClusterSystem { + associatedtype WorkItem: Sendable & Codable + associatedtype WorkResult: Sendable & Codable distributed func submit(work: WorkItem) async throws -> WorkResult } @@ -30,21 +32,25 @@ public protocol DistributedWorker: DistributedActor { /// A `WorkerPool` represents a pool of actors that are all equally qualified to handle incoming work items. /// -/// Pool members may be local or remote, // TODO: and there should be ways to say "prefer local or something". -/// /// A pool can populate its member list using the `Receptionist` mechanism, and thus allows members to join and leave /// dynamically, e.g. if a node joins or removes itself from the cluster. -/// -// TODO: A pool can be configured to terminate itself when any of its workers terminate or attempt to spawn replacements. public distributed actor WorkerPool: DistributedWorker, LifecycleWatch, CustomStringConvertible where Worker.ActorSystem == ClusterSystem { public typealias ActorSystem = ClusterSystem public typealias WorkItem = Worker.WorkItem public typealias WorkResult = Worker.WorkResult + lazy var log: Logger = Logger(actor: self) + // Don't store `WorkerPoolSettings` or `Selector` because it would cause `WorkerPool` // to hold on to `Worker` references and prevent them from getting terminated. - private let whenAllWorkersTerminated: AllWorkersTerminatedDirective - private let logLevel: Logger.Level + private var whenAllWorkersTerminated: AllWorkersTerminatedDirective { + self.settings.whenAllWorkersTerminated + } + private var logLevel: Logger.Level { + self.settings.logLevel + } + + private let settings: WorkerPoolSettings /// `Task` for subscribing to receptionist listing in case of `Selector.dynamic` mode. private var newWorkersSubscribeTask: Task? @@ -53,33 +59,34 @@ public distributed actor WorkerPool: DistributedWorke // MARK: WorkerPool state /// The worker pool. Worker will be selected round-robin. 
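`WorkerPool` now carries its own per-actor logger via `lazy var log: Logger = Logger(actor: self)`, the same pattern used by the sample's `Worker`. A short usage sketch of that pattern follows; the `Greeter` actor is illustrative.

```swift
import DistributedCluster
import Logging

distributed actor Greeter {
    typealias ActorSystem = ClusterSystem

    // Carries the actor's path/ID metadata and inherits the system's base logger.
    lazy var log: Logger = Logger(actor: self)

    init(actorSystem: ClusterSystem) {
        self.actorSystem = actorSystem
    }

    distributed func greet(name: String) -> String {
        self.log.info("Greeting requested", metadata: ["greet/name": "\(name)"])
        return "Hello, \(name)!"
    }
}
```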
- private var workers: [Worker.ID: Weak] = [:] + private var workers: OrderedSet> = [] private var roundRobinPos = 0 /// Boolean flag to help determine if pool becomes empty because at least one worker has terminated. private var hasTerminatedWorkers = false /// Control for waiting and getting notified for new worker. - private var newWorkerContinuations: [CheckedContinuation] = [] + private var newWorkerContinuations: OrderedSet> = [] - // TODO: remove the convenience marker; since SE-0327 we don't need it anymore: https://github.com/apple/swift-evolution/blob/main/proposals/0327-actor-initializers.md - public convenience init(selector: Selector, actorSystem: ActorSystem) async throws { + public init(selector: Selector, actorSystem: ActorSystem) async throws { try await self.init(settings: .init(selector: selector), actorSystem: actorSystem) } public init(settings: WorkerPoolSettings, actorSystem system: ActorSystem) async throws { try settings.validate() - + self.settings = settings self.actorSystem = system - self.whenAllWorkersTerminated = settings.whenAllWorkersTerminated - self.logLevel = settings.logLevel switch settings.selector.underlying { case .dynamic(let key): self.newWorkersSubscribeTask = Task { for await worker in await self.actorSystem.receptionist.listing(of: key) { - self.actorSystem.log.log(level: self.logLevel, "Got listing member for \(key): \(worker)") - self.workers[worker.id] = Weak(worker) + self.workers.append(WeakLocalRef(worker)) + // Notify those waiting for new worker + log.log(level: self.logLevel, "Updated workers for \(key)", metadata: [ + "workers": "\(self.workers)", + "newWorkerContinuations": "\(self.newWorkerContinuations.count)" + ]) for (i, continuation) in self.newWorkerContinuations.enumerated().reversed() { continuation.resume() self.newWorkerContinuations.remove(at: i) @@ -88,8 +95,10 @@ public distributed actor WorkerPool: DistributedWorke } } case .static(let workers): + self.workers.reserveCapacity(workers.count) workers.forEach { worker in - self.workers[worker.id] = Weak(worker) + // self.workers[worker.id] = WeakDistributedActorRef(worker) + self.workers.append(WeakLocalRef(worker)) watchTermination(of: worker) } } @@ -100,8 +109,15 @@ public distributed actor WorkerPool: DistributedWorke } public distributed func submit(work: WorkItem) async throws -> WorkResult { - let worker = try await self.selectWorker() - self.actorSystem.log.log(level: self.logLevel, "Submitting [\(work)] to [\(worker)]") + log.log(level: self.logLevel, "Incoming work, selecting worker", metadata: [ + "workers/count": "\(self.workers.count)", + "worker/item": "\(work)", + ]) + let worker = try await self.selectWorker(for: work) + log.log(level: self.logLevel, "Selected worker, submitting [\(work)] to [\(worker)]", metadata: [ + "worker": "\(worker.id)", + "workers/count": "\(self.workers.count)", + ]) return try await worker.submit(work: work) } @@ -110,45 +126,76 @@ public distributed actor WorkerPool: DistributedWorke self.workers.count } - private func selectWorker() async throws -> Worker { + private func selectWorker(for work: WorkItem) async throws -> Worker { // Wait if we haven't received the initial workers listing yet. // Otherwise, the pool has become empty because all workers have been terminated, // in which case we either wait for new worker or throw error. 
if self.workers.isEmpty { switch (self.hasTerminatedWorkers, self.whenAllWorkersTerminated) { - case (false, _), (true, .awaitNewWorkers): - self.actorSystem.log.log(level: self.logLevel, "Worker pool is empty, waiting for new worker.") - await withCheckedContinuation { (continuation: CheckedContinuation) in - self.newWorkerContinuations.append(continuation) + case (false, _), // if we never received any workers yet, wait for some to show up. + (true, .awaitNewWorkers): + log.log(level: self.logLevel, "Worker pool is empty, waiting for new worker.") + + try await _withClusterCancellableCheckedContinuation(of: Void.self) { cccc in + self.newWorkerContinuations.append(cccc) + let log = self.log + cccc.onCancel { cccc in + log.debug("Member selection was cancelled, call probably timed-out, schedule removal of continuation") + cccc.resume(throwing: CancellationError()) + Task { + await self.removeWorkerWaitContinuation(cccc) + } + } } case (true, .throw(let error)): throw error } } - let selectedWorkerID = self.nextWorkerID() - if let worker = self.workers[selectedWorkerID]?.actor { - return worker - } else { - // Worker terminated; clean up and try again - self.terminated(actor: selectedWorkerID) - return try await self.selectWorker() + guard let selected = nextWorker() else { + switch self.whenAllWorkersTerminated { + case .awaitNewWorkers: + // try again + return try await selectWorker(for: work) + case .throw(let error): + throw error + } } + + guard let selectedWorker = selected.actor else { + log.debug("Selected actor has deallocated: \(selected.id)!") + // remove this actor from the pool + self.terminated(actor: selected.id) + // and, try again + return try await selectWorker(for: work) + } + + return selectedWorker } - private func nextWorkerID() -> Worker.ID { - var ids = Array(self.workers.keys) - ids.sort { l, r in l.description < r.description } + private func removeWorkerWaitContinuation(_ cccc: ClusterCancellableCheckedContinuation) { + self.newWorkerContinuations.remove(cccc) + } - let selected = ids[self.roundRobinPos] - self.roundRobinPos = (self.roundRobinPos + 1) % ids.count - return selected + private func nextWorker() -> WeakLocalRef? { + switch settings.strategy.underlying { + case .random: + return workers.shuffled().first + case .simpleRoundRobin: + if self.roundRobinPos >= self.workers.count { + self.roundRobinPos = 0 // loop around from zero + } + let selected = self.workers[self.roundRobinPos] + self.roundRobinPos = self.workers.index(after: self.roundRobinPos) % workers.count + return selected + } } public func terminated(actor id: Worker.ID) { - self.workers.removeValue(forKey: id) + log.debug("Worker terminated: \(id)", metadata: ["worker": "\(id)"]) + self.workers.remove(WeakLocalRef(forRemoval: id)) self.hasTerminatedWorkers = true - self.roundRobinPos = 0 + self.roundRobinPos = 0 // FIXME: naively reset the round robin counter; we should do better than that } public nonisolated var description: String { @@ -174,7 +221,6 @@ extension WorkerPool { /// of members to be statically provided etc. public struct Selector { enum _Selector { - // TODO: let awaitAtLeast: Int // before starting to direct traffic case dynamic(DistributedReception.Key) case `static`([Worker]) } @@ -200,6 +246,30 @@ extension WorkerPool { .init(underlying: .static(workers)) } } + + public struct Strategy { + enum _Strategy { + case random + case simpleRoundRobin + } + let underlying: _Strategy + + /// Simple random selection on every target worker selection. 
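A usage sketch for the selection strategies introduced here, reusing the `Worker` actor and `.workers` reception key from the SimpleWorkerPool sample; the `makeRoundRobinPool` helper is illustrative, and the default strategy remains `.random` unless configured otherwise.

```swift
// Sketch: pick the round-robin strategy explicitly via WorkerPoolSettings.
func makeRoundRobinPool(on system: ClusterSystem) async throws -> WorkerPool<Worker> {
    let settings = WorkerPoolSettings<Worker>(
        selector: .dynamic(.workers),
        strategy: .simpleRoundRobin // omit to get the default .random strategy
    )
    return try await WorkerPool(settings: settings, actorSystem: system)
}
```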
+ public static var random: Strategy { + .init(underlying: .random) + } + + /// Round-robin strategy which attempts to go "around" known workers one-by-one + /// giving them equal amounts of work. This strategy is NOT strict, and when new + /// workers arrive at the pool it may result in submitting work to previously notified + /// workers as the round-robin strategy "resets". + /// + /// We could consider implementing a strict round robin strategy which remains strict even + /// as new workers arrive in the pool. + public static var simpleRoundRobin: Strategy { + .init(underlying: .simpleRoundRobin) + } + } } // ==== ---------------------------------------------------------------------------------------------------------------- @@ -250,16 +320,24 @@ public struct WorkerPoolSettings where Worker.ActorSy /// Configures how to select / discover actors for the pool. var selector: WorkerPool.Selector + /// Configures how the "next" worker is determined for submitting a work request. + /// Generally random strategies or a form of round robin are preferred, but we + /// could implement more sophisticated workload balancing/estimating strategies as well. + /// + /// Defaults to a simple round-robin strategy. + var strategy: WorkerPool.Strategy + /// Determine what action should be taken once the number of alive workers in the pool reaches zero (after being positive for at least a moment). /// /// The default value depends on the `selector` and is: /// - `.crash` for the `.static` selector, /// - `.awaitNewWorkers` for the `.dynamic` selector, as it is assumed that replacement workers will likely be spawned - // in place of terminated workers. Messages sent to the pool while no workers are available will be buffered (up to `noWorkersAvailableBufferSize` messages). + /// in place of terminated workers. Messages sent to the pool while no workers are available will be buffered (up to `noWorkersAvailableBufferSize` messages). var whenAllWorkersTerminated: WorkerPool.AllWorkersTerminatedDirective - public init(selector: WorkerPool.Selector) { + public init(selector: WorkerPool.Selector, strategy: WorkerPool.Strategy = .random) { self.selector = selector + self.strategy = strategy switch selector.underlying { case .dynamic: @@ -278,11 +356,11 @@ public struct WorkerPoolSettings where Worker.ActorSy case .static(let workers): if case .awaitNewWorkers = self.whenAllWorkersTerminated { let message = """ - WorkerPool configured as [.static(\(workers))], MUST NOT be configured to await for new workers \ - as new workers are impossible to spawn and add to the pool in the static configuration. The pool \ - MUST terminate when in .static mode and all workers terminate. Alternatively, use a .dynamic pool, \ - and provide an initial set of workers. - """ + WorkerPool configured as [.static(\(workers))], MUST NOT be configured to await for new workers \ + as new workers are impossible to spawn and add to the pool in the static configuration. The pool \ + MUST terminate when in .static mode and all workers terminate. Alternatively, use a .dynamic pool, \ + and provide an initial set of workers. 
+ """ throw WorkerPoolError(.illegalAwaitNewWorkersForStaticPoolConfigured(message)) } default: diff --git a/Sources/DistributedCluster/Receptionist/DistributedReceptionist.swift b/Sources/DistributedCluster/Receptionist/DistributedReceptionist.swift index ece297095..20850da80 100644 --- a/Sources/DistributedCluster/Receptionist/DistributedReceptionist.swift +++ b/Sources/DistributedCluster/Receptionist/DistributedReceptionist.swift @@ -434,26 +434,19 @@ internal final class AnyDistributedReceptionListingSubscription: Hashable, @unch /// - Returns: true if the value was successfully offered func tryOffer(registration: VersionedRegistration, log: Logger? = nil) -> Bool { let oldSeenRegistrations = self.seenActorRegistrations - log?.debug("OLD SEEN: \(oldSeenRegistrations)") - log?.debug("registration: \(registration)") - log?.debug("registration version: \(registration.version)") self.seenActorRegistrations.merge(other: registration.version) - log?.debug("SEEN NOW: \(self.seenActorRegistrations)") switch self.seenActorRegistrations.compareTo(oldSeenRegistrations) { case .same: - log?.debug("->>> SAME") // the seen vector was unaffected by the merge, which means that the // incoming registration version was already seen, and thus we don't need to emit it again return false case .happenedAfter, .concurrent: - log?.debug("->>> happenedAfter || concurrent") // the incoming registration has not yet been seen before, // which means that we should emit the actor to the stream. self.onNext(registration.actorID) return true case .happenedBefore: - log?.debug("->>> happenedBefore") fatalError(""" It should not be possible for a *merged* version vector to be in the PAST as compared with itself before the merge Previously: \(oldSeenRegistrations) diff --git a/Sources/DistributedCluster/Serialization/Serialization.swift b/Sources/DistributedCluster/Serialization/Serialization.swift index 9c5ba7204..47137e362 100644 --- a/Sources/DistributedCluster/Serialization/Serialization.swift +++ b/Sources/DistributedCluster/Serialization/Serialization.swift @@ -262,8 +262,9 @@ extension Serialization { internal func makeCodableSerializer(_ type: Message.Type, manifest: Manifest) throws -> AnySerializer { switch manifest.serializerID { - case .doNotSerialize: - throw SerializationError(.noNeedToEnsureSerializer) +// case .doNotSerialize: +// fatalError("message type \(type)") +// throw SerializationError(.noNeedToEnsureSerializer) case Serialization.SerializerID.specializedWithTypeHint: guard let make = self.settings.specializedSerializerMakers[manifest] else { @@ -279,7 +280,8 @@ extension Serialization { serializer.setSerializationContext(self.context) return serializer - case Serialization.SerializerID.foundationJSON: + case Serialization.SerializerID.foundationJSON, + .doNotSerialize: let serializer = JSONCodableSerializer() serializer.setSerializationContext(self.context) return serializer @@ -812,6 +814,7 @@ public struct SerializationError: Error, CustomStringConvertible { internal init(_ error: _SerializationError, file: String = #fileID, line: UInt = #line) { self.underlying = _Storage(error: error, file: file, line: line) + print("ERROR: \(self)") } public var description: String { diff --git a/Sources/DistributedCluster/WeakActorDictionary.swift b/Sources/DistributedCluster/WeakActorDictionary.swift index f32e608fe..2b04582c3 100644 --- a/Sources/DistributedCluster/WeakActorDictionary.swift +++ b/Sources/DistributedCluster/WeakActorDictionary.swift @@ -2,7 +2,7 @@ // // This source file is part of the 
Swift Distributed Actors open source project // -// Copyright (c) 2022 Apple Inc. and the Swift Distributed Actors project authors +// Copyright (c) 2022-2024 Apple Inc. and the Swift Distributed Actors project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -15,22 +15,10 @@ import Distributed /// A dictionary which only weakly retains the -public struct WeakActorDictionary: ExpressibleByDictionaryLiteral +public struct WeakLocalRefDictionary: ExpressibleByDictionaryLiteral where Act.ID == ClusterSystem.ActorID { - var underlying: [ClusterSystem.ActorID: WeakContainer] - - final class WeakContainer { - weak var actor: Act? - - init(_ actor: Act) { - self.actor = actor - } - -// init(idForRemoval id: ClusterSystem.ActorID) { -// self.actor = nil -// } - } + var underlying: [ClusterSystem.ActorID: WeakLocalRef] /// Initialize an empty dictionary. public init() { @@ -50,10 +38,8 @@ public struct WeakActorDictionary: ExpressibleByDictionar /// Note that the dictionary only holds the actor weakly, /// so if no other strong references to the actor remain this dictionary /// will not contain the actor anymore. - /// - /// - Parameter actor: public mutating func insert(_ actor: Act) { - self.underlying[actor.id] = WeakContainer(actor) + self.underlying[actor.id] = WeakLocalRef(actor) } public mutating func getActor(identifiedBy id: ClusterSystem.ActorID) -> Act? { @@ -86,7 +72,7 @@ public struct WeakAnyDistributedActorDictionary { self.actor = actor } - init(idForRemoval id: ClusterSystem.ActorID) { + init(forRemoval id: ClusterSystem.ActorID) { self.actor = nil } } @@ -119,14 +105,73 @@ public struct WeakAnyDistributedActorDictionary { } } -final class Weak { - weak var actor: Act? +/// Distributed actor reference helper which avoids strongly retaining local actors, +/// in order no to accidentally extend their lifetimes. Specifically very useful +/// when designing library code which should NOT keep user-actors alive, and should +/// work with remote and local actors in the same way. +/// +/// The reference is *weak* when the actor is **local**, +/// in order to not prevent the actor from being deallocated when all **other** +/// references to it are released. +/// +/// The reference is *strong* when referencing a **remote** distributed actor, +/// a strong reference to a remote actor does not necessarily keep it alive, +/// however it allows keeping `weak var` references to remote distributed actors +/// without them being immediately released if they were obtained from a `resolve` +/// call for example -- as by design, no-one else will be retaining them, there +/// is a risk of always observing an immediately released reference. +/// +/// Rather than relying on reference counting for remote references, utilize the +/// `LifecycleWatch/watchTermination(of:)` lifecycle monitoring method. This +/// mechanism will invoke an actors ``LifecycleWatch/actorTerminated(_:)` when +/// the remote actor has terminated and we should clean it up locally. +/// +/// Generally, the pattern should be to store actor references local-weakly +/// when we "don't want to keep them alive" on behalf of the user, and at the +/// same time always use ``LifecycleMonitoring`` for handling their lifecycle - +/// regardless if the actor is local or remote, lifecycle monitoring behaves +/// in the expected way. +final class WeakLocalRef: Hashable where Act.ID == ClusterSystem.ActorID { + let id: Act.ID + + private weak var weakLocalRef: Act? + private let strongRemoteRef: Act? 
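The WeakLocalRef documentation above recommends storing local actors weakly and driving cleanup through lifecycle watching. A sketch of that pattern in user code, using the public `WeakLocalRefDictionary` and the sample's `Worker` actor; the `Coordinator` is illustrative, and `removeActor(identifiedBy:)` is assumed to carry over from the previous `WeakActorDictionary` API.

```swift
import DistributedCluster

distributed actor Coordinator: LifecycleWatch {
    typealias ActorSystem = ClusterSystem

    // Holds local workers weakly (and remote ones strongly), so the coordinator
    // never keeps a local worker alive on its own.
    private var workers: WeakLocalRefDictionary<Worker> = [:]

    init(actorSystem: ClusterSystem) {
        self.actorSystem = actorSystem
    }

    distributed func register(_ worker: Worker) {
        self.workers.insert(worker)
        watchTermination(of: worker) // cleanup is driven by termination events
    }

    public func terminated(actor id: ClusterSystem.ActorID) {
        _ = self.workers.removeActor(identifiedBy: id) // assumed API, see note above
    }
}
```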
+ + var actor: Act? { + self.strongRemoteRef ?? self.weakLocalRef + } init(_ actor: Act) { - self.actor = actor + if isDistributedKnownRemote(actor) { + self.weakLocalRef = nil + self.strongRemoteRef = actor + } else { + self.weakLocalRef = actor + self.strongRemoteRef = nil + } + self.id = actor.id } - init(idForRemoval id: ClusterSystem.ActorID) { - self.actor = nil + init(forRemoval id: ClusterSystem.ActorID) { + self.weakLocalRef = nil + self.strongRemoteRef = nil + self.id = id } + + func hash(into hasher: inout Hasher) { + hasher.combine(id) + } + + static func ==(lhs: WeakLocalRef, rhs: WeakLocalRef) -> Bool { + if lhs === rhs { + return true + } + if lhs.id != rhs.id { + return false + } + return true + } } + +@_silgen_name("swift_distributed_actor_is_remote") +internal func isDistributedKnownRemote(_ actor: AnyObject) -> Bool diff --git a/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift b/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift index c24ecf587..e60a62259 100644 --- a/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift +++ b/Sources/MultiNodeTestKitRunner/MultiNode+TestSuites.swift @@ -19,5 +19,4 @@ let MultiNodeTestSuites: [any MultiNodeTestSuite.Type] = [ MultiNodeConductorTests.self, MultiNodeClusterSingletonTests.self, MultiNodeReceptionistTests.self, - MultiNodeSimpleGossipTests.self, ] From ea2bfdb555cdb316880469e421dc9eee54fa714b Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Wed, 14 Aug 2024 16:38:01 +0900 Subject: [PATCH 4/4] wip --- Samples/Sources/SimpleWorkerPool/App.swift | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Samples/Sources/SimpleWorkerPool/App.swift b/Samples/Sources/SimpleWorkerPool/App.swift index aa1f3da7d..b0bcded7b 100644 --- a/Samples/Sources/SimpleWorkerPool/App.swift +++ b/Samples/Sources/SimpleWorkerPool/App.swift @@ -24,15 +24,16 @@ struct App { // ... } let workerSystem = await ClusterSystem("worker") { settings in - settings.bindPort = 9999 + settings.bindPort = .random(in: 8000...9999) } let anotherWorkerSystem = await ClusterSystem("worker") { settings in - settings.bindPort = 9999 + settings.bindPort = .random(in: 8000...9999) } workerSystem.log.warning("Joining...") try await leaderSystem.cluster.joined(endpoint: workerSystem.settings.endpoint, within: .seconds(10)) try await workerSystem.cluster.joined(endpoint: leaderSystem.settings.endpoint, within: .seconds(10)) + try anotherWorkerSystem.cluster.join(endpoint: leaderSystem.settings.endpoint) // no need to wait at all workerSystem.log.warning("Joined!") var workers: [Worker] = []
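The final commit also shows two ways of joining the leader: `joined(endpoint:within:)` awaits actual membership, while `join(endpoint:)` merely kicks off the handshake. A small sketch contrasting the two, mirroring the calls as written in the sample; the `joinSeed` helper is illustrative and you would normally pick one of the two calls.

```swift
import DistributedCluster

func joinSeed(_ system: ClusterSystem, seed: Cluster.Endpoint) async throws {
    // Fire-and-forget: start the handshake and return immediately.
    try system.cluster.join(endpoint: seed)

    // Await membership, failing if it is not established within the timeout.
    try await system.cluster.joined(endpoint: seed, within: .seconds(10))
}
```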