diff --git a/src/future/Future.php b/src/future/Future.php --- a/src/future/Future.php +++ b/src/future/Future.php @@ -9,8 +9,7 @@ private $hasResult = false; private $result; - - protected $exception; + private $exception; /** * Is this future's process complete? Specifically, can this future be @@ -37,23 +36,34 @@ 'timeout.')); } - $graph = new FutureIterator(array($this)); - $graph->resolveAll(); + if (!$this->hasResult()) { + $graph = new FutureIterator(array($this)); + $graph->resolveAll(); + } - if ($this->exception) { - throw $this->exception; + if ($this->hasException()) { + throw $this->getException(); } return $this->getResult(); } - public function setException(Exception $ex) { - $this->exception = $ex; - return $this; - } + final public function updateFuture() { + if ($this->hasException()) { + return; + } - public function getException() { - return $this->exception; + if ($this->hasResult()) { + return; + } + + try { + $this->isReady(); + } catch (Exception $ex) { + $this->setException($ex); + } catch (Throwable $ex) { + $this->setException($ex); + } } /** @@ -125,8 +135,23 @@ return $this; } - final protected function hasResult() { + final public function hasResult() { return $this->hasResult; } + final private function setException($exception) { + // NOTE: The parameter may be an Exception or a Throwable. + $this->exception = $exception; + return $this; + } + + final private function getException() { + return $this->exception; + } + + final public function hasException() { + return ($this->exception !== null); + } + + } diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php --- a/src/future/FutureIterator.php +++ b/src/future/FutureIterator.php @@ -180,6 +180,7 @@ $this->isTimeout = false; $check = $this->getWorkingSet(); + $resolve = null; do { $read_sockets = array(); @@ -188,48 +189,49 @@ $wait_time = 1; foreach ($check as $wait => $key) { $future = $this->futures[$key]; - try { - if ($future->getException()) { + + $future->updateFuture(); + + if ($future->hasException()) { + if ($resolve === null) { $resolve = $wait; - continue; - } - if ($future->isReady()) { - if ($resolve === null) { - $resolve = $wait; - } - continue; } + continue; + } - $got_sockets = false; - $socks = $future->getReadSockets(); - if ($socks) { - $got_sockets = true; - foreach ($socks as $socket) { - $read_sockets[] = $socket; - } + if ($future->hasResult()) { + if ($resolve === null) { + $resolve = $wait; } + continue; + } - $socks = $future->getWriteSockets(); - if ($socks) { - $got_sockets = true; - foreach ($socks as $socket) { - $write_sockets[] = $socket; - } + $got_sockets = false; + $socks = $future->getReadSockets(); + if ($socks) { + $got_sockets = true; + foreach ($socks as $socket) { + $read_sockets[] = $socket; } + } - // If any currently active future had neither read nor write sockets, - // we can't wait for the current batch of items using sockets. - if (!$got_sockets) { - $can_use_sockets = false; - } else { - $wait_time = min($wait_time, $future->getDefaultWait()); + $socks = $future->getWriteSockets(); + if ($socks) { + $got_sockets = true; + foreach ($socks as $socket) { + $write_sockets[] = $socket; } - } catch (Exception $ex) { - $this->futures[$key]->setException($ex); - $resolve = $wait; - break; + } + + // If any currently active future had neither read nor write sockets, + // we can't wait for the current batch of items using sockets. + if (!$got_sockets) { + $can_use_sockets = false; + } else { + $wait_time = min($wait_time, $future->getDefaultWait()); } } + if ($resolve === null) { // Check for a setUpdateInterval() timeout. @@ -253,6 +255,7 @@ $this->key = $this->wait[$resolve]; unset($this->wait[$resolve]); + $this->updateWorkingSet(); } diff --git a/src/future/FutureProxy.php b/src/future/FutureProxy.php --- a/src/future/FutureProxy.php +++ b/src/future/FutureProxy.php @@ -27,23 +27,27 @@ } public function isReady() { - return $this->getProxiedFuture()->isReady(); - } + if ($this->hasResult()) { + return true; + } - public function resolve() { - $result = $this->getProxiedFuture()->resolve(); - $result = $this->didReceiveResult($result); - $this->setResult($result); - return $this->getResult(); - } + $proxied = $this->getProxiedFuture(); - public function setException(Exception $ex) { - $this->getProxiedFuture()->setException($ex); - return $this; + $is_ready = $proxied->isReady(); + + if ($proxied->hasResult()) { + $result = $proxied->getResult(); + $result = $this->didReceiveResult($result); + $this->setResult($result); + } + + return $is_ready; } - public function getException() { - return $this->getProxiedFuture()->getException(); + public function resolve() { + $this->getProxiedFuture()->resolve(); + $this->isReady(); + return $this->getResult(); } public function getReadSockets() {