Skip to content

Instantly share code, notes, and snippets.

@IharKrasnik
Created January 17, 2017 09:19
Show Gist options
  • Save IharKrasnik/c9d493995663c624057701d6b0d139a1 to your computer and use it in GitHub Desktop.
Save IharKrasnik/c9d493995663c624057701d6b0d139a1 to your computer and use it in GitHub Desktop.
var events = require('events');
var util = require('util');
var _ = require('underscore');
var Q = require('q');
/**
 * Default bus options.
 *
 * maxQueueSize - queue length at which the bus emits 'max-queue-size-reached';
 *                a falsy value (the default) disables that notification.
 */
var DEFAULTS = { maxQueueSize: false };
/**
 * Event Bus with API similar to EventEmitter.
 *
 * Inherits from events.EventEmitter (see the util.inherits call below).
 *
 * @constructor
 */
function EventBus () {
  /**
   * Queue of event descriptor objects. Each event descriptor object has
   * the following structure: { eventName: eventName, event: event }
   *
   * New descriptors are enqueued to the end of array (using "push()") and
   * dequeued from the beginning of the array (using "shift()").
   *
   * @type {Array}
   */
  this.queue = [];
  /**
   * Event descriptor that is currently in the processing state
   * (null while nothing is being processed).
   * @type {Object}
   */
  this.current = null;
  /**
   * Last processed event descriptor.
   * NOTE(review): initialised here, but no code visible in this file
   * assigns it afterwards - confirm whether it is still needed.
   * @type {Object}
   */
  this.last = null;
  /**
   * Options of this bus. Every bus gets its own copy of the defaults, so
   * changing the options of one bus cannot leak into DEFAULTS or into
   * other buses.
   * @type {Object}
   */
  this.options = _.extend({}, DEFAULTS);
  // Calling base constructor
  events.EventEmitter.call(this);
  this.setMaxListeners(20);
}
util.inherits(EventBus, events.EventEmitter);
/**
 * Replaces the options of this bus with the defaults overridden by "options".
 *
 * @param {Object} [options] Option values that override the defaults
 */
EventBus.prototype.setOptions = function (options) {
  // Merge into a fresh object: extending DEFAULTS in place would change the
  // defaults themselves and, through them, every bus still referencing them.
  this.options = _.extend({}, DEFAULTS, options);
};
/**
 * Handlers registered through "on", keyed by event name.
 *
 * Created without a prototype so that event names such as "constructor" or
 * "toString" are not mistaken for already registered events.
 *
 * NOTE(review): this registry lives at module level, so it is shared by all
 * EventBus instances created from this module - confirm that is intended.
 *
 * @type {Object.<string, Array.<function>>}
 */
var handlers = Object.create(null);
/**
 * Subscribes handler "handler" to the event "eventName".
 * All handlers are executed one by one. Handler must return promise
 *
 * Example:
 *
 * bus.on('myevent', function(event) {
 *   return Q(true);
 * });
 *
 * Only the first subscription for an event name attaches a listener to the
 * underlying emitter; that listener enqueues the event and starts queue
 * processing. Later subscriptions only add the handler to the registry.
 *
 * @param {String } eventName Name of event to subscribe to
 * @param {function(event)} handler Handler that will be called
 * when event will be received
 * @returns {EventBus} This bus, to allow chaining
 * @throws {TypeError} When "handler" is not a function
 */
EventBus.prototype.on = function (eventName, handler) {
  // Fail at subscription time instead of when the first event is processed
  if (typeof handler !== 'function') {
    throw new TypeError('listener must be a function');
  }
  var self = this;
  var isNewEvent = !handlers[eventName];
  if (isNewEvent) {
    handlers[eventName] = [];
  }
  handlers[eventName].push(handler);
  if (isNewEvent) {
    // Call "on" method of the "base class"
    events.EventEmitter.prototype.on.call(this, eventName, function eventHandler (event) {
      if (self.stopped) {
        return;
      }
      self.enqueue({
        eventName: eventName,
        event: event
      });
      // The limit does not reject events; it only notifies subscribers
      var maxQueueSize = self.options.maxQueueSize;
      if (maxQueueSize && self.getQueueSize() >= maxQueueSize) {
        self.emit('max-queue-size-reached', {
          size: self.getQueueSize()
        });
      }
      // done() ends the promise chain so that a rejection is not silently lost
      next.call(self).done();
    });
  }
  return this;
};
/**
 * Tells whether at least one listener is attached to the underlying emitter
 * for the event "eventName".
 *
 * @param {String} eventName Name of the event to check
 * @returns {Boolean} true when the emitter has a listener for the event
 */
EventBus.prototype.hasListenersFor = function (eventName) {
  // Instance method instead of the deprecated static EventEmitter.listenerCount()
  return this.listenerCount(eventName) > 0;
};
/**
 * Subscribe on internal bus event.
 *
 * The handler is attached directly to the underlying emitter, so it is
 * invoked synchronously on emit and does not go through the event queue.
 *
 * @param {String} eventName Name of the event to subscribe to
 * @param {Function} handler Listener to attach
 * @returns {EventBus} This bus, to allow chaining
 */
EventBus.prototype.onInternal = function (eventName, handler) {
  events.EventEmitter.prototype.on.call(this, eventName, handler);
  return this;
};
/**
 * Subscribe on internal bus event once.
 *
 * The handler is wrapped in a listener that detaches itself from the
 * emitter before delegating, so the handler runs at most one time.
 *
 * @param {String} eventName Name of the event to subscribe to
 * @param {Function} handler Listener to call on the first emit
 * @returns {EventBus} This bus
 * @throws {TypeError} When "handler" is not a function
 */
EventBus.prototype.onceInternal = function (eventName, handler) {
  var isValidHandler = _.isFunction(handler);
  if (!isValidHandler) {
    throw TypeError('listener must be a function');
  }
  var alreadyCalled = false;
  var onceWrapper = function () {
    // Detach first, so the wrapper is gone even if the handler throws
    events.EventEmitter.prototype.removeListener.call(this, eventName, onceWrapper);
    if (alreadyCalled) {
      return;
    }
    alreadyCalled = true;
    handler.apply(this, arguments);
  };
  // Expose the wrapped handler on the wrapper
  onceWrapper.listener = handler;
  this.onInternal(eventName, onceWrapper);
  return this;
};
/**
 * Factory method (used only in tests): builds a brand new, independent bus.
 * @returns {EventBus} Newly constructed EventBus
 */
EventBus.prototype.create = function () {
  var freshBus = new EventBus();
  return freshBus;
};
/**
 * Reports how many event descriptors are currently in the queue.
 * @returns {Number} Length of the queue array
 */
EventBus.prototype.getQueueSize = function () {
  var pending = this.queue;
  return pending.length;
};
/**
 * Tells whether the queue holds no event descriptors.
 * @returns {Boolean} true when getQueueSize() is 0
 */
EventBus.prototype.isEmpty = function () {
  return this.getQueueSize() === 0;
};
/**
 * Starts processing of the queue by delegating to the module-level "next"
 * function with this bus as its context.
 *
 * @returns {*} Whatever the module-level "next" returns (a promise), so the
 * caller can wait for it or handle a rejection instead of losing it
 */
EventBus.prototype.next = function () {
  return next.call(this);
};
/**
 * Appends an event descriptor to the end of the queue.
 *
 * @param {Object} eventDescriptor Descriptor to append
 * @returns {Number} Length of the queue after the descriptor was added
 */
EventBus.prototype.enqueue = function (eventDescriptor) {
  var sizeAfterPush = this.queue.push(eventDescriptor);
  return sizeAfterPush;
};
/**
 * Removes the oldest event descriptor from the beginning of the queue.
 *
 * @returns {Object|undefined} The removed descriptor, or undefined when the
 * queue is empty
 */
EventBus.prototype.dequeue = function () {
  var oldest = this.queue.shift();
  return oldest;
};
/**
 * Default value of the "stopped" flag, provided through the prototype so
 * that every bus starts as not stopped; stop() overrides it per instance.
 * (At module level "this" is not a bus instance, so assigning the flag to
 * "this" here would never reach the buses.)
 * @type {Boolean}
 */
EventBus.prototype.stopped = false;
/**
 * Stops the bus: marks it as stopped, removes all listeners from the
 * underlying emitter and drops every queued event descriptor.
 * Nothing visible in this file resets the flag afterwards.
 */
EventBus.prototype.stop = function () {
  this.stopped = true;
  this.removeAllListeners();
  this.queue = [];
};
/**
 * Reports a failure of an event handler. Must be called with the bus as
 * "this".
 *
 * Marks the event payload as failed (when it can carry a property) and
 * emits 'error' on the bus with { err, evt, handler }.
 *
 * @param {*} e Error thrown by, or rejection reason of, the handler
 * @param {*} evt Payload of the event that was being processed
 * @param {Function} handler Handler that failed
 */
function handleError (e, evt, handler) {
  // Events may be emitted without a payload (undefined/null) or with a
  // primitive one; flagging those would throw and mask the original error.
  var canBeFlagged = evt !== null && (typeof evt === 'object' || typeof evt === 'function');
  if (canBeFlagged) {
    evt.failed = true;
  }
  var data = { err: e, evt: evt, handler: handler };
  this.emit('error', data);
}
/**
 * Processes the event at the head of the queue. Must be called with the bus
 * as "this".
 *
 * Resolves immediately when the bus is stopped, when the queue is empty
 * (after emitting 'queue-empty'), or when the head of the queue is already
 * being processed. Otherwise it emits 'event-processing-started', runs the
 * handlers registered for the event one after another, emits
 * 'event-processed', removes the event from the queue and continues with
 * the following one.
 *
 * @returns {Promise} Promise for the completion of the processing
 */
function next () {
  var bus = this;
  if (bus.stopped) {
    return Q();
  }
  if (bus.isEmpty()) {
    bus.emit('queue-empty', {});
    return Q();
  }
  // The head of the queue is already in progress: do not start it twice
  var headInProgress = bus.current && bus.current === bus.queue[0];
  if (headInProgress) {
    return Q();
  }
  bus.current = bus.queue[0];
  bus.emit('event-processing-started', bus.current.event);

  // Runs one handler; a throw or rejection is reported through handleError
  // so that the remaining handlers still run.
  function runHandler (eventHandler) {
    if (bus.stopped) {
      return undefined;
    }
    return Q.fcall(function () {
      return eventHandler(bus.current.event);
    }).catch(function (err) {
      return handleError.call(bus, err, bus.current.event, eventHandler);
    });
  }

  // Finishes the current event and moves on to the following one
  function finishCurrent () {
    if (bus.stopped) {
      return undefined;
    }
    bus.emit('event-processed', bus.current.event);
    bus.dequeue();
    bus.current = null;
    return next.call(bus);
  }

  var eventHandlers = handlers[bus.current.eventName];
  // Chain the handlers so that each one starts after the previous settled
  var allHandlersDone = _.reduce(eventHandlers, function (chain, eventHandler) {
    return chain.then(function () {
      return runHandler(eventHandler);
    });
  }, Q());
  return allHandlersDone.then(finishCurrent);
}
module.exports = EventBus;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment