From c7df10f6cf322df882f57367e7891a7419bd4049 Mon Sep 17 00:00:00 2001 From: "Bob.Brockbernd" Date: Wed, 18 Dec 2024 15:55:06 +0100 Subject: [PATCH 1/5] Thread parking for kotlin/common --- atomicfu/api/atomicfu.api | 8 + atomicfu/build.gradle.kts | 158 +++++------- .../kotlinx/atomicfu/locks/NativeMutexNode.kt | 0 .../atomicfu/locks/PosixParkingDelegator.kt | 62 +++++ .../atomicfu/locks/PosixParkingDelegator.kt | 13 + .../atomicfu/locks/ParkingDelegator.kt | 36 +++ .../kotlinx/atomicfu/locks/ParkingSupport.kt | 84 +++++++ .../kotlinx/atomicfu/locks/ThreadParker.kt | 110 +++++++++ .../kotlinx/atomicfu/locks/BarrierTest.kt | 100 ++++++++ .../atomicfu/locks/CyclicBarrierTest.kt | 117 +++++++++ .../kotlinx/atomicfu/locks/ExchangerTest.kt | 57 +++++ .../kotlinx/atomicfu/locks/LatchTest.kt | 112 +++++++++ .../kotlinx/atomicfu/locks/TestThread.kt | 9 + .../atomicfu/locks/ThreadParkerTest.kt | 49 ++++ .../atomicfu/locks/ThreadParkingStressTest.kt | 117 +++++++++ .../atomicfu/locks/TimeArithmeticTests.kt | 47 ++++ .../atomicfu/locks/TimedParkingTest.kt | 231 ++++++++++++++++++ .../atomicfu/locks/JvmParkingDelegator.kt | 14 ++ .../kotlinx/atomicfu/locks/ParkingSupport.kt | 17 ++ .../kotlinx/atomicfu/locks/TestThread.jvm.kt | 9 + .../atomicfu/locks/PosixParkingDelegator.kt | 12 + .../atomicfu/locks/PosixParkingDelegator.kt | 59 +++++ .../atomicfu/locks/NativeParkingUtils.kt | 18 ++ .../kotlinx/atomicfu/locks/ParkingSupport.kt | 22 ++ .../atomicfu/locks/TestThread.native.kt | 15 ++ .../atomicfu/locks/PosixParkingDelegator.kt | 66 +++++ 26 files changed, 1439 insertions(+), 103 deletions(-) rename atomicfu/src/{androidNative32BitMain => androidNativeMain}/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt (100%) create mode 100644 atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt create mode 100644 atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt create mode 100644 atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt create mode 100644 atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt create mode 100644 atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt create mode 100644 atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/BarrierTest.kt create mode 100644 atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/CyclicBarrierTest.kt create mode 100644 atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ExchangerTest.kt create mode 100644 atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LatchTest.kt create mode 100644 atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TestThread.kt create mode 100644 atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt create mode 100644 atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkingStressTest.kt create mode 100644 atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimeArithmeticTests.kt create mode 100644 atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt create mode 100644 atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt create mode 100644 atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt create mode 100644 atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/TestThread.jvm.kt create mode 100644 atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt create mode 100644 atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt create mode 100644 atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeParkingUtils.kt create mode 100644 atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt create mode 100644 atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/TestThread.native.kt create mode 100644 atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt diff --git a/atomicfu/api/atomicfu.api b/atomicfu/api/atomicfu.api index e1428174..a9497bd8 100644 --- a/atomicfu/api/atomicfu.api +++ b/atomicfu/api/atomicfu.api @@ -135,3 +135,11 @@ public final class kotlinx/atomicfu/TraceKt { public static final fun named (Lkotlinx/atomicfu/TraceBase;Ljava/lang/String;)Lkotlinx/atomicfu/TraceBase; } +public final class kotlinx/atomicfu/locks/ParkingSupport { + public static final field INSTANCE Lkotlinx/atomicfu/locks/ParkingSupport; + public final fun currentThreadHandle ()Ljava/lang/Thread; + public final fun park-LRDsOJo (J)V + public final fun parkUntil (Lkotlin/time/TimeMark;)V + public final fun unpark (Ljava/lang/Thread;)V +} + diff --git a/atomicfu/build.gradle.kts b/atomicfu/build.gradle.kts index e2886562..131add18 100644 --- a/atomicfu/build.gradle.kts +++ b/atomicfu/build.gradle.kts @@ -10,6 +10,35 @@ plugins { kotlin { jvmToolchain(8) + + // Tier 1 + macosX64() + macosArm64() + iosSimulatorArm64() + iosX64() + + // Tier 2 + linuxX64() + linuxArm64() + watchosSimulatorArm64() + watchosX64() + watchosArm32() + watchosArm64() + tvosSimulatorArm64() + tvosX64() + tvosArm64() + iosArm64() + + // Tier 3 + androidNativeArm32() + androidNativeArm64() + androidNativeX86() + androidNativeX64() + mingwX64() + watchosDeviceArm64() + + @Suppress("DEPRECATION") //https://github.com/Kotlin/kotlinx-atomicfu/issues/207 + linuxArm32Hfp() // JS -- always js(IR) { @@ -34,6 +63,31 @@ kotlin { nodejs() } + @OptIn(ExperimentalKotlinGradlePluginApi::class) + applyDefaultHierarchyTemplate { + common { + group("jsAndWasmShared") { + withJs() + withWasmJs() + withWasmWasi() + } + group("concurrent") { + withJvm() + group("native") { + group("nativeUnixLike") { + group("apple") { + withApple() + } + group("linux") { + withLinux() + } + } + withMingwX64() + } + } + } + } + sourceSets { commonMain.dependencies { implementation("org.jetbrains.kotlin:kotlin-stdlib") { @@ -46,13 +100,8 @@ kotlin { implementation("org.jetbrains.kotlin:kotlin-test-common") implementation("org.jetbrains.kotlin:kotlin-test-annotations-common") } - - val jsAndWasmSharedMain by creating { - dependsOn(commonMain.get()) - } - + jsMain { - dependsOn(jsAndWasmSharedMain) dependencies { compileOnly("org.jetbrains.kotlin:kotlin-dom-api-compat") } @@ -64,21 +113,12 @@ kotlin { } } - wasmJsMain { - dependsOn(jsAndWasmSharedMain) - } - wasmJsTest { dependencies { implementation("org.jetbrains.kotlin:kotlin-test-wasm-js") } } - - wasmWasiMain { - dependsOn(jsAndWasmSharedMain) - } - wasmWasiTest { dependencies { implementation("org.jetbrains.kotlin:kotlin-test-wasm-wasi") @@ -94,93 +134,6 @@ kotlin { } } } -} - -// Support of all non-deprecated targets from the official tier list: https://kotlinlang.org/docs/native-target-support.html -kotlin { - // Tier 1 - macosX64() - macosArm64() - iosSimulatorArm64() - iosX64() - - // Tier 2 - linuxX64() - linuxArm64() - watchosSimulatorArm64() - watchosX64() - watchosArm32() - watchosArm64() - tvosSimulatorArm64() - tvosX64() - tvosArm64() - iosArm64() - - // Tier 3 - androidNativeArm32() - androidNativeArm64() - androidNativeX86() - androidNativeX64() - mingwX64() - watchosDeviceArm64() - - @Suppress("DEPRECATION") //https://github.com/Kotlin/kotlinx-atomicfu/issues/207 - linuxArm32Hfp() - - @OptIn(ExperimentalKotlinGradlePluginApi::class) - applyDefaultHierarchyTemplate { - group("nativeUnixLike") { - withLinux() - } - group("androidNative32Bit") { - withAndroidNativeX86() - withCompilations { compilation -> - (compilation.target as? KotlinNativeTarget)?.konanTarget?.name == "android_arm32" - } - } - group("androidNative64Bit") { - withAndroidNativeArm64() - withAndroidNativeX64() - } - } - - sourceSets { - val nativeNonAppleMain by creating { - kotlin.srcDir("src/nativeNonAppleMain/kotlin") - dependsOn(nativeMain.get()) - } - - val nativeUnixLikeMain by getting { - kotlin.srcDir("src/nativeUnixLikeMain/kotlin") - dependsOn(nativeNonAppleMain) - } - - val androidNative32BitMain by getting { - kotlin.srcDir("src/androidNative32BitMain/kotlin") - dependsOn(nativeNonAppleMain) - } - - val androidNative64BitMain by getting { - kotlin.srcDir("src/androidNative64BitMain/kotlin") - dependsOn(nativeNonAppleMain) - } - - val mingwMain by getting { - kotlin.srcDir("src/mingwMain/kotlin") - dependsOn(nativeNonAppleMain) - } - - val androidNative32BitTest by getting { - kotlin.srcDir("src/androidNative32BitTest/kotlin") - dependsOn(nativeTest.get()) - } - - val androidNative64BitTest by getting { - kotlin.srcDir("src/androidNative64BitTest/kotlin") - dependsOn(nativeTest.get()) - } - - } // atomicfu-cinterop-interop.klib with an empty interop.def file will still be published for compatibility reasons (see KT-68411) // This block can be removed when this issue in K/N compiler is resolved: KT-60874 @@ -387,4 +340,3 @@ val jvmTest by tasks.getting(Test::class) { ) // run them only for transformed code } - diff --git a/atomicfu/src/androidNative32BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt similarity index 100% rename from atomicfu/src/androidNative32BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt rename to atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt diff --git a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt new file mode 100644 index 00000000..57de1e50 --- /dev/null +++ b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -0,0 +1,62 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.* +import kotlinx.cinterop.alloc +import kotlinx.cinterop.free +import kotlinx.cinterop.pointed +import kotlinx.cinterop.ptr +import platform.posix.* + +@OptIn(ExperimentalForeignApi::class, UnsafeNumber::class) +internal actual object ParkingDelegator { + actual fun createRef(): ParkingData { + val mut = nativeHeap.alloc().ptr + val cond = nativeHeap.alloc().ptr + val attr = nativeHeap.alloc().ptr + callAndVerify(0) { pthread_mutex_init(mut, null) } + callAndVerify(0) { pthread_condattr_init(attr) } + callAndVerify(0) { pthread_condattr_setclock(attr, CLOCK_MONOTONIC) } + callAndVerify(0) { pthread_cond_init(cond, attr) } + + callAndVerify(0) { pthread_condattr_destroy(attr) } + nativeHeap.free(attr) + return ParkingData(mut, cond) + } + + actual inline fun wait(ref: ParkingData, shouldWait: () -> Boolean){ + callAndVerify(0) { pthread_mutex_lock(ref.mut) } + if (shouldWait()) callAndVerify(0) { pthread_cond_wait(ref.cond, ref.mut) } + callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + } + + actual inline fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped { + val ts = alloc().ptr + + // Add nanos to current time + callAndVerify(0) { clock_gettime(CLOCK_MONOTONIC.convert(), ts) } + ts.pointed.tv_sec = ts.pointed.tv_sec.addNanosToSeconds(nanos) + ts.pointed.tv_nsec = (ts.pointed.tv_nsec + nanos % 1_000_000_000).convert() + //Fix overflow + if (ts.pointed.tv_nsec >= 1_000_000_000) { + ts.pointed.tv_sec = ts.pointed.tv_sec.addNanosToSeconds(1_000_000_000) + ts.pointed.tv_nsec -= 1_000_000_000 + } + callAndVerify(0) { pthread_mutex_lock(ref.mut) } + if (shouldWait()) callAndVerify(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) } + callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + } + + actual fun wake(ref: ParkingData) { + callAndVerify(0) { pthread_mutex_lock(ref.mut) } + callAndVerify(0) { pthread_cond_signal(ref.cond) } + callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + } + + actual fun destroyRef(ref: ParkingData) { + callAndVerify(0) { pthread_mutex_destroy(ref.mut) } + callAndVerify(0) { pthread_cond_destroy(ref.cond) } + nativeHeap.free(ref.mut) + nativeHeap.free(ref.cond) + } +} +internal actual class ParkingData @OptIn(UnsafeNumber::class) constructor(val mut: CPointer, val cond: CPointer) diff --git a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt new file mode 100644 index 00000000..eeb1cb41 --- /dev/null +++ b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -0,0 +1,13 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.CPointer +import platform.posix.CLOCK_REALTIME +import platform.posix.* + +// CLOCK_REALTIME should be equal to CLOCK_SYSTEM on darwin. +// https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/osfmk/mach/clock_types.h#L70-L73 +// Where CLOCK_CALENDAR is the time from epoch. +internal actual val clockId: Int + get() = CLOCK_REALTIME + +actual fun setClock(attr: CPointer): Int = 0 diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt new file mode 100644 index 00000000..93a8d6cd --- /dev/null +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt @@ -0,0 +1,36 @@ +package kotlinx.atomicfu.locks + +/** + * Object that stores references that need to be manually destroyed and deallocated, + * after native pthread_cond_wait usage. + */ +internal expect class ParkingData + +/** + * Internal utility that delegates the thread suspending and resuming to pthread_cond_wait on native. + * On jvm delegates to LockSupport.Park. + */ +internal expect object ParkingDelegator { + fun createRef(): ParkingData + fun wait(ref: ParkingData, shouldWait: () -> Boolean) + fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean) + fun wake(ref: ParkingData) + fun destroyRef(ref: ParkingData) +} + +/** + * Adds nano seconds to current time in seconds. + * Clamps for Int. + */ +internal fun Int.addNanosToSeconds(nanos: Long): Int = + (this + nanos / 1_000_000_000).coerceIn(Int.MIN_VALUE.toLong(), Int.MAX_VALUE.toLong()).toInt() +internal fun Long.addNanosToSeconds(nanos: Long): Long { + + // Should never happen as this is checked in `ThreadParker` + check(nanos >= 0) { "Cannot wait for a negative number of nanoseconds" } + val result = this + nanos / 1_000_000_000 + + // Overflow check: should never happen since this is very far into the future. + check(!(this xor result < 0 && this >= 0)) { "Nano seconds addition overflowed, current time in seconds is $this" } + return result +} diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt new file mode 100644 index 00000000..1ebb5311 --- /dev/null +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt @@ -0,0 +1,84 @@ +package kotlinx.atomicfu.locks + +import kotlin.time.Duration +import kotlin.time.TimeMark + +/** + * Parking and unparking support for threads on Kotlin/Native and Kotlin/JVM. + * Can be used as a building block to create locks and other synchronization primitives. + * + * A call to [ParkingSupport.park] or [ParkingSupport.parkUntil] will suspend the current thread. + * A suspended thread will wake up in one of the following four cases: + * - A different thread calls [ParkingSupport.unpark]. + * - The given `timeout` or `deadline` is exceeded. + * - A spurious wakeup + * - (Only on JVM) The thread was interrupted. The interrupted flag stays set after wakeup. + * A future call to [park] this thread will return immediately, unless the `Thread.interrupted` flag is cleared. + * + * The caller is responsible for verifying the reason of wakeup and how to respond accordingly. + * + * Example usage parking thread: + * ```Kotlin + * // publish my parking handle + * handleReference.value = ParkingSupport.currentThreadHandle() + * // wait + * while (state.value == WAIT) { + * ParkingSupport.park(Duration.INFINITE) + * } + * ``` + * + * Example usage unparker thread: + * ```Kotlin + * state.value = WAKE + * ParkingSupport.unpark(handleReference.value) + * ``` + */ +expect object ParkingSupport { + + /** + * Parks the current thread for [timeout] duration. + * + * Wakes up in the following cases: + * - A different thread calls [ParkingSupport.unpark]. + * - The [timeout] is exceeded. + * - A spurious wakeup + * - (Only on JVM) The thread was interrupted. The interrupted flag stays set after wakeup. + * A future call to [park] this thread will return immediately, unless the `Thread.interrupted` flag is cleared. + */ + fun park(timeout: Duration) + + /** + * Parks the current thread until [deadline] is reached. + * + * Wakes up in the following cases: + * - A different thread calls [ParkingSupport.unpark]. + * - The given [deadline] has passed. + * - A spurious wakeup + * - (Only on JVM) The thread was interrupted. The interrupted flag stays set after wakeup. + * A future call to [park] this thread will return immediately, unless the `Thread.interrupted` flag is cleared. + */ + fun parkUntil(deadline: TimeMark) + + /** + * Unparks the thread corresponding to [handle]. + * If [unpark] is called while the corresponding thread is not parked, the next [park] call will return immediately + * — the [ParkingHandle] is unparked ahead of time. + * + * A [ParkingHandle] can only _remember_ one pre-unpark attempt at a time. + * Meaning, when two consecutive [unpark] calls are made while the corresponding thread is not parked, + * only the next park call will return immediately — [unpark] calls are not accumulated. + */ + fun unpark(handle: ParkingHandle) + + /** + * Returns the [ParkingHandle] that can be used to [unpark] the current thread. + */ + fun currentThreadHandle(): ParkingHandle +} + +/** + * Is used to unpark a thread. + * Can be obtained by calling [ParkingSupport.currentThreadHandle]. + * Is required by [ParkingSupport.unpark]. + */ +expect class ParkingHandle \ No newline at end of file diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt new file mode 100644 index 00000000..9cbb3340 --- /dev/null +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt @@ -0,0 +1,110 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic +import kotlin.time.DurationUnit +import kotlin.time.TimeSource + +/** + * Thread parker for Kotlin/Native based on POSIX calls. + * Resides in a shared sourceSet with JVM, to be testable with Lincheck. + * (Which is part of PR #508) + */ +internal class ThreadParker { + private val delegator = ParkingDelegator + private val state = atomic(Free) + + fun park() = parkWith { data -> + delegator.wait(data) { state.value is Parked } + } + + fun parkNanos(nanos: Long) { + val mark = TimeSource.Monotonic.markNow() + parkWith { data -> + val remainingTime = nanos - mark.elapsedNow().toLong(DurationUnit.NANOSECONDS) + if (remainingTime > 0) delegator.timedWait(data, remainingTime) { state.value is Parked } + } + } + + private fun parkWith(invokeWait: (ParkingData) -> Unit) { + while (true) { + when (state.value) { + Free -> { + val pd = delegator.createRef() + // If state changed, cleanup and reiterate. + if (!state.compareAndSet(Free, Parked(pd))) { + delegator.destroyRef(pd) + continue + } + + invokeWait(pd) + + while (true) { + when (val changedState = state.value) { + // If still parked invoke wait. + is Parked -> if (state.compareAndSet(changedState, Free)) { + delegator.destroyRef(pd) + return + } + + // If other thread is unparking return. Let unparking thread deal with cleanup. + is Unparking -> if (state.compareAndSet(changedState, Free)) return + + // Unparking thread is done unparking + Free -> { + delegator.destroyRef(pd) + return + } + // Unparking thread is done unparking (And a concurrent thread pre unparked) + Unparked -> { + delegator.destroyRef(pd) + return + } + } + } + } + // Parker was pre unparked. Set to free and continue. + Unparked -> if (state.compareAndSet(Unparked, Free)) return + + // The states below should only be reachable if parking thread has not yet returned. + is Parked -> throw IllegalStateException("Thread should not be able to call park when it is already parked") + is Unparking -> throw IllegalStateException("Thread should not be able to call park when it is already parked") + } + } + } + + fun unpark() { + val myUnparkingState = Unparking() + while (true) { + when (val currentState = state.value) { + + // Is already unparked + Unparked -> return + is Unparking -> return + + Free -> if (state.compareAndSet(Free, Unparked)) return + + // Is parked -> try unpark + is Parked -> if (state.compareAndSet(currentState, myUnparkingState)) { + delegator.wake(currentState.data) + // state hasn't changed so parker is not awake yet, and responsible for cleanup. + if (state.compareAndSet(myUnparkingState, Free)) return + delegator.destroyRef(currentState.data) + return + } + } + } + } +} + +private interface ParkingState +// The Parker is pre-unparked. The next park call will change state to Free and return immediately. +private object Unparked : ParkingState + +// Starting state. Can be pre unparked or parked from here. A park call will result in Parked state and suspended thread. +private object Free : ParkingState + +// Parker is suspended and can be signaled with data. +private class Parked(val data: ParkingData) : ParkingState + +// An unpark call happened while the parker was parked. In process of unparking. +private class Unparking : ParkingState diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/BarrierTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/BarrierTest.kt new file mode 100644 index 00000000..d1a8c206 --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/BarrierTest.kt @@ -0,0 +1,100 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.AtomicIntArray +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.atomicArrayOfNulls +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.fail +import kotlin.time.Duration + +private const val TEST_ITERATIONS = 5 +private const val MAX_RANDOM_WAIT_MILLIS = 5L +private val THREAD_SETS = listOf(2, 5, 7) + +class BarrierTest { + private class Arrs(numberOfThreads: Int) { + val after = AtomicIntArray(numberOfThreads) + val before = AtomicIntArray(numberOfThreads) + init {repeat(numberOfThreads) { + after[it].value = 0 + before[it].value = 0 + }} + } + @Test + fun testBarrier() { + repeat(TEST_ITERATIONS) { iteration -> + THREAD_SETS.forEach { numberOfThreads -> + val barrier = Barrier(numberOfThreads) + val ar = Arrs(numberOfThreads) + val threads = List(numberOfThreads) { myThread -> + Fut { + repeat(numberOfThreads) { otherThread -> + if (otherThread != myThread && ar.after[otherThread].value != 0) { + fail("Thread $myThread arrived too early") + } + } + sleepMillis(Random.nextLong(MAX_RANDOM_WAIT_MILLIS)) + ar.before[myThread].value = 1 + + barrier.await() + + ar.after[myThread].value = 1 + repeat(numberOfThreads) { otherThread -> + if (ar.before[otherThread].value == 0) { + fail("Thread $myThread continued too early: $otherThread had value ${ar.before[otherThread].value}") + } + } + } + } + Fut.waitAllAndThrow(threads) + } + } + } +} + +/** + * Single-use barrier that blocks all participants until they all arrive. + */ +private class Barrier(private val parties: Int) { + init { + require(parties > 1) + } + private val count = atomic(0) + private val waiters = atomicArrayOfNulls(parties - 1) + + fun await() { + val myIndex = count.getAndIncrement() + if (myIndex == parties - 1) { + wakeUpEveryone() + return + } + val currentThread = ParkingSupport.currentThreadHandle() + while (true) { + val waiter = waiters[myIndex].value + when { + waiter === null -> waiters[myIndex].compareAndSet(null, currentThread) + waiter === FINISHED -> return + else -> ParkingSupport.park(Duration.INFINITE) + } + } + } + + private fun wakeUpEveryone() { + for (i in 0.. + val bar = CyclicBarrier(barrierSize) + val syncBar = CyclicBarrier(barrierSize * THREADS_PER_BARRIER_SLOT) + val ar = Arrs(barrierSize * THREADS_PER_BARRIER_SLOT) + val threads = List(barrierSize * THREADS_PER_BARRIER_SLOT) { tId -> + Fut { + repeat(TEST_ITERATIONS) { internalIteration -> + sleepMillis(Random.nextLong(MAX_RANDOM_WAIT_MILLIS)) + bar.await() + sleepMillis(Random.nextLong(MAX_RANDOM_WAIT_MILLIS)) + val newN = ar.before[tId].value + 1 + ar.before[tId].value = newN + syncBar.await() + repeat(ar.before.size) { otherThread -> + if (ar.before[otherThread].value < newN) { + fail("Thread $tId (value: $newN, id: ${ParkingSupport.currentThreadHandle()}) continued too early: $otherThread had value ${ar.before[otherThread].value}") + } + if (ar.before[otherThread].value > newN + 1) { + fail("Thread $tId (value: $newN, id: ${ParkingSupport.currentThreadHandle()}) too far behind: $otherThread had value ${ar.before[otherThread].value}") + } + } + } + } + } + Fut.waitAllAndThrow(threads) + } + } +} + +private class HandleWrapper(val handle: ParkingHandle) { + val woken = atomic(false) +} + +private class CyclicBarrier(private val parties: Int) { + private val queue = MSQueueCyclicBarrier() + + fun await() { + val wrapper = HandleWrapper(ParkingSupport.currentThreadHandle()) + val n = queue.enqueue(wrapper) + if (n % parties == 0L) { + var wokenUp = 0 + while (wokenUp < parties - 1) { + val deq = queue.dequeue() + if (deq == null) fail("Not enough parties enqueued") + if (deq.first % parties == 0L) continue + if (deq.second.woken.compareAndSet(false, true)) { + ParkingSupport.unpark(deq.second.handle) + wokenUp++ + } + } + } else { + while (!wrapper.woken.value) { + ParkingSupport.park(Duration.INFINITE) + } + } + } +} + + +private class MSQueueCyclicBarrier { + private val head = atomic(Node(null, 0)) + private val tail = atomic(head.value) + + fun enqueue(element: E): Long { + while (true) { + val curTail = tail.value + val node = Node(element, curTail.id + 1) + if (curTail.next.compareAndSet(null, node)) { + tail.compareAndSet(curTail, node) + return node.id + } + else tail.compareAndSet(curTail, curTail.next.value!!) + } + } + + fun dequeue(): Pair? { + while (true) { + val currentHead = head.value + val currentHeadNext = currentHead.next.value ?: return null + if (head.compareAndSet(currentHead, currentHeadNext)) { + val element = currentHeadNext.element + currentHeadNext.element = null + val id = currentHeadNext.id + return element?.let { Pair(id, it) } + } + } + } + private class Node(var element: E?, val id: Long) { + val next = atomic?>(null) + } +} \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ExchangerTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ExchangerTest.kt new file mode 100644 index 00000000..a30c2b50 --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ExchangerTest.kt @@ -0,0 +1,57 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic +import kotlin.test.Test +import kotlin.test.assertContentEquals +import kotlin.time.Duration + +private const val N_ITEMS_TO_SWAP = 100_000 + +class ExchangerTest { + + @Test + fun exchangeTwoLists() { + val aBefore = List(N_ITEMS_TO_SWAP) { 0 } + val bBefore = List(N_ITEMS_TO_SWAP) { 1 } + val aAfter = mutableListOf() + val bAfter = mutableListOf() + + val exchanger = Exchanger() + + val at = testThread { + aBefore.forEachIndexed { i, v -> + val item = exchanger.exchange(v) + aAfter.add(item) + } + } + val bt = testThread { + bBefore.forEachIndexed { i, v -> + val item = exchanger.exchange(v) + bAfter.add(item) + } + } + at.join() + bt.join() + assertContentEquals(aBefore, bAfter) + assertContentEquals(bBefore, aAfter) + } +} + +internal class Exchanger { + private val slot = atomic?>(null) + fun exchange(item: T): T { + val myPair = Pair(ParkingSupport.currentThreadHandle(), item) + if (slot.compareAndSet(null, myPair)) { + while (slot.value == myPair) ParkingSupport.park(Duration.INFINITE) + val waiterPair = slot.value!! + slot.value = null + ParkingSupport.unpark(waiterPair.first) + return waiterPair.second + } else { + val waiterPair = slot.getAndSet(myPair) + ParkingSupport.unpark(waiterPair!!.first) + while (slot.value == myPair) ParkingSupport.park(Duration.INFINITE) + return waiterPair.second + } + } +} \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LatchTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LatchTest.kt new file mode 100644 index 00000000..9eaa7624 --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LatchTest.kt @@ -0,0 +1,112 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.AtomicIntArray +import kotlinx.atomicfu.atomic +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.fail +import kotlin.time.Duration + +private const val TEST_ITERATIONS = 5 +private const val MAX_RANDOM_WAIT_MILLIS = 5L +private val THREAD_SETS = listOf(2, 5, 7) + +class LatchTest { + private class Arrs(numberOfThreads: Int) { + val after = AtomicIntArray(numberOfThreads) + val before = AtomicIntArray(numberOfThreads) + init {repeat(numberOfThreads) { + after[it].value = 0 + before[it].value = 0 + }} + } + + @Test + fun latchTest() { + repeat(TEST_ITERATIONS) { iteration -> + THREAD_SETS.forEach { numberOfThreads -> + val countingDownTo = iteration + 2 + val ar = Arrs(numberOfThreads) + val latch = CustomCountDownLatch(countingDownTo) + val countingThread = Fut { + repeat(countingDownTo) { + sleepMillis(Random.nextLong(MAX_RANDOM_WAIT_MILLIS)) + + repeat(ar.after.size) { threadToCheck -> + if (ar.after[threadToCheck].value != 0) fail("Thread passed latch too early") + } + + latch.countDown() + } + } + + val waiters = List(numberOfThreads) { i -> + Fut { + sleepMillis(Random.nextLong(MAX_RANDOM_WAIT_MILLIS)) + latch.await() + ar.after[i].value = 1 + } + } + + Fut.waitAllAndThrow(waiters + countingThread) + + repeat(ar.after.size) { threadToCheck -> + if (ar.after[threadToCheck].value != 1) fail("Thread $threadToCheck stuck") + } + } + } + } +} + +class CustomCountDownLatch(count: Int) { + private val c = atomic(count) + private val waiters = MSQueueLatch() + + fun await() { + val thread = ParkingSupport.currentThreadHandle() + waiters.enqueue(thread) + while (c.value > 0) ParkingSupport.park(Duration.INFINITE) + } + + fun countDown() { + val myIndex = c.decrementAndGet() + if (myIndex != 0) return + while (true) { + val thread = waiters.dequeue() + if (thread == null) return + ParkingSupport.unpark(thread) + } + } +} + +private class MSQueueLatch { + private val head = atomic(Node(null)) + private val tail = atomic(head.value) + + fun enqueue(element: E) { + while (true) { + val node = Node(element) + val curTail = tail.value + if (curTail.next.compareAndSet(null, node)) { + tail.compareAndSet(curTail, node) + return + } + else tail.compareAndSet(curTail, curTail.next.value!!) + } + } + + fun dequeue(): E? { + while (true) { + val currentHead = head.value + val currentHeadNext = currentHead.next.value ?: return null + if (head.compareAndSet(currentHead, currentHeadNext)) { + val element = currentHeadNext.element + currentHeadNext.element = null + return element + } + } + } + private class Node(var element: E?) { + val next = atomic?>(null) + } +} diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TestThread.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TestThread.kt new file mode 100644 index 00000000..a66ffa5e --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TestThread.kt @@ -0,0 +1,9 @@ +package kotlinx.atomicfu.locks + +internal fun testThread(doConcurrent: () -> Unit): TestThread = TestThread(doConcurrent) + +internal expect class TestThread(toDo: () -> Unit) { + fun join() +} + +expect fun sleepMillis(millis: Long) \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt new file mode 100644 index 00000000..39424d94 --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt @@ -0,0 +1,49 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic +import kotlin.test.Test +import kotlin.time.Duration + +class ThreadParkerTest { + val atomicHandle = atomic(null) + val isPreUnparked = atomic(false) + + @Test + fun parkUnpark() { + var parkingHandle: ParkingHandle? = null + + val f = Fut { + parkingHandle = ParkingSupport.currentThreadHandle() + ParkingSupport.park(Duration.INFINITE) + } + + // Allow thread to be parked before unpark call + sleepMillis(100) + ParkingSupport.unpark(parkingHandle!!) + + f.waitThrowing() + } + + @Test + fun unparkPark() { + + val f = Fut { + atomicHandle.value = ParkingSupport.currentThreadHandle() + + while (!isPreUnparked.value) { + sleepMillis(10) + } + + ParkingSupport.park(Duration.INFINITE) + } + + while (atomicHandle.value == null) { + sleepMillis(10) + } + + ParkingSupport.unpark(atomicHandle.value!!) + isPreUnparked.value = true + + f.waitThrowing() + } +} \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkingStressTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkingStressTest.kt new file mode 100644 index 00000000..c3075e4c --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkingStressTest.kt @@ -0,0 +1,117 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic +import kotlin.random.Random +import kotlin.test.Test +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.measureTime + +private const val NUMBER_OF_PARKS = 1000 +private const val DURATION_MILLIS = 5L + +class ThreadParkingStressTest { + private class Atomics { + val handle = atomic(null) + val done = atomic(false) + } + @Test + fun parkingStress() { + val duration = measureTime { + val a = Atomics() + + val thread1 = testThread { + a.handle.value = ParkingSupport.currentThreadHandle() + repeat(NUMBER_OF_PARKS) { i -> + if (Random.nextBoolean()) { + sleepMillis(Random.nextLong(DURATION_MILLIS)) + ParkingSupport.park(Duration.INFINITE) + } else { + ParkingSupport.park(Random.nextLong(DURATION_MILLIS).milliseconds) + } + } + a.done.value = true + } + + val thread2 = testThread { + while (a.done.value) { + sleepMillis(Random.nextLong(DURATION_MILLIS)) + a.handle.value?.let { ParkingSupport.unpark(it) } + } + } + + val thread3 = testThread { + while (!a.done.value) { + sleepMillis(Random.nextLong(DURATION_MILLIS)) + a.handle.value?.let { ParkingSupport.unpark(it) } + } + } + + thread1.join() + thread2.join() + thread3.join() + } + println(duration) + } + + @Test + fun testPublicApi() { + val duration = measureTime { + val a0 = Atomics() + val a1 = Atomics() + + val thread0 = testThread { + a0.handle.value = ParkingSupport.currentThreadHandle() + repeat(NUMBER_OF_PARKS) { i -> + if (Random.nextBoolean()) { + sleepMillis(Random.nextLong(DURATION_MILLIS)) + ParkingSupport.park(Duration.INFINITE) + } else { + ParkingSupport.park(Random.nextLong(DURATION_MILLIS).milliseconds) + } + } + a0.done.value = true + } + + val thread1 = testThread { + a1.handle.value = ParkingSupport.currentThreadHandle() + repeat(NUMBER_OF_PARKS) { i -> + if (Random.nextBoolean()) { + sleepMillis(Random.nextLong(DURATION_MILLIS)) + ParkingSupport.park(Duration.INFINITE) + } else { + ParkingSupport.park(Random.nextLong(DURATION_MILLIS).milliseconds) + } + } + a1.done.value = true + } + + val thread2 = testThread { + while (!a0.done.value || !a1.done.value) { + sleepMillis(Random.nextLong(DURATION_MILLIS)) + if (Random.nextBoolean()) { + a0.handle.value?.let { ParkingSupport.unpark(it) } + } else { + a1.handle.value?.let { ParkingSupport.unpark(it) } + } + } + } + + val thread3 = testThread { + while (!a0.done.value || !a1.done.value) { + sleepMillis(Random.nextLong(DURATION_MILLIS)) + if (Random.nextBoolean()) { + a0.handle.value?.let { ParkingSupport.unpark(it) } + } else { + a1.handle.value?.let { ParkingSupport.unpark(it) } + } + } + } + thread0.join() + thread1.join() + thread2.join() + thread3.join() + } + println(duration) + } +} \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimeArithmeticTests.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimeArithmeticTests.kt new file mode 100644 index 00000000..f219df30 --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimeArithmeticTests.kt @@ -0,0 +1,47 @@ +package kotlinx.atomicfu.locks + +import kotlin.math.absoluteValue +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.assertTrue + +class TimeArithmeticTests { + + + + @Test + fun timeArithmeticTest() { + val currentTimes = listOf( + 31_560_000, // one year in seconds + 31_560_000 * 5, + 31_560_000 * 25, + 31_560_000 * 60, + 31_560_000 * 100, + 31_560_000 * 500, + 31_560_000 * 1000, + 31_560_000 * 10000, + ) + + currentTimes.forEach { currentTimeInSeconds -> + + // Test Long + repeat(1000) { + val nanos = Random.nextLong().absoluteValue + val updatedTime = currentTimeInSeconds.addNanosToSeconds(nanos) + assertTrue { updatedTime - currentTimeInSeconds == nanos / 1_000_000_000 } + } + + // Test Int + repeat(1000) { + val currentTimeInInt = currentTimeInSeconds.toInt() + val nanos = Random.nextLong().absoluteValue + val updatedTime = currentTimeInInt.addNanosToSeconds(nanos) + if (nanos > 0) assertTrue { + updatedTime.toLong() - currentTimeInInt == nanos / 1_000_000_000 || updatedTime == Int.MAX_VALUE + } + } + } + } + + +} \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt new file mode 100644 index 00000000..6baefa5d --- /dev/null +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt @@ -0,0 +1,231 @@ +package kotlinx.atomicfu.locks + +import kotlinx.atomicfu.atomic +import kotlin.test.Test +import kotlin.test.assertTrue +import kotlin.time.measureTime +import kotlin.IllegalStateException +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.nanoseconds + +class TimedParkingTest { + + @Test + fun testNanosFirstUnpark400() = retry(3) { + var handle1: ParkingHandle? = null + + val thread1 = Fut { + handle1 = ParkingSupport.currentThreadHandle() + val t = measureTime { + ParkingSupport.park(600.milliseconds) + } + assertTrue(t.inWholeMilliseconds > 300) + assertTrue(t.inWholeMilliseconds < 500) + } + + sleepMillis(400) + ParkingSupport.unpark(handle1!!) + + thread1.waitThrowing() + } + + @Test + fun testNanosFirstUnpark700() = retry(3) { + var handle1: ParkingHandle? = null + + val thread1 = Fut { + handle1 = ParkingSupport.currentThreadHandle() + val t = measureTime { + ParkingSupport.park(900.milliseconds) + } + assertTrue(t.inWholeMilliseconds > 600) + assertTrue(t.inWholeMilliseconds < 800) + } + + sleepMillis(700) + ParkingSupport.unpark(handle1!!) + + thread1.waitThrowing() + } + + @Test + fun testNanosFirstUnpark1000() = retry(3) { + var handle1: ParkingHandle? = null + + val thread1 = Fut { + handle1 = ParkingSupport.currentThreadHandle() + val t = measureTime { + ParkingSupport.park(1200.milliseconds) + } + assertTrue(t.inWholeMilliseconds > 900) + assertTrue(t.inWholeMilliseconds < 1100) + } + + sleepMillis(1000) + ParkingSupport.unpark(handle1!!) + + thread1.waitThrowing() + } + + @Test + fun testNanosFirstUnparkLongMax() = retry(3) { + var handle1: ParkingHandle? = null + + val thread1 = Fut { + handle1 = ParkingSupport.currentThreadHandle() + val t = measureTime { + ParkingSupport.park(Long.MAX_VALUE.nanoseconds) + } + assertTrue(t.inWholeMilliseconds > 900) + assertTrue(t.inWholeMilliseconds < 1100) + } + + sleepMillis(1000) + ParkingSupport.unpark(handle1!!) + + thread1.waitThrowing() + } + + @Test + fun testNanosFirstUnparkIntMax() = retry(3) { + var handle1: ParkingHandle? = null + + val thread1 = Fut { + handle1 = ParkingSupport.currentThreadHandle() + val t = measureTime { + ParkingSupport.park(Int.MAX_VALUE.toLong().nanoseconds) + } + assertTrue(t.inWholeMilliseconds > 900) + assertTrue(t.inWholeMilliseconds < 1100) + } + + sleepMillis(1000) + ParkingSupport.unpark(handle1!!) + + thread1.waitThrowing() + } + + @Test + fun testNanosFirstUnpark3rdLong() = retry(3) { + var handle1: ParkingHandle? = null + + val thread1 = Fut { + handle1 = ParkingSupport.currentThreadHandle() + val t = measureTime { + ParkingSupport.park((Long.MAX_VALUE / 3).nanoseconds) + } + assertTrue(t.inWholeMilliseconds > 900) + assertTrue(t.inWholeMilliseconds < 1100) + } + + sleepMillis(1000) + ParkingSupport.unpark(handle1!!) + + thread1.waitThrowing() + } + + @Test + fun testNanosFirstDeadline400() = retry(3) { + var handle1: ParkingHandle? = null + + val thread1 = Fut { + handle1 = ParkingSupport.currentThreadHandle() + val t = measureTime { + ParkingSupport.park(400.milliseconds) + } + assertTrue(t.inWholeMilliseconds > 300) + assertTrue(t.inWholeMilliseconds < 500) + } + + sleepMillis(600) + ParkingSupport.unpark(handle1!!) + + thread1.waitThrowing() + } + + @Test + fun testNanosFirstDeadline700() = retry(3) { + var handle1: ParkingHandle? = null + + val thread1 = Fut { + handle1 = ParkingSupport.currentThreadHandle() + val t = measureTime { + ParkingSupport.park(700.milliseconds) + } + assertTrue(t.inWholeMilliseconds > 600) + assertTrue(t.inWholeMilliseconds < 800) + } + + sleepMillis(900) + ParkingSupport.unpark(handle1!!) + + thread1.waitThrowing() + } + + @Test + fun testNanosFirstDeadline1200() = retry(3) { + var handle1: ParkingHandle? = null + + val thread1 = Fut { + handle1 = ParkingSupport.currentThreadHandle() + val t = measureTime { + ParkingSupport.park(1000.milliseconds) + } + assertTrue(t.inWholeMilliseconds > 900) + assertTrue(t.inWholeMilliseconds < 1100) + } + + sleepMillis(1200) + ParkingSupport.unpark(handle1!!) + + thread1.waitThrowing() + } + + private fun retry(times: Int, block: () -> Unit): Unit { + var lastThrowable: Throwable? = null + repeat(times) { + try { + return block() + } catch (t: Throwable) { + lastThrowable = t + } + } + lastThrowable?.let { + throw IllegalStateException("Failed after $times retries").apply { addSuppressed(it) } + } + } +} + + +internal class Fut(block: () -> Unit) { + private val done = atomic(false) + private val atomicError = atomic(null) + private val thread: TestThread = testThread { + try { + block() + } catch (t: Throwable) { + atomicError.value = t + throw t + } finally { + done.value = true + } + } + + fun waitThrowing() { + thread.join() + throwIfError() + } + + private fun throwIfError() = atomicError.value?.let { throw it } + + companion object { + fun waitAllAndThrow(futs: List) { + var remainingFuts = futs + while (remainingFuts.isNotEmpty()) { + remainingFuts.forEach { it.throwIfError() } + remainingFuts = remainingFuts.filter { !it.done.value } + sleepMillis(50) + } + } + } +} diff --git a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt new file mode 100644 index 00000000..5ef5dbac --- /dev/null +++ b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt @@ -0,0 +1,14 @@ +package kotlinx.atomicfu.locks +import java.util.concurrent.locks.LockSupport + +internal actual object ParkingDelegator { + + actual fun createRef(): ParkingData = Thread.currentThread() + actual fun wait(ref: ParkingData, shouldWait: () -> Boolean) = LockSupport.park() + actual fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean) = LockSupport.parkNanos(nanos) + actual fun wake(ref: ParkingData) = LockSupport.unpark(ref) + actual fun destroyRef(ref: ParkingData) {} + +} + +internal actual typealias ParkingData = Thread \ No newline at end of file diff --git a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt new file mode 100644 index 00000000..fe50be64 --- /dev/null +++ b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt @@ -0,0 +1,17 @@ +package kotlinx.atomicfu.locks + +import java.util.concurrent.locks.LockSupport +import kotlin.time.Duration +import kotlin.time.DurationUnit +import kotlin.time.TimeMark + +actual typealias ParkingHandle = Thread +actual object ParkingSupport { + actual fun park(timeout: Duration) { + if (timeout == Duration.INFINITE) LockSupport.park() + else LockSupport.parkNanos(timeout.toLong(DurationUnit.NANOSECONDS)) + } + actual fun parkUntil(deadline: TimeMark) = park(deadline.elapsedNow() * -1) + actual fun unpark(handle: ParkingHandle) = LockSupport.unpark(handle) + actual fun currentThreadHandle(): ParkingHandle = Thread.currentThread() +} \ No newline at end of file diff --git a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/TestThread.jvm.kt b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/TestThread.jvm.kt new file mode 100644 index 00000000..e1b3d2b9 --- /dev/null +++ b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/TestThread.jvm.kt @@ -0,0 +1,9 @@ +package kotlinx.atomicfu.locks +import kotlin.concurrent.thread + +internal actual class TestThread actual constructor(toDo: () -> Unit) { + private val th = thread { toDo() } + actual fun join() = th.join() +} + +actual fun sleepMillis(millis: Long) = Thread.sleep(millis) \ No newline at end of file diff --git a/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt new file mode 100644 index 00000000..feeb1eb7 --- /dev/null +++ b/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -0,0 +1,12 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.CPointer +import platform.posix.* + +internal actual val clockId: Int + get() = CLOCK_MONOTONIC + +// Sets monotonic clock to prevent time updates from interfering with waiting durations. +internal actual fun setClock(attr: CPointer): Int = + pthread_condattr_setclock(attr, CLOCK_MONOTONIC) + diff --git a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt new file mode 100644 index 00000000..088a4bc8 --- /dev/null +++ b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -0,0 +1,59 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.* +import kotlinx.cinterop.alloc +import kotlinx.cinterop.pointed +import kotlinx.cinterop.ptr +import platform.posix.* + +@OptIn(ExperimentalForeignApi::class) +internal actual object ParkingDelegator { + actual fun createRef(): ParkingData { + val mut = nativeHeap.alloc().ptr + val cond = nativeHeap.alloc().ptr + callAndVerify(0) { pthread_mutex_init(mut, null) } + callAndVerify(0) { pthread_cond_init(cond, null) } + return ParkingData(mut, cond) + } + + actual inline fun wait(ref: ParkingData, shouldWait: () -> Boolean) { + callAndVerify(0) { pthread_mutex_lock(ref.mut) } + if (shouldWait()) callAndVerify(0) { pthread_cond_wait(ref.cond, ref.mut) } + callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + } + + actual inline fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped { + val ts = alloc().ptr + + // Add nanos to current time + callAndVerify(0) { clock_gettime(CLOCK_REALTIME, ts) } + // According to https://learn.microsoft.com/en-us/windows/win32/api/minwinbase/ns-minwinbase-systemtime + // the maximum year on windows is 30827. + // Adding Long.MAX_VALUE / 1_000_000_000 should not be able to overflow. + ts.pointed.tv_sec += nanos / 1_000_000_000 + ts.pointed.tv_nsec += (nanos % 1_000_000_000).toInt() + + //Fix overflow + if (ts.pointed.tv_nsec >= 1_000_000_000) { + ts.pointed.tv_sec += 1 + ts.pointed.tv_nsec -= 1_000_000_000 + } + callAndVerify(0) { pthread_mutex_lock(ref.mut) } + if (shouldWait()) callAndVerify(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) } + callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + } + + actual fun wake(ref: ParkingData) { + callAndVerify(0) { pthread_mutex_lock(ref.mut) } + callAndVerify(0) { pthread_cond_signal(ref.cond) } + callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + } + + actual fun destroyRef(ref: ParkingData) { + callAndVerify(0) { pthread_mutex_destroy(ref.mut) } + callAndVerify(0) { pthread_cond_destroy(ref.cond) } + nativeHeap.free(ref.mut) + nativeHeap.free(ref.cond) + } +} +internal actual class ParkingData(val mut: CPointer, val cond: CPointer) diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeParkingUtils.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeParkingUtils.kt new file mode 100644 index 00000000..2aaa71a4 --- /dev/null +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeParkingUtils.kt @@ -0,0 +1,18 @@ +package kotlinx.atomicfu.locks + +import platform.posix.errno + +internal inline fun callAndVerify(expectedReturn: Int, block: () -> Int) = block().also { + check(it == expectedReturn) { + errorString(it, expectedReturn) + } + } + +internal inline fun callAndVerify(firstExpectedReturn: Int, secondExpectedReturn: Int, block: () -> Int) = block().also { + check(it == firstExpectedReturn || it == secondExpectedReturn) { + errorString(it, firstExpectedReturn, secondExpectedReturn) + } + } + +private fun errorString(actualValue: Int, vararg expectedReturn: Int) = + "Calling native, expected one return status of ${expectedReturn.joinToString(", ")}, but was $actualValue. With errno: $errno" \ No newline at end of file diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt new file mode 100644 index 00000000..d966d553 --- /dev/null +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt @@ -0,0 +1,22 @@ +package kotlinx.atomicfu.locks + +import kotlin.time.Duration +import kotlin.time.DurationUnit +import kotlin.time.TimeMark + +@kotlin.native.concurrent.ThreadLocal +private val threadLocalParkingHandle = ParkingHandle() + +actual class ParkingHandle internal constructor() { + internal val parker: ThreadParker = ThreadParker() +} + +actual object ParkingSupport { + actual fun park(timeout: Duration) { + if (timeout == Duration.INFINITE) threadLocalParkingHandle.parker.park() + else threadLocalParkingHandle.parker.parkNanos(timeout.toLong(DurationUnit.NANOSECONDS)) + } + actual fun parkUntil(deadline: TimeMark) = park(deadline.elapsedNow() * -1) + actual fun unpark(handle: ParkingHandle) = handle.parker.unpark() + actual fun currentThreadHandle(): ParkingHandle = threadLocalParkingHandle +} diff --git a/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/TestThread.native.kt b/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/TestThread.native.kt new file mode 100644 index 00000000..554785b5 --- /dev/null +++ b/atomicfu/src/nativeTest/kotlin/kotlinx/atomicfu/locks/TestThread.native.kt @@ -0,0 +1,15 @@ +package kotlinx.atomicfu.locks + +import platform.posix.usleep +import kotlin.native.concurrent.Future +import kotlin.native.concurrent.TransferMode +import kotlin.native.concurrent.Worker + +internal actual class TestThread actual constructor(toDo: () -> Unit) { + private val future: Future = Worker.start().execute(TransferMode.UNSAFE, { toDo }) { toDo -> toDo() } + actual fun join() = future.result +} + +actual fun sleepMillis(millis: Long) { + usleep(millis.toUInt() * 1000u) +} \ No newline at end of file diff --git a/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt new file mode 100644 index 00000000..80560858 --- /dev/null +++ b/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -0,0 +1,66 @@ +package kotlinx.atomicfu.locks + +import kotlinx.cinterop.* +import kotlinx.cinterop.alloc +import kotlinx.cinterop.free +import kotlinx.cinterop.pointed +import kotlinx.cinterop.ptr +import platform.posix.* + +@OptIn(ExperimentalForeignApi::class, UnsafeNumber::class) +internal actual object ParkingDelegator { + actual fun createRef(): ParkingData { + val mut = nativeHeap.alloc().ptr + val cond = nativeHeap.alloc().ptr + val attr = nativeHeap.alloc().ptr + callAndVerify(0) { pthread_mutex_init(mut, null) } + callAndVerify(0) { pthread_condattr_init(attr) } + callAndVerify(0) { setClock(attr) } + callAndVerify(0) { pthread_cond_init(cond, attr) } + + callAndVerify(0) { pthread_condattr_destroy(attr) } + nativeHeap.free(attr) + return ParkingData(mut, cond) + } + + actual inline fun wait(ref: ParkingData, shouldWait: () -> Boolean){ + callAndVerify(0) { pthread_mutex_lock(ref.mut) } + if (shouldWait()) callAndVerify(0) { pthread_cond_wait(ref.cond, ref.mut) } + callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + } + + actual inline fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped { + val ts = alloc().ptr + + // Add nanos to current time + callAndVerify(0) { clock_gettime(clockId.convert(), ts) } + ts.pointed.tv_sec = ts.pointed.tv_sec.addNanosToSeconds(nanos) + ts.pointed.tv_nsec = (ts.pointed.tv_nsec + nanos % 1_000_000_000).convert() + //Fix overflow + if (ts.pointed.tv_nsec >= 1_000_000_000) { + ts.pointed.tv_sec = ts.pointed.tv_sec.addNanosToSeconds(1_000_000_000) + ts.pointed.tv_nsec -= 1_000_000_000 + } + callAndVerify(0) { pthread_mutex_lock(ref.mut) } + if (shouldWait()) callAndVerify(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) } + callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + } + + actual fun wake(ref: ParkingData) { + callAndVerify(0) { pthread_mutex_lock(ref.mut) } + callAndVerify(0) { pthread_cond_signal(ref.cond) } + callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + } + + actual fun destroyRef(ref: ParkingData) { + callAndVerify(0) { pthread_mutex_destroy(ref.mut) } + callAndVerify(0) { pthread_cond_destroy(ref.cond) } + nativeHeap.free(ref.mut) + nativeHeap.free(ref.cond) + } +} + +internal actual class ParkingData(val mut: CPointer, val cond: CPointer) + +internal expect val clockId: Int +internal expect fun setClock(attr: CPointer): Int \ No newline at end of file From 0d9ea0732e0c77b11e05f1184a79699e4babe3a1 Mon Sep 17 00:00:00 2001 From: "Bob.Brockbernd" Date: Tue, 29 Apr 2025 13:54:15 +0200 Subject: [PATCH 2/5] Fix after rebase --- .../kotlinx/atomicfu/locks/NativeMutexNode.kt | 50 ------------------- .../kotlinx/atomicfu/locks/NativeMutexNode.kt | 8 ++- .../kotlinx/atomicfu/locks/NativeMutexNode.kt | 5 ++ .../kotlinx/atomicfu/locks/NativeMutexNode.kt | 5 ++ .../kotlin/kotlinx/atomicfu/locks/ThreadId.kt | 7 --- 5 files changed, 17 insertions(+), 58 deletions(-) delete mode 100644 atomicfu/src/androidNative64BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt rename atomicfu/src/{nativeUnixLikeMain => linuxMain}/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt (90%) delete mode 100644 atomicfu/src/nativeNonAppleMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt diff --git a/atomicfu/src/androidNative64BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/androidNative64BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt deleted file mode 100644 index b1cb9d6a..00000000 --- a/atomicfu/src/androidNative64BitMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2025 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.atomicfu.locks - -import kotlinx.cinterop.Arena -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.alloc -import kotlinx.cinterop.ptr -import platform.posix.* - -@OptIn(ExperimentalForeignApi::class) -actual class NativeMutexNode { - actual var next: NativeMutexNode? = null - - private val arena: Arena = Arena() - private val cond: pthread_cond_t = arena.alloc() - private val mutex: pthread_mutex_t = arena.alloc() - private val attr: pthread_mutexattr_tVar = arena.alloc() - - init { - require(pthread_cond_init(cond.ptr, null) == 0) - require(pthread_mutexattr_init(attr.ptr) == 0) - require(pthread_mutexattr_settype(attr.ptr, PTHREAD_MUTEX_ERRORCHECK.toInt()) == 0) - require(pthread_mutex_init(mutex.ptr, attr.ptr) == 0) - } - - actual fun lock() { - pthread_mutex_lock(mutex.ptr) - } - - actual fun unlock() { - pthread_mutex_unlock(mutex.ptr) - } - - internal actual fun wait(lockOwner: Long) { - pthread_cond_wait(cond.ptr, mutex.ptr) - } - - internal actual fun notify() { - pthread_cond_signal(cond.ptr) - } - - internal actual fun dispose() { - pthread_cond_destroy(cond.ptr) - pthread_mutex_destroy(mutex.ptr) - pthread_mutexattr_destroy(attr.ptr) - arena.clear() - } -} diff --git a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt index a3ba4125..25a01442 100644 --- a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt @@ -3,13 +3,15 @@ */ package kotlinx.atomicfu.locks +import kotlinx.atomicfu.atomic import kotlinx.cinterop.Arena import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.UnsafeNumber import kotlinx.cinterop.alloc import kotlinx.cinterop.ptr import platform.posix.* -@OptIn(ExperimentalForeignApi::class) +@OptIn(ExperimentalForeignApi::class, UnsafeNumber::class) actual class NativeMutexNode { actual var next: NativeMutexNode? = null @@ -49,3 +51,7 @@ actual class NativeMutexNode { arena.clear() } } + +private val threadCounter = atomic(0L) + +actual fun createThreadId(): Long = threadCounter.incrementAndGet() diff --git a/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt similarity index 90% rename from atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt rename to atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt index d40da0d2..b13bce3b 100644 --- a/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ b/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt @@ -3,6 +3,7 @@ */ package kotlinx.atomicfu.locks +import kotlinx.atomicfu.atomic import kotlinx.cinterop.Arena import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.alloc @@ -48,3 +49,7 @@ actual class NativeMutexNode { arena.clear() } } + +private val threadCounter = atomic(0L) + +internal actual fun createThreadId(): Long = threadCounter.incrementAndGet() diff --git a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt index 6bc7d7fb..591034ea 100644 --- a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt +++ b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/NativeMutexNode.kt @@ -3,6 +3,7 @@ */ package kotlinx.atomicfu.locks +import kotlinx.atomicfu.atomic import kotlinx.cinterop.Arena import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.alloc @@ -48,3 +49,7 @@ actual class NativeMutexNode { arena.clear() } } + +private val threadCounter = atomic(0L) + +actual fun createThreadId(): Long = threadCounter.incrementAndGet() \ No newline at end of file diff --git a/atomicfu/src/nativeNonAppleMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt b/atomicfu/src/nativeNonAppleMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt deleted file mode 100644 index 59bc1441..00000000 --- a/atomicfu/src/nativeNonAppleMain/kotlin/kotlinx/atomicfu/locks/ThreadId.kt +++ /dev/null @@ -1,7 +0,0 @@ -package kotlinx.atomicfu.locks - -import kotlinx.atomicfu.atomic - -private val threadCounter = atomic(0L) - -internal actual fun createThreadId(): Long = threadCounter.incrementAndGet() From 524811c2caf251235fb19a3a76ea8f9e349b3ea7 Mon Sep 17 00:00:00 2001 From: "Bob.Brockbernd" Date: Tue, 29 Apr 2025 17:18:06 +0200 Subject: [PATCH 3/5] PR comments: docs and naming, default return value for `callAndVerify`, try/finally for pthread_mutex_unlock, --- .../atomicfu/locks/PosixParkingDelegator.kt | 43 +++++++++------- .../atomicfu/locks/PosixParkingDelegator.kt | 4 +- .../atomicfu/locks/ParkingDelegator.kt | 4 ++ .../kotlinx/atomicfu/locks/ParkingSupport.kt | 25 +++++++--- .../kotlinx/atomicfu/locks/ThreadParker.kt | 2 +- .../atomicfu/locks/PosixParkingDelegator.kt | 4 +- .../atomicfu/locks/PosixParkingDelegator.kt | 37 ++++++++------ .../atomicfu/locks/NativeParkingUtils.kt | 6 +-- .../atomicfu/locks/PosixParkingDelegator.kt | 49 +++++++++++-------- 9 files changed, 109 insertions(+), 65 deletions(-) diff --git a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt index 57de1e50..4039120c 100644 --- a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt +++ b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -13,27 +13,30 @@ internal actual object ParkingDelegator { val mut = nativeHeap.alloc().ptr val cond = nativeHeap.alloc().ptr val attr = nativeHeap.alloc().ptr - callAndVerify(0) { pthread_mutex_init(mut, null) } - callAndVerify(0) { pthread_condattr_init(attr) } - callAndVerify(0) { pthread_condattr_setclock(attr, CLOCK_MONOTONIC) } - callAndVerify(0) { pthread_cond_init(cond, attr) } + callAndVerify { pthread_mutex_init(mut, null) } + callAndVerify { pthread_condattr_init(attr) } + callAndVerify { pthread_condattr_setclock(attr, CLOCK_MONOTONIC) } + callAndVerify { pthread_cond_init(cond, attr) } - callAndVerify(0) { pthread_condattr_destroy(attr) } + callAndVerify { pthread_condattr_destroy(attr) } nativeHeap.free(attr) return ParkingData(mut, cond) } actual inline fun wait(ref: ParkingData, shouldWait: () -> Boolean){ - callAndVerify(0) { pthread_mutex_lock(ref.mut) } - if (shouldWait()) callAndVerify(0) { pthread_cond_wait(ref.cond, ref.mut) } - callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + callAndVerify { pthread_mutex_lock(ref.mut) } + try { + if (shouldWait()) callAndVerify { pthread_cond_wait(ref.cond, ref.mut) } + } finally { + callAndVerify { pthread_mutex_unlock(ref.mut) } + } } actual inline fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped { val ts = alloc().ptr // Add nanos to current time - callAndVerify(0) { clock_gettime(CLOCK_MONOTONIC.convert(), ts) } + callAndVerify { clock_gettime(CLOCK_MONOTONIC.convert(), ts) } ts.pointed.tv_sec = ts.pointed.tv_sec.addNanosToSeconds(nanos) ts.pointed.tv_nsec = (ts.pointed.tv_nsec + nanos % 1_000_000_000).convert() //Fix overflow @@ -41,20 +44,26 @@ internal actual object ParkingDelegator { ts.pointed.tv_sec = ts.pointed.tv_sec.addNanosToSeconds(1_000_000_000) ts.pointed.tv_nsec -= 1_000_000_000 } - callAndVerify(0) { pthread_mutex_lock(ref.mut) } - if (shouldWait()) callAndVerify(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) } - callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + callAndVerify { pthread_mutex_lock(ref.mut) } + try { + if (shouldWait()) callAndVerify(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) } + } finally { + callAndVerify { pthread_mutex_unlock(ref.mut) } + } } actual fun wake(ref: ParkingData) { - callAndVerify(0) { pthread_mutex_lock(ref.mut) } - callAndVerify(0) { pthread_cond_signal(ref.cond) } - callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + callAndVerify { pthread_mutex_lock(ref.mut) } + try { + callAndVerify { pthread_cond_signal(ref.cond) } + } finally { + callAndVerify { pthread_mutex_unlock(ref.mut) } + } } actual fun destroyRef(ref: ParkingData) { - callAndVerify(0) { pthread_mutex_destroy(ref.mut) } - callAndVerify(0) { pthread_cond_destroy(ref.cond) } + callAndVerify { pthread_mutex_destroy(ref.mut) } + callAndVerify { pthread_cond_destroy(ref.cond) } nativeHeap.free(ref.mut) nativeHeap.free(ref.cond) } diff --git a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt index eeb1cb41..f727fb98 100644 --- a/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt +++ b/atomicfu/src/appleMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -7,7 +7,7 @@ import platform.posix.* // CLOCK_REALTIME should be equal to CLOCK_SYSTEM on darwin. // https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/osfmk/mach/clock_types.h#L70-L73 // Where CLOCK_CALENDAR is the time from epoch. -internal actual val clockId: Int +internal actual val posixGetTimeClockId: Int get() = CLOCK_REALTIME -actual fun setClock(attr: CPointer): Int = 0 +actual fun pthreadCondAttrSetClock(attr: CPointer): Int = 0 diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt index 93a8d6cd..d1c23deb 100644 --- a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt @@ -24,6 +24,10 @@ internal expect object ParkingDelegator { */ internal fun Int.addNanosToSeconds(nanos: Long): Int = (this + nanos / 1_000_000_000).coerceIn(Int.MIN_VALUE.toLong(), Int.MAX_VALUE.toLong()).toInt() + +/** + * Adds nano seconds to current time in seconds. + */ internal fun Long.addNanosToSeconds(nanos: Long): Long { // Should never happen as this is checked in `ThreadParker` diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt index 1ebb5311..5b49ac0d 100644 --- a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt @@ -7,8 +7,8 @@ import kotlin.time.TimeMark * Parking and unparking support for threads on Kotlin/Native and Kotlin/JVM. * Can be used as a building block to create locks and other synchronization primitives. * - * A call to [ParkingSupport.park] or [ParkingSupport.parkUntil] will suspend the current thread. - * A suspended thread will wake up in one of the following four cases: + * A call to [ParkingSupport.park] or [ParkingSupport.parkUntil] will pause the current thread. + * A paused thread will resume in one of the following four cases: * - A different thread calls [ParkingSupport.unpark]. * - The given `timeout` or `deadline` is exceeded. * - A spurious wakeup @@ -32,6 +32,10 @@ import kotlin.time.TimeMark * state.value = WAKE * ParkingSupport.unpark(handleReference.value) * ``` + * + * PLEASE NOTE: this is a low-level API and should be used with caution. + * Unless the goal is to create a _synchronization primitive_ like a mutex or semaphore, + * it is advised to a higher level concurrency API like `kotlinx.coroutines` */ expect object ParkingSupport { @@ -71,14 +75,23 @@ expect object ParkingSupport { fun unpark(handle: ParkingHandle) /** - * Returns the [ParkingHandle] that can be used to [unpark] the current thread. + * Returns the [ParkingHandle] corresponding to the current thread. + * This [ParkingHandle] should be shared with other threads which allow them to [unpark] the current thread. + * + * A [ParkingHandle] is uniquely associated with a specific thread, maintaining a one-to-one correspondence. + * When the _same_ thread makes multiple calls to [currentThreadHandle], + * it always returns the _same_ [ParkingHandle]. + * + * Note: as this function returns a unique [ParkingHandle] for each thread it should not be cached or memoized. */ fun currentThreadHandle(): ParkingHandle } /** - * Is used to unpark a thread. - * Can be obtained by calling [ParkingSupport.currentThreadHandle]. - * Is required by [ParkingSupport.unpark]. + * A handle allowing to unpark a thread of execution using [ParkingSupport.unpark]. + * There is a one-to-one mapping between threads and parking handles. + * A handle can be obtained by calling [ParkingSupport.currentThreadHandle]. + * Refer to [ParkingSupport] documentation for more details + * on how to use [ParkingHandle] and how parking works in general. */ expect class ParkingHandle \ No newline at end of file diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt index 9cbb3340..8b46900a 100644 --- a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt @@ -96,7 +96,7 @@ internal class ThreadParker { } } -private interface ParkingState +private sealed interface ParkingState // The Parker is pre-unparked. The next park call will change state to Free and return immediately. private object Unparked : ParkingState diff --git a/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt index feeb1eb7..04aed602 100644 --- a/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt +++ b/atomicfu/src/linuxMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -3,10 +3,10 @@ package kotlinx.atomicfu.locks import kotlinx.cinterop.CPointer import platform.posix.* -internal actual val clockId: Int +internal actual val posixGetTimeClockId: Int get() = CLOCK_MONOTONIC // Sets monotonic clock to prevent time updates from interfering with waiting durations. -internal actual fun setClock(attr: CPointer): Int = +internal actual fun pthreadCondAttrSetClock(attr: CPointer): Int = pthread_condattr_setclock(attr, CLOCK_MONOTONIC) diff --git a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt index 088a4bc8..4a302a25 100644 --- a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt +++ b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -11,22 +11,25 @@ internal actual object ParkingDelegator { actual fun createRef(): ParkingData { val mut = nativeHeap.alloc().ptr val cond = nativeHeap.alloc().ptr - callAndVerify(0) { pthread_mutex_init(mut, null) } - callAndVerify(0) { pthread_cond_init(cond, null) } + callAndVerify { pthread_mutex_init(mut, null) } + callAndVerify { pthread_cond_init(cond, null) } return ParkingData(mut, cond) } actual inline fun wait(ref: ParkingData, shouldWait: () -> Boolean) { - callAndVerify(0) { pthread_mutex_lock(ref.mut) } - if (shouldWait()) callAndVerify(0) { pthread_cond_wait(ref.cond, ref.mut) } - callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + callAndVerify { pthread_mutex_lock(ref.mut) } + try { + if (shouldWait()) callAndVerify { pthread_cond_wait(ref.cond, ref.mut) } + } finally { + callAndVerify { pthread_mutex_unlock(ref.mut) } + } } actual inline fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped { val ts = alloc().ptr // Add nanos to current time - callAndVerify(0) { clock_gettime(CLOCK_REALTIME, ts) } + callAndVerify { clock_gettime(CLOCK_REALTIME, ts) } // According to https://learn.microsoft.com/en-us/windows/win32/api/minwinbase/ns-minwinbase-systemtime // the maximum year on windows is 30827. // Adding Long.MAX_VALUE / 1_000_000_000 should not be able to overflow. @@ -38,20 +41,26 @@ internal actual object ParkingDelegator { ts.pointed.tv_sec += 1 ts.pointed.tv_nsec -= 1_000_000_000 } - callAndVerify(0) { pthread_mutex_lock(ref.mut) } - if (shouldWait()) callAndVerify(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) } - callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + callAndVerify { pthread_mutex_lock(ref.mut) } + try { + if (shouldWait()) callAndVerify(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) } + } finally { + callAndVerify { pthread_mutex_unlock(ref.mut) } + } } actual fun wake(ref: ParkingData) { - callAndVerify(0) { pthread_mutex_lock(ref.mut) } - callAndVerify(0) { pthread_cond_signal(ref.cond) } - callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + callAndVerify { pthread_mutex_lock(ref.mut) } + try { + callAndVerify { pthread_cond_signal(ref.cond) } + } finally { + callAndVerify { pthread_mutex_unlock(ref.mut) } + } } actual fun destroyRef(ref: ParkingData) { - callAndVerify(0) { pthread_mutex_destroy(ref.mut) } - callAndVerify(0) { pthread_cond_destroy(ref.cond) } + callAndVerify { pthread_mutex_destroy(ref.mut) } + callAndVerify { pthread_cond_destroy(ref.cond) } nativeHeap.free(ref.mut) nativeHeap.free(ref.cond) } diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeParkingUtils.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeParkingUtils.kt index 2aaa71a4..4f6b96ab 100644 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeParkingUtils.kt +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/NativeParkingUtils.kt @@ -1,8 +1,8 @@ package kotlinx.atomicfu.locks -import platform.posix.errno +import platform.posix.strerror -internal inline fun callAndVerify(expectedReturn: Int, block: () -> Int) = block().also { +internal inline fun callAndVerify(expectedReturn: Int = 0, block: () -> Int) = block().also { check(it == expectedReturn) { errorString(it, expectedReturn) } @@ -15,4 +15,4 @@ internal inline fun callAndVerify(firstExpectedReturn: Int, secondExpectedReturn } private fun errorString(actualValue: Int, vararg expectedReturn: Int) = - "Calling native, expected one return status of ${expectedReturn.joinToString(", ")}, but was $actualValue. With errno: $errno" \ No newline at end of file + "Calling native, expected one return status of ${expectedReturn.joinToString(", ")}, but was $actualValue. With message ${strerror(actualValue)}" \ No newline at end of file diff --git a/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt index 80560858..54b7d28c 100644 --- a/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt +++ b/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -13,27 +13,30 @@ internal actual object ParkingDelegator { val mut = nativeHeap.alloc().ptr val cond = nativeHeap.alloc().ptr val attr = nativeHeap.alloc().ptr - callAndVerify(0) { pthread_mutex_init(mut, null) } - callAndVerify(0) { pthread_condattr_init(attr) } - callAndVerify(0) { setClock(attr) } - callAndVerify(0) { pthread_cond_init(cond, attr) } + callAndVerify { pthread_mutex_init(mut, null) } + callAndVerify { pthread_condattr_init(attr) } + callAndVerify { pthreadCondAttrSetClock(attr) } + callAndVerify { pthread_cond_init(cond, attr) } - callAndVerify(0) { pthread_condattr_destroy(attr) } + callAndVerify { pthread_condattr_destroy(attr) } nativeHeap.free(attr) return ParkingData(mut, cond) } - actual inline fun wait(ref: ParkingData, shouldWait: () -> Boolean){ - callAndVerify(0) { pthread_mutex_lock(ref.mut) } - if (shouldWait()) callAndVerify(0) { pthread_cond_wait(ref.cond, ref.mut) } - callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + actual inline fun wait(ref: ParkingData, shouldWait: () -> Boolean) { + callAndVerify { pthread_mutex_lock(ref.mut) } + try { + if (shouldWait()) callAndVerify { pthread_cond_wait(ref.cond, ref.mut) } + } finally { + callAndVerify { pthread_mutex_unlock(ref.mut) } + } } actual inline fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped { val ts = alloc().ptr // Add nanos to current time - callAndVerify(0) { clock_gettime(clockId.convert(), ts) } + callAndVerify { clock_gettime(posixGetTimeClockId.convert(), ts) } ts.pointed.tv_sec = ts.pointed.tv_sec.addNanosToSeconds(nanos) ts.pointed.tv_nsec = (ts.pointed.tv_nsec + nanos % 1_000_000_000).convert() //Fix overflow @@ -41,20 +44,26 @@ internal actual object ParkingDelegator { ts.pointed.tv_sec = ts.pointed.tv_sec.addNanosToSeconds(1_000_000_000) ts.pointed.tv_nsec -= 1_000_000_000 } - callAndVerify(0) { pthread_mutex_lock(ref.mut) } - if (shouldWait()) callAndVerify(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) } - callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + callAndVerify { pthread_mutex_lock(ref.mut) } + try { + if (shouldWait()) callAndVerify(0, ETIMEDOUT) { pthread_cond_timedwait(ref.cond, ref.mut, ts) } + } finally { + callAndVerify { pthread_mutex_unlock(ref.mut) } + } } actual fun wake(ref: ParkingData) { - callAndVerify(0) { pthread_mutex_lock(ref.mut) } - callAndVerify(0) { pthread_cond_signal(ref.cond) } - callAndVerify(0) { pthread_mutex_unlock(ref.mut) } + callAndVerify { pthread_mutex_lock(ref.mut) } + try { + callAndVerify { pthread_cond_signal(ref.cond) } + } finally { + callAndVerify { pthread_mutex_unlock(ref.mut) } + } } actual fun destroyRef(ref: ParkingData) { - callAndVerify(0) { pthread_mutex_destroy(ref.mut) } - callAndVerify(0) { pthread_cond_destroy(ref.cond) } + callAndVerify { pthread_mutex_destroy(ref.mut) } + callAndVerify { pthread_cond_destroy(ref.cond) } nativeHeap.free(ref.mut) nativeHeap.free(ref.cond) } @@ -62,5 +71,5 @@ internal actual object ParkingDelegator { internal actual class ParkingData(val mut: CPointer, val cond: CPointer) -internal expect val clockId: Int -internal expect fun setClock(attr: CPointer): Int \ No newline at end of file +internal expect val posixGetTimeClockId: Int +internal expect fun pthreadCondAttrSetClock(attr: CPointer): Int \ No newline at end of file From 32217bda56f5bd23a7a8d7a3e31f3effe620f861 Mon Sep 17 00:00:00 2001 From: "Bob.Brockbernd" Date: Wed, 30 Apr 2025 15:32:40 +0200 Subject: [PATCH 4/5] Timed parking test improvements: parameterize tests, more slack in time asserts, cover `parkUntil` --- .../atomicfu/locks/TimedParkingTest.kt | 173 +++--------------- 1 file changed, 29 insertions(+), 144 deletions(-) diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt index 6baefa5d..b5abe55c 100644 --- a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt @@ -6,176 +6,61 @@ import kotlin.test.assertTrue import kotlin.time.measureTime import kotlin.IllegalStateException import kotlin.time.Duration.Companion.milliseconds -import kotlin.time.Duration.Companion.nanoseconds +import kotlin.time.TimeSource class TimedParkingTest { @Test - fun testNanosFirstUnpark400() = retry(3) { - var handle1: ParkingHandle? = null - - val thread1 = Fut { - handle1 = ParkingSupport.currentThreadHandle() - val t = measureTime { - ParkingSupport.park(600.milliseconds) - } - assertTrue(t.inWholeMilliseconds > 300) - assertTrue(t.inWholeMilliseconds < 500) - } - - sleepMillis(400) - ParkingSupport.unpark(handle1!!) - - thread1.waitThrowing() - } - + fun testFirstUnpark400() = testFirstUnpark(400) @Test - fun testNanosFirstUnpark700() = retry(3) { - var handle1: ParkingHandle? = null - - val thread1 = Fut { - handle1 = ParkingSupport.currentThreadHandle() - val t = measureTime { - ParkingSupport.park(900.milliseconds) - } - assertTrue(t.inWholeMilliseconds > 600) - assertTrue(t.inWholeMilliseconds < 800) - } - - sleepMillis(700) - ParkingSupport.unpark(handle1!!) - - thread1.waitThrowing() - } - + fun testFirstUnpark700() = testFirstUnpark(700) @Test - fun testNanosFirstUnpark1000() = retry(3) { - var handle1: ParkingHandle? = null - - val thread1 = Fut { - handle1 = ParkingSupport.currentThreadHandle() - val t = measureTime { - ParkingSupport.park(1200.milliseconds) - } - assertTrue(t.inWholeMilliseconds > 900) - assertTrue(t.inWholeMilliseconds < 1100) - } - - sleepMillis(1000) - ParkingSupport.unpark(handle1!!) - - thread1.waitThrowing() - } - + fun testFirstUnpark1000() = testFirstUnpark(1000) @Test - fun testNanosFirstUnparkLongMax() = retry(3) { - var handle1: ParkingHandle? = null - - val thread1 = Fut { - handle1 = ParkingSupport.currentThreadHandle() - val t = measureTime { - ParkingSupport.park(Long.MAX_VALUE.nanoseconds) - } - assertTrue(t.inWholeMilliseconds > 900) - assertTrue(t.inWholeMilliseconds < 1100) - } - - sleepMillis(1000) - ParkingSupport.unpark(handle1!!) - - thread1.waitThrowing() - } - + fun testFirstUnparkLongMax() = testFirstUnpark(1000, Long.MAX_VALUE) @Test - fun testNanosFirstUnparkIntMax() = retry(3) { - var handle1: ParkingHandle? = null - - val thread1 = Fut { - handle1 = ParkingSupport.currentThreadHandle() - val t = measureTime { - ParkingSupport.park(Int.MAX_VALUE.toLong().nanoseconds) - } - assertTrue(t.inWholeMilliseconds > 900) - assertTrue(t.inWholeMilliseconds < 1100) - } + fun testFirstUnpark3rdLong() = testFirstUnpark(1000, Long.MAX_VALUE / 3) - sleepMillis(1000) - ParkingSupport.unpark(handle1!!) - - thread1.waitThrowing() - } - @Test - fun testNanosFirstUnpark3rdLong() = retry(3) { - var handle1: ParkingHandle? = null - - val thread1 = Fut { - handle1 = ParkingSupport.currentThreadHandle() - val t = measureTime { - ParkingSupport.park((Long.MAX_VALUE / 3).nanoseconds) - } - assertTrue(t.inWholeMilliseconds > 900) - assertTrue(t.inWholeMilliseconds < 1100) - } - - sleepMillis(1000) - ParkingSupport.unpark(handle1!!) - - thread1.waitThrowing() - } - + fun testFirstTimeout400() = testFirstTimeout(400) @Test - fun testNanosFirstDeadline400() = retry(3) { - var handle1: ParkingHandle? = null + fun testFirstTimeout700() = testFirstTimeout(700) + @Test + fun testFirstTimeout1200() = testFirstTimeout(1200) + @Test + fun testFirstDeadline400() = testFirstDeadline(400) + @Test + fun testFirstDeadline700() = testFirstDeadline(700) + @Test + fun testFirstDeadline1200() = testFirstDeadline(1200) + + private fun testFirstTimeout(timeOutMillis: Long) = retry(3) { val thread1 = Fut { - handle1 = ParkingSupport.currentThreadHandle() - val t = measureTime { - ParkingSupport.park(400.milliseconds) - } - assertTrue(t.inWholeMilliseconds > 300) - assertTrue(t.inWholeMilliseconds < 500) + val t = measureTime { ParkingSupport.park(timeOutMillis.milliseconds) } + assertTrue(t.inWholeMilliseconds > timeOutMillis - 50) } - - sleepMillis(600) - ParkingSupport.unpark(handle1!!) - thread1.waitThrowing() } - - @Test - fun testNanosFirstDeadline700() = retry(3) { - var handle1: ParkingHandle? = null - + + private fun testFirstDeadline(timeOutMillis: Long) = retry(3) { val thread1 = Fut { - handle1 = ParkingSupport.currentThreadHandle() - val t = measureTime { - ParkingSupport.park(700.milliseconds) - } - assertTrue(t.inWholeMilliseconds > 600) - assertTrue(t.inWholeMilliseconds < 800) + val mark = TimeSource.Monotonic.markNow() + timeOutMillis.milliseconds + val t = measureTime { ParkingSupport.parkUntil(mark) } + assertTrue(t.inWholeMilliseconds > timeOutMillis - 50) } - - sleepMillis(900) - ParkingSupport.unpark(handle1!!) - thread1.waitThrowing() } - @Test - fun testNanosFirstDeadline1200() = retry(3) { + private fun testFirstUnpark(unparkAfterMillis: Long, parkForMillis: Long = unparkAfterMillis + 500) = retry(3) { var handle1: ParkingHandle? = null - val thread1 = Fut { handle1 = ParkingSupport.currentThreadHandle() - val t = measureTime { - ParkingSupport.park(1000.milliseconds) - } - assertTrue(t.inWholeMilliseconds > 900) - assertTrue(t.inWholeMilliseconds < 1100) + val t = measureTime { ParkingSupport.park((parkForMillis).milliseconds) } + assertTrue(t.inWholeMilliseconds < parkForMillis ) } - sleepMillis(1200) + sleepMillis(unparkAfterMillis) ParkingSupport.unpark(handle1!!) thread1.waitThrowing() From 0260de294d4a08ce9e415c9de300fbfe530dc503 Mon Sep 17 00:00:00 2001 From: "Bob.Brockbernd" Date: Wed, 30 Apr 2025 15:42:03 +0200 Subject: [PATCH 5/5] Formatting --- .../atomicfu/locks/PosixParkingDelegator.kt | 14 ++++---- .../atomicfu/locks/ParkingDelegator.kt | 12 +++---- .../kotlinx/atomicfu/locks/ParkingSupport.kt | 35 ++++++++++--------- .../kotlinx/atomicfu/locks/ThreadParker.kt | 19 +++++----- .../kotlinx/atomicfu/locks/BarrierTest.kt | 9 +++-- .../atomicfu/locks/CyclicBarrierTest.kt | 17 +++++---- .../kotlinx/atomicfu/locks/ExchangerTest.kt | 6 ++-- .../kotlinx/atomicfu/locks/LatchTest.kt | 17 +++++---- .../kotlinx/atomicfu/locks/TestThread.kt | 2 +- .../atomicfu/locks/ThreadParkerTest.kt | 18 +++++----- .../atomicfu/locks/ThreadParkingStressTest.kt | 1 + .../atomicfu/locks/TimeArithmeticTests.kt | 17 +++++---- .../atomicfu/locks/TimedParkingTest.kt | 23 +++++++----- .../atomicfu/locks/JvmParkingDelegator.kt | 5 +-- .../kotlinx/atomicfu/locks/ParkingSupport.kt | 2 ++ .../kotlinx/atomicfu/locks/TestThread.jvm.kt | 3 +- .../atomicfu/locks/PosixParkingDelegator.kt | 4 +-- .../kotlinx/atomicfu/locks/ParkingSupport.kt | 1 + .../atomicfu/locks/PosixParkingDelegator.kt | 6 +--- 19 files changed, 116 insertions(+), 95 deletions(-) diff --git a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt index 4039120c..c74497db 100644 --- a/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt +++ b/atomicfu/src/androidNativeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -1,10 +1,6 @@ package kotlinx.atomicfu.locks import kotlinx.cinterop.* -import kotlinx.cinterop.alloc -import kotlinx.cinterop.free -import kotlinx.cinterop.pointed -import kotlinx.cinterop.ptr import platform.posix.* @OptIn(ExperimentalForeignApi::class, UnsafeNumber::class) @@ -23,7 +19,7 @@ internal actual object ParkingDelegator { return ParkingData(mut, cond) } - actual inline fun wait(ref: ParkingData, shouldWait: () -> Boolean){ + actual inline fun wait(ref: ParkingData, shouldWait: () -> Boolean) { callAndVerify { pthread_mutex_lock(ref.mut) } try { if (shouldWait()) callAndVerify { pthread_cond_wait(ref.cond, ref.mut) } @@ -31,7 +27,7 @@ internal actual object ParkingDelegator { callAndVerify { pthread_mutex_unlock(ref.mut) } } } - + actual inline fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped { val ts = alloc().ptr @@ -68,4 +64,8 @@ internal actual object ParkingDelegator { nativeHeap.free(ref.cond) } } -internal actual class ParkingData @OptIn(UnsafeNumber::class) constructor(val mut: CPointer, val cond: CPointer) + +internal actual class ParkingData @OptIn(UnsafeNumber::class) constructor( + val mut: CPointer, + val cond: CPointer +) diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt index d1c23deb..9a53ec70 100644 --- a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingDelegator.kt @@ -1,7 +1,7 @@ package kotlinx.atomicfu.locks /** - * Object that stores references that need to be manually destroyed and deallocated, + * Object that stores references that need to be manually destroyed and deallocated, * after native pthread_cond_wait usage. */ internal expect class ParkingData @@ -12,7 +12,7 @@ internal expect class ParkingData */ internal expect object ParkingDelegator { fun createRef(): ParkingData - fun wait(ref: ParkingData, shouldWait: () -> Boolean) + fun wait(ref: ParkingData, shouldWait: () -> Boolean) fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean) fun wake(ref: ParkingData) fun destroyRef(ref: ParkingData) @@ -22,18 +22,18 @@ internal expect object ParkingDelegator { * Adds nano seconds to current time in seconds. * Clamps for Int. */ -internal fun Int.addNanosToSeconds(nanos: Long): Int = +internal fun Int.addNanosToSeconds(nanos: Long): Int = (this + nanos / 1_000_000_000).coerceIn(Int.MIN_VALUE.toLong(), Int.MAX_VALUE.toLong()).toInt() /** * Adds nano seconds to current time in seconds. */ internal fun Long.addNanosToSeconds(nanos: Long): Long { - + // Should never happen as this is checked in `ThreadParker` check(nanos >= 0) { "Cannot wait for a negative number of nanoseconds" } - val result = this + nanos / 1_000_000_000 - + val result = this + nanos / 1_000_000_000 + // Overflow check: should never happen since this is very far into the future. check(!(this xor result < 0 && this >= 0)) { "Nano seconds addition overflowed, current time in seconds is $this" } return result diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt index 5b49ac0d..afa4e879 100644 --- a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt @@ -1,10 +1,13 @@ package kotlinx.atomicfu.locks +import kotlinx.atomicfu.locks.ParkingSupport.currentThreadHandle +import kotlinx.atomicfu.locks.ParkingSupport.park +import kotlinx.atomicfu.locks.ParkingSupport.unpark import kotlin.time.Duration import kotlin.time.TimeMark /** - * Parking and unparking support for threads on Kotlin/Native and Kotlin/JVM. + * Parking and unparking support for threads on Kotlin/Native and Kotlin/JVM. * Can be used as a building block to create locks and other synchronization primitives. * * A call to [ParkingSupport.park] or [ParkingSupport.parkUntil] will pause the current thread. @@ -14,9 +17,9 @@ import kotlin.time.TimeMark * - A spurious wakeup * - (Only on JVM) The thread was interrupted. The interrupted flag stays set after wakeup. * A future call to [park] this thread will return immediately, unless the `Thread.interrupted` flag is cleared. - * + * * The caller is responsible for verifying the reason of wakeup and how to respond accordingly. - * + * * Example usage parking thread: * ```Kotlin * // publish my parking handle @@ -26,22 +29,22 @@ import kotlin.time.TimeMark * ParkingSupport.park(Duration.INFINITE) * } * ``` - * + * * Example usage unparker thread: * ```Kotlin * state.value = WAKE * ParkingSupport.unpark(handleReference.value) * ``` - * + * * PLEASE NOTE: this is a low-level API and should be used with caution. * Unless the goal is to create a _synchronization primitive_ like a mutex or semaphore, * it is advised to a higher level concurrency API like `kotlinx.coroutines` */ expect object ParkingSupport { - + /** * Parks the current thread for [timeout] duration. - * + * * Wakes up in the following cases: * - A different thread calls [ParkingSupport.unpark]. * - The [timeout] is exceeded. @@ -50,7 +53,7 @@ expect object ParkingSupport { * A future call to [park] this thread will return immediately, unless the `Thread.interrupted` flag is cleared. */ fun park(timeout: Duration) - + /** * Parks the current thread until [deadline] is reached. * @@ -67,21 +70,21 @@ expect object ParkingSupport { * Unparks the thread corresponding to [handle]. * If [unpark] is called while the corresponding thread is not parked, the next [park] call will return immediately * — the [ParkingHandle] is unparked ahead of time. - * - * A [ParkingHandle] can only _remember_ one pre-unpark attempt at a time. - * Meaning, when two consecutive [unpark] calls are made while the corresponding thread is not parked, + * + * A [ParkingHandle] can only _remember_ one pre-unpark attempt at a time. + * Meaning, when two consecutive [unpark] calls are made while the corresponding thread is not parked, * only the next park call will return immediately — [unpark] calls are not accumulated. */ fun unpark(handle: ParkingHandle) /** - * Returns the [ParkingHandle] corresponding to the current thread. + * Returns the [ParkingHandle] corresponding to the current thread. * This [ParkingHandle] should be shared with other threads which allow them to [unpark] the current thread. - * + * * A [ParkingHandle] is uniquely associated with a specific thread, maintaining a one-to-one correspondence. - * When the _same_ thread makes multiple calls to [currentThreadHandle], + * When the _same_ thread makes multiple calls to [currentThreadHandle], * it always returns the _same_ [ParkingHandle]. - * + * * Note: as this function returns a unique [ParkingHandle] for each thread it should not be cached or memoized. */ fun currentThreadHandle(): ParkingHandle @@ -91,7 +94,7 @@ expect object ParkingSupport { * A handle allowing to unpark a thread of execution using [ParkingSupport.unpark]. * There is a one-to-one mapping between threads and parking handles. * A handle can be obtained by calling [ParkingSupport.currentThreadHandle]. - * Refer to [ParkingSupport] documentation for more details + * Refer to [ParkingSupport] documentation for more details * on how to use [ParkingHandle] and how parking works in general. */ expect class ParkingHandle \ No newline at end of file diff --git a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt index 8b46900a..b3ed5c9b 100644 --- a/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt +++ b/atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/ThreadParker.kt @@ -6,7 +6,7 @@ import kotlin.time.TimeSource /** * Thread parker for Kotlin/Native based on POSIX calls. - * Resides in a shared sourceSet with JVM, to be testable with Lincheck. + * Resides in a shared sourceSet with JVM, to be testable with Lincheck. * (Which is part of PR #508) */ internal class ThreadParker { @@ -16,7 +16,7 @@ internal class ThreadParker { fun park() = parkWith { data -> delegator.wait(data) { state.value is Parked } } - + fun parkNanos(nanos: Long) { val mark = TimeSource.Monotonic.markNow() parkWith { data -> @@ -35,7 +35,7 @@ internal class ThreadParker { delegator.destroyRef(pd) continue } - + invokeWait(pd) while (true) { @@ -45,7 +45,7 @@ internal class ThreadParker { delegator.destroyRef(pd) return } - + // If other thread is unparking return. Let unparking thread deal with cleanup. is Unparking -> if (state.compareAndSet(changedState, Free)) return @@ -64,7 +64,7 @@ internal class ThreadParker { } // Parker was pre unparked. Set to free and continue. Unparked -> if (state.compareAndSet(Unparked, Free)) return - + // The states below should only be reachable if parking thread has not yet returned. is Parked -> throw IllegalStateException("Thread should not be able to call park when it is already parked") is Unparking -> throw IllegalStateException("Thread should not be able to call park when it is already parked") @@ -75,14 +75,14 @@ internal class ThreadParker { fun unpark() { val myUnparkingState = Unparking() while (true) { - when (val currentState = state.value) { - + when (val currentState = state.value) { + // Is already unparked Unparked -> return is Unparking -> return - + Free -> if (state.compareAndSet(Free, Unparked)) return - + // Is parked -> try unpark is Parked -> if (state.compareAndSet(currentState, myUnparkingState)) { delegator.wake(currentState.data) @@ -97,6 +97,7 @@ internal class ThreadParker { } private sealed interface ParkingState + // The Parker is pre-unparked. The next park call will change state to Free and return immediately. private object Unparked : ParkingState diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/BarrierTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/BarrierTest.kt index d1a8c206..d6e0f05f 100644 --- a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/BarrierTest.kt +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/BarrierTest.kt @@ -16,11 +16,15 @@ class BarrierTest { private class Arrs(numberOfThreads: Int) { val after = AtomicIntArray(numberOfThreads) val before = AtomicIntArray(numberOfThreads) - init {repeat(numberOfThreads) { + + init { + repeat(numberOfThreads) { after[it].value = 0 before[it].value = 0 - }} + } + } } + @Test fun testBarrier() { repeat(TEST_ITERATIONS) { iteration -> @@ -60,6 +64,7 @@ private class Barrier(private val parties: Int) { init { require(parties > 1) } + private val count = atomic(0) private val waiters = atomicArrayOfNulls(parties - 1) diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/CyclicBarrierTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/CyclicBarrierTest.kt index 6cd56eb0..4cb04f20 100644 --- a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/CyclicBarrierTest.kt +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/CyclicBarrierTest.kt @@ -16,12 +16,15 @@ class CyclicBarrierTest { private class Arrs(numberOfThreads: Int) { val after = AtomicIntArray(numberOfThreads) val before = AtomicIntArray(numberOfThreads) - init {repeat(numberOfThreads) { - after[it].value = 0 - before[it].value = 0 - }} + + init { + repeat(numberOfThreads) { + after[it].value = 0 + before[it].value = 0 + } + } } - + @Test fun stressCyclicBarrier() { BARRIER_SIZES.forEach { barrierSize -> @@ -94,8 +97,7 @@ private class MSQueueCyclicBarrier { if (curTail.next.compareAndSet(null, node)) { tail.compareAndSet(curTail, node) return node.id - } - else tail.compareAndSet(curTail, curTail.next.value!!) + } else tail.compareAndSet(curTail, curTail.next.value!!) } } @@ -111,6 +113,7 @@ private class MSQueueCyclicBarrier { } } } + private class Node(var element: E?, val id: Long) { val next = atomic?>(null) } diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ExchangerTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ExchangerTest.kt index a30c2b50..5d83eb1c 100644 --- a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ExchangerTest.kt +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ExchangerTest.kt @@ -8,16 +8,16 @@ import kotlin.time.Duration private const val N_ITEMS_TO_SWAP = 100_000 class ExchangerTest { - + @Test fun exchangeTwoLists() { val aBefore = List(N_ITEMS_TO_SWAP) { 0 } val bBefore = List(N_ITEMS_TO_SWAP) { 1 } val aAfter = mutableListOf() val bAfter = mutableListOf() - + val exchanger = Exchanger() - + val at = testThread { aBefore.forEachIndexed { i, v -> val item = exchanger.exchange(v) diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LatchTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LatchTest.kt index 9eaa7624..1da920cf 100644 --- a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LatchTest.kt +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/LatchTest.kt @@ -15,12 +15,15 @@ class LatchTest { private class Arrs(numberOfThreads: Int) { val after = AtomicIntArray(numberOfThreads) val before = AtomicIntArray(numberOfThreads) - init {repeat(numberOfThreads) { - after[it].value = 0 - before[it].value = 0 - }} + + init { + repeat(numberOfThreads) { + after[it].value = 0 + before[it].value = 0 + } + } } - + @Test fun latchTest() { repeat(TEST_ITERATIONS) { iteration -> @@ -90,8 +93,7 @@ private class MSQueueLatch { if (curTail.next.compareAndSet(null, node)) { tail.compareAndSet(curTail, node) return - } - else tail.compareAndSet(curTail, curTail.next.value!!) + } else tail.compareAndSet(curTail, curTail.next.value!!) } } @@ -106,6 +108,7 @@ private class MSQueueLatch { } } } + private class Node(var element: E?) { val next = atomic?>(null) } diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TestThread.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TestThread.kt index a66ffa5e..3eeb4e23 100644 --- a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TestThread.kt +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TestThread.kt @@ -1,6 +1,6 @@ package kotlinx.atomicfu.locks -internal fun testThread(doConcurrent: () -> Unit): TestThread = TestThread(doConcurrent) +internal fun testThread(doConcurrent: () -> Unit): TestThread = TestThread(doConcurrent) internal expect class TestThread(toDo: () -> Unit) { fun join() diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt index 39424d94..2b5b8488 100644 --- a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkerTest.kt @@ -11,16 +11,16 @@ class ThreadParkerTest { @Test fun parkUnpark() { var parkingHandle: ParkingHandle? = null - - val f = Fut { + + val f = Fut { parkingHandle = ParkingSupport.currentThreadHandle() ParkingSupport.park(Duration.INFINITE) } - + // Allow thread to be parked before unpark call sleepMillis(100) ParkingSupport.unpark(parkingHandle!!) - + f.waitThrowing() } @@ -29,11 +29,11 @@ class ThreadParkerTest { val f = Fut { atomicHandle.value = ParkingSupport.currentThreadHandle() - - while (!isPreUnparked.value) { - sleepMillis(10) + + while (!isPreUnparked.value) { + sleepMillis(10) } - + ParkingSupport.park(Duration.INFINITE) } @@ -43,7 +43,7 @@ class ThreadParkerTest { ParkingSupport.unpark(atomicHandle.value!!) isPreUnparked.value = true - + f.waitThrowing() } } \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkingStressTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkingStressTest.kt index c3075e4c..c32f3ede 100644 --- a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkingStressTest.kt +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/ThreadParkingStressTest.kt @@ -15,6 +15,7 @@ class ThreadParkingStressTest { val handle = atomic(null) val done = atomic(false) } + @Test fun parkingStress() { val duration = measureTime { diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimeArithmeticTests.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimeArithmeticTests.kt index f219df30..dc52a16c 100644 --- a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimeArithmeticTests.kt +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimeArithmeticTests.kt @@ -6,9 +6,8 @@ import kotlin.test.Test import kotlin.test.assertTrue class TimeArithmeticTests { - - - + + @Test fun timeArithmeticTest() { val currentTimes = listOf( @@ -20,10 +19,10 @@ class TimeArithmeticTests { 31_560_000 * 500, 31_560_000 * 1000, 31_560_000 * 10000, - ) - - currentTimes.forEach { currentTimeInSeconds -> - + ) + + currentTimes.forEach { currentTimeInSeconds -> + // Test Long repeat(1000) { val nanos = Random.nextLong().absoluteValue @@ -42,6 +41,6 @@ class TimeArithmeticTests { } } } - - + + } \ No newline at end of file diff --git a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt index b5abe55c..ae76137e 100644 --- a/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt +++ b/atomicfu/src/concurrentTest/kotlin/kotlinx/atomicfu/locks/TimedParkingTest.kt @@ -3,38 +3,45 @@ package kotlinx.atomicfu.locks import kotlinx.atomicfu.atomic import kotlin.test.Test import kotlin.test.assertTrue -import kotlin.time.measureTime -import kotlin.IllegalStateException import kotlin.time.Duration.Companion.milliseconds import kotlin.time.TimeSource +import kotlin.time.measureTime class TimedParkingTest { @Test fun testFirstUnpark400() = testFirstUnpark(400) + @Test fun testFirstUnpark700() = testFirstUnpark(700) + @Test fun testFirstUnpark1000() = testFirstUnpark(1000) + @Test fun testFirstUnparkLongMax() = testFirstUnpark(1000, Long.MAX_VALUE) + @Test fun testFirstUnpark3rdLong() = testFirstUnpark(1000, Long.MAX_VALUE / 3) @Test fun testFirstTimeout400() = testFirstTimeout(400) + @Test fun testFirstTimeout700() = testFirstTimeout(700) + @Test fun testFirstTimeout1200() = testFirstTimeout(1200) @Test fun testFirstDeadline400() = testFirstDeadline(400) + @Test fun testFirstDeadline700() = testFirstDeadline(700) + @Test fun testFirstDeadline1200() = testFirstDeadline(1200) - + private fun testFirstTimeout(timeOutMillis: Long) = retry(3) { val thread1 = Fut { val t = measureTime { ParkingSupport.park(timeOutMillis.milliseconds) } @@ -42,7 +49,7 @@ class TimedParkingTest { } thread1.waitThrowing() } - + private fun testFirstDeadline(timeOutMillis: Long) = retry(3) { val thread1 = Fut { val mark = TimeSource.Monotonic.markNow() + timeOutMillis.milliseconds @@ -57,7 +64,7 @@ class TimedParkingTest { val thread1 = Fut { handle1 = ParkingSupport.currentThreadHandle() val t = measureTime { ParkingSupport.park((parkForMillis).milliseconds) } - assertTrue(t.inWholeMilliseconds < parkForMillis ) + assertTrue(t.inWholeMilliseconds < parkForMillis) } sleepMillis(unparkAfterMillis) @@ -66,7 +73,7 @@ class TimedParkingTest { thread1.waitThrowing() } - private fun retry(times: Int, block: () -> Unit): Unit { + private fun retry(times: Int, block: () -> Unit) { var lastThrowable: Throwable? = null repeat(times) { try { @@ -75,8 +82,8 @@ class TimedParkingTest { lastThrowable = t } } - lastThrowable?.let { - throw IllegalStateException("Failed after $times retries").apply { addSuppressed(it) } + lastThrowable?.let { + throw IllegalStateException("Failed after $times retries").apply { addSuppressed(it) } } } } diff --git a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt index 5ef5dbac..a629cb14 100644 --- a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt +++ b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/JvmParkingDelegator.kt @@ -1,14 +1,15 @@ package kotlinx.atomicfu.locks + import java.util.concurrent.locks.LockSupport internal actual object ParkingDelegator { - + actual fun createRef(): ParkingData = Thread.currentThread() actual fun wait(ref: ParkingData, shouldWait: () -> Boolean) = LockSupport.park() actual fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean) = LockSupport.parkNanos(nanos) actual fun wake(ref: ParkingData) = LockSupport.unpark(ref) actual fun destroyRef(ref: ParkingData) {} - + } internal actual typealias ParkingData = Thread \ No newline at end of file diff --git a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt index fe50be64..135c3274 100644 --- a/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt +++ b/atomicfu/src/jvmMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt @@ -6,11 +6,13 @@ import kotlin.time.DurationUnit import kotlin.time.TimeMark actual typealias ParkingHandle = Thread + actual object ParkingSupport { actual fun park(timeout: Duration) { if (timeout == Duration.INFINITE) LockSupport.park() else LockSupport.parkNanos(timeout.toLong(DurationUnit.NANOSECONDS)) } + actual fun parkUntil(deadline: TimeMark) = park(deadline.elapsedNow() * -1) actual fun unpark(handle: ParkingHandle) = LockSupport.unpark(handle) actual fun currentThreadHandle(): ParkingHandle = Thread.currentThread() diff --git a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/TestThread.jvm.kt b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/TestThread.jvm.kt index e1b3d2b9..3c1e8c99 100644 --- a/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/TestThread.jvm.kt +++ b/atomicfu/src/jvmTest/kotlin/kotlinx/atomicfu/locks/TestThread.jvm.kt @@ -1,8 +1,9 @@ package kotlinx.atomicfu.locks + import kotlin.concurrent.thread internal actual class TestThread actual constructor(toDo: () -> Unit) { - private val th = thread { toDo() } + private val th = thread { toDo() } actual fun join() = th.join() } diff --git a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt index 4a302a25..cbb87f50 100644 --- a/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt +++ b/atomicfu/src/mingwMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -1,9 +1,6 @@ package kotlinx.atomicfu.locks import kotlinx.cinterop.* -import kotlinx.cinterop.alloc -import kotlinx.cinterop.pointed -import kotlinx.cinterop.ptr import platform.posix.* @OptIn(ExperimentalForeignApi::class) @@ -65,4 +62,5 @@ internal actual object ParkingDelegator { nativeHeap.free(ref.cond) } } + internal actual class ParkingData(val mut: CPointer, val cond: CPointer) diff --git a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt index d966d553..7a4c3776 100644 --- a/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt +++ b/atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/ParkingSupport.kt @@ -16,6 +16,7 @@ actual object ParkingSupport { if (timeout == Duration.INFINITE) threadLocalParkingHandle.parker.park() else threadLocalParkingHandle.parker.parkNanos(timeout.toLong(DurationUnit.NANOSECONDS)) } + actual fun parkUntil(deadline: TimeMark) = park(deadline.elapsedNow() * -1) actual fun unpark(handle: ParkingHandle) = handle.parker.unpark() actual fun currentThreadHandle(): ParkingHandle = threadLocalParkingHandle diff --git a/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt b/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt index 54b7d28c..0c8ddef0 100644 --- a/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt +++ b/atomicfu/src/nativeUnixLikeMain/kotlin/kotlinx/atomicfu/locks/PosixParkingDelegator.kt @@ -1,10 +1,6 @@ package kotlinx.atomicfu.locks import kotlinx.cinterop.* -import kotlinx.cinterop.alloc -import kotlinx.cinterop.free -import kotlinx.cinterop.pointed -import kotlinx.cinterop.ptr import platform.posix.* @OptIn(ExperimentalForeignApi::class, UnsafeNumber::class) @@ -31,7 +27,7 @@ internal actual object ParkingDelegator { callAndVerify { pthread_mutex_unlock(ref.mut) } } } - + actual inline fun timedWait(ref: ParkingData, nanos: Long, shouldWait: () -> Boolean): Unit = memScoped { val ts = alloc().ptr