Skip to content

Instantly share code, notes, and snippets.

@groue
Last active January 22, 2025 20:01
Show Gist options
  • Save groue/289d8b0c4d58ffc2cd38964e3d2fa206 to your computer and use it in GitHub Desktop.
Save groue/289d8b0c4d58ffc2cd38964e3d2fa206 to your computer and use it in GitHub Desktop.
Observation+Utils.swift
import Combine
import Dispatch
import Foundation
import Observation
/// A reference-typed box that protects a mutable value with an `NSLock`.
///
/// Every access to the value goes through ``withLock(_:)``, which holds the
/// lock for the whole duration of the closure it is given.
private final class LockedValue<T> {
    /// The protected value. Only touched while `lock` is held.
    private var value: T

    /// The lock guarding `value`. It is never reassigned, hence `let`.
    private let lock = NSLock()

    /// Creates a box holding `value`.
    public init(_ value: T) {
        self.value = value
    }

    /// Runs the provided closure while holding a lock on the value.
    ///
    /// - Note: `NSLock` is not recursive: calling `withLock` again from
    ///   within `body` on the same instance would deadlock.
    ///
    /// - Parameter body: A closure that can read and modify the value.
    /// - Returns: Whatever `body` returns.
    /// - Throws: Rethrows any error thrown by `body`. The lock is released
    ///   in that case as well.
    public func withLock<U>(_ body: (inout T) throws -> U) rethrows -> U {
        lock.lock()
        // `defer` guarantees the unlock on both the normal and throwing paths.
        defer { lock.unlock() }
        return try body(&value)
    }
}

// `@unchecked` because the compiler cannot verify that the mutable `value`
// is only accessed under `lock`; that invariant is upheld by `withLock`.
extension LockedValue: @unchecked Sendable where T: Sendable { }
/// Observes the observable properties read by a closure.
///
/// The `apply` closure is run right away. Every observable property it
/// reads is tracked, and the closure is run again after one of them has
/// been modified. This goes on until the returned cancellable is cancelled.
///
/// The example below prints the player's name, and prints it again each
/// time it changes:
///
/// ```swift
/// let player: Player = ...
/// let cancellable = observe {
///     print(player.name)
/// }
/// ```
///
/// - Important: Modifications performed from the `apply` closure itself
///   are not notified.
///
/// - Parameter apply: A closure that reads the properties to track.
/// - Returns: A cancellable that stops the observation.
@MainActor public func observe(_ apply: @escaping @MainActor () -> Void) -> AnyCancellable {
    // Shared flag: flipped by the cancellable, polled by the observation loop.
    let isCancelled = LockedValue<Bool>(false)

    // Start observing; this performs the first call to `apply`.
    observe(until: isCancelled, apply)

    return AnyCancellable {
        isCancelled.withLock { flag in flag = true }
    }
}
/// Builds an asynchronous stream of values computed from observable
/// properties.
///
/// The `value` closure is evaluated right away and its result is the first
/// element of the stream. Every observable property the closure reads is
/// tracked, and a freshly computed value is emitted after one of them has
/// been modified. This goes on until the stream terminates (for example
/// when its iteration ends).
///
/// The stream buffers only the newest value: a consumer that is slower
/// than the changes misses intermediate values.
///
/// The example below prints the player's name, and prints it again each
/// time it changes:
///
/// ```swift
/// Task {
///     let player: Player = ...
///     let names = makeObservationStream { player.name }
///     for await name in names {
///         print(name)
///     }
/// }
/// ```
///
/// - Important: Modifications performed from the `value` closure itself
///   are not notified.
///
/// - Parameter value: A closure that returns the value to track.
/// - Returns: A stream of tracked values.
@MainActor public func makeObservationStream<T>(_ value: @escaping @MainActor () -> sending T) -> AsyncStream<T> {
    // Shared flag: flipped when the stream terminates, polled by the
    // observation loop.
    let isTerminated = LockedValue<Bool>(false)

    let (updates, emitter) = AsyncStream.makeStream(of: T.self, bufferingPolicy: .bufferingNewest(1))
    emitter.onTermination = { _ in
        isTerminated.withLock { flag in flag = true }
    }

    // Start observing; this yields the initial value.
    observe(until: isTerminated) {
        emitter.yield(value())
    }

    return updates
}
/// Runs `apply` under observation tracking, and schedules itself again
/// whenever a tracked property changes, until `cancelled` holds `true`.
///
/// `withObservationTracking` is registered anew on every run: each run
/// tracks the properties read by that particular call to `apply`.
///
/// - Parameters:
///   - cancelled: A flag that stops the observation once set to `true`.
///     It is checked both before calling `apply` and when a change
///     is notified.
///   - apply: The closure whose property accesses are tracked.
@MainActor private func observe(until cancelled: LockedValue<Bool>, _ apply: @escaping @MainActor () -> Void) {
    withObservationTracking {
        let isCancelled = cancelled.withLock { flag in flag }
        guard !isCancelled else { return }
        apply()
    } onChange: {
        let isCancelled = cancelled.withLock { flag in flag }
        guard !isCancelled else { return }
        // The next run is not performed inline: it is deferred to a later
        // turn of the main queue.
        DispatchQueue.main.async {
            observe(until: cancelled, apply)
        }
    }
}
import Combine
import Dispatch
import Observation
import Testing
/// A minimal observable model that the tests observe and mutate.
@Observable private class Model {
    /// The single tracked property.
    var value: Int

    /// Creates a model with an initial `value`.
    init(value: Int) { self.value = value }
}
/// Tests for `observe(_:)` and `makeObservationStream(_:)`.
///
/// Each test wraps its body in `confirmation(expectedCount:)` and calls
/// `modelIsObserved()` on every delivery, so the expected count is the
/// initial delivery plus one per change performed by the test.
@MainActor @Suite struct ObservationTests {
/// `observe` calls its closure once up front, and once more after a single change.
@Test func observationOfOneChange() async throws {
await confirmation(expectedCount: 2) { modelIsObserved in
// Declared outside of the continuation closure and kept alive below with
// `withExtendedLifetime`, so that the observation stays active until the
// continuation has resumed.
var cancellable: AnyCancellable?
await withCheckedContinuation { continuation in
let model = Model(value: 0)
cancellable = observe {
modelIsObserved()
// Resume only when the changed value is seen, which ends the test.
if model.value == 1 {
continuation.resume()
}
}
// Perform the change
model.value = 1
}
withExtendedLifetime(cancellable) { }
}
}
/// `observe` calls its closure once up front, and once more after each of two changes.
@Test func observationOfTwoChanges() async throws {
await confirmation(expectedCount: 3) { modelIsObserved in
// Same lifetime handling as in `observationOfOneChange`.
var cancellable: AnyCancellable?
await withCheckedContinuation { continuation in
let model = Model(value: 0)
cancellable = observe {
modelIsObserved()
// Resume only when the final value is seen, which ends the test.
if model.value == 2 {
continuation.resume()
}
}
// Perform 1st change
model.value = 1
// Perform 2nd change
// NOTE(review): presumably dispatched so that it lands after the
// re-observation that `observe` schedules on the main queue in
// response to the 1st change — confirm.
DispatchQueue.main.async {
model.value = 2
}
}
withExtendedLifetime(cancellable) { }
}
}
/// The stream emits the initial value even when the model never changes.
@Test func observationStream_no_change() async throws {
await confirmation(expectedCount: 1) { modelIsObserved in
let model = Model(value: 0)
let stream = makeObservationStream {
model.value
}
for await value in stream {
modelIsObserved()
switch value {
case 0:
// Initial value received: stop iterating.
return
default:
Issue.record("Unexpected value \(value)")
return
}
}
}
}
/// The stream emits the initial value, then the value set by one change.
@Test func observationStream_one_change() async throws {
await confirmation(expectedCount: 2) { modelIsObserved in
let model = Model(value: 0)
let stream = makeObservationStream {
model.value
}
for await value in stream {
modelIsObserved()
switch value {
case 0:
// Initial value received: perform the change.
model.value = 1
case 1:
// Changed value received: stop iterating.
return
default:
Issue.record("Unexpected value \(value)")
return
}
}
}
}
/// The stream emits the initial value, then the values set by two successive changes.
@Test func observationStream_two_changes() async throws {
await confirmation(expectedCount: 3) { modelIsObserved in
let model = Model(value: 0)
let stream = makeObservationStream {
model.value
}
for await value in stream {
modelIsObserved()
switch value {
case 0:
// Initial value received: perform the 1st change.
model.value = 1
case 1:
// 1st change received: perform the 2nd change.
model.value = 2
case 2:
// 2nd change received: stop iterating.
return
default:
Issue.record("Unexpected value \(value)")
return
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment