Skip to content

Instantly share code, notes, and snippets.

@RWOverdijk
Created February 2, 2017 21:11
Show Gist options
  • Save RWOverdijk/1e1dfda6f4a00f598cd1d941197068ae to your computer and use it in GitHub Desktop.
Save RWOverdijk/1e1dfda6f4a00f598cd1d941197068ae to your computer and use it in GitHub Desktop.
let messageQueue = require('./messageQueue');
function saySomething() {
messageQueue.publish('sync', {hello: process.argv[2] || 'world', when: Date.now()});
setTimeout(saySomething, 1000);
}
saySomething();
const amqplib = require('amqplib');
class MessageQueue {
constructor() {
this.connection = amqplib.connect(process.env.CLOUDAMQP_URL || 'amqp://localhost');
this.channels = {};
}
channel(queue) {
if (!this.channels[queue]) {
this.channels[queue] = this.connection
.then(connection => connection.createChannel())
.tap(channel => channel.assertQueue(queue))
.tap(channel => channel.prefetch(1));
}
return this.channels[queue];
}
publish(queue, message) {
this.channel(queue).then(channel => {
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)));
});
}
consume(queue, callback) {
this.channel(queue).then(channel => {
channel.consume(queue, message => {
let decoded = message;
if (decoded !== null) {
decoded = JSON.parse(decoded.content.toString());
}
callback(decoded, () => {
channel.ack(message);
});
});
});
}
}
module.exports = new MessageQueue();
{
"name": "knijntje",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"amqplib": "^0.5.1"
}
}
let messageQueue = require('./messageQueue');
messageQueue.consume('sync', (message, done) => {
console.log(message);
setTimeout(() => {
console.log('ready for next message');
done();
}, 1500);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment