Changeset View
Changeset View
Standalone View
Standalone View
src/future/Future.php
| <?php | <?php | ||||
| /** | /** | ||||
| * A 'future' or 'promise' is an object which represents the result of some | * A 'future' or 'promise' is an object which represents the result of some | ||||
| * pending computation. For a more complete overview of futures, see | * pending computation. For a more complete overview of futures, see | ||||
| * @{article:Using Futures}. | * @{article:Using Futures}. | ||||
| */ | */ | ||||
| abstract class Future extends Phobject { | abstract class Future extends Phobject { | ||||
| protected static $handlerInstalled = null; | |||||
| protected $result; | protected $result; | ||||
| protected $exception; | protected $exception; | ||||
| /** | /** | ||||
| * Is this future's process complete? Specifically, can this future be | * Is this future's process complete? Specifically, can this future be | ||||
| * resolved without blocking? | * resolved without blocking? | ||||
| * | * | ||||
| * @return bool If true, the external process is complete and resolving this | * @return bool If true, the external process is complete and resolving this | ||||
| Show All 12 Lines | public function resolve() { | ||||
| if (count($args)) { | if (count($args)) { | ||||
| throw new Exception( | throw new Exception( | ||||
| pht( | pht( | ||||
| 'Parameter "timeout" to "Future->resolve()" is no longer '. | 'Parameter "timeout" to "Future->resolve()" is no longer '. | ||||
| 'supported. Update the caller so it no longer passes a '. | 'supported. Update the caller so it no longer passes a '. | ||||
| 'timeout.')); | 'timeout.')); | ||||
| } | } | ||||
| $wait = $this->getDefaultWait(); | $graph = new FutureIterator(array($this)); | ||||
| do { | $graph->resolveAll(); | ||||
| $this->checkException(); | |||||
| if ($this->isReady()) { | |||||
| break; | |||||
| } | |||||
| $read = $this->getReadSockets(); | |||||
| $write = $this->getWriteSockets(); | |||||
| if ($read || $write) { | if ($this->exception) { | ||||
| self::waitForSockets($read, $write, $wait); | throw $this->exception; | ||||
| } | } | ||||
| } while (true); | |||||
| $this->checkException(); | |||||
| return $this->getResult(); | return $this->getResult(); | ||||
| } | } | ||||
| public function setException(Exception $ex) { | public function setException(Exception $ex) { | ||||
| $this->exception = $ex; | $this->exception = $ex; | ||||
| return $this; | return $this; | ||||
| } | } | ||||
| public function getException() { | public function getException() { | ||||
| return $this->exception; | return $this->exception; | ||||
| } | } | ||||
| /** | |||||
| * If an exception was set by setException(), throw it. | |||||
| */ | |||||
| private function checkException() { | |||||
| if ($this->exception) { | |||||
| throw $this->exception; | |||||
| } | |||||
| } | |||||
| /** | /** | ||||
| * Retrieve a list of sockets which we can wait to become readable while | * Retrieve a list of sockets which we can wait to become readable while | ||||
| * a future is resolving. If your future has sockets which can be | * a future is resolving. If your future has sockets which can be | ||||
| * `select()`ed, return them here (or in @{method:getWriteSockets}) to make | * `select()`ed, return them here (or in @{method:getWriteSockets}) to make | ||||
| * the resolve loop do a `select()`. If you do not return sockets in either | * the resolve loop do a `select()`. If you do not return sockets in either | ||||
| * case, you'll get a busy wait. | * case, you'll get a busy wait. | ||||
| * | * | ||||
| * @return list A list of sockets which we expect to become readable. | * @return list A list of sockets which we expect to become readable. | ||||
| Show All 10 Lines | abstract class Future extends Phobject { | ||||
| * @return list A list of sockets which we expect to become writable. | * @return list A list of sockets which we expect to become writable. | ||||
| */ | */ | ||||
| public function getWriteSockets() { | public function getWriteSockets() { | ||||
| return array(); | return array(); | ||||
| } | } | ||||
| /** | /** | ||||
| * Wait for activity on one of several sockets. | |||||
| * | |||||
| * @param list List of sockets expected to become readable. | |||||
| * @param list List of sockets expected to become writable. | |||||
| * @param float Timeout, in seconds. | |||||
| * @return void | |||||
| */ | |||||
| public static function waitForSockets( | |||||
| array $read_list, | |||||
| array $write_list, | |||||
| $timeout = 1) { | |||||
| if (!self::$handlerInstalled) { | |||||
| // If we're spawning child processes, we need to install a signal handler | |||||
| // here to catch cases like execing '(sleep 60 &) &' where the child | |||||
| // exits but a socket is kept open. But we don't actually need to do | |||||
| // anything because the SIGCHLD will interrupt the stream_select(), as | |||||
| // long as we have a handler registered. | |||||
| if (function_exists('pcntl_signal')) { | |||||
| if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) { | |||||
| throw new Exception(pht('Failed to install signal handler!')); | |||||
| } | |||||
| } | |||||
| self::$handlerInstalled = true; | |||||
| } | |||||
| $timeout_sec = (int)$timeout; | |||||
| $timeout_usec = (int)(1000000 * ($timeout - $timeout_sec)); | |||||
| $exceptfds = array(); | |||||
| $ok = @stream_select( | |||||
| $read_list, | |||||
| $write_list, | |||||
| $exceptfds, | |||||
| $timeout_sec, | |||||
| $timeout_usec); | |||||
| if ($ok === false) { | |||||
| // Hopefully, means we received a SIGCHLD. In the worst case, we degrade | |||||
| // to a busy wait. | |||||
| } | |||||
| } | |||||
| public static function handleSIGCHLD($signo) { | |||||
| // This function is a dummy, we just need to have some handler registered | |||||
| // so that PHP will get interrupted during stream_select(). If we don't | |||||
| // register a handler, stream_select() won't fail. | |||||
| } | |||||
| /** | |||||
| * Retrieve the final result of the future. This method will be called after | * Retrieve the final result of the future. This method will be called after | ||||
| * the future is ready (as per @{method:isReady}) but before results are | * the future is ready (as per @{method:isReady}) but before results are | ||||
| * passed back to the caller. The major use of this function is that you can | * passed back to the caller. The major use of this function is that you can | ||||
| * override it in subclasses to do postprocessing or error checking, which is | * override it in subclasses to do postprocessing or error checking, which is | ||||
| * particularly useful if building application-specific futures on top of | * particularly useful if building application-specific futures on top of | ||||
| * primitive transport futures (like @{class:CurlFuture} and | * primitive transport futures (like @{class:CurlFuture} and | ||||
| * @{class:ExecFuture}) which can make it tricky to hook this logic into the | * @{class:ExecFuture}) which can make it tricky to hook this logic into the | ||||
| * main pipeline. | * main pipeline. | ||||
| Show All 22 Lines | |||||