diff --git a/src/future/Future.php b/src/future/Future.php index f58a5287..f69f14bc 100644 --- a/src/future/Future.php +++ b/src/future/Future.php @@ -1,157 +1,211 @@ resolve()" is no longer '. 'supported. Update the caller so it no longer passes a '. 'timeout.')); } + if ($this->hasException()) { + throw $this->getException(); + } + if (!$this->hasResult()) { $graph = new FutureIterator(array($this)); $graph->resolveAll(); } - if ($this->hasException()) { - throw $this->getException(); + return $this->getResult(); + } + + final public function startFuture() { + if ($this->hasStarted) { + throw new Exception( + pht( + 'Future has already started; futures can not start more '. + 'than once.')); } + $this->hasStarted = true; - return $this->getResult(); + $this->isReady(); } final public function updateFuture() { if ($this->hasException()) { return; } if ($this->hasResult()) { return; } try { $this->isReady(); } catch (Exception $ex) { $this->setException($ex); } catch (Throwable $ex) { $this->setException($ex); } } + final public function endFuture() { + if (!$this->hasException() && !$this->hasResult()) { + throw new Exception( + pht( + 'Trying to end a future which has no exception and no result. '. + 'Futures must resolve before they can be ended.')); + } + + if ($this->hasEnded) { + throw new Exception( + pht( + 'Future has already ended; futures can not end more '. + 'than once.')); + } + $this->hasEnded = true; + } + /** * Retrieve a list of sockets which we can wait to become readable while * a future is resolving. If your future has sockets which can be * `select()`ed, return them here (or in @{method:getWriteSockets}) to make - * the resolve loop do a `select()`. If you do not return sockets in either + * the resolve loop do a `select()`. If you do not return sockets in either * case, you'll get a busy wait. * * @return list A list of sockets which we expect to become readable. */ public function getReadSockets() { return array(); } /** * Retrieve a list of sockets which we can wait to become writable while a * future is resolving. See @{method:getReadSockets}. * * @return list A list of sockets which we expect to become writable. */ public function getWriteSockets() { return array(); } /** * Default amount of time to wait on stream select for this future. Normally * 1 second is fine, but if the future has a timeout sooner than that it * should return the amount of time left before the timeout. */ public function getDefaultWait() { return 1; } public function start() { $this->isReady(); return $this; } /** * Retrieve the final result of the future. * * @return wild Final resolution of this future. */ final protected function getResult() { if (!$this->hasResult()) { throw new Exception( pht( 'Future has not yet resolved. Resolve futures before retrieving '. 'results.')); } return $this->result; } final protected function setResult($result) { if ($this->hasResult()) { throw new Exception( pht( 'Future has already resolved. Futures may not resolve more than '. 'once.')); } $this->hasResult = true; $this->result = $result; return $this; } final public function hasResult() { return $this->hasResult; } final private function setException($exception) { // NOTE: The parameter may be an Exception or a Throwable. $this->exception = $exception; return $this; } final private function getException() { return $this->exception; } final public function hasException() { return ($this->exception !== null); } + final public function setFutureKey($key) { + if ($this->futureKey !== null) { + throw new Exception( + pht( + 'Future already has a key ("%s") assigned.', + $key)); + } + + $this->futureKey = $key; + + return $this; + } + + final public function getFutureKey() { + static $next_key = 1; + + if ($this->futureKey === null) { + $this->futureKey = sprintf('Future/%d', $next_key++); + } + + return $this->futureKey; + } } diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php index c8b01294..772fdc77 100644 --- a/src/future/FutureIterator.php +++ b/src/future/FutureIterator.php @@ -1,382 +1,468 @@ new ExecFuture('wc -c a.txt'), * 'b.txt' => new ExecFuture('wc -c b.txt'), * 'c.txt' => new ExecFuture('wc -c c.txt'), * ); * * foreach (new FutureIterator($futures) as $key => $future) { * // IMPORTANT: keys are preserved but the order of elements is not. This * // construct iterates over the futures in the order they resolve, so the * // fastest future is the one you'll get first. This allows you to start * // doing followup processing as soon as possible. * * list($err, $stdout) = $future->resolve(); * do_some_processing($stdout); * } * * For a general overview of futures, see @{article:Using Futures}. * * @task basics Basics * @task config Configuring Iteration * @task iterator Iterator Interface * @task internal Internals */ -final class FutureIterator extends Phobject implements Iterator { +final class FutureIterator + extends Phobject + implements Iterator { - protected $wait = array(); - protected $work = array(); - protected $futures = array(); - protected $key; + private $hold = array(); + private $wait = array(); + private $work = array(); - protected $limit; + private $futures = array(); + private $key; - protected $timeout; - protected $isTimeout = false; + private $limit; + + private $timeout; + private $isTimeout = false; + private $hasRewound = false; /* -( Basics )------------------------------------------------------------- */ /** * Create a new iterator over a list of futures. * * @param list List of @{class:Future}s to resolve. * @task basics */ public function __construct(array $futures) { assert_instances_of($futures, 'Future'); - $this->futures = $futures; + + $respect_keys = !phutil_is_natural_list($futures); + + foreach ($futures as $map_key => $future) { + if ($respect_keys) { + $future->setFutureKey($map_key); + } + $this->addFuture($future); + } } /** * Block until all futures resolve. * * @return void * @task basics */ public function resolveAll() { - iterator_to_array($this); + // If a caller breaks out of a "foreach" and then calls "resolveAll()", + // interpret it to mean that we should iterate over whatever futures + // remain. + + if ($this->hasRewound) { + while ($this->valid()) { + $this->next(); + } + } else { + iterator_to_array($this); + } } /** * Add another future to the set of futures. This is useful if you have a * set of futures to run mostly in parallel, but some futures depend on * others. * * @param Future @{class:Future} to add to iterator * @task basics */ - public function addFuture(Future $future, $key = null) { - if ($key === null) { - $this->futures[] = $future; - $this->wait[] = last_key($this->futures); - } else if (!isset($this->futures[$key])) { - $this->futures[$key] = $future; - $this->wait[] = $key; - } else { - throw new Exception(pht('Invalid key %s', $key)); + public function addFuture(Future $future) { + $key = $future->getFutureKey(); + + if (isset($this->futures[$key])) { + throw new Exception( + pht( + 'This future graph already has a future with key "%s". Each '. + 'future must have a unique key.', + $key)); } - // Start running the future if we don't have $this->limit futures running - // already. updateWorkingSet() won't start running the future if there's no - // limit, so we'll manually poke it here in that case. - $this->updateWorkingSet(); - if (!$this->limit) { - $future->isReady(); - } + $this->futures[$key] = $future; + $this->hold[$key] = $key; + return $this; } /* -( Configuring Iteration )---------------------------------------------- */ /** * Set a maximum amount of time you want to wait before the iterator will * yield a result. If no future has resolved yet, the iterator will yield * null for key and value. Among other potential uses, you can use this to * show some busy indicator: * * $futures = id(new FutureIterator($futures)) * ->setUpdateInterval(1); * foreach ($futures as $future) { * if ($future === null) { * echo "Still working...\n"; * } else { * // ... * } * } * * This will echo "Still working..." once per second as long as futures are * resolving. By default, FutureIterator never yields null. * * @param float Maximum number of seconds to block waiting on futures before * yielding null. * @return this * * @task config */ public function setUpdateInterval($interval) { $this->timeout = $interval; return $this; } /** * Limit the number of simultaneously executing futures. * * $futures = id(new FutureIterator($futures)) * ->limit(4); * foreach ($futures as $future) { * // Run no more than 4 futures simultaneously. * } * * @param int Maximum number of simultaneous jobs allowed. * @return this * * @task config */ public function limit($max) { $this->limit = $max; return $this; } + public function setMaximumWorkingSetSize($limit) { + $this->limit = $limit; + return $this; + } + + public function getMaximumWorkingSetSize() { + return $this->limit; + } + /* -( Iterator Interface )------------------------------------------------- */ /** * @task iterator */ public function rewind() { - $this->wait = array_keys($this->futures); - $this->work = null; - $this->updateWorkingSet(); + if ($this->hasRewound) { + throw new Exception( + pht('Future graphs can not be rewound.')); + } + $this->hasRewound = true; + $this->next(); } /** * @task iterator */ public function next() { $this->key = null; - if (!count($this->wait)) { + + $this->updateWorkingSet(); + + if (!$this->work) { return; } - $read_sockets = array(); - $write_sockets = array(); - $start = microtime(true); $timeout = $this->timeout; $this->isTimeout = false; - $check = $this->getWorkingSet(); + $working_set = array_select_keys($this->futures, $this->work); - $resolve = null; - do { - $read_sockets = array(); - $write_sockets = array(); - $can_use_sockets = true; - $wait_time = 1; - foreach ($check as $wait => $key) { - $future = $this->futures[$key]; + while (true) { + // Update every future first. This is a no-op on futures which have + // already resolved or failed, but we want to give futures an + // opportunity to make progress even if we can resolve something. + foreach ($working_set as $future_key => $future) { $future->updateFuture(); + } + + // Check if any future has resolved or failed. If we have any such + // futures, we'll return the first one from the iterator. + $resolve_key = null; + foreach ($working_set as $future_key => $future) { if ($future->hasException()) { - if ($resolve === null) { - $resolve = $wait; - } - continue; + $resolve_key = $future_key; + break; } if ($future->hasResult()) { - if ($resolve === null) { - $resolve = $wait; - } - continue; + $resolve_key = $future_key; + break; } + } - $got_sockets = false; - $socks = $future->getReadSockets(); - if ($socks) { - $got_sockets = true; - foreach ($socks as $socket) { - $read_sockets[] = $socket; - } - } + // We've found a future to resolve, so we're done here for now. - $socks = $future->getWriteSockets(); - if ($socks) { - $got_sockets = true; - foreach ($socks as $socket) { - $write_sockets[] = $socket; - } - } + if ($resolve_key !== null) { + $this->moveFutureToDone($resolve_key); + return; + } - // If any currently active future had neither read nor write sockets, - // we can't wait for the current batch of items using sockets. - if (!$got_sockets) { - $can_use_sockets = false; - } else { - $wait_time = min($wait_time, $future->getDefaultWait()); + // We don't have any futures to resolve yet. Check if we're reached + // an update interval. + + $wait_time = 1; + if ($timeout !== null) { + $elapsed = microtime(true) - $start; + + if ($elapsed > $timeout) { + $this->isTimeout = true; + return; } + + $wait_time = min($wait_time, $timeout - $elapsed); } - if ($resolve === null) { - - // Check for a setUpdateInterval() timeout. - if ($timeout !== null) { - $elapsed = microtime(true) - $start; - if ($elapsed > $timeout) { - $this->isTimeout = true; - return; - } else { - $wait_time = $timeout - $elapsed; - } + // We're going to wait. If possible, we'd like to wait with sockets. + // If we can't, we'll just sleep. + + $read_sockets = array(); + $write_sockets = array(); + foreach ($working_set as $future_key => $future) { + $sockets = $future->getReadSockets(); + foreach ($sockets as $socket) { + $read_sockets[] = $socket; } - if ($can_use_sockets) { - $this->waitForSockets($read_sockets, $write_sockets, $wait_time); - } else { - usleep(1000); + $sockets = $future->getWriteSockets(); + foreach ($sockets as $socket) { + $write_sockets[] = $socket; } } - } while ($resolve === null); - - $this->key = $this->wait[$resolve]; - unset($this->wait[$resolve]); - $this->updateWorkingSet(); + $use_sockets = ($read_sockets || $write_sockets); + if ($use_sockets) { + foreach ($working_set as $future) { + $wait_time = min($wait_time, $future->getDefaultWait()); + } + $this->waitForSockets($read_sockets, $write_sockets, $wait_time); + } else { + usleep(1000); + } + } } /** * @task iterator */ public function current() { if ($this->isTimeout) { return null; } return $this->futures[$this->key]; } /** * @task iterator */ public function key() { if ($this->isTimeout) { return null; } return $this->key; } /** * @task iterator */ public function valid() { if ($this->isTimeout) { return true; } return ($this->key !== null); } /* -( Internals )---------------------------------------------------------- */ - /** * @task internal */ - protected function getWorkingSet() { - if ($this->work === null) { - return $this->wait; + protected function updateWorkingSet() { + $limit = $this->getMaximumWorkingSetSize(); + $work_count = count($this->work); + + // If we're already working on the maximum number of futures, we just have + // to wait for something to resolve. There's no benefit to updating the + // queue since we can never make any meaningful progress. + + if ($limit) { + if ($work_count >= $limit) { + return; + } } - return $this->work; - } + // If any futures that are currently held are no longer blocked by + // dependencies, move them from "hold" to "wait". - /** - * @task internal - */ - protected function updateWorkingSet() { - if (!$this->limit) { - return; + foreach ($this->hold as $future_key) { + if (!$this->canMoveFutureToWait($future_key)) { + continue; + } + + $this->moveFutureToWait($future_key); + } + + $wait_count = count($this->wait); + $hold_count = count($this->hold); + + if (!$work_count && !$wait_count && $hold_count) { + throw new Exception( + pht( + 'Future graph is stalled: some futures are held, but no futures '. + 'are waiting or working. The graph can never resolve.')); } - $old = $this->work; - $this->work = array_slice($this->wait, 0, $this->limit, true); + // Figure out how many futures we can start. If we don't have a limit, + // we can start every waiting future. If we do have a limit, we can only + // start as many futures as we have slots for. - // If we're using a limit, our futures are sleeping and need to be polled - // to begin execution, so poll any futures which weren't in our working set - // before. - foreach ($this->work as $work => $key) { - if (!isset($old[$work])) { - $this->futures[$key]->isReady(); + if ($limit) { + $work_limit = min($limit, $wait_count); + } else { + $work_limit = $wait_count; + } + + // If we're ready to start futures, start them now. + + if ($work_limit) { + foreach ($this->wait as $future_key) { + $this->moveFutureToWork($future_key); + + $work_limit--; + if (!$work_limit) { + return; + } } } + } + private function canMoveFutureToWait($future_key) { + return true; + } + + private function moveFutureToWait($future_key) { + unset($this->hold[$future_key]); + $this->wait[$future_key] = $future_key; + } + + private function moveFutureToWork($future_key) { + unset($this->wait[$future_key]); + $this->work[$future_key] = $future_key; + + $this->futures[$future_key]->startFuture(); + } + + private function moveFutureToDone($future_key) { + $this->key = $future_key; + unset($this->work[$future_key]); + + // Before we return, do another working set update so we start any + // futures that are ready to go as soon as we can. + + $this->updateWorkingSet(); + + $this->futures[$future_key]->endFuture(); + } /** * Wait for activity on one of several sockets. * * @param list List of sockets expected to become readable. * @param list List of sockets expected to become writable. * @param float Timeout, in seconds. * @return void */ private function waitForSockets( array $read_list, array $write_list, $timeout = 1.0) { static $handler_installed = false; if (!$handler_installed) { // If we're spawning child processes, we need to install a signal handler // here to catch cases like execing '(sleep 60 &) &' where the child // exits but a socket is kept open. But we don't actually need to do // anything because the SIGCHLD will interrupt the stream_select(), as // long as we have a handler registered. if (function_exists('pcntl_signal')) { if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) { throw new Exception(pht('Failed to install signal handler!')); } } $handler_installed = true; } $timeout_sec = (int)$timeout; $timeout_usec = (int)(1000000 * ($timeout - $timeout_sec)); $exceptfds = array(); $ok = @stream_select( $read_list, $write_list, $exceptfds, $timeout_sec, $timeout_usec); if ($ok === false) { // Hopefully, means we received a SIGCHLD. In the worst case, we degrade // to a busy wait. } } public static function handleSIGCHLD($signo) { // This function is a dummy, we just need to have some handler registered // so that PHP will get interrupted during "stream_select()". If we don't // register a handler, "stream_select()" won't fail. } } diff --git a/src/future/exec/__tests__/ExecFutureTestCase.php b/src/future/exec/__tests__/ExecFutureTestCase.php index e10a188a..572c5692 100644 --- a/src/future/exec/__tests__/ExecFutureTestCase.php +++ b/src/future/exec/__tests__/ExecFutureTestCase.php @@ -1,173 +1,173 @@ newCat() ->write('') ->resolvex(); $this->assertEqual('', $stdout); } private function newCat() { $bin = $this->getSupportExecutable('cat'); return new ExecFuture('php -f %R', $bin); } private function newSleep($duration) { $bin = $this->getSupportExecutable('sleep'); return new ExecFuture('php -f %R -- %s', $bin, $duration); } public function testKeepPipe() { // NOTE: This is mostly testing the semantics of $keep_pipe in write(). list($stdout) = $this->newCat() ->write('', true) ->start() ->write('x', true) ->write('y', true) ->write('z', false) ->resolvex(); $this->assertEqual('xyz', $stdout); } public function testLargeBuffer() { // NOTE: This is mostly a coverage test to hit branches where we're still // flushing a buffer. $data = str_repeat('x', 1024 * 1024 * 4); list($stdout) = $this->newCat()->write($data)->resolvex(); $this->assertEqual($data, $stdout); } public function testBufferLimit() { $data = str_repeat('x', 1024 * 1024); list($stdout) = $this->newCat() ->setStdoutSizeLimit(1024) ->write($data) ->resolvex(); $this->assertEqual(substr($data, 0, 1024), $stdout); } public function testResolveTimeoutTestShouldRunLessThan1Sec() { // NOTE: This tests interactions between the resolve() timeout and the // resolution timeout, which are somewhat similar but not identical. - $future = $this->newSleep(32000)->start(); + $future = $this->newSleep(32000); $future->setTimeout(32000); // We expect this to return in 0.01s. $iterator = (new FutureIterator(array($future))) ->setUpdateInterval(0.01); foreach ($iterator as $resolved_result) { $result = $resolved_result; break; } $this->assertEqual($result, null); // We expect this to now force the time out / kill immediately. If we don't // do this, we'll hang when exiting until our subprocess exits (32000 // seconds!) $future->setTimeout(0.01); - $future->resolve(); + $iterator->resolveAll(); } public function testTerminateWithoutStart() { // We never start this future, but it should be fine to kill a future from // any state. $future = $this->newSleep(1); $future->resolveKill(); $this->assertTrue(true); } public function testTimeoutTestShouldRunLessThan1Sec() { // NOTE: This is partly testing that we choose appropriate select wait // times; this test should run for significantly less than 1 second. $future = $this->newSleep(32000); list($err) = $future->setTimeout(0.01)->resolve(); $this->assertTrue($err > 0); $this->assertTrue($future->getWasKilledByTimeout()); } public function testMultipleTimeoutsTestShouldRunLessThan1Sec() { $futures = array(); for ($ii = 0; $ii < 4; $ii++) { $futures[] = $this->newSleep(32000)->setTimeout(0.01); } foreach (new FutureIterator($futures) as $future) { list($err) = $future->resolve(); $this->assertTrue($err > 0); $this->assertTrue($future->getWasKilledByTimeout()); } } public function testMultipleResolves() { // It should be safe to call resolve(), resolvex(), resolveKill(), etc., // as many times as you want on the same process. $bin = $this->getSupportExecutable('echo'); $future = new ExecFuture('php -f %R -- quack', $bin); $future->resolve(); $future->resolvex(); list($err) = $future->resolveKill(); $this->assertEqual(0, $err); } public function testReadBuffering() { $str_len_8 = 'abcdefgh'; $str_len_4 = 'abcd'; // This is a write/read with no read buffer. $future = $this->newCat(); $future->write($str_len_8); do { $future->isReady(); list($read) = $future->read(); if (strlen($read)) { break; } } while (true); // We expect to get the entire string back in the read. $this->assertEqual($str_len_8, $read); $future->resolve(); // This is a write/read with a read buffer. $future = $this->newCat(); $future->write($str_len_8); // Set the read buffer size. $future->setReadBufferSize(4); do { $future->isReady(); list($read) = $future->read(); if (strlen($read)) { break; } } while (true); // We expect to get the entire string back in the read. $this->assertEqual($str_len_4, $read); // Remove the limit so we can resolve the future. $future->setReadBufferSize(null); $future->resolve(); } } diff --git a/src/lint/linter/ArcanistFutureLinter.php b/src/lint/linter/ArcanistFutureLinter.php index e9df8371..7fad5a55 100644 --- a/src/lint/linter/ArcanistFutureLinter.php +++ b/src/lint/linter/ArcanistFutureLinter.php @@ -1,56 +1,57 @@ getFuturesLimit(); $this->futures = id(new FutureIterator(array()))->limit($limit); foreach ($this->buildFutures($paths) as $path => $future) { - $this->futures->addFuture($future, $path); + $future->setFutureKey($path); + $this->futures->addFuture($future); } } final public function lintPath($path) { return; } public function didLintPaths(array $paths) { if (!$this->futures) { return; } $map = array(); foreach ($this->futures as $path => $future) { $this->setActivePath($path); $this->resolveFuture($path, $future); $map[$path] = $future; } $this->futures = array(); $this->didResolveLinterFutures($map); } /** * Hook for cleaning up resources. * * This is invoked after a block of futures resolve, and allows linters to * discard or clean up any shared resources they no longer need. * * @param map Map of paths to resolved futures. * @return void */ protected function didResolveLinterFutures(array $futures) { return; } }