Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F15523278
D15700.id37826.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
10 KB
Referenced Files
None
Subscribers
None
D15700.id37826.diff
View Options
diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js
--- a/support/aphlict/server/aphlict_server.js
+++ b/support/aphlict/server/aphlict_server.js
@@ -99,8 +99,29 @@
if (ssl_config.enabled) {
ssl_config.key = fs.readFileSync(config['ssl-key']);
ssl_config.cert = fs.readFileSync(config['ssl-cert']);
+} else {
+ ssl_config.key = null;
+ ssl_config.cert = null;
}
+var servers = [];
+
+servers.push({
+ type: 'client',
+ port: config['client-port'],
+ listen: config['client-host'],
+ 'ssl.key': ssl_config.key,
+ 'ssl.certificate': ssl_config.cert
+});
+
+servers.push({
+ type: 'admin',
+ port: config['admin-port'],
+ listen: config['admin-host'],
+ 'ssl.key': null,
+ 'ssl.cert': null
+});
+
// If we're just doing a configuration test, exit here before starting any
// servers.
if (config.test) {
@@ -109,27 +130,49 @@
return;
}
-var server;
-if (ssl_config.enabled) {
- server = https.createServer({
- key: ssl_config.key,
- cert: ssl_config.cert
- }, function(req, res) {
- res.writeHead(501);
- res.end('HTTP/501 Use Websockets\n');
- });
-} else {
- server = http.createServer(function() {});
-}
+var aphlict_servers = [];
+var aphlict_clients = [];
+var aphlict_admins = [];
+
+var ii;
+for (ii = 0; ii < servers.length; ii++) {
+ var server = servers[ii];
+ var is_client = (server.type == 'client');
+
+ var http_server;
+ if (server['ssl.key']) {
+ var https_config = {
+ key: server['ssl.key'],
+ cert: server['ssl.cert']
+ };
+
+ http_server = https.createServer(https_config);
+ } else {
+ http_server = http.createServer();
+ }
+
+ var aphlict_server;
+ if (is_client) {
+ aphlict_server = new JX.AphlictClientServer(http_server);
+ } else {
+ aphlict_server = new JX.AphlictAdminServer(http_server);
+ }
-var client_server = new JX.AphlictClientServer(server);
-var admin_server = new JX.AphlictAdminServer();
+ aphlict_server.setLogger(debug);
+ aphlict_server.listen(server.port, server.listen);
-client_server.setLogger(debug);
-admin_server.setLogger(debug);
-admin_server.setClientServer(client_server);
+ aphlict_servers.push(aphlict_server);
-client_server.listen(config['client-port'], config['client-host']);
-admin_server.listen(config['admin-port'], config['admin-host']);
+ if (is_client) {
+ aphlict_clients.push(aphlict_server);
+ } else {
+ aphlict_admins.push(aphlict_server);
+ }
+}
+
+for (ii = 0; ii < aphlict_admins.length; ii++) {
+ var admin_server = aphlict_admins[ii];
+ admin_server.setClientServers(aphlict_clients);
+}
debug.log('Started Server (PID %d)', process.pid);
diff --git a/support/aphlict/server/lib/AphlictAdminServer.js b/support/aphlict/server/lib/AphlictAdminServer.js
--- a/support/aphlict/server/lib/AphlictAdminServer.js
+++ b/support/aphlict/server/lib/AphlictAdminServer.js
@@ -9,15 +9,19 @@
JX.install('AphlictAdminServer', {
- construct: function() {
- this.setLogger(new JX.AphlictLog());
-
+ construct: function(server) {
this._startTime = new Date().getTime();
this._messagesIn = 0;
this._messagesOut = 0;
- var handler = this._handler.bind(this);
- this._server = http.createServer(handler);
+ server.on('request', JX.bind(this, this._onrequest));
+ this._server = server;
+ this._clientServers = [];
+ },
+
+ properties: {
+ clientServers: null,
+ logger: null,
},
members: {
@@ -26,15 +30,32 @@
_server: null,
_startTime: null,
- getListenerList: function(instance) {
- return this.getClientServer().getListenerList(instance);
+ getListenerLists: function(instance) {
+ var clients = this.getClientServers();
+
+ var lists = [];
+ for (var ii = 0; ii < clients.length; ii++) {
+ lists.push(clients[ii].getListenerList(instance));
+ }
+ return lists;
+ },
+
+ log: function() {
+ var logger = this.getLogger();
+ if (!logger) {
+ return;
+ }
+
+ logger.log.apply(logger, arguments);
+
+ return this;
},
listen: function() {
return this._server.listen.apply(this._server, arguments);
},
- _handler: function(request, response) {
+ _onrequest: function(request, response) {
var self = this;
var u = url.parse(request.url, true);
var instance = u.query.instance || '/';
@@ -52,7 +73,7 @@
try {
var msg = JSON.parse(body);
- self.getLogger().log(
+ self.log(
'Received notification (' + instance + '): ' +
JSON.stringify(msg));
++self._messagesIn;
@@ -61,14 +82,14 @@
self._transmit(instance, msg);
response.writeHead(200, {'Content-Type': 'text/plain'});
} catch (err) {
- self.getLogger().log(
+ self.log(
'<%s> Internal Server Error! %s',
request.socket.remoteAddress,
err);
response.writeHead(500, 'Internal Server Error');
}
} catch (err) {
- self.getLogger().log(
+ self.log(
'<%s> Bad Request! %s',
request.socket.remoteAddress,
err);
@@ -82,61 +103,77 @@
response.end();
}
} else if (u.pathname == '/status/') {
- var status = {
- 'instance': instance,
- 'uptime': (new Date().getTime() - this._startTime),
- 'clients.active': this.getListenerList(instance)
- .getActiveListenerCount(),
- 'clients.total': this.getListenerList(instance)
- .getTotalListenerCount(),
- 'messages.in': this._messagesIn,
- 'messages.out': this._messagesOut,
- 'version': 7
- };
-
- response.writeHead(200, {'Content-Type': 'application/json'});
- response.write(JSON.stringify(status));
- response.end();
+ this._handleStatusRequest(request, response, instance);
} else {
response.writeHead(404, 'Not Found');
response.end();
}
},
+ _handleStatusRequest: function(request, response, instance) {
+ var active_count = 0;
+ var total_count = 0;
+
+ var lists = this.getListenerLists(instance);
+ for (var ii = 0; ii < lists.length; ii++) {
+ var list = lists[ii];
+ active_count += list.getActiveListenerCount();
+ total_count += list.getTotalListenerCount();
+ }
+
+ var server_status = {
+ 'instance': instance,
+ 'uptime': (new Date().getTime() - this._startTime),
+ 'clients.active': active_count,
+ 'clients.total': total_count,
+ 'messages.in': this._messagesIn,
+ 'messages.out': this._messagesOut,
+ 'version': 7
+ };
+
+ response.writeHead(200, {'Content-Type': 'application/json'});
+ response.write(JSON.stringify(server_status));
+ response.end();
+ },
+
/**
* Transmits a message to all subscribed listeners.
*/
_transmit: function(instance, message) {
- var listeners = this.getListenerList(instance)
- .getListeners()
- .filter(function(client) {
- return client.isSubscribedToAny(message.subscribers);
- });
+ var lists = this.getListenerLists(instance);
+
+ for (var ii = 0; ii < lists.length; ii++) {
+ var list = lists[ii];
+ var listeners = list.getListeners();
+ this._transmitToListeners(list, listeners, message);
+ }
+ },
+
+ _transmitToListeners: function(list, listeners, message) {
+ for (var ii = 0; ii < listeners.length; ii++) {
+ var listener = listeners[ii];
- for (var i = 0; i < listeners.length; i++) {
- var listener = listeners[i];
+ if (!listener.isSubscribedToAny(message.subscribers)) {
+ continue;
+ }
try {
listener.writeMessage(message);
++this._messagesOut;
- this.getLogger().log(
+ this.log(
'<%s> Wrote Message',
listener.getDescription());
} catch (error) {
- this.getListenerList(instance).removeListener(listener);
- this.getLogger().log(
+ list.removeListener(listener);
+
+ this.log(
'<%s> Write Error: %s',
listener.getDescription(),
error);
}
}
- },
- },
-
- properties: {
- clientServer: null,
- logger: null,
+ }
}
});
diff --git a/support/aphlict/server/lib/AphlictClientServer.js b/support/aphlict/server/lib/AphlictClientServer.js
--- a/support/aphlict/server/lib/AphlictClientServer.js
+++ b/support/aphlict/server/lib/AphlictClientServer.js
@@ -12,11 +12,16 @@
JX.install('AphlictClientServer', {
construct: function(server) {
- this.setLogger(new JX.AphlictLog());
+ server.on('request', JX.bind(this, this._onrequest));
+
this._server = server;
this._lists = {};
},
+ properties: {
+ logger: null,
+ },
+
members: {
_server: null,
_lists: null,
@@ -28,6 +33,25 @@
return this._lists[path];
},
+ log: function() {
+ var logger = this.getLogger();
+ if (!logger) {
+ return;
+ }
+
+ logger.log.apply(logger, arguments);
+
+ return this;
+ },
+
+ _onrequest: function(request, response) {
+ // The websocket code upgrades connections before they get here, so
+ // this only handles normal HTTP connections. We just fail them with
+ // a 501 response.
+ response.writeHead(501);
+ response.end('HTTP/501 Use Websockets\n');
+ },
+
listen: function() {
var self = this;
var server = this._server.listen.apply(this._server, arguments);
@@ -38,7 +62,7 @@
var listener = self.getListenerList(path).addListener(ws);
function log() {
- self.getLogger().log(
+ self.log(
util.format('<%s>', listener.getDescription()) +
' ' +
util.format.apply(null, arguments));
@@ -97,10 +121,6 @@
},
- },
-
- properties: {
- logger: null,
}
});
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Apr 22, 12:24 AM (3 d, 2 h ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7229828
Default Alt Text
D15700.id37826.diff (10 KB)
Attached To
Mode
D15700: Begin generalizing Aphlict server to prepare for clustering/sensible config file
Attached
Detach File
Event Timeline
Log In to Comment