diff --git a/src/daemon/PhutilDaemon.php b/src/daemon/PhutilDaemon.php index 5856cf3..701b3d7 100644 --- a/src/daemon/PhutilDaemon.php +++ b/src/daemon/PhutilDaemon.php @@ -1,383 +1,393 @@ shouldExit()) { * if (work_available()) { * $this->willBeginWork(); * do_work(); * $this->sleep(0); * } else { * $this->willBeginIdle(); * $this->sleep(1); * } * } * * In particular, call @{method:willBeginWork} before becoming busy, and * @{method:willBeginIdle} when no work is available. If the daemon is launched * into an autoscale pool, this will cause the pool to automatically scale up * when busy and down when idle. * * See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple * autoscaling daemon. * * Launching a daemon which does not make these callbacks into an autoscale * pool will have no effect. * * @task overseer Communicating With the Overseer * @task autoscale Autoscaling Daemon Pools */ abstract class PhutilDaemon extends Phobject { const MESSAGETYPE_STDOUT = 'stdout'; const MESSAGETYPE_HEARTBEAT = 'heartbeat'; const MESSAGETYPE_BUSY = 'busy'; const MESSAGETYPE_IDLE = 'idle'; const MESSAGETYPE_DOWN = 'down'; const MESSAGETYPE_HIBERNATE = 'hibernate'; const WORKSTATE_BUSY = 'busy'; const WORKSTATE_IDLE = 'idle'; private $argv; private $traceMode; private $traceMemory; private $verbose; private $notifyReceived; private $inGracefulShutdown; private $workState = null; private $idleSince = null; private $scaledownDuration; final public function setVerbose($verbose) { $this->verbose = $verbose; return $this; } final public function getVerbose() { return $this->verbose; } final public function setScaledownDuration($scaledown_duration) { $this->scaledownDuration = $scaledown_duration; return $this; } final public function getScaledownDuration() { return $this->scaledownDuration; } final public function __construct(array $argv) { $this->argv = $argv; $router = PhutilSignalRouter::getRouter(); $handler_key = 'daemon.term'; if (!$router->getHandler($handler_key)) { $handler = new PhutilCallbackSignalHandler( SIGTERM, __CLASS__.'::onTermSignal'); $router->installHandler($handler_key, $handler); } pcntl_signal(SIGINT, array($this, 'onGracefulSignal')); pcntl_signal(SIGUSR2, array($this, 'onNotifySignal')); // Without discard mode, this consumes unbounded amounts of memory. Keep // memory bounded. PhutilServiceProfiler::getInstance()->enableDiscardMode(); $this->beginStdoutCapture(); } final public function __destruct() { $this->endStdoutCapture(); } final public function stillWorking() { $this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null); if ($this->traceMemory) { $daemon = get_class($this); fprintf( STDERR, "%s %s %s\n", '', $daemon, pht( 'Memory Usage: %s KB', new PhutilNumber(memory_get_usage() / 1024, 1))); } } final public function shouldExit() { return $this->inGracefulShutdown; } final protected function shouldHibernate($duration) { // Don't hibernate if we don't have very long to sleep. if ($duration < 30) { return false; } // Never hibernate if we're part of a pool and could scale down instead. // We only hibernate the last process to drop the pool size to zero. if ($this->getScaledownDuration()) { return false; } // Don't hibernate for too long. $duration = min($duration, phutil_units('3 minutes in seconds')); $this->emitOverseerMessage( self::MESSAGETYPE_HIBERNATE, array( 'duration' => $duration, )); $this->log( pht( 'Preparing to hibernate for %s second(s).', new PhutilNumber($duration))); return true; } final protected function sleep($duration) { $this->notifyReceived = false; $this->willSleep($duration); $this->stillWorking(); $scale_down = $this->getScaledownDuration(); $max_sleep = 60; if ($scale_down) { $max_sleep = min($max_sleep, $scale_down); } if ($scale_down) { if ($this->workState == self::WORKSTATE_IDLE) { - $dur = (time() - $this->idleSince); + $dur = $this->getIdleDuration(); $this->log(pht('Idle for %s seconds.', $dur)); } } while ($duration > 0 && !$this->notifyReceived && !$this->shouldExit()) { // If this is an autoscaling clone and we've been idle for too long, // we're going to scale the pool down by exiting and not restarting. The // DOWN message tells the overseer that we don't want to be restarted. if ($scale_down) { if ($this->workState == self::WORKSTATE_IDLE) { if ($this->idleSince && ($this->idleSince + $scale_down < time())) { $this->inGracefulShutdown = true; $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null); $this->log( pht( 'Daemon was idle for more than %s second(s), '. 'scaling pool down.', new PhutilNumber($scale_down))); break; } } } sleep(min($duration, $max_sleep)); $duration -= $max_sleep; $this->stillWorking(); } } protected function willSleep($duration) { return; } public static function onTermSignal($signo) { self::didCatchSignal($signo); } final protected function getArgv() { return $this->argv; } final public function execute() { $this->willRun(); $this->run(); } abstract protected function run(); final public function setTraceMemory() { $this->traceMemory = true; return $this; } final public function getTraceMemory() { return $this->traceMemory; } final public function setTraceMode() { $this->traceMode = true; PhutilServiceProfiler::installEchoListener(); PhutilConsole::getConsole()->getServer()->setEnableLog(true); $this->didSetTraceMode(); return $this; } final public function getTraceMode() { return $this->traceMode; } final public function onGracefulSignal($signo) { self::didCatchSignal($signo); $this->inGracefulShutdown = true; } final public function onNotifySignal($signo) { self::didCatchSignal($signo); $this->notifyReceived = true; $this->onNotify($signo); } protected function onNotify($signo) { // This is a hook for subclasses. } protected function willRun() { // This is a hook for subclasses. } protected function didSetTraceMode() { // This is a hook for subclasses. } final protected function log($message) { if ($this->verbose) { $daemon = get_class($this); fprintf(STDERR, "%s %s %s\n", '', $daemon, $message); } } private static function didCatchSignal($signo) { $signame = phutil_get_signal_name($signo); fprintf( STDERR, "%s Caught signal %s (%s).\n", '', $signo, $signame); } /* -( Communicating With the Overseer )------------------------------------ */ private function beginStdoutCapture() { ob_start(array($this, 'didReceiveStdout'), 2); } private function endStdoutCapture() { ob_end_flush(); } public function didReceiveStdout($data) { if (!strlen($data)) { return ''; } return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data); } private function encodeOverseerMessage($type, $data) { $structure = array($type); if ($data !== null) { $structure[] = $data; } return json_encode($structure)."\n"; } private function emitOverseerMessage($type, $data) { $this->endStdoutCapture(); echo $this->encodeOverseerMessage($type, $data); $this->beginStdoutCapture(); } public static function errorListener($event, $value, array $metadata) { // If the caller has redirected the error log to a file, PHP won't output // messages to stderr, so the overseer can't capture them. Install a // listener which just echoes errors to stderr, so the overseer is always // aware of errors. $console = PhutilConsole::getConsole(); $message = idx($metadata, 'default_message'); if ($message) { $console->writeErr("%s\n", $message); } if (idx($metadata, 'trace')) { $trace = PhutilErrorHandler::formatStacktrace($metadata['trace']); $console->writeErr("%s\n", $trace); } } /* -( Autoscaling )-------------------------------------------------------- */ /** * Prepare to become busy. This may autoscale the pool up. * * This notifies the overseer that the daemon has become busy. If daemons * that are part of an autoscale pool are continuously busy for a prolonged * period of time, the overseer may scale up the pool. * * @return this * @task autoscale */ protected function willBeginWork() { if ($this->workState != self::WORKSTATE_BUSY) { $this->workState = self::WORKSTATE_BUSY; $this->idleSince = null; $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null); } return $this; } /** * Prepare to idle. This may autoscale the pool down. * * This notifies the overseer that the daemon is no longer busy. If daemons * that are part of an autoscale pool are idle for a prolonged period of * time, they may exit to scale the pool down. * * @return this * @task autoscale */ protected function willBeginIdle() { if ($this->workState != self::WORKSTATE_IDLE) { $this->workState = self::WORKSTATE_IDLE; $this->idleSince = time(); $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null); } return $this; } + + protected function getIdleDuration() { + if (!$this->idleSince) { + return null; + } + + $now = time(); + return ($now - $this->idleSince); + } + }