Changeset View
Changeset View
Standalone View
Standalone View
src/future/FutureIterator.php
| Show All 22 Lines | |||||
| * | * | ||||
| * For a general overview of futures, see @{article:Using Futures}. | * For a general overview of futures, see @{article:Using Futures}. | ||||
| * | * | ||||
| * @task basics Basics | * @task basics Basics | ||||
| * @task config Configuring Iteration | * @task config Configuring Iteration | ||||
| * @task iterator Iterator Interface | * @task iterator Iterator Interface | ||||
| * @task internal Internals | * @task internal Internals | ||||
| */ | */ | ||||
| final class FutureIterator extends Phobject implements Iterator { | final class FutureIterator | ||||
| extends Phobject | |||||
| protected $wait = array(); | implements Iterator { | ||||
| protected $work = array(); | |||||
| protected $futures = array(); | private $hold = array(); | ||||
| protected $key; | private $wait = array(); | ||||
| private $work = array(); | |||||
| protected $limit; | |||||
| private $futures = array(); | |||||
| protected $timeout; | private $key; | ||||
| protected $isTimeout = false; | |||||
| private $limit; | |||||
| private $timeout; | |||||
| private $isTimeout = false; | |||||
| private $hasRewound = false; | |||||
| /* -( Basics )------------------------------------------------------------- */ | /* -( Basics )------------------------------------------------------------- */ | ||||
| /** | /** | ||||
| * Create a new iterator over a list of futures. | * Create a new iterator over a list of futures. | ||||
| * | * | ||||
| * @param list List of @{class:Future}s to resolve. | * @param list List of @{class:Future}s to resolve. | ||||
| * @task basics | * @task basics | ||||
| */ | */ | ||||
| public function __construct(array $futures) { | public function __construct(array $futures) { | ||||
| assert_instances_of($futures, 'Future'); | assert_instances_of($futures, 'Future'); | ||||
| $this->futures = $futures; | |||||
| $respect_keys = !phutil_is_natural_list($futures); | |||||
| foreach ($futures as $map_key => $future) { | |||||
| if ($respect_keys) { | |||||
| $future->setFutureKey($map_key); | |||||
| } | |||||
| $this->addFuture($future); | |||||
| } | |||||
| } | } | ||||
| /** | /** | ||||
| * Block until all futures resolve. | * Block until all futures resolve. | ||||
| * | * | ||||
| * @return void | * @return void | ||||
| * @task basics | * @task basics | ||||
| */ | */ | ||||
| public function resolveAll() { | public function resolveAll() { | ||||
| // If a caller breaks out of a "foreach" and then calls "resolveAll()", | |||||
| // interpret it to mean that we should iterate over whatever futures | |||||
| // remain. | |||||
| if ($this->hasRewound) { | |||||
| while ($this->valid()) { | |||||
| $this->next(); | |||||
| } | |||||
| } else { | |||||
| iterator_to_array($this); | iterator_to_array($this); | ||||
| } | } | ||||
| } | |||||
| /** | /** | ||||
| * Add another future to the set of futures. This is useful if you have a | * Add another future to the set of futures. This is useful if you have a | ||||
| * set of futures to run mostly in parallel, but some futures depend on | * set of futures to run mostly in parallel, but some futures depend on | ||||
| * others. | * others. | ||||
| * | * | ||||
| * @param Future @{class:Future} to add to iterator | * @param Future @{class:Future} to add to iterator | ||||
| * @task basics | * @task basics | ||||
| */ | */ | ||||
| public function addFuture(Future $future, $key = null) { | public function addFuture(Future $future) { | ||||
| if ($key === null) { | $key = $future->getFutureKey(); | ||||
| $this->futures[] = $future; | |||||
| $this->wait[] = last_key($this->futures); | |||||
| } else if (!isset($this->futures[$key])) { | |||||
| $this->futures[$key] = $future; | |||||
| $this->wait[] = $key; | |||||
| } else { | |||||
| throw new Exception(pht('Invalid key %s', $key)); | |||||
| } | |||||
| // Start running the future if we don't have $this->limit futures running | if (isset($this->futures[$key])) { | ||||
| // already. updateWorkingSet() won't start running the future if there's no | throw new Exception( | ||||
| // limit, so we'll manually poke it here in that case. | pht( | ||||
| $this->updateWorkingSet(); | 'This future graph already has a future with key "%s". Each '. | ||||
| if (!$this->limit) { | 'future must have a unique key.', | ||||
| $future->isReady(); | $key)); | ||||
| } | } | ||||
| $this->futures[$key] = $future; | |||||
| $this->hold[$key] = $key; | |||||
| return $this; | return $this; | ||||
| } | } | ||||
| /* -( Configuring Iteration )---------------------------------------------- */ | /* -( Configuring Iteration )---------------------------------------------- */ | ||||
| /** | /** | ||||
| ▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | /* -( Configuring Iteration )---------------------------------------------- */ | ||||
| * @task config | * @task config | ||||
| */ | */ | ||||
| public function limit($max) { | public function limit($max) { | ||||
| $this->limit = $max; | $this->limit = $max; | ||||
| return $this; | return $this; | ||||
| } | } | ||||
| public function setMaximumWorkingSetSize($limit) { | |||||
| $this->limit = $limit; | |||||
| return $this; | |||||
| } | |||||
| public function getMaximumWorkingSetSize() { | |||||
| return $this->limit; | |||||
| } | |||||
| /* -( Iterator Interface )------------------------------------------------- */ | /* -( Iterator Interface )------------------------------------------------- */ | ||||
| /** | /** | ||||
| * @task iterator | * @task iterator | ||||
| */ | */ | ||||
| public function rewind() { | public function rewind() { | ||||
| $this->wait = array_keys($this->futures); | if ($this->hasRewound) { | ||||
| $this->work = null; | throw new Exception( | ||||
| $this->updateWorkingSet(); | pht('Future graphs can not be rewound.')); | ||||
| } | |||||
| $this->hasRewound = true; | |||||
| $this->next(); | $this->next(); | ||||
| } | } | ||||
| /** | /** | ||||
| * @task iterator | * @task iterator | ||||
| */ | */ | ||||
| public function next() { | public function next() { | ||||
| $this->key = null; | $this->key = null; | ||||
| if (!count($this->wait)) { | |||||
| $this->updateWorkingSet(); | |||||
| if (!$this->work) { | |||||
| return; | return; | ||||
| } | } | ||||
| $read_sockets = array(); | |||||
| $write_sockets = array(); | |||||
| $start = microtime(true); | $start = microtime(true); | ||||
| $timeout = $this->timeout; | $timeout = $this->timeout; | ||||
| $this->isTimeout = false; | $this->isTimeout = false; | ||||
| $check = $this->getWorkingSet(); | $working_set = array_select_keys($this->futures, $this->work); | ||||
| $resolve = null; | while (true) { | ||||
| do { | // Update every future first. This is a no-op on futures which have | ||||
| $read_sockets = array(); | // already resolved or failed, but we want to give futures an | ||||
| $write_sockets = array(); | // opportunity to make progress even if we can resolve something. | ||||
| $can_use_sockets = true; | |||||
| $wait_time = 1; | |||||
| foreach ($check as $wait => $key) { | |||||
| $future = $this->futures[$key]; | |||||
| foreach ($working_set as $future_key => $future) { | |||||
| $future->updateFuture(); | $future->updateFuture(); | ||||
| } | |||||
| // Check if any future has resolved or failed. If we have any such | |||||
| // futures, we'll return the first one from the iterator. | |||||
| $resolve_key = null; | |||||
| foreach ($working_set as $future_key => $future) { | |||||
| if ($future->hasException()) { | if ($future->hasException()) { | ||||
| if ($resolve === null) { | $resolve_key = $future_key; | ||||
| $resolve = $wait; | break; | ||||
| } | |||||
| continue; | |||||
| } | } | ||||
| if ($future->hasResult()) { | if ($future->hasResult()) { | ||||
| if ($resolve === null) { | $resolve_key = $future_key; | ||||
| $resolve = $wait; | break; | ||||
| } | } | ||||
| continue; | |||||
| } | } | ||||
| $got_sockets = false; | // We've found a future to resolve, so we're done here for now. | ||||
| $socks = $future->getReadSockets(); | |||||
| if ($socks) { | |||||
| $got_sockets = true; | |||||
| foreach ($socks as $socket) { | |||||
| $read_sockets[] = $socket; | |||||
| } | |||||
| } | |||||
| $socks = $future->getWriteSockets(); | if ($resolve_key !== null) { | ||||
| if ($socks) { | $this->moveFutureToDone($resolve_key); | ||||
| $got_sockets = true; | return; | ||||
| foreach ($socks as $socket) { | |||||
| $write_sockets[] = $socket; | |||||
| } | |||||
| } | |||||
| // If any currently active future had neither read nor write sockets, | |||||
| // we can't wait for the current batch of items using sockets. | |||||
| if (!$got_sockets) { | |||||
| $can_use_sockets = false; | |||||
| } else { | |||||
| $wait_time = min($wait_time, $future->getDefaultWait()); | |||||
| } | |||||
| } | } | ||||
| if ($resolve === null) { | // We don't have any futures to resolve yet. Check if we're reached | ||||
| // an update interval. | |||||
| // Check for a setUpdateInterval() timeout. | $wait_time = 1; | ||||
| if ($timeout !== null) { | if ($timeout !== null) { | ||||
| $elapsed = microtime(true) - $start; | $elapsed = microtime(true) - $start; | ||||
| if ($elapsed > $timeout) { | if ($elapsed > $timeout) { | ||||
| $this->isTimeout = true; | $this->isTimeout = true; | ||||
| return; | return; | ||||
| } else { | } | ||||
| $wait_time = $timeout - $elapsed; | |||||
| $wait_time = min($wait_time, $timeout - $elapsed); | |||||
| } | |||||
| // We're going to wait. If possible, we'd like to wait with sockets. | |||||
| // If we can't, we'll just sleep. | |||||
| $read_sockets = array(); | |||||
| $write_sockets = array(); | |||||
| foreach ($working_set as $future_key => $future) { | |||||
| $sockets = $future->getReadSockets(); | |||||
| foreach ($sockets as $socket) { | |||||
| $read_sockets[] = $socket; | |||||
| } | |||||
| $sockets = $future->getWriteSockets(); | |||||
| foreach ($sockets as $socket) { | |||||
| $write_sockets[] = $socket; | |||||
| } | } | ||||
| } | } | ||||
| if ($can_use_sockets) { | $use_sockets = ($read_sockets || $write_sockets); | ||||
| if ($use_sockets) { | |||||
| foreach ($working_set as $future) { | |||||
| $wait_time = min($wait_time, $future->getDefaultWait()); | |||||
| } | |||||
| $this->waitForSockets($read_sockets, $write_sockets, $wait_time); | $this->waitForSockets($read_sockets, $write_sockets, $wait_time); | ||||
| } else { | } else { | ||||
| usleep(1000); | usleep(1000); | ||||
| } | } | ||||
| } | } | ||||
| } while ($resolve === null); | |||||
| $this->key = $this->wait[$resolve]; | |||||
| unset($this->wait[$resolve]); | |||||
| $this->updateWorkingSet(); | |||||
| } | } | ||||
| /** | /** | ||||
| * @task iterator | * @task iterator | ||||
| */ | */ | ||||
| public function current() { | public function current() { | ||||
| if ($this->isTimeout) { | if ($this->isTimeout) { | ||||
| return null; | return null; | ||||
| Show All 19 Lines | if ($this->isTimeout) { | ||||
| return true; | return true; | ||||
| } | } | ||||
| return ($this->key !== null); | return ($this->key !== null); | ||||
| } | } | ||||
| /* -( Internals )---------------------------------------------------------- */ | /* -( Internals )---------------------------------------------------------- */ | ||||
| /** | /** | ||||
| * @task internal | * @task internal | ||||
| */ | */ | ||||
| protected function getWorkingSet() { | protected function updateWorkingSet() { | ||||
| if ($this->work === null) { | $limit = $this->getMaximumWorkingSetSize(); | ||||
| return $this->wait; | $work_count = count($this->work); | ||||
| // If we're already working on the maximum number of futures, we just have | |||||
| // to wait for something to resolve. There's no benefit to updating the | |||||
| // queue since we can never make any meaningful progress. | |||||
| if ($limit) { | |||||
| if ($work_count >= $limit) { | |||||
| return; | |||||
| } | |||||
| } | } | ||||
| return $this->work; | // If any futures that are currently held are no longer blocked by | ||||
| // dependencies, move them from "hold" to "wait". | |||||
| foreach ($this->hold as $future_key) { | |||||
| if (!$this->canMoveFutureToWait($future_key)) { | |||||
| continue; | |||||
| } | } | ||||
| /** | $this->moveFutureToWait($future_key); | ||||
| * @task internal | } | ||||
| */ | |||||
| protected function updateWorkingSet() { | $wait_count = count($this->wait); | ||||
| if (!$this->limit) { | $hold_count = count($this->hold); | ||||
| if (!$work_count && !$wait_count && $hold_count) { | |||||
| throw new Exception( | |||||
| pht( | |||||
| 'Future graph is stalled: some futures are held, but no futures '. | |||||
| 'are waiting or working. The graph can never resolve.')); | |||||
| } | |||||
| // Figure out how many futures we can start. If we don't have a limit, | |||||
| // we can start every waiting future. If we do have a limit, we can only | |||||
| // start as many futures as we have slots for. | |||||
| if ($limit) { | |||||
| $work_limit = min($limit, $wait_count); | |||||
| } else { | |||||
| $work_limit = $wait_count; | |||||
| } | |||||
| // If we're ready to start futures, start them now. | |||||
| if ($work_limit) { | |||||
| foreach ($this->wait as $future_key) { | |||||
| $this->moveFutureToWork($future_key); | |||||
| $work_limit--; | |||||
| if (!$work_limit) { | |||||
| return; | return; | ||||
| } | } | ||||
| } | |||||
| } | |||||
| $old = $this->work; | } | ||||
| $this->work = array_slice($this->wait, 0, $this->limit, true); | |||||
| // If we're using a limit, our futures are sleeping and need to be polled | private function canMoveFutureToWait($future_key) { | ||||
| // to begin execution, so poll any futures which weren't in our working set | return true; | ||||
| // before. | |||||
| foreach ($this->work as $work => $key) { | |||||
| if (!isset($old[$work])) { | |||||
| $this->futures[$key]->isReady(); | |||||
| } | } | ||||
| private function moveFutureToWait($future_key) { | |||||
| unset($this->hold[$future_key]); | |||||
| $this->wait[$future_key] = $future_key; | |||||
| } | } | ||||
| private function moveFutureToWork($future_key) { | |||||
| unset($this->wait[$future_key]); | |||||
| $this->work[$future_key] = $future_key; | |||||
| $this->futures[$future_key]->startFuture(); | |||||
| } | } | ||||
| private function moveFutureToDone($future_key) { | |||||
| $this->key = $future_key; | |||||
| unset($this->work[$future_key]); | |||||
| // Before we return, do another working set update so we start any | |||||
| // futures that are ready to go as soon as we can. | |||||
| $this->updateWorkingSet(); | |||||
| $this->futures[$future_key]->endFuture(); | |||||
| } | |||||
| /** | /** | ||||
| * Wait for activity on one of several sockets. | * Wait for activity on one of several sockets. | ||||
| * | * | ||||
| * @param list List of sockets expected to become readable. | * @param list List of sockets expected to become readable. | ||||
| * @param list List of sockets expected to become writable. | * @param list List of sockets expected to become writable. | ||||
| * @param float Timeout, in seconds. | * @param float Timeout, in seconds. | ||||
| * @return void | * @return void | ||||
| ▲ Show 20 Lines • Show All 47 Lines • Show Last 20 Lines | |||||