Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F15489927
D11383.id27343.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
15 KB
Referenced Files
None
Subscribers
None
D11383.id27343.diff
View Options
diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js
--- a/support/aphlict/server/aphlict_server.js
+++ b/support/aphlict/server/aphlict_server.js
@@ -7,6 +7,9 @@
JX.require('lib/AphlictListenerList', __dirname);
JX.require('lib/AphlictLog', __dirname);
+JX.require('lib/AphlictAdminServer', __dirname);
+JX.require('lib/AphlictClientServer', __dirname);
+
function parse_command_line_arguments(argv) {
var config = {
'client-port': 22280,
@@ -44,8 +47,7 @@
process.on('uncaughtException', function(err) {
var context = null;
- if ((err.code == 'EACCES') &&
- (err.path == config.log)) {
+ if (err.code == 'EACCES' && err.path == config.log) {
context = util.format(
'Unable to open logfile ("%s"). Check that permissions are set ' +
'correctly.',
@@ -65,16 +67,6 @@
process.exit(1);
});
-var WebSocket;
-try {
- WebSocket = require('ws');
-} catch (ex) {
- throw new Error(
- 'You need to install the Node.js "ws" module for websocket support. ' +
- 'See "Notifications User Guide: Setup and Configuration" in the ' +
- 'documentation for instructions. ' + ex.toString());
-}
-
var ssl_config = {
enabled: (config['ssl-key'] || config['ssl-cert'])
};
@@ -88,7 +80,7 @@
// Add the logfile so we'll fail if we can't write to it.
if (config.log) {
- debug.addLogfile(config.log);
+ debug.addLog(config.log);
}
// If we're just doing a configuration test, exit here before starting any
@@ -98,166 +90,27 @@
process.exit(0);
}
-var start_time = new Date().getTime();
-var messages_out = 0;
-var messages_in = 0;
-
-var clients = new JX.AphlictListenerList();
-
-function https_discard_handler(req, res) {
- res.writeHead(501);
- res.end('HTTP/501 Use Websockets\n');
-}
-
-var ws;
+var server;
if (ssl_config.enabled) {
- var https_server = https.createServer({
+ server = https.createServer({
key: ssl_config.key,
cert: ssl_config.cert
- }, https_discard_handler).listen(
- config['client-port'],
- config['client-host']);
-
- ws = new WebSocket.Server({server: https_server});
-} else {
- ws = new WebSocket.Server({
- port: config['client-port'],
- host: config['client-host'],
- });
-}
-
-ws.on('connection', function(ws) {
- var listener = clients.addListener(ws);
-
- function log() {
- debug.log(
- util.format('<%s>', listener.getDescription()) +
- ' ' +
- util.format.apply(null, arguments));
- }
-
- log('Connected from %s.', ws._socket.remoteAddress);
-
- ws.on('message', function(data) {
- log('Received message: %s', data);
-
- var message;
- try {
- message = JSON.parse(data);
- } catch (err) {
- log('Message is invalid: %s', err.message);
- return;
- }
-
- switch (message.command) {
- case 'subscribe':
- log(
- 'Subscribed to: %s',
- JSON.stringify(message.data));
- listener.subscribe(message.data);
- break;
-
- case 'unsubscribe':
- log(
- 'Unsubscribed from: %s',
- JSON.stringify(message.data));
- listener.unsubscribe(message.data);
- break;
-
- default:
- log('Unrecognized command "%s".', message.command || '<undefined>');
- }
- });
-
- ws.on('close', function() {
- clients.removeListener(listener);
- log('Disconnected.');
- });
-
- ws.on('error', function(err) {
- log('Error: %s', err.message);
+ }, function(req, res) {
+ res.writeHead(501);
+ res.end('HTTP/501 Use Websockets\n');
});
-});
-
-function transmit(msg) {
- var listeners = clients.getListeners().filter(function(client) {
- return client.isSubscribedToAny(msg.subscribers);
- });
-
- for (var i = 0; i < listeners.length; i++) {
- var listener = listeners[i];
-
- try {
- listener.writeMessage(msg);
-
- ++messages_out;
- debug.log('<%s> Wrote Message', listener.getDescription());
- } catch (error) {
- clients.removeListener(listener);
- debug.log('<%s> Write Error: %s', listener.getDescription(), error);
- }
- }
+} else {
+ server = http.createServer(function() {});
}
-http.createServer(function(request, response) {
- // Publishing a notification.
- if (request.url == '/') {
- if (request.method == 'POST') {
- var body = '';
+var client_server = new JX.AphlictClientServer(server);
+var admin_server = new JX.AphlictAdminServer();
- request.on('data', function(data) {
- body += data;
- });
+client_server.setLogger(debug);
+admin_server.setLogger(debug);
+admin_server.setListenerList(client_server.getListenerList());
- request.on('end', function() {
- try {
- var msg = JSON.parse(body);
-
- debug.log('Received notification: ' + JSON.stringify(msg));
- ++messages_in;
-
- try {
- transmit(msg);
- response.writeHead(200, {'Content-Type': 'text/plain'});
- } catch (err) {
- debug.log(
- '<%s> Internal Server Error! %s',
- request.socket.remoteAddress,
- err);
- response.writeHead(500, 'Internal Server Error');
- }
- } catch (err) {
- debug.log(
- '<%s> Bad Request! %s',
- request.socket.remoteAddress,
- err);
- response.writeHead(400, 'Bad Request');
- } finally {
- response.end();
- }
- });
- } else {
- response.writeHead(405, 'Method Not Allowed');
- response.end();
- }
- } else if (request.url == '/status/') {
- var status = {
- 'uptime': (new Date().getTime() - start_time),
- 'clients.active': clients.getActiveListenerCount(),
- 'clients.total': clients.getTotalListenerCount(),
- 'messages.in': messages_in,
- 'messages.out': messages_out,
- 'log': config.log,
- 'version': 6
- };
-
- response.writeHead(200, {'Content-Type': 'application/json'});
- response.write(JSON.stringify(status));
- response.end();
- } else {
- response.writeHead(404, 'Not Found');
- response.end();
- }
-}).listen(config['admin-port'], config['admin-host']);
+client_server.listen(config['client-port'], config['client-host']);
+admin_server.listen(config['admin-port'], config['admin-host']);
debug.log('Started Server (PID %d)', process.pid);
diff --git a/support/aphlict/server/lib/AphlictAdminServer.js b/support/aphlict/server/lib/AphlictAdminServer.js
new file mode 100644
--- /dev/null
+++ b/support/aphlict/server/lib/AphlictAdminServer.js
@@ -0,0 +1,155 @@
+var JX = require('javelin').JX;
+
+JX.require('AphlictListenerList', __dirname);
+JX.require('AphlictLog', __dirname);
+
+var http = require('http');
+
+/**
+ * Admin HTTP endpoint for the Aphlict notification server.
+ *
+ * Accepts notifications via "POST /", transmits each one to the subscribed
+ * listeners, and reports counters as JSON via "/status/".
+ */
+JX.install('AphlictAdminServer', {
+  construct: function() {
+    this._listeners = new JX.AphlictListenerList();
+    this._logger = new JX.AphlictLog();
+
+    this._startTime = new Date().getTime();
+    this._messagesIn = 0;
+    this._messagesOut = 0;
+
+    var handler = this._handler.bind(this);
+    this._server = http.createServer(handler);
+  },
+
+  members: {
+    _listeners: null,
+    _logger: null,
+    _messagesIn: null,
+    _messagesOut: null,
+    _server: null,
+    _startTime: null,
+
+    /**
+     * Starts listening. Arguments are passed through unchanged to the
+     * underlying HTTP server's listen() method.
+     */
+    listen: function() {
+      return this._server.listen.apply(this._server, arguments);
+    },
+
+    /**
+     * Replaces the listener list that notifications are transmitted to.
+     */
+    setListenerList: function(listeners) {
+      this._listeners = listeners;
+    },
+
+    /**
+     * Replaces the logger used for request and transmit messages.
+     */
+    setLogger: function(logger) {
+      this._logger = logger;
+    },
+
+    /**
+     * Handles one admin request: "POST /" publishes a notification,
+     * "/status/" returns server counters, anything else is a 404.
+     */
+    _handler: function(request, response) {
+      var self = this;
+
+      // Publishing a notification.
+      if (request.url == '/') {
+        if (request.method == 'POST') {
+          var body = '';
+
+          request.on('data', function(data) {
+            body += data;
+          });
+
+          request.on('end', function() {
+            try {
+              var msg = JSON.parse(body);
+
+              self._logger.log('Received notification: ' + JSON.stringify(msg));
+
+              // Count through `self`: `this` in this callback is not the
+              // server instance, so `this._messagesIn` was never updated.
+              ++self._messagesIn;
+
+              try {
+                self._transmit(msg);
+                response.writeHead(200, {'Content-Type': 'text/plain'});
+              } catch (err) {
+                self._logger.log(
+                  '<%s> Internal Server Error! %s',
+                  request.socket.remoteAddress,
+                  err);
+                response.writeHead(500, 'Internal Server Error');
+              }
+            } catch (err) {
+              self._logger.log(
+                '<%s> Bad Request! %s',
+                request.socket.remoteAddress,
+                err);
+              response.writeHead(400, 'Bad Request');
+            } finally {
+              response.end();
+            }
+          });
+        } else {
+          response.writeHead(405, 'Method Not Allowed');
+          response.end();
+        }
+      } else if (request.url == '/status/') {
+        var status = {
+          'uptime': (new Date().getTime() - this._startTime),
+          'clients.active': this._listeners.getActiveListenerCount(),
+          'clients.total': this._listeners.getTotalListenerCount(),
+          'messages.in': this._messagesIn,
+          'messages.out': this._messagesOut,
+          'version': 6
+        };
+
+        response.writeHead(200, {'Content-Type': 'application/json'});
+        response.write(JSON.stringify(status));
+        response.end();
+      } else {
+        response.writeHead(404, 'Not Found');
+        response.end();
+      }
+    },
+
+    /**
+     * Transmits a message to all subscribed listeners.
+     */
+    _transmit: function(message) {
+      var listeners = this._listeners.getListeners().filter(function(client) {
+        return client.isSubscribedToAny(message.subscribers);
+      });
+
+      for (var i = 0; i < listeners.length; i++) {
+        var listener = listeners[i];
+
+        try {
+          listener.writeMessage(message);
+
+          ++this._messagesOut;
+          this._logger.log(
+            '<%s> Wrote Message',
+            listener.getDescription());
+        } catch (error) {
+          // A failed write drops the listener from the list.
+          this._listeners.removeListener(listener);
+          this._logger.log(
+            '<%s> Write Error: %s',
+            listener.getDescription(),
+            error);
+        }
+      }
+    },
+  },
+});
diff --git a/support/aphlict/server/lib/AphlictClientServer.js b/support/aphlict/server/lib/AphlictClientServer.js
new file mode 100644
--- /dev/null
+++ b/support/aphlict/server/lib/AphlictClientServer.js
@@ -0,0 +1,113 @@
+var JX = require('javelin').JX;
+
+JX.require('AphlictListenerList', __dirname);
+JX.require('AphlictLog', __dirname);
+
+var util = require('util');
+var WebSocket = require('ws');
+
+/**
+ * WebSocket endpoint for Aphlict clients.
+ *
+ * Wraps an existing HTTP(S) server, registers each connection as a listener,
+ * and applies "subscribe" / "unsubscribe" commands sent by the client.
+ */
+JX.install('AphlictClientServer', {
+  construct: function(server) {
+    this._listeners = new JX.AphlictListenerList();
+    this._logger = new JX.AphlictLog();
+    this._server = server;
+  },
+
+  members: {
+    _listeners: null,
+    _logger: null,
+    _server: null,
+
+    /**
+     * Returns the list that connected clients are registered in.
+     */
+    getListenerList: function() {
+      return this._listeners;
+    },
+
+    /**
+     * Starts the wrapped server (arguments are passed through to its
+     * listen() method) and attaches a WebSocket server to it.
+     */
+    listen: function() {
+      var self = this;
+      var server = this._server.listen.apply(this._server, arguments);
+      var wss = new WebSocket.Server({server: server});
+
+      wss.on('connection', function(ws) {
+        var listener = self._listeners.addListener(ws);
+
+        function log() {
+          self._logger.log(
+            util.format('<%s>', listener.getDescription()) +
+            ' ' +
+            util.format.apply(null, arguments));
+        }
+
+        log('Connected from %s.', ws._socket.remoteAddress);
+
+        ws.on('message', function(data) {
+          log('Received message: %s', data);
+
+          var message;
+          try {
+            message = JSON.parse(data);
+          } catch (err) {
+            log('Message is invalid: %s', err.message);
+            return;
+          }
+
+          // JSON.parse() can return null (for the text "null"), which has no
+          // properties; treat that like a message without a command.
+          var command = message ? message.command : undefined;
+
+          switch (command) {
+            case 'subscribe':
+              log(
+                'Subscribed to: %s',
+                JSON.stringify(message.data));
+              listener.subscribe(message.data);
+              break;
+
+            case 'unsubscribe':
+              log(
+                'Unsubscribed from: %s',
+                JSON.stringify(message.data));
+              listener.unsubscribe(message.data);
+              break;
+
+            default:
+              log(
+                'Unrecognized command "%s".',
+                command || '<undefined>');
+          }
+        });
+
+        // These handlers belong on the per-connection socket (`ws`), not on
+        // the shared server (`wss`): the listener must be removed when its
+        // own connection closes.
+        ws.on('close', function() {
+          self._listeners.removeListener(listener);
+          log('Disconnected.');
+        });
+
+        ws.on('error', function(err) {
+          log('Error: %s', err.message);
+        });
+      });
+    },
+
+    /**
+     * Replaces the logger used for connection messages.
+     */
+    setLogger: function(logger) {
+      this._logger = logger;
+    },
+  },
+});
diff --git a/support/aphlict/server/lib/AphlictListener.js b/support/aphlict/server/lib/AphlictListener.js
--- a/support/aphlict/server/lib/AphlictListener.js
+++ b/support/aphlict/server/lib/AphlictListener.js
@@ -50,8 +50,6 @@
writeMessage: function(message) {
this._socket.send(JSON.stringify(message));
- }
-
- }
-
+ },
+ },
});
diff --git a/support/aphlict/server/lib/AphlictListenerList.js b/support/aphlict/server/lib/AphlictListenerList.js
--- a/support/aphlict/server/lib/AphlictListenerList.js
+++ b/support/aphlict/server/lib/AphlictListenerList.js
@@ -39,7 +39,7 @@
},
getActiveListenerCount: function() {
- return Object.keys(this._listeners).length;
+ return this._listeners.length;
},
getTotalListenerCount: function() {
@@ -48,12 +48,10 @@
_generateNextID: function() {
do {
- this._nextID = ((this._nextID + 1) % 1000000000000);
+ this._nextID = (this._nextID + 1) % 1000000000000;
} while (this._nextID in this._listeners);
return this._nextID;
- }
-
- }
-
+ },
+ },
});
diff --git a/support/aphlict/server/lib/AphlictLog.js b/support/aphlict/server/lib/AphlictLog.js
--- a/support/aphlict/server/lib/AphlictLog.js
+++ b/support/aphlict/server/lib/AphlictLog.js
@@ -5,30 +5,25 @@
JX.install('AphlictLog', {
construct: function() {
- this._writeToLogs = [];
- this._writeToConsoles = [];
+ this._consoles = [];
+ this._logs = [];
},
members: {
- _writeToConsoles: null,
- _writeToLogs: null,
-
- addLogfile: function(path) {
- var options = {
- flags: 'a',
- encoding: 'utf8',
- mode: 0664,
- };
-
- var logfile = fs.createWriteStream(path, options);
-
- this._writeToLogs.push(logfile);
+ _consoles: null,
+ _logs: null,
+ addConsole: function(console) {
+ this._consoles.push(console);
return this;
},
- addConsole: function(console) {
- this._writeToConsoles.push(console);
+ addLog: function(path) {
+ this._logs.push(fs.createWriteStream(path, {
+ flags: 'a',
+ encoding: 'utf8',
+ mode: 0664,
+ }));
return this;
},
@@ -38,15 +33,13 @@
str = '[' + date + '] ' + str;
var ii;
- for (ii = 0; ii < this._writeToConsoles.length; ii++) {
- this._writeToConsoles[ii].log(str);
+ for (ii = 0; ii < this._consoles.length; ii++) {
+ this._consoles[ii].log(str);
}
- for (ii = 0; ii < this._writeToLogs.length; ii++) {
- this._writeToLogs[ii].write(str + '\n');
+ for (ii = 0; ii < this._logs.length; ii++) {
+ this._logs[ii].write(str + '\n');
}
- }
-
- }
-
+ },
+ },
});
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Apr 12, 1:25 PM (2 w, 9 h ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7713447
Default Alt Text
D11383.id27343.diff (15 KB)
Attached To
Mode
D11383: Refactoring of the Aphlict server
Attached
Detach File
Event Timeline
Log In to Comment