Changeset View
Changeset View
Standalone View
Standalone View
src/future/FutureIterator.php
Show All 22 Lines | |||||
* | * | ||||
* For a general overview of futures, see @{article:Using Futures}. | * For a general overview of futures, see @{article:Using Futures}. | ||||
* | * | ||||
* @task basics Basics | * @task basics Basics | ||||
* @task config Configuring Iteration | * @task config Configuring Iteration | ||||
* @task iterator Iterator Interface | * @task iterator Iterator Interface | ||||
* @task internal Internals | * @task internal Internals | ||||
*/ | */ | ||||
final class FutureIterator extends Phobject implements Iterator { | final class FutureIterator | ||||
extends Phobject | |||||
protected $wait = array(); | implements Iterator { | ||||
protected $work = array(); | |||||
protected $futures = array(); | private $hold = array(); | ||||
protected $key; | private $wait = array(); | ||||
private $work = array(); | |||||
protected $limit; | |||||
private $futures = array(); | |||||
protected $timeout; | private $key; | ||||
protected $isTimeout = false; | |||||
private $limit; | |||||
private $timeout; | |||||
private $isTimeout = false; | |||||
private $hasRewound = false; | |||||
/* -( Basics )------------------------------------------------------------- */ | /* -( Basics )------------------------------------------------------------- */ | ||||
/** | /** | ||||
* Create a new iterator over a list of futures. | * Create a new iterator over a list of futures. | ||||
* | * | ||||
* @param list List of @{class:Future}s to resolve. | * @param list List of @{class:Future}s to resolve. | ||||
* @task basics | * @task basics | ||||
*/ | */ | ||||
public function __construct(array $futures) { | public function __construct(array $futures) { | ||||
assert_instances_of($futures, 'Future'); | assert_instances_of($futures, 'Future'); | ||||
$this->futures = $futures; | |||||
$respect_keys = !phutil_is_natural_list($futures); | |||||
foreach ($futures as $map_key => $future) { | |||||
if ($respect_keys) { | |||||
$future->setFutureKey($map_key); | |||||
} | |||||
$this->addFuture($future); | |||||
} | |||||
} | } | ||||
/** | /** | ||||
* Block until all futures resolve. | * Block until all futures resolve. | ||||
* | * | ||||
* @return void | * @return void | ||||
* @task basics | * @task basics | ||||
*/ | */ | ||||
public function resolveAll() { | public function resolveAll() { | ||||
// If a caller breaks out of a "foreach" and then calls "resolveAll()", | |||||
// interpret it to mean that we should iterate over whatever futures | |||||
// remain. | |||||
if ($this->hasRewound) { | |||||
while ($this->valid()) { | |||||
$this->next(); | |||||
} | |||||
} else { | |||||
iterator_to_array($this); | iterator_to_array($this); | ||||
} | } | ||||
} | |||||
/** | /** | ||||
* Add another future to the set of futures. This is useful if you have a | * Add another future to the set of futures. This is useful if you have a | ||||
* set of futures to run mostly in parallel, but some futures depend on | * set of futures to run mostly in parallel, but some futures depend on | ||||
* others. | * others. | ||||
* | * | ||||
* @param Future @{class:Future} to add to iterator | * @param Future @{class:Future} to add to iterator | ||||
* @task basics | * @task basics | ||||
*/ | */ | ||||
public function addFuture(Future $future, $key = null) { | public function addFuture(Future $future) { | ||||
if ($key === null) { | $key = $future->getFutureKey(); | ||||
$this->futures[] = $future; | |||||
$this->wait[] = last_key($this->futures); | |||||
} else if (!isset($this->futures[$key])) { | |||||
$this->futures[$key] = $future; | |||||
$this->wait[] = $key; | |||||
} else { | |||||
throw new Exception(pht('Invalid key %s', $key)); | |||||
} | |||||
// Start running the future if we don't have $this->limit futures running | if (isset($this->futures[$key])) { | ||||
// already. updateWorkingSet() won't start running the future if there's no | throw new Exception( | ||||
// limit, so we'll manually poke it here in that case. | pht( | ||||
$this->updateWorkingSet(); | 'This future graph already has a future with key "%s". Each '. | ||||
if (!$this->limit) { | 'future must have a unique key.', | ||||
$future->isReady(); | $key)); | ||||
} | } | ||||
$this->futures[$key] = $future; | |||||
$this->hold[$key] = $key; | |||||
return $this; | return $this; | ||||
} | } | ||||
/* -( Configuring Iteration )---------------------------------------------- */ | /* -( Configuring Iteration )---------------------------------------------- */ | ||||
/** | /** | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | /* -( Configuring Iteration )---------------------------------------------- */ | ||||
* @task config | * @task config | ||||
*/ | */ | ||||
public function limit($max) { | public function limit($max) { | ||||
$this->limit = $max; | $this->limit = $max; | ||||
return $this; | return $this; | ||||
} | } | ||||
public function setMaximumWorkingSetSize($limit) { | |||||
$this->limit = $limit; | |||||
return $this; | |||||
} | |||||
public function getMaximumWorkingSetSize() { | |||||
return $this->limit; | |||||
} | |||||
/* -( Iterator Interface )------------------------------------------------- */ | /* -( Iterator Interface )------------------------------------------------- */ | ||||
/** | /** | ||||
* @task iterator | * @task iterator | ||||
*/ | */ | ||||
public function rewind() { | public function rewind() { | ||||
$this->wait = array_keys($this->futures); | if ($this->hasRewound) { | ||||
$this->work = null; | throw new Exception( | ||||
$this->updateWorkingSet(); | pht('Future graphs can not be rewound.')); | ||||
} | |||||
$this->hasRewound = true; | |||||
$this->next(); | $this->next(); | ||||
} | } | ||||
/** | /** | ||||
* @task iterator | * @task iterator | ||||
*/ | */ | ||||
public function next() { | public function next() { | ||||
$this->key = null; | $this->key = null; | ||||
if (!count($this->wait)) { | |||||
$this->updateWorkingSet(); | |||||
if (!$this->work) { | |||||
return; | return; | ||||
} | } | ||||
$read_sockets = array(); | |||||
$write_sockets = array(); | |||||
$start = microtime(true); | $start = microtime(true); | ||||
$timeout = $this->timeout; | $timeout = $this->timeout; | ||||
$this->isTimeout = false; | $this->isTimeout = false; | ||||
$check = $this->getWorkingSet(); | $working_set = array_select_keys($this->futures, $this->work); | ||||
$resolve = null; | while (true) { | ||||
do { | // Update every future first. This is a no-op on futures which have | ||||
$read_sockets = array(); | // already resolved or failed, but we want to give futures an | ||||
$write_sockets = array(); | // opportunity to make progress even if we can resolve something. | ||||
$can_use_sockets = true; | |||||
$wait_time = 1; | |||||
foreach ($check as $wait => $key) { | |||||
$future = $this->futures[$key]; | |||||
foreach ($working_set as $future_key => $future) { | |||||
$future->updateFuture(); | $future->updateFuture(); | ||||
} | |||||
// Check if any future has resolved or failed. If we have any such | |||||
// futures, we'll return the first one from the iterator. | |||||
$resolve_key = null; | |||||
foreach ($working_set as $future_key => $future) { | |||||
if ($future->hasException()) { | if ($future->hasException()) { | ||||
if ($resolve === null) { | $resolve_key = $future_key; | ||||
$resolve = $wait; | break; | ||||
} | |||||
continue; | |||||
} | } | ||||
if ($future->hasResult()) { | if ($future->hasResult()) { | ||||
if ($resolve === null) { | $resolve_key = $future_key; | ||||
$resolve = $wait; | break; | ||||
} | } | ||||
continue; | |||||
} | } | ||||
$got_sockets = false; | // We've found a future to resolve, so we're done here for now. | ||||
$socks = $future->getReadSockets(); | |||||
if ($socks) { | |||||
$got_sockets = true; | |||||
foreach ($socks as $socket) { | |||||
$read_sockets[] = $socket; | |||||
} | |||||
} | |||||
$socks = $future->getWriteSockets(); | if ($resolve_key !== null) { | ||||
if ($socks) { | $this->moveFutureToDone($resolve_key); | ||||
$got_sockets = true; | return; | ||||
foreach ($socks as $socket) { | |||||
$write_sockets[] = $socket; | |||||
} | |||||
} | |||||
// If any currently active future had neither read nor write sockets, | |||||
// we can't wait for the current batch of items using sockets. | |||||
if (!$got_sockets) { | |||||
$can_use_sockets = false; | |||||
} else { | |||||
$wait_time = min($wait_time, $future->getDefaultWait()); | |||||
} | |||||
} | } | ||||
if ($resolve === null) { | // We don't have any futures to resolve yet. Check if we're reached | ||||
// an update interval. | |||||
// Check for a setUpdateInterval() timeout. | $wait_time = 1; | ||||
if ($timeout !== null) { | if ($timeout !== null) { | ||||
$elapsed = microtime(true) - $start; | $elapsed = microtime(true) - $start; | ||||
if ($elapsed > $timeout) { | if ($elapsed > $timeout) { | ||||
$this->isTimeout = true; | $this->isTimeout = true; | ||||
return; | return; | ||||
} else { | } | ||||
$wait_time = $timeout - $elapsed; | |||||
$wait_time = min($wait_time, $timeout - $elapsed); | |||||
} | |||||
// We're going to wait. If possible, we'd like to wait with sockets. | |||||
// If we can't, we'll just sleep. | |||||
$read_sockets = array(); | |||||
$write_sockets = array(); | |||||
foreach ($working_set as $future_key => $future) { | |||||
$sockets = $future->getReadSockets(); | |||||
foreach ($sockets as $socket) { | |||||
$read_sockets[] = $socket; | |||||
} | |||||
$sockets = $future->getWriteSockets(); | |||||
foreach ($sockets as $socket) { | |||||
$write_sockets[] = $socket; | |||||
} | } | ||||
} | } | ||||
if ($can_use_sockets) { | $use_sockets = ($read_sockets || $write_sockets); | ||||
if ($use_sockets) { | |||||
foreach ($working_set as $future) { | |||||
$wait_time = min($wait_time, $future->getDefaultWait()); | |||||
} | |||||
$this->waitForSockets($read_sockets, $write_sockets, $wait_time); | $this->waitForSockets($read_sockets, $write_sockets, $wait_time); | ||||
} else { | } else { | ||||
usleep(1000); | usleep(1000); | ||||
} | } | ||||
} | } | ||||
} while ($resolve === null); | |||||
$this->key = $this->wait[$resolve]; | |||||
unset($this->wait[$resolve]); | |||||
$this->updateWorkingSet(); | |||||
} | } | ||||
/** | /** | ||||
* @task iterator | * @task iterator | ||||
*/ | */ | ||||
public function current() { | public function current() { | ||||
if ($this->isTimeout) { | if ($this->isTimeout) { | ||||
return null; | return null; | ||||
Show All 19 Lines | if ($this->isTimeout) { | ||||
return true; | return true; | ||||
} | } | ||||
return ($this->key !== null); | return ($this->key !== null); | ||||
} | } | ||||
/* -( Internals )---------------------------------------------------------- */ | /* -( Internals )---------------------------------------------------------- */ | ||||
/** | /** | ||||
* @task internal | * @task internal | ||||
*/ | */ | ||||
protected function getWorkingSet() { | protected function updateWorkingSet() { | ||||
if ($this->work === null) { | $limit = $this->getMaximumWorkingSetSize(); | ||||
return $this->wait; | $work_count = count($this->work); | ||||
// If we're already working on the maximum number of futures, we just have | |||||
// to wait for something to resolve. There's no benefit to updating the | |||||
// queue since we can never make any meaningful progress. | |||||
if ($limit) { | |||||
if ($work_count >= $limit) { | |||||
return; | |||||
} | |||||
} | } | ||||
return $this->work; | // If any futures that are currently held are no longer blocked by | ||||
// dependencies, move them from "hold" to "wait". | |||||
foreach ($this->hold as $future_key) { | |||||
if (!$this->canMoveFutureToWait($future_key)) { | |||||
continue; | |||||
} | } | ||||
/** | $this->moveFutureToWait($future_key); | ||||
* @task internal | } | ||||
*/ | |||||
protected function updateWorkingSet() { | $wait_count = count($this->wait); | ||||
if (!$this->limit) { | $hold_count = count($this->hold); | ||||
if (!$work_count && !$wait_count && $hold_count) { | |||||
throw new Exception( | |||||
pht( | |||||
'Future graph is stalled: some futures are held, but no futures '. | |||||
'are waiting or working. The graph can never resolve.')); | |||||
} | |||||
// Figure out how many futures we can start. If we don't have a limit, | |||||
// we can start every waiting future. If we do have a limit, we can only | |||||
// start as many futures as we have slots for. | |||||
if ($limit) { | |||||
$work_limit = min($limit, $wait_count); | |||||
} else { | |||||
$work_limit = $wait_count; | |||||
} | |||||
// If we're ready to start futures, start them now. | |||||
if ($work_limit) { | |||||
foreach ($this->wait as $future_key) { | |||||
$this->moveFutureToWork($future_key); | |||||
$work_limit--; | |||||
if (!$work_limit) { | |||||
return; | return; | ||||
} | } | ||||
} | |||||
} | |||||
$old = $this->work; | } | ||||
$this->work = array_slice($this->wait, 0, $this->limit, true); | |||||
// If we're using a limit, our futures are sleeping and need to be polled | private function canMoveFutureToWait($future_key) { | ||||
// to begin execution, so poll any futures which weren't in our working set | return true; | ||||
// before. | |||||
foreach ($this->work as $work => $key) { | |||||
if (!isset($old[$work])) { | |||||
$this->futures[$key]->isReady(); | |||||
} | } | ||||
private function moveFutureToWait($future_key) { | |||||
unset($this->hold[$future_key]); | |||||
$this->wait[$future_key] = $future_key; | |||||
} | } | ||||
private function moveFutureToWork($future_key) { | |||||
unset($this->wait[$future_key]); | |||||
$this->work[$future_key] = $future_key; | |||||
$this->futures[$future_key]->startFuture(); | |||||
} | } | ||||
private function moveFutureToDone($future_key) { | |||||
$this->key = $future_key; | |||||
unset($this->work[$future_key]); | |||||
// Before we return, do another working set update so we start any | |||||
// futures that are ready to go as soon as we can. | |||||
$this->updateWorkingSet(); | |||||
$this->futures[$future_key]->endFuture(); | |||||
} | |||||
/** | /** | ||||
* Wait for activity on one of several sockets. | * Wait for activity on one of several sockets. | ||||
* | * | ||||
* @param list List of sockets expected to become readable. | * @param list List of sockets expected to become readable. | ||||
* @param list List of sockets expected to become writable. | * @param list List of sockets expected to become writable. | ||||
* @param float Timeout, in seconds. | * @param float Timeout, in seconds. | ||||
* @return void | * @return void | ||||
▲ Show 20 Lines • Show All 47 Lines • Show Last 20 Lines |