Skip to content

Instantly share code, notes, and snippets.

@lucas-barake
Last active June 16, 2025 20:06
Show Gist options
Save lucas-barake/d80e87c3532d6a3cafe48a65c9441682 to your computer and use it in GitHub Desktop.
How I Handle WebSockets in Web Apps
import { Effect, type Schema } from "effect";
/**
 * Describes how events for a single channel are decoded and processed.
 *
 * @typeParam A - Decoded payload type passed to `handle`.
 * @typeParam I - Encoded representation accepted by `schema`.
 * @typeParam X - Error type the `handle` effect may fail with.
 */
export type EventHandler<A, I, X> = Readonly<{
  /** Channel identifier; must start and end with a slash, e.g. `/test/`. */
  channel: `/${string}/`;
  /** Schema describing the payload (`I` encoded, `A` decoded). */
  schema: Schema.Schema<A, I>;
  /** Effect to run for a decoded payload; requires no environment. */
  handle: (data: A) => Effect.Effect<void, X, never>;
}>;
/**
 * An `EventHandler` whose `handle` effect cannot fail: the error channel is
 * fixed to `never`, all other members are unchanged.
 */
export type InfallibleEventHandler<A, I> = EventHandler<A, I, never>;
/**
 * Wraps a handler so that its `handle` effect can no longer fail.
 *
 * Any cause raised by the wrapped `handle` (typed failure or defect) is
 * recovered from, so one misbehaving handler cannot fail its caller. The cause
 * is logged at error level rather than being discarded silently, which keeps
 * handler bugs visible.
 *
 * @param handler - Handler whose `handle` effect may fail with `X`.
 * @returns A handler with the same `channel` and `schema` whose `handle`
 *   never fails.
 */
export const createEventHandler = <A, I, X>(
  handler: EventHandler<A, I, X>
): InfallibleEventHandler<A, I> => ({
  channel: handler.channel,
  schema: handler.schema,
  handle: (data) =>
    handler.handle(data).pipe(
      // Still infallible for callers, but the reason for the failure is recorded.
      Effect.catchAllCause((cause) =>
        Effect.logError(`event handler for ${handler.channel} failed`, cause)
      )
    ),
});
import { createEventHandler } from "./event-handler.ts"
import { Effect, Schema } from "effect"
/** Payload shape for the `/test/` channel: an object with a string `test` field. */
const testPayload = Schema.Struct({
  test: Schema.String,
})

/** Sample handler for the `/test/` channel that logs each payload it is given. */
const testHandler = createEventHandler({
  channel: "/test/",
  schema: testPayload,
  handle: (payload) => Effect.log(payload),
})
/** Every registered handler, kept as a readonly tuple so element types stay precise. */
export const eventHandlers = [testHandler] as const

/** Union of the element types of `eventHandlers`. */
export type LiveEventHandler = typeof eventHandlers[number]
import { Chunk, Effect, Stream, SubscriptionRef } from "effect"
/**
 * Tracks the browser's online/offline state.
 *
 * Provides:
 * - `latch`: opened on every `online` event and closed on every `offline`
 *   event; its initial state mirrors `navigator.onLine`.
 * - `ref`: a `SubscriptionRef<boolean>` holding the most recent state.
 *
 * The service is scoped: the fiber that consumes the window events is forked
 * into the service's scope and the DOM listeners are removed when that fiber
 * is interrupted, so nothing outlives the layer that built the service.
 */
export class NetworkMonitor extends Effect.Service<NetworkMonitor>()(
  "NetworkMonitor",
  {
    scoped: Effect.gen(function* () {
      // Read once so the latch and the ref start from the same value.
      const initiallyOnline = window.navigator.onLine
      const latch = yield* Effect.makeLatch(initiallyOnline)
      const ref = yield* SubscriptionRef.make<boolean>(initiallyOnline)

      yield* Stream.async<boolean>((emit) => {
        // Named listeners so the exact same references can be removed later.
        const onOnline = () => {
          void emit(Effect.succeed(Chunk.of(true)))
        }
        const onOffline = () => {
          void emit(Effect.succeed(Chunk.of(false)))
        }
        window.addEventListener("online", onOnline)
        window.addEventListener("offline", onOffline)
        // Cleanup effect: detach the listeners when the stream is torn down.
        return Effect.sync(() => {
          window.removeEventListener("online", onOnline)
          window.removeEventListener("offline", onOffline)
        })
      }).pipe(
        Stream.tap((isOnline) =>
          (isOnline ? latch.open : latch.close).pipe(
            Effect.zipRight(SubscriptionRef.set(ref, isOnline))
          )
        ),
        Stream.runDrain,
        // Tied to the service scope instead of running as a daemon forever.
        Effect.forkScoped
      )

      return { latch, ref }
    }),
    accessors: true,
  }
) {}
import {
Cause,
Effect,
HashMap,
Option,
PubSub,
Schedule,
Schema,
Stream,
} from "effect"
import { SubscriptionId } from "./types.ts"
import type { LiveEventHandler } from "./event-handlers.ts"
/**
 * Keeps track of channel subscriptions and routes incoming events to the
 * handler registered for their subscription ID.
 *
 * Exposes:
 * - `subscribe`: registers a handler under a fresh ID, sends a `subscribe`
 *   message and waits up to 5 seconds for that ID to be confirmed; on failure
 *   it retries (with a new ID) on a jittered 400 ms schedule.
 * - `confirmSubscription`: publishes a confirmed ID to waiting `subscribe` calls.
 * - `handleEvent`: decodes an event with the handler's schema and runs the
 *   handler; unknown IDs and undecodable payloads are ignored.
 */
export class SubscriptionManager extends Effect.Service<SubscriptionManager>()(
  "SubscriptionManager",
  {
    effect: Effect.gen(function* () {
      // `HashMap` is immutable: `set`/`remove` return a NEW map. The current
      // map therefore lives in a rebindable local that is reassigned on every
      // change; discarding the result would leave the registry empty forever.
      let subscriptions = HashMap.empty<SubscriptionId, LiveEventHandler>()
      const successPubSub = yield* PubSub.unbounded<SubscriptionId>()

      /**
       * Subscribes `handler` to its channel using `send` to talk to the server.
       * Requires a scope: once confirmed, a finalizer that sends `unsubscribe`
       * is registered.
       */
      const subscribe = ({
        handler,
        send,
      }: {
        handler: LiveEventHandler
        send: (
          message: Record<string, unknown>
        ) => Effect.Effect<void, never, never>
      }) =>
        Effect.gen(function* () {
          const id = SubscriptionId.make(crypto.randomUUID())
          // Register before sending so events for this ID can be routed.
          subscriptions = HashMap.set(subscriptions, id, handler)
          yield* send({
            id,
            type: "subscribe",
            channel: handler.channel,
          })
          yield* Effect.logInfo(
            `sent subscription request for ${handler.channel}`
          )
          // NOTE(review): the confirmation stream is only started after `send`;
          // presumably the network round trip makes a missed confirmation
          // unlikely — confirm.
          yield* Stream.fromPubSub(successPubSub).pipe(
            Stream.filter((successId) => successId === id),
            Stream.take(1),
            Stream.timeoutFail(() => new Cause.TimeoutException(), "5 seconds"),
            Stream.tapError(() =>
              Effect.logInfo(
                `failed to subscribe to ${handler.channel} with ID ${id}`
              ).pipe(
                // The retry below uses a fresh ID, so drop the stale entry
                // instead of letting abandoned IDs accumulate.
                Effect.zipRight(
                  Effect.sync(() => {
                    subscriptions = HashMap.remove(subscriptions, id)
                  })
                )
              )
            ),
            Stream.onEnd(
              Effect.addFinalizer(() => send({ type: "unsubscribe", id }))
            ),
            Stream.runDrain
          )
        }).pipe(
          Effect.retry({
            schedule: Schedule.jittered(Schedule.spaced("400 millis")),
          })
        )

      /** Signals that the subscription with `id` was confirmed. */
      const confirmSubscription = (id: SubscriptionId) =>
        PubSub.publish(successPubSub, id)

      /** Decodes `event` for subscription `id` and runs the matching handler. */
      const handleEvent = (id: SubscriptionId, event: unknown) =>
        Effect.gen(function* () {
          // Reads the current binding, so handlers registered later are seen.
          const sub = HashMap.get(subscriptions, id)
          if (Option.isNone(sub)) return
          const payload = yield* Effect.option(
            Schema.decodeUnknown(sub.value.schema)(event)
          )
          if (Option.isNone(payload)) return
          yield* sub.value.handle(payload.value)
        })

      return {
        subscribe,
        confirmSubscription,
        handleEvent,
      }
    }),
  }
) {}
import {
Cause,
Chunk,
Duration,
Effect,
Fiber,
ManagedRuntime,
Option,
pipe,
Schedule,
Schema,
Stream,
} from "effect"
import {
AppSyncMessage,
ConnectionAck,
Event,
KeepAlive,
SubscribeSuccess,
} from "./types.ts"
import { SubscriptionManager } from "./subscription-manager.ts"
import { eventHandlers } from "./event-handlers.ts"
import React from "react"
import { NetworkMonitor } from "./network-monitor.ts"
/**
 * Service that owns the WebSocket connection and wires incoming messages to
 * the `SubscriptionManager`.
 *
 * It exposes a single effect, `setup`, which opens the socket, performs the
 * `connection_init` / `connection_ack` handshake, subscribes every entry of
 * `eventHandlers`, watches keep-alive messages, and dispatches subscription
 * confirmations and data events. On any failure the whole connection is
 * rebuilt according to the retry schedule at the bottom of `setup`.
 */
export class Subscriptions extends Effect.Service<Subscriptions>()(
"Subscriptions",
{
accessors: true,
dependencies: [SubscriptionManager.Default, NetworkMonitor.Default],
effect: Effect.gen(function* () {
const subscriptionManager = yield* SubscriptionManager
const networkMonitor = yield* NetworkMonitor
const setup = Effect.gen(function* () {
// NOTE(review): "ws://" looks like a placeholder URL — replace with the real endpoint.
const ws = new WebSocket("ws://")
// Close the socket when the enclosing scope is closed (see `Effect.scoped` below).
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
ws.close()
})
)
// Start the handshake as soon as the socket opens.
ws.onopen = () => {
ws.send(
JSON.stringify({
type: "connection_init",
})
)
}
// Stream of decoded socket messages. A message that fails to decode is
// emitted as `null`; socket `error` and `close` events fail the stream with
// a `Cause.TimeoutException`.
const source = yield* Stream.async<
AppSyncMessage | null,
Cause.TimeoutException
>((emit) => {
ws.addEventListener("message", (e) => {
void emit(
pipe(
e.data,
Schema.decodeUnknown(AppSyncMessage),
Effect.map((data) => Chunk.make(data)),
Effect.catchTag("ParseError", () =>
Effect.succeed(Chunk.make(null))
)
)
)
})
ws.addEventListener("error", () => {
void emit(Effect.fail(Option.some(new Cause.TimeoutException())))
})
ws.addEventListener("close", () => {
void emit(Effect.fail(Option.some(new Cause.TimeoutException())))
})
}).pipe(
// Shared so the several consumers below all read from this one stream.
Stream.share({
capacity: "unbounded",
})
)
// Runs once the connection is acknowledged: subscribes every handler and
// monitors keep-alive messages using the timeout announced by the server.
const subscribeToChannels = (connectionTimeout: Duration.Duration) =>
Effect.gen(function* () {
yield* Effect.logInfo(
`connection_ack received with ack timeout: ${Duration.toSeconds(connectionTimeout)}s`
)
// Note: this local shadows the enclosing `subscribeToChannels` function.
const subscribeToChannels = Effect.forEach(
eventHandlers,
(handler) =>
subscriptionManager.subscribe({
handler,
send: (payload) =>
Effect.sync(() => ws.send(JSON.stringify(payload))),
}),
{
concurrency: "unbounded",
}
)
// Fails with `TimeoutException("KeepAlive")` when no keep-alive message
// arrives within `connectionTimeout`.
const keepAliveStream = source.pipe(
Stream.filter(Schema.is(KeepAlive)),
Stream.timeoutFail(
() => new Cause.TimeoutException("KeepAlive"),
connectionTimeout
),
Stream.tapError(() =>
Effect.logInfo(
"keepAlive timeout exceeded - recreating websocket connection"
)
),
Stream.runDrain
)
// NOTE(review): no `concurrency` option is passed here, so these two
// effects presumably run one after the other (keep-alive monitoring would
// only start after all subscriptions are confirmed) — confirm this is intended.
yield* Effect.all([subscribeToChannels, keepAliveStream], {
mode: "validate",
})
})
// Waits up to 5 seconds for the first `connection_ack`, then runs
// `subscribeToChannels` with the timeout it carries.
const connectionAckStream = source.pipe(
Stream.filter(Schema.is(ConnectionAck)),
Stream.take(1),
Stream.timeoutFail(
() => new Cause.TimeoutException("connection_ack"),
"5 seconds"
),
Stream.tap((msg) => subscribeToChannels(msg.connectionTimeout))
)
// Forwards subscription confirmations to the manager.
const subscriptionSuccessStream = source.pipe(
Stream.filter(Schema.is(SubscribeSuccess)),
Stream.tap((event) =>
subscriptionManager.confirmSubscription(event.id)
)
)
// Forwards data events to the manager, which decodes and handles them.
const eventStream = source.pipe(
Stream.filter(Schema.is(Event)),
Stream.tap((event) =>
subscriptionManager.handleEvent(event.id, event.event)
)
)
// Run all three consumers together until one of them fails.
yield* Stream.merge(eventStream, connectionAckStream).pipe(
Stream.merge(subscriptionSuccessStream),
Stream.runDrain
)
}).pipe(
// Gate the connection attempt on the NetworkMonitor latch.
networkMonitor.latch.whenOpen,
// Closing this scope runs the finalizers registered above (e.g. `ws.close()`).
Effect.scoped,
// Replace whatever cause occurred with a `TimeoutException` failure —
// presumably so that defects are retried as well; confirm.
Effect.catchAllCause(() => new Cause.TimeoutException()),
// Reconnect with jittered exponential backoff: starts at 300 ms, grows by a
// factor of 1.25, and is capped at 3 seconds.
Effect.retry({
schedule: Schedule.jittered(
Schedule.exponential("300 millis", 1.25).pipe(
Schedule.map((duration) => Duration.min(duration, "3 seconds"))
)
),
}),
Effect.onInterrupt(() =>
Effect.logInfo("websocket connection interrupted")
)
)
return { setup }
}),
}
) {}
// Example Usage
// The runtime is created once at module level so it is not rebuilt on re-render.
const MyRuntime = ManagedRuntime.make(Subscriptions.Default)
// NOTE: in a real app this hook call must live inside a React component or a
// custom hook; it is shown bare here for brevity.
React.useEffect(() => {
  const fiber = MyRuntime.runFork(Subscriptions.setup)
  return () => {
    // Interrupt the connection fiber when the effect is cleaned up.
    MyRuntime.runCallback(Fiber.interrupt(fiber))
  }
  // Empty dependency list: connect once on mount and interrupt on unmount.
  // Without it the effect re-runs after every render, tearing down and
  // re-creating the WebSocket connection each time.
}, [])
import { Schema } from "effect"
/**
 * Message with `type: "connection_ack"`.
 *
 * The encoded field `connectionTimeoutMs` (a number of milliseconds) is
 * decoded to a `Duration` and exposed under the name `connectionTimeout`.
 */
export const ConnectionAck = Schema.rename(
  Schema.Struct({
    type: Schema.Literal("connection_ack"),
    connectionTimeoutMs: Schema.DurationFromMillis,
  }),
  { connectionTimeoutMs: "connectionTimeout" }
)
/** Keep-alive message: an object whose only field is `type: "ka"`. */
export const KeepAlive = Schema.Struct({ type: Schema.Literal("ka") })
/** String schema branded as `"SubscriptionId"`, used to identify one subscription. */
export const SubscriptionId = Schema.brand("SubscriptionId")(Schema.String)

/** Decoded (branded string) type of the `SubscriptionId` schema. */
export type SubscriptionId = typeof SubscriptionId.Type
/**
 * Data message (`type: "data"`) for the subscription identified by `id`.
 * `event` is left as `Schema.Unknown` at this level.
 */
export const Event = Schema.Struct({
  type: Schema.Literal("data"),
  id: SubscriptionId,
  event: Schema.Unknown,
})
/** Message with `type: "subscribe_success"` carrying the confirmed subscription `id`. */
export const SubscribeSuccess = Schema.Struct({
  type: Schema.Literal("subscribe_success"),
  id: SubscriptionId,
})
/**
 * Schema for any supported incoming message: parses a JSON string and then
 * validates it against the union of the four message schemas.
 */
export const AppSyncMessage = Schema.parseJson(
  Schema.Union(
    ConnectionAck,
    KeepAlive,
    Event,
    SubscribeSuccess
  )
)

/** Decoded type of the `AppSyncMessage` schema. */
export type AppSyncMessage = typeof AppSyncMessage.Type
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment