diff --git a/Examples/HelloJSON/Package.swift b/Examples/HelloJSON/Package.swift index 9f26ff9d..d8b5c162 100644 --- a/Examples/HelloJSON/Package.swift +++ b/Examples/HelloJSON/Package.swift @@ -15,10 +15,7 @@ let package = Package( // during CI, the dependency on local version of swift-aws-lambda-runtime is added dynamically below .package( url: "https://github.com/swift-server/swift-aws-lambda-runtime.git", - branch: "ff-package-traits", - traits: [ - .trait(name: "FoundationJSONSupport") - ] + branch: "main" ) ], targets: [ diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index ccb45ae5..76bad460 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -48,11 +48,9 @@ extension Lambda { @usableFromInline static func withLocalServer( invocationEndpoint: String? = nil, + logger: Logger, _ body: sending @escaping () async throws -> Void ) async throws { - var logger = Logger(label: "LocalServer") - logger.logLevel = Lambda.env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info - try await LambdaHTTPServer.withLocalServer( invocationEndpoint: invocationEndpoint, logger: logger @@ -133,6 +131,7 @@ internal struct LambdaHTTPServer { } } + // it's ok to keep this at `info` level because it is only used for local testing and unit tests logger.info( "Server started and listening", metadata: [ @@ -202,12 +201,18 @@ internal struct LambdaHTTPServer { return result case .serverReturned(let result): - logger.error( - "Server shutdown before closure completed", - metadata: [ - "error": "\(result.maybeError != nil ? "\(result.maybeError!)" : "none")" - ] - ) + + if (result.maybeError as? CancellationError) != nil { + logger.trace("Server's task cancelled") + } else { + logger.error( + "Server shutdown before closure completed", + metadata: [ + "error": "\(result.maybeError != nil ? "\(result.maybeError!)" : "none")" + ] + ) + } + switch await group.next()! { case .closureResult(let result): return result @@ -265,9 +270,12 @@ internal struct LambdaHTTPServer { } } } + } catch let error as CancellationError { + logger.trace("The task was cancelled", metadata: ["error": "\(error)"]) } catch { logger.error("Hit error: \(error)") } + } onCancel: { channel.channel.close(promise: nil) } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 7aba2812..5df458bc 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -15,6 +15,7 @@ import Logging import NIOConcurrencyHelpers import NIOCore +import Synchronization #if canImport(FoundationEssentials) import FoundationEssentials @@ -22,9 +23,13 @@ import FoundationEssentials import Foundation #endif +// This is our gardian to ensure only one LambdaRuntime is running at the time +// We use an Atomic here to ensure thread safety +private let _isRunning = Atomic(false) + // We need `@unchecked` Sendable here, as `NIOLockedValueBox` does not understand `sending` today. // We don't want to use `NIOLockedValueBox` here anyway. We would love to use Mutex here, but this -// sadly crashes the compiler today. +// sadly crashes the compiler today (on Linux). public final class LambdaRuntime: @unchecked Sendable where Handler: StreamingLambdaHandler { // TODO: We want to change this to Mutex as soon as this doesn't crash the Swift compiler on Linux anymore @usableFromInline @@ -58,8 +63,22 @@ public final class LambdaRuntime: @unchecked Sendable where Handler: St } #endif - @inlinable + /// Make sure only one run() is called at a time + // @inlinable internal func _run() async throws { + + // we use an atomic global variable to ensure only one LambdaRuntime is running at the time + let (_, original) = _isRunning.compareExchange(expected: false, desired: true, ordering: .acquiringAndReleasing) + + // if the original value was already true, run() is already running + if original { + throw LambdaRuntimeError(code: .moreThanOneLambdaRuntimeInstance) + } + + defer { + _isRunning.store(false, ordering: .releasing) + } + let handler = self.handlerMutex.withLockedValue { handler in let result = handler handler = nil @@ -96,8 +115,10 @@ public final class LambdaRuntime: @unchecked Sendable where Handler: St #if LocalServerSupport // we're not running on Lambda and we're compiled in DEBUG mode, // let's start a local server for testing - try await Lambda.withLocalServer(invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT")) - { + try await Lambda.withLocalServer( + invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT"), + logger: self.logger + ) { try await LambdaRuntimeClient.withRuntimeClient( configuration: .init(ip: "127.0.0.1", port: 7000), diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift index 1a52801e..f3d3933b 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift @@ -16,6 +16,7 @@ package struct LambdaRuntimeError: Error { @usableFromInline package enum Code: Sendable { + /// internal error codes for LambdaRuntimeClient case closingRuntimeClient case connectionToControlPlaneLost @@ -34,6 +35,9 @@ package struct LambdaRuntimeError: Error { case missingLambdaRuntimeAPIEnvironmentVariable case runtimeCanOnlyBeStartedOnce case invalidPort + + /// public error codes for LambdaRuntime + case moreThanOneLambdaRuntimeInstance } @usableFromInline diff --git a/Sources/MockServer/MockHTTPServer.swift b/Sources/MockServer/MockHTTPServer.swift index 0849e325..78685c52 100644 --- a/Sources/MockServer/MockHTTPServer.swift +++ b/Sources/MockServer/MockHTTPServer.swift @@ -123,6 +123,7 @@ struct HttpServer { } } } + // it's ok to keep this at `info` level because it is only used for local testing and unit tests logger.info("Server shutting down") } diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 62255d66..cc901461 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -147,12 +147,11 @@ struct LambdaRuntimeClientTests { (event: String, context: LambdaContext) in "Hello \(event)" } - var logger = Logger(label: "LambdaRuntime") - logger.logLevel = .debug + let serviceGroup = ServiceGroup( services: [runtime], gracefulShutdownSignals: [.sigterm, .sigint], - logger: logger + logger: Logger(label: "TestLambdaRuntimeGracefulShutdown") ) try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeTests.swift new file mode 100644 index 00000000..cd519d76 --- /dev/null +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeTests.swift @@ -0,0 +1,88 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import Logging +import NIOCore +import Synchronization +import Testing + +@testable import AWSLambdaRuntime + +@Suite("LambdaRuntimeTests") +struct LambdaRuntimeTests { + + @Test("LambdaRuntime can only be run once") + func testLambdaRuntimerunOnce() async throws { + + // First runtime + let runtime1 = LambdaRuntime( + handler: MockHandler(), + eventLoop: Lambda.defaultEventLoop, + logger: Logger(label: "LambdaRuntimeTests.Runtime1") + ) + + // Second runtime + let runtime2 = LambdaRuntime( + handler: MockHandler(), + eventLoop: Lambda.defaultEventLoop, + logger: Logger(label: "LambdaRuntimeTests.Runtime2") + ) + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + // start the first runtime + taskGroup.addTask { + // ChannelError will be thrown when we cancel the task group + await #expect(throws: ChannelError.self) { + try await runtime1.run() + } + } + + // wait a small amount to ensure runtime1 task is started + try await Task.sleep(for: .seconds(1)) + + // Running the second runtime should trigger LambdaRuntimeError + await #expect(throws: LambdaRuntimeError.self) { + try await runtime2.run() + } + + // cancel runtime 1 / task 1 + taskGroup.cancelAll() + } + + // Running the second runtime should work now + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + // ChannelError will be thrown when we cancel the task group + await #expect(throws: ChannelError.self) { + try await runtime2.run() + } + } + + // Set timeout and cancel the runtime 2 + try await Task.sleep(for: .seconds(2)) + taskGroup.cancelAll() + } + } +} + +struct MockHandler: StreamingLambdaHandler { + mutating func handle( + _ event: NIOCore.ByteBuffer, + responseWriter: some AWSLambdaRuntime.LambdaResponseStreamWriter, + context: AWSLambdaRuntime.LambdaContext + ) async throws { + + } +}