forked from mean-expert-official/loopback-component-pubsub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
153 lines (127 loc) · 4.24 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"use strict";
/**
* Dependencies
**/
var io = require("socket.io"),
ioAuth = require("socketio-auth"),
Pubsub = require("./pubsub"),
debug = require("debug")("lc:pubsub"),
NATS = require("nats");
/**
* @module LoopBack Component PubSub
* @author Jonathan Casarrubias <http://twitter.com/johncasarrubias>
* @description
*
* This module integrates LoopBack with Socket IO in order to provide
* PubSub functionallity.
*/
module.exports = (app, options) => {
/**
* Set Default Options
*/
options = Object.assign({}, {
auth: true,
removeApiRoot: true,
apiRoot: app.settings.restApiRoot,
natsUrl: process.env.NATS_URL || app.settings.natsUrl || ""
}, options);
debug("Options from component config:", options);
/**
* Set Listener waiting for Http Server
**/
app.on("started", start);
/**
* Setup Real Time Communications
**/
function start(server) {
debug("RTC server listening at %s", app.get("url").replace("http", "ws"));
// Lets create an instance of IO and reference it in app
var socket = io(server);
var nats = buildNatsClient(options);
// close the engine to let the app server stop
app.on("stopping", () => {
debug("Attached server is stopping, closing pubsub engines");
if ( socket ) socket.engine.close();
if ( nats ) nats.close();
});
// Add a pubsub instanceable module
app.pubsub = new Pubsub(socket, nats, options);
// Configure ioAuth
if (options.auth === true) {
debug("RTC authentication mechanism enabled");
ioAuth(socket, {
authenticate: (ctx, token, next) => {
var AccessToken = app.models.AccessToken;
//verify credentials sent by the client
token = AccessToken.find({
where: { id: token.id || 0, userId: token.userId || 0 }
}, (err, tokenInstance) => {
next(err, tokenInstance.length > 0 ? true : false);
});
},
postAuthenticate: () => {
socket.on("authentication", value => {
debug("A user (%s) has been authenticated over web sockets", value.userId);
});
}
});
}
socket.on("connection", connection => {
debug("A new client connected", connection.id);
connection.on("lb-ping", () => connection.emit("lb-pong", new Date().getTime() / 1000));
});
}
// this will build a NATS client and return it, even if it is not connected,
// with reconnection and persistent reopening handled via the callbacks available
// from the NATS client itself. Publishes will still work but will fall into
// the ether until the connection is re-established
var buildNatsClient = function(options) {
if ( options.natsUrl === "" || !options.natsUrl ) {
return null;
}
var opts = {
url: options.natsUrl,
reconnect: true
};
if ( options.natsAuth ) {
debug("Using nats authentication");
opts.user = options.natsAuth.user;
opts.pass = options.natsAuth.pass;
}
var nats = NATS.connect(opts);
nats.on("connect", function() {
debug("NATS connected on %s", options.natsUrl);
app.emit("log", {level: "info", msg: "Connection successful", service: "NATS", url: options.natsUrl})
});
nats.on("reconnecting", function() {
debug("NATS attempting reconnection to %s", options.natsUrl);
});
nats.on("error", function(err) {
debug("NATS error: %s", err);
app.emit("log", {level: "error", msg: err, service: "NATS", url: options.natsUrl})
if ( err.indexOf("ECONNREFUSED") ) {
setTimeout(function() {
debug("NATS attempting to re-open connection with %s", options.natsUrl)
nats.parseOptions(opts);
nats.initState();
nats.createConnection();
}, 2000)
}
});
nats.on("close", function() {
debug("NATS connection closed");
if ( nats.closed ) return;
debug("NATS attempting to re-open connection with %s", options.natsUrl)
nats.parseOptions(opts);
nats.initState();
nats.createConnection();
});
nats.on("disconnect", function() {
debug("NATS was disconnected");
});
nats.on("reconnecting", function() {
debug("NATS is reconnecting");
});
return nats;
};
};