Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F15468193
D11383.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
14 KB
Referenced Files
None
Subscribers
None
D11383.diff
View Options
diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js
--- a/support/aphlict/server/aphlict_server.js
+++ b/support/aphlict/server/aphlict_server.js
@@ -44,8 +44,7 @@
process.on('uncaughtException', function(err) {
var context = null;
- if ((err.code == 'EACCES') &&
- (err.path == config.log)) {
+ if (err.code == 'EACCES' && err.path == config.log) {
context = util.format(
'Unable to open logfile ("%s"). Check that permissions are set ' +
'correctly.',
@@ -65,9 +64,8 @@
process.exit(1);
});
-var WebSocket;
try {
- WebSocket = require('ws');
+ require('ws');
} catch (ex) {
throw new Error(
'You need to install the Node.js "ws" module for websocket support. ' +
@@ -88,7 +86,7 @@
// Add the logfile so we'll fail if we can't write to it.
if (config.log) {
- debug.addLogfile(config.log);
+ debug.addLog(config.log);
}
// If we're just doing a configuration test, exit here before starting any
@@ -98,166 +96,30 @@
process.exit(0);
}
-var start_time = new Date().getTime();
-var messages_out = 0;
-var messages_in = 0;
+JX.require('lib/AphlictAdminServer', __dirname);
+JX.require('lib/AphlictClientServer', __dirname);
-var clients = new JX.AphlictListenerList();
-
-function https_discard_handler(req, res) {
- res.writeHead(501);
- res.end('HTTP/501 Use Websockets\n');
-}
-
-var ws;
+var server;
if (ssl_config.enabled) {
- var https_server = https.createServer({
+ server = https.createServer({
key: ssl_config.key,
cert: ssl_config.cert
- }, https_discard_handler).listen(
- config['client-port'],
- config['client-host']);
-
- ws = new WebSocket.Server({server: https_server});
-} else {
- ws = new WebSocket.Server({
- port: config['client-port'],
- host: config['client-host'],
- });
-}
-
-ws.on('connection', function(ws) {
- var listener = clients.addListener(ws);
-
- function log() {
- debug.log(
- util.format('<%s>', listener.getDescription()) +
- ' ' +
- util.format.apply(null, arguments));
- }
-
- log('Connected from %s.', ws._socket.remoteAddress);
-
- ws.on('message', function(data) {
- log('Received message: %s', data);
-
- var message;
- try {
- message = JSON.parse(data);
- } catch (err) {
- log('Message is invalid: %s', err.message);
- return;
- }
-
- switch (message.command) {
- case 'subscribe':
- log(
- 'Subscribed to: %s',
- JSON.stringify(message.data));
- listener.subscribe(message.data);
- break;
-
- case 'unsubscribe':
- log(
- 'Unsubscribed from: %s',
- JSON.stringify(message.data));
- listener.unsubscribe(message.data);
- break;
-
- default:
- log('Unrecognized command "%s".', message.command || '<undefined>');
- }
+ }, function(req, res) {
+ res.writeHead(501);
+ res.end('HTTP/501 Use Websockets\n');
});
-
- ws.on('close', function() {
- clients.removeListener(listener);
- log('Disconnected.');
- });
-
- ws.on('error', function(err) {
- log('Error: %s', err.message);
- });
-});
-
-function transmit(msg) {
- var listeners = clients.getListeners().filter(function(client) {
- return client.isSubscribedToAny(msg.subscribers);
- });
-
- for (var i = 0; i < listeners.length; i++) {
- var listener = listeners[i];
-
- try {
- listener.writeMessage(msg);
-
- ++messages_out;
- debug.log('<%s> Wrote Message', listener.getDescription());
- } catch (error) {
- clients.removeListener(listener);
- debug.log('<%s> Write Error: %s', listener.getDescription(), error);
- }
- }
+} else {
+ server = http.createServer(function() {});
}
-http.createServer(function(request, response) {
- // Publishing a notification.
- if (request.url == '/') {
- if (request.method == 'POST') {
- var body = '';
-
- request.on('data', function(data) {
- body += data;
- });
+var client_server = new JX.AphlictClientServer(server);
+var admin_server = new JX.AphlictAdminServer();
- request.on('end', function() {
- try {
- var msg = JSON.parse(body);
+client_server.setLogger(debug);
+admin_server.setLogger(debug);
+admin_server.setClientServer(client_server);
- debug.log('Received notification: ' + JSON.stringify(msg));
- ++messages_in;
-
- try {
- transmit(msg);
- response.writeHead(200, {'Content-Type': 'text/plain'});
- } catch (err) {
- debug.log(
- '<%s> Internal Server Error! %s',
- request.socket.remoteAddress,
- err);
- response.writeHead(500, 'Internal Server Error');
- }
- } catch (err) {
- debug.log(
- '<%s> Bad Request! %s',
- request.socket.remoteAddress,
- err);
- response.writeHead(400, 'Bad Request');
- } finally {
- response.end();
- }
- });
- } else {
- response.writeHead(405, 'Method Not Allowed');
- response.end();
- }
- } else if (request.url == '/status/') {
- var status = {
- 'uptime': (new Date().getTime() - start_time),
- 'clients.active': clients.getActiveListenerCount(),
- 'clients.total': clients.getTotalListenerCount(),
- 'messages.in': messages_in,
- 'messages.out': messages_out,
- 'log': config.log,
- 'version': 6
- };
-
- response.writeHead(200, {'Content-Type': 'application/json'});
- response.write(JSON.stringify(status));
- response.end();
- } else {
- response.writeHead(404, 'Not Found');
- response.end();
- }
-}).listen(config['admin-port'], config['admin-host']);
+client_server.listen(config['client-port'], config['client-host']);
+admin_server.listen(config['admin-port'], config['admin-host']);
debug.log('Started Server (PID %d)', process.pid);
diff --git a/support/aphlict/server/lib/AphlictAdminServer.js b/support/aphlict/server/lib/AphlictAdminServer.js
new file mode 100644
--- /dev/null
+++ b/support/aphlict/server/lib/AphlictAdminServer.js
@@ -0,0 +1,135 @@
+var JX = require('javelin').JX;
+
+JX.require('AphlictListenerList', __dirname);
+
+var http = require('http');
+
+JX.install('AphlictAdminServer', {
+
+ construct: function() {
+ this.setLogger(new JX.AphlictLog());
+
+ this._startTime = new Date().getTime();
+ this._messagesIn = 0;
+ this._messagesOut = 0;
+
+ var handler = this._handler.bind(this);
+ this._server = http.createServer(handler);
+ },
+
+ members: {
+ _messagesIn: null,
+ _messagesOut: null,
+ _server: null,
+ _startTime: null,
+
+ getListeners: function() {
+ return this.getListenerList().getListeners();
+ },
+
+ getListenerList: function() {
+ return this.getClientServer().getListenerList();
+ },
+
+ listen: function() {
+ return this._server.listen.apply(this._server, arguments);
+ },
+
+ _handler: function(request, response) {
+ var self = this;
+
+ // Publishing a notification.
+ if (request.url == '/') {
+ if (request.method == 'POST') {
+ var body = '';
+
+ request.on('data', function(data) {
+ body += data;
+ });
+
+ request.on('end', function() {
+ try {
+ var msg = JSON.parse(body);
+
+ self.getLogger().log(
+ 'Received notification: ' + JSON.stringify(msg));
+ ++this._messagesIn;
+
+ try {
+ self._transmit(msg);
+ response.writeHead(200, {'Content-Type': 'text/plain'});
+ } catch (err) {
+ self.getLogger().log(
+ '<%s> Internal Server Error! %s',
+ request.socket.remoteAddress,
+ err);
+ response.writeHead(500, 'Internal Server Error');
+ }
+ } catch (err) {
+ self.getLogger().log(
+ '<%s> Bad Request! %s',
+ request.socket.remoteAddress,
+ err);
+ response.writeHead(400, 'Bad Request');
+ } finally {
+ response.end();
+ }
+ });
+ } else {
+ response.writeHead(405, 'Method Not Allowed');
+ response.end();
+ }
+ } else if (request.url == '/status/') {
+ var status = {
+ 'uptime': (new Date().getTime() - this._startTime),
+ 'clients.active': this.getListenerList().getActiveListenerCount(),
+ 'clients.total': this.getListenerList().getTotalListenerCount(),
+ 'messages.in': this._messagesIn,
+ 'messages.out': this._messagesOut,
+ 'version': 6
+ };
+
+ response.writeHead(200, {'Content-Type': 'application/json'});
+ response.write(JSON.stringify(status));
+ response.end();
+ } else {
+ response.writeHead(404, 'Not Found');
+ response.end();
+ }
+ },
+
+ /**
+ * Transmits a message to all subscribed listeners.
+ */
+ _transmit: function(message) {
+ var listeners = this.getListeners().filter(function(client) {
+ return client.isSubscribedToAny(message.subscribers);
+ });
+
+ for (var i = 0; i < listeners.length; i++) {
+ var listener = listeners[i];
+
+ try {
+ listener.writeMessage(message);
+
+ ++this._messagesOut;
+ this.getLogger().log(
+ '<%s> Wrote Message',
+ listener.getDescription());
+ } catch (error) {
+ this.getListenerList().removeListener(listener);
+ this.getLogger().log(
+ '<%s> Write Error: %s',
+ listener.getDescription(),
+ error);
+ }
+ }
+ },
+ },
+
+ properties: {
+ clientServer: null,
+ logger: null,
+ }
+
+});
diff --git a/support/aphlict/server/lib/AphlictClientServer.js b/support/aphlict/server/lib/AphlictClientServer.js
new file mode 100644
--- /dev/null
+++ b/support/aphlict/server/lib/AphlictClientServer.js
@@ -0,0 +1,90 @@
+var JX = require('javelin').JX;
+
+JX.require('AphlictListenerList', __dirname);
+JX.require('AphlictLog', __dirname);
+
+var util = require('util');
+var WebSocket = require('ws');
+
+JX.install('AphlictClientServer', {
+
+ construct: function(server) {
+ this.setListenerList(new JX.AphlictListenerList());
+ this.setLogger(new JX.AphlictLog());
+ this._server = server;
+ },
+
+ members: {
+ _server: null,
+
+ listen: function() {
+ var self = this;
+ var server = this._server.listen.apply(this._server, arguments);
+ var wss = new WebSocket.Server({server: server});
+
+ wss.on('connection', function(ws) {
+ var listener = self.getListenerList().addListener(ws);
+
+ function log() {
+ self.getLogger().log(
+ util.format('<%s>', listener.getDescription()) +
+ ' ' +
+ util.format.apply(null, arguments));
+ }
+
+ log('Connected from %s.', ws._socket.remoteAddress);
+
+ ws.on('message', function(data) {
+ log('Received message: %s', data);
+
+ var message;
+ try {
+ message = JSON.parse(data);
+ } catch (err) {
+ log('Message is invalid: %s', err.message);
+ return;
+ }
+
+ switch (message.command) {
+ case 'subscribe':
+ log(
+ 'Subscribed to: %s',
+ JSON.stringify(message.data));
+ listener.subscribe(message.data);
+ break;
+
+ case 'unsubscribe':
+ log(
+ 'Unsubscribed from: %s',
+ JSON.stringify(message.data));
+ listener.unsubscribe(message.data);
+ break;
+
+ default:
+ log(
+ 'Unrecognized command "%s".',
+ message.command || '<undefined>');
+ }
+ });
+
+ wss.on('close', function() {
+ self.getListenerList().removeListener(listener);
+ log('Disconnected.');
+ });
+
+ wss.on('error', function(err) {
+ log('Error: %s', err.message);
+ });
+
+ });
+
+ },
+
+ },
+
+ properties: {
+ listenerList: null,
+ logger: null,
+ }
+
+});
diff --git a/support/aphlict/server/lib/AphlictListener.js b/support/aphlict/server/lib/AphlictListener.js
--- a/support/aphlict/server/lib/AphlictListener.js
+++ b/support/aphlict/server/lib/AphlictListener.js
@@ -50,8 +50,6 @@
writeMessage: function(message) {
this._socket.send(JSON.stringify(message));
- }
-
- }
-
+ },
+ },
});
diff --git a/support/aphlict/server/lib/AphlictListenerList.js b/support/aphlict/server/lib/AphlictListenerList.js
--- a/support/aphlict/server/lib/AphlictListenerList.js
+++ b/support/aphlict/server/lib/AphlictListenerList.js
@@ -39,7 +39,7 @@
},
getActiveListenerCount: function() {
- return Object.keys(this._listeners).length;
+ return this._listeners.length;
},
getTotalListenerCount: function() {
@@ -48,12 +48,10 @@
_generateNextID: function() {
do {
- this._nextID = ((this._nextID + 1) % 1000000000000);
+ this._nextID = (this._nextID + 1) % 1000000000000;
} while (this._nextID in this._listeners);
return this._nextID;
- }
-
- }
-
+ },
+ },
});
diff --git a/support/aphlict/server/lib/AphlictLog.js b/support/aphlict/server/lib/AphlictLog.js
--- a/support/aphlict/server/lib/AphlictLog.js
+++ b/support/aphlict/server/lib/AphlictLog.js
@@ -5,30 +5,25 @@
JX.install('AphlictLog', {
construct: function() {
- this._writeToLogs = [];
- this._writeToConsoles = [];
+ this._consoles = [];
+ this._logs = [];
},
members: {
- _writeToConsoles: null,
- _writeToLogs: null,
-
- addLogfile: function(path) {
- var options = {
- flags: 'a',
- encoding: 'utf8',
- mode: 0664,
- };
-
- var logfile = fs.createWriteStream(path, options);
-
- this._writeToLogs.push(logfile);
+ _consoles: null,
+ _logs: null,
+ addConsole: function(console) {
+ this._consoles.push(console);
return this;
},
- addConsole: function(console) {
- this._writeToConsoles.push(console);
+ addLog: function(path) {
+ this._logs.push(fs.createWriteStream(path, {
+ flags: 'a',
+ encoding: 'utf8',
+ mode: 0664,
+ }));
return this;
},
@@ -38,15 +33,13 @@
str = '[' + date + '] ' + str;
var ii;
- for (ii = 0; ii < this._writeToConsoles.length; ii++) {
- this._writeToConsoles[ii].log(str);
+ for (ii = 0; ii < this._consoles.length; ii++) {
+ this._consoles[ii].log(str);
}
- for (ii = 0; ii < this._writeToLogs.length; ii++) {
- this._writeToLogs[ii].write(str + '\n');
+ for (ii = 0; ii < this._logs.length; ii++) {
+ this._logs[ii].write(str + '\n');
}
- }
-
- }
-
+ },
+ },
});
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Apr 4, 9:33 PM (3 w, 1 d ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7383160
Default Alt Text
D11383.diff (14 KB)
Attached To
Mode
D11383: Refactoring of the Aphlict server
Attached
Detach File
Event Timeline
Log In to Comment