diff --git a/src/phage/__tests__/PhageAgentTestCase.php b/src/phage/__tests__/PhageAgentTestCase.php index 368ea8a..4973081 100644 --- a/src/phage/__tests__/PhageAgentTestCase.php +++ b/src/phage/__tests__/PhageAgentTestCase.php @@ -1,47 +1,49 @@ runBootloaderTests(new PhagePHPAgentBootloader()); } private function runBootloaderTests(PhageAgentBootloader $boot) { $name = get_class($boot); $exec = new ExecFuture('%C', $boot->getBootCommand()); $exec->write($boot->getBootSequence(), $keep_open = true); $exec_channel = new PhutilExecChannel($exec); $agent = new PhutilJSONProtocolChannel($exec_channel); $agent->write( array( 'type' => 'EXEC', 'key' => 1, 'command' => 'echo phage', + 'timeout' => null, )); $this->agentExpect( $agent, array( 'type' => 'RSLV', 'key' => 1, 'err' => 0, 'stdout' => "phage\n", 'stderr' => '', + 'timeout' => false, ), pht("'%s' for %s", 'echo phage', $name)); $agent->write( array( 'type' => 'EXIT', )); } private function agentExpect(PhutilChannel $agent, $expect, $what) { $message = $agent->waitForMessage(); $this->assertEqual($expect, $message, $what); } } diff --git a/src/phage/action/PhageAgentAction.php b/src/phage/action/PhageAgentAction.php index 1e996a4..9a84b47 100644 --- a/src/phage/action/PhageAgentAction.php +++ b/src/phage/action/PhageAgentAction.php @@ -1,272 +1,274 @@ isExiting) { throw new Exception( pht( 'You can not add new actions to an exiting agent.')); } $key = 'command/'.$this->commandKey++; $this->commands[$key] = array( 'key' => $key, 'command' => $action, ); $this->queued[$key] = $key; } public function isActiveAgent() { return $this->isActive; } final public function setLimit($limit) { $this->limit = $limit; return $this; } final public function getLimit() { return $this->limit; } final public function setThrottle($throttle) { $this->throttle = $throttle; return $this; } final public function getThrottle() { return $this->throttle; } abstract protected function newAgentFuture(PhutilCommandString $command); protected function getAllWaitingChannels() { $channels = array(); if ($this->isActiveAgent()) { $channels[] = $this->channel; } return $channels; } public function startAgent() { $bootloader = new PhagePHPAgentBootloader(); $future = $this->newAgentFuture($bootloader->getBootCommand()); $future->write($bootloader->getBootSequence(), $keep_open = true); $channel = new PhutilExecChannel($future); $channel->setStderrHandler(array($this, 'didReadAgentStderr')); $channel = new PhutilJSONProtocolChannel($channel); $this->future = $future; $this->channel = $channel; $this->isActive = true; } private function updateQueue() { // If we don't have anything waiting in queue, we have nothing to do. if (!$this->queued) { return false; } $now = microtime(true); // If we're throttling commands and recently started one, don't start // another one yet. $throttle = $this->getThrottle(); if ($throttle) { if ($this->waitUntil && ($now < $this->waitUntil)) { return false; } } // If we're limiting parallelism and the active list is full, don't // start anything else yet. $limit = $this->getLimit(); if ($limit) { if (count($this->active) >= $limit) { return false; } } // Move a command out of the queue and tell the agent to execute it. $key = head($this->queued); unset($this->queued[$key]); $this->active[$key] = $key; $command = $this->commands[$key]['command']; $channel = $this->getChannel(); $channel->write( array( 'type' => 'EXEC', 'key' => $key, 'command' => $command->getCommand()->getUnmaskedString(), + 'timeout' => $command->getTimeout(), )); if ($throttle) { $this->waitUntil = ($now + $throttle); } return true; } private function getChannel() { return $this->channel; } public function updateAgent() { if (!$this->isActiveAgent()) { return; } $channel = $this->channel; while (true) { do { $did_update = $this->updateQueue(); } while ($did_update); $is_open = $channel->update(); $message = $channel->read(); if ($message !== null) { switch ($message['type']) { case 'TEXT': $key = $message['key']; $this->writeOutput($key, $message['kind'], $message['text']); break; case 'RSLV': $key = $message['key']; $command = $this->commands[$key]['command']; $this->writeOutput($key, 'stdout', $message['stdout']); $this->writeOutput($key, 'stderr', $message['stderr']); $exit_code = $message['err']; $command->setExitCode($exit_code); + $command->setDidTimeout((bool)idx($message, 'timeout')); if ($exit_code != 0) { $exit_code = $this->formatOutput( pht( 'Command ("%s") exited nonzero ("%s")!', $command->getCommand(), $exit_code), $command->getLabel()); fprintf(STDOUT, '%s', $exit_code); } unset($this->active[$key]); if (!$this->active && !$this->queued) { $channel->write( array( 'type' => 'EXIT', 'key' => 'exit', )); $this->isExiting = true; break; } } } if (!$is_open) { if ($this->isExiting) { $this->isActive = false; break; } else { throw new Exception(pht('Channel closed unexpectedly!')); } } if ($message === null) { break; } } } private function writeOutput($key, $kind, $text) { if (!strlen($text)) { return; } switch ($kind) { case 'stdout': $target = STDOUT; break; case 'stderr': $target = STDERR; break; default: throw new Exception(pht('Unknown output kind "%s".', $kind)); } $command = $this->commands[$key]['command']; $label = $command->getLabel(); if (!strlen($label)) { $label = pht('Unknown Command'); } $text = $this->formatOutput($text, $label); fprintf($target, '%s', $text); } private function formatOutput($output, $context) { $output = phutil_split_lines($output, false); foreach ($output as $key => $line) { $output[$key] = tsprintf("[%s] %R\n", $context, $line); } $output = implode('', $output); return $output; } public function didReadAgentStderr($channel, $stderr) { throw new Exception( pht( 'Unexpected output on agent stderr: %s.', $stderr)); } } diff --git a/src/phage/action/PhageExecuteAction.php b/src/phage/action/PhageExecuteAction.php index fa477c2..811b289 100644 --- a/src/phage/action/PhageExecuteAction.php +++ b/src/phage/action/PhageExecuteAction.php @@ -1,41 +1,62 @@ command = $command; return $this; } public function getCommand() { return $this->command; } public function setLabel($label) { $this->label = $label; return $this; } public function getLabel() { return $this->label; } + public function setTimeout($timeout) { + $this->timeout = $timeout; + return $this; + } + + public function getTimeout() { + return $this->timeout; + } + public function setExitCode($exit_code) { $this->exitCode = $exit_code; return $this; } public function getExitCode() { return $this->exitCode; } + public function setDidTimeout($did_timeout) { + $this->didTimeout = $did_timeout; + return $this; + } + + public function getDidTimeout() { + return $this->didTimeout; + } + } diff --git a/src/phage/agent/PhagePHPAgent.php b/src/phage/agent/PhagePHPAgent.php index 6ac27bf..e3beebc 100644 --- a/src/phage/agent/PhagePHPAgent.php +++ b/src/phage/agent/PhagePHPAgent.php @@ -1,138 +1,145 @@ stdin = $stdin; } public function execute() { while (true) { if ($this->exec) { $iterator = new FutureIterator($this->exec); $iterator->setUpdateInterval(0.050); foreach ($iterator as $key => $future) { if ($future === null) { foreach ($this->exec as $read_key => $read_future) { $this->readFuture($read_key, $read_future); } break; } else { $this->resolveFuture($key, $future); } } } else { PhutilChannel::waitForAny(array($this->getMaster())); } $this->processInput(); } } private function getMaster() { if (!$this->master) { $raw_channel = new PhutilSocketChannel( $this->stdin, fopen('php://stdout', 'w')); $json_channel = new PhutilJSONProtocolChannel($raw_channel); $this->master = $json_channel; } return $this->master; } private function processInput() { $channel = $this->getMaster(); $open = $channel->update(); if (!$open) { throw new Exception(pht('Channel closed!')); } while (true) { $command = $channel->read(); if ($command === null) { break; } $this->processCommand($command); } } private function processCommand(array $spec) { switch ($spec['type']) { case 'EXEC': $key = $spec['key']; $cmd = $spec['command']; $future = new ExecFuture('%C', $cmd); + + $timeout = $spec['timeout']; + if ($timeout) { + $future->setTimeout(ceil($timeout)); + } + $future->isReady(); $this->exec[$key] = $future; break; case 'EXIT': $this->terminateAgent(); break; } } private function readFuture($key, ExecFuture $future) { $master = $this->getMaster(); list($stdout, $stderr) = $future->read(); $future->discardBuffers(); if (strlen($stdout)) { $master->write( array( 'type' => 'TEXT', 'key' => $key, 'kind' => 'stdout', 'text' => $stdout, )); } if (strlen($stderr)) { $master->write( array( 'type' => 'TEXT', 'key' => $key, 'kind' => 'stderr', 'text' => $stderr, )); } } private function resolveFuture($key, ExecFuture $future) { $result = $future->resolve(); $master = $this->getMaster(); $master->write( array( 'type' => 'RSLV', 'key' => $key, 'err' => $result[0], 'stdout' => $result[1], 'stderr' => $result[2], + 'timeout' => (bool)$future->getWasKilledByTimeout(), )); unset($this->exec[$key]); } public function __destruct() { $this->terminateAgent(); } private function terminateAgent() { foreach ($this->exec as $key => $future) { $future->resolveKill(); } exit(0); } }