diff --git a/src/phage/action/PhageAgentAction.php b/src/phage/action/PhageAgentAction.php --- a/src/phage/action/PhageAgentAction.php +++ b/src/phage/action/PhageAgentAction.php @@ -11,6 +11,13 @@ private $isExiting = false; private $isActive = false; + private $limit = 0; + private $throttle = 0; + private $waitUntil; + + private $queued = array(); + private $active = array(); + final public function isContainerAction() { return true; } @@ -20,14 +27,49 @@ // explicit or by making agents execute actions? if (!($action instanceof PhageExecuteAction)) { - throw new Exception(pht('Can only add execute actions to an agent.')); + throw new Exception( + pht( + 'Can only add execute actions to an agent.')); } + + if ($this->isExiting) { + throw new Exception( + pht( + 'You can not add new actions to an exiting agent.')); + } + + $key = 'command/'.$this->commandKey++; + + $this->commands[$key] = array( + 'key' => $key, + 'command' => $action, + ); + + $this->queued[$key] = $key; } public function isActiveAgent() { return $this->isActive; } + final public function setLimit($limit) { + $this->limit = $limit; + return $this; + } + + final public function getLimit() { + return $this->limit; + } + + final public function setThrottle($throttle) { + $this->throttle = $throttle; + return $this; + } + + final public function getThrottle() { + return $this->throttle; + } + abstract protected function newAgentFuture(PhutilCommandString $command); protected function getAllWaitingChannels() { @@ -52,25 +94,62 @@ $channel = new PhutilJSONProtocolChannel($channel); - foreach ($this->getActions() as $command) { - $key = 'command/'.$this->commandKey++; + $this->future = $future; + $this->channel = $channel; + $this->isActive = true; + } + + private function updateQueue() { + // If we don't have anything waiting in queue, we have nothing to do. + if (!$this->queued) { + return false; + } + + $now = microtime(true); + + // If we're throttling commands and recently started one, don't start + // another one yet. + $throttle = $this->getThrottle(); + if ($throttle) { + if ($this->waitUntil && ($now < $this->waitUntil)) { + return false; + } + } + + // If we're limiting parallelism and the active list is full, don't + // start anything else yet. + $limit = $this->getLimit(); + if ($limit) { + if (count($this->active) >= $limit) { + return false; + } + } + + // Move a command out of the queue and tell the agent to execute it. + $key = head($this->queued); + unset($this->queued[$key]); + + $this->active[$key] = $key; + $command = $this->commands[$key]['command']; + + $channel = $this->getChannel(); - $this->commands[$key] = array( + $channel->write( + array( + 'type' => 'EXEC', 'key' => $key, - 'command' => $command, - ); - - $channel->write( - array( - 'type' => 'EXEC', - 'key' => $key, - 'command' => $command->getCommand()->getUnmaskedString(), - )); + 'command' => $command->getCommand()->getUnmaskedString(), + )); + + if ($throttle) { + $this->waitUntil = ($now + $throttle); } - $this->future = $future; - $this->channel = $channel; - $this->isActive = true; + return true; + } + + private function getChannel() { + return $this->channel; } public function updateAgent() { @@ -81,6 +160,10 @@ $channel = $this->channel; while (true) { + do { + $did_update = $this->updateQueue(); + } while ($did_update); + $is_open = $channel->update(); $message = $channel->read(); @@ -112,9 +195,9 @@ fprintf(STDOUT, '%s', $exit_code); } - unset($this->commands[$key]); + unset($this->active[$key]); - if (!$this->commands) { + if (!$this->active && !$this->queued) { $channel->write( array( 'type' => 'EXIT',