diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js
--- a/support/aphlict/server/aphlict_server.js
+++ b/support/aphlict/server/aphlict_server.js
@@ -99,8 +99,30 @@
 if (ssl_config.enabled) {
   ssl_config.key = fs.readFileSync(config['ssl-key']);
   ssl_config.cert = fs.readFileSync(config['ssl-cert']);
+} else {
+  ssl_config.key = null;
+  ssl_config.cert = null;
 }
 
+// Server specs. The startup loop below reads 'ssl.key' and 'ssl.cert'.
+var servers = [];
+
+servers.push({
+  type: 'client',
+  port: config['client-port'],
+  listen: config['client-host'],
+  'ssl.key': ssl_config.key,
+  'ssl.cert': ssl_config.cert
+});
+
+servers.push({
+  type: 'admin',
+  port: config['admin-port'],
+  listen: config['admin-host'],
+  'ssl.key': null,
+  'ssl.cert': null
+});
+
 // If we're just doing a configuration test, exit here before starting any
 // servers.
 if (config.test) {
@@ -109,27 +131,49 @@
   return;
 }
 
-var server;
-if (ssl_config.enabled) {
-  server = https.createServer({
-    key: ssl_config.key,
-    cert: ssl_config.cert
-  }, function(req, res) {
-    res.writeHead(501);
-    res.end('HTTP/501 Use Websockets\n');
-  });
-} else {
-  server = http.createServer(function() {});
-}
+var aphlict_servers = [];
+var aphlict_clients = [];
+var aphlict_admins = [];
+
+var ii;
+for (ii = 0; ii < servers.length; ii++) {
+  var server = servers[ii];
+  var is_client = (server.type == 'client');
+
+  var http_server;
+  if (server['ssl.key']) {
+    var https_config = {
+      key: server['ssl.key'],
+      cert: server['ssl.cert']
+    };
+
+    http_server = https.createServer(https_config);
+  } else {
+    http_server = http.createServer();
+  }
+
+  var aphlict_server;
+  if (is_client) {
+    aphlict_server = new JX.AphlictClientServer(http_server);
+  } else {
+    aphlict_server = new JX.AphlictAdminServer(http_server);
+  }
 
-var client_server = new JX.AphlictClientServer(server);
-var admin_server = new JX.AphlictAdminServer();
+  aphlict_server.setLogger(debug);
+  aphlict_server.listen(server.port, server.listen);
 
-client_server.setLogger(debug);
-admin_server.setLogger(debug);
-admin_server.setClientServer(client_server);
+  aphlict_servers.push(aphlict_server);
 
-client_server.listen(config['client-port'], config['client-host']);
-admin_server.listen(config['admin-port'], config['admin-host']);
+  if (is_client) {
+    aphlict_clients.push(aphlict_server);
+  } else {
+    aphlict_admins.push(aphlict_server);
+  }
+}
+
+for (ii = 0; ii < aphlict_admins.length; ii++) {
+  var admin_server = aphlict_admins[ii];
+  admin_server.setClientServers(aphlict_clients);
+}
 
 debug.log('Started Server (PID %d)', process.pid);
diff --git a/support/aphlict/server/lib/AphlictAdminServer.js b/support/aphlict/server/lib/AphlictAdminServer.js
--- a/support/aphlict/server/lib/AphlictAdminServer.js
+++ b/support/aphlict/server/lib/AphlictAdminServer.js
@@ -9,15 +9,19 @@
 
 JX.install('AphlictAdminServer', {
 
-  construct: function() {
-    this.setLogger(new JX.AphlictLog());
-
+  construct: function(server) {
     this._startTime = new Date().getTime();
     this._messagesIn = 0;
     this._messagesOut = 0;
 
-    var handler = this._handler.bind(this);
-    this._server = http.createServer(handler);
+    server.on('request', JX.bind(this, this._onrequest));
+    this._server = server;
+    this._clientServers = [];
+  },
+
+  properties: {
+    clientServers: null,
+    logger: null,
   },
 
   members: {
@@ -26,15 +30,32 @@
     _server: null,
     _startTime: null,
 
-    getListenerList: function(instance) {
-      return this.getClientServer().getListenerList(instance);
+    getListenerLists: function(instance) {
+      var clients = this.getClientServers();
+
+      var lists = [];
+      for (var ii = 0; ii < clients.length; ii++) {
+        lists.push(clients[ii].getListenerList(instance));
+      }
+      return lists;
+    },
+
+    log: function() {
+      var logger = this.getLogger();
+      if (!logger) {
+        return;
+      }
+
+      logger.log.apply(logger, arguments);
+
+      return this;
     },
 
     listen: function() {
       return this._server.listen.apply(this._server, arguments);
     },
 
-    _handler: function(request, response) {
+    _onrequest: function(request, response) {
       var self = this;
       var u = url.parse(request.url, true);
       var instance = u.query.instance || '/';
@@ -52,7 +73,7 @@
             try {
               var msg = JSON.parse(body);
 
-              self.getLogger().log(
+              self.log(
                 'Received notification (' + instance + '): ' +
                 JSON.stringify(msg));
               ++self._messagesIn;
@@ -61,14 +82,14 @@
                 self._transmit(instance, msg);
                 response.writeHead(200, {'Content-Type': 'text/plain'});
               } catch (err) {
-                self.getLogger().log(
+                self.log(
                   '<%s> Internal Server Error! %s',
                   request.socket.remoteAddress,
                   err);
                 response.writeHead(500, 'Internal Server Error');
               }
             } catch (err) {
-              self.getLogger().log(
+              self.log(
                 '<%s> Bad Request! %s',
                 request.socket.remoteAddress,
                 err);
@@ -82,61 +103,77 @@
           response.end();
         }
       } else if (u.pathname == '/status/') {
-        var status = {
-          'instance': instance,
-          'uptime': (new Date().getTime() - this._startTime),
-          'clients.active': this.getListenerList(instance)
-            .getActiveListenerCount(),
-          'clients.total': this.getListenerList(instance)
-            .getTotalListenerCount(),
-          'messages.in': this._messagesIn,
-          'messages.out': this._messagesOut,
-          'version': 7
-        };
-
-        response.writeHead(200, {'Content-Type': 'application/json'});
-        response.write(JSON.stringify(status));
-        response.end();
+        this._handleStatusRequest(request, response, instance);
       } else {
         response.writeHead(404, 'Not Found');
         response.end();
       }
     },
 
+    _handleStatusRequest: function(request, response, instance) {
+      var active_count = 0;
+      var total_count = 0;
+
+      var lists = this.getListenerLists(instance);
+      for (var ii = 0; ii < lists.length; ii++) {
+        var list = lists[ii];
+        active_count += list.getActiveListenerCount();
+        total_count += list.getTotalListenerCount();
+      }
+
+      var server_status = {
+        'instance': instance,
+        'uptime': (new Date().getTime() - this._startTime),
+        'clients.active': active_count,
+        'clients.total': total_count,
+        'messages.in': this._messagesIn,
+        'messages.out': this._messagesOut,
+        'version': 7
+      };
+
+      response.writeHead(200, {'Content-Type': 'application/json'});
+      response.write(JSON.stringify(server_status));
+      response.end();
+    },
+
     /**
      * Transmits a message to all subscribed listeners.
      */
     _transmit: function(instance, message) {
-      var listeners = this.getListenerList(instance)
-        .getListeners()
-        .filter(function(client) {
-          return client.isSubscribedToAny(message.subscribers);
-        });
+      var lists = this.getListenerLists(instance);
+
+      for (var ii = 0; ii < lists.length; ii++) {
+        var list = lists[ii];
+        var listeners = list.getListeners();
+        this._transmitToListeners(list, listeners, message);
+      }
+    },
+
+    _transmitToListeners: function(list, listeners, message) {
+      for (var ii = 0; ii < listeners.length; ii++) {
+        var listener = listeners[ii];
 
-      for (var i = 0; i < listeners.length; i++) {
-        var listener = listeners[i];
+        if (!listener.isSubscribedToAny(message.subscribers)) {
+          continue;
+        }
 
         try {
           listener.writeMessage(message);
 
           ++this._messagesOut;
-          this.getLogger().log(
+          this.log(
             '<%s> Wrote Message',
             listener.getDescription());
         } catch (error) {
-          this.getListenerList(instance).removeListener(listener);
-          this.getLogger().log(
+          list.removeListener(listener);
+
+          this.log(
             '<%s> Write Error: %s',
             listener.getDescription(),
             error);
         }
       }
-    },
-  },
-
-  properties: {
-    clientServer: null,
-    logger: null,
+    }
   }
 
 });
diff --git a/support/aphlict/server/lib/AphlictClientServer.js b/support/aphlict/server/lib/AphlictClientServer.js
--- a/support/aphlict/server/lib/AphlictClientServer.js
+++ b/support/aphlict/server/lib/AphlictClientServer.js
@@ -12,11 +12,16 @@
 JX.install('AphlictClientServer', {
 
   construct: function(server) {
-    this.setLogger(new JX.AphlictLog());
+    server.on('request', JX.bind(this, this._onrequest));
+
     this._server = server;
     this._lists = {};
   },
 
+  properties: {
+    logger: null,
+  },
+
   members: {
     _server: null,
     _lists: null,
@@ -28,6 +33,25 @@
       return this._lists[path];
     },
 
+    log: function() {
+      var logger = this.getLogger();
+      if (!logger) {
+        return;
+      }
+
+      logger.log.apply(logger, arguments);
+
+      return this;
+    },
+
+    _onrequest: function(request, response) {
+      // The websocket code upgrades connections before they get here, so
+      // this only handles normal HTTP connections. We just fail them with
+      // a 501 response.
+      response.writeHead(501);
+      response.end('HTTP/501 Use Websockets\n');
+    },
+
     listen: function() {
       var self = this;
       var server = this._server.listen.apply(this._server, arguments);
@@ -38,7 +62,7 @@
         var listener = self.getListenerList(path).addListener(ws);
 
         function log() {
-          self.getLogger().log(
+          self.log(
             util.format('<%s>', listener.getDescription()) +
             ' ' +
             util.format.apply(null, arguments));
@@ -97,10 +121,6 @@
 
     },
 
-  },
-
-  properties: {
-    logger: null,
   }
 
 });