Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F15461113
D21036.id.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
15 KB
Referenced Files
None
Subscribers
None
D21036.id.diff
View Options
diff --git a/src/future/Future.php b/src/future/Future.php
--- a/src/future/Future.php
+++ b/src/future/Future.php
@@ -8,8 +8,11 @@
abstract class Future extends Phobject {
private $hasResult = false;
+ private $hasStarted = false;
+ private $hasEnded = false;
private $result;
private $exception;
+ private $futureKey;
/**
* Is this future's process complete? Specifically, can this future be
@@ -36,16 +39,28 @@
'timeout.'));
}
+ if ($this->hasException()) {
+ throw $this->getException();
+ }
+
if (!$this->hasResult()) {
$graph = new FutureIterator(array($this));
$graph->resolveAll();
}
- if ($this->hasException()) {
- throw $this->getException();
+ return $this->getResult();
+ }
+
+ final public function startFuture() {
+ if ($this->hasStarted) {
+ throw new Exception(
+ pht(
+ 'Future has already started; futures can not start more '.
+ 'than once.'));
}
+ $this->hasStarted = true;
- return $this->getResult();
+ $this->isReady();
}
final public function updateFuture() {
@@ -66,11 +81,28 @@
}
}
+ final public function endFuture() {
+ if (!$this->hasException() && !$this->hasResult()) {
+ throw new Exception(
+ pht(
+ 'Trying to end a future which has no exception and no result. '.
+ 'Futures must resolve before they can be ended.'));
+ }
+
+ if ($this->hasEnded) {
+ throw new Exception(
+ pht(
+ 'Future has already ended; futures can not end more '.
+ 'than once.'));
+ }
+ $this->hasEnded = true;
+ }
+
/**
* Retrieve a list of sockets which we can wait to become readable while
* a future is resolving. If your future has sockets which can be
* `select()`ed, return them here (or in @{method:getWriteSockets}) to make
- * the resolve loop do a `select()`. If you do not return sockets in either
+ * the resolve loop do a `select()`. If you do not return sockets in either
* case, you'll get a busy wait.
*
* @return list A list of sockets which we expect to become readable.
@@ -153,5 +185,27 @@
return ($this->exception !== null);
}
+ final public function setFutureKey($key) {
+ if ($this->futureKey !== null) {
+ throw new Exception(
+ pht(
+ 'Future already has a key ("%s") assigned.',
+ $key));
+ }
+
+ $this->futureKey = $key;
+
+ return $this;
+ }
+
+ final public function getFutureKey() {
+ static $next_key = 1;
+
+ if ($this->futureKey === null) {
+ $this->futureKey = sprintf('Future/%d', $next_key++);
+ }
+
+ return $this->futureKey;
+ }
}
diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php
--- a/src/future/FutureIterator.php
+++ b/src/future/FutureIterator.php
@@ -28,17 +28,22 @@
* @task iterator Iterator Interface
* @task internal Internals
*/
-final class FutureIterator extends Phobject implements Iterator {
+final class FutureIterator
+ extends Phobject
+ implements Iterator {
- protected $wait = array();
- protected $work = array();
- protected $futures = array();
- protected $key;
+ private $hold = array();
+ private $wait = array();
+ private $work = array();
- protected $limit;
+ private $futures = array();
+ private $key;
- protected $timeout;
- protected $isTimeout = false;
+ private $limit;
+
+ private $timeout;
+ private $isTimeout = false;
+ private $hasRewound = false;
/* -( Basics )------------------------------------------------------------- */
@@ -52,7 +57,15 @@
*/
public function __construct(array $futures) {
assert_instances_of($futures, 'Future');
- $this->futures = $futures;
+
+ $respect_keys = !phutil_is_natural_list($futures);
+
+ foreach ($futures as $map_key => $future) {
+ if ($respect_keys) {
+ $future->setFutureKey($map_key);
+ }
+ $this->addFuture($future);
+ }
}
@@ -63,7 +76,17 @@
* @task basics
*/
public function resolveAll() {
- iterator_to_array($this);
+ // If a caller breaks out of a "foreach" and then calls "resolveAll()",
+ // interpret it to mean that we should iterate over whatever futures
+ // remain.
+
+ if ($this->hasRewound) {
+ while ($this->valid()) {
+ $this->next();
+ }
+ } else {
+ iterator_to_array($this);
+ }
}
/**
@@ -74,24 +97,20 @@
* @param Future @{class:Future} to add to iterator
* @task basics
*/
- public function addFuture(Future $future, $key = null) {
- if ($key === null) {
- $this->futures[] = $future;
- $this->wait[] = last_key($this->futures);
- } else if (!isset($this->futures[$key])) {
- $this->futures[$key] = $future;
- $this->wait[] = $key;
- } else {
- throw new Exception(pht('Invalid key %s', $key));
+ public function addFuture(Future $future) {
+ $key = $future->getFutureKey();
+
+ if (isset($this->futures[$key])) {
+ throw new Exception(
+ pht(
+ 'This future graph already has a future with key "%s". Each '.
+ 'future must have a unique key.',
+ $key));
}
- // Start running the future if we don't have $this->limit futures running
- // already. updateWorkingSet() won't start running the future if there's no
- // limit, so we'll manually poke it here in that case.
- $this->updateWorkingSet();
- if (!$this->limit) {
- $future->isReady();
- }
+ $this->futures[$key] = $future;
+ $this->hold[$key] = $key;
+
return $this;
}
@@ -150,6 +169,15 @@
}
+ public function setMaximumWorkingSetSize($limit) {
+ $this->limit = $limit;
+ return $this;
+ }
+
+ public function getMaximumWorkingSetSize() {
+ return $this->limit;
+ }
+
/* -( Iterator Interface )------------------------------------------------- */
@@ -157,9 +185,12 @@
* @task iterator
*/
public function rewind() {
- $this->wait = array_keys($this->futures);
- $this->work = null;
- $this->updateWorkingSet();
+ if ($this->hasRewound) {
+ throw new Exception(
+ pht('Future graphs can not be rewound.'));
+ }
+ $this->hasRewound = true;
+
$this->next();
}
@@ -168,95 +199,93 @@
*/
public function next() {
$this->key = null;
- if (!count($this->wait)) {
+
+ $this->updateWorkingSet();
+
+ if (!$this->work) {
return;
}
- $read_sockets = array();
- $write_sockets = array();
-
$start = microtime(true);
$timeout = $this->timeout;
$this->isTimeout = false;
- $check = $this->getWorkingSet();
+ $working_set = array_select_keys($this->futures, $this->work);
- $resolve = null;
- do {
- $read_sockets = array();
- $write_sockets = array();
- $can_use_sockets = true;
- $wait_time = 1;
- foreach ($check as $wait => $key) {
- $future = $this->futures[$key];
+ while (true) {
+ // Update every future first. This is a no-op on futures which have
+ // already resolved or failed, but we want to give futures an
+ // opportunity to make progress even if we can resolve something.
+ foreach ($working_set as $future_key => $future) {
$future->updateFuture();
+ }
+
+ // Check if any future has resolved or failed. If we have any such
+ // futures, we'll return the first one from the iterator.
+ $resolve_key = null;
+ foreach ($working_set as $future_key => $future) {
if ($future->hasException()) {
- if ($resolve === null) {
- $resolve = $wait;
- }
- continue;
+ $resolve_key = $future_key;
+ break;
}
if ($future->hasResult()) {
- if ($resolve === null) {
- $resolve = $wait;
- }
- continue;
+ $resolve_key = $future_key;
+ break;
}
+ }
- $got_sockets = false;
- $socks = $future->getReadSockets();
- if ($socks) {
- $got_sockets = true;
- foreach ($socks as $socket) {
- $read_sockets[] = $socket;
- }
- }
+ // We've found a future to resolve, so we're done here for now.
- $socks = $future->getWriteSockets();
- if ($socks) {
- $got_sockets = true;
- foreach ($socks as $socket) {
- $write_sockets[] = $socket;
- }
- }
+ if ($resolve_key !== null) {
+ $this->moveFutureToDone($resolve_key);
+ return;
+ }
- // If any currently active future had neither read nor write sockets,
- // we can't wait for the current batch of items using sockets.
- if (!$got_sockets) {
- $can_use_sockets = false;
- } else {
- $wait_time = min($wait_time, $future->getDefaultWait());
+ // We don't have any futures to resolve yet. Check if we're reached
+ // an update interval.
+
+ $wait_time = 1;
+ if ($timeout !== null) {
+ $elapsed = microtime(true) - $start;
+
+ if ($elapsed > $timeout) {
+ $this->isTimeout = true;
+ return;
}
+
+ $wait_time = min($wait_time, $timeout - $elapsed);
}
- if ($resolve === null) {
-
- // Check for a setUpdateInterval() timeout.
- if ($timeout !== null) {
- $elapsed = microtime(true) - $start;
- if ($elapsed > $timeout) {
- $this->isTimeout = true;
- return;
- } else {
- $wait_time = $timeout - $elapsed;
- }
+ // We're going to wait. If possible, we'd like to wait with sockets.
+ // If we can't, we'll just sleep.
+
+ $read_sockets = array();
+ $write_sockets = array();
+ foreach ($working_set as $future_key => $future) {
+ $sockets = $future->getReadSockets();
+ foreach ($sockets as $socket) {
+ $read_sockets[] = $socket;
}
- if ($can_use_sockets) {
- $this->waitForSockets($read_sockets, $write_sockets, $wait_time);
- } else {
- usleep(1000);
+ $sockets = $future->getWriteSockets();
+ foreach ($sockets as $socket) {
+ $write_sockets[] = $socket;
}
}
- } while ($resolve === null);
-
- $this->key = $this->wait[$resolve];
- unset($this->wait[$resolve]);
- $this->updateWorkingSet();
+ $use_sockets = ($read_sockets || $write_sockets);
+ if ($use_sockets) {
+ foreach ($working_set as $future) {
+ $wait_time = min($wait_time, $future->getDefaultWait());
+ }
+ $this->waitForSockets($read_sockets, $write_sockets, $wait_time);
+ } else {
+ usleep(1000);
+ }
+ }
}
/**
@@ -292,39 +321,96 @@
/* -( Internals )---------------------------------------------------------- */
-
/**
* @task internal
*/
- protected function getWorkingSet() {
- if ($this->work === null) {
- return $this->wait;
+ protected function updateWorkingSet() {
+ $limit = $this->getMaximumWorkingSetSize();
+ $work_count = count($this->work);
+
+ // If we're already working on the maximum number of futures, we just have
+ // to wait for something to resolve. There's no benefit to updating the
+ // queue since we can never make any meaningful progress.
+
+ if ($limit) {
+ if ($work_count >= $limit) {
+ return;
+ }
}
- return $this->work;
- }
+ // If any futures that are currently held are no longer blocked by
+ // dependencies, move them from "hold" to "wait".
- /**
- * @task internal
- */
- protected function updateWorkingSet() {
- if (!$this->limit) {
- return;
+ foreach ($this->hold as $future_key) {
+ if (!$this->canMoveFutureToWait($future_key)) {
+ continue;
+ }
+
+ $this->moveFutureToWait($future_key);
+ }
+
+ $wait_count = count($this->wait);
+ $hold_count = count($this->hold);
+
+ if (!$work_count && !$wait_count && $hold_count) {
+ throw new Exception(
+ pht(
+ 'Future graph is stalled: some futures are held, but no futures '.
+ 'are waiting or working. The graph can never resolve.'));
}
- $old = $this->work;
- $this->work = array_slice($this->wait, 0, $this->limit, true);
+ // Figure out how many futures we can start. If we don't have a limit,
+ // we can start every waiting future. If we do have a limit, we can only
+ // start as many futures as we have slots for.
- // If we're using a limit, our futures are sleeping and need to be polled
- // to begin execution, so poll any futures which weren't in our working set
- // before.
- foreach ($this->work as $work => $key) {
- if (!isset($old[$work])) {
- $this->futures[$key]->isReady();
+ if ($limit) {
+ $work_limit = min($limit, $wait_count);
+ } else {
+ $work_limit = $wait_count;
+ }
+
+ // If we're ready to start futures, start them now.
+
+ if ($work_limit) {
+ foreach ($this->wait as $future_key) {
+ $this->moveFutureToWork($future_key);
+
+ $work_limit--;
+ if (!$work_limit) {
+ return;
+ }
}
}
+
}
+ private function canMoveFutureToWait($future_key) {
+ return true;
+ }
+
+ private function moveFutureToWait($future_key) {
+ unset($this->hold[$future_key]);
+ $this->wait[$future_key] = $future_key;
+ }
+
+ private function moveFutureToWork($future_key) {
+ unset($this->wait[$future_key]);
+ $this->work[$future_key] = $future_key;
+
+ $this->futures[$future_key]->startFuture();
+ }
+
+ private function moveFutureToDone($future_key) {
+ $this->key = $future_key;
+ unset($this->work[$future_key]);
+
+ // Before we return, do another working set update so we start any
+ // futures that are ready to go as soon as we can.
+
+ $this->updateWorkingSet();
+
+ $this->futures[$future_key]->endFuture();
+ }
/**
* Wait for activity on one of several sockets.
diff --git a/src/future/exec/__tests__/ExecFutureTestCase.php b/src/future/exec/__tests__/ExecFutureTestCase.php
--- a/src/future/exec/__tests__/ExecFutureTestCase.php
+++ b/src/future/exec/__tests__/ExecFutureTestCase.php
@@ -61,7 +61,7 @@
// NOTE: This tests interactions between the resolve() timeout and the
// resolution timeout, which are somewhat similar but not identical.
- $future = $this->newSleep(32000)->start();
+ $future = $this->newSleep(32000);
$future->setTimeout(32000);
// We expect this to return in 0.01s.
@@ -77,7 +77,7 @@
// do this, we'll hang when exiting until our subprocess exits (32000
// seconds!)
$future->setTimeout(0.01);
- $future->resolve();
+ $iterator->resolveAll();
}
public function testTerminateWithoutStart() {
diff --git a/src/lint/linter/ArcanistFutureLinter.php b/src/lint/linter/ArcanistFutureLinter.php
--- a/src/lint/linter/ArcanistFutureLinter.php
+++ b/src/lint/linter/ArcanistFutureLinter.php
@@ -15,7 +15,8 @@
$limit = $this->getFuturesLimit();
$this->futures = id(new FutureIterator(array()))->limit($limit);
foreach ($this->buildFutures($paths) as $path => $future) {
- $this->futures->addFuture($future, $path);
+ $future->setFutureKey($path);
+ $this->futures->addFuture($future);
}
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Apr 2, 5:21 AM (3 d, 11 h ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7654416
Default Alt Text
D21036.id.diff (15 KB)
Attached To
Mode
D21036: Make "FutureIterator" queue management more formal
Attached
Detach File
Event Timeline
Log In to Comment