Page MenuHomePhabricator

D9458.id22581.diff
No OneTemporary

D9458.id22581.diff

diff --git a/src/applications/notification/client/PhabricatorNotificationClient.php b/src/applications/notification/client/PhabricatorNotificationClient.php
--- a/src/applications/notification/client/PhabricatorNotificationClient.php
+++ b/src/applications/notification/client/PhabricatorNotificationClient.php
@@ -2,7 +2,7 @@
final class PhabricatorNotificationClient {
- const EXPECT_VERSION = 5;
+ const EXPECT_VERSION = 6;
public static function getServerStatus() {
$uri = PhabricatorEnv::getEnvConfig('notification.server-uri');
diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js
--- a/support/aphlict/server/aphlict_server.js
+++ b/support/aphlict/server/aphlict_server.js
@@ -25,10 +25,10 @@
function parse_command_line_arguments(argv) {
var config = {
- port : 22280,
- admin : 22281,
- host : '127.0.0.1',
- user : null,
+ port: 22280,
+ admin: 22281,
+ host: '127.0.0.1',
+ user: null,
log: '/var/log/aphlict.log'
};
@@ -36,10 +36,10 @@
var arg = argv[ii];
var matches = arg.match(/^--([^=]+)=(.*)$/);
if (!matches) {
- throw new Error("Unknown argument '"+arg+"'!");
+ throw new Error("Unknown argument '" + arg + "'!");
}
if (!(matches[1] in config)) {
- throw new Error("Unknown argument '"+matches[1]+"'!");
+ throw new Error("Unknown argument '" + matches[1] + "'!");
}
config[matches[1]] = matches[2];
}
@@ -52,19 +52,19 @@
if (process.getuid() !== 0) {
console.log(
- "ERROR: "+
- "This server must be run as root because it needs to bind to privileged "+
- "port 843 to start a Flash policy server. It will downgrade to run as a "+
- "less-privileged user after binding if you pass a user in the command "+
+ "ERROR: " +
+ "This server must be run as root because it needs to bind to privileged " +
+ "port 843 to start a Flash policy server. It will downgrade to run as a " +
+ "less-privileged user after binding if you pass a user in the command " +
"line arguments with '--user=alincoln'.");
process.exit(1);
}
var net = require('net');
-var http = require('http');
+var http = require('http');
var url = require('url');
-process.on('uncaughtException', function (err) {
+process.on('uncaughtException', function(err) {
debug.log("\n<<< UNCAUGHT EXCEPTION! >>>\n\n" + err);
process.exit(1);
});
@@ -82,6 +82,51 @@
listener.getDescription(),
socket.remoteAddress);
+ // Bytes received from this client that do not yet form a complete frame.
+ var buffer = new Buffer([]);
+
+ socket.on('data', function(data) {
+ // Each frame is a 2-byte big-endian length, then that many bytes of
+ // UTF-8 JSON. A chunk may hold a partial frame or several frames.
+ buffer = Buffer.concat([buffer, new Buffer(data)]);
+
+ while (buffer.length >= 2) {
+ var length = buffer.readUInt16BE(0);
+ if (buffer.length < 2 + length) {
+ break; // Wait for the rest of this frame.
+ }
+
+ // NOTE(review): JSON.parse throws on a malformed payload; confirm that is acceptable.
+ var message = JSON.parse(buffer.toString('utf8', 2, 2 + length));
+ buffer = buffer.slice(2 + length);
+
+ debug.log('<%s> Received data: %s',
+ listener.getDescription(),
+ JSON.stringify(message));
+
+ switch (message.command) {
+ case 'subscribe':
+ debug.log(
+ '<%s> Subscribed to: %s',
+ listener.getDescription(),
+ JSON.stringify(message.data));
+ listener.subscribe(message.data);
+ break;
+
+ case 'unsubscribe':
+ debug.log(
+ '<%s> Unsubscribed from: %s',
+ listener.getDescription(),
+ JSON.stringify(message.data));
+ listener.unsubscribe(message.data);
+ break;
+
+ default:
+ debug.log('<%s> Unrecognized command.', listener.getDescription());
+ }
+ }
+ });
+
socket.on('close', function() {
clients.removeListener(listener);
debug.log('<%s> Disconnected', listener.getDescription());
@@ -95,7 +140,7 @@
debug.log('<%s> Ended Connection', listener.getDescription());
});
- socket.on('error', function (e) {
+ socket.on('error', function(e) {
debug.log('<%s> Error: %s', listener.getDescription(), e);
});
@@ -107,22 +152,22 @@
var start_time = new Date().getTime();
var receive_server = http.createServer(function(request, response) {
- response.writeHead(200, {'Content-Type' : 'text/plain'});
+ response.writeHead(200, {'Content-Type': 'text/plain'});
// Publishing a notification.
if (request.method == 'POST') {
var body = '';
- request.on('data', function (data) {
+ request.on('data', function(data) {
body += data;
});
- request.on('end', function () {
+ request.on('end', function() {
++messages_in;
var msg = JSON.parse(body);
debug.log('notification: ' + JSON.stringify(msg));
- broadcast(msg.data);
+ transmit(msg.data, msg.subscribers);
response.end();
});
} else if (request.url == '/status/') {
@@ -139,7 +184,7 @@
'messages.in': messages_in,
'messages.out': messages_out,
'log': config.log,
- 'version': 5
+ 'version': 6
};
response.write(JSON.stringify(status));
@@ -153,10 +198,14 @@
}).listen(config.admin, config.host);
-function broadcast(data) {
- var listeners = clients.getListeners();
- for (var id in listeners) {
- var listener = listeners[id];
+function transmit(data, subscribers) {
+ var listeners = clients.getListeners().filter(function() {
+ return this.subscribed(subscribers);
+ });
+
+ for (var i = 0; i < listeners.length; i++) {
+ var listener = listeners[i];
+
try {
listener.writeMessage(data);
diff --git a/support/aphlict/server/lib/AphlictFlashPolicyServer.js b/support/aphlict/server/lib/AphlictFlashPolicyServer.js
--- a/support/aphlict/server/lib/AphlictFlashPolicyServer.js
+++ b/support/aphlict/server/lib/AphlictFlashPolicyServer.js
@@ -17,12 +17,12 @@
_accessPort: null,
_debug: null,
- setDebugLog : function(log) {
+ setDebugLog: function(log) {
this._debug = log;
return this;
},
- setAccessPort : function(port) {
+ setAccessPort: function(port) {
this._accessPort = port;
return this;
},
diff --git a/support/aphlict/server/lib/AphlictListener.js b/support/aphlict/server/lib/AphlictListener.js
--- a/support/aphlict/server/lib/AphlictListener.js
+++ b/support/aphlict/server/lib/AphlictListener.js
@@ -7,22 +7,54 @@
},
members : {
- _id : null,
- _socket : null,
+ _id: null,
+ _socket: null,
+ _subscriptions: {},
- getID : function() {
+ getID: function() {
return this._id;
},
- getSocket : function() {
+ subscribe: function(phids) {
+ // Give this listener its own map; the "{}" default in members may be shared.
+ if (!this.hasOwnProperty('_subscriptions')) {
+ this._subscriptions = {};
+ }
+
+ for (var i = 0; i < phids.length; i++) {
+ this._subscriptions[phids[i]] = true;
+ }
+ return this;
+ },
+
+ unsubscribe: function(phids) {
+ // Drop each listed PHID from this listener's subscription map. Deleting
+ // a key that is not present is a no-op, so no membership test is needed.
+ var subscriptions = this._subscriptions;
+
+ for (var idx = 0; idx < phids.length; idx++) {
+ delete subscriptions[phids[idx]];
+ }
+
+ return this;
+ },
+
+ isSubscribedToAny: function(phids) {
+ // True if at least one of phids is a key in this listener's subscriptions.
+ return phids.some(function(phid) {
+ return phid in this._subscriptions;
+ }, this); // thisArg is required: the callback's "this" is otherwise unbound.
+ },
+
+ getSocket: function() {
return this._socket;
},
- getDescription : function() {
+ getDescription: function() {
return 'Listener/' + this.getID();
},
- writeMessage : function(message) {
+ writeMessage: function(message) {
var serial = JSON.stringify(message);
var length = Buffer.byteLength(serial, 'utf8');
diff --git a/support/aphlict/server/lib/AphlictListenerList.js b/support/aphlict/server/lib/AphlictListenerList.js
--- a/support/aphlict/server/lib/AphlictListenerList.js
+++ b/support/aphlict/server/lib/AphlictListenerList.js
@@ -2,17 +2,17 @@
JX.require('AphlictListener', __dirname);
JX.install('AphlictListenerList', {
- construct : function() {
+ construct: function() {
this._listeners = {};
},
- members : {
- _listeners : null,
- _nextID : 0,
- _activeListenerCount : 0,
- _totalListenerCount : 0,
+ members: {
+ _listeners: null,
+ _nextID: 0,
+ _activeListenerCount: 0,
+ _totalListenerCount: 0,
- addListener : function(socket) {
+ addListener: function(socket) {
var listener = new JX.AphlictListener(
this._generateNextID(),
socket);
@@ -24,7 +24,7 @@
return listener;
},
- removeListener : function(listener) {
+ removeListener: function(listener) {
var id = listener.getID();
if (id in this._listeners) {
delete this._listeners[id];
@@ -32,19 +32,19 @@
}
},
- getListeners : function() {
+ getListeners: function() {
return this._listeners;
},
- getActiveListenerCount : function() {
+ getActiveListenerCount: function() {
return this._activeListenerCount;
},
- getTotalListenerCount : function() {
+ getTotalListenerCount: function() {
return this._totalListenerCount;
},
- _generateNextID : function() {
+ _generateNextID: function() {
do {
this._nextID = ((this._nextID + 1) % 1000000000000);
} while (this._nextID in this._listeners);
diff --git a/support/aphlict/server/lib/AphlictLog.js b/support/aphlict/server/lib/AphlictLog.js
--- a/support/aphlict/server/lib/AphlictLog.js
+++ b/support/aphlict/server/lib/AphlictLog.js
@@ -4,16 +4,16 @@
var util = require('util');
JX.install('AphlictLog', {
- construct : function() {
+ construct: function() {
this._writeToLogs = [];
this._writeToConsoles = [];
},
- members : {
- _writeToConsoles : null,
- _writeToLogs : null,
+ members: {
+ _writeToConsoles: null,
+ _writeToLogs: null,
- addLogfile : function(path) {
+ addLogfile: function(path) {
var options = {
flags: 'a',
encoding: 'utf8',
@@ -27,12 +27,12 @@
return this;
},
- addConsole : function(console) {
+ addConsole: function(console) {
this._writeToConsoles.push(console);
return this;
},
- log : function(pattern) {
+ log: function(pattern) {
var str = util.format.apply(null, arguments);
var date = new Date().toLocaleString();
str = '[' + date + '] ' + str;
diff --git a/support/aphlict/server/lib/javelin.js b/support/aphlict/server/lib/javelin.js
--- a/support/aphlict/server/lib/javelin.js
+++ b/support/aphlict/server/lib/javelin.js
@@ -6,7 +6,7 @@
// NOTE: This is faking out a piece of code in JX.install which waits for
// Stratcom before running static initializers.
-JX.Stratcom = {ready : true};
+JX.Stratcom = {ready: true};
JX.require('core/Event');
JX.require('core/Stratcom');

File Metadata

Mime Type
text/plain
Expires
Thu, May 9, 7:17 PM (3 w, 2 d ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
6275522
Default Alt Text
D9458.id22581.diff (10 KB)

Event Timeline