diff --git a/src/future/Future.php b/src/future/Future.php index 8078aacc..b8656bd6 100644 --- a/src/future/Future.php +++ b/src/future/Future.php @@ -1,257 +1,277 @@ resolve()" is no longer '. 'supported. Update the caller so it no longer passes a '. 'timeout.')); } - if (!$this->hasResult() && !$this->hasException()) { + if (!$this->canResolve()) { $graph = new FutureIterator(array($this)); $graph->resolveAll(); } if ($this->hasException()) { throw $this->getException(); } return $this->getResult(); } - final public function startFuture() { - if ($this->hasStarted) { - throw new Exception( - pht( - 'Future has already started; futures can not start more '. - 'than once.')); - } - $this->hasStarted = true; - - $this->startServiceProfiler(); - $this->updateFuture(); - } - final public function updateFuture() { - if ($this->hasException()) { - return; - } - - if ($this->hasResult()) { + if ($this->canResolve()) { return; } try { $this->isReady(); } catch (Exception $ex) { $this->setException($ex); } catch (Throwable $ex) { $this->setException($ex); } } - final public function endFuture() { - if (!$this->hasException() && !$this->hasResult()) { - throw new Exception( - pht( - 'Trying to end a future which has no exception and no result. '. - 'Futures must resolve before they can be ended.')); - } - - if ($this->hasEnded) { - throw new Exception( - pht( - 'Future has already ended; futures can not end more '. - 'than once.')); - } - $this->hasEnded = true; - - $this->endServiceProfiler(); - } - private function startServiceProfiler() { // NOTE: This is a soft dependency so that we don't need to build the // ServiceProfiler into the Phage agent. Normally, this class is always // available. if (!class_exists('PhutilServiceProfiler')) { return; } $params = $this->getServiceProfilerStartParameters(); if ($params === null) { return; } $profiler = PhutilServiceProfiler::getInstance(); $call_id = $profiler->beginServiceCall($params); $this->serviceProfilerCallID = $call_id; } private function endServiceProfiler() { $call_id = $this->serviceProfilerCallID; if ($call_id === null) { return; } $params = $this->getServiceProfilerResultParameters(); $profiler = PhutilServiceProfiler::getInstance(); $profiler->endServiceCall($call_id, $params); } protected function getServiceProfilerStartParameters() { return array(); } protected function getServiceProfilerResultParameters() { return array(); } /** * Retrieve a list of sockets which we can wait to become readable while * a future is resolving. If your future has sockets which can be * `select()`ed, return them here (or in @{method:getWriteSockets}) to make * the resolve loop do a `select()`. If you do not return sockets in either * case, you'll get a busy wait. * * @return list A list of sockets which we expect to become readable. */ public function getReadSockets() { return array(); } /** * Retrieve a list of sockets which we can wait to become writable while a * future is resolving. See @{method:getReadSockets}. * * @return list A list of sockets which we expect to become writable. */ public function getWriteSockets() { return array(); } /** * Default amount of time to wait on stream select for this future. Normally * 1 second is fine, but if the future has a timeout sooner than that it * should return the amount of time left before the timeout. */ public function getDefaultWait() { return 1; } public function start() { - $this->isReady(); + if ($this->hasStarted) { + throw new Exception( + pht( + 'Future has already started; futures can not start more '. + 'than once.')); + } + $this->hasStarted = true; + + $this->startServiceProfiler(); + + $this->updateFuture(); + + if ($this->raiseExceptionOnStart) { + if ($this->hasException()) { + throw $this->getException(); + } + } + return $this; } /** * Retrieve the final result of the future. * * @return wild Final resolution of this future. */ final protected function getResult() { if (!$this->hasResult()) { throw new Exception( pht( 'Future has not yet resolved. Resolve futures before retrieving '. 'results.')); } return $this->result; } final protected function setResult($result) { if ($this->hasResult()) { throw new Exception( pht( 'Future has already resolved. Futures may not resolve more than '. 'once.')); } $this->hasResult = true; $this->result = $result; + $this->endFuture(); + return $this; } final public function hasResult() { return $this->hasResult; } - final private function setException($exception) { + private function setException($exception) { // NOTE: The parameter may be an Exception or a Throwable. $this->exception = $exception; + + $this->endFuture(); + return $this; } - final private function getException() { + private function getException() { return $this->exception; } final public function hasException() { return ($this->exception !== null); } final public function setFutureKey($key) { if ($this->futureKey !== null) { throw new Exception( pht( 'Future already has a key ("%s") assigned.', $key)); } $this->futureKey = $key; return $this; } final public function getFutureKey() { if ($this->futureKey === null) { $this->futureKey = sprintf('Future/%d', self::$nextKey++); } return $this->futureKey; } + final public function setRaiseExceptionOnStart($raise) { + $this->raiseExceptionOnStart = $raise; + return $this; + } + + final public function getHasFutureStarted() { + return $this->hasStarted; + } + + final public function canResolve() { + if ($this->hasResult()) { + return true; + } + + if ($this->hasException()) { + return true; + } + + return false; + } + + private function endFuture() { + if ($this->hasEnded) { + throw new Exception( + pht( + 'Future has already ended; futures can not end more '. + 'than once.')); + } + $this->hasEnded = true; + + $this->endServiceProfiler(); + } + } diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php index e33282c8..028c02da 100644 --- a/src/future/FutureIterator.php +++ b/src/future/FutureIterator.php @@ -1,464 +1,463 @@ new ExecFuture('wc -c a.txt'), * 'b.txt' => new ExecFuture('wc -c b.txt'), * 'c.txt' => new ExecFuture('wc -c c.txt'), * ); * * foreach (new FutureIterator($futures) as $key => $future) { * // IMPORTANT: keys are preserved but the order of elements is not. This * // construct iterates over the futures in the order they resolve, so the * // fastest future is the one you'll get first. This allows you to start * // doing followup processing as soon as possible. * * list($err, $stdout) = $future->resolve(); * do_some_processing($stdout); * } * * For a general overview of futures, see @{article:Using Futures}. * * @task basics Basics * @task config Configuring Iteration * @task iterator Iterator Interface * @task internal Internals */ final class FutureIterator extends Phobject implements Iterator { private $hold = array(); private $wait = array(); private $work = array(); private $futures = array(); private $key; private $limit; private $timeout; private $isTimeout = false; private $hasRewound = false; /* -( Basics )------------------------------------------------------------- */ /** * Create a new iterator over a list of futures. * * @param list List of @{class:Future}s to resolve. * @task basics */ public function __construct(array $futures) { assert_instances_of($futures, 'Future'); foreach ($futures as $map_key => $future) { $future->setFutureKey($map_key); $this->addFuture($future); } } /** * Block until all futures resolve. * * @return void * @task basics */ public function resolveAll() { // If a caller breaks out of a "foreach" and then calls "resolveAll()", // interpret it to mean that we should iterate over whatever futures // remain. if ($this->hasRewound) { while ($this->valid()) { $this->next(); } } else { iterator_to_array($this); } } /** * Add another future to the set of futures. This is useful if you have a * set of futures to run mostly in parallel, but some futures depend on * others. * * @param Future @{class:Future} to add to iterator * @task basics */ public function addFuture(Future $future) { $key = $future->getFutureKey(); if (isset($this->futures[$key])) { throw new Exception( pht( 'This future graph already has a future with key "%s". Each '. 'future must have a unique key.', $key)); } $this->futures[$key] = $future; $this->hold[$key] = $key; return $this; } /* -( Configuring Iteration )---------------------------------------------- */ /** * Set a maximum amount of time you want to wait before the iterator will * yield a result. If no future has resolved yet, the iterator will yield * null for key and value. Among other potential uses, you can use this to * show some busy indicator: * * $futures = id(new FutureIterator($futures)) * ->setUpdateInterval(1); * foreach ($futures as $future) { * if ($future === null) { * echo "Still working...\n"; * } else { * // ... * } * } * * This will echo "Still working..." once per second as long as futures are * resolving. By default, FutureIterator never yields null. * * @param float Maximum number of seconds to block waiting on futures before * yielding null. * @return this * * @task config */ public function setUpdateInterval($interval) { $this->timeout = $interval; return $this; } /** * Limit the number of simultaneously executing futures. * * $futures = id(new FutureIterator($futures)) * ->limit(4); * foreach ($futures as $future) { * // Run no more than 4 futures simultaneously. * } * * @param int Maximum number of simultaneous jobs allowed. * @return this * * @task config */ public function limit($max) { $this->limit = $max; return $this; } public function setMaximumWorkingSetSize($limit) { $this->limit = $limit; return $this; } public function getMaximumWorkingSetSize() { return $this->limit; } /* -( Iterator Interface )------------------------------------------------- */ /** * @task iterator */ public function rewind() { if ($this->hasRewound) { throw new Exception( pht('Future graphs can not be rewound.')); } $this->hasRewound = true; $this->next(); } /** * @task iterator */ public function next() { $this->key = null; $this->updateWorkingSet(); if (!$this->work) { return; } $start = microtime(true); $timeout = $this->timeout; $this->isTimeout = false; $working_set = array_select_keys($this->futures, $this->work); while (true) { // Update every future first. This is a no-op on futures which have // already resolved or failed, but we want to give futures an // opportunity to make progress even if we can resolve something. foreach ($working_set as $future_key => $future) { $future->updateFuture(); } // Check if any future has resolved or failed. If we have any such // futures, we'll return the first one from the iterator. $resolve_key = null; foreach ($working_set as $future_key => $future) { - if ($future->hasException()) { - $resolve_key = $future_key; - break; - } - - if ($future->hasResult()) { + if ($future->canResolve()) { $resolve_key = $future_key; break; } } // We've found a future to resolve, so we're done here for now. if ($resolve_key !== null) { $this->moveFutureToDone($resolve_key); return; } // We don't have any futures to resolve yet. Check if we're reached // an update interval. $wait_time = 1; if ($timeout !== null) { $elapsed = microtime(true) - $start; if ($elapsed > $timeout) { $this->isTimeout = true; return; } $wait_time = min($wait_time, $timeout - $elapsed); } // We're going to wait. If possible, we'd like to wait with sockets. // If we can't, we'll just sleep. $read_sockets = array(); $write_sockets = array(); foreach ($working_set as $future_key => $future) { $sockets = $future->getReadSockets(); foreach ($sockets as $socket) { $read_sockets[] = $socket; } $sockets = $future->getWriteSockets(); foreach ($sockets as $socket) { $write_sockets[] = $socket; } } $use_sockets = ($read_sockets || $write_sockets); if ($use_sockets) { foreach ($working_set as $future) { $wait_time = min($wait_time, $future->getDefaultWait()); } $this->waitForSockets($read_sockets, $write_sockets, $wait_time); } else { usleep(1000); } } } /** * @task iterator */ public function current() { if ($this->isTimeout) { return null; } return $this->futures[$this->key]; } /** * @task iterator */ public function key() { if ($this->isTimeout) { return null; } return $this->key; } /** * @task iterator */ public function valid() { if ($this->isTimeout) { return true; } return ($this->key !== null); } /* -( Internals )---------------------------------------------------------- */ /** * @task internal */ protected function updateWorkingSet() { $limit = $this->getMaximumWorkingSetSize(); $work_count = count($this->work); // If we're already working on the maximum number of futures, we just have // to wait for something to resolve. There's no benefit to updating the // queue since we can never make any meaningful progress. if ($limit) { if ($work_count >= $limit) { return; } } // If any futures that are currently held are no longer blocked by // dependencies, move them from "hold" to "wait". foreach ($this->hold as $future_key) { if (!$this->canMoveFutureToWait($future_key)) { continue; } $this->moveFutureToWait($future_key); } $wait_count = count($this->wait); $hold_count = count($this->hold); if (!$work_count && !$wait_count && $hold_count) { throw new Exception( pht( 'Future graph is stalled: some futures are held, but no futures '. 'are waiting or working. The graph can never resolve.')); } // Figure out how many futures we can start. If we don't have a limit, // we can start every waiting future. If we do have a limit, we can only // start as many futures as we have slots for. if ($limit) { $work_limit = min($limit, $wait_count); } else { $work_limit = $wait_count; } // If we're ready to start futures, start them now. if ($work_limit) { foreach ($this->wait as $future_key) { $this->moveFutureToWork($future_key); $work_limit--; if (!$work_limit) { return; } } } } private function canMoveFutureToWait($future_key) { return true; } private function moveFutureToWait($future_key) { unset($this->hold[$future_key]); $this->wait[$future_key] = $future_key; } private function moveFutureToWork($future_key) { unset($this->wait[$future_key]); $this->work[$future_key] = $future_key; - $this->futures[$future_key]->startFuture(); + $future = $this->futures[$future_key]; + + if (!$future->getHasFutureStarted()) { + $future + ->setRaiseExceptionOnStart(false) + ->start(); + } } private function moveFutureToDone($future_key) { $this->key = $future_key; unset($this->work[$future_key]); // Before we return, do another working set update so we start any // futures that are ready to go as soon as we can. $this->updateWorkingSet(); - - $this->futures[$future_key]->endFuture(); } /** * Wait for activity on one of several sockets. * * @param list List of sockets expected to become readable. * @param list List of sockets expected to become writable. * @param float Timeout, in seconds. * @return void */ private function waitForSockets( array $read_list, array $write_list, $timeout = 1.0) { static $handler_installed = false; if (!$handler_installed) { // If we're spawning child processes, we need to install a signal handler // here to catch cases like execing '(sleep 60 &) &' where the child // exits but a socket is kept open. But we don't actually need to do // anything because the SIGCHLD will interrupt the stream_select(), as // long as we have a handler registered. if (function_exists('pcntl_signal')) { if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) { throw new Exception(pht('Failed to install signal handler!')); } } $handler_installed = true; } $timeout_sec = (int)$timeout; $timeout_usec = (int)(1000000 * ($timeout - $timeout_sec)); $exceptfds = array(); $ok = @stream_select( $read_list, $write_list, $exceptfds, $timeout_sec, $timeout_usec); if ($ok === false) { // Hopefully, means we received a SIGCHLD. In the worst case, we degrade // to a busy wait. } } public static function handleSIGCHLD($signo) { // This function is a dummy, we just need to have some handler registered // so that PHP will get interrupted during "stream_select()". If we don't // register a handler, "stream_select()" won't fail. } } diff --git a/src/future/exec/ExecFuture.php b/src/future/exec/ExecFuture.php index 0264d4d3..ecce090c 100644 --- a/src/future/exec/ExecFuture.php +++ b/src/future/exec/ExecFuture.php @@ -1,994 +1,1017 @@ array('pipe', 'r'), // stdin 1 => array('pipe', 'w'), // stdout 2 => array('pipe', 'w'), // stderr ); protected function didConstruct() { $this->stdin = new PhutilRope(); } /* -( Command Information )------------------------------------------------ */ /** * Retrieve the byte limit for the stderr buffer. * * @return int Maximum buffer size, in bytes. * @task info */ public function getStderrSizeLimit() { return $this->stderrSizeLimit; } /** * Retrieve the byte limit for the stdout buffer. * * @return int Maximum buffer size, in bytes. * @task info */ public function getStdoutSizeLimit() { return $this->stdoutSizeLimit; } /** * Get the process's pid. This only works after execution is initiated, e.g. * by a call to start(). * * @return int Process ID of the executing process. * @task info */ public function getPID() { $status = $this->procGetStatus(); return $status['pid']; } + public function hasPID() { + if ($this->procStatus) { + return true; + } + + if ($this->proc) { + return true; + } + + return false; + } + /* -( Configuring Execution )---------------------------------------------- */ /** * Set a maximum size for the stdout read buffer. To limit stderr, see * @{method:setStderrSizeLimit}. The major use of these methods is to use less * memory if you are running a command which sometimes produces huge volumes * of output that you don't really care about. * * NOTE: Setting this to 0 means "no buffer", not "unlimited buffer". * * @param int Maximum size of the stdout read buffer. * @return this * @task config */ public function setStdoutSizeLimit($limit) { $this->stdoutSizeLimit = $limit; return $this; } /** * Set a maximum size for the stderr read buffer. * See @{method:setStdoutSizeLimit} for discussion. * * @param int Maximum size of the stderr read buffer. * @return this * @task config */ public function setStderrSizeLimit($limit) { $this->stderrSizeLimit = $limit; return $this; } /** * Set the maximum internal read buffer size this future. The future will * block reads once the internal stdout or stderr buffer exceeds this size. * * NOTE: If you @{method:resolve} a future with a read buffer limit, you may * block forever! * * TODO: We should probably release the read buffer limit during * @{method:resolve}, or otherwise detect this. For now, be careful. * * @param int|null Maximum buffer size, or `null` for unlimited. * @return this */ public function setReadBufferSize($read_buffer_size) { $this->readBufferSize = $read_buffer_size; return $this; } /* -( Interacting With Commands )------------------------------------------ */ /** * Read and return output from stdout and stderr, if any is available. This * method keeps a read cursor on each stream, but the entire streams are * still returned when the future resolves. You can call read() again after * resolving the future to retrieve only the parts of the streams you did not * previously read: * * $future = new ExecFuture('...'); * // ... * list($stdout) = $future->read(); // Returns output so far * list($stdout) = $future->read(); // Returns new output since first call * // ... * list($stdout) = $future->resolvex(); // Returns ALL output * list($stdout) = $future->read(); // Returns unread output * * NOTE: If you set a limit with @{method:setStdoutSizeLimit} or * @{method:setStderrSizeLimit}, this method will not be able to read data * past the limit. * * NOTE: If you call @{method:discardBuffers}, all the stdout/stderr data * will be thrown away and the cursors will be reset. * * @return pair <$stdout, $stderr> pair with new output since the last call * to this method. * @task interact */ public function read() { $stdout = $this->readStdout(); $result = array( $stdout, (string)substr($this->stderr, $this->stderrPos), ); $this->stderrPos = strlen($this->stderr); return $result; } public function readStdout() { if ($this->start) { - $this->isReady(); // Sync + $this->updateFuture(); // Sync } $result = (string)substr($this->stdout, $this->stdoutPos); $this->stdoutPos = strlen($this->stdout); return $result; } /** * Write data to stdin of the command. * * @param string Data to write. * @param bool If true, keep the pipe open for writing. By default, the pipe * will be closed as soon as possible so that commands which * listen for EOF will execute. If you want to keep the pipe open * past the start of command execution, do an empty write with * `$keep_pipe = true` first. * @return this * @task interact */ public function write($data, $keep_pipe = false) { if (strlen($data)) { if (!$this->stdin) { throw new Exception(pht('Writing to a closed pipe!')); } $this->stdin->append($data); } $this->closePipe = !$keep_pipe; return $this; } /** * Permanently discard the stdout and stderr buffers and reset the read * cursors. This is basically useful only if you are streaming a large amount * of data from some process. * * Conceivably you might also need to do this if you're writing a client using * @{class:ExecFuture} and `netcat`, but you probably should not do that. * * NOTE: This completely discards the data. It won't be available when the * future resolves. This is almost certainly only useful if you need the * buffer memory for some reason. * * @return this * @task interact */ public function discardBuffers() { $this->discardStdoutBuffer(); $this->stderr = ''; $this->stderrPos = 0; return $this; } public function discardStdoutBuffer() { $this->stdout = ''; $this->stdoutPos = 0; return $this; } /** * Returns true if this future was killed by a timeout configured with * @{method:setTimeout}. * * @return bool True if the future was killed for exceeding its time limit. */ public function getWasKilledByTimeout() { return $this->killedByTimeout; } /* -( Configuring Execution )---------------------------------------------- */ /** * Set a hard limit on execution time. If the command runs longer, it will * be terminated and the future will resolve with an error code. You can test * if a future was killed by a timeout with @{method:getWasKilledByTimeout}. * * The subprocess will be sent a `TERM` signal, and then a `KILL` signal a * short while later if it fails to exit. * * @param int Maximum number of seconds this command may execute for before * it is signaled. * @return this * @task config */ public function setTimeout($seconds) { $this->terminateTimeout = $seconds; $this->killTimeout = $seconds + min($seconds, 60); return $this; } /* -( Resolving Execution )------------------------------------------------ */ /** * Resolve a command you expect to exit with return code 0. Works like * @{method:resolve}, but throws if $err is nonempty. Returns only * $stdout and $stderr. See also @{function:execx}. * * list($stdout, $stderr) = $future->resolvex(); * * @param float Optional timeout after which resolution will pause and * execution will return to the caller. * @return pair <$stdout, $stderr> pair. * @task resolve */ public function resolvex() { $result = $this->resolve(); return $this->raiseResultError($result); } /** * Resolve a command you expect to return valid JSON. Works like * @{method:resolvex}, but also throws if stderr is nonempty, or stdout is not * valid JSON. Returns a PHP array, decoded from the JSON command output. * * @param float Optional timeout after which resolution will pause and * execution will return to the caller. * @return array PHP array, decoded from JSON command output. * @task resolve */ public function resolveJSON() { list($stdout, $stderr) = $this->resolvex(); if (strlen($stderr)) { $cmd = $this->getCommand(); throw new CommandException( pht( "JSON command '%s' emitted text to stderr when none was expected: %d", $cmd, $stderr), $cmd, 0, $stdout, $stderr); } try { return phutil_json_decode($stdout); } catch (PhutilJSONParserException $ex) { $cmd = $this->getCommand(); throw new CommandException( pht( "JSON command '%s' did not produce a valid JSON object on stdout: %s", $cmd, $stdout), $cmd, 0, $stdout, $stderr); } } /** * Resolve the process by abruptly terminating it. * * @return list List of results. * @task resolve */ public function resolveKill() { if (!$this->hasResult()) { $signal = 9; if ($this->proc) { proc_terminate($this->proc, $signal); } $this->closeProcess(); $result = array( 128 + $signal, $this->stdout, $this->stderr, ); $this->recordResult($result); } return $this->getResult(); } private function recordResult(array $result) { $resolve_on_error = $this->getResolveOnError(); if (!$resolve_on_error) { $result = $this->raiseResultError($result); } $this->setResult($result); } private function raiseResultError($result) { list($err, $stdout, $stderr) = $result; if ($err) { $cmd = $this->getCommand(); if ($this->getWasKilledByTimeout()) { // NOTE: The timeout can be a float and PhutilNumber only handles // integers, so just use "%s" to render it. $message = pht( 'Command killed by timeout after running for more than %s seconds.', $this->terminateTimeout); } else { $message = pht('Command failed with error #%d!', $err); } throw new CommandException( $message, $cmd, $err, $stdout, $stderr); } return array($stdout, $stderr); } /* -( Internals )---------------------------------------------------------- */ /** * Provides read sockets to the future core. * * @return list List of read sockets. * @task internal */ public function getReadSockets() { list($stdin, $stdout, $stderr) = $this->pipes; $sockets = array(); if (isset($stdout) && !feof($stdout)) { $sockets[] = $stdout; } if (isset($stderr) && !feof($stderr)) { $sockets[] = $stderr; } return $sockets; } /** * Provides write sockets to the future core. * * @return list List of write sockets. * @task internal */ public function getWriteSockets() { list($stdin, $stdout, $stderr) = $this->pipes; $sockets = array(); if (isset($stdin) && $this->stdin->getByteLength() && !feof($stdin)) { $sockets[] = $stdin; } return $sockets; } /** * Determine if the read buffer is empty. * * @return bool True if the read buffer is empty. * @task internal */ public function isReadBufferEmpty() { return !strlen($this->stdout); } /** * Determine if the write buffer is empty. * * @return bool True if the write buffer is empty. * @task internal */ public function isWriteBufferEmpty() { return !$this->getWriteBufferSize(); } /** * Determine the number of bytes in the write buffer. * * @return int Number of bytes in the write buffer. * @task internal */ public function getWriteBufferSize() { if (!$this->stdin) { return 0; } return $this->stdin->getByteLength(); } /** * Reads some bytes from a stream, discarding output once a certain amount * has been accumulated. * * @param resource Stream to read from. * @param int Maximum number of bytes to return from $stream. If * additional bytes are available, they will be read and * discarded. * @param string Human-readable description of stream, for exception * message. * @param int Maximum number of bytes to read. * @return string The data read from the stream. * @task internal */ private function readAndDiscard($stream, $limit, $description, $length) { $output = ''; if ($length <= 0) { return ''; } do { $data = fread($stream, min($length, 64 * 1024)); if (false === $data) { throw new Exception(pht('Failed to read from %s', $description)); } $read_bytes = strlen($data); if ($read_bytes > 0 && $limit > 0) { if ($read_bytes > $limit) { $data = substr($data, 0, $limit); } $output .= $data; $limit -= strlen($data); } if (strlen($output) >= $length) { break; } } while ($read_bytes > 0); return $output; } /** * Begin or continue command execution. * * @return bool True if future has resolved. * @task internal */ public function isReady() { // NOTE: We have a soft dependencies on PhutilErrorTrap here, to avoid // the need to build it into the Phage agent. Under normal circumstances, // this class are always available. if (!$this->pipes) { $is_windows = phutil_is_windows(); if (!$this->start) { // We might already have started the timer via initiating resolution. $this->start = microtime(true); } $unmasked_command = $this->getCommand(); $unmasked_command = $unmasked_command->getUnmaskedString(); $pipes = array(); if ($this->hasEnv()) { $env = $this->getEnv(); } else { $env = null; } $cwd = $this->getCWD(); // NOTE: See note above about Phage. if (class_exists('PhutilErrorTrap')) { $trap = new PhutilErrorTrap(); } else { $trap = null; } $spec = self::$descriptorSpec; if ($is_windows) { $stdout_file = new TempFile(); $stderr_file = new TempFile(); $stdout_handle = fopen($stdout_file, 'wb'); if (!$stdout_handle) { throw new Exception( pht( 'Unable to open stdout temporary file ("%s") for writing.', $stdout_file)); } $stderr_handle = fopen($stderr_file, 'wb'); if (!$stderr_handle) { throw new Exception( pht( 'Unable to open stderr temporary file ("%s") for writing.', $stderr_file)); } $spec = array( 0 => self::$descriptorSpec[0], 1 => $stdout_handle, 2 => $stderr_handle, ); } $proc = @proc_open( $unmasked_command, $spec, $pipes, $cwd, $env, array( 'bypass_shell' => true, )); if ($trap) { $err = $trap->getErrorsAsString(); $trap->destroy(); } else { $err = error_get_last(); if ($err) { $err = $err['message']; } } if ($is_windows) { fclose($stdout_handle); fclose($stderr_handle); } if (!is_resource($proc)) { // When you run an invalid command on a Linux system, the "proc_open()" // works and then the process (really a "/bin/sh -c ...") exits after // it fails to resolve the command. // When you run an invalid command on a Windows system, we bypass the // shell and the "proc_open()" itself fails. See also T13504. Fail the // future immediately, acting as though it exited with an error code // for consistency with Linux. $result = array( 1, '', pht( 'Call to "proc_open()" to open a subprocess failed: %s', $err), ); $this->recordResult($result); return true; } if ($is_windows) { $stdout_handle = fopen($stdout_file, 'rb'); if (!$stdout_handle) { throw new Exception( pht( 'Unable to open stdout temporary file ("%s") for reading.', $stdout_file)); } $stderr_handle = fopen($stderr_file, 'rb'); if (!$stderr_handle) { throw new Exception( pht( 'Unable to open stderr temporary file ("%s") for reading.', $stderr_file)); } $pipes = array( 0 => $pipes[0], 1 => $stdout_handle, 2 => $stderr_handle, ); $this->windowsStdoutTempFile = $stdout_file; $this->windowsStderrTempFile = $stderr_file; } $this->pipes = $pipes; $this->proc = $proc; list($stdin, $stdout, $stderr) = $pipes; if (!$is_windows) { // On Windows, we redirect process standard output and standard error // through temporary files. Files don't block, so we don't need to make // these streams nonblocking. if ((!stream_set_blocking($stdout, false)) || (!stream_set_blocking($stderr, false)) || (!stream_set_blocking($stdin, false))) { $this->__destruct(); throw new Exception(pht('Failed to set streams nonblocking.')); } } $this->tryToCloseStdin(); return false; } if (!$this->proc) { return true; } list($stdin, $stdout, $stderr) = $this->pipes; while (isset($this->stdin) && $this->stdin->getByteLength()) { $write_segment = $this->stdin->getAnyPrefix(); try { $bytes = fwrite($stdin, $write_segment); } catch (RuntimeException $ex) { // If the subprocess has exited, we may get a broken pipe error here // in recent versions of PHP. There does not seem to be any way to // get the actual error code other than reading the exception string. // For now, treat this as if writes are blocked. break; } if ($bytes === false) { throw new Exception(pht('Unable to write to stdin!')); } else if ($bytes) { $this->stdin->removeBytesFromHead($bytes); } else { // Writes are blocked for now. break; } } $this->tryToCloseStdin(); // Read status before reading pipes so that we can never miss data that // arrives between our last read and the process exiting. $status = $this->procGetStatus(); $read_buffer_size = $this->readBufferSize; $max_stdout_read_bytes = PHP_INT_MAX; $max_stderr_read_bytes = PHP_INT_MAX; if ($read_buffer_size !== null) { $max_stdout_read_bytes = $read_buffer_size - strlen($this->stdout); $max_stderr_read_bytes = $read_buffer_size - strlen($this->stderr); } if ($max_stdout_read_bytes > 0) { $this->stdout .= $this->readAndDiscard( $stdout, $this->getStdoutSizeLimit() - strlen($this->stdout), 'stdout', $max_stdout_read_bytes); } if ($max_stderr_read_bytes > 0) { $this->stderr .= $this->readAndDiscard( $stderr, $this->getStderrSizeLimit() - strlen($this->stderr), 'stderr', $max_stderr_read_bytes); } $is_done = false; if (!$status['running']) { // We may still have unread bytes on stdout or stderr, particularly if // this future is being buffered and streamed. If we do, we don't want to // consider the subprocess to have exited until we've read everything. // See T9724 for context. if (feof($stdout) && feof($stderr)) { $is_done = true; } } if ($is_done) { $signal_info = null; // If the subprocess got nuked with `kill -9`, we get a -1 exitcode. // Upgrade this to a slightly more informative value by examining the // terminating signal code. $err = $status['exitcode']; if ($err == -1) { if ($status['signaled']) { $signo = $status['termsig']; $err = 128 + $signo; $signal_info = pht( "\n\n", phutil_get_signal_name($signo), $signo); } } $result = array( $err, $this->stdout, $signal_info.$this->stderr, ); $this->recordResult($result); $this->closeProcess(); return true; } $elapsed = (microtime(true) - $this->start); if ($this->terminateTimeout && ($elapsed >= $this->terminateTimeout)) { if (!$this->didTerminate) { $this->killedByTimeout = true; $this->sendTerminateSignal(); return false; } } if ($this->killTimeout && ($elapsed >= $this->killTimeout)) { $this->killedByTimeout = true; $this->resolveKill(); return true; } } /** * @return void * @task internal */ public function __destruct() { if (!$this->proc) { return; } // NOTE: If we try to proc_close() an open process, we hang indefinitely. To // avoid this, kill the process explicitly if it's still running. $status = $this->procGetStatus(); if ($status['running']) { $this->sendTerminateSignal(); if (!$this->waitForExit(5)) { $this->resolveKill(); } } else { $this->closeProcess(); } } /** * Close and free resources if necessary. * * @return void * @task internal */ private function closeProcess() { foreach ($this->pipes as $pipe) { if (isset($pipe)) { @fclose($pipe); } } $this->pipes = array(null, null, null); if ($this->proc) { @proc_close($this->proc); $this->proc = null; } $this->stdin = null; unset($this->windowsStdoutTempFile); unset($this->windowsStderrTempFile); } /** * Execute `proc_get_status()`, but avoid pitfalls. * * @return dict Process status. * @task internal */ private function procGetStatus() { // After the process exits, we only get one chance to read proc_get_status() // before it starts returning garbage. Make sure we don't throw away the // last good read. if ($this->procStatus) { if (!$this->procStatus['running']) { return $this->procStatus; } } + + // See T13555. This may occur if you call "getPID()" on a future which + // exited immediately without ever creating a valid subprocess. + + if (!$this->proc) { + throw new Exception( + pht( + 'Attempting to get subprocess status in "ExecFuture" with no '. + 'valid subprocess.')); + } + $this->procStatus = proc_get_status($this->proc); return $this->procStatus; } /** * Try to close stdin, if we're done using it. This keeps us from hanging if * the process on the other end of the pipe is waiting for EOF. * * @return void * @task internal */ private function tryToCloseStdin() { if (!$this->closePipe) { // We've been told to keep the pipe open by a call to write(..., true). return; } if ($this->stdin->getByteLength()) { // We still have bytes to write. return; } list($stdin) = $this->pipes; if (!$stdin) { // We've already closed stdin. return; } // There's nothing stopping us from closing stdin, so close it. @fclose($stdin); $this->pipes[0] = null; } public function getDefaultWait() { $wait = parent::getDefaultWait(); $next_timeout = $this->getNextTimeout(); if ($next_timeout) { if (!$this->start) { $this->start = microtime(true); } $elapsed = (microtime(true) - $this->start); $wait = max(0, min($next_timeout - $elapsed, $wait)); } return $wait; } private function getNextTimeout() { if ($this->didTerminate) { return $this->killTimeout; } else { return $this->terminateTimeout; } } private function sendTerminateSignal() { $this->didTerminate = true; proc_terminate($this->proc); return $this; } private function waitForExit($duration) { $start = microtime(true); while (true) { $status = $this->procGetStatus(); if (!$status['running']) { return true; } $waited = (microtime(true) - $start); if ($waited > $duration) { return false; } } } protected function getServiceProfilerStartParameters() { return array( 'type' => 'exec', 'command' => phutil_string_cast($this->getCommand()), ); } protected function getServiceProfilerResultParameters() { if ($this->hasResult()) { $result = $this->getResult(); $err = idx($result, 0); } else { $err = null; } return array( 'err' => $err, ); } } diff --git a/src/repository/graph/query/ArcanistGitCommitGraphQuery.php b/src/repository/graph/query/ArcanistGitCommitGraphQuery.php index 2491a549..53587e45 100644 --- a/src/repository/graph/query/ArcanistGitCommitGraphQuery.php +++ b/src/repository/graph/query/ArcanistGitCommitGraphQuery.php @@ -1,209 +1,209 @@ newFutures(); $this->executeIterators(); return $this->seen; } private function newFutures() { $head_hashes = $this->getHeadHashes(); $exact_hashes = $this->getExactHashes(); if (!$head_hashes && !$exact_hashes) { throw new Exception(pht('Need head hashes or exact hashes!')); } $api = $this->getRepositoryAPI(); $ref_lists = array(); if ($head_hashes) { $refs = array(); if ($head_hashes !== null) { foreach ($head_hashes as $hash) { $refs[] = $hash; } } $tail_hashes = $this->getTailHashes(); if ($tail_hashes !== null) { foreach ($tail_hashes as $tail_hash) { $refs[] = sprintf('^%s^@', $tail_hash); } } $ref_lists[] = $refs; } if ($exact_hashes !== null) { foreach ($exact_hashes as $exact_hash) { $ref_list = array(); $ref_list[] = $exact_hash; $ref_list[] = sprintf('^%s^@', $exact_hash); $ref_list[] = '--'; $ref_lists[] = $ref_list; } } $flags = array(); $min_epoch = $this->getMinimumEpoch(); if ($min_epoch !== null) { $flags[] = '--after'; $flags[] = date('c', $min_epoch); } $max_epoch = $this->getMaximumEpoch(); if ($max_epoch !== null) { $flags[] = '--before'; $flags[] = date('c', $max_epoch); } foreach ($ref_lists as $ref_list) { $ref_blob = implode("\n", $ref_list)."\n"; $fields = array( '%e', '%H', '%P', '%ct', '%B', ); $format = implode('%x02', $fields).'%x01'; $future = $api->newFuture( 'log --format=%s %Ls --stdin', $format, $flags); $future->write($ref_blob); $future->setResolveOnError(true); $this->futures[] = $future; } } private function executeIterators() { while ($this->futures || $this->iterators) { $iterator_limit = 8; while (count($this->iterators) < $iterator_limit) { if (!$this->futures) { break; } $future = array_pop($this->futures); - $future->startFuture(); + $future->start(); $iterator = id(new LinesOfALargeExecFuture($future)) ->setDelimiter("\1"); $iterator->rewind(); $iterator_key = $this->getNextIteratorKey(); $this->iterators[$iterator_key] = $iterator; } $limit = $this->getLimit(); foreach ($this->iterators as $iterator_key => $iterator) { $this->executeIterator($iterator_key, $iterator); if ($limit) { if (count($this->seen) >= $limit) { return; } } } } } private function getNextIteratorKey() { return $this->iteratorKey++; } private function executeIterator($iterator_key, $lines) { $graph = $this->getGraph(); $limit = $this->getLimit(); $is_done = false; while (true) { if (!$lines->valid()) { $is_done = true; break; } $line = $lines->current(); $lines->next(); if ($line === "\n") { continue; } $fields = explode("\2", $line); if (count($fields) !== 5) { throw new Exception( pht( 'Failed to split line "%s" from "git log".', $line)); } list($encoding, $hash, $parents, $commit_epoch, $message) = $fields; // TODO: Handle encoding, see DiffusionLowLevelCommitQuery. $node = $graph->getNode($hash); if (!$node) { $node = $graph->newNode($hash); } $this->seen[$hash] = $node; $node ->setCommitMessage($message) ->setCommitEpoch((int)$commit_epoch); if (strlen($parents)) { $parents = explode(' ', $parents); $parent_nodes = array(); foreach ($parents as $parent) { $parent_node = $graph->getNode($parent); if (!$parent_node) { $parent_node = $graph->newNode($parent); } $parent_nodes[$parent] = $parent_node; $parent_node->addChildNode($node); } $node->setParentNodes($parent_nodes); } else { $parents = array(); } if ($limit) { if (count($this->seen) >= $limit) { break; } } } if ($is_done) { unset($this->iterators[$iterator_key]); } } }