diff --git a/src/phage/action/PhageAgentAction.php b/src/phage/action/PhageAgentAction.php index 61894f0..1e996a4 100644 --- a/src/phage/action/PhageAgentAction.php +++ b/src/phage/action/PhageAgentAction.php @@ -1,189 +1,272 @@ isExiting) { + throw new Exception( + pht( + 'You can not add new actions to an exiting agent.')); + } + + $key = 'command/'.$this->commandKey++; + + $this->commands[$key] = array( + 'key' => $key, + 'command' => $action, + ); + + $this->queued[$key] = $key; } public function isActiveAgent() { return $this->isActive; } + final public function setLimit($limit) { + $this->limit = $limit; + return $this; + } + + final public function getLimit() { + return $this->limit; + } + + final public function setThrottle($throttle) { + $this->throttle = $throttle; + return $this; + } + + final public function getThrottle() { + return $this->throttle; + } + abstract protected function newAgentFuture(PhutilCommandString $command); protected function getAllWaitingChannels() { $channels = array(); if ($this->isActiveAgent()) { $channels[] = $this->channel; } return $channels; } public function startAgent() { $bootloader = new PhagePHPAgentBootloader(); $future = $this->newAgentFuture($bootloader->getBootCommand()); $future->write($bootloader->getBootSequence(), $keep_open = true); $channel = new PhutilExecChannel($future); $channel->setStderrHandler(array($this, 'didReadAgentStderr')); $channel = new PhutilJSONProtocolChannel($channel); - foreach ($this->getActions() as $command) { - $key = 'command/'.$this->commandKey++; + $this->future = $future; + $this->channel = $channel; + $this->isActive = true; + } + + private function updateQueue() { + // If we don't have anything waiting in queue, we have nothing to do. + if (!$this->queued) { + return false; + } + + $now = microtime(true); + + // If we're throttling commands and recently started one, don't start + // another one yet. + $throttle = $this->getThrottle(); + if ($throttle) { + if ($this->waitUntil && ($now < $this->waitUntil)) { + return false; + } + } + + // If we're limiting parallelism and the active list is full, don't + // start anything else yet. + $limit = $this->getLimit(); + if ($limit) { + if (count($this->active) >= $limit) { + return false; + } + } + + // Move a command out of the queue and tell the agent to execute it. + $key = head($this->queued); + unset($this->queued[$key]); + + $this->active[$key] = $key; + $command = $this->commands[$key]['command']; + + $channel = $this->getChannel(); - $this->commands[$key] = array( + $channel->write( + array( + 'type' => 'EXEC', 'key' => $key, - 'command' => $command, - ); - - $channel->write( - array( - 'type' => 'EXEC', - 'key' => $key, - 'command' => $command->getCommand()->getUnmaskedString(), - )); + 'command' => $command->getCommand()->getUnmaskedString(), + )); + + if ($throttle) { + $this->waitUntil = ($now + $throttle); } - $this->future = $future; - $this->channel = $channel; - $this->isActive = true; + return true; + } + + private function getChannel() { + return $this->channel; } public function updateAgent() { if (!$this->isActiveAgent()) { return; } $channel = $this->channel; while (true) { + do { + $did_update = $this->updateQueue(); + } while ($did_update); + $is_open = $channel->update(); $message = $channel->read(); if ($message !== null) { switch ($message['type']) { case 'TEXT': $key = $message['key']; $this->writeOutput($key, $message['kind'], $message['text']); break; case 'RSLV': $key = $message['key']; $command = $this->commands[$key]['command']; $this->writeOutput($key, 'stdout', $message['stdout']); $this->writeOutput($key, 'stderr', $message['stderr']); $exit_code = $message['err']; $command->setExitCode($exit_code); if ($exit_code != 0) { $exit_code = $this->formatOutput( pht( 'Command ("%s") exited nonzero ("%s")!', $command->getCommand(), $exit_code), $command->getLabel()); fprintf(STDOUT, '%s', $exit_code); } - unset($this->commands[$key]); + unset($this->active[$key]); - if (!$this->commands) { + if (!$this->active && !$this->queued) { $channel->write( array( 'type' => 'EXIT', 'key' => 'exit', )); $this->isExiting = true; break; } } } if (!$is_open) { if ($this->isExiting) { $this->isActive = false; break; } else { throw new Exception(pht('Channel closed unexpectedly!')); } } if ($message === null) { break; } } } private function writeOutput($key, $kind, $text) { if (!strlen($text)) { return; } switch ($kind) { case 'stdout': $target = STDOUT; break; case 'stderr': $target = STDERR; break; default: throw new Exception(pht('Unknown output kind "%s".', $kind)); } $command = $this->commands[$key]['command']; $label = $command->getLabel(); if (!strlen($label)) { $label = pht('Unknown Command'); } $text = $this->formatOutput($text, $label); fprintf($target, '%s', $text); } private function formatOutput($output, $context) { $output = phutil_split_lines($output, false); foreach ($output as $key => $line) { $output[$key] = tsprintf("[%s] %R\n", $context, $line); } $output = implode('', $output); return $output; } public function didReadAgentStderr($channel, $stderr) { throw new Exception( pht( 'Unexpected output on agent stderr: %s.', $stderr)); } }