Created
August 15, 2023 21:16
-
-
Save schickling/4624bc16c68b586b9bf0d1d0e05ac931 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as Context from '@effect/data/Context' | |
import * as Duration from '@effect/data/Duration' | |
import type * as Either from '@effect/data/Either' | |
import { pipe } from '@effect/data/Function' | |
import * as Deferred from '@effect/io/Deferred' | |
import * as FiberRefs from '@effect/io/FiberRefs' | |
import * as Queue from '@effect/io/Queue' | |
import * as Effect from './Effect.js' | |
import * as Otel from './Otel/index.js' | |
/** Configuration accepted by `makeDataLoader`. */
export type DataLoaderOptions = {
  /**
   * Upper bound on the number of keys passed to a single `batchCallback` invocation.
   * `makeDataLoader` falls back to `Number.POSITIVE_INFINITY` (no limit) when omitted.
   */
  maxBatchSize?: number
  /**
   * How long (in milliseconds) the dataloader keeps waiting for further keys, measured from the
   * moment the first key of a batch arrived, before it calls `batchCallback`.
   *
   * @default 10
   *
   * TODO replace with a more sophisticated scheduler (e.g. https://github.com/graphql/dataloader#batch-scheduling)
   */
  batchTimeoutMs?: number
  /** Value reported as the `batchKey` attribute on the batch span and on the batch-size histogram. */
  otelBatchKey: string
}
/**
 * Builds a dataloader: individual `load(key)` calls are collected into batches and resolved
 * through a single `batchCallback` invocation per batch.
 *
 * How it works:
 * - Every `load` call enqueues its key together with a fresh `Deferred`, the caller's context
 *   and the caller's fiber refs, then awaits the `Deferred`.
 * - A background fiber (forked with `Effect.forkScoped`) loops forever: it collects a batch from
 *   the queue, runs `batchCallback` once for the batch, and completes each item's `Deferred` with
 *   the result found at the same index.
 * - A batch is closed once `maxBatchSize` items were collected or `batchTimeoutMs` have passed
 *   since the first item(s) of the batch were taken from the queue.
 *
 * NOTE `batchCallback` runs with the context and fiber refs captured by the FIRST item of the
 * batch; the ones captured by the other items of the same batch are not used.
 *
 * @param batchCallback Resolves a list of keys. It is expected to return exactly one `Either`
 *   per key, in key order. Keys without a matching result make the corresponding `load` die
 *   with a descriptive error (instead of waiting forever); surplus results are ignored.
 * @param options See `DataLoaderOptions`.
 * @returns An effect yielding `{ load }`, where `load` accepts either a single key or an array
 *   of keys.
 */
export const makeDataLoader = <R, Err, TKey, TOut>(
  batchCallback: (keys: ReadonlyArray<TKey>) => Effect.Effect<R, never, ReadonlyArray<Either.Either<Err, TOut>>>,
  { batchTimeoutMs = 10, maxBatchSize = Number.POSITIVE_INFINITY, otelBatchKey }: DataLoaderOptions,
) =>
  Effect.gen(function* ($) {
    // Everything the batch worker needs in order to resolve a single `load` call.
    type QueueItem = [
      key: TKey,
      deferred: Deferred.Deferred<Err, TOut>,
      env: Context.Context<R>,
      refs: FiberRefs.FiberRefs,
    ]

    const queue = yield* $(Queue.unbounded<QueueItem>())

    // Enqueues one key and waits until the batch worker has completed its `Deferred`.
    const loadSingle = (key: TKey): Effect.Effect<R, Err, TOut> =>
      pipe(
        Effect.Do,
        Effect.bind('deferred', () => Deferred.make<Err, TOut>()),
        Effect.bind('env', () => Effect.context<R>()),
        Effect.bind('refs', () => Effect.getFiberRefs),
        Effect.tap(({ deferred, env, refs }) => Queue.offer(queue, [key, deferred, env, refs])),
        Effect.flatMap(({ deferred }) => Deferred.await(deferred)),
      )

    function load(key: TKey): Effect.Effect<R, Err, TOut>
    function load(key: ReadonlyArray<TKey>): Effect.Effect<R, Err, ReadonlyArray<TOut>>
    // eslint-disable-next-line prefer-arrow/prefer-arrow-functions
    function load(key: TKey | ReadonlyArray<TKey>): Effect.Effect<R, Err, TOut | ReadonlyArray<TOut>> {
      // NOTE(review): a `TKey` that is itself an array is indistinguishable from a list of keys here.
      return Array.isArray(key)
        ? Effect.forEach(key, loadSingle, { concurrency: 'unbounded' })
        : loadSingle(key as TKey)
    }

    // Counts completed batch runs; only reported as a span attribute.
    let batchRunNumber = 0

    yield* $(
      // Step 1: collect the next batch from the queue.
      Effect.gen(function* ($) {
        const batch: QueueItem[] = []
        let timeOfFirstItemMs: number | undefined

        while (true) {
          if (batch.length === 0) {
            // Empty batch: take at least one item (no timeout) and start the batch clock.
            const itemChunk = yield* $(Queue.takeBetween(queue, 1, maxBatchSize - batch.length))
            batch.push(...itemChunk)
            timeOfFirstItemMs = Date.now()
          } else {
            // Batch already started: keep taking items, but only for the time that is left.
            const remainingTime = batchTimeoutMs - (Date.now() - timeOfFirstItemMs!)

            const itemChunkOption = yield* $(
              Queue.takeBetween(queue, 1, maxBatchSize - batch.length),
              Effect.timeout(Duration.millis(remainingTime)),
            )

            // Timed out without new items -> the batch is complete.
            if (itemChunkOption._tag === 'None') {
              break
            }

            batch.push(...itemChunkOption.value)

            // NOTE this can happen if the timeout above didn't trigger yet and still too much time has passed
            const newRemainingTime = batchTimeoutMs - (Date.now() - timeOfFirstItemMs!)
            if (newRemainingTime <= 0) {
              break
            }
          }

          if (batch.length === maxBatchSize) {
            break
          }
        }

        return batch
      }),
      // Step 2: resolve the batch with a single `batchCallback` call.
      Effect.flatMap((queueItemsBatch) => {
        const keys = queueItemsBatch.map(([key]) => key)
        // The batch always holds at least one item; the first item's context and fiber refs
        // are the ones used for the whole batch.
        const env = queueItemsBatch[0]![2]
        const refs = queueItemsBatch[0]![3]

        // Remember the worker's own fiber refs and restore them once the batch has run.
        return Effect.acquireUseRelease(
          Effect.getFiberRefs,
          () =>
            Effect.contextWithEffect((parentContext: Context.Context<never>) =>
              Effect.flatMap(FiberRefs.setAll(refs), () =>
                pipe(
                  batchCallback([...keys]),
                  Effect.tap((results) =>
                    // Iterate over the batch items (not over `results`) so that every Deferred
                    // gets completed even if `batchCallback` returns too few results, and so
                    // that surplus results can never index past the end of the batch.
                    Effect.forEach(queueItemsBatch, (item, index) => {
                      const deferred = item[1]
                      const result = results[index]

                      if (result === undefined) {
                        return Deferred.die(
                          deferred,
                          new Error(
                            `DataLoader(${otelBatchKey}): batchCallback returned ${results.length} results for ${keys.length} keys`,
                          ),
                        )
                      }

                      return result._tag === 'Left'
                        ? Deferred.fail(deferred, result.left)
                        : Deferred.succeed(deferred, result.right)
                    }),
                  ),
                  Otel.withSpan('DataLoader:batchedLoad', {
                    attributes: {
                      batchKey: otelBatchKey,
                      maxBatchSize,
                      batchTimeoutMs,
                      batchSize: queueItemsBatch.length,
                      batchRunNumber,
                    },
                  }),
                  Otel.histogram(`dataloader_batch_size`, queueItemsBatch.length, { batchKey: otelBatchKey }),
                  Effect.provideSomeContext(Context.merge(parentContext, env)),
                ),
              ),
            ),
          (oldRefs) => Effect.setFiberRefs(oldRefs),
        )
      }),
      Effect.tap(() => Effect.sync(() => batchRunNumber++)),
      Effect.forever,
      // NOTE(review): a defect raised by `batchCallback` is logged here, but it presumably also
      // ends the `forever` loop and leaves the Deferreds of that batch incomplete — confirm.
      Effect.tapCauseLogPretty,
      Effect.interruptible,
      Effect.forkScoped,
    )

    return { load }
  })
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment