Created
December 23, 2020 18:41
-
-
Save fmo91/06510821a65fe798e02b485567ff50eb to your computer and use it in GitHub Desktop.
A proof-of-concept EventStream implementation, similar in spirit to Observable<Element> from RxSwift.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
/// A minimal push-based stream of `Event<Element>` values.
///
/// Events that arrive while there are no subscribers are buffered in `eventsQueue`
/// and replayed on the next `subscribe` call. Once a terminating event
/// (`.completed` or `.error`) has been delivered, every later event is ignored.
public final class EventStream<Element> {
    /// Current subscribers, keyed by the token returned from `subscribe`.
    private(set) var handlers: [StreamSubscription: StreamHandler<Element>] = [:]
    /// Events received while `handlers` was empty, in arrival order.
    private var eventsQueue: [Event<Element>] = []
    /// Set to `false` after a terminating event has been delivered to subscribers.
    private var isEnabled: Bool = true

    private init() {}

    /// Creates a stream whose events are produced by `handler`.
    ///
    /// `handler` is invoked synchronously, before the stream is returned, with a
    /// `StreamHandler` that feeds every event it is sent into the new stream.
    ///
    /// - Parameter handler: The producer closure; it may keep the passed
    ///   `StreamHandler` and send events later.
    /// - Returns: The newly created stream.
    public static func create(_ handler: @escaping (StreamHandler<Element>) -> Void) -> EventStream<Element> {
        let stream = EventStream<Element>()
        handler(.init({ (event: Event<Element>) in
            stream.sendIfPossible(event: event)
        }))
        return stream
    }

    /// Creates a stream that emits `value` followed by `.completed`.
    public static func just(_ value: Element) -> EventStream<Element> {
        return .create {
            $0.send(.next(value))
            $0.send(.completed)
        }
    }

    /// Creates a stream that emits only `.error(errorValue)`.
    public static func error(_ errorValue: Swift.Error) -> EventStream<Element> {
        return .create {
            $0.send(.error(errorValue))
        }
    }

    /// Delivers `event` to the subscribers, or buffers it when there are none.
    /// Does nothing once the stream has been disabled by a terminating event.
    private func sendIfPossible(event: Event<Element>) {
        guard isEnabled else {
            return
        }
        if handlers.isEmpty {
            eventsQueue.append(event)
        } else {
            send(event: event)
        }
    }

    /// Forwards `event` to every subscriber, then disables the stream if the
    /// event was a terminating one.
    private func send(event: Event<Element>) {
        defer {
            if event.isCompletion {
                isEnabled = false
            }
        }
        handlers.values.forEach { (handler: StreamHandler<Element>) in
            handler.send(event)
        }
    }

    /// Removes the subscriber registered under `id`, if any.
    private func dispose(withId id: UUID) {
        // Subscriptions hash and compare by `id` only, so a throwaway token
        // carrying the same id addresses the stored entry.
        handlers.removeValue(forKey: StreamSubscription(generator: { id }))
    }

    /// Replays the buffered events to the current subscribers, stopping at the
    /// first terminating event, and then empties the buffer.
    private func streamEventsQueue() {
        for event in eventsQueue {
            guard isEnabled else {
                break
            }
            send(event: event)
        }
        eventsQueue.removeAll()
    }

    /// Registers `handler` as a subscriber and replays any buffered events.
    ///
    /// - Parameter handler: Receives every event delivered from now on.
    /// - Returns: A token identifying this subscription.
    public func subscribe(handler: StreamHandler<Element>) -> StreamSubscription {
        // The token is stored as a key in `handlers`, so its dispose closure must
        // not retain the stream: a strong capture would create a reference cycle
        // (stream -> handlers -> token -> closure -> stream).
        let subscriptionIdentifier = StreamSubscription(dispose: { [weak self] id in
            self?.dispose(withId: id)
        })
        handlers[subscriptionIdentifier] = handler
        streamEventsQueue()
        return subscriptionIdentifier
    }

    /// Convenience overload of `subscribe(handler:)` taking one closure per event kind.
    ///
    /// - Parameters:
    ///   - onNext: Called with the payload of each `.next` event.
    ///   - onError: Called with the payload of an `.error` event.
    ///   - onComplete: Called for a `.completed` event.
    /// - Returns: A token identifying this subscription.
    public func subscribe(
        onNext: @escaping (Element) -> Void = { _ in },
        onError: @escaping (Swift.Error) -> Void = { _ in },
        onComplete: @escaping () -> Void = {}
    ) -> StreamSubscription {
        return subscribe(
            handler: StreamHandler<Element>(
                onNext: onNext,
                onError: onError,
                onComplete: onComplete
            )
        )
    }
}
/// Token identifying a single subscription.
///
/// Two tokens are equal, and hash identically, when their `id`s match; the
/// `dispose` closure takes no part in equality or hashing.
public struct StreamSubscription: Hashable {
    /// Unique identifier of the subscription.
    let id: UUID
    /// Closure that tears down the subscription with the given identifier.
    let dispose: (UUID) -> Void

    /// - Parameters:
    ///   - dispose: Invoked with the subscription's `id` to cancel it. Defaults to a no-op.
    ///   - generator: Produces the subscription's `id`. Defaults to a fresh random `UUID`.
    init(dispose: @escaping (UUID) -> Void = { _ in }, generator: () -> UUID = UUID.init) {
        self.dispose = dispose
        self.id = generator()
    }

    /// Cancels the subscription by invoking the `dispose` closure with this token's `id`.
    ///
    /// `id` and `dispose` are not public, so this is the way for code outside the
    /// module to end a subscription.
    public func unsubscribe() {
        dispose(id)
    }

    public func hash(into hasher: inout Hasher) {
        hasher.combine(id)
    }

    public static func ==(lhs: StreamSubscription, rhs: StreamSubscription) -> Bool {
        return lhs.id == rhs.id
    }
}
/// Wraps a closure that consumes `Event<Element>` values.
public struct StreamHandler<Element> {
    /// The closure every event passed to `send(_:)` is handed to.
    private let handler: (Event<Element>) -> Void

    /// Creates a handler that passes each event to `handler` unchanged.
    public init(_ handler: @escaping (Event<Element>) -> Void) {
        self.handler = handler
    }

    /// Creates a handler that routes each event kind to its own closure.
    ///
    /// - Parameters:
    ///   - onNext: Called with the payload of a `.next` event.
    ///   - onError: Called with the payload of an `.error` event.
    ///   - onComplete: Called for a `.completed` event.
    public init(
        onNext: @escaping (Element) -> Void = { _ in },
        onError: @escaping (Swift.Error) -> Void = { _ in },
        onComplete: @escaping () -> Void = {}
    ) {
        self.init({ (incoming: Event<Element>) in
            switch incoming {
            case let .next(value): onNext(value)
            case let .error(failure): onError(failure)
            case .completed: onComplete()
            }
        })
    }

    /// Passes `event` to the wrapped closure.
    public func send(_ event: Event<Element>) {
        handler(event)
    }
}
/// A single occurrence in a stream: a value, a normal end, or a failure.
public enum Event<Element> {
    case next(Element)
    case completed
    case error(Swift.Error)

    /// `true` for `.completed` and `.error`, `false` for `.next`.
    var isCompletion: Bool {
        if case .next = self {
            return false
        }
        return true
    }

    /// Returns an event of the same kind with the `.next` payload transformed by `f`.
    /// `.error` keeps its error and `.completed` is passed through.
    func map<NewElement>(f: (Element) -> NewElement) -> Event<NewElement> {
        switch self {
        case let .next(payload): return .next(f(payload))
        case let .error(failure): return .error(failure)
        case .completed: return .completed
        }
    }
}
extension EventStream {
    /// Returns a stream that emits each `.next` value of this stream transformed
    /// by `transform`, and forwards `.error` and `.completed` unchanged.
    ///
    /// The receiver is subscribed to immediately, while the new stream is created.
    ///
    /// - Parameter transform: Applied to every `.next` payload.
    public func map<NewElement>(_ transform: @escaping (Element) -> NewElement) -> EventStream<NewElement> {
        return EventStream<NewElement>.create { (handler: StreamHandler<NewElement>) in
            _ = self.subscribe(
                handler: StreamHandler<Element>({ (event: Event<Element>) in
                    handler.send(event.map(f: transform))
                })
            )
        }
    }

    /// Returns a stream that merges the streams produced by applying `transform`
    /// to each `.next` value of this stream.
    ///
    /// - Values from every inner stream are forwarded as they arrive.
    /// - An error from this stream or from any inner stream is forwarded.
    /// - `.completed` is emitted only after this stream has completed AND every
    ///   inner stream created so far has completed. An inner stream finishing must
    ///   not terminate the merged stream, and the source finishing must not cut
    ///   off inner streams that are still pending.
    ///
    /// The receiver is subscribed to immediately, while the new stream is created.
    ///
    /// - Parameter transform: Produces the inner stream for a source value.
    public func flatMap<NewElement>(_ transform: @escaping (Element) -> EventStream<NewElement>) -> EventStream<NewElement> {
        return .create { handler in
            // Bookkeeping shared by the source subscription and all inner subscriptions.
            var activeInnerStreams = 0
            var sourceCompleted = false

            func completeIfFinished() {
                if sourceCompleted && activeInnerStreams == 0 {
                    handler.send(.completed)
                }
            }

            _ = self.subscribe(
                onNext: { value in
                    // Count the inner stream before subscribing: a stream with
                    // buffered events replays them (possibly including its
                    // completion) synchronously inside `subscribe`.
                    activeInnerStreams += 1
                    _ = transform(value).subscribe(
                        onNext: { innerValue in
                            handler.send(.next(innerValue))
                        },
                        onError: { error in
                            handler.send(.error(error))
                        },
                        onComplete: {
                            activeInnerStreams -= 1
                            completeIfFinished()
                        }
                    )
                },
                onError: { error in
                    handler.send(.error(error))
                },
                onComplete: {
                    sourceCompleted = true
                    completeIfFinished()
                }
            )
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import UIKit | |
import PlaygroundSupport | |
// Source stream: sends 10, 20 and 30 as `.next` events and never sends a terminating event.
let stream = EventStream<Int>.create { handler in
    for value in [10, 20, 30] {
        handler.send(.next(value))
    }
}
/// Builds a stream that emits the text "Number is: <number>" once, scheduled on
/// the main queue three seconds after the stream is created. The stream never
/// sends a terminating event.
///
/// - Parameter number: The value to describe.
func createDelayedDescription(for number: Int) -> EventStream<String> {
    return EventStream<String>.create { handler in
        let fireTime: DispatchTime = .now() + 3.0
        DispatchQueue.main.asyncAfter(deadline: fireTime) {
            handler.send(.next("Number is: \(number)"))
        }
    }
}
print("Starting!")

// Pipeline: value * 2, then - 12, then a delayed textual description of the result.
let doubledValues = stream.map { $0 * 2 }
let shiftedValues = doubledValues.map { $0 - 12 }
let delayedDescriptions = shiftedValues.flatMap { createDelayedDescription(for: $0) }

let subscription = delayedDescriptions.subscribe(
    onNext: { print($0) },
    onComplete: { print("COMPLETED!") }
)

// The descriptions are delivered asynchronously, so the page must keep running
// after the top-level code has finished.
PlaygroundPage.current.needsIndefiniteExecution = true
// It prints... | |
// | |
// Starting! | |
// Number is: 8 | |
// Number is: 28 | |
// Number is: 48 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment