diff --git a/src/future/Future.php b/src/future/Future.php --- a/src/future/Future.php +++ b/src/future/Future.php @@ -8,8 +8,11 @@ abstract class Future extends Phobject { private $hasResult = false; + private $hasStarted = false; + private $hasEnded = false; private $result; private $exception; + private $futureKey; /** * Is this future's process complete? Specifically, can this future be @@ -36,16 +39,28 @@ 'timeout.')); } + if ($this->hasException()) { + throw $this->getException(); + } + if (!$this->hasResult()) { $graph = new FutureIterator(array($this)); $graph->resolveAll(); } - if ($this->hasException()) { - throw $this->getException(); + return $this->getResult(); + } + + final public function startFuture() { + if ($this->hasStarted) { + throw new Exception( + pht( + 'Future has already started; futures can not start more '. + 'than once.')); } + $this->hasStarted = true; - return $this->getResult(); + $this->isReady(); } final public function updateFuture() { @@ -66,11 +81,28 @@ } } + final public function endFuture() { + if (!$this->hasException() && !$this->hasResult()) { + throw new Exception( + pht( + 'Trying to end a future which has no exception and no result. '. + 'Futures must resolve before they can be ended.')); + } + + if ($this->hasEnded) { + throw new Exception( + pht( + 'Future has already ended; futures can not end more '. + 'than once.')); + } + $this->hasEnded = true; + } + /** * Retrieve a list of sockets which we can wait to become readable while * a future is resolving. If your future has sockets which can be * `select()`ed, return them here (or in @{method:getWriteSockets}) to make - * the resolve loop do a `select()`. If you do not return sockets in either + * the resolve loop do a `select()`. If you do not return sockets in either * case, you'll get a busy wait. * * @return list A list of sockets which we expect to become readable. @@ -153,5 +185,27 @@ return ($this->exception !== null); } + final public function setFutureKey($key) { + if ($this->futureKey !== null) { + throw new Exception( + pht( + 'Future already has a key ("%s") assigned.', + $key)); + } + + $this->futureKey = $key; + + return $this; + } + + final public function getFutureKey() { + static $next_key = 1; + + if ($this->futureKey === null) { + $this->futureKey = sprintf('Future/%d', $next_key++); + } + + return $this->futureKey; + } } diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php --- a/src/future/FutureIterator.php +++ b/src/future/FutureIterator.php @@ -28,17 +28,22 @@ * @task iterator Iterator Interface * @task internal Internals */ -final class FutureIterator extends Phobject implements Iterator { +final class FutureIterator + extends Phobject + implements Iterator { - protected $wait = array(); - protected $work = array(); - protected $futures = array(); - protected $key; + private $hold = array(); + private $wait = array(); + private $work = array(); - protected $limit; + private $futures = array(); + private $key; - protected $timeout; - protected $isTimeout = false; + private $limit; + + private $timeout; + private $isTimeout = false; + private $hasRewound = false; /* -( Basics )------------------------------------------------------------- */ @@ -52,7 +57,15 @@ */ public function __construct(array $futures) { assert_instances_of($futures, 'Future'); - $this->futures = $futures; + + $respect_keys = !phutil_is_natural_list($futures); + + foreach ($futures as $map_key => $future) { + if ($respect_keys) { + $future->setFutureKey($map_key); + } + $this->addFuture($future); + } } @@ -63,7 +76,17 @@ * @task basics */ public function resolveAll() { - iterator_to_array($this); + // If a caller breaks out of a "foreach" and then calls "resolveAll()", + // interpret it to mean that we should iterate over whatever futures + // remain. + + if ($this->hasRewound) { + while ($this->valid()) { + $this->next(); + } + } else { + iterator_to_array($this); + } } /** @@ -74,24 +97,20 @@ * @param Future @{class:Future} to add to iterator * @task basics */ - public function addFuture(Future $future, $key = null) { - if ($key === null) { - $this->futures[] = $future; - $this->wait[] = last_key($this->futures); - } else if (!isset($this->futures[$key])) { - $this->futures[$key] = $future; - $this->wait[] = $key; - } else { - throw new Exception(pht('Invalid key %s', $key)); + public function addFuture(Future $future) { + $key = $future->getFutureKey(); + + if (isset($this->futures[$key])) { + throw new Exception( + pht( + 'This future graph already has a future with key "%s". Each '. + 'future must have a unique key.', + $key)); } - // Start running the future if we don't have $this->limit futures running - // already. updateWorkingSet() won't start running the future if there's no - // limit, so we'll manually poke it here in that case. - $this->updateWorkingSet(); - if (!$this->limit) { - $future->isReady(); - } + $this->futures[$key] = $future; + $this->hold[$key] = $key; + return $this; } @@ -150,6 +169,15 @@ } + public function setMaximumWorkingSetSize($limit) { + $this->limit = $limit; + return $this; + } + + public function getMaximumWorkingSetSize() { + return $this->limit; + } + /* -( Iterator Interface )------------------------------------------------- */ @@ -157,9 +185,12 @@ * @task iterator */ public function rewind() { - $this->wait = array_keys($this->futures); - $this->work = null; - $this->updateWorkingSet(); + if ($this->hasRewound) { + throw new Exception( + pht('Future graphs can not be rewound.')); + } + $this->hasRewound = true; + $this->next(); } @@ -168,95 +199,93 @@ */ public function next() { $this->key = null; - if (!count($this->wait)) { + + $this->updateWorkingSet(); + + if (!$this->work) { return; } - $read_sockets = array(); - $write_sockets = array(); - $start = microtime(true); $timeout = $this->timeout; $this->isTimeout = false; - $check = $this->getWorkingSet(); + $working_set = array_select_keys($this->futures, $this->work); - $resolve = null; - do { - $read_sockets = array(); - $write_sockets = array(); - $can_use_sockets = true; - $wait_time = 1; - foreach ($check as $wait => $key) { - $future = $this->futures[$key]; + while (true) { + // Update every future first. This is a no-op on futures which have + // already resolved or failed, but we want to give futures an + // opportunity to make progress even if we can resolve something. + foreach ($working_set as $future_key => $future) { $future->updateFuture(); + } + + // Check if any future has resolved or failed. If we have any such + // futures, we'll return the first one from the iterator. + $resolve_key = null; + foreach ($working_set as $future_key => $future) { if ($future->hasException()) { - if ($resolve === null) { - $resolve = $wait; - } - continue; + $resolve_key = $future_key; + break; } if ($future->hasResult()) { - if ($resolve === null) { - $resolve = $wait; - } - continue; + $resolve_key = $future_key; + break; } + } - $got_sockets = false; - $socks = $future->getReadSockets(); - if ($socks) { - $got_sockets = true; - foreach ($socks as $socket) { - $read_sockets[] = $socket; - } - } + // We've found a future to resolve, so we're done here for now. - $socks = $future->getWriteSockets(); - if ($socks) { - $got_sockets = true; - foreach ($socks as $socket) { - $write_sockets[] = $socket; - } - } + if ($resolve_key !== null) { + $this->moveFutureToDone($resolve_key); + return; + } - // If any currently active future had neither read nor write sockets, - // we can't wait for the current batch of items using sockets. - if (!$got_sockets) { - $can_use_sockets = false; - } else { - $wait_time = min($wait_time, $future->getDefaultWait()); + // We don't have any futures to resolve yet. Check if we're reached + // an update interval. + + $wait_time = 1; + if ($timeout !== null) { + $elapsed = microtime(true) - $start; + + if ($elapsed > $timeout) { + $this->isTimeout = true; + return; } + + $wait_time = min($wait_time, $timeout - $elapsed); } - if ($resolve === null) { - - // Check for a setUpdateInterval() timeout. - if ($timeout !== null) { - $elapsed = microtime(true) - $start; - if ($elapsed > $timeout) { - $this->isTimeout = true; - return; - } else { - $wait_time = $timeout - $elapsed; - } + // We're going to wait. If possible, we'd like to wait with sockets. + // If we can't, we'll just sleep. + + $read_sockets = array(); + $write_sockets = array(); + foreach ($working_set as $future_key => $future) { + $sockets = $future->getReadSockets(); + foreach ($sockets as $socket) { + $read_sockets[] = $socket; } - if ($can_use_sockets) { - $this->waitForSockets($read_sockets, $write_sockets, $wait_time); - } else { - usleep(1000); + $sockets = $future->getWriteSockets(); + foreach ($sockets as $socket) { + $write_sockets[] = $socket; } } - } while ($resolve === null); - - $this->key = $this->wait[$resolve]; - unset($this->wait[$resolve]); - $this->updateWorkingSet(); + $use_sockets = ($read_sockets || $write_sockets); + if ($use_sockets) { + foreach ($working_set as $future) { + $wait_time = min($wait_time, $future->getDefaultWait()); + } + $this->waitForSockets($read_sockets, $write_sockets, $wait_time); + } else { + usleep(1000); + } + } } /** @@ -292,39 +321,96 @@ /* -( Internals )---------------------------------------------------------- */ - /** * @task internal */ - protected function getWorkingSet() { - if ($this->work === null) { - return $this->wait; + protected function updateWorkingSet() { + $limit = $this->getMaximumWorkingSetSize(); + $work_count = count($this->work); + + // If we're already working on the maximum number of futures, we just have + // to wait for something to resolve. There's no benefit to updating the + // queue since we can never make any meaningful progress. + + if ($limit) { + if ($work_count >= $limit) { + return; + } } - return $this->work; - } + // If any futures that are currently held are no longer blocked by + // dependencies, move them from "hold" to "wait". - /** - * @task internal - */ - protected function updateWorkingSet() { - if (!$this->limit) { - return; + foreach ($this->hold as $future_key) { + if (!$this->canMoveFutureToWait($future_key)) { + continue; + } + + $this->moveFutureToWait($future_key); + } + + $wait_count = count($this->wait); + $hold_count = count($this->hold); + + if (!$work_count && !$wait_count && $hold_count) { + throw new Exception( + pht( + 'Future graph is stalled: some futures are held, but no futures '. + 'are waiting or working. The graph can never resolve.')); } - $old = $this->work; - $this->work = array_slice($this->wait, 0, $this->limit, true); + // Figure out how many futures we can start. If we don't have a limit, + // we can start every waiting future. If we do have a limit, we can only + // start as many futures as we have slots for. - // If we're using a limit, our futures are sleeping and need to be polled - // to begin execution, so poll any futures which weren't in our working set - // before. - foreach ($this->work as $work => $key) { - if (!isset($old[$work])) { - $this->futures[$key]->isReady(); + if ($limit) { + $work_limit = min($limit, $wait_count); + } else { + $work_limit = $wait_count; + } + + // If we're ready to start futures, start them now. + + if ($work_limit) { + foreach ($this->wait as $future_key) { + $this->moveFutureToWork($future_key); + + $work_limit--; + if (!$work_limit) { + return; + } } } + } + private function canMoveFutureToWait($future_key) { + return true; + } + + private function moveFutureToWait($future_key) { + unset($this->hold[$future_key]); + $this->wait[$future_key] = $future_key; + } + + private function moveFutureToWork($future_key) { + unset($this->wait[$future_key]); + $this->work[$future_key] = $future_key; + + $this->futures[$future_key]->startFuture(); + } + + private function moveFutureToDone($future_key) { + $this->key = $future_key; + unset($this->work[$future_key]); + + // Before we return, do another working set update so we start any + // futures that are ready to go as soon as we can. + + $this->updateWorkingSet(); + + $this->futures[$future_key]->endFuture(); + } /** * Wait for activity on one of several sockets. diff --git a/src/future/exec/__tests__/ExecFutureTestCase.php b/src/future/exec/__tests__/ExecFutureTestCase.php --- a/src/future/exec/__tests__/ExecFutureTestCase.php +++ b/src/future/exec/__tests__/ExecFutureTestCase.php @@ -61,7 +61,7 @@ // NOTE: This tests interactions between the resolve() timeout and the // resolution timeout, which are somewhat similar but not identical. - $future = $this->newSleep(32000)->start(); + $future = $this->newSleep(32000); $future->setTimeout(32000); // We expect this to return in 0.01s. @@ -77,7 +77,7 @@ // do this, we'll hang when exiting until our subprocess exits (32000 // seconds!) $future->setTimeout(0.01); - $future->resolve(); + $iterator->resolveAll(); } public function testTerminateWithoutStart() { diff --git a/src/lint/linter/ArcanistFutureLinter.php b/src/lint/linter/ArcanistFutureLinter.php --- a/src/lint/linter/ArcanistFutureLinter.php +++ b/src/lint/linter/ArcanistFutureLinter.php @@ -15,7 +15,8 @@ $limit = $this->getFuturesLimit(); $this->futures = id(new FutureIterator(array()))->limit($limit); foreach ($this->buildFutures($paths) as $path => $future) { - $this->futures->addFuture($future, $path); + $future->setFutureKey($path); + $this->futures->addFuture($future); } }