-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathindex.js
106 lines (92 loc) · 2.7 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
var redis = require('redis');
var events = require('events');
module.exports = function(options) {
options = options || {};
if (!(options.port && options.host) && !options.url) {
throw new Error('redis-eventemitter needs a url or port+host');
}
var pub = options.pub;
var sub = options.sub;
if (!options.pub) {
if (options.auth) options.auth_pass = options.auth;
pub = redis.createClient(options);
sub = redis.createClient(options);
}
var prefix = options.prefix || '';
var that = new events.EventEmitter();
var emit = events.EventEmitter.prototype.emit;
var removeListener = events.EventEmitter.prototype.removeListener;
var pending = 0;
var queue = [];
var onflush = function() {
if (--pending) return;
while (queue.length) queue.shift()();
};
var callback = function() {
pending++;
return onflush;
};
var onerror = function(err) {
if (!that.listeners('error').length) return;
emit.apply(that, Array.prototype.concat.apply(['error'], arguments));
};
sub.on('error', onerror);
pub.on('error', onerror);
sub.on('pmessage', function(pattern, channel, messages) {
pattern = pattern.slice(prefix.length);
channel = channel.slice(prefix.length);
try {
emit.apply(that, [pattern, channel].concat(JSON.parse(messages)));
}
catch(err) {
process.nextTick(emit.bind(that, 'error', err));
}
});
that.on('newListener', function(pattern, listener) {
if (pattern === 'error') return;
pattern = prefix + pattern;
if (that.listeners(pattern).length) return;
sub.psubscribe(pattern, callback());
});
that.emit = function(channel, messages) {
if (channel in {newListener:1, error:1}) return emit.apply(this, arguments);
var cb = callback();
messages = Array.prototype.slice.call(arguments, 1);
if (typeof messages[messages.length - 1] === 'function') {
var onflush = callback();
var realCb = messages.pop();
cb = function() {
realCb.apply(null, arguments);
onflush();
}
}
pub.publish(prefix + channel, JSON.stringify(messages), cb);
};
that.removeListener = function(pattern, listener) {
if (pattern in {newListener:1, error:1}) return removeListener.apply(that, arguments);
removeListener.apply(that, arguments);
if (that.listeners(pattern).length) return that;
sub.punsubscribe(prefix+pattern, callback());
return that;
};
that.removeAllListeners = function(pattern) {
that.listeners(pattern).forEach(function(listener) {
that.removeListener(pattern, listener);
});
return that;
};
that.close = function() {
pub.quit();
pub.unref();
sub.quit();
sub.unref();
};
that.flush = function(fn) {
if (!fn) return;
if (!pending) return process.nextTick(fn);
queue.push(fn);
};
that.pub = pub;
that.sub = sub;
return that;
};