Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F15419664
D17396.id.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
4 KB
Referenced Files
None
Subscribers
None
D17396.id.diff
View Options
diff --git a/src/phage/action/PhageAgentAction.php b/src/phage/action/PhageAgentAction.php
--- a/src/phage/action/PhageAgentAction.php
+++ b/src/phage/action/PhageAgentAction.php
@@ -11,6 +11,13 @@
private $isExiting = false;
private $isActive = false;
+ private $limit = 0;
+ private $throttle = 0;
+ private $waitUntil;
+
+ private $queued = array();
+ private $active = array();
+
final public function isContainerAction() {
return true;
}
@@ -20,14 +27,49 @@
// explicit or by making agents execute actions?
if (!($action instanceof PhageExecuteAction)) {
- throw new Exception(pht('Can only add execute actions to an agent.'));
+ throw new Exception(
+ pht(
+ 'Can only add execute actions to an agent.'));
}
+
+ if ($this->isExiting) {
+ throw new Exception(
+ pht(
+ 'You can not add new actions to an exiting agent.'));
+ }
+
+ $key = 'command/'.$this->commandKey++;
+
+ $this->commands[$key] = array(
+ 'key' => $key,
+ 'command' => $action,
+ );
+
+ $this->queued[$key] = $key;
}
public function isActiveAgent() {
return $this->isActive;
}
+ final public function setLimit($limit) {
+ $this->limit = $limit;
+ return $this;
+ }
+
+ final public function getLimit() {
+ return $this->limit;
+ }
+
+ final public function setThrottle($throttle) {
+ $this->throttle = $throttle;
+ return $this;
+ }
+
+ final public function getThrottle() {
+ return $this->throttle;
+ }
+
abstract protected function newAgentFuture(PhutilCommandString $command);
protected function getAllWaitingChannels() {
@@ -52,25 +94,62 @@
$channel = new PhutilJSONProtocolChannel($channel);
- foreach ($this->getActions() as $command) {
- $key = 'command/'.$this->commandKey++;
+ $this->future = $future;
+ $this->channel = $channel;
+ $this->isActive = true;
+ }
+
+ private function updateQueue() {
+ // If we don't have anything waiting in queue, we have nothing to do.
+ if (!$this->queued) {
+ return false;
+ }
+
+ $now = microtime(true);
+
+ // If we're throttling commands and recently started one, don't start
+ // another one yet.
+ $throttle = $this->getThrottle();
+ if ($throttle) {
+ if ($this->waitUntil && ($now < $this->waitUntil)) {
+ return false;
+ }
+ }
+
+ // If we're limiting parallelism and the active list is full, don't
+ // start anything else yet.
+ $limit = $this->getLimit();
+ if ($limit) {
+ if (count($this->active) >= $limit) {
+ return false;
+ }
+ }
+
+ // Move a command out of the queue and tell the agent to execute it.
+ $key = head($this->queued);
+ unset($this->queued[$key]);
+
+ $this->active[$key] = $key;
+ $command = $this->commands[$key]['command'];
+
+ $channel = $this->getChannel();
- $this->commands[$key] = array(
+ $channel->write(
+ array(
+ 'type' => 'EXEC',
'key' => $key,
- 'command' => $command,
- );
-
- $channel->write(
- array(
- 'type' => 'EXEC',
- 'key' => $key,
- 'command' => $command->getCommand()->getUnmaskedString(),
- ));
+ 'command' => $command->getCommand()->getUnmaskedString(),
+ ));
+
+ if ($throttle) {
+ $this->waitUntil = ($now + $throttle);
}
- $this->future = $future;
- $this->channel = $channel;
- $this->isActive = true;
+ return true;
+ }
+
+ private function getChannel() {
+ return $this->channel;
}
public function updateAgent() {
@@ -81,6 +160,10 @@
$channel = $this->channel;
while (true) {
+ do {
+ $did_update = $this->updateQueue();
+ } while ($did_update);
+
$is_open = $channel->update();
$message = $channel->read();
@@ -112,9 +195,9 @@
fprintf(STDOUT, '%s', $exit_code);
}
- unset($this->commands[$key]);
+ unset($this->active[$key]);
- if (!$this->commands) {
+ if (!$this->active && !$this->queued) {
$channel->write(
array(
'type' => 'EXIT',
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Mar 22, 7:51 AM (5 d, 6 h ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7384603
Default Alt Text
D17396.id.diff (4 KB)
Attached To
Mode
D17396: Add limit (maximum simultaneous commands) and throttle (delay between commands) to Phage
Attached
Detach File
Event Timeline
Log In to Comment