diff --git a/src/applications/notification/client/PhabricatorNotificationServerRef.php b/src/applications/notification/client/PhabricatorNotificationServerRef.php index cfdbb5e8eb..b183221eee 100644 --- a/src/applications/notification/client/PhabricatorNotificationServerRef.php +++ b/src/applications/notification/client/PhabricatorNotificationServerRef.php @@ -1,234 +1,234 @@ type = $type; return $this; } public function getType() { return $this->type; } public function setHost($host) { $this->host = $host; return $this; } public function getHost() { return $this->host; } public function setPort($port) { $this->port = $port; return $this; } public function getPort() { return $this->port; } public function setProtocol($protocol) { $this->protocol = $protocol; return $this; } public function getProtocol() { return $this->protocol; } public function setPath($path) { $this->path = $path; return $this; } public function getPath() { return $this->path; } public function setIsDisabled($is_disabled) { $this->isDisabled = $is_disabled; return $this; } public function getIsDisabled() { return $this->isDisabled; } public static function getLiveServers() { $cache = PhabricatorCaches::getRequestCache(); $refs = $cache->getKey(self::KEY_REFS); if (!$refs) { $refs = self::newRefs(); $cache->setKey(self::KEY_REFS, $refs); } return $refs; } public static function newRefs() { $configs = PhabricatorEnv::getEnvConfig('notification.servers'); $refs = array(); foreach ($configs as $config) { $ref = id(new self()) ->setType($config['type']) ->setHost($config['host']) ->setPort($config['port']) ->setProtocol($config['protocol']) ->setPath(idx($config, 'path')) ->setIsDisabled(idx($config, 'disabled', false)); $refs[] = $ref; } return $refs; } public static function getEnabledServers() { $servers = self::getLiveServers(); foreach ($servers as $key => $server) { if ($server->getIsDisabled()) { unset($servers[$key]); } } return array_values($servers); } public static function getEnabledAdminServers() { $servers = self::getEnabledServers(); foreach ($servers as $key => $server) { if (!$server->isAdminServer()) { unset($servers[$key]); } } return array_values($servers); } public static function getEnabledClientServers($with_protocol) { $servers = self::getEnabledServers(); foreach ($servers as $key => $server) { if ($server->isAdminServer()) { unset($servers[$key]); continue; } $protocol = $server->getProtocol(); if ($protocol != $with_protocol) { unset($servers[$key]); continue; } } return array_values($servers); } public function isAdminServer() { return ($this->type == 'admin'); } public function getURI($to_path = null) { $full_path = rtrim($this->getPath(), '/').'/'.ltrim($to_path, '/'); $uri = id(new PhutilURI('http://'.$this->getHost())) ->setProtocol($this->getProtocol()) ->setPort($this->getPort()) ->setPath($full_path); $instance = PhabricatorEnv::getEnvConfig('cluster.instance'); if (strlen($instance)) { $uri->setQueryParam('instance', $instance); } return $uri; } public function getWebsocketURI($to_path = null) { $instance = PhabricatorEnv::getEnvConfig('cluster.instance'); if (strlen($instance)) { - $to_path = $to_path.$instance.'/'; + $to_path = $to_path.'~'.$instance.'/'; } $uri = $this->getURI($to_path); if ($this->getProtocol() == 'https') { $uri->setProtocol('wss'); } else { $uri->setProtocol('ws'); } return $uri; } public function testClient() { if ($this->isAdminServer()) { throw new Exception( pht('Unable to test client on an admin server!')); } $server_uri = $this->getURI(); try { id(new HTTPSFuture($server_uri)) ->setTimeout(2) ->resolvex(); } catch (HTTPFutureHTTPResponseStatus $ex) { // This is what we expect when things are working correctly. if ($ex->getStatusCode() == 501) { return true; } throw $ex; } throw new Exception( pht('Got HTTP 200, but expected HTTP 501 (WebSocket Upgrade)!')); } public function loadServerStatus() { if (!$this->isAdminServer()) { throw new Exception( pht( 'Unable to load server status: this is not an admin server!')); } $server_uri = $this->getURI('/status/'); list($body) = id(new HTTPSFuture($server_uri)) ->setTimeout(2) ->resolvex(); return phutil_json_decode($body); } public function postMessage(array $data) { if (!$this->isAdminServer()) { throw new Exception( pht('Unable to post message: this is not an admin server!')); } $server_uri = $this->getURI('/'); $payload = phutil_json_encode($data); id(new HTTPSFuture($server_uri, $payload)) ->setMethod('POST') ->setTimeout(2) ->resolvex(); } } diff --git a/support/aphlict/server/lib/AphlictClientServer.js b/support/aphlict/server/lib/AphlictClientServer.js index 4d173171c2..1d4375cbba 100644 --- a/support/aphlict/server/lib/AphlictClientServer.js +++ b/support/aphlict/server/lib/AphlictClientServer.js @@ -1,121 +1,136 @@ 'use strict'; var JX = require('./javelin').JX; require('./AphlictListenerList'); require('./AphlictLog'); var url = require('url'); var util = require('util'); var WebSocket = require('ws'); JX.install('AphlictClientServer', { construct: function(server) { server.on('request', JX.bind(this, this._onrequest)); this._server = server; this._lists = {}; }, properties: { logger: null, }, members: { _server: null, _lists: null, getListenerList: function(instance) { if (!this._lists[instance]) { this._lists[instance] = new JX.AphlictListenerList(instance); } return this._lists[instance]; }, log: function() { var logger = this.getLogger(); if (!logger) { return; } logger.log.apply(logger, arguments); return this; }, _onrequest: function(request, response) { // The websocket code upgrades connections before they get here, so // this only handles normal HTTP connections. We just fail them with // a 501 response. response.writeHead(501); response.end('HTTP/501 Use Websockets\n'); }, + _parseInstanceFromPath: function(path) { + // If there's no "~" marker in the path, it's not an instance name. + // Users sometimes configure nginx or Apache to proxy based on the + // path. + if (path.indexOf('~') === -1) { + return 'default'; + } + + var instance = path.split('~')[1]; + + // Remove any "/" characters. + instance = instance.replace(/\//g, ''); + if (!instance.length) { + return 'default'; + } + + return instance; + }, + listen: function() { var self = this; var server = this._server.listen.apply(this._server, arguments); var wss = new WebSocket.Server({server: server}); wss.on('connection', function(ws) { - var instance = url.parse(ws.upgradeReq.url).pathname; - - instance = instance.replace(/\//g, ''); - if (!instance.length) { - instance = 'default'; - } + var path = url.parse(ws.upgradeReq.url).pathname; + var instance = self._parseInstanceFromPath(path); var listener = self.getListenerList(instance).addListener(ws); function log() { self.log( util.format('<%s>', listener.getDescription()) + ' ' + util.format.apply(null, arguments)); } log('Connected from %s.', ws._socket.remoteAddress); ws.on('message', function(data) { log('Received message: %s', data); var message; try { message = JSON.parse(data); } catch (err) { log('Message is invalid: %s', err.message); return; } switch (message.command) { case 'subscribe': log( 'Subscribed to: %s', JSON.stringify(message.data)); listener.subscribe(message.data); break; case 'unsubscribe': log( 'Unsubscribed from: %s', JSON.stringify(message.data)); listener.unsubscribe(message.data); break; default: log( 'Unrecognized command "%s".', message.command || ''); } }); ws.on('close', function() { self.getListenerList(instance).removeListener(listener); log('Disconnected.'); }); }); } } });