diff --git a/src/infrastructure/daemon/bot/adapter/PhabricatorBotBaseStreamingProtocolAdapter.php b/src/infrastructure/daemon/bot/adapter/PhabricatorBotBaseStreamingProtocolAdapter.php index 2511d739e9..83b0dd40bb 100644 --- a/src/infrastructure/daemon/bot/adapter/PhabricatorBotBaseStreamingProtocolAdapter.php +++ b/src/infrastructure/daemon/bot/adapter/PhabricatorBotBaseStreamingProtocolAdapter.php @@ -1,162 +1,170 @@ server); return $uri->getDomain(); } public function connect() { $this->server = $this->getConfig('server'); $this->authtoken = $this->getConfig('authtoken'); $rooms = $this->getConfig('join'); // First, join the room if (!$rooms) { throw new Exception('Not configured to join any rooms!'); } $this->readBuffers = array(); // Set up our long poll in a curl multi request so we can // continue running while it executes in the background $this->multiHandle = curl_multi_init(); $this->readHandles = array(); foreach ($rooms as $room_id) { $this->joinRoom($room_id); // Set up the curl stream for reading $url = $this->buildStreamingUrl($room_id); - $this->readHandle[$url] = curl_init(); - curl_setopt($this->readHandle[$url], CURLOPT_URL, $url); - curl_setopt($this->readHandle[$url], CURLOPT_RETURNTRANSFER, true); - curl_setopt($this->readHandle[$url], CURLOPT_FOLLOWLOCATION, 1); + $ch = $this->readHandles[$url] = curl_init(); + + curl_setopt($ch, CURLOPT_URL, $url); + curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); + curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1); curl_setopt( - $this->readHandle[$url], + $ch, CURLOPT_USERPWD, $this->authtoken.':x'); + curl_setopt( - $this->readHandle[$url], + $ch, CURLOPT_HTTPHEADER, array('Content-type: application/json')); curl_setopt( - $this->readHandle[$url], + $ch, CURLOPT_WRITEFUNCTION, array($this, 'read')); - curl_setopt($this->readHandle[$url], CURLOPT_BUFFERSIZE, 128); - curl_setopt($this->readHandle[$url], CURLOPT_TIMEOUT, 0); + curl_setopt($ch, CURLOPT_BUFFERSIZE, 128); + curl_setopt($ch, CURLOPT_TIMEOUT, 0); - curl_multi_add_handle($this->multiHandle, $this->readHandle[$url]); + curl_multi_add_handle($this->multiHandle, $ch); // Initialize read buffer $this->readBuffers[$url] = ''; } $this->active = null; $this->blockingMultiExec(); } protected function joinRoom($room_id) { // Optional hook, by default, do nothing } // This is our callback for the background curl multi-request. // Puts the data read in on the readBuffer for processing. private function read($ch, $data) { $info = curl_getinfo($ch); $length = strlen($data); $this->readBuffers[$info['url']] .= $data; return $length; } private function blockingMultiExec() { do { $status = curl_multi_exec($this->multiHandle, $this->active); } while ($status == CURLM_CALL_MULTI_PERFORM); // Check for errors if ($status != CURLM_OK) { throw new Exception( 'Phabricator Bot had a problem reading from stream.'); } } public function getNextMessages($poll_frequency) { $messages = array(); if (!$this->active) { throw new Exception('Phabricator Bot stopped reading from stream.'); } // Prod our http request curl_multi_select($this->multiHandle, $poll_frequency); $this->blockingMultiExec(); // Process anything waiting on the read buffer while ($m = $this->processReadBuffer()) { $messages[] = $m; } return $messages; } private function processReadBuffer() { foreach ($this->readBuffers as $url => &$buffer) { $until = strpos($buffer, "}\r"); if ($until == false) { continue; } $message = substr($buffer, 0, $until + 1); $buffer = substr($buffer, $until + 2); $m_obj = json_decode($message, true); if ($message = $this->processMessage($m_obj)) { return $message; } } // If we're here, there's nothing to process return false; } protected function performPost($endpoint, $data = null) { $uri = new PhutilURI($this->server); $uri->setPath($endpoint); $payload = json_encode($data); list($output) = id(new HTTPSFuture($uri)) ->setMethod('POST') ->addHeader('Content-Type', 'application/json') ->addHeader('Authorization', $this->getAuthorizationHeader()) ->setData($payload) ->resolvex(); $output = trim($output); if (strlen($output)) { return json_decode($output, true); } return true; } protected function getAuthorizationHeader() { - return 'Basic '.base64_encode($this->authtoken.':x'); + return 'Basic '.$this->getEncodedAuthToken(); + } + + protected function getEncodedAuthToken() { + return base64_encode($this->authtoken.':x'); } abstract protected function buildStreamingUrl($channel); abstract protected function processMessage($raw_object); + } diff --git a/src/infrastructure/daemon/bot/adapter/PhabricatorBotFlowdockProtocolAdapter.php b/src/infrastructure/daemon/bot/adapter/PhabricatorBotFlowdockProtocolAdapter.php index dc3b568a88..e5f4b872d4 100644 --- a/src/infrastructure/daemon/bot/adapter/PhabricatorBotFlowdockProtocolAdapter.php +++ b/src/infrastructure/daemon/bot/adapter/PhabricatorBotFlowdockProtocolAdapter.php @@ -1,82 +1,91 @@ getConfig('organization'); + $organization = $this->getConfig('flowdock.organization'); + if (empty($organization)) { + $this->getConfig('organization'); + } + if (empty($organization)) { + throw new Exception( + '"flowdock.organization" configuration variable not set'); + } + + $ssl = $this->getConfig('ssl'); $url = ($ssl) ? 'https://' : 'http://'; - $url .= "stream.flowdock.com/flows/{$organization}/{$channel}"; - + $url .= "{$this->authtoken}@stream.flowdock.com/flows/{$organization}/{$channel}"; return $url; } protected function processMessage($m_obj) { - $command = null; - switch ($m_obj['event']) { - case 'message': - $command = 'MESSAGE'; - break; - default: - // For now, ignore anything which we don't otherwise know about. - break; - } + $command = null; + switch ($m_obj['event']) { + case 'message': + $command = 'MESSAGE'; + break; + default: + // For now, ignore anything which we don't otherwise know about. + break; + } - if ($command === null) { - return false; - } + if ($command === null) { + return false; + } - // TODO: These should be usernames, not user IDs. - $sender = id(new PhabricatorBotUser()) - ->setName($m_obj['user']); + // TODO: These should be usernames, not user IDs. + $sender = id(new PhabricatorBotUser()) + ->setName($m_obj['user']); - $target = id(new PhabricatorBotChannel()) - ->setName($m_obj['flow']); + $target = id(new PhabricatorBotChannel()) + ->setName($m_obj['flow']); - return id(new PhabricatorBotMessage()) - ->setCommand($command) - ->setSender($sender) - ->setTarget($target) - ->setBody($m_obj['content']); + return id(new PhabricatorBotMessage()) + ->setCommand($command) + ->setSender($sender) + ->setTarget($target) + ->setBody($m_obj['content']); } public function writeMessage(PhabricatorBotMessage $message) { switch ($message->getCommand()) { case 'MESSAGE': $this->speak( $message->getBody(), $message->getTarget()); break; } } private function speak( $body, PhabricatorBotTarget $flow) { - - list($organization, $room_id) = explode(':', $flow->getName()); - - $this->performPost( - "/flows/{$organization}/{$room_id}/messages", - array( - 'event' => 'message', - 'content' => $body)); - } + // The $flow->getName() returns the flow's UUID, + // as such, the Flowdock API does not require the organization + // to be specified in the URI + $this->performPost( + '/messages', + array( + 'flow' => $flow->getName(), + 'event' => 'message', + 'content' => $body)); + } public function __destruct() { if ($this->readHandles) { foreach ($this->readHandles as $read_handle) { curl_multi_remove_handle($this->multiHandle, $read_handle); curl_close($read_handle); } } curl_multi_close($this->multiHandle); } }