Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F15379722
D9458.id22578.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
10 KB
Referenced Files
None
Subscribers
None
D9458.id22578.diff
View Options
diff --git a/src/applications/notification/client/PhabricatorNotificationClient.php b/src/applications/notification/client/PhabricatorNotificationClient.php
--- a/src/applications/notification/client/PhabricatorNotificationClient.php
+++ b/src/applications/notification/client/PhabricatorNotificationClient.php
@@ -2,7 +2,7 @@
final class PhabricatorNotificationClient {
- const EXPECT_VERSION = 5;
+ const EXPECT_VERSION = 6;
public static function getServerStatus() {
$uri = PhabricatorEnv::getEnvConfig('notification.server-uri');
diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js
--- a/support/aphlict/server/aphlict_server.js
+++ b/support/aphlict/server/aphlict_server.js
@@ -12,6 +12,8 @@
JX.require('lib/AphlictListenerList', __dirname);
JX.require('lib/AphlictLog', __dirname);
+require('./lib/Array');
+
var debug = new JX.AphlictLog()
.addConsole(console);
@@ -25,10 +27,10 @@
function parse_command_line_arguments(argv) {
var config = {
- port : 22280,
- admin : 22281,
- host : '127.0.0.1',
- user : null,
+ port: 22280,
+ admin: 22281,
+ host: '127.0.0.1',
+ user: null,
log: '/var/log/aphlict.log'
};
@@ -36,10 +38,10 @@
var arg = argv[ii];
var matches = arg.match(/^--([^=]+)=(.*)$/);
if (!matches) {
- throw new Error("Unknown argument '"+arg+"'!");
+ throw new Error("Unknown argument '" + arg + "'!");
}
if (!(matches[1] in config)) {
- throw new Error("Unknown argument '"+matches[1]+"'!");
+ throw new Error("Unknown argument '" + matches[1] + "'!");
}
config[matches[1]] = matches[2];
}
@@ -52,19 +54,19 @@
if (process.getuid() !== 0) {
console.log(
- "ERROR: "+
- "This server must be run as root because it needs to bind to privileged "+
- "port 843 to start a Flash policy server. It will downgrade to run as a "+
- "less-privileged user after binding if you pass a user in the command "+
+ "ERROR: " +
+ "This server must be run as root because it needs to bind to privileged " +
+ "port 843 to start a Flash policy server. It will downgrade to run as a " +
+ "less-privileged user after binding if you pass a user in the command " +
"line arguments with '--user=alincoln'.");
process.exit(1);
}
var net = require('net');
-var http = require('http');
+var http = require('http');
var url = require('url');
-process.on('uncaughtException', function (err) {
+process.on('uncaughtException', function(err) {
debug.log("\n<<< UNCAUGHT EXCEPTION! >>>\n\n" + err);
process.exit(1);
});
@@ -82,6 +84,36 @@
listener.getDescription(),
socket.remoteAddress);
+ socket.on('data', function(msg) {
+ var buffer = new Buffer(msg);
+ var message = JSON.parse(buffer.toString());
+
+ debug.log('<%s> Received data: %s',
+ listener.getDescription(),
+ JSON.stringify(message));
+
+ switch (message.command) {
+ case 'subscribe':
+ debug.log(
+ '<%s> Subscribed to: %s',
+ listener.getDescription(),
+ JSON.stringify(message.data));
+ listener.subscribe(message.data);
+ break;
+
+ case 'unsubscribe':
+ debug.log(
+ '<%s> Unsubscribed from: %s',
+ listener.getDescription(),
+ JSON.stringify(message.data));
+ listener.unsubscribe(message.data);
+ break;
+
+ default:
+ debug.log('<s> Unrecognized command.', listener.getDescription());
+ }
+ });
+
socket.on('close', function() {
clients.removeListener(listener);
debug.log('<%s> Disconnected', listener.getDescription());
@@ -95,7 +127,7 @@
debug.log('<%s> Ended Connection', listener.getDescription());
});
- socket.on('error', function (e) {
+ socket.on('error', function(e) {
debug.log('<%s> Error: %s', listener.getDescription(), e);
});
@@ -107,22 +139,22 @@
var start_time = new Date().getTime();
var receive_server = http.createServer(function(request, response) {
- response.writeHead(200, {'Content-Type' : 'text/plain'});
+ response.writeHead(200, {'Content-Type': 'text/plain'});
// Publishing a notification.
if (request.method == 'POST') {
var body = '';
- request.on('data', function (data) {
+ request.on('data', function(data) {
body += data;
});
- request.on('end', function () {
+ request.on('end', function() {
++messages_in;
var msg = JSON.parse(body);
debug.log('notification: ' + JSON.stringify(msg));
- broadcast(msg.data);
+ transmit(msg.data, msg.subscribers);
response.end();
});
} else if (request.url == '/status/') {
@@ -139,7 +171,7 @@
'messages.in': messages_in,
'messages.out': messages_out,
'log': config.log,
- 'version': 5
+ 'version': 6
};
response.write(JSON.stringify(status));
@@ -153,10 +185,14 @@
}).listen(config.admin, config.host);
-function broadcast(data) {
- var listeners = clients.getListeners();
- for (var id in listeners) {
- var listener = listeners[id];
+function transmit(data, subscribers) {
+ var listeners = clients.getListeners().filter(function() {
+ return this.subscribed(subscribers);
+ });
+
+ for (var i = 0; i < listeners.length; i++) {
+ var listener = listeners[i];
+
try {
listener.writeMessage(data);
diff --git a/support/aphlict/server/lib/AphlictFlashPolicyServer.js b/support/aphlict/server/lib/AphlictFlashPolicyServer.js
--- a/support/aphlict/server/lib/AphlictFlashPolicyServer.js
+++ b/support/aphlict/server/lib/AphlictFlashPolicyServer.js
@@ -17,12 +17,12 @@
_accessPort: null,
_debug: null,
- setDebugLog : function(log) {
+ setDebugLog: function(log) {
this._debug = log;
return this;
},
- setAccessPort : function(port) {
+ setAccessPort: function(port) {
this._accessPort = port;
return this;
},
diff --git a/support/aphlict/server/lib/AphlictListener.js b/support/aphlict/server/lib/AphlictListener.js
--- a/support/aphlict/server/lib/AphlictListener.js
+++ b/support/aphlict/server/lib/AphlictListener.js
@@ -7,22 +7,40 @@
},
members : {
- _id : null,
- _socket : null,
+ _id: null,
+ _socket: null,
+ _subscriptions: null,
- getID : function() {
+ getID: function() {
return this._id;
},
- getSocket : function() {
+ subscribe: function(phids) {
+ // TODO: Add phids to this._subscriptions
+ return this;
+ },
+
+ unsubscribe: function(phids) {
+ // TODO: Remove phids from this._subscriptions
+ return this;
+ },
+
+ subscribed: function(phids) {
+ var intersection = this._subscriptions.filter(function(phid) {
+ return phids.indexOf(phid) != -1;
+ });
+ return intersection.length > 0;
+ },
+
+ getSocket: function() {
return this._socket;
},
- getDescription : function() {
+ getDescription: function() {
return 'Listener/' + this.getID();
},
- writeMessage : function(message) {
+ writeMessage: function(message) {
var serial = JSON.stringify(message);
var length = Buffer.byteLength(serial, 'utf8');
diff --git a/support/aphlict/server/lib/AphlictListenerList.js b/support/aphlict/server/lib/AphlictListenerList.js
--- a/support/aphlict/server/lib/AphlictListenerList.js
+++ b/support/aphlict/server/lib/AphlictListenerList.js
@@ -2,17 +2,17 @@
JX.require('AphlictListener', __dirname);
JX.install('AphlictListenerList', {
- construct : function() {
+ construct: function() {
this._listeners = {};
},
- members : {
- _listeners : null,
- _nextID : 0,
- _activeListenerCount : 0,
- _totalListenerCount : 0,
+ members: {
+ _listeners: null,
+ _nextID: 0,
+ _activeListenerCount: 0,
+ _totalListenerCount: 0,
- addListener : function(socket) {
+ addListener: function(socket) {
var listener = new JX.AphlictListener(
this._generateNextID(),
socket);
@@ -24,7 +24,7 @@
return listener;
},
- removeListener : function(listener) {
+ removeListener: function(listener) {
var id = listener.getID();
if (id in this._listeners) {
delete this._listeners[id];
@@ -32,19 +32,19 @@
}
},
- getListeners : function() {
+ getListeners: function() {
return this._listeners;
},
- getActiveListenerCount : function() {
+ getActiveListenerCount: function() {
return this._activeListenerCount;
},
- getTotalListenerCount : function() {
+ getTotalListenerCount: function() {
return this._totalListenerCount;
},
- _generateNextID : function() {
+ _generateNextID: function() {
do {
this._nextID = ((this._nextID + 1) % 1000000000000);
} while (this._nextID in this._listeners);
diff --git a/support/aphlict/server/lib/AphlictLog.js b/support/aphlict/server/lib/AphlictLog.js
--- a/support/aphlict/server/lib/AphlictLog.js
+++ b/support/aphlict/server/lib/AphlictLog.js
@@ -4,16 +4,16 @@
var util = require('util');
JX.install('AphlictLog', {
- construct : function() {
+ construct: function() {
this._writeToLogs = [];
this._writeToConsoles = [];
},
- members : {
- _writeToConsoles : null,
- _writeToLogs : null,
+ members: {
+ _writeToConsoles: null,
+ _writeToLogs: null,
- addLogfile : function(path) {
+ addLogfile: function(path) {
var options = {
flags: 'a',
encoding: 'utf8',
@@ -27,12 +27,12 @@
return this;
},
- addConsole : function(console) {
+ addConsole: function(console) {
this._writeToConsoles.push(console);
return this;
},
- log : function(pattern) {
+ log: function(pattern) {
var str = util.format.apply(null, arguments);
var date = new Date().toLocaleString();
str = '[' + date + '] ' + str;
diff --git a/support/aphlict/server/lib/javelin.js b/support/aphlict/server/lib/javelin.js
--- a/support/aphlict/server/lib/javelin.js
+++ b/support/aphlict/server/lib/javelin.js
@@ -6,7 +6,7 @@
// NOTE: This is faking out a piece of code in JX.install which waits for
// Stratcom before running static initializers.
-JX.Stratcom = {ready : true};
+JX.Stratcom = {ready: true};
JX.require('core/Event');
JX.require('core/Stratcom');
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Mar 14, 11:00 PM (1 w, 3 d ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7675157
Default Alt Text
D9458.id22578.diff (10 KB)
Attached To
Mode
D9458: Modify the Aphlict server to transmit messages instead of broadcasting them.
Attached
Detach File
Event Timeline
Log In to Comment