diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php index 028c02da..96a41e3d 100644 --- a/src/future/FutureIterator.php +++ b/src/future/FutureIterator.php @@ -1,463 +1,470 @@ new ExecFuture('wc -c a.txt'), * 'b.txt' => new ExecFuture('wc -c b.txt'), * 'c.txt' => new ExecFuture('wc -c c.txt'), * ); * * foreach (new FutureIterator($futures) as $key => $future) { * // IMPORTANT: keys are preserved but the order of elements is not. This * // construct iterates over the futures in the order they resolve, so the * // fastest future is the one you'll get first. This allows you to start * // doing followup processing as soon as possible. * * list($err, $stdout) = $future->resolve(); * do_some_processing($stdout); * } * * For a general overview of futures, see @{article:Using Futures}. * * @task basics Basics * @task config Configuring Iteration * @task iterator Iterator Interface * @task internal Internals */ final class FutureIterator extends Phobject implements Iterator { private $hold = array(); private $wait = array(); private $work = array(); private $futures = array(); private $key; private $limit; private $timeout; private $isTimeout = false; private $hasRewound = false; /* -( Basics )------------------------------------------------------------- */ /** * Create a new iterator over a list of futures. * * @param list List of @{class:Future}s to resolve. * @task basics */ public function __construct(array $futures) { assert_instances_of($futures, 'Future'); foreach ($futures as $map_key => $future) { $future->setFutureKey($map_key); $this->addFuture($future); } } /** * Block until all futures resolve. * * @return void * @task basics */ public function resolveAll() { // If a caller breaks out of a "foreach" and then calls "resolveAll()", // interpret it to mean that we should iterate over whatever futures // remain. if ($this->hasRewound) { while ($this->valid()) { $this->next(); } } else { iterator_to_array($this); } } /** * Add another future to the set of futures. This is useful if you have a * set of futures to run mostly in parallel, but some futures depend on * others. * * @param Future @{class:Future} to add to iterator * @task basics */ public function addFuture(Future $future) { $key = $future->getFutureKey(); if (isset($this->futures[$key])) { throw new Exception( pht( 'This future graph already has a future with key "%s". Each '. 'future must have a unique key.', $key)); } $this->futures[$key] = $future; $this->hold[$key] = $key; return $this; } /* -( Configuring Iteration )---------------------------------------------- */ /** * Set a maximum amount of time you want to wait before the iterator will * yield a result. If no future has resolved yet, the iterator will yield * null for key and value. Among other potential uses, you can use this to * show some busy indicator: * * $futures = id(new FutureIterator($futures)) * ->setUpdateInterval(1); * foreach ($futures as $future) { * if ($future === null) { * echo "Still working...\n"; * } else { * // ... * } * } * * This will echo "Still working..." once per second as long as futures are * resolving. By default, FutureIterator never yields null. * * @param float Maximum number of seconds to block waiting on futures before * yielding null. * @return this * * @task config */ public function setUpdateInterval($interval) { $this->timeout = $interval; return $this; } /** * Limit the number of simultaneously executing futures. * * $futures = id(new FutureIterator($futures)) * ->limit(4); * foreach ($futures as $future) { * // Run no more than 4 futures simultaneously. * } * * @param int Maximum number of simultaneous jobs allowed. * @return this * * @task config */ public function limit($max) { $this->limit = $max; return $this; } public function setMaximumWorkingSetSize($limit) { $this->limit = $limit; return $this; } public function getMaximumWorkingSetSize() { return $this->limit; } /* -( Iterator Interface )------------------------------------------------- */ /** * @task iterator */ public function rewind() { if ($this->hasRewound) { throw new Exception( pht('Future graphs can not be rewound.')); } $this->hasRewound = true; $this->next(); } /** * @task iterator */ public function next() { - $this->key = null; + // See T13572. If we preivously resolved and returned a Future, release + // it now. This prevents us from holding Futures indefinitely when callers + // like FuturePool build long-lived iterators and keep adding new Futures + // to them. + if ($this->key !== null) { + unset($this->futures[$this->key]); + $this->key = null; + } $this->updateWorkingSet(); if (!$this->work) { return; } $start = microtime(true); $timeout = $this->timeout; $this->isTimeout = false; $working_set = array_select_keys($this->futures, $this->work); while (true) { // Update every future first. This is a no-op on futures which have // already resolved or failed, but we want to give futures an // opportunity to make progress even if we can resolve something. foreach ($working_set as $future_key => $future) { $future->updateFuture(); } // Check if any future has resolved or failed. If we have any such // futures, we'll return the first one from the iterator. $resolve_key = null; foreach ($working_set as $future_key => $future) { if ($future->canResolve()) { $resolve_key = $future_key; break; } } // We've found a future to resolve, so we're done here for now. if ($resolve_key !== null) { $this->moveFutureToDone($resolve_key); return; } // We don't have any futures to resolve yet. Check if we're reached // an update interval. $wait_time = 1; if ($timeout !== null) { $elapsed = microtime(true) - $start; if ($elapsed > $timeout) { $this->isTimeout = true; return; } $wait_time = min($wait_time, $timeout - $elapsed); } // We're going to wait. If possible, we'd like to wait with sockets. // If we can't, we'll just sleep. $read_sockets = array(); $write_sockets = array(); foreach ($working_set as $future_key => $future) { $sockets = $future->getReadSockets(); foreach ($sockets as $socket) { $read_sockets[] = $socket; } $sockets = $future->getWriteSockets(); foreach ($sockets as $socket) { $write_sockets[] = $socket; } } $use_sockets = ($read_sockets || $write_sockets); if ($use_sockets) { foreach ($working_set as $future) { $wait_time = min($wait_time, $future->getDefaultWait()); } $this->waitForSockets($read_sockets, $write_sockets, $wait_time); } else { usleep(1000); } } } /** * @task iterator */ public function current() { if ($this->isTimeout) { return null; } return $this->futures[$this->key]; } /** * @task iterator */ public function key() { if ($this->isTimeout) { return null; } return $this->key; } /** * @task iterator */ public function valid() { if ($this->isTimeout) { return true; } return ($this->key !== null); } /* -( Internals )---------------------------------------------------------- */ /** * @task internal */ protected function updateWorkingSet() { $limit = $this->getMaximumWorkingSetSize(); $work_count = count($this->work); // If we're already working on the maximum number of futures, we just have // to wait for something to resolve. There's no benefit to updating the // queue since we can never make any meaningful progress. if ($limit) { if ($work_count >= $limit) { return; } } // If any futures that are currently held are no longer blocked by // dependencies, move them from "hold" to "wait". foreach ($this->hold as $future_key) { if (!$this->canMoveFutureToWait($future_key)) { continue; } $this->moveFutureToWait($future_key); } $wait_count = count($this->wait); $hold_count = count($this->hold); if (!$work_count && !$wait_count && $hold_count) { throw new Exception( pht( 'Future graph is stalled: some futures are held, but no futures '. 'are waiting or working. The graph can never resolve.')); } // Figure out how many futures we can start. If we don't have a limit, // we can start every waiting future. If we do have a limit, we can only // start as many futures as we have slots for. if ($limit) { $work_limit = min($limit, $wait_count); } else { $work_limit = $wait_count; } // If we're ready to start futures, start them now. if ($work_limit) { foreach ($this->wait as $future_key) { $this->moveFutureToWork($future_key); $work_limit--; if (!$work_limit) { return; } } } } private function canMoveFutureToWait($future_key) { return true; } private function moveFutureToWait($future_key) { unset($this->hold[$future_key]); $this->wait[$future_key] = $future_key; } private function moveFutureToWork($future_key) { unset($this->wait[$future_key]); $this->work[$future_key] = $future_key; $future = $this->futures[$future_key]; if (!$future->getHasFutureStarted()) { $future ->setRaiseExceptionOnStart(false) ->start(); } } private function moveFutureToDone($future_key) { $this->key = $future_key; unset($this->work[$future_key]); // Before we return, do another working set update so we start any // futures that are ready to go as soon as we can. $this->updateWorkingSet(); } /** * Wait for activity on one of several sockets. * * @param list List of sockets expected to become readable. * @param list List of sockets expected to become writable. * @param float Timeout, in seconds. * @return void */ private function waitForSockets( array $read_list, array $write_list, $timeout = 1.0) { static $handler_installed = false; if (!$handler_installed) { // If we're spawning child processes, we need to install a signal handler // here to catch cases like execing '(sleep 60 &) &' where the child // exits but a socket is kept open. But we don't actually need to do // anything because the SIGCHLD will interrupt the stream_select(), as // long as we have a handler registered. if (function_exists('pcntl_signal')) { if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) { throw new Exception(pht('Failed to install signal handler!')); } } $handler_installed = true; } $timeout_sec = (int)$timeout; $timeout_usec = (int)(1000000 * ($timeout - $timeout_sec)); $exceptfds = array(); $ok = @stream_select( $read_list, $write_list, $exceptfds, $timeout_sec, $timeout_usec); if ($ok === false) { // Hopefully, means we received a SIGCHLD. In the worst case, we degrade // to a busy wait. } } public static function handleSIGCHLD($signo) { // This function is a dummy, we just need to have some handler registered // so that PHP will get interrupted during "stream_select()". If we don't // register a handler, "stream_select()" won't fail. } }