Last active
June 16, 2025 20:06
-
-
Save lucas-barake/d80e87c3532d6a3cafe48a65c9441682 to your computer and use it in GitHub Desktop.
How I Handle WebSockets in Web Apps
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Effect, type Schema } from "effect"; | |
export type EventHandler<A, I, X> = { | |
readonly channel: `/${string}/`; | |
readonly schema: Schema.Schema<A, I>; | |
readonly handle: (data: A) => Effect.Effect<void, X, never>; | |
}; | |
export type InfallibleEventHandler<A, I> = { | |
[K in keyof EventHandler<A, I, never>]: EventHandler<A, I, never>[K]; | |
}; | |
export const createEventHandler = <A, I, X>( | |
handler: EventHandler<A, I, X> | |
): InfallibleEventHandler<A, I> => ({ | |
channel: handler.channel, | |
schema: handler.schema, | |
handle: (data) => handler.handle(data).pipe(Effect.catchAllCause(() => Effect.void)), | |
}); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { createEventHandler } from "./event-handler.ts" | |
import { Effect, Schema } from "effect" | |
const testHandler = createEventHandler({ | |
channel: "/test/", | |
schema: Schema.Struct({ | |
test: Schema.String, | |
}), | |
handle: (data) => Effect.log(data), | |
}) | |
export const eventHandlers = [testHandler] as const | |
export type LiveEventHandler = (typeof eventHandlers)[number] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Chunk, Effect, Stream, SubscriptionRef } from "effect" | |
export class NetworkMonitor extends Effect.Service<NetworkMonitor>()( | |
"NetworkMonitor", | |
{ | |
effect: Effect.gen(function* () { | |
const latch = yield* Effect.makeLatch(window.navigator.onLine) | |
const ref = yield* SubscriptionRef.make<boolean>(window.navigator.onLine) | |
yield* Stream.async<boolean>((emit) => { | |
window.addEventListener("online", () => | |
emit(Effect.succeed(Chunk.of(true))) | |
) | |
window.addEventListener("offline", () => | |
emit(Effect.succeed(Chunk.of(false))) | |
) | |
}).pipe( | |
Stream.tap((isOnline) => | |
(isOnline ? latch.open : latch.close).pipe( | |
Effect.zipRight(SubscriptionRef.update(ref, () => isOnline)) | |
) | |
), | |
Stream.runDrain, | |
Effect.forkDaemon | |
) | |
return { latch, ref } | |
}), | |
accessors: true, | |
} | |
) {} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
Cause, | |
Effect, | |
HashMap, | |
Option, | |
PubSub, | |
Schedule, | |
Schema, | |
Stream, | |
} from "effect" | |
import { SubscriptionId } from "./types.ts" | |
import type { LiveEventHandler } from "./event-handlers.ts" | |
export class SubscriptionManager extends Effect.Service<SubscriptionManager>()( | |
"SubscriptionManager", | |
{ | |
effect: Effect.gen(function* () { | |
const subscriptions = HashMap.empty<SubscriptionId, LiveEventHandler>() | |
const successPubSub = yield* PubSub.unbounded<SubscriptionId>() | |
const subscribe = ({ | |
handler, | |
send, | |
}: { | |
handler: LiveEventHandler | |
send: ( | |
message: Record<string, unknown> | |
) => Effect.Effect<void, never, never> | |
}) => | |
Effect.gen(function* () { | |
const id = SubscriptionId.make(crypto.randomUUID()) | |
HashMap.set(subscriptions, id, handler) | |
yield* send({ | |
id, | |
type: "subscribe", | |
channel: handler.channel, | |
}) | |
yield* Effect.logInfo( | |
`sent subscription request for ${handler.channel}` | |
) | |
yield* Stream.fromPubSub(successPubSub).pipe( | |
Stream.filter((successId) => successId === id), | |
Stream.take(1), | |
Stream.timeoutFail(() => new Cause.TimeoutException(), "5 seconds"), | |
Stream.tapError(() => | |
Effect.logInfo( | |
`failed to subscribe to ${handler.channel} with ID ${id}` | |
) | |
), | |
Stream.onEnd( | |
Effect.addFinalizer(() => send({ type: "unsubscribe", id })) | |
), | |
Stream.runDrain | |
) | |
}).pipe( | |
Effect.retry({ | |
schedule: Schedule.jittered(Schedule.spaced("400 millis")), | |
}) | |
) | |
const confirmSubscription = (id: SubscriptionId) => | |
PubSub.publish(successPubSub, id) | |
const handleEvent = (id: SubscriptionId, event: unknown) => | |
Effect.gen(function* () { | |
const sub = HashMap.get(subscriptions, id) | |
if (Option.isNone(sub)) return | |
const payload = yield* Effect.option( | |
Schema.decodeUnknown(sub.value.schema)(event) | |
) | |
if (Option.isNone(payload)) return | |
yield* sub.value.handle(payload.value) | |
}) | |
return { | |
subscribe, | |
confirmSubscription, | |
handleEvent, | |
} | |
}), | |
} | |
) {} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
Cause, | |
Chunk, | |
Duration, | |
Effect, | |
Fiber, | |
ManagedRuntime, | |
Option, | |
pipe, | |
Schedule, | |
Schema, | |
Stream, | |
} from "effect" | |
import { | |
AppSyncMessage, | |
ConnectionAck, | |
Event, | |
KeepAlive, | |
SubscribeSuccess, | |
} from "./types.ts" | |
import { SubscriptionManager } from "./subscription-manager.ts" | |
import { eventHandlers } from "./event-handlers.ts" | |
import React from "react" | |
import { NetworkMonitor } from "./network-monitor.ts" | |
export class Subscriptions extends Effect.Service<Subscriptions>()( | |
"Subscriptions", | |
{ | |
accessors: true, | |
dependencies: [SubscriptionManager.Default, NetworkMonitor.Default], | |
effect: Effect.gen(function* () { | |
const subscriptionManager = yield* SubscriptionManager | |
const networkMonitor = yield* NetworkMonitor | |
const setup = Effect.gen(function* () { | |
const ws = new WebSocket("ws://") | |
yield* Effect.addFinalizer(() => | |
Effect.sync(() => { | |
ws.close() | |
}) | |
) | |
ws.onopen = () => { | |
ws.send( | |
JSON.stringify({ | |
type: "connection_init", | |
}) | |
) | |
} | |
const source = yield* Stream.async< | |
AppSyncMessage | null, | |
Cause.TimeoutException | |
>((emit) => { | |
ws.addEventListener("message", (e) => { | |
void emit( | |
pipe( | |
e.data, | |
Schema.decodeUnknown(AppSyncMessage), | |
Effect.map((data) => Chunk.make(data)), | |
Effect.catchTag("ParseError", () => | |
Effect.succeed(Chunk.make(null)) | |
) | |
) | |
) | |
}) | |
ws.addEventListener("error", () => { | |
void emit(Effect.fail(Option.some(new Cause.TimeoutException()))) | |
}) | |
ws.addEventListener("close", () => { | |
void emit(Effect.fail(Option.some(new Cause.TimeoutException()))) | |
}) | |
}).pipe( | |
Stream.share({ | |
capacity: "unbounded", | |
}) | |
) | |
const subscribeToChannels = (connectionTimeout: Duration.Duration) => | |
Effect.gen(function* () { | |
yield* Effect.logInfo( | |
`connection_ack received with ack timeout: ${Duration.toSeconds(connectionTimeout)}s` | |
) | |
const subscribeToChannels = Effect.forEach( | |
eventHandlers, | |
(handler) => | |
subscriptionManager.subscribe({ | |
handler, | |
send: (payload) => | |
Effect.sync(() => ws.send(JSON.stringify(payload))), | |
}), | |
{ | |
concurrency: "unbounded", | |
} | |
) | |
const keepAliveStream = source.pipe( | |
Stream.filter(Schema.is(KeepAlive)), | |
Stream.timeoutFail( | |
() => new Cause.TimeoutException("KeepAlive"), | |
connectionTimeout | |
), | |
Stream.tapError(() => | |
Effect.logInfo( | |
"keepAlive timeout exceeded - recreating websocket connection" | |
) | |
), | |
Stream.runDrain | |
) | |
yield* Effect.all([subscribeToChannels, keepAliveStream], { | |
mode: "validate", | |
}) | |
}) | |
const connectionAckStream = source.pipe( | |
Stream.filter(Schema.is(ConnectionAck)), | |
Stream.take(1), | |
Stream.timeoutFail( | |
() => new Cause.TimeoutException("connection_ack"), | |
"5 seconds" | |
), | |
Stream.tap((msg) => subscribeToChannels(msg.connectionTimeout)) | |
) | |
const subscriptionSuccessStream = source.pipe( | |
Stream.filter(Schema.is(SubscribeSuccess)), | |
Stream.tap((event) => | |
subscriptionManager.confirmSubscription(event.id) | |
) | |
) | |
const eventStream = source.pipe( | |
Stream.filter(Schema.is(Event)), | |
Stream.tap((event) => | |
subscriptionManager.handleEvent(event.id, event.event) | |
) | |
) | |
yield* Stream.merge(eventStream, connectionAckStream).pipe( | |
Stream.merge(subscriptionSuccessStream), | |
Stream.runDrain | |
) | |
}).pipe( | |
networkMonitor.latch.whenOpen, | |
Effect.scoped, | |
Effect.catchAllCause(() => new Cause.TimeoutException()), | |
Effect.retry({ | |
schedule: Schedule.jittered( | |
Schedule.exponential("300 millis", 1.25).pipe( | |
Schedule.map((duration) => Duration.min(duration, "3 seconds")) | |
) | |
), | |
}), | |
Effect.onInterrupt(() => | |
Effect.logInfo("websocket connection interrupted") | |
) | |
) | |
return { setup } | |
}), | |
} | |
) {} | |
// Example Usage | |
const MyRuntime = ManagedRuntime.make(Subscriptions.Default) | |
React.useEffect(() => { | |
const fiber = MyRuntime.runFork(Subscriptions.setup) | |
return () => { | |
MyRuntime.runCallback(Fiber.interrupt(fiber)) | |
} | |
}) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Schema } from "effect" | |
export const ConnectionAck = Schema.Struct({ | |
type: Schema.Literal("connection_ack"), | |
connectionTimeoutMs: Schema.DurationFromMillis, | |
}).pipe( | |
Schema.rename({ | |
connectionTimeoutMs: "connectionTimeout", | |
}) | |
) | |
export const KeepAlive = Schema.Struct({ | |
type: Schema.Literal("ka"), | |
}) | |
export const SubscriptionId = Schema.String.pipe(Schema.brand("SubscriptionId")) | |
export type SubscriptionId = typeof SubscriptionId.Type | |
export const Event = Schema.Struct({ | |
type: Schema.Literal("data"), | |
id: SubscriptionId, | |
event: Schema.Unknown, | |
}) | |
export const SubscribeSuccess = Schema.Struct({ | |
type: Schema.Literal("subscribe_success"), | |
id: SubscriptionId, | |
}) | |
export const AppSyncMessage = Schema.parseJson( | |
Schema.Union(ConnectionAck, KeepAlive, Event, SubscribeSuccess) | |
) | |
export type AppSyncMessage = typeof AppSyncMessage.Type |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment