diff --git a/scripts/daemon/exec/exec_daemon.php b/scripts/daemon/exec/exec_daemon.php index ab74112..2cb206e 100755 --- a/scripts/daemon/exec/exec_daemon.php +++ b/scripts/daemon/exec/exec_daemon.php @@ -1,125 +1,127 @@ #!/usr/bin/env php setTagline(pht('daemon executor')); $args->setSynopsis(<<parse( array( array( 'name' => 'trace', 'help' => pht('Enable debug tracing.'), ), array( 'name' => 'trace-memory', 'help' => pht('Enable debug memory tracing.'), ), array( 'name' => 'verbose', 'help' => pht('Enable verbose activity logging.'), ), array( 'name' => 'label', 'short' => 'l', 'param' => 'label', 'help' => pht( 'Optional process label. Makes "%s" nicer, no behavioral effects.', 'ps'), ), array( 'name' => 'daemon', 'wildcard' => true, ), )); $trace_memory = $args->getArg('trace-memory'); $trace_mode = $args->getArg('trace') || $trace_memory; $verbose = $args->getArg('verbose'); if (function_exists('posix_isatty') && posix_isatty(STDIN)) { fprintf(STDERR, pht('Reading daemon configuration from stdin...')."\n"); } $config = @file_get_contents('php://stdin'); $config = id(new PhutilJSONParser())->parse($config); PhutilTypeSpec::checkMap( $config, array( 'log' => 'optional string|null', 'argv' => 'optional list', 'load' => 'optional list', 'autoscale' => 'optional wild', )); $log = idx($config, 'log'); if ($log) { ini_set('error_log', $log); PhutilErrorHandler::setErrorListener(array('PhutilDaemon', 'errorListener')); } $load = idx($config, 'load', array()); foreach ($load as $library) { $library = Filesystem::resolvePath($library); phutil_load_library($library); } PhutilErrorHandler::initialize(); $daemon = $args->getArg('daemon'); if (!$daemon) { throw new PhutilArgumentUsageException( pht('Specify which class of daemon to start.')); } else if (count($daemon) > 1) { throw new PhutilArgumentUsageException( pht('Specify exactly one daemon to start.')); } else { $daemon = head($daemon); if (!class_exists($daemon)) { throw new PhutilArgumentUsageException( pht( 'No class "%s" exists in any known library.', $daemon)); } else if (!is_subclass_of($daemon, 'PhutilDaemon')) { throw new PhutilArgumentUsageException( pht( 'Class "%s" is not a subclass of "%s".', $daemon, 'PhutilDaemon')); } } $argv = idx($config, 'argv', array()); $daemon = newv($daemon, array($argv)); if ($trace_mode) { $daemon->setTraceMode(); } if ($trace_memory) { $daemon->setTraceMemory(); } if ($verbose) { $daemon->setVerbose(true); } $autoscale = idx($config, 'autoscale'); if ($autoscale) { $daemon->setAutoscaleProperties($autoscale); } $daemon->execute(); diff --git a/src/daemon/PhutilDaemon.php b/src/daemon/PhutilDaemon.php index 4d7afe6..43a6c3f 100644 --- a/src/daemon/PhutilDaemon.php +++ b/src/daemon/PhutilDaemon.php @@ -1,396 +1,410 @@ shouldExit()) { * if (work_available()) { * $this->willBeginWork(); * do_work(); * $this->sleep(0); * } else { * $this->willBeginIdle(); * $this->sleep(1); * } * } * * In particular, call @{method:willBeginWork} before becoming busy, and * @{method:willBeginIdle} when no work is available. If the daemon is launched * into an autoscale pool, this will cause the pool to automatically scale up * when busy and down when idle. * * See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple * autoscaling daemon. * * Launching a daemon which does not make these callbacks into an autoscale * pool will have no effect. * * @task overseer Communicating With the Overseer * @task autoscale Autoscaling Daemon Pools */ abstract class PhutilDaemon extends Phobject { const MESSAGETYPE_STDOUT = 'stdout'; const MESSAGETYPE_HEARTBEAT = 'heartbeat'; const MESSAGETYPE_BUSY = 'busy'; const MESSAGETYPE_IDLE = 'idle'; const MESSAGETYPE_DOWN = 'down'; const WORKSTATE_BUSY = 'busy'; const WORKSTATE_IDLE = 'idle'; private $argv; private $traceMode; private $traceMemory; private $verbose; private $notifyReceived; private $inGracefulShutdown; private $workState = null; private $idleSince = null; private $autoscaleProperties = array(); final public function setVerbose($verbose) { $this->verbose = $verbose; return $this; } final public function getVerbose() { return $this->verbose; } private static $sighandlerInstalled; final public function __construct(array $argv) { - declare(ticks = 1); $this->argv = $argv; if (!self::$sighandlerInstalled) { self::$sighandlerInstalled = true; pcntl_signal(SIGTERM, __CLASS__.'::exitOnSignal'); } - pcntl_signal(SIGINT, array($this, 'onGracefulSignal')); + pcntl_signal(SIGINT, array($this, 'onGracefulSignal')); pcntl_signal(SIGUSR2, array($this, 'onNotifySignal')); // Without discard mode, this consumes unbounded amounts of memory. Keep // memory bounded. PhutilServiceProfiler::getInstance()->enableDiscardMode(); $this->beginStdoutCapture(); } final public function __destruct() { $this->endStdoutCapture(); } final public function stillWorking() { $this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null); if ($this->traceMemory) { $daemon = get_class($this); fprintf( STDERR, - "<%s> %s %s\n", + "%s %s %s\n", '', $daemon, pht( 'Memory Usage: %s KB', new PhutilNumber(memory_get_usage() / 1024, 1))); } } final public function shouldExit() { return $this->inGracefulShutdown; } final protected function sleep($duration) { $this->notifyReceived = false; $this->willSleep($duration); $this->stillWorking(); $is_autoscale = $this->isClonedAutoscaleDaemon(); $scale_down = $this->getAutoscaleDownDuration(); $max_sleep = 60; if ($is_autoscale) { $max_sleep = min($max_sleep, $scale_down); } if ($is_autoscale) { if ($this->workState == self::WORKSTATE_IDLE) { $dur = (time() - $this->idleSince); $this->log(pht('Idle for %s seconds.', $dur)); } } while ($duration > 0 && !$this->notifyReceived && !$this->shouldExit()) { // If this is an autoscaling clone and we've been idle for too long, // we're going to scale the pool down by exiting and not restarting. The // DOWN message tells the overseer that we don't want to be restarted. if ($is_autoscale) { if ($this->workState == self::WORKSTATE_IDLE) { if ($this->idleSince && ($this->idleSince + $scale_down < time())) { $this->inGracefulShutdown = true; $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null); $this->log( pht( 'Daemon was idle for more than %s second(s), '. 'scaling pool down.', new PhutilNumber($scale_down))); break; } } } sleep(min($duration, $max_sleep)); $duration -= $max_sleep; $this->stillWorking(); } } protected function willSleep($duration) { return; } public static function exitOnSignal($signo) { - // Normally, PHP doesn't invoke destructors when existing in response to + self::didCatchSignal($signo); + + // Normally, PHP doesn't invoke destructors when exiting in response to // a signal. This forces it to do so, so we have a fighting chance of // releasing any locks, leases or resources on our way out. exit(128 + $signo); } final protected function getArgv() { return $this->argv; } final public function execute() { $this->willRun(); $this->run(); } abstract protected function run(); final public function setTraceMemory() { $this->traceMemory = true; return $this; } final public function getTraceMemory() { return $this->traceMemory; } final public function setTraceMode() { $this->traceMode = true; PhutilServiceProfiler::installEchoListener(); PhutilConsole::getConsole()->getServer()->setEnableLog(true); $this->didSetTraceMode(); return $this; } final public function getTraceMode() { return $this->traceMode; } final public function onGracefulSignal($signo) { + self::didCatchSignal($signo); $this->inGracefulShutdown = true; } final public function onNotifySignal($signo) { + self::didCatchSignal($signo); $this->notifyReceived = true; $this->onNotify($signo); } protected function onNotify($signo) { // This is a hook for subclasses. } protected function willRun() { // This is a hook for subclasses. } protected function didSetTraceMode() { // This is a hook for subclasses. } final protected function log($message) { if ($this->verbose) { $daemon = get_class($this); - fprintf(STDERR, "<%s> %s %s\n", '', $daemon, $message); + fprintf(STDERR, "%s %s %s\n", '', $daemon, $message); } } + private static function didCatchSignal($signo) { + $signame = phutil_get_signal_name($signo); + fprintf( + STDERR, + "%s Caught signal %s (%s).\n", + '', + $signo, + $signame); + } + /* -( Communicating With the Overseer )------------------------------------ */ private function beginStdoutCapture() { ob_start(array($this, 'didReceiveStdout'), 2); } private function endStdoutCapture() { ob_end_flush(); } public function didReceiveStdout($data) { if (!strlen($data)) { return ''; } + return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data); } private function encodeOverseerMessage($type, $data) { $structure = array($type); if ($data !== null) { $structure[] = $data; } return json_encode($structure)."\n"; } private function emitOverseerMessage($type, $data) { $this->endStdoutCapture(); echo $this->encodeOverseerMessage($type, $data); $this->beginStdoutCapture(); } public static function errorListener($event, $value, array $metadata) { // If the caller has redirected the error log to a file, PHP won't output // messages to stderr, so the overseer can't capture them. Install a // listener which just echoes errors to stderr, so the overseer is always // aware of errors. $console = PhutilConsole::getConsole(); $message = idx($metadata, 'default_message'); if ($message) { $console->writeErr("%s\n", $message); } if (idx($metadata, 'trace')) { $trace = PhutilErrorHandler::formatStacktrace($metadata['trace']); $console->writeErr("%s\n", $trace); } } /* -( Autoscaling )-------------------------------------------------------- */ /** * Prepare to become busy. This may autoscale the pool up. * * This notifies the overseer that the daemon has become busy. If daemons * that are part of an autoscale pool are continuously busy for a prolonged * period of time, the overseer may scale up the pool. * * @return this * @task autoscale */ protected function willBeginWork() { if ($this->workState != self::WORKSTATE_BUSY) { $this->workState = self::WORKSTATE_BUSY; $this->idleSince = null; $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null); } return $this; } /** * Prepare to idle. This may autoscale the pool down. * * This notifies the overseer that the daemon is no longer busy. If daemons * that are part of an autoscale pool are idle for a prolonged period of time, * they may exit to scale the pool down. * * @return this * @task autoscale */ protected function willBeginIdle() { if ($this->workState != self::WORKSTATE_IDLE) { $this->workState = self::WORKSTATE_IDLE; $this->idleSince = time(); $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null); } return $this; } /** * Determine if this is a clone or the original daemon. * * @return bool True if this is an cloned autoscaling daemon. * @task autoscale */ private function isClonedAutoscaleDaemon() { return (bool)$this->getAutoscaleProperty('clone', false); } /** * Get the duration (in seconds) which a daemon must be continuously idle * for before it should exit to scale the pool down. * * @return int Duration, in seconds. * @task autoscale */ private function getAutoscaleDownDuration() { return $this->getAutoscaleProperty('down', 15); } /** * Configure autoscaling for this daemon. * * @param map Map of autoscale properties. * @return this * @task autoscale */ public function setAutoscaleProperties(array $autoscale_properties) { PhutilTypeSpec::checkMap( $autoscale_properties, array( 'group' => 'optional string', 'up' => 'optional int', 'down' => 'optional int', 'pool' => 'optional int', 'clone' => 'optional bool', 'reserve' => 'optional int|float', )); $this->autoscaleProperties = $autoscale_properties; return $this; } /** * Read autoscaling configuration for this daemon. * * @param string Property to read. * @param wild Default value to return if the property is not set. * @return wild Property value, or `$default` if one is not set. * @task autoscale */ private function getAutoscaleProperty($key, $default = null) { return idx($this->autoscaleProperties, $key, $default); } } diff --git a/src/daemon/PhutilDaemonHandle.php b/src/daemon/PhutilDaemonHandle.php index 37858fb..3de873c 100644 --- a/src/daemon/PhutilDaemonHandle.php +++ b/src/daemon/PhutilDaemonHandle.php @@ -1,419 +1,422 @@ overseer = $overseer; $this->daemonClass = $daemon_class; $this->argv = $argv; $this->config = $config; $this->restartAt = time(); $this->daemonID = $this->generateDaemonID(); $this->dispatchEvent( self::EVENT_DID_LAUNCH, array( 'argv' => $this->argv, 'explicitArgv' => idx($this->config, 'argv'), )); } public function isRunning() { return (bool)$this->future; } public function isDone() { return (!$this->shouldRestart && !$this->isRunning()); } public function getFuture() { return $this->future; } public function setSilent($silent) { $this->silent = $silent; return $this; } public function getSilent() { return $this->silent; } public function setTraceMemory($trace_memory) { $this->traceMemory = $trace_memory; return $this; } public function getTraceMemory() { return $this->traceMemory; } public function update() { $this->updateMemory(); if (!$this->isRunning()) { if (!$this->shouldRestart) { return; } if (!$this->restartAt || (time() < $this->restartAt)) { return; } if ($this->shouldShutdown) { return; } $this->startDaemonProcess(); } $future = $this->future; $result = null; if ($future->isReady()) { $result = $future->resolve(); } list($stdout, $stderr) = $future->read(); $future->discardBuffers(); if (strlen($stdout)) { $this->didReadStdout($stdout); } $stderr = trim($stderr); if (strlen($stderr)) { - $this->logMessage('STDE', $stderr); + foreach (phutil_split_lines($stderr, false) as $line) { + $this->logMessage('STDE', $line); + } } if ($result !== null) { list($err) = $result; + if ($err) { - $this->logMessage('FAIL', pht('Process exited with error %s', $err)); + $this->logMessage('FAIL', pht('Process exited with error %s.', $err)); } else { $this->logMessage('DONE', pht('Process exited normally.')); } $this->future = null; if ($this->shouldShutdown) { $this->restartAt = null; } else { $this->scheduleRestart(); } } $this->updateHeartbeatEvent(); $this->updateHangDetection(); } private function updateHeartbeatEvent() { if ($this->heartbeat > time()) { return; } $this->heartbeat = time() + $this->getHeartbeatEventFrequency(); $this->dispatchEvent(self::EVENT_DID_HEARTBEAT); } private function updateHangDetection() { if (!$this->isRunning()) { return; } if (time() > $this->deadline) { $this->logMessage('HANG', pht('Hang detected. Restarting process.')); $this->annihilateProcessGroup(); $this->scheduleRestart(); } } private function scheduleRestart() { $this->logMessage('WAIT', pht('Waiting to restart process.')); $this->restartAt = time() + self::getWaitBeforeRestart(); } /** * Generate a unique ID for this daemon. * * @return string A unique daemon ID. */ private function generateDaemonID() { return substr(getmypid().':'.Filesystem::readRandomCharacters(12), 0, 12); } public function getDaemonID() { return $this->daemonID; } public function getPID() { return $this->pid; } private function getCaptureBufferSize() { return 65535; } private function getRequiredHeartbeatFrequency() { return 86400; } public static function getWaitBeforeRestart() { return 5; } public static function getHeartbeatEventFrequency() { return 120; } private function getKillDelay() { return 3; } private function getDaemonCWD() { $root = dirname(phutil_get_library_root('phutil')); return $root.'/scripts/daemon/exec/'; } private function newExecFuture() { $class = $this->daemonClass; $argv = $this->argv; $buffer_size = $this->getCaptureBufferSize(); // NOTE: PHP implements proc_open() by running 'sh -c'. On most systems this // is bash, but on Ubuntu it's dash. When you proc_open() using bash, you // get one new process (the command you ran). When you proc_open() using // dash, you get two new processes: the command you ran and a parent // "dash -c" (or "sh -c") process. This means that the child process's PID // is actually the 'dash' PID, not the command's PID. To avoid this, use // 'exec' to replace the shell process with the real process; without this, // the child will call posix_getppid(), be given the pid of the 'sh -c' // process, and send it SIGUSR1 to keepalive which will terminate it // immediately. We also won't be able to do process group management because // the shell process won't properly posix_setsid() so the pgid of the child // won't be meaningful. return id(new ExecFuture('exec ./exec_daemon.php %s %Ls', $class, $argv)) ->setCWD($this->getDaemonCWD()) ->setStdoutSizeLimit($buffer_size) ->setStderrSizeLimit($buffer_size) ->write(json_encode($this->config)); } /** * Dispatch an event to event listeners. * * @param string Event type. * @param dict Event parameters. * @return void */ private function dispatchEvent($type, array $params = array()) { $data = array( 'id' => $this->daemonID, 'daemonClass' => $this->daemonClass, 'childPID' => $this->pid, ) + $params; $event = new PhutilEvent($type, $data); try { PhutilEventEngine::dispatchEvent($event); } catch (Exception $ex) { phlog($ex); } } private function annihilateProcessGroup() { $pid = $this->pid; $pgid = posix_getpgid($pid); if ($pid && $pgid) { posix_kill(-$pgid, SIGTERM); sleep($this->getKillDelay()); posix_kill(-$pgid, SIGKILL); $this->pid = null; } } private function updateMemory() { if ($this->traceMemory) { $this->logMessage( 'RAMS', pht( 'Overseer Memory Usage: %s KB', new PhutilNumber(memory_get_usage() / 1024, 1))); } } private function startDaemonProcess() { $this->logMessage('INIT', pht('Starting process.')); $this->deadline = time() + $this->getRequiredHeartbeatFrequency(); $this->heartbeat = time() + self::getHeartbeatEventFrequency(); $this->stdoutBuffer = ''; $this->future = $this->newExecFuture(); $this->future->start(); $this->pid = $this->future->getPID(); } private function didReadStdout($data) { $this->stdoutBuffer .= $data; while (true) { $pos = strpos($this->stdoutBuffer, "\n"); if ($pos === false) { break; } $message = substr($this->stdoutBuffer, 0, $pos); $this->stdoutBuffer = substr($this->stdoutBuffer, $pos + 1); try { $structure = phutil_json_decode($message); } catch (PhutilJSONParserException $ex) { $structure = array(); } switch (idx($structure, 0)) { case PhutilDaemon::MESSAGETYPE_STDOUT: $this->logMessage('STDO', idx($structure, 1)); break; case PhutilDaemon::MESSAGETYPE_HEARTBEAT: $this->deadline = time() + $this->getRequiredHeartbeatFrequency(); break; case PhutilDaemon::MESSAGETYPE_BUSY: $this->overseer->didBeginWork($this); break; case PhutilDaemon::MESSAGETYPE_IDLE: $this->overseer->didBeginIdle($this); break; case PhutilDaemon::MESSAGETYPE_DOWN: // The daemon is exiting because it doesn't have enough work and it // is trying to scale the pool down. We should not restart it. $this->shouldRestart = false; $this->shouldShutdown = true; break; default: // If we can't parse this or it isn't a message we understand, just // emit the raw message. $this->logMessage('STDO', pht(' %s', $message)); break; } } } public function didReceiveNotifySignal($signo) { $pid = $this->pid; if ($pid) { posix_kill($pid, $signo); } } public function didReceiveReloadSignal($signo) { $signame = phutil_get_signal_name($signo); if ($signame) { $sigmsg = pht( 'Reloading in response to signal %d (%s).', $signo, $signame); } else { $sigmsg = pht( 'Reloading in response to signal %d.', $signo); } $this->logMessage('RELO', $sigmsg, $signo); // This signal means "stop the current process gracefully, then launch // a new identical process once it exits". This can be used to update // daemons after code changes (the new processes will run the new code) // without aborting any running tasks. // We SIGINT the daemon but don't set the shutdown flag, so it will // naturally be restarted after it exits, as though it had exited after an // unhandled exception. posix_kill($this->pid, SIGINT); } public function didReceiveGracefulSignal($signo) { $this->shouldShutdown = true; $this->shouldRestart = false; $signame = phutil_get_signal_name($signo); if ($signame) { $sigmsg = pht( 'Graceful shutdown in response to signal %d (%s).', $signo, $signame); } else { $sigmsg = pht( 'Graceful shutdown in response to signal %d.', $signo); } $this->logMessage('DONE', $sigmsg, $signo); posix_kill($this->pid, SIGINT); } public function didReceiveTerminalSignal($signo) { $this->shouldShutdown = true; $this->shouldRestart = false; $signame = phutil_get_signal_name($signo); if ($signame) { $sigmsg = pht( 'Shutting down in response to signal %s (%s).', $signo, $signame); } else { $sigmsg = pht('Shutting down in response to signal %s.', $signo); } $this->logMessage('EXIT', $sigmsg, $signo); $this->annihilateProcessGroup(); } private function logMessage($type, $message, $context = null) { if (!$this->getSilent()) { echo date('Y-m-d g:i:s A').' ['.$type.'] '.$message."\n"; } $this->dispatchEvent( self::EVENT_DID_LOG, array( 'type' => $type, 'message' => $message, 'context' => $context, )); } public function didRemoveDaemon() { $this->dispatchEvent(self::EVENT_WILL_EXIT); } } diff --git a/src/future/exec/ExecFuture.php b/src/future/exec/ExecFuture.php index 23bdb21..11ecca1 100644 --- a/src/future/exec/ExecFuture.php +++ b/src/future/exec/ExecFuture.php @@ -1,904 +1,915 @@ array('pipe', 'r'), // stdin 1 => array('pipe', 'w'), // stdout 2 => array('pipe', 'w'), // stderr ); /* -( Creating ExecFutures )----------------------------------------------- */ /** * Create a new ExecFuture. * * $future = new ExecFuture('wc -l %s', $file_path); * * @param string `sprintf()`-style command string which will be passed * through @{function:csprintf} with the rest of the arguments. * @param ... Zero or more additional arguments for @{function:csprintf}. * @return ExecFuture ExecFuture for running the specified command. * @task create */ public function __construct($command) { $argv = func_get_args(); $this->command = call_user_func_array('csprintf', $argv); $this->stdin = new PhutilRope(); } /* -( Command Information )------------------------------------------------ */ /** * Retrieve the raw command to be executed. * * @return string Raw command. * @task info */ public function getCommand() { return $this->command; } /** * Retrieve the byte limit for the stderr buffer. * * @return int Maximum buffer size, in bytes. * @task info */ public function getStderrSizeLimit() { return $this->stderrSizeLimit; } /** * Retrieve the byte limit for the stdout buffer. * * @return int Maximum buffer size, in bytes. * @task info */ public function getStdoutSizeLimit() { return $this->stdoutSizeLimit; } /** * Get the process's pid. This only works after execution is initiated, e.g. * by a call to start(). * * @return int Process ID of the executing process. * @task info */ public function getPID() { $status = $this->procGetStatus(); return $status['pid']; } /* -( Configuring Execution )---------------------------------------------- */ /** * Set a maximum size for the stdout read buffer. To limit stderr, see * @{method:setStderrSizeLimit}. The major use of these methods is to use less * memory if you are running a command which sometimes produces huge volumes * of output that you don't really care about. * * NOTE: Setting this to 0 means "no buffer", not "unlimited buffer". * * @param int Maximum size of the stdout read buffer. * @return this * @task config */ public function setStdoutSizeLimit($limit) { $this->stdoutSizeLimit = $limit; return $this; } /** * Set a maximum size for the stderr read buffer. * See @{method:setStdoutSizeLimit} for discussion. * * @param int Maximum size of the stderr read buffer. * @return this * @task config */ public function setStderrSizeLimit($limit) { $this->stderrSizeLimit = $limit; return $this; } /** * Set the maximum internal read buffer size this future. The future will * block reads once the internal stdout or stderr buffer exceeds this size. * * NOTE: If you @{method:resolve} a future with a read buffer limit, you may * block forever! * * TODO: We should probably release the read buffer limit during * @{method:resolve}, or otherwise detect this. For now, be careful. * * @param int|null Maximum buffer size, or `null` for unlimited. * @return this */ public function setReadBufferSize($read_buffer_size) { $this->readBufferSize = $read_buffer_size; return $this; } /** * Set whether to use non-blocking streams on Windows. * * @param bool Whether to use non-blocking streams. * @return this * @task config */ public function setUseWindowsFileStreams($use_streams) { if (phutil_is_windows()) { $this->useWindowsFileStreams = $use_streams; } return $this; } /* -( Interacting With Commands )------------------------------------------ */ /** * Read and return output from stdout and stderr, if any is available. This * method keeps a read cursor on each stream, but the entire streams are * still returned when the future resolves. You can call read() again after * resolving the future to retrieve only the parts of the streams you did not * previously read: * * $future = new ExecFuture('...'); * // ... * list($stdout) = $future->read(); // Returns output so far * list($stdout) = $future->read(); // Returns new output since first call * // ... * list($stdout) = $future->resolvex(); // Returns ALL output * list($stdout) = $future->read(); // Returns unread output * * NOTE: If you set a limit with @{method:setStdoutSizeLimit} or * @{method:setStderrSizeLimit}, this method will not be able to read data * past the limit. * * NOTE: If you call @{method:discardBuffers}, all the stdout/stderr data * will be thrown away and the cursors will be reset. * * @return pair <$stdout, $stderr> pair with new output since the last call * to this method. * @task interact */ public function read() { $stdout = $this->readStdout(); $result = array( $stdout, (string)substr($this->stderr, $this->stderrPos), ); $this->stderrPos = strlen($this->stderr); return $result; } public function readStdout() { if ($this->start) { $this->isReady(); // Sync } $result = (string)substr($this->stdout, $this->stdoutPos); $this->stdoutPos = strlen($this->stdout); return $result; } /** * Write data to stdin of the command. * * @param string Data to write. * @param bool If true, keep the pipe open for writing. By default, the pipe * will be closed as soon as possible so that commands which * listen for EOF will execute. If you want to keep the pipe open * past the start of command execution, do an empty write with * `$keep_pipe = true` first. * @return this * @task interact */ public function write($data, $keep_pipe = false) { if (strlen($data)) { if (!$this->stdin) { throw new Exception(pht('Writing to a closed pipe!')); } $this->stdin->append($data); } $this->closePipe = !$keep_pipe; return $this; } /** * Permanently discard the stdout and stderr buffers and reset the read * cursors. This is basically useful only if you are streaming a large amount * of data from some process: * * $future = new ExecFuture('zcat huge_file.gz'); * do { * $done = $future->resolve(0.1); // Every 100ms, * list($stdout) = $future->read(); // read output... * echo $stdout; // send it somewhere... * $future->discardBuffers(); // and then free the buffers. * } while ($done === null); * * Conceivably you might also need to do this if you're writing a client using * @{class:ExecFuture} and `netcat`, but you probably should not do that. * * NOTE: This completely discards the data. It won't be available when the * future resolves. This is almost certainly only useful if you need the * buffer memory for some reason. * * @return this * @task interact */ public function discardBuffers() { $this->discardStdoutBuffer(); $this->stderr = ''; $this->stderrPos = 0; return $this; } public function discardStdoutBuffer() { $this->stdout = ''; $this->stdoutPos = 0; return $this; } /** * Returns true if this future was killed by a timeout configured with * @{method:setTimeout}. * * @return bool True if the future was killed for exceeding its time limit. */ public function getWasKilledByTimeout() { return $this->killedByTimeout; } /* -( Configuring Execution )---------------------------------------------- */ /** * Set a hard limit on execution time. If the command runs longer, it will * be killed and the future will resolve with an error code. You can test * if a future was killed by a timeout with @{method:getWasKilledByTimeout}. * * @param int Maximum number of seconds this command may execute for. * @return this * @task config */ public function setTimeout($seconds) { $this->timeout = $seconds; return $this; } /* -( Resolving Execution )------------------------------------------------ */ /** * Resolve a command you expect to exit with return code 0. Works like * @{method:resolve}, but throws if $err is nonempty. Returns only * $stdout and $stderr. See also @{function:execx}. * * list($stdout, $stderr) = $future->resolvex(); * * @param float Optional timeout after which resolution will pause and * execution will return to the caller. * @return pair <$stdout, $stderr> pair. * @task resolve */ public function resolvex($timeout = null) { list($err, $stdout, $stderr) = $this->resolve($timeout); if ($err) { $cmd = $this->command; throw new CommandException( pht('Command failed with error #%d!', $err), $cmd, $err, $stdout, $stderr); } return array($stdout, $stderr); } /** * Resolve a command you expect to return valid JSON. Works like * @{method:resolvex}, but also throws if stderr is nonempty, or stdout is not * valid JSON. Returns a PHP array, decoded from the JSON command output. * * @param float Optional timeout after which resolution will pause and * execution will return to the caller. * @return array PHP array, decoded from JSON command output. * @task resolve */ public function resolveJSON($timeout = null) { list($stdout, $stderr) = $this->resolvex($timeout); if (strlen($stderr)) { $cmd = $this->command; throw new CommandException( pht( "JSON command '%s' emitted text to stderr when none was expected: %d", $cmd, $stderr), $cmd, 0, $stdout, $stderr); } try { return phutil_json_decode($stdout); } catch (PhutilJSONParserException $ex) { $cmd = $this->command; throw new CommandException( pht( "JSON command '%s' did not produce a valid JSON object on stdout: %s", $cmd, $stdout), $cmd, 0, $stdout, $stderr); } } /** * Resolve the process by abruptly terminating it. * * @return list List of results. * @task resolve */ public function resolveKill() { if (!$this->result) { if (defined('SIGKILL')) { $signal = SIGKILL; } else { $signal = 9; } proc_terminate($this->proc, $signal); $this->result = array( 128 + $signal, $this->stdout, $this->stderr, ); $this->closeProcess(); } return $this->result; } /* -( Internals )---------------------------------------------------------- */ /** * Provides read sockets to the future core. * * @return list List of read sockets. * @task internal */ public function getReadSockets() { list($stdin, $stdout, $stderr) = $this->pipes; $sockets = array(); if (isset($stdout) && !feof($stdout)) { $sockets[] = $stdout; } if (isset($stderr) && !feof($stderr)) { $sockets[] = $stderr; } return $sockets; } /** * Provides write sockets to the future core. * * @return list List of write sockets. * @task internal */ public function getWriteSockets() { list($stdin, $stdout, $stderr) = $this->pipes; $sockets = array(); if (isset($stdin) && $this->stdin->getByteLength() && !feof($stdin)) { $sockets[] = $stdin; } return $sockets; } /** * Determine if the read buffer is empty. * * @return bool True if the read buffer is empty. * @task internal */ public function isReadBufferEmpty() { return !strlen($this->stdout); } /** * Determine if the write buffer is empty. * * @return bool True if the write buffer is empty. * @task internal */ public function isWriteBufferEmpty() { return !$this->getWriteBufferSize(); } /** * Determine the number of bytes in the write buffer. * * @return int Number of bytes in the write buffer. * @task internal */ public function getWriteBufferSize() { if (!$this->stdin) { return 0; } return $this->stdin->getByteLength(); } /** * Reads some bytes from a stream, discarding output once a certain amount * has been accumulated. * * @param resource Stream to read from. * @param int Maximum number of bytes to return from $stream. If * additional bytes are available, they will be read and * discarded. * @param string Human-readable description of stream, for exception * message. * @param int Maximum number of bytes to read. * @return string The data read from the stream. * @task internal */ private function readAndDiscard($stream, $limit, $description, $length) { $output = ''; if ($length <= 0) { return ''; } do { $data = fread($stream, min($length, 64 * 1024)); if (false === $data) { throw new Exception(pht('Failed to read from %s', $description)); } $read_bytes = strlen($data); if ($read_bytes > 0 && $limit > 0) { if ($read_bytes > $limit) { $data = substr($data, 0, $limit); } $output .= $data; $limit -= strlen($data); } if (strlen($output) >= $length) { break; } } while ($read_bytes > 0); return $output; } /** * Begin or continue command execution. * * @return bool True if future has resolved. * @task internal */ public function isReady() { // NOTE: We have soft dependencies on PhutilServiceProfiler and // PhutilErrorTrap here. These dependencies are soft to avoid the need to // build them into the Phage agent. Under normal circumstances, these // classes are always available. if (!$this->pipes) { // NOTE: See note above about Phage. if (class_exists('PhutilServiceProfiler')) { $profiler = PhutilServiceProfiler::getInstance(); $this->profilerCallID = $profiler->beginServiceCall( array( 'type' => 'exec', 'command' => (string)$this->command, )); } if (!$this->start) { // We might already have started the timer via initiating resolution. $this->start = microtime(true); } $unmasked_command = $this->command; if ($unmasked_command instanceof PhutilCommandString) { $unmasked_command = $unmasked_command->getUnmaskedString(); } $pipes = array(); if (phutil_is_windows()) { // See T4395. proc_open under Windows uses "cmd /C [cmd]", which will // strip the first and last quote when there aren't exactly two quotes // (and some other conditions as well). This results in a command that // looks like `command" "path to my file" "something something` which is // clearly wrong. By surrounding the command string with quotes we can // be sure this process is harmless. if (strpos($unmasked_command, '"') !== false) { $unmasked_command = '"'.$unmasked_command.'"'; } } if ($this->hasEnv()) { $env = $this->getEnv(); } else { $env = null; } $cwd = $this->getCWD(); // NOTE: See note above about Phage. if (class_exists('PhutilErrorTrap')) { $trap = new PhutilErrorTrap(); } else { $trap = null; } $spec = self::$descriptorSpec; if ($this->useWindowsFileStreams) { $this->windowsStdoutTempFile = new TempFile(); $this->windowsStderrTempFile = new TempFile(); $spec = array( 0 => self::$descriptorSpec[0], // stdin 1 => fopen($this->windowsStdoutTempFile, 'wb'), // stdout 2 => fopen($this->windowsStderrTempFile, 'wb'), // stderr ); if (!$spec[1] || !$spec[2]) { throw new Exception(pht( 'Unable to create temporary files for '. 'Windows stdout / stderr streams')); } } $proc = @proc_open( $unmasked_command, $spec, $pipes, $cwd, $env); if ($this->useWindowsFileStreams) { fclose($spec[1]); fclose($spec[2]); $pipes = array( 0 => head($pipes), // stdin 1 => fopen($this->windowsStdoutTempFile, 'rb'), // stdout 2 => fopen($this->windowsStderrTempFile, 'rb'), // stderr ); if (!$pipes[1] || !$pipes[2]) { throw new Exception(pht( 'Unable to open temporary files for '. 'reading Windows stdout / stderr streams')); } } if ($trap) { $err = $trap->getErrorsAsString(); $trap->destroy(); } else { $err = error_get_last(); } if (!is_resource($proc)) { throw new Exception( pht( 'Failed to `%s`: %s', 'proc_open()', $err)); } $this->pipes = $pipes; $this->proc = $proc; list($stdin, $stdout, $stderr) = $pipes; if (!phutil_is_windows()) { // On Windows, we redirect process standard output and standard error // through temporary files, and then use stream_select to determine // if there's more data to read. if ((!stream_set_blocking($stdout, false)) || (!stream_set_blocking($stderr, false)) || (!stream_set_blocking($stdin, false))) { $this->__destruct(); throw new Exception(pht('Failed to set streams nonblocking.')); } } $this->tryToCloseStdin(); return false; } if (!$this->proc) { return true; } list($stdin, $stdout, $stderr) = $this->pipes; while (isset($this->stdin) && $this->stdin->getByteLength()) { $write_segment = $this->stdin->getAnyPrefix(); $bytes = fwrite($stdin, $write_segment); if ($bytes === false) { throw new Exception(pht('Unable to write to stdin!')); } else if ($bytes) { $this->stdin->removeBytesFromHead($bytes); } else { // Writes are blocked for now. break; } } $this->tryToCloseStdin(); // Read status before reading pipes so that we can never miss data that // arrives between our last read and the process exiting. $status = $this->procGetStatus(); $read_buffer_size = $this->readBufferSize; $max_stdout_read_bytes = PHP_INT_MAX; $max_stderr_read_bytes = PHP_INT_MAX; if ($read_buffer_size !== null) { $max_stdout_read_bytes = $read_buffer_size - strlen($this->stdout); $max_stderr_read_bytes = $read_buffer_size - strlen($this->stderr); } if ($max_stdout_read_bytes > 0) { $this->stdout .= $this->readAndDiscard( $stdout, $this->getStdoutSizeLimit() - strlen($this->stdout), 'stdout', $max_stdout_read_bytes); } if ($max_stderr_read_bytes > 0) { $this->stderr .= $this->readAndDiscard( $stderr, $this->getStderrSizeLimit() - strlen($this->stderr), 'stderr', $max_stderr_read_bytes); } $is_done = false; if (!$status['running']) { // We may still have unread bytes on stdout or stderr, particularly if // this future is being buffered and streamed. If we do, we don't want to // consider the subprocess to have exited until we've read everything. // See T9724 for context. if (feof($stdout) && feof($stderr)) { $is_done = true; } } if ($is_done) { if ($this->useWindowsFileStreams) { fclose($stdout); fclose($stderr); } + // If the subprocess got nuked with `kill -9`, we get a -1 exitcode. + // Upgrade this to a slightly more informative value by examining the + // terminating signal code. + $err = $status['exitcode']; + if ($err == -1) { + if ($status['signaled']) { + $err = 128 + $status['termsig']; + } + } + $this->result = array( - $status['exitcode'], + $err, $this->stdout, $this->stderr, ); $this->closeProcess(); return true; } $elapsed = (microtime(true) - $this->start); if ($this->timeout && ($elapsed >= $this->timeout)) { $this->killedByTimeout = true; $this->resolveKill(); return true; } } /** * @return void * @task internal */ public function __destruct() { if (!$this->proc) { return; } // NOTE: If we try to proc_close() an open process, we hang indefinitely. To // avoid this, kill the process explicitly if it's still running. $status = $this->procGetStatus(); if ($status['running']) { $this->resolveKill(); } else { $this->closeProcess(); } } /** * Close and free resources if necessary. * * @return void * @task internal */ private function closeProcess() { foreach ($this->pipes as $pipe) { if (isset($pipe)) { @fclose($pipe); } } $this->pipes = array(null, null, null); if ($this->proc) { @proc_close($this->proc); $this->proc = null; } $this->stdin = null; if ($this->profilerCallID !== null) { $profiler = PhutilServiceProfiler::getInstance(); $profiler->endServiceCall( $this->profilerCallID, array( 'err' => $this->result ? idx($this->result, 0) : null, )); $this->profilerCallID = null; } } /** * Execute `proc_get_status()`, but avoid pitfalls. * * @return dict Process status. * @task internal */ private function procGetStatus() { // After the process exits, we only get one chance to read proc_get_status() // before it starts returning garbage. Make sure we don't throw away the // last good read. if ($this->procStatus) { if (!$this->procStatus['running']) { return $this->procStatus; } } $this->procStatus = proc_get_status($this->proc); + return $this->procStatus; } /** * Try to close stdin, if we're done using it. This keeps us from hanging if * the process on the other end of the pipe is waiting for EOF. * * @return void * @task internal */ private function tryToCloseStdin() { if (!$this->closePipe) { // We've been told to keep the pipe open by a call to write(..., true). return; } if ($this->stdin->getByteLength()) { // We still have bytes to write. return; } list($stdin) = $this->pipes; if (!$stdin) { // We've already closed stdin. return; } // There's nothing stopping us from closing stdin, so close it. @fclose($stdin); $this->pipes[0] = null; } public function getDefaultWait() { $wait = parent::getDefaultWait(); if ($this->timeout) { if (!$this->start) { $this->start = microtime(true); } $elapsed = (microtime(true) - $this->start); $wait = max(0, min($this->timeout - $elapsed, $wait)); } return $wait; } }