Changeset View
Changeset View
Standalone View
Standalone View
src/future/FutureIterator.php
| Show First 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | /* -( Basics )------------------------------------------------------------- */ | ||||
| /** | /** | ||||
| * Block until all futures resolve. | * Block until all futures resolve. | ||||
| * | * | ||||
| * @return void | * @return void | ||||
| * @task basics | * @task basics | ||||
| */ | */ | ||||
| public function resolveAll() { | public function resolveAll() { | ||||
| foreach ($this as $future) { | iterator_to_array($this); | ||||
| $future->resolve(); | |||||
| } | |||||
| } | } | ||||
| /** | /** | ||||
| * Add another future to the set of futures. This is useful if you have a | * Add another future to the set of futures. This is useful if you have a | ||||
| * set of futures to run mostly in parallel, but some futures depend on | * set of futures to run mostly in parallel, but some futures depend on | ||||
| * others. | * others. | ||||
| * | * | ||||
| * @param Future @{class:Future} to add to iterator | * @param Future @{class:Future} to add to iterator | ||||
| ▲ Show 20 Lines • Show All 164 Lines • ▼ Show 20 Lines | do { | ||||
| $this->isTimeout = true; | $this->isTimeout = true; | ||||
| return; | return; | ||||
| } else { | } else { | ||||
| $wait_time = $timeout - $elapsed; | $wait_time = $timeout - $elapsed; | ||||
| } | } | ||||
| } | } | ||||
| if ($can_use_sockets) { | if ($can_use_sockets) { | ||||
| Future::waitForSockets($read_sockets, $write_sockets, $wait_time); | $this->waitForSockets($read_sockets, $write_sockets, $wait_time); | ||||
| } else { | } else { | ||||
| usleep(1000); | usleep(1000); | ||||
| } | } | ||||
| } | } | ||||
| } while ($resolve === null); | } while ($resolve === null); | ||||
| $this->key = $this->wait[$resolve]; | $this->key = $this->wait[$resolve]; | ||||
| unset($this->wait[$resolve]); | unset($this->wait[$resolve]); | ||||
| ▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | protected function updateWorkingSet() { | ||||
| // before. | // before. | ||||
| foreach ($this->work as $work => $key) { | foreach ($this->work as $work => $key) { | ||||
| if (!isset($old[$work])) { | if (!isset($old[$work])) { | ||||
| $this->futures[$key]->isReady(); | $this->futures[$key]->isReady(); | ||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| /** | |||||
| * Wait for activity on one of several sockets. | |||||
| * | |||||
| * @param list List of sockets expected to become readable. | |||||
| * @param list List of sockets expected to become writable. | |||||
| * @param float Timeout, in seconds. | |||||
| * @return void | |||||
| */ | |||||
| private function waitForSockets( | |||||
| array $read_list, | |||||
| array $write_list, | |||||
| $timeout = 1.0) { | |||||
| static $handler_installed = false; | |||||
| if (!$handler_installed) { | |||||
| // If we're spawning child processes, we need to install a signal handler | |||||
| // here to catch cases like execing '(sleep 60 &) &' where the child | |||||
| // exits but a socket is kept open. But we don't actually need to do | |||||
| // anything because the SIGCHLD will interrupt the stream_select(), as | |||||
| // long as we have a handler registered. | |||||
| if (function_exists('pcntl_signal')) { | |||||
| if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) { | |||||
| throw new Exception(pht('Failed to install signal handler!')); | |||||
| } | |||||
| } | |||||
| $handler_installed = true; | |||||
| } | |||||
| $timeout_sec = (int)$timeout; | |||||
| $timeout_usec = (int)(1000000 * ($timeout - $timeout_sec)); | |||||
| $exceptfds = array(); | |||||
| $ok = @stream_select( | |||||
| $read_list, | |||||
| $write_list, | |||||
| $exceptfds, | |||||
| $timeout_sec, | |||||
| $timeout_usec); | |||||
| if ($ok === false) { | |||||
| // Hopefully, means we received a SIGCHLD. In the worst case, we degrade | |||||
| // to a busy wait. | |||||
| } | |||||
| } | |||||
| public static function handleSIGCHLD($signo) { | |||||
| // This function is a dummy, we just need to have some handler registered | |||||
| // so that PHP will get interrupted during "stream_select()". If we don't | |||||
| // register a handler, "stream_select()" won't fail. | |||||
| } | |||||
| } | } | ||||