Changeset View
Changeset View
Standalone View
Standalone View
src/infrastructure/daemon/PhutilDaemon.php
- This file was added.
| <?php | |||||
| /** | |||||
| * Scaffolding for implementing robust background processing scripts. | |||||
| * | |||||
| * | |||||
| * Autoscaling | |||||
| * =========== | |||||
| * | |||||
| * Autoscaling automatically launches copies of a daemon when it is busy | |||||
| * (scaling the pool up) and stops them when they're idle (scaling the pool | |||||
| * down). This is appropriate for daemons which perform highly parallelizable | |||||
| * work. | |||||
| * | |||||
| * To make a daemon support autoscaling, the implementation should look | |||||
| * something like this: | |||||
| * | |||||
| * while (!$this->shouldExit()) { | |||||
| * if (work_available()) { | |||||
| * $this->willBeginWork(); | |||||
| * do_work(); | |||||
| * $this->sleep(0); | |||||
| * } else { | |||||
| * $this->willBeginIdle(); | |||||
| * $this->sleep(1); | |||||
| * } | |||||
| * } | |||||
| * | |||||
| * In particular, call @{method:willBeginWork} before becoming busy, and | |||||
| * @{method:willBeginIdle} when no work is available. If the daemon is launched | |||||
| * into an autoscale pool, this will cause the pool to automatically scale up | |||||
| * when busy and down when idle. | |||||
| * | |||||
| * See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple | |||||
| * autoscaling daemon. | |||||
| * | |||||
| * Launching a daemon which does not make these callbacks into an autoscale | |||||
| * pool will have no effect. | |||||
| * | |||||
| * @task overseer Communicating With the Overseer | |||||
| * @task autoscale Autoscaling Daemon Pools | |||||
| */ | |||||
| abstract class PhutilDaemon extends Phobject { | |||||
| const MESSAGETYPE_STDOUT = 'stdout'; | |||||
| const MESSAGETYPE_HEARTBEAT = 'heartbeat'; | |||||
| const MESSAGETYPE_BUSY = 'busy'; | |||||
| const MESSAGETYPE_IDLE = 'idle'; | |||||
| const MESSAGETYPE_DOWN = 'down'; | |||||
| const MESSAGETYPE_HIBERNATE = 'hibernate'; | |||||
| const WORKSTATE_BUSY = 'busy'; | |||||
| const WORKSTATE_IDLE = 'idle'; | |||||
| private $argv; | |||||
| private $traceMode; | |||||
| private $traceMemory; | |||||
| private $verbose; | |||||
| private $notifyReceived; | |||||
| private $inGracefulShutdown; | |||||
| private $workState = null; | |||||
| private $idleSince = null; | |||||
| private $scaledownDuration; | |||||
| final public function setVerbose($verbose) { | |||||
| $this->verbose = $verbose; | |||||
| return $this; | |||||
| } | |||||
| final public function getVerbose() { | |||||
| return $this->verbose; | |||||
| } | |||||
| final public function setScaledownDuration($scaledown_duration) { | |||||
| $this->scaledownDuration = $scaledown_duration; | |||||
| return $this; | |||||
| } | |||||
| final public function getScaledownDuration() { | |||||
| return $this->scaledownDuration; | |||||
| } | |||||
| final public function __construct(array $argv) { | |||||
| $this->argv = $argv; | |||||
| $router = PhutilSignalRouter::getRouter(); | |||||
| $handler_key = 'daemon.term'; | |||||
| if (!$router->getHandler($handler_key)) { | |||||
| $handler = new PhutilCallbackSignalHandler( | |||||
| SIGTERM, | |||||
| __CLASS__.'::onTermSignal'); | |||||
| $router->installHandler($handler_key, $handler); | |||||
| } | |||||
| pcntl_signal(SIGINT, array($this, 'onGracefulSignal')); | |||||
| pcntl_signal(SIGUSR2, array($this, 'onNotifySignal')); | |||||
| // Without discard mode, this consumes unbounded amounts of memory. Keep | |||||
| // memory bounded. | |||||
| PhutilServiceProfiler::getInstance()->enableDiscardMode(); | |||||
| $this->beginStdoutCapture(); | |||||
| } | |||||
| final public function __destruct() { | |||||
| $this->endStdoutCapture(); | |||||
| } | |||||
| final public function stillWorking() { | |||||
| $this->emitOverseerMessage(self::MESSAGETYPE_HEARTBEAT, null); | |||||
| if ($this->traceMemory) { | |||||
| $daemon = get_class($this); | |||||
| fprintf( | |||||
| STDERR, | |||||
| "%s %s %s\n", | |||||
| '<RAMS>', | |||||
| $daemon, | |||||
| pht( | |||||
| 'Memory Usage: %s KB', | |||||
| new PhutilNumber(memory_get_usage() / 1024, 1))); | |||||
| } | |||||
| } | |||||
| final public function shouldExit() { | |||||
| return $this->inGracefulShutdown; | |||||
| } | |||||
| final protected function shouldHibernate($duration) { | |||||
| // Don't hibernate if we don't have very long to sleep. | |||||
| if ($duration < 30) { | |||||
| return false; | |||||
| } | |||||
| // Never hibernate if we're part of a pool and could scale down instead. | |||||
| // We only hibernate the last process to drop the pool size to zero. | |||||
| if ($this->getScaledownDuration()) { | |||||
| return false; | |||||
| } | |||||
| // Don't hibernate for too long. | |||||
| $duration = min($duration, phutil_units('3 minutes in seconds')); | |||||
| $this->emitOverseerMessage( | |||||
| self::MESSAGETYPE_HIBERNATE, | |||||
| array( | |||||
| 'duration' => $duration, | |||||
| )); | |||||
| $this->log( | |||||
| pht( | |||||
| 'Preparing to hibernate for %s second(s).', | |||||
| new PhutilNumber($duration))); | |||||
| return true; | |||||
| } | |||||
| final protected function sleep($duration) { | |||||
| $this->notifyReceived = false; | |||||
| $this->willSleep($duration); | |||||
| $this->stillWorking(); | |||||
| $scale_down = $this->getScaledownDuration(); | |||||
| $max_sleep = 60; | |||||
| if ($scale_down) { | |||||
| $max_sleep = min($max_sleep, $scale_down); | |||||
| } | |||||
| if ($scale_down) { | |||||
| if ($this->workState == self::WORKSTATE_IDLE) { | |||||
| $dur = $this->getIdleDuration(); | |||||
| $this->log(pht('Idle for %s seconds.', $dur)); | |||||
| } | |||||
| } | |||||
| while ($duration > 0 && | |||||
| !$this->notifyReceived && | |||||
| !$this->shouldExit()) { | |||||
| // If this is an autoscaling clone and we've been idle for too long, | |||||
| // we're going to scale the pool down by exiting and not restarting. The | |||||
| // DOWN message tells the overseer that we don't want to be restarted. | |||||
| if ($scale_down) { | |||||
| if ($this->workState == self::WORKSTATE_IDLE) { | |||||
| if ($this->idleSince && ($this->idleSince + $scale_down < time())) { | |||||
| $this->inGracefulShutdown = true; | |||||
| $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null); | |||||
| $this->log( | |||||
| pht( | |||||
| 'Daemon was idle for more than %s second(s), '. | |||||
| 'scaling pool down.', | |||||
| new PhutilNumber($scale_down))); | |||||
| break; | |||||
| } | |||||
| } | |||||
| } | |||||
| sleep(min($duration, $max_sleep)); | |||||
| $duration -= $max_sleep; | |||||
| $this->stillWorking(); | |||||
| } | |||||
| } | |||||
| protected function willSleep($duration) { | |||||
| return; | |||||
| } | |||||
| public static function onTermSignal($signo) { | |||||
| self::didCatchSignal($signo); | |||||
| } | |||||
| final protected function getArgv() { | |||||
| return $this->argv; | |||||
| } | |||||
| final public function execute() { | |||||
| $this->willRun(); | |||||
| $this->run(); | |||||
| } | |||||
| abstract protected function run(); | |||||
| final public function setTraceMemory() { | |||||
| $this->traceMemory = true; | |||||
| return $this; | |||||
| } | |||||
| final public function getTraceMemory() { | |||||
| return $this->traceMemory; | |||||
| } | |||||
| final public function setTraceMode() { | |||||
| $this->traceMode = true; | |||||
| PhutilServiceProfiler::installEchoListener(); | |||||
| PhutilConsole::getConsole()->getServer()->setEnableLog(true); | |||||
| $this->didSetTraceMode(); | |||||
| return $this; | |||||
| } | |||||
| final public function getTraceMode() { | |||||
| return $this->traceMode; | |||||
| } | |||||
| final public function onGracefulSignal($signo) { | |||||
| self::didCatchSignal($signo); | |||||
| $this->inGracefulShutdown = true; | |||||
| } | |||||
| final public function onNotifySignal($signo) { | |||||
| self::didCatchSignal($signo); | |||||
| $this->notifyReceived = true; | |||||
| $this->onNotify($signo); | |||||
| } | |||||
| protected function onNotify($signo) { | |||||
| // This is a hook for subclasses. | |||||
| } | |||||
| protected function willRun() { | |||||
| // This is a hook for subclasses. | |||||
| } | |||||
| protected function didSetTraceMode() { | |||||
| // This is a hook for subclasses. | |||||
| } | |||||
| final protected function log($message) { | |||||
| if ($this->verbose) { | |||||
| $daemon = get_class($this); | |||||
| fprintf(STDERR, "%s %s %s\n", '<VERB>', $daemon, $message); | |||||
| } | |||||
| } | |||||
| private static function didCatchSignal($signo) { | |||||
| $signame = phutil_get_signal_name($signo); | |||||
| fprintf( | |||||
| STDERR, | |||||
| "%s Caught signal %s (%s).\n", | |||||
| '<SGNL>', | |||||
| $signo, | |||||
| $signame); | |||||
| } | |||||
| /* -( Communicating With the Overseer )------------------------------------ */ | |||||
| private function beginStdoutCapture() { | |||||
| ob_start(array($this, 'didReceiveStdout'), 2); | |||||
| } | |||||
| private function endStdoutCapture() { | |||||
| ob_end_flush(); | |||||
| } | |||||
| public function didReceiveStdout($data) { | |||||
| if (!strlen($data)) { | |||||
| return ''; | |||||
| } | |||||
| return $this->encodeOverseerMessage(self::MESSAGETYPE_STDOUT, $data); | |||||
| } | |||||
| private function encodeOverseerMessage($type, $data) { | |||||
| $structure = array($type); | |||||
| if ($data !== null) { | |||||
| $structure[] = $data; | |||||
| } | |||||
| return json_encode($structure)."\n"; | |||||
| } | |||||
| private function emitOverseerMessage($type, $data) { | |||||
| $this->endStdoutCapture(); | |||||
| echo $this->encodeOverseerMessage($type, $data); | |||||
| $this->beginStdoutCapture(); | |||||
| } | |||||
| public static function errorListener($event, $value, array $metadata) { | |||||
| // If the caller has redirected the error log to a file, PHP won't output | |||||
| // messages to stderr, so the overseer can't capture them. Install a | |||||
| // listener which just echoes errors to stderr, so the overseer is always | |||||
| // aware of errors. | |||||
| $console = PhutilConsole::getConsole(); | |||||
| $message = idx($metadata, 'default_message'); | |||||
| if ($message) { | |||||
| $console->writeErr("%s\n", $message); | |||||
| } | |||||
| if (idx($metadata, 'trace')) { | |||||
| $trace = PhutilErrorHandler::formatStacktrace($metadata['trace']); | |||||
| $console->writeErr("%s\n", $trace); | |||||
| } | |||||
| } | |||||
| /* -( Autoscaling )-------------------------------------------------------- */ | |||||
| /** | |||||
| * Prepare to become busy. This may autoscale the pool up. | |||||
| * | |||||
| * This notifies the overseer that the daemon has become busy. If daemons | |||||
| * that are part of an autoscale pool are continuously busy for a prolonged | |||||
| * period of time, the overseer may scale up the pool. | |||||
| * | |||||
| * @return this | |||||
| * @task autoscale | |||||
| */ | |||||
| protected function willBeginWork() { | |||||
| if ($this->workState != self::WORKSTATE_BUSY) { | |||||
| $this->workState = self::WORKSTATE_BUSY; | |||||
| $this->idleSince = null; | |||||
| $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null); | |||||
| } | |||||
| return $this; | |||||
| } | |||||
| /** | |||||
| * Prepare to idle. This may autoscale the pool down. | |||||
| * | |||||
| * This notifies the overseer that the daemon is no longer busy. If daemons | |||||
| * that are part of an autoscale pool are idle for a prolonged period of | |||||
| * time, they may exit to scale the pool down. | |||||
| * | |||||
| * @return this | |||||
| * @task autoscale | |||||
| */ | |||||
| protected function willBeginIdle() { | |||||
| if ($this->workState != self::WORKSTATE_IDLE) { | |||||
| $this->workState = self::WORKSTATE_IDLE; | |||||
| $this->idleSince = time(); | |||||
| $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null); | |||||
| } | |||||
| return $this; | |||||
| } | |||||
| protected function getIdleDuration() { | |||||
| if (!$this->idleSince) { | |||||
| return null; | |||||
| } | |||||
| $now = time(); | |||||
| return ($now - $this->idleSince); | |||||
| } | |||||
| } | |||||