Skip to content

[PoC] distributed actor gossiper #1159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,19 @@ var targets: [PackageDescription.Target] = [
// Depend on tests to run:
"DistributedActorsMultiNodeTests",

// Dependencies:
"MultiNodeTestKit",
.product(name: "ArgumentParser", package: "swift-argument-parser"),
]
),

.executableTarget(
name: "Clusterd",
dependencies: [
"DistributedCluster",
.product(name: "ArgumentParser", package: "swift-argument-parser"),
]
),

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Multi Node Tests

Expand Down Expand Up @@ -179,7 +186,7 @@ var dependencies: [Package.Dependency] = [

// ~~~ Swift libraries ~~~
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0-beta"),
.package(url: "https://github.com/apple/swift-collections", from: "1.0.5"),
.package(url: "https://github.com/apple/swift-collections", from: "1.1.0"),

// ~~~ Observability ~~~
.package(url: "https://github.com/apple/swift-log", from: "1.0.0"),
Expand Down
2 changes: 2 additions & 0 deletions Samples/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.idea
.build
15 changes: 14 additions & 1 deletion Samples/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,26 @@ var targets: [PackageDescription.Target] = [
dependencies: [
.product(name: "DistributedCluster", package: "swift-distributed-actors"),
],
path: "Sources/SampleDiningPhilosophers",
exclude: [
"dining-philosopher-fsm.graffle",
"dining-philosopher-fsm.svg",
]
),

.executableTarget(
name: "SimpleGossip",
dependencies: [
.product(name: "DistributedCluster", package: "swift-distributed-actors"),
]
),

.executableTarget(
name: "SimpleWorkerPool",
dependencies: [
.product(name: "DistributedCluster", package: "swift-distributed-actors"),
]
),

/* --- tests --- */

// no-tests placeholder project to not have `swift test` fail on Samples/
Expand Down
41 changes: 41 additions & 0 deletions Samples/Sources/SimpleGossip/App.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedCluster

struct App {

let port: Int

init(port: Int) {
self.port = port
}

func run(greeting: String?, for duration: Duration) async throws {
let system = await ClusterSystem("GossipSystem") { settings in
settings.endpoint.port = .random(in: 7000...9999)
settings.discovery = .clusterd
}
let peer = await GreetingGossipPeer(actorSystem: system)

if let greeting {
try await peer.setGreeting(greeting)
}

print("Sleeping...")
_Thread.sleep(duration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guess if there is a plan to update repo, we can stop using _Thread and use Task.sleep(for:) instead already?

}
}


112 changes: 112 additions & 0 deletions Samples/Sources/SimpleGossip/Gossiper.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedCluster
import Logging

protocol GossipAcknowledgement<Payload>: Sendable, Codable{
associatedtype Payload where Payload: Sendable & Codable

var sequenceNumber: UInt { get }

// Optional support for simple push-pull gossip;
// when we receive an "old" version, we can immediately reply with a newer one
var supersededBy: Payload? { get }
}

protocol Gossiper<Gossip, Acknowledgement>: DistributedActor where ActorSystem == ClusterSystem {
associatedtype Gossip: Codable
associatedtype Acknowledgement: GossipAcknowledgement<Gossip>

var gossipGroupID: String { get }

var gossipPeers: Set<Self> { get set }

func makePayload() async -> Gossip

var gossipBaseInterval: Duration { get }

/// Receive a gossip from some peer.
///
/// Missing an acknowledgement will cause another delivery attempt eventually
distributed func gossip(_ gossip: Gossip, from peer: ID) async -> Acknowledgement

// ==== Default impls --------------
func startGossip() async throws

func gossipRound() async throws
}

extension Gossiper {

var log: Logger {
Logger(actor: self)
}

var gossipBaseInterval: Duration {
.seconds(1)
}

func startGossip() async throws {
var sleepIntervalBackoff = Backoff.exponential(initialInterval: self.gossipBaseInterval, randomFactor: 0.50)

log.warning("Start gossip: \(self.id)")

await actorSystem.receptionist.checkIn(self)

let listingTask = Task {
for try await peer in await actorSystem.receptionist.listing(of: .init(Self.self, id: self.gossipGroupID)) {
self.gossipPeers.insert(peer)
log.warning("\(self.id) discovered [\(peer.id)]", metadata: [
"gossipPeers/count": "\(gossipPeers.count)"
])
}
}
defer {
listingTask.cancel()
}

while !Task.isCancelled {
let duration = sleepIntervalBackoff.next()!
// log.notice("Gossip sleep: \(duration.prettyDescription)")
try await Task.sleep(for: duration)

do {
try await self.gossipRound()
} catch {
log.warning("Gossip round failed: \(error)") // TODO: log target and more data
}
}

log.notice("Gossip terminated...")
}

func gossipRound() async throws {
guard let target = self.gossipPeers.shuffled().first else {
log.info("No peer to gossip with...")
return
}

guard target.id != self.id else {
return try await gossipRound() // try again
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also while testing and while cluster is forming this function goes in loop heavily 🤔 maybe adding something like:

Suggested change
return try await gossipRound() // try again
if self.gossipPeers.count == 1 {
try await Task.sleep(for: .seconds(0.1))
}
return try await gossipRound() // try again

}
Comment on lines +96 to +103
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe missing something, but a bit confusing guard, like why not to filter for id in set?


log.debug("Select peer: \(target.id)")

let gossip = await makePayload()
let ack = try await target.gossip(gossip, from: self.id)
log.notice("Ack: \(ack)")
}

}
38 changes: 38 additions & 0 deletions Samples/Sources/SimpleGossip/GreetingGossip+Messages.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedCluster

struct GreetingGossip: Codable {
var sequenceNumber: UInt
var text: String
}

struct GreetingGossipAck: GossipAcknowledgement {
var sequenceNumber: UInt

// push-pull gossip; when we receive an "old" version, we can immediately reply with a newer one
var supersededBy: GreetingGossip?

init(sequenceNumber: UInt) {
self.sequenceNumber = sequenceNumber
self.supersededBy = nil
}

init(sequenceNumber: UInt, supersededBy: GreetingGossip) {
self.sequenceNumber = sequenceNumber
self.supersededBy = supersededBy
}
}

79 changes: 79 additions & 0 deletions Samples/Sources/SimpleGossip/GreetingGossip.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedCluster

distributed actor GreetingGossipPeer: Gossiper {
typealias ActorSystem = ClusterSystem

typealias Gossip = GreetingGossip
typealias Acknowledgement = GreetingGossipAck

// This gossip peer belongs to some group identifier by this
@ActorID.Metadata(\.receptionID)
var gossipGroupID: String

private var activeGreeting = GreetingGossip(sequenceNumber: 0, text: "<no greeting>")

var gossipPeers: Set<GreetingGossipPeer> = []
private var peerDiscoveryTask: Task<Void, Error>?

init(actorSystem: ActorSystem) async {
self.actorSystem = actorSystem
self.gossipGroupID = "greeting-gossip"

// FIXME: using Self.self in the init here will crash
// peerDiscoveryTask = Task {
// for try await peer in await actorSystem.receptionist.listing(of: .init(GreetingGossipPeer.self, id: self.gossipGroupID)) {
// self.peers.insert(peer)
// }
// }

let gossipTask = Task {
try await startGossip()
}
}

distributed func greet(name: String) -> String {
return "\(self.activeGreeting.text), \(name)!"
}

// Call these e.g. locally to set a "new value"; you could set some date or order here as well
distributed func setGreeting(_ text: String) {
// Some way to come up with "next" value
let number = self.activeGreeting.sequenceNumber
self.activeGreeting = .init(sequenceNumber: number + 1, text: text)
}

func makePayload() async -> Gossip {
self.activeGreeting
}

distributed func gossip(_ gossip: Gossip, from peer: ID) async -> Acknowledgement {
guard activeGreeting.sequenceNumber < gossip.sequenceNumber else {
// Tell the caller immediately that we actually have a more up-to-date value
return .init(sequenceNumber: gossip.sequenceNumber, supersededBy: self.activeGreeting)
}

self.activeGreeting = gossip

return .init(sequenceNumber: gossip.sequenceNumber)
}

// distributed func acknowledgement(_ acknowledgement: Acknowledgement,
// from peer: ID,
// confirming gossip: Gossip) {
//
// }
Comment on lines +74 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it needed? Not like nitpicking, but rather thinking about future refactoring—I've noticed there are some commented code here and there (not much though) and sometimes not sure if I should just remove it or not.

}
Loading