diff --git a/src/phage/action/PhageAgentAction.php b/src/phage/action/PhageAgentAction.php index 087a81e..61894f0 100644 --- a/src/phage/action/PhageAgentAction.php +++ b/src/phage/action/PhageAgentAction.php @@ -1,185 +1,189 @@ isActive; } abstract protected function newAgentFuture(PhutilCommandString $command); protected function getAllWaitingChannels() { $channels = array(); if ($this->isActiveAgent()) { $channels[] = $this->channel; } return $channels; } public function startAgent() { $bootloader = new PhagePHPAgentBootloader(); $future = $this->newAgentFuture($bootloader->getBootCommand()); $future->write($bootloader->getBootSequence(), $keep_open = true); $channel = new PhutilExecChannel($future); $channel->setStderrHandler(array($this, 'didReadAgentStderr')); $channel = new PhutilJSONProtocolChannel($channel); foreach ($this->getActions() as $command) { $key = 'command/'.$this->commandKey++; $this->commands[$key] = array( 'key' => $key, 'command' => $command, ); $channel->write( array( 'type' => 'EXEC', 'key' => $key, 'command' => $command->getCommand()->getUnmaskedString(), )); } $this->future = $future; $this->channel = $channel; $this->isActive = true; } public function updateAgent() { if (!$this->isActiveAgent()) { return; } $channel = $this->channel; while (true) { $is_open = $channel->update(); $message = $channel->read(); if ($message !== null) { switch ($message['type']) { case 'TEXT': $key = $message['key']; $this->writeOutput($key, $message['kind'], $message['text']); break; case 'RSLV': $key = $message['key']; $command = $this->commands[$key]['command']; $this->writeOutput($key, 'stdout', $message['stdout']); $this->writeOutput($key, 'stderr', $message['stderr']); $exit_code = $message['err']; $command->setExitCode($exit_code); if ($exit_code != 0) { $exit_code = $this->formatOutput( pht( 'Command ("%s") exited nonzero ("%s")!', $command->getCommand(), $exit_code), $command->getLabel()); fprintf(STDOUT, '%s', $exit_code); } unset($this->commands[$key]); if (!$this->commands) { $channel->write( array( 'type' => 'EXIT', 'key' => 'exit', )); $this->isExiting = true; break; } } } if (!$is_open) { if ($this->isExiting) { $this->isActive = false; break; } else { throw new Exception(pht('Channel closed unexpectedly!')); } } + + if ($message === null) { + break; + } } } private function writeOutput($key, $kind, $text) { if (!strlen($text)) { return; } switch ($kind) { case 'stdout': $target = STDOUT; break; case 'stderr': $target = STDERR; break; default: throw new Exception(pht('Unknown output kind "%s".', $kind)); } $command = $this->commands[$key]['command']; $label = $command->getLabel(); if (!strlen($label)) { $label = pht('Unknown Command'); } $text = $this->formatOutput($text, $label); fprintf($target, '%s', $text); } private function formatOutput($output, $context) { $output = phutil_split_lines($output, false); foreach ($output as $key => $line) { $output[$key] = tsprintf("[%s] %R\n", $context, $line); } $output = implode('', $output); return $output; } public function didReadAgentStderr($channel, $stderr) { throw new Exception( pht( 'Unexpected output on agent stderr: %s.', $stderr)); } }