Changeset View
Changeset View
Standalone View
Standalone View
src/future/FutureIterator.php
| Show First 20 Lines • Show All 174 Lines • ▼ Show 20 Lines | public function next() { | ||||
| $read_sockets = array(); | $read_sockets = array(); | ||||
| $write_sockets = array(); | $write_sockets = array(); | ||||
| $start = microtime(true); | $start = microtime(true); | ||||
| $timeout = $this->timeout; | $timeout = $this->timeout; | ||||
| $this->isTimeout = false; | $this->isTimeout = false; | ||||
| $check = $this->getWorkingSet(); | $check = $this->getWorkingSet(); | ||||
| $resolve = null; | $resolve = null; | ||||
| do { | do { | ||||
| $read_sockets = array(); | $read_sockets = array(); | ||||
| $write_sockets = array(); | $write_sockets = array(); | ||||
| $can_use_sockets = true; | $can_use_sockets = true; | ||||
| $wait_time = 1; | $wait_time = 1; | ||||
| foreach ($check as $wait => $key) { | foreach ($check as $wait => $key) { | ||||
| $future = $this->futures[$key]; | $future = $this->futures[$key]; | ||||
| try { | |||||
| if ($future->getException()) { | $future->updateFuture(); | ||||
| if ($future->hasException()) { | |||||
| if ($resolve === null) { | |||||
| $resolve = $wait; | $resolve = $wait; | ||||
| } | |||||
| continue; | continue; | ||||
| } | } | ||||
| if ($future->isReady()) { | |||||
| if ($future->hasResult()) { | |||||
| if ($resolve === null) { | if ($resolve === null) { | ||||
| $resolve = $wait; | $resolve = $wait; | ||||
| } | } | ||||
| continue; | continue; | ||||
| } | } | ||||
| $got_sockets = false; | $got_sockets = false; | ||||
| $socks = $future->getReadSockets(); | $socks = $future->getReadSockets(); | ||||
| if ($socks) { | if ($socks) { | ||||
| $got_sockets = true; | $got_sockets = true; | ||||
| foreach ($socks as $socket) { | foreach ($socks as $socket) { | ||||
| $read_sockets[] = $socket; | $read_sockets[] = $socket; | ||||
| } | } | ||||
| } | } | ||||
| $socks = $future->getWriteSockets(); | $socks = $future->getWriteSockets(); | ||||
| if ($socks) { | if ($socks) { | ||||
| $got_sockets = true; | $got_sockets = true; | ||||
| foreach ($socks as $socket) { | foreach ($socks as $socket) { | ||||
| $write_sockets[] = $socket; | $write_sockets[] = $socket; | ||||
| } | } | ||||
| } | } | ||||
| // If any currently active future had neither read nor write sockets, | // If any currently active future had neither read nor write sockets, | ||||
| // we can't wait for the current batch of items using sockets. | // we can't wait for the current batch of items using sockets. | ||||
| if (!$got_sockets) { | if (!$got_sockets) { | ||||
| $can_use_sockets = false; | $can_use_sockets = false; | ||||
| } else { | } else { | ||||
| $wait_time = min($wait_time, $future->getDefaultWait()); | $wait_time = min($wait_time, $future->getDefaultWait()); | ||||
| } | } | ||||
| } catch (Exception $ex) { | |||||
| $this->futures[$key]->setException($ex); | |||||
| $resolve = $wait; | |||||
| break; | |||||
| } | |||||
| } | } | ||||
| if ($resolve === null) { | if ($resolve === null) { | ||||
| // Check for a setUpdateInterval() timeout. | // Check for a setUpdateInterval() timeout. | ||||
| if ($timeout !== null) { | if ($timeout !== null) { | ||||
| $elapsed = microtime(true) - $start; | $elapsed = microtime(true) - $start; | ||||
| if ($elapsed > $timeout) { | if ($elapsed > $timeout) { | ||||
| $this->isTimeout = true; | $this->isTimeout = true; | ||||
| return; | return; | ||||
| } else { | } else { | ||||
| $wait_time = $timeout - $elapsed; | $wait_time = $timeout - $elapsed; | ||||
| } | } | ||||
| } | } | ||||
| if ($can_use_sockets) { | if ($can_use_sockets) { | ||||
| $this->waitForSockets($read_sockets, $write_sockets, $wait_time); | $this->waitForSockets($read_sockets, $write_sockets, $wait_time); | ||||
| } else { | } else { | ||||
| usleep(1000); | usleep(1000); | ||||
| } | } | ||||
| } | } | ||||
| } while ($resolve === null); | } while ($resolve === null); | ||||
| $this->key = $this->wait[$resolve]; | $this->key = $this->wait[$resolve]; | ||||
| unset($this->wait[$resolve]); | unset($this->wait[$resolve]); | ||||
| $this->updateWorkingSet(); | $this->updateWorkingSet(); | ||||
| } | } | ||||
| /** | /** | ||||
| * @task iterator | * @task iterator | ||||
| */ | */ | ||||
| public function current() { | public function current() { | ||||
| if ($this->isTimeout) { | if ($this->isTimeout) { | ||||
| ▲ Show 20 Lines • Show All 116 Lines • Show Last 20 Lines | |||||