Created
January 13, 2015 15:35
-
-
Save calvinmetcalf/161db5bd9a8679886caa to your computer and use it in GitHub Desktop.
whatwg readable streams translated to es5
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Promise = global.Promise || require('lie'); | |
function noop(){} | |
function typeIsObject(x) { | |
return (typeof x === 'object' && x !== null) || typeof x === 'function'; | |
} | |
function ExclusiveStreamReader () { | |
throw new Error('not implimented'); | |
} | |
function ReadableStream(opts) { | |
var start = opts.start || noop; | |
this._onPull = opts.pull || noop; | |
this._onCancel = opts.cancel || noop; | |
this._strategy = opts.strategy || defaultReadableStreamStrategy; | |
var self = this; | |
this._readyPromise = new Promise(function (resolve) { | |
self._resolveReadyPromise = resolve; | |
}); | |
this.closed = new Promise(function (resolve, reject) { | |
self._resolveClosedPromise = resolve; | |
self._rejectClosedPromise = reject; | |
}); | |
this._queue = []; | |
this._state = 'waiting'; | |
this._started = false; | |
this._draining = false; | |
this._pulling = false; | |
this._reader = undefined; | |
this._enqueue = function (chunk) { | |
return self.__enqueue(chunk); | |
}; | |
this._close = function () { | |
return self.__close(); | |
}; | |
this._error = function (e) { | |
return self.__error(e); | |
}; | |
Promise.resolve(start(this._enqueue, this._close, this._error)).then( | |
function () { | |
self._started = true; | |
callReadableStreamPull(this); | |
}, | |
function (r) { | |
return self._error(r); | |
} | |
); | |
} | |
ReadableStream.prototype.__error = function (e) { | |
var stream = this; | |
if (stream._state === 'waiting') { | |
stream._resolveReadyPromise(); | |
} | |
if (stream._state === 'readable') { | |
stream._queue = []; | |
} | |
if (stream._state === 'waiting' || stream._state === 'readable') { | |
stream._state = 'errored'; | |
stream._storedError = e; | |
stream._rejectClosedPromise(e); | |
if (stream._reader !== undefined) { | |
stream._reader.releaseLock(); | |
} | |
} | |
}; | |
ReadableStream.prototype.__close = function () { | |
var stream = this; | |
if (stream._state === 'waiting') { | |
stream._resolveReadyPromise(); | |
return closeReadableStream(stream); | |
} | |
if (stream._state === 'readable') { | |
stream._draining = true; | |
} | |
}; | |
ReadableStream.prototype.__enqueue = function (chunk) { | |
var stream = this; | |
if (stream._state === 'errored') { | |
throw stream._storedError; | |
} | |
if (stream._state === 'closed') { | |
throw new TypeError('stream is closed'); | |
} | |
if (stream._draining === true) { | |
throw new TypeError('stream is draining'); | |
} | |
var chunkSize; | |
try { | |
chunkSize = stream._strategy.size(chunk); | |
} catch (chunkSizeE) { | |
stream._error(chunkSizeE); | |
throw chunkSizeE; | |
} | |
enqueueValueWithSize(stream._queue, chunk, chunkSize); | |
stream._pulling = false; | |
var shouldApplyBackpressure = shouldReadableStreamApplyBackpressure(stream); | |
if (stream._state === 'waiting') { | |
stream._state = 'readable'; | |
stream._resolveReadyPromise(); | |
} | |
if (shouldApplyBackpressure === true) { | |
return false; | |
} | |
return true; | |
}; | |
ReadableStream.prototype.cancel = function (reason) { | |
if (this._reader !== undefined) { | |
return Promise.reject( | |
new TypeError('This stream is locked to a single exclusive reader and cannot be cancelled directly')); | |
} | |
if (this._state === 'closed') { | |
return Promise.resolve(); | |
} | |
if (this._state === 'errored') { | |
return Promise.reject(this._storedError); | |
} | |
if (this._state === 'waiting') { | |
this._resolveReadyPromise(); | |
} | |
this._queue = []; | |
closeReadableStream(this); | |
var self = this; | |
var sourceCancelPromise = new Promise(function (resolve) { | |
resolve(self._onCancel(reason)); | |
}); | |
return sourceCancelPromise.then(noop); | |
}; | |
ReadableStream.prototype.getReader = function() { | |
if (this._state === 'closed') { | |
throw new TypeError('The stream has already been closed, so a reader cannot be acquired.'); | |
} | |
if (this._state === 'errored') { | |
throw this._storedError; | |
} | |
return new ExclusiveStreamReader(this); | |
}; | |
ReadableStream.prototype.pipeThrough = function(duplex, options) { | |
if (!typeIsObject(duplex.writable)) { | |
throw new TypeError('A transform stream must have an writable property that is an object.'); | |
} | |
if (!typeIsObject(duplex.readable)) { | |
throw new TypeError('A transform stream must have a readable property that is an object.'); | |
} | |
this.pipeTo(duplex.writable, options); | |
return duplex.readable; | |
}; | |
ReadableStream.prototype.read = function () { | |
if (this._reader !== undefined) { | |
throw new TypeError('This stream is locked to a single exclusive reader and cannot be read from directly'); | |
} | |
if (this._state === 'waiting') { | |
throw new TypeError('no chunks available (yet)'); | |
} | |
if (this._state === 'closed') { | |
throw new TypeError('stream has already been consumed'); | |
} | |
if (this._state === 'errored') { | |
throw this._storedError; | |
} | |
if (this._state !== 'readable') { | |
throw new Error('stream state ' + this._state + ' is invalid'); | |
} | |
if (this._queue.length <= 0) { | |
throw new Error('there must be chunks available to read'); | |
} | |
var chunk = dequeueValue(this._queue); | |
if (this._queue.length === 0) { | |
if (this._draining === true) { | |
closeReadableStream(this); | |
} else { | |
this._state = 'waiting'; | |
this._initReadyPromise(); | |
} | |
} | |
callReadableStreamPull(this); | |
return chunk; | |
}; | |
ReadableStream.prototype.pipeTo = function (dest, opts) { | |
opts = opts || {}; | |
var preventClose = opts.preventClose; | |
var preventAbort = opts.preventAbort; | |
var preventCancel = opts.preventCancel; | |
var source; | |
var resolvePipeToPromise; | |
var rejectPipeToPromise; | |
return new Promise(function (resolve, reject) { | |
resolvePipeToPromise = resolve; | |
rejectPipeToPromise = reject; | |
source = this.getReader(); | |
doPipe(); | |
}); | |
function doPipe() { | |
var ds = dest.state; | |
switch (ds) { | |
case 'writable': | |
if (source.state === 'readable') { | |
while (source.state === 'readable' && dest.state === 'writable') { | |
dest.write(source.read()); | |
} | |
Promise.resolve(doPipe); | |
} | |
if (source.state === 'waiting') { | |
Promise.race([source.ready, dest.closed]).then(doPipe, doPipe); | |
} else if (source.state === 'errored') { | |
source.closed.catch(abortDest); | |
} else if (source.state === 'closed') { | |
closeDest(); | |
} | |
return; | |
case 'waiting': | |
if (source.state === 'readable') { | |
Promise.race([source.closed, dest.ready]).then(doPipe, doPipe); | |
} else if (source.state === 'waiting') { | |
Promise.race([source.ready, dest.ready]).then(doPipe); | |
} else if (source.state === 'errored') { | |
source.closed.catch(abortDest); | |
} else if (source.state === 'closed') { | |
closeDest(); | |
} | |
return; | |
case 'errored': | |
if (source.state === 'readable' || source.state === 'waiting') { | |
dest.closed.catch(cancelSource); | |
} | |
return; | |
case 'closing': | |
case 'closed': | |
if (source.state === 'readable' || source.state === 'waiting') { | |
cancelSource(new TypeError('destination is closing or closed and cannot be piped to anymore')); | |
} | |
return; | |
} | |
} | |
function cancelSource(reason) { | |
if (!preventCancel) { | |
// implicitly releases the lock | |
source.cancel(reason); | |
} else { | |
source.releaseLock(); | |
} | |
rejectPipeToPromise(reason); | |
} | |
function closeDest() { | |
source.releaseLock(); | |
if (!preventClose) { | |
dest.close().then(resolvePipeToPromise, rejectPipeToPromise); | |
} else { | |
resolvePipeToPromise(); | |
} | |
} | |
function abortDest(reason) { | |
source.releaseLock(); | |
if (!preventAbort) { | |
dest.abort(reason); | |
} | |
rejectPipeToPromise(reason); | |
} | |
}; | |
Object.defineProperty(ReadableStream.prototype, 'ready', { | |
get: function () { | |
if (this._reader !== undefined) { | |
return this._reader._lockReleased; | |
} | |
return this._readyPromise; | |
}, | |
emumerable: true | |
}); | |
Object.defineProperty(ReadableStream.prototype, 'state', { | |
get: function () { | |
if (this._reader !== undefined) { | |
return 'waiting'; | |
} | |
return this._state; | |
}, | |
emumerable: true | |
}); | |
var defaultReadableStreamStrategy = { | |
shouldApplyBackpressure: function (queueSize) { | |
if (typeof queueSize !== 'number' || queueSize !== queueSize) { | |
throw new Error('invalid queue size'); | |
} | |
return queueSize > 1; | |
}, | |
size: function () { | |
return 1; | |
} | |
}; | |
function callReadableStreamPull(stream) { | |
if (stream._pulling === true || stream._draining === true || stream._started === false || | |
stream._state === 'closed' || stream._state === 'errored') { | |
return; | |
} | |
var shouldApplyBackpressure = shouldReadableStreamApplyBackpressure(stream); | |
if (shouldApplyBackpressure) { | |
return; | |
} | |
stream._pulling = true; | |
try { | |
stream._onPull( | |
stream._enqueue, | |
stream._close, | |
stream._error | |
); | |
} catch (pullResultE) { | |
stream._error(pullResultE); | |
throw pullResultE; | |
} | |
} | |
function shouldReadableStreamApplyBackpressure(stream) { | |
var queueSize = getTotalQueueSize(stream._queue); | |
try { | |
return stream._strategy.shouldApplyBackpressure(queueSize); | |
} catch (shouldApplyBackpressureE) { | |
stream._error(shouldApplyBackpressureE); | |
throw shouldApplyBackpressureE; | |
} | |
} | |
function getTotalQueueSize(queue) { | |
return queue.reduce(function (total, pair) { | |
if(!(typeof pair.size === 'number' && !Number.isNaN(pair.size) && | |
pair.size !== +Infinity && pair.size !== -Infinity)) { | |
throw new Error('Spec-level failure: should never find an invalid size in the queue.'); | |
} | |
return total + pair.size; | |
}, 0); | |
} | |
function dequeueValue(queue) { | |
if(queue.length <= 0) { | |
throw new Error('Spec-level failure: should never dequeue from an empty queue.'); | |
} | |
var pair = queue.shift(); | |
return pair.value; | |
} | |
function callReadableStreamPull(stream) { | |
if (stream._pulling === true || stream._draining === true || stream._started === false || | |
stream._state === 'closed' || stream._state === 'errored') { | |
return; | |
} | |
var shouldApplyBackpressure = shouldReadableStreamApplyBackpressure(stream); | |
if (shouldApplyBackpressure === true) { | |
return; | |
} | |
stream._pulling = true; | |
try { | |
stream._onPull( | |
stream._enqueue, | |
stream._close, | |
stream._error | |
); | |
} catch (pullResultE) { | |
stream._error(pullResultE); | |
throw pullResultE; | |
} | |
} | |
export function enqueueValueWithSize(queue, value, size) { | |
size = Number(size); | |
if (size !== size || size === +Infinity || size === -Infinity) { | |
throw new RangeError('Size must be a finite, non-NaN number.'); | |
} | |
queue.push({ value: value, size: size }); | |
} | |
function closeReadableStream(stream) { | |
stream._state = 'closed'; | |
stream._resolveClosedPromise(); | |
if (stream._reader !== undefined) { | |
stream._reader.releaseLock(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment