Last active
March 16, 2025 12:01
-
-
Save thomsmed/fa31e8aab769576dc64621e55b8009d0 to your computer and use it in GitHub Desktop.
A utility type for letting multiple caller Tasks wait for a future value, whom exact arrival is unknown and will be manually triggered some time in the future.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// PendingValue.swift | |
// | |
import Foundation | |
/// A utility type for letting multiple caller Tasks wait for a future value, whom exact arrival is unknown and will be manually triggered some time in the future. | |
public final class PendingValue<T: Sendable>: @unchecked Sendable { | |
private let lock = NSLock() | |
private var result: Result<T, any Error>? | |
private var waitingContinuations: [UUID: CheckedContinuation<T, any Error>] = [:] | |
public init() {} | |
deinit { | |
assert(waitingContinuations.isEmpty, "Expecting all pending continuations to be fulfilled") | |
} | |
/// Wait for the future value. Task cancelation is handled as expected. | |
public func value() async throws -> T { | |
let id = UUID() | |
return try await withTaskCancellationHandler { | |
try await withCheckedThrowingContinuation { continuation in | |
do { | |
let readyResult: Result<T, any Error>? = try lock.withLock { | |
try Task.checkCancellation() | |
if let readyResult = self.result { | |
return readyResult | |
} else { | |
self.waitingContinuations[id] = continuation | |
return nil | |
} | |
} | |
if let readyResult { | |
// Must not resume the continuation while holding the lock! | |
continuation.resume(with: readyResult) | |
} | |
} catch { | |
continuation.resume(throwing: error) | |
} | |
} | |
} onCancel: { | |
let continuation = lock.withLock { | |
self.waitingContinuations.removeValue(forKey: id) | |
} | |
// Must not resume the continuation while holding the lock! | |
continuation?.resume(throwing: CancellationError()) | |
} | |
} | |
/// Resolve this ``PendingValue`` by returning `result`. Waiting Tasks will be notified immediately. | |
public func resolve(with result: Result<T, any Error>) { | |
let continuations = lock.withLock { | |
if self.result != nil { | |
// Should only be allowed to resolve once. | |
return [CheckedContinuation<T, any Error>]() | |
} | |
self.result = result | |
let continuations = self.waitingContinuations.map { $1 } | |
self.waitingContinuations.removeAll() | |
return continuations | |
} | |
// Must not resume the continuations while holding the lock! | |
for continuation in continuations { | |
continuation.resume(with: result) | |
} | |
} | |
} | |
public extension PendingValue { | |
/// Resolve this ``PendingValue`` by returning `value`. Waiting Tasks will be notified immediately. | |
func resolve(returning value: T) { | |
resolve(with: .success(value)) | |
} | |
/// Resolve this ``PendingValue`` by throwing `error`. Waiting Tasks will be notified immediately. | |
func resolve(throwing error: any Error) { | |
resolve(with: .failure(error)) | |
} | |
/// Resolve this ``PendingValue`` by throwing ``CancellationError``. Waiting Tasks will be notified immediately. | |
func cancel() { | |
resolve(throwing: CancellationError()) | |
} | |
} | |
public extension PendingValue where T == Void { | |
/// Resolve this ``PendingValue``. Waiting Tasks will be notified immediately. | |
func resolve() { | |
resolve(returning: Void()) | |
} | |
} | |
// MARK: Usage/Tests | |
import Testing | |
struct PendingValueTests { | |
@Test func test_instantCancelation() async throws { | |
let pendingValue = PendingValue<Void>() | |
pendingValue.cancel() | |
await #expect(throws: CancellationError.self) { | |
_ = try await pendingValue.value() | |
} | |
} | |
@Test func test_cancelation() async throws { | |
let pendingValue = PendingValue<Void>() | |
try await Task.sleep(for: .seconds(1)) | |
pendingValue.cancel() | |
await #expect(throws: CancellationError.self) { | |
_ = try await pendingValue.value() | |
} | |
} | |
@Test func test_immediateCancelationAfterImmediateResolve() async throws { | |
let pendingValue = PendingValue<Void>() | |
pendingValue.resolve() // In a series of immediate call to resolve/cancel, the first one wins | |
pendingValue.cancel() | |
pendingValue.resolve() | |
_ = try await pendingValue.value() | |
} | |
@Test func test_cancelationAfterResolve() async throws { | |
let pendingValue = PendingValue<Void>() | |
try await Task.sleep(for: .seconds(1)) | |
pendingValue.resolve() | |
pendingValue.cancel() // Canceling should have no effect at this point | |
_ = try await pendingValue.value() | |
} | |
@Test func test_multipleCancelationAfterResolve() async throws { | |
let pendingValue = PendingValue<Void>() | |
try await Task.sleep(for: .seconds(1)) | |
pendingValue.resolve() | |
pendingValue.cancel() // Canceling should have no effect at this point | |
pendingValue.cancel() | |
pendingValue.cancel() | |
_ = try await pendingValue.value() | |
} | |
@Test func test_multipleCancelationAfterMultipleResolve() async throws { | |
let pendingValue = PendingValue<Void>() | |
try await Task.sleep(for: .seconds(1)) | |
pendingValue.resolve() // Only the first resolve should have an effect at this point | |
pendingValue.resolve() | |
pendingValue.resolve() | |
pendingValue.cancel() // Canceling should have no effect at this point | |
pendingValue.cancel() | |
pendingValue.cancel() | |
_ = try await pendingValue.value() | |
} | |
@Test func test_immediateResolveAfterImmediateCancelation() async throws { | |
let pendingValue = PendingValue<Void>() | |
pendingValue.cancel() | |
pendingValue.resolve() // Resolve should have no effect at this point | |
await #expect(throws: CancellationError.self) { | |
_ = try await pendingValue.value() | |
} | |
} | |
@Test func test_multipleImmediateResolveAfterImmediateCancelation() async throws { | |
let pendingValue = PendingValue<Void>() | |
pendingValue.cancel() | |
pendingValue.resolve() // Resolve should have no effect at this point | |
pendingValue.resolve() | |
pendingValue.resolve() | |
await #expect(throws: CancellationError.self) { | |
_ = try await pendingValue.value() | |
} | |
} | |
@Test func test_immediateResolve() async throws { | |
let pendingValue = PendingValue<Void>() | |
pendingValue.resolve() | |
_ = try await pendingValue.value() | |
} | |
@Test func test_multipleImmediateResolve() async throws { | |
let pendingValue = PendingValue<String>() | |
pendingValue.resolve(returning: "First call to resolve") // In a series of immediate call to resolve, the first one wins | |
pendingValue.resolve(returning: "Second call to resolve") | |
pendingValue.resolve(returning: "Third call to resolve") | |
let value = try await pendingValue.value() | |
#expect(value == "First call to resolve") | |
} | |
@Test func test_immediateResolveReturningString() async throws { | |
let pendingValue = PendingValue<String>() | |
pendingValue.resolve(returning: "Hello World!") | |
let value = try await pendingValue.value() | |
#expect(value == "Hello World!") | |
} | |
@Test func test_resolveReturningString() async throws { | |
let pendingValue = PendingValue<String>() | |
try await Task.sleep(for: .seconds(1)) | |
pendingValue.resolve(returning: "Hello World!") | |
let value = try await pendingValue.value() | |
#expect(value == "Hello World!") | |
} | |
@Test func test_immediateResolveThrowing() async throws { | |
struct MyError: Error {} | |
let pendingValue = PendingValue<String>() | |
pendingValue.resolve(throwing: MyError()) | |
await #expect(throws: MyError.self) { | |
_ = try await pendingValue.value() | |
} | |
} | |
@Test func test_resolveThrowing() async throws { | |
struct MyError: Error {} | |
let pendingValue = PendingValue<String>() | |
try await Task.sleep(for: .seconds(1)) | |
pendingValue.resolve(throwing: MyError()) | |
await #expect(throws: MyError.self) { | |
_ = try await pendingValue.value() | |
} | |
} | |
@Test func test_immediateResolveThrowingAndResolveReturningString() async throws { | |
struct MyError: Error {} | |
struct MyOtherError: Error {} | |
let pendingValue = PendingValue<String>() | |
pendingValue.resolve(throwing: MyError()) // In a series of immediate call to resolve, the first one wins | |
pendingValue.resolve(returning: "Hello World!") | |
pendingValue.resolve(throwing: MyOtherError()) | |
await #expect(throws: MyError.self) { | |
_ = try await pendingValue.value() | |
} | |
} | |
@Test func test_parallelResolveThrowing() async throws { | |
struct MyError: Error {} | |
let pendingValue = PendingValue<String>() | |
try await withThrowingTaskGroup(of: Void.self) { group in | |
group.addTask { | |
pendingValue.resolve(throwing: MyError()) | |
} | |
group.addTask { | |
pendingValue.resolve(throwing: MyError()) | |
} | |
group.addTask { | |
pendingValue.resolve(throwing: MyError()) | |
} | |
try await group.waitForAll() | |
} | |
await #expect(throws: MyError.self) { | |
_ = try await pendingValue.value() | |
} | |
} | |
@Test func test_parallelResolveThrowingAndWaiting() async throws { | |
struct MyError: Error {} | |
let pendingValue = PendingValue<String>() | |
try await withThrowingTaskGroup(of: Void.self) { group in | |
group.addTask { | |
pendingValue.resolve(throwing: MyError()) | |
} | |
group.addTask { | |
pendingValue.resolve(throwing: MyError()) | |
} | |
group.addTask { | |
pendingValue.resolve(throwing: MyError()) | |
} | |
group.addTask { | |
await #expect(throws: MyError.self) { | |
_ = try await pendingValue.value() | |
} | |
} | |
try await group.waitForAll() | |
} | |
} | |
@Test func test_parallelResolveAndWaiting() async throws { | |
struct MyError: Error {} | |
let pendingValue = PendingValue<String>() | |
try await withThrowingTaskGroup(of: Void.self) { group in | |
group.addTask { | |
let value = try? await pendingValue.value() | |
#expect(value == "Hello World!") | |
} | |
group.addTask { | |
pendingValue.resolve(returning: "Hello World!") | |
} | |
group.addTask { | |
let value = try? await pendingValue.value() | |
#expect(value == "Hello World!") | |
} | |
try await group.waitForAll() | |
} | |
} | |
@Test func test_immediateResolveWithParallelWaiting() async throws { | |
struct MyError: Error {} | |
let pendingValue = PendingValue<String>() | |
pendingValue.resolve(returning: "Hello World!") | |
try await withThrowingTaskGroup(of: Void.self) { group in | |
group.addTask { | |
let value = try await pendingValue.value() | |
#expect(value == "Hello World!") | |
} | |
group.addTask { | |
let value = try await pendingValue.value() | |
#expect(value == "Hello World!") | |
} | |
try await group.waitForAll() | |
} | |
} | |
@Test func test_parallelDelayedResolveAndWaiting() async throws { | |
struct MyError: Error {} | |
let pendingValue = PendingValue<String>() | |
try await withThrowingTaskGroup(of: Void.self) { group in | |
group.addTask { | |
let value = try await pendingValue.value() | |
#expect(value == "Hello World!") | |
} | |
group.addTask { | |
try await Task.sleep(for: .seconds(1)) | |
pendingValue.resolve(returning: "Hello World!") | |
} | |
group.addTask { | |
let value = try await pendingValue.value() | |
#expect(value == "Hello World!") | |
} | |
try await group.waitForAll() | |
} | |
} | |
@Test func test_parallelWaitingForString() async throws { | |
struct MyError: Error {} | |
let pendingValue = PendingValue<String>() | |
async let values = withThrowingTaskGroup(of: String.self, returning: [String].self) { group in | |
group.addTask { | |
try await pendingValue.value() | |
} | |
group.addTask { | |
try await pendingValue.value() | |
} | |
var values: [String] = [] | |
for try await value in group { | |
values.append(value) | |
} | |
return values | |
} | |
try await Task.sleep(for: .seconds(1)) | |
pendingValue.resolve(returning: "Hello World!") | |
#expect(try await values == ["Hello World!", "Hello World!"]) | |
} | |
@Test func test_parallelWaitingForCancelation() async throws { | |
struct MyError: Error {} | |
let pendingValue = PendingValue<String>() | |
async let nothing: Void = #expect(throws: CancellationError.self) { | |
_ = try await withThrowingTaskGroup(of: String.self, returning: [String].self) { group in | |
group.addTask { | |
try await pendingValue.value() | |
} | |
group.addTask { | |
try await pendingValue.value() | |
} | |
var values: [String] = [] | |
for try await value in group { | |
values.append(value) | |
} | |
return values | |
} | |
} | |
async let anotherNothing: Void = Task.sleep(for: .seconds(1)) | |
pendingValue.cancel() | |
_ = try await (nothing, anotherNothing) | |
} | |
@Test func test_waitingForCancelation() async throws { | |
struct MyError: Error {} | |
let pendingValue = PendingValue<String>() | |
async let nothing: Void = #expect(throws: CancellationError.self) { | |
try await withThrowingTaskGroup(of: String.self) { group in | |
group.addTask { | |
try await pendingValue.value() | |
} | |
group.addTask { | |
try await pendingValue.value() | |
} | |
group.cancelAll() | |
try await group.waitForAll() | |
} | |
} | |
async let anotherNothing: Void = Task.sleep(for: .seconds(1)) | |
_ = try await (nothing, anotherNothing) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment