Page MenuHomePhabricator

D17396.id41833.diff
No OneTemporary

D17396.id41833.diff

diff --git a/src/phage/action/PhageAgentAction.php b/src/phage/action/PhageAgentAction.php
--- a/src/phage/action/PhageAgentAction.php
+++ b/src/phage/action/PhageAgentAction.php
@@ -11,6 +11,13 @@
private $isExiting = false;
private $isActive = false;
+ private $limit = 0;
+ private $throttle = 0;
+ private $waitUntil;
+
+ private $queued = array();
+ private $active = array();
+
final public function isContainerAction() {
return true;
}
@@ -20,14 +27,49 @@
// explicit or by making agents execute actions?
if (!($action instanceof PhageExecuteAction)) {
- throw new Exception(pht('Can only add execute actions to an agent.'));
+ throw new Exception(
+ pht(
+ 'Can only add execute actions to an agent.'));
}
+
+ if ($this->isExiting) {
+ throw new Exception(
+ pht(
+ 'You can not add new actions to an exiting agent.'));
+ }
+
+ $key = 'command/'.$this->commandKey++;
+
+ $this->commands[$key] = array(
+ 'key' => $key,
+ 'command' => $action,
+ );
+
+ $this->queued[$key] = $key;
}
public function isActiveAgent() {
return $this->isActive;
}
+ final public function setLimit($limit) {
+ $this->limit = $limit;
+ return $this;
+ }
+
+ final public function getLimit() {
+ return $this->limit;
+ }
+
+ final public function setThrottle($throttle) {
+ $this->throttle = $throttle;
+ return $this;
+ }
+
+ final public function getThrottle() {
+ return $this->throttle;
+ }
+
abstract protected function newAgentFuture(PhutilCommandString $command);
protected function getAllWaitingChannels() {
@@ -52,25 +94,62 @@
$channel = new PhutilJSONProtocolChannel($channel);
- foreach ($this->getActions() as $command) {
- $key = 'command/'.$this->commandKey++;
+ $this->future = $future;
+ $this->channel = $channel;
+ $this->isActive = true;
+ }
+
+ private function updateQueue() {
+ // If we don't have anything waiting in queue, we have nothing to do.
+ if (!$this->queued) {
+ return false;
+ }
+
+ $now = microtime(true);
+
+ // If we're throttling commands and recently started one, don't start
+ // another one yet.
+ $throttle = $this->getThrottle();
+ if ($throttle) {
+ if ($this->waitUntil && ($now < $this->waitUntil)) {
+ return false;
+ }
+ }
+
+ // If we're limiting parallelism and the active list is full, don't
+ // start anything else yet.
+ $limit = $this->getLimit();
+ if ($limit) {
+ if (count($this->active) >= $limit) {
+ return false;
+ }
+ }
+
+ // Move a command out of the queue and tell the agent to execute it.
+ $key = head($this->queued);
+ unset($this->queued[$key]);
+
+ $this->active[$key] = $key;
+ $command = $this->commands[$key]['command'];
+
+ $channel = $this->getChannel();
- $this->commands[$key] = array(
+ $channel->write(
+ array(
+ 'type' => 'EXEC',
'key' => $key,
- 'command' => $command,
- );
-
- $channel->write(
- array(
- 'type' => 'EXEC',
- 'key' => $key,
- 'command' => $command->getCommand()->getUnmaskedString(),
- ));
+ 'command' => $command->getCommand()->getUnmaskedString(),
+ ));
+
+ if ($throttle) {
+ $this->waitUntil = ($now + $throttle);
}
- $this->future = $future;
- $this->channel = $channel;
- $this->isActive = true;
+ return true;
+ }
+
+ private function getChannel() {
+ return $this->channel;
}
public function updateAgent() {
@@ -81,6 +160,10 @@
$channel = $this->channel;
while (true) {
+ do {
+ $did_update = $this->updateQueue();
+ } while ($did_update);
+
$is_open = $channel->update();
$message = $channel->read();
@@ -112,9 +195,9 @@
fprintf(STDOUT, '%s', $exit_code);
}
- unset($this->commands[$key]);
+ unset($this->active[$key]);
- if (!$this->commands) {
+ if (!$this->active && !$this->queued) {
$channel->write(
array(
'type' => 'EXIT',

File Metadata

Mime Type
text/plain
Expires
Fri, Mar 21, 6:40 PM (5 d, 19 h ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7384603
Default Alt Text
D17396.id41833.diff (4 KB)

Event Timeline