Split the Aphlict notification server into AphlictAdminServer (admin HTTP
port) and AphlictClientServer (client websocket port) classes, and tidy up
AphlictListener, AphlictListenerList and AphlictLog.

diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js
--- a/support/aphlict/server/aphlict_server.js
+++ b/support/aphlict/server/aphlict_server.js
@@ -44,8 +44,7 @@
 
 process.on('uncaughtException', function(err) {
   var context = null;
-  if ((err.code == 'EACCES') &&
-      (err.path == config.log)) {
+  if (err.code == 'EACCES' && err.path == config.log) {
     context = util.format(
       'Unable to open logfile ("%s"). Check that permissions are set ' +
       'correctly.',
@@ -65,9 +64,8 @@
   process.exit(1);
 });
 
-var WebSocket;
 try {
-  WebSocket = require('ws');
+  require('ws');
 } catch (ex) {
   throw new Error(
     'You need to install the Node.js "ws" module for websocket support. ' +
@@ -88,7 +86,7 @@
 
 // Add the logfile so we'll fail if we can't write to it.
 if (config.log) {
-  debug.addLogfile(config.log);
+  debug.addLog(config.log);
 }
 
 // If we're just doing a configuration test, exit here before starting any
@@ -98,166 +96,34 @@
   process.exit(0);
 }
 
-var start_time = new Date().getTime();
-var messages_out = 0;
-var messages_in = 0;
+JX.require('lib/AphlictAdminServer', __dirname);
+JX.require('lib/AphlictClientServer', __dirname);
 
-var clients = new JX.AphlictListenerList();
-
-function https_discard_handler(req, res) {
-  res.writeHead(501);
-  res.end('HTTP/501 Use Websockets\n');
-}
-
-var ws;
+// Only websocket upgrades are served on the client port, so answer plain
+// HTTP(S) requests with an error instead of leaving them hanging.
+function discard_handler(req, res) {
+  res.writeHead(501);
+  res.end('HTTP/501 Use Websockets\n');
+}
+
+var server;
 if (ssl_config.enabled) {
-  var https_server = https.createServer({
+  server = https.createServer({
     key: ssl_config.key,
     cert: ssl_config.cert
-  }, https_discard_handler).listen(
-    config['client-port'],
-    config['client-host']);
-
-  ws = new WebSocket.Server({server: https_server});
-} else {
-  ws = new WebSocket.Server({
-    port: config['client-port'],
-    host: config['client-host'],
-  });
-}
-
-ws.on('connection', function(ws) {
-  var listener = clients.addListener(ws);
-
-  function log() {
-    debug.log(
-      util.format('<%s>', listener.getDescription()) +
-      ' ' +
-      util.format.apply(null, arguments));
-  }
-
-  log('Connected from %s.', ws._socket.remoteAddress);
-
-  ws.on('message', function(data) {
-    log('Received message: %s', data);
-
-    var message;
-    try {
-      message = JSON.parse(data);
-    } catch (err) {
-      log('Message is invalid: %s', err.message);
-      return;
-    }
-
-    switch (message.command) {
-      case 'subscribe':
-        log(
-          'Subscribed to: %s',
-          JSON.stringify(message.data));
-        listener.subscribe(message.data);
-        break;
-
-      case 'unsubscribe':
-        log(
-          'Unsubscribed from: %s',
-          JSON.stringify(message.data));
-        listener.unsubscribe(message.data);
-        break;
-
-      default:
-        log('Unrecognized command "%s".', message.command || '');
-    }
-  });
-
-  ws.on('close', function() {
-    clients.removeListener(listener);
-    log('Disconnected.');
-  });
-
-  ws.on('error', function(err) {
-    log('Error: %s', err.message);
-  });
-});
-
-function transmit(msg) {
-  var listeners = clients.getListeners().filter(function(client) {
-    return client.isSubscribedToAny(msg.subscribers);
-  });
-
-  for (var i = 0; i < listeners.length; i++) {
-    var listener = listeners[i];
-
-    try {
-      listener.writeMessage(msg);
-
-      ++messages_out;
-      debug.log('<%s> Wrote Message', listener.getDescription());
-    } catch (error) {
-      clients.removeListener(listener);
-      debug.log('<%s> Write Error: %s', listener.getDescription(), error);
-    }
-  }
+  }, discard_handler);
+} else {
+  server = http.createServer(discard_handler);
 }
 
-http.createServer(function(request, response) {
-  // Publishing a notification.
-  if (request.url == '/') {
-    if (request.method == 'POST') {
-      var body = '';
-
-      request.on('data', function(data) {
-        body += data;
-      });
+var client_server = new JX.AphlictClientServer(server);
+var admin_server = new JX.AphlictAdminServer();
 
-      request.on('end', function() {
-        try {
-          var msg = JSON.parse(body);
+client_server.setLogger(debug);
+admin_server.setLogger(debug);
+admin_server.setClientServer(client_server);
 
-          debug.log('Received notification: ' + JSON.stringify(msg));
-          ++messages_in;
-
-          try {
-            transmit(msg);
-            response.writeHead(200, {'Content-Type': 'text/plain'});
-          } catch (err) {
-            debug.log(
-              '<%s> Internal Server Error! %s',
-              request.socket.remoteAddress,
-              err);
-            response.writeHead(500, 'Internal Server Error');
-          }
-        } catch (err) {
-          debug.log(
-            '<%s> Bad Request! %s',
-            request.socket.remoteAddress,
-            err);
-          response.writeHead(400, 'Bad Request');
-        } finally {
-          response.end();
-        }
-      });
-    } else {
-      response.writeHead(405, 'Method Not Allowed');
-      response.end();
-    }
-  } else if (request.url == '/status/') {
-    var status = {
-      'uptime': (new Date().getTime() - start_time),
-      'clients.active': clients.getActiveListenerCount(),
-      'clients.total': clients.getTotalListenerCount(),
-      'messages.in': messages_in,
-      'messages.out': messages_out,
-      'log': config.log,
-      'version': 6
-    };
-
-    response.writeHead(200, {'Content-Type': 'application/json'});
-    response.write(JSON.stringify(status));
-    response.end();
-  } else {
-    response.writeHead(404, 'Not Found');
-    response.end();
-  }
-}).listen(config['admin-port'], config['admin-host']);
+client_server.listen(config['client-port'], config['client-host']);
+admin_server.listen(config['admin-port'], config['admin-host']);
 
 debug.log('Started Server (PID %d)', process.pid);
diff --git a/support/aphlict/server/lib/AphlictAdminServer.js b/support/aphlict/server/lib/AphlictAdminServer.js
new file mode 100644
--- /dev/null
+++ b/support/aphlict/server/lib/AphlictAdminServer.js
@@ -0,0 +1,150 @@
+var JX = require('javelin').JX;
+
+JX.require('AphlictListenerList', __dirname);
+JX.require('AphlictLog', __dirname);
+
+var http = require('http');
+
+/**
+ * HTTP server for the admin port. Accepts notifications via "POST /" and
+ * relays them to subscribed client listeners; reports counters at "/status/".
+ */
+JX.install('AphlictAdminServer', {
+
+  construct: function() {
+    this.setLogger(new JX.AphlictLog());
+
+    this._startTime = new Date().getTime();
+    this._messagesIn = 0;
+    this._messagesOut = 0;
+
+    var handler = this._handler.bind(this);
+    this._server = http.createServer(handler);
+  },
+
+  members: {
+    _messagesIn: null,
+    _messagesOut: null,
+    _server: null,
+    _startTime: null,
+
+    getListeners: function() {
+      return this.getListenerList().getListeners();
+    },
+
+    getListenerList: function() {
+      return this.getClientServer().getListenerList();
+    },
+
+    /**
+     * Starts the HTTP server; arguments are passed straight through to
+     * `http.Server.listen()`.
+     */
+    listen: function() {
+      return this._server.listen.apply(this._server, arguments);
+    },
+
+    /**
+     * Request handler: "POST /" publishes a notification, "/status/" returns
+     * server counters as JSON, and any other URL gets a 404.
+     */
+    _handler: function(request, response) {
+      var self = this;
+
+      // Publishing a notification.
+      if (request.url == '/') {
+        if (request.method == 'POST') {
+          var body = '';
+
+          request.on('data', function(data) {
+            body += data;
+          });
+
+          request.on('end', function() {
+            try {
+              var msg = JSON.parse(body);
+
+              self.getLogger().log(
+                'Received notification: ' + JSON.stringify(msg));
+              // `this` is not the admin server inside this callback, so the
+              // counter has to be reached through `self`.
+              ++self._messagesIn;
+
+              try {
+                self._transmit(msg);
+                response.writeHead(200, {'Content-Type': 'text/plain'});
+              } catch (err) {
+                self.getLogger().log(
+                  '<%s> Internal Server Error! %s',
+                  request.socket.remoteAddress,
+                  err);
+                response.writeHead(500, 'Internal Server Error');
+              }
+            } catch (err) {
+              self.getLogger().log(
+                '<%s> Bad Request! %s',
+                request.socket.remoteAddress,
+                err);
+              response.writeHead(400, 'Bad Request');
+            } finally {
+              response.end();
+            }
+          });
+        } else {
+          response.writeHead(405, 'Method Not Allowed');
+          response.end();
+        }
+      } else if (request.url == '/status/') {
+        var status = {
+          'uptime': (new Date().getTime() - this._startTime),
+          'clients.active': this.getListenerList().getActiveListenerCount(),
+          'clients.total': this.getListenerList().getTotalListenerCount(),
+          'messages.in': this._messagesIn,
+          'messages.out': this._messagesOut,
+          'version': 6
+        };
+
+        response.writeHead(200, {'Content-Type': 'application/json'});
+        response.write(JSON.stringify(status));
+        response.end();
+      } else {
+        response.writeHead(404, 'Not Found');
+        response.end();
+      }
+    },
+
+    /**
+     * Transmits a message to all subscribed listeners.
+     */
+    _transmit: function(message) {
+      var listeners = this.getListeners().filter(function(client) {
+        return client.isSubscribedToAny(message.subscribers);
+      });
+
+      for (var i = 0; i < listeners.length; i++) {
+        var listener = listeners[i];
+
+        try {
+          listener.writeMessage(message);
+
+          ++this._messagesOut;
+          this.getLogger().log(
+            '<%s> Wrote Message',
+            listener.getDescription());
+        } catch (error) {
+          this.getListenerList().removeListener(listener);
+          this.getLogger().log(
+            '<%s> Write Error: %s',
+            listener.getDescription(),
+            error);
+        }
+      }
+    },
+  },
+
+  properties: {
+    clientServer: null,
+    logger: null,
+  }
+
+});
diff --git a/support/aphlict/server/lib/AphlictClientServer.js b/support/aphlict/server/lib/AphlictClientServer.js
new file mode 100644
--- /dev/null
+++ b/support/aphlict/server/lib/AphlictClientServer.js
@@ -0,0 +1,108 @@
+var JX = require('javelin').JX;
+
+JX.require('AphlictListenerList', __dirname);
+JX.require('AphlictLog', __dirname);
+
+var util = require('util');
+var WebSocket = require('ws');
+
+/**
+ * Websocket server for the client port. Wraps an existing HTTP(S) server and
+ * tracks each connected socket as a listener that can subscribe to and
+ * unsubscribe from notifications.
+ */
+JX.install('AphlictClientServer', {
+
+  construct: function(server) {
+    this.setListenerList(new JX.AphlictListenerList());
+    this.setLogger(new JX.AphlictLog());
+    this._server = server;
+  },
+
+  members: {
+    _server: null,
+
+    /**
+     * Starts the wrapped server (arguments are passed through to its
+     * `listen()`) and attaches a websocket server to it.
+     */
+    listen: function() {
+      var self = this;
+      var server = this._server.listen.apply(this._server, arguments);
+      var wss = new WebSocket.Server({server: server});
+
+      wss.on('connection', function(ws) {
+        var listener = self.getListenerList().addListener(ws);
+
+        function log() {
+          self.getLogger().log(
+            util.format('<%s>', listener.getDescription()) +
+            ' ' +
+            util.format.apply(null, arguments));
+        }
+
+        log('Connected from %s.', ws._socket.remoteAddress);
+
+        ws.on('message', function(data) {
+          log('Received message: %s', data);
+
+          var message;
+          try {
+            message = JSON.parse(data);
+          } catch (err) {
+            log('Message is invalid: %s', err.message);
+            return;
+          }
+
+          // Valid JSON need not be an object (e.g. "null"); reading
+          // `.command` from null would throw inside this handler.
+          if (!message || typeof message != 'object') {
+            log('Message is invalid: expected a JSON object.');
+            return;
+          }
+
+          switch (message.command) {
+            case 'subscribe':
+              log(
+                'Subscribed to: %s',
+                JSON.stringify(message.data));
+              listener.subscribe(message.data);
+              break;
+
+            case 'unsubscribe':
+              log(
+                'Unsubscribed from: %s',
+                JSON.stringify(message.data));
+              listener.unsubscribe(message.data);
+              break;
+
+            default:
+              log(
+                'Unrecognized command "%s".',
+                message.command || '');
+          }
+        });
+
+        // Close and error events belong to the individual connection (`ws`),
+        // not to the shared websocket server (`wss`).
+        ws.on('close', function() {
+          self.getListenerList().removeListener(listener);
+          log('Disconnected.');
+        });
+
+        ws.on('error', function(err) {
+          log('Error: %s', err.message);
+        });
+
+      });
+
+    },
+
+  },
+
+  properties: {
+    listenerList: null,
+    logger: null,
+  }
+
+});
diff --git a/support/aphlict/server/lib/AphlictListener.js b/support/aphlict/server/lib/AphlictListener.js
--- a/support/aphlict/server/lib/AphlictListener.js
+++ b/support/aphlict/server/lib/AphlictListener.js
@@ -50,8 +50,6 @@
 
     writeMessage: function(message) {
       this._socket.send(JSON.stringify(message));
-    }
-
-  }
-
+    },
+  },
 });
diff --git a/support/aphlict/server/lib/AphlictListenerList.js b/support/aphlict/server/lib/AphlictListenerList.js
--- a/support/aphlict/server/lib/AphlictListenerList.js
+++ b/support/aphlict/server/lib/AphlictListenerList.js
@@ -39,7 +39,9 @@
     },
 
     getActiveListenerCount: function() {
-      return Object.keys(this._listeners).length;
+      // _listeners is looked up by listener ID (see the `in` test in
+      // _generateNextID), so count its keys rather than reading `.length`.
+      return Object.keys(this._listeners).length;
     },
 
     getTotalListenerCount: function() {
@@ -48,12 +50,10 @@
 
     _generateNextID: function() {
       do {
-        this._nextID = ((this._nextID + 1) % 1000000000000);
+        this._nextID = (this._nextID + 1) % 1000000000000;
       } while (this._nextID in this._listeners);
 
       return this._nextID;
-    }
-
-  }
-
+    },
+  },
 });
diff --git a/support/aphlict/server/lib/AphlictLog.js b/support/aphlict/server/lib/AphlictLog.js
--- a/support/aphlict/server/lib/AphlictLog.js
+++ b/support/aphlict/server/lib/AphlictLog.js
@@ -5,30 +5,25 @@
 
 JX.install('AphlictLog', {
   construct: function() {
-    this._writeToLogs = [];
-    this._writeToConsoles = [];
+    this._consoles = [];
+    this._logs = [];
   },
 
   members: {
-    _writeToConsoles: null,
-    _writeToLogs: null,
-
-    addLogfile: function(path) {
-      var options = {
-        flags: 'a',
-        encoding: 'utf8',
-        mode: 0664,
-      };
-
-      var logfile = fs.createWriteStream(path, options);
-
-      this._writeToLogs.push(logfile);
+    _consoles: null,
+    _logs: null,
 
+    addConsole: function(console) {
+      this._consoles.push(console);
       return this;
     },
 
-    addConsole: function(console) {
-      this._writeToConsoles.push(console);
+    addLog: function(path) {
+      this._logs.push(fs.createWriteStream(path, {
+        flags: 'a',
+        encoding: 'utf8',
+        mode: 0664,
+      }));
       return this;
     },
 
@@ -38,15 +33,13 @@
       str = '[' + date + '] ' + str;
 
       var ii;
-      for (ii = 0; ii < this._writeToConsoles.length; ii++) {
-        this._writeToConsoles[ii].log(str);
+      for (ii = 0; ii < this._consoles.length; ii++) {
+        this._consoles[ii].log(str);
       }
 
-      for (ii = 0; ii < this._writeToLogs.length; ii++) {
-        this._writeToLogs[ii].write(str + '\n');
+      for (ii = 0; ii < this._logs.length; ii++) {
+        this._logs[ii].write(str + '\n');
       }
-    }
-
-  }
-
+    },
+  },
 });