Created
January 17, 2017 09:19
-
-
Save IharKrasnik/c9d493995663c624057701d6b0d139a1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var events = require('events'); | |
var util = require('util'); | |
var _ = require('underscore'); | |
var Q = require('q'); | |
// Baseline option values assigned to every newly constructed bus.
var DEFAULTS = {
  // A falsy value disables the queue size check, i.e. the queue is unlimited.
  maxQueueSize: false
};
/**
 * Event Bus with API similar to EventEmitter.
 *
 * Events emitted on the bus are queued and their handlers are run one
 * event at a time (see "on()" and the internal "next()" routine).
 *
 * @constructor
 */
function EventBus () {
  /**
   * Queue of event descriptor objects. Each event descriptor object has
   * the following structure: { eventName: eventName, event: event }
   *
   * New descriptors are enqueued to the end of array (using "push()") and
   * dequeued from the beginning of the array (using "shift()").
   *
   * @type {Array}
   */
  this.queue = [];

  /**
   * Event descriptor from the head of the queue that is currently being
   * processed, or null when nothing is in progress.
   * @type {Object}
   */
  this.current = null;

  /**
   * Last processed event descriptor.
   * NOTE(review): nothing in the visible code ever assigns this field after
   * construction - confirm whether it is still needed.
   * @type {Object}
   */
  this.last = null;

  // Take a per-instance copy so that changing one bus's options can never
  // alter the shared DEFAULTS object (and through it every other bus).
  this.options = _.extend({}, DEFAULTS);

  // Calling base constructor
  events.EventEmitter.call(this);
  this.setMaxListeners(20);
}
util.inherits(EventBus, events.EventEmitter);
/**
 * Sets the options of this bus to the defaults overridden by "options".
 *
 * The merge is done into a fresh object so the module-level DEFAULTS are
 * never modified; otherwise options set on one bus would leak into every
 * other bus that uses the defaults.
 *
 * @param {Object} [options] Option overrides (e.g. { maxQueueSize: 100 })
 */
EventBus.prototype.setOptions = function (options) {
  this.options = _.extend({}, DEFAULTS, options);
};
/**
 * Registry of user handlers, keyed by event name. Created without a
 * prototype so that event names such as "constructor" or "toString" are
 * not mistaken for already-registered events.
 *
 * NOTE(review): this registry lives at module level, so it is shared by
 * every EventBus instance created from this module - confirm that this is
 * intended before relying on more than one bus.
 */
var handlers = Object.create(null);

/**
 * Subscribes handler "handler" to the event "eventName".
 * All handlers are executed one by one. Handler must return promise
 *
 * Example:
 *
 *   bus.on('myevent', function(event) {
 *     return Q(true);
 *   });
 *
 * The first subscription for an event name also installs a single
 * EventEmitter listener that enqueues every emitted event and starts
 * queue processing.
 *
 * @param {String} eventName Name of event to subscribe to
 * @param {function(event)} handler Handler that will be called
 *   when event will be received
 * @returns {EventBus} This bus, to allow chaining
 * @throws {TypeError} When "handler" is not a function
 */
EventBus.prototype.on = function (eventName, handler) {
  // Fail at subscription time instead of later, while processing an event.
  if (!_.isFunction(handler)) {
    throw new TypeError('listener must be a function');
  }

  var self = this;
  var isNewEvent = !handlers[eventName];

  if (isNewEvent) {
    handlers[eventName] = [];
  }
  handlers[eventName].push(handler);

  if (isNewEvent) {
    // Call "on" method of the "base class"
    events.EventEmitter.prototype.on.call(this, eventName, function eventHandler (event) {
      if (!self.stopped) {
        self.enqueue({
          eventName: eventName,
          event: event
        });

        // Notify once the queue has grown to the configured limit
        // (a falsy maxQueueSize means "no limit").
        if (self.options.maxQueueSize && self.getQueueSize() >= self.options.maxQueueSize) {
          self.emit('max-queue-size-reached', {
            size: self.getQueueSize()
          });
        }

        // done() makes Q rethrow any rejection left unhandled by the chain.
        next.call(self).done();
      }
    });
  }

  return this;
};
/**
 * Tells whether at least one EventEmitter listener is registered on this
 * bus for "eventName".
 *
 * Uses the instance-level listenerCount() rather than the deprecated static
 * EventEmitter.listenerCount(emitter, eventName).
 *
 * @param {String} eventName Name of the event to check
 * @returns {Boolean} true when the listener count is greater than zero
 */
EventBus.prototype.hasListenersFor = function (eventName) {
  return this.listenerCount(eventName) > 0;
};
/**
 * Subscribe on internal bus event.
 *
 * The handler is attached straight to the underlying EventEmitter, so it is
 * called synchronously on emit and does not go through the queue that
 * "on()" sets up.
 *
 * @param eventName Name of the event to listen for
 * @param handler   Listener passed to EventEmitter as is
 */
EventBus.prototype.onInternal = function (eventName, handler) {
  var baseOn = events.EventEmitter.prototype.on;
  baseOn.call(this, eventName, handler);
};
/**
 * Subscribe on internal bus event once.
 *
 * The handler is wrapped in a listener that removes itself on the first
 * emit and then forwards the call (same "this", same arguments) to the
 * handler at most one time.
 *
 * @param eventName Name of the event to listen for
 * @param handler   Function to call on the first occurrence of the event
 * @returns {EventBus} This bus
 * @throws {TypeError} When "handler" is not a function
 */
EventBus.prototype.onceInternal = function (eventName, handler) {
  if (!_.isFunction(handler)) {
    throw new TypeError('listener must be a function');
  }

  var alreadyCalled = false;

  function onceWrapper () {
    // Detach first so a re-entrant emit cannot reach this wrapper again.
    events.EventEmitter.prototype.removeListener.call(this, eventName, onceWrapper);

    if (alreadyCalled) {
      return;
    }
    alreadyCalled = true;
    handler.apply(this, arguments);
  }

  // Expose the wrapped function so removeListener(eventName, handler)
  // can still match this subscription.
  onceWrapper.listener = handler;

  this.onInternal(eventName, onceWrapper);
  return this;
};
/**
 * Factory method (used only in tests): builds a brand new EventBus that is
 * independent of the instance the method is called on.
 * @returns {EventBus}
 */
EventBus.prototype.create = function () {
  var freshBus = new EventBus();
  return freshBus;
};
/**
 * Number of event descriptors currently held in the queue.
 * @returns {Number}
 */
EventBus.prototype.getQueueSize = function () {
  var pending = this.queue;
  return pending.length;
};
/**
 * Tells whether the queue holds no event descriptors.
 * @returns {Boolean} true when the queue size is exactly zero
 */
EventBus.prototype.isEmpty = function () {
  return this.getQueueSize() === 0;
};
/**
 * Starts (or continues) processing of the queue by delegating to the
 * internal "next" routine with this bus as context.
 *
 * The promise produced by that routine is returned so that callers can wait
 * for the processing they triggered and observe its failures, instead of it
 * being dropped.
 *
 * @returns {Promise} Promise returned by the internal "next" routine
 */
EventBus.prototype.next = function () {
  return next.call(this);
};
/**
 * Appends an event descriptor to the end of the queue.
 * @param {Object} eventDescriptor Descriptor to add
 * @returns {Number} Length of the queue after the descriptor was added
 */
EventBus.prototype.enqueue = function (eventDescriptor) {
  var sizeAfterPush = this.queue.push(eventDescriptor);
  return sizeAfterPush;
};
/**
 * Removes the descriptor at the head of the queue.
 * @returns {Object|undefined} The removed descriptor, or undefined when
 *   the queue is empty
 */
EventBus.prototype.dequeue = function () {
  var head = this.queue.shift();
  return head;
};
/**
 * Default for the "stopped" flag of every bus. It is defined on the
 * prototype because a top-level "this" refers to the module's exports
 * object, not to bus instances.
 * @type {Boolean}
 */
EventBus.prototype.stopped = false;

/**
 * Stops the bus: marks it as stopped, removes every EventEmitter listener
 * registered on it (internal ones included) and drops all queued events.
 *
 * Nothing in the visible code resets the flag afterwards, and the
 * module-level handler registry is not cleared here.
 */
EventBus.prototype.stop = function () {
  this.stopped = true;
  this.removeAllListeners();
  this.queue = [];
};
/**
 * Reports a failure raised by an event handler by emitting an "error"
 * event on the bus. Must be called with "this" bound to the bus.
 *
 * The event payload is flagged with "failed = true" when it can carry
 * properties. Payloads that cannot (undefined, null, primitives) are left
 * untouched so the original error is still reported rather than being
 * replaced by a TypeError raised while flagging.
 *
 * @param {*} e            Error thrown or rejected by the handler
 * @param {*} evt          Payload of the event that was being processed
 * @param {Function} handler Handler that failed
 */
function handleError (e, evt, handler) {
  var canCarryFlag = evt !== null &&
    (typeof evt === 'object' || typeof evt === 'function');

  if (canCarryFlag) {
    evt.failed = true;
  }

  var data = { err: e, evt: evt, handler: handler };
  this.emit('error', data);
}
/**
 * Processes the event at the head of the queue, then recurses to process
 * the following one. Must be called with "this" bound to the bus.
 *
 * For the head event, every handler registered for its name is run in
 * registration order, each one starting only after the previous one's
 * promise has settled. A handler failure is routed to handleError() and
 * does not prevent the remaining handlers from running.
 *
 * Emits "queue-empty" when there is nothing to process,
 * "event-processing-started" before the handlers run and
 * "event-processed" after they have all finished.
 *
 * @returns {Promise} Q promise; already resolved when the bus is stopped,
 *   the queue is empty, or the head event is already being processed
 */
function next () {
  var bus = this;

  if (bus.stopped) {
    return Q();
  }

  if (bus.isEmpty()) {
    bus.emit('queue-empty', {});
    return Q();
  }

  // The head of the queue is already in progress: let that run finish.
  var head = bus.queue[0];
  if (bus.current && bus.current === head) {
    return Q();
  }

  bus.current = head;
  bus.emit('event-processing-started', bus.current.event);

  // Builds the chain step that runs one handler, unless the bus has been
  // stopped in the meantime.
  function stepFor (eventHandler) {
    return function () {
      if (bus.stopped) {
        return;
      }
      return Q.fcall(function () {
        return eventHandler(bus.current.event);
      }).catch(function (e) {
        return handleError.call(bus, e, bus.current.event, eventHandler);
      });
    };
  }

  // Final chain step: drop the processed event and move on to the next one.
  function finishCurrent () {
    if (bus.stopped) {
      return;
    }
    bus.emit('event-processed', bus.current.event);
    bus.dequeue();
    bus.current = null;
    return next.call(bus);
  }

  var chain = Q();
  _.each(handlers[bus.current.eventName], function (eventHandler) {
    chain = chain.then(stepFor(eventHandler));
  });

  return chain.then(finishCurrent);
}
module.exports = EventBus; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment