diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js index aa5784f16b..2ec7f97714 100644 --- a/support/aphlict/server/aphlict_server.js +++ b/support/aphlict/server/aphlict_server.js @@ -1,263 +1,125 @@ var JX = require('./lib/javelin').JX; var http = require('http'); var https = require('https'); var util = require('util'); var fs = require('fs'); JX.require('lib/AphlictListenerList', __dirname); JX.require('lib/AphlictLog', __dirname); function parse_command_line_arguments(argv) { var config = { 'client-port': 22280, 'admin-port': 22281, 'client-host': '0.0.0.0', 'admin-host': '127.0.0.1', log: '/var/log/aphlict.log', 'ssl-key': null, 'ssl-cert': null, test: false }; for (var ii = 2; ii < argv.length; ii++) { var arg = argv[ii]; var matches = arg.match(/^--([^=]+)=(.*)$/); if (!matches) { throw new Error("Unknown argument '" + arg + "'!"); } if (!(matches[1] in config)) { throw new Error("Unknown argument '" + matches[1] + "'!"); } config[matches[1]] = matches[2]; } config['client-port'] = parseInt(config['client-port'], 10); config['admin-port'] = parseInt(config['admin-port'], 10); return config; } var debug = new JX.AphlictLog() .addConsole(console); var config = parse_command_line_arguments(process.argv); process.on('uncaughtException', function(err) { var context = null; - if ((err.code == 'EACCES') && - (err.path == config.log)) { + if (err.code == 'EACCES' && err.path == config.log) { context = util.format( 'Unable to open logfile ("%s"). Check that permissions are set ' + 'correctly.', err.path); } var message = [ '\n<<< UNCAUGHT EXCEPTION! >>>', ]; if (context) { message.push(context); } message.push(err.stack); debug.log(message.join('\n\n')); process.exit(1); }); -var WebSocket; try { - WebSocket = require('ws'); + require('ws'); } catch (ex) { throw new Error( 'You need to install the Node.js "ws" module for websocket support. ' + 'See "Notifications User Guide: Setup and Configuration" in the ' + 'documentation for instructions. ' + ex.toString()); } var ssl_config = { enabled: (config['ssl-key'] || config['ssl-cert']) }; // Load the SSL certificates (if any were provided) now, so that runs with // `--test` will see any errors. if (ssl_config.enabled) { ssl_config.key = fs.readFileSync(config['ssl-key']); ssl_config.cert = fs.readFileSync(config['ssl-cert']); } // Add the logfile so we'll fail if we can't write to it. if (config.log) { - debug.addLogfile(config.log); + debug.addLog(config.log); } // If we're just doing a configuration test, exit here before starting any // servers. if (config.test) { debug.log('Configuration test OK.'); process.exit(0); } -var start_time = new Date().getTime(); -var messages_out = 0; -var messages_in = 0; +JX.require('lib/AphlictAdminServer', __dirname); +JX.require('lib/AphlictClientServer', __dirname); -var clients = new JX.AphlictListenerList(); - -function https_discard_handler(req, res) { - res.writeHead(501); - res.end('HTTP/501 Use Websockets\n'); -} - -var ws; +var server; if (ssl_config.enabled) { - var https_server = https.createServer({ + server = https.createServer({ key: ssl_config.key, cert: ssl_config.cert - }, https_discard_handler).listen( - config['client-port'], - config['client-host']); - - ws = new WebSocket.Server({server: https_server}); -} else { - ws = new WebSocket.Server({ - port: config['client-port'], - host: config['client-host'], - }); -} - -ws.on('connection', function(ws) { - var listener = clients.addListener(ws); - - function log() { - debug.log( - util.format('<%s>', listener.getDescription()) + - ' ' + - util.format.apply(null, arguments)); - } - - log('Connected from %s.', ws._socket.remoteAddress); - - ws.on('message', function(data) { - log('Received message: %s', data); - - var message; - try { - message = JSON.parse(data); - } catch (err) { - log('Message is invalid: %s', err.message); - return; - } - - switch (message.command) { - case 'subscribe': - log( - 'Subscribed to: %s', - JSON.stringify(message.data)); - listener.subscribe(message.data); - break; - - case 'unsubscribe': - log( - 'Unsubscribed from: %s', - JSON.stringify(message.data)); - listener.unsubscribe(message.data); - break; - - default: - log('Unrecognized command "%s".', message.command || ''); - } + }, function(req, res) { + res.writeHead(501); + res.end('HTTP/501 Use Websockets\n'); }); - - ws.on('close', function() { - clients.removeListener(listener); - log('Disconnected.'); - }); - - ws.on('error', function(err) { - log('Error: %s', err.message); - }); -}); - -function transmit(msg) { - var listeners = clients.getListeners().filter(function(client) { - return client.isSubscribedToAny(msg.subscribers); - }); - - for (var i = 0; i < listeners.length; i++) { - var listener = listeners[i]; - - try { - listener.writeMessage(msg); - - ++messages_out; - debug.log('<%s> Wrote Message', listener.getDescription()); - } catch (error) { - clients.removeListener(listener); - debug.log('<%s> Write Error: %s', listener.getDescription(), error); - } - } +} else { + server = http.createServer(function() {}); } -http.createServer(function(request, response) { - // Publishing a notification. - if (request.url == '/') { - if (request.method == 'POST') { - var body = ''; - - request.on('data', function(data) { - body += data; - }); +var client_server = new JX.AphlictClientServer(server); +var admin_server = new JX.AphlictAdminServer(); - request.on('end', function() { - try { - var msg = JSON.parse(body); +client_server.setLogger(debug); +admin_server.setLogger(debug); +admin_server.setClientServer(client_server); - debug.log('Received notification: ' + JSON.stringify(msg)); - ++messages_in; - - try { - transmit(msg); - response.writeHead(200, {'Content-Type': 'text/plain'}); - } catch (err) { - debug.log( - '<%s> Internal Server Error! %s', - request.socket.remoteAddress, - err); - response.writeHead(500, 'Internal Server Error'); - } - } catch (err) { - debug.log( - '<%s> Bad Request! %s', - request.socket.remoteAddress, - err); - response.writeHead(400, 'Bad Request'); - } finally { - response.end(); - } - }); - } else { - response.writeHead(405, 'Method Not Allowed'); - response.end(); - } - } else if (request.url == '/status/') { - var status = { - 'uptime': (new Date().getTime() - start_time), - 'clients.active': clients.getActiveListenerCount(), - 'clients.total': clients.getTotalListenerCount(), - 'messages.in': messages_in, - 'messages.out': messages_out, - 'log': config.log, - 'version': 6 - }; - - response.writeHead(200, {'Content-Type': 'application/json'}); - response.write(JSON.stringify(status)); - response.end(); - } else { - response.writeHead(404, 'Not Found'); - response.end(); - } -}).listen(config['admin-port'], config['admin-host']); +client_server.listen(config['client-port'], config['client-host']); +admin_server.listen(config['admin-port'], config['admin-host']); debug.log('Started Server (PID %d)', process.pid); diff --git a/support/aphlict/server/lib/AphlictAdminServer.js b/support/aphlict/server/lib/AphlictAdminServer.js new file mode 100644 index 0000000000..a36841dacd --- /dev/null +++ b/support/aphlict/server/lib/AphlictAdminServer.js @@ -0,0 +1,135 @@ +var JX = require('javelin').JX; + +JX.require('AphlictListenerList', __dirname); + +var http = require('http'); + +JX.install('AphlictAdminServer', { + + construct: function() { + this.setLogger(new JX.AphlictLog()); + + this._startTime = new Date().getTime(); + this._messagesIn = 0; + this._messagesOut = 0; + + var handler = this._handler.bind(this); + this._server = http.createServer(handler); + }, + + members: { + _messagesIn: null, + _messagesOut: null, + _server: null, + _startTime: null, + + getListeners: function() { + return this.getListenerList().getListeners(); + }, + + getListenerList: function() { + return this.getClientServer().getListenerList(); + }, + + listen: function() { + return this._server.listen.apply(this._server, arguments); + }, + + _handler: function(request, response) { + var self = this; + + // Publishing a notification. + if (request.url == '/') { + if (request.method == 'POST') { + var body = ''; + + request.on('data', function(data) { + body += data; + }); + + request.on('end', function() { + try { + var msg = JSON.parse(body); + + self.getLogger().log( + 'Received notification: ' + JSON.stringify(msg)); + ++this._messagesIn; + + try { + self._transmit(msg); + response.writeHead(200, {'Content-Type': 'text/plain'}); + } catch (err) { + self.getLogger().log( + '<%s> Internal Server Error! %s', + request.socket.remoteAddress, + err); + response.writeHead(500, 'Internal Server Error'); + } + } catch (err) { + self.getLogger().log( + '<%s> Bad Request! %s', + request.socket.remoteAddress, + err); + response.writeHead(400, 'Bad Request'); + } finally { + response.end(); + } + }); + } else { + response.writeHead(405, 'Method Not Allowed'); + response.end(); + } + } else if (request.url == '/status/') { + var status = { + 'uptime': (new Date().getTime() - this._startTime), + 'clients.active': this.getListenerList().getActiveListenerCount(), + 'clients.total': this.getListenerList().getTotalListenerCount(), + 'messages.in': this._messagesIn, + 'messages.out': this._messagesOut, + 'version': 6 + }; + + response.writeHead(200, {'Content-Type': 'application/json'}); + response.write(JSON.stringify(status)); + response.end(); + } else { + response.writeHead(404, 'Not Found'); + response.end(); + } + }, + + /** + * Transmits a message to all subscribed listeners. + */ + _transmit: function(message) { + var listeners = this.getListeners().filter(function(client) { + return client.isSubscribedToAny(message.subscribers); + }); + + for (var i = 0; i < listeners.length; i++) { + var listener = listeners[i]; + + try { + listener.writeMessage(message); + + ++this._messagesOut; + this.getLogger().log( + '<%s> Wrote Message', + listener.getDescription()); + } catch (error) { + this.getListenerList().removeListener(listener); + this.getLogger().log( + '<%s> Write Error: %s', + listener.getDescription(), + error); + } + } + }, + }, + + properties: { + clientServer: null, + logger: null, + } + +}); diff --git a/support/aphlict/server/lib/AphlictClientServer.js b/support/aphlict/server/lib/AphlictClientServer.js new file mode 100644 index 0000000000..b4fcbd7353 --- /dev/null +++ b/support/aphlict/server/lib/AphlictClientServer.js @@ -0,0 +1,90 @@ +var JX = require('javelin').JX; + +JX.require('AphlictListenerList', __dirname); +JX.require('AphlictLog', __dirname); + +var util = require('util'); +var WebSocket = require('ws'); + +JX.install('AphlictClientServer', { + + construct: function(server) { + this.setListenerList(new JX.AphlictListenerList()); + this.setLogger(new JX.AphlictLog()); + this._server = server; + }, + + members: { + _server: null, + + listen: function() { + var self = this; + var server = this._server.listen.apply(this._server, arguments); + var wss = new WebSocket.Server({server: server}); + + wss.on('connection', function(ws) { + var listener = self.getListenerList().addListener(ws); + + function log() { + self.getLogger().log( + util.format('<%s>', listener.getDescription()) + + ' ' + + util.format.apply(null, arguments)); + } + + log('Connected from %s.', ws._socket.remoteAddress); + + ws.on('message', function(data) { + log('Received message: %s', data); + + var message; + try { + message = JSON.parse(data); + } catch (err) { + log('Message is invalid: %s', err.message); + return; + } + + switch (message.command) { + case 'subscribe': + log( + 'Subscribed to: %s', + JSON.stringify(message.data)); + listener.subscribe(message.data); + break; + + case 'unsubscribe': + log( + 'Unsubscribed from: %s', + JSON.stringify(message.data)); + listener.unsubscribe(message.data); + break; + + default: + log( + 'Unrecognized command "%s".', + message.command || ''); + } + }); + + wss.on('close', function() { + self.getListenerList().removeListener(listener); + log('Disconnected.'); + }); + + wss.on('error', function(err) { + log('Error: %s', err.message); + }); + + }); + + }, + + }, + + properties: { + listenerList: null, + logger: null, + } + +}); diff --git a/support/aphlict/server/lib/AphlictListener.js b/support/aphlict/server/lib/AphlictListener.js index e247557e4b..d330274520 100644 --- a/support/aphlict/server/lib/AphlictListener.js +++ b/support/aphlict/server/lib/AphlictListener.js @@ -1,57 +1,55 @@ var JX = require('javelin').JX; JX.install('AphlictListener', { construct: function(id, socket) { this._id = id; this._socket = socket; }, members: { _id: null, _socket: null, _subscriptions: {}, getID: function() { return this._id; }, subscribe: function(phids) { for (var i = 0; i < phids.length; i++) { var phid = phids[i]; this._subscriptions[phid] = true; } return this; }, unsubscribe: function(phids) { for (var i = 0; i < phids.length; i++) { var phid = phids[i]; delete this._subscriptions[phid]; } return this; }, isSubscribedToAny: function(phids) { var intersection = phids.filter(function(phid) { return phid in this._subscriptions; }, this); return intersection.length > 0; }, getSocket: function() { return this._socket; }, getDescription: function() { return 'Listener/' + this.getID(); }, writeMessage: function(message) { this._socket.send(JSON.stringify(message)); - } - - } - + }, + }, }); diff --git a/support/aphlict/server/lib/AphlictListenerList.js b/support/aphlict/server/lib/AphlictListenerList.js index 904b172e6f..23e36a6285 100644 --- a/support/aphlict/server/lib/AphlictListenerList.js +++ b/support/aphlict/server/lib/AphlictListenerList.js @@ -1,59 +1,57 @@ var JX = require('javelin').JX; JX.require('AphlictListener', __dirname); JX.install('AphlictListenerList', { construct: function() { this._listeners = {}; }, members: { _listeners: null, _nextID: 0, _totalListenerCount: 0, addListener: function(socket) { var listener = new JX.AphlictListener(this._generateNextID(), socket); this._listeners[listener.getID()] = listener; this._totalListenerCount++; return listener; }, removeListener: function(listener) { var id = listener.getID(); if (id in this._listeners) { delete this._listeners[id]; } }, getListeners: function() { var keys = Object.keys(this._listeners); var listeners = []; for (var i = 0; i < keys.length; i++) { listeners.push(this._listeners[keys[i]]); } return listeners; }, getActiveListenerCount: function() { - return Object.keys(this._listeners).length; + return this._listeners.length; }, getTotalListenerCount: function() { return this._totalListenerCount; }, _generateNextID: function() { do { - this._nextID = ((this._nextID + 1) % 1000000000000); + this._nextID = (this._nextID + 1) % 1000000000000; } while (this._nextID in this._listeners); return this._nextID; - } - - } - + }, + }, }); diff --git a/support/aphlict/server/lib/AphlictLog.js b/support/aphlict/server/lib/AphlictLog.js index 3c40979aa1..b9ee12459e 100644 --- a/support/aphlict/server/lib/AphlictLog.js +++ b/support/aphlict/server/lib/AphlictLog.js @@ -1,52 +1,45 @@ var JX = require('javelin').JX; var fs = require('fs'); var util = require('util'); JX.install('AphlictLog', { construct: function() { - this._writeToLogs = []; - this._writeToConsoles = []; + this._consoles = []; + this._logs = []; }, members: { - _writeToConsoles: null, - _writeToLogs: null, - - addLogfile: function(path) { - var options = { - flags: 'a', - encoding: 'utf8', - mode: 0664, - }; - - var logfile = fs.createWriteStream(path, options); - - this._writeToLogs.push(logfile); + _consoles: null, + _logs: null, + addConsole: function(console) { + this._consoles.push(console); return this; }, - addConsole: function(console) { - this._writeToConsoles.push(console); + addLog: function(path) { + this._logs.push(fs.createWriteStream(path, { + flags: 'a', + encoding: 'utf8', + mode: 0664, + })); return this; }, log: function() { var str = util.format.apply(null, arguments); var date = new Date().toLocaleString(); str = '[' + date + '] ' + str; var ii; - for (ii = 0; ii < this._writeToConsoles.length; ii++) { - this._writeToConsoles[ii].log(str); + for (ii = 0; ii < this._consoles.length; ii++) { + this._consoles[ii].log(str); } - for (ii = 0; ii < this._writeToLogs.length; ii++) { - this._writeToLogs[ii].write(str + '\n'); + for (ii = 0; ii < this._logs.length; ii++) { + this._logs[ii].write(str + '\n'); } - } - - } - + }, + }, });