Created
December 11, 2017 08:23
-
-
Save matejc/268e8dd97998bf0791319af8da87eb02 to your computer and use it in GitHub Desktop.
local queue with concurrency
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const PQ = require('p-q'); | |
// so many parallel promises should run at the same time | |
const concurrency = 3; | |
// simulate work with delay function | |
function delay(times, ms = 1000) { | |
return new Promise((resolve) => { | |
setTimeout(resolve, times * ms); | |
}); | |
} | |
// somewhat official function for returning random integer in range | |
function getRandomInt(min, max) { | |
min = Math.ceil(min); | |
max = Math.floor(max); | |
return Math.floor(Math.random() * (max - min)) + min; //The maximum is exclusive and the minimum is inclusive | |
} | |
// initialize local queue - set pre-defined function, that returns a promise | |
const queue = new PQ(async(data) => { | |
await delay(getRandomInt(0, 5)); | |
console.log(`finished ${data.i}`); | |
}); | |
// the heart of the operation | |
// add one item to queue and block until one finishes, | |
// so there is no more than 3 running | |
async function add(data) { | |
console.log(`add ${data.i}`); | |
queue.add(data); | |
if (queue.length() >= concurrency) { | |
await new Promise(resolve => { | |
queue.once('processed', () => { | |
resolve(); | |
}); | |
}); | |
} | |
} | |
async function main() { | |
// let's simulate rabbitmq - should not accept more than 3 items at a time, | |
// so that other instances can fetch new events | |
for (let i = 0; i < 100; i++) { | |
if (i < 10) { | |
await add({i}); | |
} else if (i > 30 && i < 40) { | |
await add({i}); | |
} else { | |
await delay(1); | |
} | |
} | |
} | |
main() | |
.then(() => { | |
console.log('END'); | |
}) | |
.catch(err => { | |
throw err; | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment