@dilame
Last active March 29, 2025 05:36
RxJS <-> GraphQL subscriptions interop with proper resource cleaning
import { Observable, Subscriber } from 'rxjs'

/**
 * Converts an `AsyncIterableIterator` into an RxJS `Observable`.
 *
 * The function iterates over the async iterator and emits its values
 * through `observer.next()`. When the subscription is terminated
 * (via `unsubscribe()`), the iterator's `return()` method is called
 * (if available) to release resources.
 * This matters because RxJS's built-in `from` does not do this.
 */
export function asyncIterableToObservable<T>(iterable: AsyncIterableIterator<T>): Observable<T> {
  return new Observable<T>((observer: Subscriber<T>) => {
    const iterator = iterable[Symbol.asyncIterator]()
    // Pump values from the iterator into the observer in the background.
    ;(async (): Promise<void> => {
      try {
        for await (const item of iterator) {
          if (observer.closed) {
            return
          }
          observer.next(item)
        }
        observer.complete()
      } catch (e) {
        observer.error(e)
      }
    })()
    // Teardown: ask the producer to stop now instead of waiting for its next value.
    return () => {
      if (typeof iterator.return === 'function') {
        const finalize = iterator.return
        ;(async (): Promise<void> => {
          try {
            // Invoke `return()` on the iterator it was read from.
            await finalize.call(iterator)
          } catch {
            // The subscriber is already closed, so there is nowhere to report this.
          }
        })()
      }
    }
  })
}
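To see why the teardown above bothers to call `return()`, here is a minimal sketch that uses only built-in async generators (no RxJS). The `ticker` generator and the `released` flag are hypothetical stand-ins for a producer that holds a resource; they are not part of this gist.

```typescript
// Hypothetical flag: flips to true only when the generator's `finally` runs.
let released = false

// Hypothetical endless producer that owns a resource until it is stopped.
async function* ticker(): AsyncGenerator<number, void, undefined> {
  try {
    let i = 0
    while (true) {
      yield i++
    }
  } finally {
    // Stands in for closing a socket, clearing a timer, and so on.
    released = true
  }
}

async function demoRelease(): Promise<[boolean, boolean]> {
  const it = ticker()
  await it.next() // starts the generator; it is now paused at `yield`
  const beforeReturn = released // nothing has asked the producer to stop yet
  await it.return(undefined) // what an Observable teardown should trigger
  const afterReturn = released // the `finally` block has run
  return [beforeReturn, afterReturn]
}
```

If the consumer simply stopped calling `next()` without calling `return()`, the `finally` block in this sketch would never run.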
import { Observable, Subscription } from 'rxjs'

type Resolver<T, TReturn> = (result: IteratorResult<T, TReturn>) => void

class AsyncQueue<T, TReturn = any, TNext = any> implements AsyncIterableIterator<T, TReturn, TNext> {
  // Values pushed by the producer that no consumer has pulled yet.
  private values: T[] = []
  // Pending `.next()` calls waiting for a value.
  private resolvers: Resolver<T, TReturn>[] = []
  private ended = false
  // Tracked separately from `error` so that falsy error values still reject.
  private failed = false
  private error: unknown = null;

  [Symbol.asyncIterator](): AsyncIterableIterator<T, TReturn, TNext> {
    return this
  }

  enqueue(value: T): void {
    if (this.ended) return
    if (this.resolvers.length > 0) {
      const resolve = this.resolvers.shift()!
      resolve({ value, done: false })
    } else {
      this.values.push(value)
    }
  }

  fail(error: unknown): void {
    if (this.ended) return
    this.failed = true
    this.error = error
    this.finish()
  }

  finish(): void {
    if (this.ended) return
    this.ended = true
    while (this.resolvers.length > 0) {
      const resolve = this.resolvers.shift()!
      if (this.failed) {
        // Resolving with a rejected promise makes the pending `.next()` reject.
        resolve(Promise.reject(this.error) as never)
      } else {
        resolve({ value: undefined as any, done: true })
      }
    }
  }

  next(...[value]: [] | [TNext]): Promise<IteratorResult<T, TReturn>> {
    void value // not used, but the signature requires it
    if (this.values.length > 0) {
      const val = this.values.shift()!
      return Promise.resolve({ value: val, done: false })
    }
    if (this.ended) {
      if (this.failed) {
        return Promise.reject(this.error)
      } else {
        return Promise.resolve({ value: undefined as any, done: true })
      }
    }
    return new Promise((resolve) => this.resolvers.push(resolve))
  }

  return(value?: TReturn | PromiseLike<TReturn>): Promise<IteratorResult<T, TReturn>> {
    // Drop buffered values so that nothing is delivered after cancellation.
    this.values.length = 0
    this.finish()
    return Promise.resolve({ value: value as TReturn, done: true })
  }
}
/**
 * This implementation converts an RxJS Observable into an AsyncIterableIterator,
 * fully compliant with the ECMAScript Async Iterator protocol.
 *
 * Motivation:
 * Originally, I used `rxjs-for-await` to convert observables into async iterables.
 * However, it lacked a proper `return()` implementation, which made it impossible
 * to clean up resources (unsubscribe from the observable) when the consumer stopped iterating early.
 *
 * Problems with such naive conversions:
 * 1. Observable subscriptions are not torn down immediately when the consumer stops iteration.
 *    Instead, they stay open until the next value is emitted or the observable completes.
 * 2. In long-lived or infrequently emitting streams, this leads to memory leaks and wasted resources.
 * 3. Some implementations may continue delivering values after the consumer has stopped, which is unsafe.
 *
 * This implementation addresses all these problems:
 * - It provides a full AsyncIterableIterator, including `return()` and `[Symbol.asyncIterator]()`.
 * - As soon as the consumer stops iterating (via `.return()` or `break` from `for await`),
 *   the Observable subscription is explicitly torn down.
 * - No further values are delivered after cancellation or completion.
 * - Errors from the source are forwarded to the consumer as a rejection of `.next()`.
 *
 * Internally, an async push/pull queue is implemented:
 * - Values from the Observable are queued if the consumer hasn't called `.next()` yet.
 * - If the consumer is awaiting `.next()`, values are delivered immediately.
 * - Once the Observable completes or errors, all pending `.next()` promises are resolved or rejected.
 *
 * This approach ensures safe, deterministic, and memory-leak-free conversion of
 * RxJS Observables into AsyncIterableIterator, suitable for use in `for await...of`,
 * GraphQL Subscriptions, and other async stream consumers.
 */
export function observableToAsyncIterable<T>(source$: Observable<T>): AsyncIterableIterator<T> {
  const queue = new AsyncQueue<T>()
  // Subscribes eagerly: values emitted before the first `.next()` call are buffered in the queue.
  const subscription: Subscription = source$.subscribe({
    next: (value) => queue.enqueue(value),
    error: (err) => queue.fail(err),
    complete: () => queue.finish()
  })
  const iterator: AsyncIterableIterator<T> = {
    [Symbol.asyncIterator]: () => iterator,
    next: queue.next.bind(queue),
    return: (value?: any) => {
      // Tear down the source first, then settle any pending `.next()` calls.
      subscription.unsubscribe()
      return queue.return(value)
    }
  }
  return iterator
}
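The doc comment above treats `break` inside `for await` as a form of cancellation. The following sketch illustrates that language behavior on its own, without RxJS. `CountingIterator` is a hypothetical class (not part of this gist) that only records how often `return()` is invoked.

```typescript
// Hypothetical endless async iterator that counts calls to `return()`.
class CountingIterator implements AsyncIterableIterator<number> {
  returnCalls = 0
  private n = 0

  next(): Promise<IteratorResult<number>> {
    return Promise.resolve({ value: this.n++, done: false })
  }

  return(): Promise<IteratorResult<number>> {
    this.returnCalls++
    return Promise.resolve({ value: undefined, done: true })
  }

  [Symbol.asyncIterator](): this {
    return this
  }
}

async function takeThree(): Promise<{ seen: number[]; returnCalls: number }> {
  const source = new CountingIterator()
  const seen: number[] = []
  for await (const value of source) {
    seen.push(value)
    // Leaving the loop early makes the runtime call `source.return()`.
    if (seen.length === 3) break
  }
  return { seen, returnCalls: source.returnCalls }
}
```

This is why an iterator without a `return()` method gives the producer no signal at all when the consumer leaves the loop early.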

dilame commented Nov 19, 2024

Initially, I used rxjs-for-await to convert RxJS observables into async iterables.
However, it does not implement the AsyncIterator.return method, which can lead to
memory leaks under certain conditions:

  1. Resources tied to a subscription are not cleaned up immediately when the client unsubscribes.
    Instead, they are only cleaned up after the next event from the source is received.
  2. In cases where there are multiple sources that emit events infrequently,
    this delay in cleanup can result in significant resource consumption and memory leaks.

Another undesirable side effect is that the client may still receive one more event
even after it has unsubscribed.

To address these issues:

  • I implemented return() in this custom converter, ensuring that resources
    are cleaned up as soon as the client unsubscribes, regardless of source activity.
  • The implementation ensures that no further events are delivered to the client
    once it unsubscribes, preventing unexpected post-unsubscription events.
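The delayed cleanup and the extra event described above can be reproduced without RxJS. The sketch below is only an illustration of the general hazard and is not the code of rxjs-for-await: `Emitter` is a hypothetical event source, and `naive` is a hypothetical generator-based converter that unsubscribes in a `finally` block. While the generator is waiting for the next event, a `return()` request cannot take effect.

```typescript
// Hypothetical minimal event source that exposes its listener count.
class Emitter<T> {
  private listeners = new Set<(value: T) => void>()
  get size(): number {
    return this.listeners.size
  }
  emit(value: T): void {
    this.listeners.forEach((listener) => listener(value))
  }
  subscribe(listener: (value: T) => void): () => void {
    this.listeners.add(listener)
    return () => {
      this.listeners.delete(listener)
    }
  }
}

// Hypothetical naive converter: cleanup lives in `finally`, which can only
// run once the generator gets past the `await` it is parked on.
async function* naive<T>(emitter: Emitter<T>): AsyncGenerator<T, void, undefined> {
  let wake: ((value: T) => void) | undefined
  const off = emitter.subscribe((value) => wake?.(value))
  try {
    while (true) {
      yield await new Promise<T>((resolve) => {
        wake = resolve
      })
    }
  } finally {
    off()
  }
}

async function demoDelayedCleanup() {
  const emitter = new Emitter<number>()
  const it = naive(emitter)
  const first = it.next() // consumer waits for a value
  const stopped = it.return(undefined) // consumer asks to stop
  await Promise.resolve()
  const listenersWhileWaiting = emitter.size // still subscribed
  emitter.emit(42) // the source finally emits
  const late = await first // the consumer still receives this event
  await stopped
  return { listenersWhileWaiting, lateValue: late.value, listenersAfter: emitter.size }
}
```

In this sketch the listener is removed only after the source emits again, which matches the two symptoms listed above. A queue-based iterator with an explicit `return()` avoids both, because cancellation does not have to wait for the producer.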
