diff --git a/src/daemon/PhutilDaemon.php b/src/daemon/PhutilDaemon.php index c39c433..71fe6c1 100644 --- a/src/daemon/PhutilDaemon.php +++ b/src/daemon/PhutilDaemon.php @@ -1,391 +1,392 @@ shouldExit()) { * if (work_available()) { * $this->willBeginWork(); * do_work(); * $this->sleep(0); * } else { * $this->willBeginIdle(); * $this->sleep(1); * } * } * * In particular, call @{method:willBeginWork} before becoming busy, and * @{method:willBeginIdle} when no work is available. If the daemon is launched * into an autoscale pool, this will cause the pool to automatically scale up * when busy and down when idle. * * See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple * autoscaling daemon. * * Launching a daemon which does not make these callbacks into an autoscale * pool will have no effect. * * @task overseer Communicating With the Overseer * @task autoscale Autoscaling Daemon Pools * * @stable */ abstract class PhutilDaemon { const MESSAGETYPE_STDOUT = 'stdout'; const MESSAGETYPE_HEARTBEAT = 'heartbeat'; const MESSAGETYPE_BUSY = 'busy'; const MESSAGETYPE_IDLE = 'idle'; const MESSAGETYPE_DOWN = 'down'; const WORKSTATE_BUSY = 'busy'; const WORKSTATE_IDLE = 'idle'; private $argv; private $traceMode; private $traceMemory; private $verbose; private $notifyReceived; private $inGracefulShutdown; private $workState = null; private $idleSince = null; private $autoscaleProperties = array(); final public function setVerbose($verbose) { $this->verbose = $verbose; return $this; } final public function getVerbose() { return $this->verbose; } private static $sighandlerInstalled; final public function __construct(array $argv) { declare(ticks = 1); $this->argv = $argv; if (!self::$sighandlerInstalled) { self::$sighandlerInstalled = true; pcntl_signal(SIGTERM, __CLASS__.'::exitOnSignal'); } pcntl_signal(SIGINT, array($this, 'onGracefulSignal')); pcntl_signal(SIGUSR2, array($this, 'onNotifySignal')); // Without discard mode, this consumes unbounded amounts of memory. Keep // memory bounded. PhutilServiceProfiler::getInstance()->enableDiscardMode(); $this->beginStdoutCapture(); } final public function __destruct() { $this->endStdoutCapture(); } final public function stillWorking() { $this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null); if ($this->traceMemory) { $memuse = number_format(memory_get_usage() / 1024, 1); $daemon = get_class($this); fprintf(STDERR, '%s', " {$daemon} Memory Usage: {$memuse} KB\n"); } } final public function shouldExit() { return $this->inGracefulShutdown; } final protected function sleep($duration) { $this->notifyReceived = false; $this->willSleep($duration); $this->stillWorking(); $is_autoscale = $this->isClonedAutoscaleDaemon(); $scale_down = $this->getAutoscaleDownDuration(); $max_sleep = 60; if ($is_autoscale) { $max_sleep = min($max_sleep, $scale_down); } if ($is_autoscale) { if ($this->workState == self::WORKSTATE_IDLE) { $dur = (time() - $this->idleSince); $this->log(pht('Idle for %s seconds.', $dur)); } } while ($duration > 0 && !$this->notifyReceived && !$this->shouldExit()) { // If this is an autoscaling clone and we've been idle for too long, // we're going to scale the pool down by exiting and not restarting. The // DOWN message tells the overseer that we don't want to be restarted. if ($is_autoscale) { if ($this->workState == self::WORKSTATE_IDLE) { if ($this->idleSince && ($this->idleSince + $scale_down < time())) { $this->inGracefulShutdown = true; $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null); $this->log( pht( 'Daemon was idle for more than %s seconds, scaling pool '. 'down.', $scale_down)); break; } } } sleep(min($duration, $max_sleep)); $duration -= $max_sleep; $this->stillWorking(); } } protected function willSleep($duration) { return; } public static function exitOnSignal($signo) { // Normally, PHP doesn't invoke destructors when existing in response to // a signal. This forces it to do so, so we have a fighting chance of // releasing any locks, leases or resources on our way out. exit(128 + $signo); } final protected function getArgv() { return $this->argv; } final public function execute() { $this->willRun(); $this->run(); } abstract protected function run(); final public function setTraceMemory() { $this->traceMemory = true; return $this; } final public function getTraceMemory() { return $this->traceMemory; } final public function setTraceMode() { $this->traceMode = true; PhutilServiceProfiler::installEchoListener(); PhutilConsole::getConsole()->getServer()->setEnableLog(true); $this->didSetTraceMode(); return $this; } final public function getTraceMode() { return $this->traceMode; } final public function onGracefulSignal($signo) { $this->inGracefulShutdown = true; } final public function onNotifySignal($signo) { $this->notifyReceived = true; $this->onNotify($signo); } protected function onNotify($signo) { // This is a hook for subclasses. } protected function willRun() { // This is a hook for subclasses. } protected function didSetTraceMode() { // This is a hook for subclasses. } final protected function log($message) { if ($this->verbose) { $daemon = get_class($this); fprintf(STDERR, '%s', " {$daemon} {$message}\n"); } } /* -( Communicating With the Overseer )------------------------------------ */ private function beginStdoutCapture() { ob_start(array($this, 'didReceiveStdout'), 2); } private function endStdoutCapture() { ob_end_flush(); } public function didReceiveStdout($data) { if (!strlen($data)) { return ''; } return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data); } private function encodeOverseerMessage($type, $data) { $structure = array($type); if ($data !== null) { $structure[] = $data; } return json_encode($structure)."\n"; } private function emitOverseerMessage($type, $data) { $this->endStdoutCapture(); echo $this->encodeOverseerMessage($type, $data); $this->beginStdoutCapture(); } public static function errorListener($event, $value, array $metadata) { // If the caller has redirected the error log to a file, PHP won't output // messages to stderr, so the overseer can't capture them. Install a // listener which just echoes errors to stderr, so the overseer is always // aware of errors. $console = PhutilConsole::getConsole(); $message = idx($metadata, 'default_message'); if ($message) { $console->writeErr("%s\n", $message); } if (idx($metadata, 'trace')) { $trace = PhutilErrorHandler::formatStacktrace($metadata['trace']); $console->writeErr("%s\n", $trace); } } /* -( Autoscaling )-------------------------------------------------------- */ /** * Prepare to become busy. This may autoscale the pool up. * * This notifies the overseer that the daemon has become busy. If daemons * that are part of an autoscale pool are continuously busy for a prolonged * period of time, the overseer may scale up the pool. * * @return this * @task autoscale */ protected function willBeginWork() { if ($this->workState != self::WORKSTATE_BUSY) { $this->workState = self::WORKSTATE_BUSY; $this->idleSince = null; $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null); } return $this; } /** * Prepare to idle. This may autoscale the pool down. * * This notifies the overseer that the daemon is no longer busy. If daemons * that are part of an autoscale pool are idle for a prolonged period of time, * they may exit to scale the pool down. * * @return this * @task autoscale */ protected function willBeginIdle() { if ($this->workState != self::WORKSTATE_IDLE) { $this->workState = self::WORKSTATE_IDLE; $this->idleSince = time(); $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null); } return $this; } /** * Determine if this is a clone or the original daemon. * * @return bool True if this is an cloned autoscaling daemon. * @task autoscale */ private function isClonedAutoscaleDaemon() { return (bool)$this->getAutoscaleProperty('clone', false); } /** * Get the duration (in seconds) which a daemon must be continuously idle * for before it should exit to scale the pool down. * * @return int Duration, in seconds. * @task autoscale */ private function getAutoscaleDownDuration() { return $this->getAutoscaleProperty('down', 15); } /** * Configure autoscaling for this daemon. * * @param map Map of autoscale properties. * @return this * @task autoscale */ public function setAutoscaleProperties(array $autoscale_properties) { PhutilTypeSpec::checkMap( $autoscale_properties, array( 'group' => 'optional string', 'up' => 'optional int', 'down' => 'optional int', 'pool' => 'optional int', 'clone' => 'optional bool', + 'reserve' => 'optional float', )); $this->autoscaleProperties = $autoscale_properties; return $this; } /** * Read autoscaling configuration for this daemon. * * @param string Property to read. * @param wild Default value to return if the property is not set. * @return wild Property value, or `$default` if one is not set. * @task autoscale */ private function getAutoscaleProperty($key, $default = null) { return idx($this->autoscaleProperties, $key, $default); } } diff --git a/src/daemon/PhutilDaemonOverseer.php b/src/daemon/PhutilDaemonOverseer.php index e2ec3be..797723a 100644 --- a/src/daemon/PhutilDaemonOverseer.php +++ b/src/daemon/PhutilDaemonOverseer.php @@ -1,453 +1,475 @@ enableDiscardMode(); $args = new PhutilArgumentParser($argv); $args->setTagline('daemon overseer'); $args->setSynopsis(<<parseStandardArguments(); $args->parse( array( array( 'name' => 'trace-memory', 'help' => 'Enable debug memory tracing.', ), array( 'name' => 'verbose', 'help' => 'Enable verbose activity logging.', ), array( 'name' => 'label', 'short' => 'l', 'param' => 'label', 'help' => pht( 'Optional process label. Makes "ps" nicer, no behavioral effects.'), ), )); $argv = array(); if ($args->getArg('trace')) { $this->traceMode = true; $argv[] = '--trace'; } if ($args->getArg('trace-memory')) { $this->traceMode = true; $this->traceMemory = true; $argv[] = '--trace-memory'; } $verbose = $args->getArg('verbose'); if ($verbose) { $this->verbose = true; $argv[] = '--verbose'; } $label = $args->getArg('label'); if ($label) { $argv[] = '-l'; $argv[] = $label; } $this->argv = $argv; if (function_exists('posix_isatty') && posix_isatty(STDIN)) { fprintf(STDERR, pht('Reading daemon configuration from stdin...')."\n"); } $config = @file_get_contents('php://stdin'); $config = id(new PhutilJSONParser())->parse($config); $this->libraries = idx($config, 'load'); $this->log = idx($config, 'log'); $this->daemonize = idx($config, 'daemonize'); $this->piddir = idx($config, 'piddir'); $this->config = $config; if (self::$instance) { throw new Exception( 'You may not instantiate more than one Overseer per process.'); } self::$instance = $this; $this->startEpoch = time(); // Check this before we daemonize, since if it's an issue the child will // exit immediately. if ($this->piddir) { $dir = $this->piddir; try { Filesystem::assertWritable($dir); } catch (Exception $ex) { throw new Exception( "Specified daemon PID directory ('{$dir}') does not exist or is ". "not writable by the daemon user!"); } } if (!idx($config, 'daemons')) { throw new PhutilArgumentUsageException( pht('You must specify at least one daemon to start!')); } if ($this->log) { // NOTE: Now that we're committed to daemonizing, redirect the error // log if we have a `--log` parameter. Do this at the last moment // so as many setup issues as possible are surfaced. ini_set('error_log', $this->log); } if ($this->daemonize) { // We need to get rid of these or the daemon will hang when we TERM it // waiting for something to read the buffers. TODO: Learn how unix works. fclose(STDOUT); fclose(STDERR); ob_start(); $pid = pcntl_fork(); if ($pid === -1) { throw new Exception('Unable to fork!'); } else if ($pid) { exit(0); } } declare(ticks = 1); pcntl_signal(SIGUSR2, array($this, 'didReceiveNotifySignal')); pcntl_signal(SIGINT, array($this, 'didReceiveGracefulSignal')); pcntl_signal(SIGTERM, array($this, 'didReceiveTerminalSignal')); } public function addLibrary($library) { $this->libraries[] = $library; return $this; } public function run() { $this->daemons = array(); foreach ($this->config['daemons'] as $config) { $config += array( 'argv' => array(), 'autoscale' => array(), ); $daemon = new PhutilDaemonHandle( $this, $config['class'], $this->argv, array( 'log' => $this->log, 'argv' => $config['argv'], 'load' => $this->libraries, 'autoscale' => $config['autoscale'], )); $daemon->setSilent((!$this->traceMode && !$this->verbose)); $daemon->setTraceMemory($this->traceMemory); $this->addDaemon($daemon, $config); } while (true) { $futures = array(); foreach ($this->getDaemonHandles() as $daemon) { $daemon->update(); if ($daemon->isRunning()) { $futures[] = $daemon->getFuture(); } if ($daemon->isDone()) { $this->removeDaemon($daemon); } } $this->updatePidfile(); $this->updateAutoscale(); if ($futures) { $iter = id(new FutureIterator($futures)) ->setUpdateInterval(1); foreach ($iter as $future) { break; } } else { if ($this->inGracefulShutdown) { break; } sleep(1); } } exit($this->err); } private function addDaemon(PhutilDaemonHandle $daemon, array $config) { $id = $daemon->getDaemonID(); $this->daemons[$id] = array( 'handle' => $daemon, 'config' => $config, ); $autoscale_group = $this->getAutoscaleGroup($daemon); if ($autoscale_group) { $this->autoscale[$autoscale_group][$id] = true; } return $this; } private function removeDaemon(PhutilDaemonHandle $daemon) { $id = $daemon->getDaemonID(); $autoscale_group = $this->getAutoscaleGroup($daemon); if ($autoscale_group) { unset($this->autoscale[$autoscale_group][$id]); } unset($this->daemons[$id]); return $this; } private function getAutoscaleGroup(PhutilDaemonHandle $daemon) { return $this->getAutoscaleProperty($daemon, 'group'); } private function getAutoscaleProperty( PhutilDaemonHandle $daemon, $key, $default = null) { $id = $daemon->getDaemonID(); $autoscale = $this->daemons[$id]['config']['autoscale']; return idx($autoscale, $key, $default); } public function didBeginWork(PhutilDaemonHandle $daemon) { $id = $daemon->getDaemonID(); $busy = idx($this->daemons[$daemon->getDaemonID()], 'busy'); if (!$busy) { $this->daemons[$id]['busy'] = time(); } } public function didBeginIdle(PhutilDaemonHandle $daemon) { $id = $daemon->getDaemonID(); unset($this->daemons[$id]['busy']); } public function updateAutoscale() { foreach ($this->autoscale as $group => $daemons) { $daemon = $this->daemons[head_key($daemons)]['handle']; $scaleup_duration = $this->getAutoscaleProperty($daemon, 'up', 2); $max_pool_size = $this->getAutoscaleProperty($daemon, 'pool', 8); + $reserve = $this->getAutoscaleProperty($daemon, 'reserve', 0); // Don't scale a group if it is already at the maximum pool size. if (count($daemons) >= $max_pool_size) { continue; } $should_scale = true; foreach ($daemons as $daemon_id => $ignored) { $busy = idx($this->daemons[$daemon_id], 'busy'); if (!$busy) { // At least one daemon in the group hasn't reported that it has // started work. $should_scale = false; break; } if ((time() - $busy) < $scaleup_duration) { // At least one daemon in the group was idle recently, so we have // not fullly $should_scale = false; break; } } + // If we have a configured memory reserve for this pool, it tells us that + // we should not scale up unless there's at least that much memory left + // on the system (for example, a reserve of 0.25 means that 25% of system + // memory must be free to autoscale). + if ($should_scale && $reserve) { + // On some systems this may be slightly more expensive than other + // checks, so only do it once we're prepared to scale up. + $memory = PhutilSystem::getSystemMemoryInformation(); + $free_ratio = ($memory['free'] / $memory['total']); + + // If we don't have enough free memory, don't scale. + if ($free_ratio <= $reserve) { + continue; + } + } + if ($should_scale) { $config = $this->daemons[$daemon_id]['config']; $config['autoscale']['clone'] = true; $clone = new PhutilDaemonHandle( $this, $config['class'], $this->argv, array( 'log' => $this->log, 'argv' => $config['argv'], 'load' => $this->libraries, 'autoscale' => $config['autoscale'], )); $this->addDaemon($clone, $config); + + // Don't scale more than one pool up per iteration. Otherwise, we could + // break the memory barrier if we have a lot of pools and scale them + // all up at once. + return; } } } public function didReceiveNotifySignal($signo) { foreach ($this->getDaemonHandles() as $daemon) { $daemon->didReceiveNotifySignal($signo); } } public function didReceiveGracefulSignal($signo) { // If we receive SIGINT more than once, interpret it like SIGTERM. if ($this->inGracefulShutdown) { return $this->didReceiveTerminalSignal($signo); } $this->inGracefulShutdown = true; foreach ($this->getDaemonHandles() as $daemon) { $daemon->didReceiveGracefulSignal($signo); } } public function didReceiveTerminalSignal($signo) { $this->err = 128 + $signo; if ($this->inAbruptShutdown) { exit($this->err); } $this->inAbruptShutdown = true; foreach ($this->getDaemonHandles() as $daemon) { $daemon->didReceiveTerminalSignal($signo); } } private function getDaemonHandles() { return ipull($this->daemons, 'handle'); } /** * Identify running daemons by examining the process table. This isn't * completely reliable, but can be used as a fallback if the pid files fail * or we end up with stray daemons by other means. * * Example output (array keys are process IDs): * * array( * 12345 => array( * 'type' => 'overseer', * 'command' => 'php launch_daemon.php --daemonize ...', * 'pid' => 12345, * ), * 12346 => array( * 'type' => 'daemon', * 'command' => 'php exec_daemon.php ...', * 'pid' => 12346, * ), * ); * * @return dict Map of PIDs to process information, identifying running * daemon processes. */ public static function findRunningDaemons() { $results = array(); list($err, $processes) = exec_manual('ps -o pid,command -a -x -w -w -w'); if ($err) { return $results; } $processes = array_filter(explode("\n", trim($processes))); foreach ($processes as $process) { list($pid, $command) = preg_split('/\s+/', trim($process), 2); $pattern = '/((launch|exec)_daemon.php|phd-daemon)/'; $matches = null; if (!preg_match($pattern, $command, $matches)) { continue; } switch ($matches[1]) { case 'exec_daemon.php': $type = 'daemon'; break; case 'launch_daemon.php': case 'phd-daemon': default: $type = 'overseer'; break; } $results[(int)$pid] = array( 'type' => $type, 'command' => $command, 'pid' => (int) $pid, ); } return $results; } private function updatePidfile() { if (!$this->piddir) { return; } $daemons = array(); foreach ($this->daemons as $daemon) { $handle = $daemon['handle']; $config = $daemon['config']; if (!$handle->isRunning()) { continue; } $daemons[] = array( 'pid' => $handle->getPID(), 'id' => $handle->getDaemonID(), 'config' => $config, ); } $pidfile = array( 'pid' => getmypid(), 'start' => $this->startEpoch, 'config' => $this->config, 'daemons' => $daemons, ); if ($pidfile !== $this->lastPidfile) { $this->lastPidfile = $pidfile; $pidfile_path = $this->piddir.'/daemon.'.getmypid(); Filesystem::writeFile($pidfile_path, json_encode($pidfile)); } } }