This is some fun code I wrote to handle uploading any number of files of any size, using compression and streaming. (Note: modern Node doesn't require you to convert a stream to an iterator, since Readable streams are already async iterable.) A usage sketch is included at the end of the file.
// Assumed dependencies not shown in the original gist: JSZip (npm 'jszip') for streaming
// zip creation, plus project-specific helpers `getElectron` (a promise resolving to
// Electron/Node module accessors), `xhrResult`, and `getToken`.
import JSZip from 'jszip';

export const webFileToResumableDescriptor = (file, uploadId) => async function(chunkSize) {
  uploadId = uploadId || file.name;
  return {
    bytesCount: file.size,
    uploadId: uploadId,
    filename: file.name,
    generator: async function*() {
      let chunkIndex = 0;
      // While the starting byte is less than the file size, keep slicing chunks.
      while (chunkIndex * chunkSize < file.size) {
        yield file.slice(chunkIndex * chunkSize, Math.min((chunkIndex + 1) * chunkSize, file.size));
        ++chunkIndex;
      }
    },
  };
};
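// A minimal usage sketch for the browser path (assumptions: `file` comes from an
// <input type="file"> change event, and the 64 KiB chunk size is illustrative only).
export async function exampleLogWebFileChunks(file) {
  const descriptor = await webFileToResumableDescriptor(file)(64 * 1024);
  let index = 0;
  for await (const chunk of descriptor.generator()) {
    // Each chunk is a Blob slice of the original File.
    console.log(`chunk ${index++}: ${chunk.size} of ${descriptor.bytesCount} bytes`);
  }
}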
export const nodeFileToResumableDescriptor = (filename, uploadId) => async function(chunkSize) {
  const { nodeModule, path, fs } = await getElectron;
  uploadId = uploadId || path.basename(filename);
  const mzfs = nodeModule('mz/fs');
  const bytesCount = (await mzfs.stat(filename)).size;
  return {
    bytesCount,
    uploadId: uploadId,
    filename: path.basename(filename),
    generator: async function*() {
      let chunkIndex = 0;
      const stream = fs.createReadStream(filename, { highWaterMark: chunkSize });
      // Wait until the stream has buffered data before reading synchronously below.
      await new Promise(resolve => stream.on('readable', resolve));
      // While the starting byte is less than the file size, keep reading chunks.
      while (chunkIndex * chunkSize < bytesCount) {
        const chunk = stream.read(chunkSize);
        yield chunk;
        ++chunkIndex;
      }
      stream.close();
    },
  };
};
export const filesToResumableDescriptor = (files, uploadId) => async function(chunkSize) {
  const { nodeModule } = await getElectron;
  const fs = nodeModule('mz/fs');
  let bytesCount = 0;
  for (let i = 0; i < files.length; ++i) {
    bytesCount += (await fs.stat(files[i])).size;
  }
  const filename = Date.now() + '-scanAssets.zip';
  uploadId = uploadId || filename;
  return ({
    bytesCount,
    uploadId,
    filename,
    generator: async function*() {
      const zip = new JSZip();
      const streams = [];
      for (let i = 0; i !== files.length; ++i) {
        streams[i] = fs.createReadStream(files[i]);
        zip.file(files[i], streams[i]);
      }
      const stream = zip.generateInternalStream({ type: 'array', streamFiles: true }).on('error', e => console.log('JSZIP stream error: ', e));
      yield* streamToAsyncGenerator(chunkSize, stream, streams);
    },
  });
};
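// A minimal usage sketch for the multi-file path (assumptions: `paths` are file paths
// readable on the Electron/Node side, and the 1 MB chunk size mirrors chunkedUpload below).
export async function exampleLogZippedChunks(paths) {
  const descriptor = await filesToResumableDescriptor(paths)(1000000);
  for await (const chunk of descriptor.generator()) {
    // Each chunk is an array of bytes from JSZip's 'array' output type.
    console.log(`zipped chunk of ${chunk.length} bytes for ${descriptor.filename}`);
  }
}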
function streamToAsyncIterator(chunkSize, stream) {
  let done = false;
  let endPromise = new Promise(resolve => {
    // Flush out the last (partial) chunk of data when the stream ends.
    stream.on('end', () => {
      resolve({ value: collector, done: true });
    });
  });
  let collector = []; // accumulated bytes (JSZip's 'array' output)
  let currentFile;
  // Two-track queue for expecting and sending data with promises: ready chunks wait in
  // dataPromises; pending next() calls wait in dataResolves until data arrives.
  let dataPromises = [];
  let dataResolves = [];
  let dataRejects = [];
  stream.on('data', (data, meta) => {
    if (currentFile !== meta.currentFile) {
      // Handy for debugging purposes... could be removed eventually.
      currentFile = meta.currentFile;
    }
    collector = collector.concat(data);
    if (collector.length >= chunkSize) {
      const value = collector.slice(0, chunkSize);
      collector = collector.slice(chunkSize);
      const dataResolve = dataResolves.shift();
      if (dataResolve) {
        dataResolve({ value, done: false });
      } else {
        dataPromises.push(Promise.resolve({ value, done: false }));
      }
      // Backpressure: pause the stream until the consumer asks for more.
      stream.pause();
    }
  });
  stream.on('error', (...errArgs) => {
    console.error('Rejecting resolves. Error zipping: ', errArgs);
    dataRejects.forEach(reject => reject(errArgs));
  });
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    // TODO handle return() to close the stream
    next() {
      if (done) return Promise.resolve({ done });
      // Only once we're down to the last buffered promise should we resume the stream.
      if (dataPromises.length <= 1) {
        stream.resume();
      }
      let dataPromise = dataPromises.shift();
      // If there is no data available...
      if (!dataPromise) {
        // ...queue up a resolve for the next time data is available.
        dataPromise = new Promise((resolve, reject) => {
          dataResolves.push(resolve);
          dataRejects.push(reject);
        });
      }
      return Promise.race([
        dataPromise,
        endPromise,
      ]).then(next => {
        if (next.done) {
          // The stream has ended: remember that for the next call, but still report
          // done: false this time so the final partial chunk gets yielded.
          done = true;
          next.done = false;
        }
        return next;
      });
    },
  };
}
async function* streamToAsyncGenerator(chunkSize, stream, streams) {
  const iterator = streamToAsyncIterator(chunkSize, stream, streams);
  let next = await iterator.next();
  while (!next.done) {
    yield next.value;
    // Delete is needed to release resources.
    delete next.value;
    next = await iterator.next();
  }
}
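// Illustrating the note in the description: in modern Node a Readable stream is itself an
// async iterable, so no manual iterator conversion is needed there. A minimal sketch,
// assuming the same `getElectron` helper used above exposes Node's `fs` module.
export async function* exampleNodeStreamChunks(filename, chunkSize) {
  const { fs } = await getElectron;
  const stream = fs.createReadStream(filename, { highWaterMark: chunkSize });
  for await (const chunk of stream) {
    yield chunk; // each chunk is a Buffer of up to chunkSize bytes
  }
}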
export async function chunkedUpload(url, formData, resumableDescriptor, onProgress) {
  const chunkSize = 1000000; // approx 1MB
  let { bytesCount, filename, generator, uploadId } = await resumableDescriptor(chunkSize);
  // Default for uploadId is the url, if it wasn't provided.
  uploadId = uploadId || url;
  const chunksLength = Math.ceil(bytesCount / chunkSize);
  formData.set('chunkSize', chunkSize);
  formData.set('chunksLength', chunksLength);
  formData.set('uploadId', uploadId);
  formData.set('bytesCount', bytesCount);
  formData.set('filename', filename);
  const onChunkProgress = chunkIndex => ({ loaded }) => onProgress({ loaded: (chunkIndex - 1) * chunkSize + loaded, total: bytesCount });
  // Ask the server which chunks it already has... maybe we can shortcut!
  const uploadStatus = await uploadPost('/api/files/', formData);
  let result;
  let chunkIndex = 0;
  for await (const chunkData of generator()) {
    formData.set('chunkIndex', ++chunkIndex);
    // Skip chunks the server already has, but always send the final chunk.
    if (chunkIndex < chunksLength && chunkIndex < uploadStatus.res.completedChunks) {
      continue;
    }
    if (chunkData instanceof Blob) {
      formData.set('chunkData', chunkData);
    } else {
      formData.set('chunkData', new File([new Uint8Array(chunkData)], filename));
    }
    result = await uploadPost(url, formData, onChunkProgress(chunkIndex));
    // Retry: if it didn't succeed or critically fail, then try again.
    while (result.status && ![200, 201, 202, 404, 415, 500, 501].includes(result.status)) {
      result = await uploadPost(url, formData, onChunkProgress(chunkIndex));
    }
    // If the status is outside the 200-400 range, treat the upload as critically failed.
    if (result.status && (result.status < 200 || result.status > 400)) {
      return { err: { status: result.status, message: 'upload failed with a 400 error code probably' } };
    }
  }
  return result.res;
}
export const uploadPost = (url, body, onProgress) => xhrResult(new Promise(resolve => {
  let xhr = new XMLHttpRequest();
  xhr.open('post', url);
  xhr.withCredentials = true;
  xhr.setRequestHeader('Authorization', 'Token ' + getToken());
  xhr.setRequestHeader('Accept', 'application/json');
  function onComplete(e) {
    return resolve({
      ok: true,
      status: xhr.status,
      json: async function() {
        return JSON.parse(e.target.responseText);
      },
    });
  }
  // Both load and error resolve here; callers inspect the status code instead of catching.
  xhr.onload = xhr.onerror = onComplete;
  if (xhr.upload && onProgress) {
    xhr.upload.onprogress = onProgress;
  }
  xhr.send(body);
}));
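// A minimal end-to-end usage sketch, as mentioned in the description (assumptions: the
// '/api/files/chunk/' route is illustrative only, `file` is a browser File object, and
// any extra form fields your server expects are application-specific).
export async function exampleChunkedUploadOfWebFile(file) {
  const formData = new FormData();
  const onProgress = ({ loaded, total }) => console.log(`uploaded ${loaded} of ${total} bytes`);
  return chunkedUpload('/api/files/chunk/', formData, webFileToResumableDescriptor(file), onProgress);
}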