Page MenuHomePhabricator

D21036.id.diff
No OneTemporary

D21036.id.diff

diff --git a/src/future/Future.php b/src/future/Future.php
--- a/src/future/Future.php
+++ b/src/future/Future.php
@@ -8,8 +8,11 @@
abstract class Future extends Phobject {
private $hasResult = false;
+ private $hasStarted = false;
+ private $hasEnded = false;
private $result;
private $exception;
+ private $futureKey;
/**
* Is this future's process complete? Specifically, can this future be
@@ -36,16 +39,28 @@
'timeout.'));
}
+ if ($this->hasException()) {
+ throw $this->getException();
+ }
+
if (!$this->hasResult()) {
$graph = new FutureIterator(array($this));
$graph->resolveAll();
}
- if ($this->hasException()) {
- throw $this->getException();
+ return $this->getResult();
+ }
+
+ final public function startFuture() {
+ if ($this->hasStarted) {
+ throw new Exception(
+ pht(
+ 'Future has already started; futures can not start more '.
+ 'than once.'));
}
+ $this->hasStarted = true;
- return $this->getResult();
+ $this->isReady();
}
final public function updateFuture() {
@@ -66,11 +81,28 @@
}
}
+ final public function endFuture() {
+ if (!$this->hasException() && !$this->hasResult()) {
+ throw new Exception(
+ pht(
+ 'Trying to end a future which has no exception and no result. '.
+ 'Futures must resolve before they can be ended.'));
+ }
+
+ if ($this->hasEnded) {
+ throw new Exception(
+ pht(
+ 'Future has already ended; futures can not end more '.
+ 'than once.'));
+ }
+ $this->hasEnded = true;
+ }
+
/**
* Retrieve a list of sockets which we can wait to become readable while
* a future is resolving. If your future has sockets which can be
* `select()`ed, return them here (or in @{method:getWriteSockets}) to make
- * the resolve loop do a `select()`. If you do not return sockets in either
+ * the resolve loop do a `select()`. If you do not return sockets in either
* case, you'll get a busy wait.
*
* @return list A list of sockets which we expect to become readable.
@@ -153,5 +185,27 @@
return ($this->exception !== null);
}
+ final public function setFutureKey($key) {
+ if ($this->futureKey !== null) {
+ throw new Exception(
+ pht(
+ 'Future already has a key ("%s") assigned.',
+ $key));
+ }
+
+ $this->futureKey = $key;
+
+ return $this;
+ }
+
+ final public function getFutureKey() {
+ static $next_key = 1;
+
+ if ($this->futureKey === null) {
+ $this->futureKey = sprintf('Future/%d', $next_key++);
+ }
+
+ return $this->futureKey;
+ }
}
diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php
--- a/src/future/FutureIterator.php
+++ b/src/future/FutureIterator.php
@@ -28,17 +28,22 @@
* @task iterator Iterator Interface
* @task internal Internals
*/
-final class FutureIterator extends Phobject implements Iterator {
+final class FutureIterator
+ extends Phobject
+ implements Iterator {
- protected $wait = array();
- protected $work = array();
- protected $futures = array();
- protected $key;
+ private $hold = array();
+ private $wait = array();
+ private $work = array();
- protected $limit;
+ private $futures = array();
+ private $key;
- protected $timeout;
- protected $isTimeout = false;
+ private $limit;
+
+ private $timeout;
+ private $isTimeout = false;
+ private $hasRewound = false;
/* -( Basics )------------------------------------------------------------- */
@@ -52,7 +57,15 @@
*/
public function __construct(array $futures) {
assert_instances_of($futures, 'Future');
- $this->futures = $futures;
+
+ $respect_keys = !phutil_is_natural_list($futures);
+
+ foreach ($futures as $map_key => $future) {
+ if ($respect_keys) {
+ $future->setFutureKey($map_key);
+ }
+ $this->addFuture($future);
+ }
}
@@ -63,7 +76,17 @@
* @task basics
*/
public function resolveAll() {
- iterator_to_array($this);
+ // If a caller breaks out of a "foreach" and then calls "resolveAll()",
+ // interpret it to mean that we should iterate over whatever futures
+ // remain.
+
+ if ($this->hasRewound) {
+ while ($this->valid()) {
+ $this->next();
+ }
+ } else {
+ iterator_to_array($this);
+ }
}
/**
@@ -74,24 +97,20 @@
* @param Future @{class:Future} to add to iterator
* @task basics
*/
- public function addFuture(Future $future, $key = null) {
- if ($key === null) {
- $this->futures[] = $future;
- $this->wait[] = last_key($this->futures);
- } else if (!isset($this->futures[$key])) {
- $this->futures[$key] = $future;
- $this->wait[] = $key;
- } else {
- throw new Exception(pht('Invalid key %s', $key));
+ public function addFuture(Future $future) {
+ $key = $future->getFutureKey();
+
+ if (isset($this->futures[$key])) {
+ throw new Exception(
+ pht(
+ 'This future graph already has a future with key "%s". Each '.
+ 'future must have a unique key.',
+ $key));
}
- // Start running the future if we don't have $this->limit futures running
- // already. updateWorkingSet() won't start running the future if there's no
- // limit, so we'll manually poke it here in that case.
- $this->updateWorkingSet();
- if (!$this->limit) {
- $future->isReady();
- }
+ $this->futures[$key] = $future;
+ $this->hold[$key] = $key;
+
return $this;
}
@@ -150,6 +169,15 @@
}
+ public function setMaximumWorkingSetSize($limit) {
+ $this->limit = $limit;
+ return $this;
+ }
+
+ public function getMaximumWorkingSetSize() {
+ return $this->limit;
+ }
+
/* -( Iterator Interface )------------------------------------------------- */
@@ -157,9 +185,12 @@
* @task iterator
*/
public function rewind() {
- $this->wait = array_keys($this->futures);
- $this->work = null;
- $this->updateWorkingSet();
+ if ($this->hasRewound) {
+ throw new Exception(
+ pht('Future graphs can not be rewound.'));
+ }
+ $this->hasRewound = true;
+
$this->next();
}
@@ -168,95 +199,93 @@
*/
public function next() {
$this->key = null;
- if (!count($this->wait)) {
+
+ $this->updateWorkingSet();
+
+ if (!$this->work) {
return;
}
- $read_sockets = array();
- $write_sockets = array();
-
$start = microtime(true);
$timeout = $this->timeout;
$this->isTimeout = false;
- $check = $this->getWorkingSet();
+ $working_set = array_select_keys($this->futures, $this->work);
- $resolve = null;
- do {
- $read_sockets = array();
- $write_sockets = array();
- $can_use_sockets = true;
- $wait_time = 1;
- foreach ($check as $wait => $key) {
- $future = $this->futures[$key];
+ while (true) {
+ // Update every future first. This is a no-op on futures which have
+ // already resolved or failed, but we want to give futures an
+ // opportunity to make progress even if we can resolve something.
+ foreach ($working_set as $future_key => $future) {
$future->updateFuture();
+ }
+
+ // Check if any future has resolved or failed. If we have any such
+ // futures, we'll return the first one from the iterator.
+ $resolve_key = null;
+ foreach ($working_set as $future_key => $future) {
if ($future->hasException()) {
- if ($resolve === null) {
- $resolve = $wait;
- }
- continue;
+ $resolve_key = $future_key;
+ break;
}
if ($future->hasResult()) {
- if ($resolve === null) {
- $resolve = $wait;
- }
- continue;
+ $resolve_key = $future_key;
+ break;
}
+ }
- $got_sockets = false;
- $socks = $future->getReadSockets();
- if ($socks) {
- $got_sockets = true;
- foreach ($socks as $socket) {
- $read_sockets[] = $socket;
- }
- }
+ // We've found a future to resolve, so we're done here for now.
- $socks = $future->getWriteSockets();
- if ($socks) {
- $got_sockets = true;
- foreach ($socks as $socket) {
- $write_sockets[] = $socket;
- }
- }
+ if ($resolve_key !== null) {
+ $this->moveFutureToDone($resolve_key);
+ return;
+ }
- // If any currently active future had neither read nor write sockets,
- // we can't wait for the current batch of items using sockets.
- if (!$got_sockets) {
- $can_use_sockets = false;
- } else {
- $wait_time = min($wait_time, $future->getDefaultWait());
+ // We don't have any futures to resolve yet. Check if we're reached
+ // an update interval.
+
+ $wait_time = 1;
+ if ($timeout !== null) {
+ $elapsed = microtime(true) - $start;
+
+ if ($elapsed > $timeout) {
+ $this->isTimeout = true;
+ return;
}
+
+ $wait_time = min($wait_time, $timeout - $elapsed);
}
- if ($resolve === null) {
-
- // Check for a setUpdateInterval() timeout.
- if ($timeout !== null) {
- $elapsed = microtime(true) - $start;
- if ($elapsed > $timeout) {
- $this->isTimeout = true;
- return;
- } else {
- $wait_time = $timeout - $elapsed;
- }
+ // We're going to wait. If possible, we'd like to wait with sockets.
+ // If we can't, we'll just sleep.
+
+ $read_sockets = array();
+ $write_sockets = array();
+ foreach ($working_set as $future_key => $future) {
+ $sockets = $future->getReadSockets();
+ foreach ($sockets as $socket) {
+ $read_sockets[] = $socket;
}
- if ($can_use_sockets) {
- $this->waitForSockets($read_sockets, $write_sockets, $wait_time);
- } else {
- usleep(1000);
+ $sockets = $future->getWriteSockets();
+ foreach ($sockets as $socket) {
+ $write_sockets[] = $socket;
}
}
- } while ($resolve === null);
-
- $this->key = $this->wait[$resolve];
- unset($this->wait[$resolve]);
- $this->updateWorkingSet();
+ $use_sockets = ($read_sockets || $write_sockets);
+ if ($use_sockets) {
+ foreach ($working_set as $future) {
+ $wait_time = min($wait_time, $future->getDefaultWait());
+ }
+ $this->waitForSockets($read_sockets, $write_sockets, $wait_time);
+ } else {
+ usleep(1000);
+ }
+ }
}
/**
@@ -292,39 +321,96 @@
/* -( Internals )---------------------------------------------------------- */
-
/**
* @task internal
*/
- protected function getWorkingSet() {
- if ($this->work === null) {
- return $this->wait;
+ protected function updateWorkingSet() {
+ $limit = $this->getMaximumWorkingSetSize();
+ $work_count = count($this->work);
+
+ // If we're already working on the maximum number of futures, we just have
+ // to wait for something to resolve. There's no benefit to updating the
+ // queue since we can never make any meaningful progress.
+
+ if ($limit) {
+ if ($work_count >= $limit) {
+ return;
+ }
}
- return $this->work;
- }
+ // If any futures that are currently held are no longer blocked by
+ // dependencies, move them from "hold" to "wait".
- /**
- * @task internal
- */
- protected function updateWorkingSet() {
- if (!$this->limit) {
- return;
+ foreach ($this->hold as $future_key) {
+ if (!$this->canMoveFutureToWait($future_key)) {
+ continue;
+ }
+
+ $this->moveFutureToWait($future_key);
+ }
+
+ $wait_count = count($this->wait);
+ $hold_count = count($this->hold);
+
+ if (!$work_count && !$wait_count && $hold_count) {
+ throw new Exception(
+ pht(
+ 'Future graph is stalled: some futures are held, but no futures '.
+ 'are waiting or working. The graph can never resolve.'));
}
- $old = $this->work;
- $this->work = array_slice($this->wait, 0, $this->limit, true);
+ // Figure out how many futures we can start. If we don't have a limit,
+ // we can start every waiting future. If we do have a limit, we can only
+ // start as many futures as we have slots for.
- // If we're using a limit, our futures are sleeping and need to be polled
- // to begin execution, so poll any futures which weren't in our working set
- // before.
- foreach ($this->work as $work => $key) {
- if (!isset($old[$work])) {
- $this->futures[$key]->isReady();
+ if ($limit) {
+ $work_limit = min($limit, $wait_count);
+ } else {
+ $work_limit = $wait_count;
+ }
+
+ // If we're ready to start futures, start them now.
+
+ if ($work_limit) {
+ foreach ($this->wait as $future_key) {
+ $this->moveFutureToWork($future_key);
+
+ $work_limit--;
+ if (!$work_limit) {
+ return;
+ }
}
}
+
}
+ private function canMoveFutureToWait($future_key) {
+ return true;
+ }
+
+ private function moveFutureToWait($future_key) {
+ unset($this->hold[$future_key]);
+ $this->wait[$future_key] = $future_key;
+ }
+
+ private function moveFutureToWork($future_key) {
+ unset($this->wait[$future_key]);
+ $this->work[$future_key] = $future_key;
+
+ $this->futures[$future_key]->startFuture();
+ }
+
+ private function moveFutureToDone($future_key) {
+ $this->key = $future_key;
+ unset($this->work[$future_key]);
+
+ // Before we return, do another working set update so we start any
+ // futures that are ready to go as soon as we can.
+
+ $this->updateWorkingSet();
+
+ $this->futures[$future_key]->endFuture();
+ }
/**
* Wait for activity on one of several sockets.
diff --git a/src/future/exec/__tests__/ExecFutureTestCase.php b/src/future/exec/__tests__/ExecFutureTestCase.php
--- a/src/future/exec/__tests__/ExecFutureTestCase.php
+++ b/src/future/exec/__tests__/ExecFutureTestCase.php
@@ -61,7 +61,7 @@
// NOTE: This tests interactions between the resolve() timeout and the
// resolution timeout, which are somewhat similar but not identical.
- $future = $this->newSleep(32000)->start();
+ $future = $this->newSleep(32000);
$future->setTimeout(32000);
// We expect this to return in 0.01s.
@@ -77,7 +77,7 @@
// do this, we'll hang when exiting until our subprocess exits (32000
// seconds!)
$future->setTimeout(0.01);
- $future->resolve();
+ $iterator->resolveAll();
}
public function testTerminateWithoutStart() {
diff --git a/src/lint/linter/ArcanistFutureLinter.php b/src/lint/linter/ArcanistFutureLinter.php
--- a/src/lint/linter/ArcanistFutureLinter.php
+++ b/src/lint/linter/ArcanistFutureLinter.php
@@ -15,7 +15,8 @@
$limit = $this->getFuturesLimit();
$this->futures = id(new FutureIterator(array()))->limit($limit);
foreach ($this->buildFutures($paths) as $path => $future) {
- $this->futures->addFuture($future, $path);
+ $future->setFutureKey($path);
+ $this->futures->addFuture($future);
}
}

File Metadata

Mime Type
text/plain
Expires
Wed, Apr 2, 5:21 AM (3 d, 11 h ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7654416
Default Alt Text
D21036.id.diff (15 KB)

Event Timeline