Last active
March 29, 2025 05:36
-
-
Save dilame/4c4540167255346ef4d07ee5a8cfd2dc to your computer and use it in GitHub Desktop.
RxJS <-> GraphQL subscriptions interop with proper resource cleaning
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable, Subscriber } from 'rxjs' | |
/**
 * Converts an `AsyncIterableIterator` into an RxJS `Observable`.
 *
 * On subscription the iterator is pulled with `next()` in a loop and every
 * yielded value is emitted through `observer.next()`. When the iterator
 * reports `done`, the observable completes; when `next()` rejects, the
 * observable errors with the rejection reason.
 *
 * When the subscription is torn down (for example via `unsubscribe()`), the
 * iterator's `return()` method is called (if available) so the producer can
 * release its resources. This is important, because the native RxJS `from`
 * operator doesn't do this.
 *
 * `return()` is invoked at most once per subscription, is skipped when the
 * iterator has already reported `done`, and any failure it raises is
 * swallowed because nothing is listening by then.
 *
 * @param iterable - The async iterator to drain.
 * @returns An observable that mirrors the values, completion and errors of `iterable`.
 */
export function asyncIterableToObservable<T>(iterable: AsyncIterableIterator<T>): Observable<T> {
  return new Observable<T>((observer: Subscriber<T>) => {
    const iterator = iterable[Symbol.asyncIterator]()
    // True once the iterator itself has reported `done`; no cleanup is needed after that.
    let exhausted = false

    void (async (): Promise<void> => {
      try {
        for (;;) {
          const result = await iterator.next()
          if (observer.closed) {
            // The teardown below has already run and asked the iterator to stop.
            return
          }
          if (result.done) {
            exhausted = true
            break
          }
          observer.next(result.value)
        }
        observer.complete()
      } catch (e) {
        observer.error(e)
      }
    })()

    return () => {
      if (exhausted || typeof iterator.return !== 'function') {
        return
      }
      void (async (): Promise<void> => {
        try {
          // Call through the iterator so `this` is the object that owns `return`.
          await iterator.return!()
        } catch {
          // Best effort: the subscriber is gone, so there is no channel left to
          // report a cleanup failure. Swallow it rather than leaving an
          // unhandled promise rejection behind.
        }
      })()
    }
  })
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable, Subscription } from 'rxjs' | |
/** Settles a pending `next()` call with an iterator result. */
type Resolver<T, TReturn> = (result: IteratorResult<T, TReturn>) => void

/** Both halves of a pending `next()` promise, so it can be fulfilled or rejected later. */
interface PendingPull<T, TReturn> {
  resolve: Resolver<T, TReturn>
  reject: (reason: unknown) => void
}

/**
 * Unbounded push/pull queue exposed as an `AsyncIterableIterator`.
 *
 * Producers push with `enqueue()`, `fail()` and `finish()`; consumers pull with
 * `next()` and cancel with `return()`.
 *
 * - Values pushed while nobody is waiting are buffered and handed out in order.
 * - `finish()` lets the consumer drain the buffer and then reports `done`.
 * - `fail()` lets the consumer drain the buffer and then rejects every `next()`
 *   with the stored error. Any error value is honoured, including falsy ones
 *   such as `0`, `''` or `undefined`.
 * - `return()` closes the queue immediately: buffered values are discarded and
 *   every later `next()` resolves with `done: true`.
 */
class AsyncQueue<T, TReturn = any, TNext = any> implements AsyncIterableIterator<T, TReturn, TNext> {
  /** Values pushed by the producer that no consumer has pulled yet. */
  private values: T[] = []
  /** Consumers currently waiting inside `next()`; non-empty only while `values` is empty. */
  private resolvers: PendingPull<T, TReturn>[] = []
  /** Set once the queue stops accepting values. */
  private ended = false
  /** Whether the queue ended through `fail()`; tracked separately so falsy errors count. */
  private failed = false
  private error: unknown = null;

  [Symbol.asyncIterator](): AsyncIterableIterator<T, TReturn, TNext> {
    return this
  }

  /** Pushes a value: wakes the oldest waiting consumer, or buffers it. Ignored after the end. */
  enqueue(value: T): void {
    if (this.ended) return
    const waiter = this.resolvers.shift()
    if (waiter) {
      waiter.resolve({ value, done: false })
    } else {
      this.values.push(value)
    }
  }

  /** Ends the queue with an error; waiting and future `next()` calls reject with it. */
  fail(error: unknown): void {
    if (this.ended) return
    this.failed = true
    this.error = error
    this.finish()
  }

  /** Ends the queue and settles every waiting consumer. Idempotent. */
  finish(): void {
    if (this.ended) return
    this.ended = true
    for (const waiter of this.resolvers.splice(0)) {
      if (this.failed) {
        waiter.reject(this.error)
      } else {
        waiter.resolve(this.doneResult())
      }
    }
  }

  /**
   * Pulls the next value.
   *
   * @returns A promise for the next buffered or future value, for `done: true`
   * once the queue has ended, or a rejection with the error passed to `fail()`.
   */
  next(...args: [] | [TNext]): Promise<IteratorResult<T, TReturn>> {
    void args // not used — but the signature is required by the interface
    if (this.values.length > 0) {
      return Promise.resolve({ value: this.values.shift() as T, done: false })
    }
    if (this.ended) {
      return this.failed ? Promise.reject(this.error) : Promise.resolve(this.doneResult())
    }
    return new Promise((resolve, reject) => this.resolvers.push({ resolve, reject }))
  }

  /**
   * Cancels iteration on behalf of the consumer.
   *
   * The queue is closed synchronously, before the returned promise settles:
   * the buffer is dropped, waiting consumers are settled, and any stored error
   * is cleared so later `next()` calls resolve with `done: true`.
   *
   * @param value - Optional return value; awaited if it is promise-like.
   * @returns `{ value, done: true }`.
   */
  async return(value?: TReturn | PromiseLike<TReturn>): Promise<IteratorResult<T, TReturn>> {
    this.values.length = 0
    this.finish()
    this.failed = false
    this.error = null
    return { value: (await value) as TReturn, done: true }
  }

  /** Builds the terminal `{ value: undefined, done: true }` result. */
  private doneResult(): IteratorResult<T, TReturn> {
    return { value: undefined as any, done: true }
  }
}
/**
 * Converts an RxJS Observable into an AsyncIterableIterator that follows the
 * ECMAScript async iterator protocol, including `return()`.
 *
 * Motivation:
 * Originally, I used `rxjs-for-await` to convert observables into async iterables.
 * However, it lacked a proper `return()` implementation, which made it impossible
 * to clean up resources (unsubscribe from the observable) when the consumer unsubscribed early.
 *
 * Problems with such naive conversions:
 * 1. Observable subscriptions are not torn down immediately when the consumer stops iteration.
 *    Instead, they stay open until the next value is emitted or the observable completes.
 * 2. In long-lived or infrequently emitting streams, this leads to memory leaks and wasted resources.
 * 3. Some implementations may continue delivering values after the consumer has stopped, which is unsafe.
 *
 * This implementation addresses all these problems:
 * - It provides a full AsyncIterableIterator, including `return()` and `[Symbol.asyncIterator]()`.
 * - As soon as the consumer unsubscribes (via `.return()` or `break` from `for await`),
 *   the Observable subscription is explicitly torn down.
 * - No further values are delivered after cancellation or completion: once `return()`
 *   has been called, `next()` resolves with `done: true` even if values were still buffered.
 * - Errors from the source are forwarded to the consumer via `.next()` rejection.
 *
 * Internally, an async push/pull queue is used:
 * - Values from the Observable are queued if the consumer hasn't called `.next()` yet.
 * - If the consumer is awaiting `.next()`, values are delivered immediately.
 * - Once the Observable completes or errors, all pending `.next()` promises are resolved or rejected.
 *
 * Note that the source is subscribed to as soon as this function is called,
 * not lazily on the first `next()`.
 *
 * The result is suitable for use in `for await...of`, GraphQL Subscriptions,
 * and other async stream consumers.
 *
 * @param source$ - The observable to expose as an async iterator.
 * @returns An async iterator over the values emitted by `source$`.
 */
export function observableToAsyncIterable<T>(source$: Observable<T>): AsyncIterableIterator<T> {
  const queue = new AsyncQueue<T>()
  // Set when the consumer calls `return()`; from then on `next()` must not hand
  // out values that were buffered before the cancellation.
  let cancelled = false

  const subscription: Subscription = source$.subscribe({
    next: (value) => queue.enqueue(value),
    error: (err) => queue.fail(err),
    complete: () => queue.finish()
  })

  const iterator: AsyncIterableIterator<T> = {
    [Symbol.asyncIterator]: () => iterator,
    next: () => {
      if (cancelled) {
        const closed: IteratorResult<T> = { value: undefined, done: true }
        return Promise.resolve(closed)
      }
      return queue.next()
    },
    return: (value?: any) => {
      cancelled = true
      // Unsubscribe first so the source stops producing, then settle any pending `next()`.
      subscription.unsubscribe()
      return queue.return(value)
    }
  }
  return iterator
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Initially, I used `rxjs-for-await` to convert RxJS observables into async iterables. However, it does not implement the `AsyncIterator.return` method, which can lead to memory leaks under certain conditions:
Instead, they are only cleaned up after the next event from the source is received.
this delay in cleanup can result in significant resource consumption and memory leaks.
Another undesirable side effect is that the client may still receive one more event
even after it has unsubscribed.
To address these issues:
return()
in this custom converter, ensuring that resources are cleaned up as soon as the client unsubscribes, regardless of source activity.
once it unsubscribes, preventing unexpected post-unsubscription events.