diff --git a/src/future/Future.php b/src/future/Future.php index 08772279..f58a5287 100644 --- a/src/future/Future.php +++ b/src/future/Future.php @@ -1,132 +1,157 @@ resolve()" is no longer '. 'supported. Update the caller so it no longer passes a '. 'timeout.')); } - $graph = new FutureIterator(array($this)); - $graph->resolveAll(); + if (!$this->hasResult()) { + $graph = new FutureIterator(array($this)); + $graph->resolveAll(); + } - if ($this->exception) { - throw $this->exception; + if ($this->hasException()) { + throw $this->getException(); } return $this->getResult(); } - public function setException(Exception $ex) { - $this->exception = $ex; - return $this; - } + final public function updateFuture() { + if ($this->hasException()) { + return; + } - public function getException() { - return $this->exception; + if ($this->hasResult()) { + return; + } + + try { + $this->isReady(); + } catch (Exception $ex) { + $this->setException($ex); + } catch (Throwable $ex) { + $this->setException($ex); + } } /** * Retrieve a list of sockets which we can wait to become readable while * a future is resolving. If your future has sockets which can be * `select()`ed, return them here (or in @{method:getWriteSockets}) to make * the resolve loop do a `select()`. If you do not return sockets in either * case, you'll get a busy wait. * * @return list A list of sockets which we expect to become readable. */ public function getReadSockets() { return array(); } /** * Retrieve a list of sockets which we can wait to become writable while a * future is resolving. See @{method:getReadSockets}. * * @return list A list of sockets which we expect to become writable. */ public function getWriteSockets() { return array(); } /** * Default amount of time to wait on stream select for this future. Normally * 1 second is fine, but if the future has a timeout sooner than that it * should return the amount of time left before the timeout. */ public function getDefaultWait() { return 1; } public function start() { $this->isReady(); return $this; } /** * Retrieve the final result of the future. * * @return wild Final resolution of this future. */ final protected function getResult() { if (!$this->hasResult()) { throw new Exception( pht( 'Future has not yet resolved. Resolve futures before retrieving '. 'results.')); } return $this->result; } final protected function setResult($result) { if ($this->hasResult()) { throw new Exception( pht( 'Future has already resolved. Futures may not resolve more than '. 'once.')); } $this->hasResult = true; $this->result = $result; return $this; } - final protected function hasResult() { + final public function hasResult() { return $this->hasResult; } + final private function setException($exception) { + // NOTE: The parameter may be an Exception or a Throwable. + $this->exception = $exception; + return $this; + } + + final private function getException() { + return $this->exception; + } + + final public function hasException() { + return ($this->exception !== null); + } + + } diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php index 933fe9d8..c8b01294 100644 --- a/src/future/FutureIterator.php +++ b/src/future/FutureIterator.php @@ -1,379 +1,382 @@ new ExecFuture('wc -c a.txt'), * 'b.txt' => new ExecFuture('wc -c b.txt'), * 'c.txt' => new ExecFuture('wc -c c.txt'), * ); * * foreach (new FutureIterator($futures) as $key => $future) { * // IMPORTANT: keys are preserved but the order of elements is not. This * // construct iterates over the futures in the order they resolve, so the * // fastest future is the one you'll get first. This allows you to start * // doing followup processing as soon as possible. * * list($err, $stdout) = $future->resolve(); * do_some_processing($stdout); * } * * For a general overview of futures, see @{article:Using Futures}. * * @task basics Basics * @task config Configuring Iteration * @task iterator Iterator Interface * @task internal Internals */ final class FutureIterator extends Phobject implements Iterator { protected $wait = array(); protected $work = array(); protected $futures = array(); protected $key; protected $limit; protected $timeout; protected $isTimeout = false; /* -( Basics )------------------------------------------------------------- */ /** * Create a new iterator over a list of futures. * * @param list List of @{class:Future}s to resolve. * @task basics */ public function __construct(array $futures) { assert_instances_of($futures, 'Future'); $this->futures = $futures; } /** * Block until all futures resolve. * * @return void * @task basics */ public function resolveAll() { iterator_to_array($this); } /** * Add another future to the set of futures. This is useful if you have a * set of futures to run mostly in parallel, but some futures depend on * others. * * @param Future @{class:Future} to add to iterator * @task basics */ public function addFuture(Future $future, $key = null) { if ($key === null) { $this->futures[] = $future; $this->wait[] = last_key($this->futures); } else if (!isset($this->futures[$key])) { $this->futures[$key] = $future; $this->wait[] = $key; } else { throw new Exception(pht('Invalid key %s', $key)); } // Start running the future if we don't have $this->limit futures running // already. updateWorkingSet() won't start running the future if there's no // limit, so we'll manually poke it here in that case. $this->updateWorkingSet(); if (!$this->limit) { $future->isReady(); } return $this; } /* -( Configuring Iteration )---------------------------------------------- */ /** * Set a maximum amount of time you want to wait before the iterator will * yield a result. If no future has resolved yet, the iterator will yield * null for key and value. Among other potential uses, you can use this to * show some busy indicator: * * $futures = id(new FutureIterator($futures)) * ->setUpdateInterval(1); * foreach ($futures as $future) { * if ($future === null) { * echo "Still working...\n"; * } else { * // ... * } * } * * This will echo "Still working..." once per second as long as futures are * resolving. By default, FutureIterator never yields null. * * @param float Maximum number of seconds to block waiting on futures before * yielding null. * @return this * * @task config */ public function setUpdateInterval($interval) { $this->timeout = $interval; return $this; } /** * Limit the number of simultaneously executing futures. * * $futures = id(new FutureIterator($futures)) * ->limit(4); * foreach ($futures as $future) { * // Run no more than 4 futures simultaneously. * } * * @param int Maximum number of simultaneous jobs allowed. * @return this * * @task config */ public function limit($max) { $this->limit = $max; return $this; } /* -( Iterator Interface )------------------------------------------------- */ /** * @task iterator */ public function rewind() { $this->wait = array_keys($this->futures); $this->work = null; $this->updateWorkingSet(); $this->next(); } /** * @task iterator */ public function next() { $this->key = null; if (!count($this->wait)) { return; } $read_sockets = array(); $write_sockets = array(); $start = microtime(true); $timeout = $this->timeout; $this->isTimeout = false; $check = $this->getWorkingSet(); + $resolve = null; do { $read_sockets = array(); $write_sockets = array(); $can_use_sockets = true; $wait_time = 1; foreach ($check as $wait => $key) { $future = $this->futures[$key]; - try { - if ($future->getException()) { + + $future->updateFuture(); + + if ($future->hasException()) { + if ($resolve === null) { $resolve = $wait; - continue; - } - if ($future->isReady()) { - if ($resolve === null) { - $resolve = $wait; - } - continue; } + continue; + } - $got_sockets = false; - $socks = $future->getReadSockets(); - if ($socks) { - $got_sockets = true; - foreach ($socks as $socket) { - $read_sockets[] = $socket; - } + if ($future->hasResult()) { + if ($resolve === null) { + $resolve = $wait; } + continue; + } - $socks = $future->getWriteSockets(); - if ($socks) { - $got_sockets = true; - foreach ($socks as $socket) { - $write_sockets[] = $socket; - } + $got_sockets = false; + $socks = $future->getReadSockets(); + if ($socks) { + $got_sockets = true; + foreach ($socks as $socket) { + $read_sockets[] = $socket; } + } - // If any currently active future had neither read nor write sockets, - // we can't wait for the current batch of items using sockets. - if (!$got_sockets) { - $can_use_sockets = false; - } else { - $wait_time = min($wait_time, $future->getDefaultWait()); + $socks = $future->getWriteSockets(); + if ($socks) { + $got_sockets = true; + foreach ($socks as $socket) { + $write_sockets[] = $socket; } - } catch (Exception $ex) { - $this->futures[$key]->setException($ex); - $resolve = $wait; - break; + } + + // If any currently active future had neither read nor write sockets, + // we can't wait for the current batch of items using sockets. + if (!$got_sockets) { + $can_use_sockets = false; + } else { + $wait_time = min($wait_time, $future->getDefaultWait()); } } + if ($resolve === null) { // Check for a setUpdateInterval() timeout. if ($timeout !== null) { $elapsed = microtime(true) - $start; if ($elapsed > $timeout) { $this->isTimeout = true; return; } else { $wait_time = $timeout - $elapsed; } } if ($can_use_sockets) { $this->waitForSockets($read_sockets, $write_sockets, $wait_time); } else { usleep(1000); } } } while ($resolve === null); $this->key = $this->wait[$resolve]; unset($this->wait[$resolve]); + $this->updateWorkingSet(); } /** * @task iterator */ public function current() { if ($this->isTimeout) { return null; } return $this->futures[$this->key]; } /** * @task iterator */ public function key() { if ($this->isTimeout) { return null; } return $this->key; } /** * @task iterator */ public function valid() { if ($this->isTimeout) { return true; } return ($this->key !== null); } /* -( Internals )---------------------------------------------------------- */ /** * @task internal */ protected function getWorkingSet() { if ($this->work === null) { return $this->wait; } return $this->work; } /** * @task internal */ protected function updateWorkingSet() { if (!$this->limit) { return; } $old = $this->work; $this->work = array_slice($this->wait, 0, $this->limit, true); // If we're using a limit, our futures are sleeping and need to be polled // to begin execution, so poll any futures which weren't in our working set // before. foreach ($this->work as $work => $key) { if (!isset($old[$work])) { $this->futures[$key]->isReady(); } } } /** * Wait for activity on one of several sockets. * * @param list List of sockets expected to become readable. * @param list List of sockets expected to become writable. * @param float Timeout, in seconds. * @return void */ private function waitForSockets( array $read_list, array $write_list, $timeout = 1.0) { static $handler_installed = false; if (!$handler_installed) { // If we're spawning child processes, we need to install a signal handler // here to catch cases like execing '(sleep 60 &) &' where the child // exits but a socket is kept open. But we don't actually need to do // anything because the SIGCHLD will interrupt the stream_select(), as // long as we have a handler registered. if (function_exists('pcntl_signal')) { if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) { throw new Exception(pht('Failed to install signal handler!')); } } $handler_installed = true; } $timeout_sec = (int)$timeout; $timeout_usec = (int)(1000000 * ($timeout - $timeout_sec)); $exceptfds = array(); $ok = @stream_select( $read_list, $write_list, $exceptfds, $timeout_sec, $timeout_usec); if ($ok === false) { // Hopefully, means we received a SIGCHLD. In the worst case, we degrade // to a busy wait. } } public static function handleSIGCHLD($signo) { // This function is a dummy, we just need to have some handler registered // so that PHP will get interrupted during "stream_select()". If we don't // register a handler, "stream_select()" won't fail. } } diff --git a/src/future/FutureProxy.php b/src/future/FutureProxy.php index 4b90df40..65cbffa2 100644 --- a/src/future/FutureProxy.php +++ b/src/future/FutureProxy.php @@ -1,64 +1,68 @@ setProxiedFuture($proxied); } } public function setProxiedFuture(Future $proxied) { $this->proxied = $proxied; return $this; } protected function getProxiedFuture() { if (!$this->proxied) { throw new Exception(pht('The proxied future has not been provided yet.')); } return $this->proxied; } public function isReady() { - return $this->getProxiedFuture()->isReady(); - } + if ($this->hasResult()) { + return true; + } - public function resolve() { - $result = $this->getProxiedFuture()->resolve(); - $result = $this->didReceiveResult($result); - $this->setResult($result); - return $this->getResult(); - } + $proxied = $this->getProxiedFuture(); - public function setException(Exception $ex) { - $this->getProxiedFuture()->setException($ex); - return $this; + $is_ready = $proxied->isReady(); + + if ($proxied->hasResult()) { + $result = $proxied->getResult(); + $result = $this->didReceiveResult($result); + $this->setResult($result); + } + + return $is_ready; } - public function getException() { - return $this->getProxiedFuture()->getException(); + public function resolve() { + $this->getProxiedFuture()->resolve(); + $this->isReady(); + return $this->getResult(); } public function getReadSockets() { return $this->getProxiedFuture()->getReadSockets(); } public function getWriteSockets() { return $this->getProxiedFuture()->getWriteSockets(); } public function start() { $this->getProxiedFuture()->start(); return $this; } abstract protected function didReceiveResult($result); }