diff --git a/scripts/daemon/exec/exec_daemon.php b/scripts/daemon/exec/exec_daemon.php --- a/scripts/daemon/exec/exec_daemon.php +++ b/scripts/daemon/exec/exec_daemon.php @@ -60,6 +60,7 @@ 'log' => 'optional string|null', 'argv' => 'optional list', 'load' => 'optional list', + 'autoscale' => 'optional wild', )); $log = idx($config, 'log'); @@ -110,4 +111,9 @@ $daemon->setVerbose(true); } +$autoscale = idx($config, 'autoscale'); +if ($autoscale) { + $daemon->setAutoscaleProperties($autoscale); +} + $daemon->execute(); diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php --- a/src/__phutil_library_map__.php +++ b/src/__phutil_library_map__.php @@ -183,6 +183,7 @@ 'PhutilHangForeverDaemon' => 'daemon/torture/PhutilHangForeverDaemon.php', 'PhutilHelpArgumentWorkflow' => 'parser/argument/workflow/PhutilHelpArgumentWorkflow.php', 'PhutilHgsprintfTestCase' => 'xsprintf/__tests__/PhutilHgsprintfTestCase.php', + 'PhutilHighIntensityIntervalDaemon' => 'daemon/torture/PhutilHighIntensityIntervalDaemon.php', 'PhutilINIParserException' => 'parser/exception/PhutilINIParserException.php', 'PhutilIPAddress' => 'ip/PhutilIPAddress.php', 'PhutilIPAddressTestCase' => 'ip/__tests__/PhutilIPAddressTestCase.php', @@ -614,6 +615,7 @@ 'PhutilHangForeverDaemon' => 'PhutilTortureTestDaemon', 'PhutilHelpArgumentWorkflow' => 'PhutilArgumentWorkflow', 'PhutilHgsprintfTestCase' => 'PhutilTestCase', + 'PhutilHighIntensityIntervalDaemon' => 'PhutilTortureTestDaemon', 'PhutilINIParserException' => 'Exception', 'PhutilIPAddress' => 'Phobject', 'PhutilIPAddressTestCase' => 'PhutilTestCase', diff --git a/src/daemon/PhutilDaemon.php b/src/daemon/PhutilDaemon.php --- a/src/daemon/PhutilDaemon.php +++ b/src/daemon/PhutilDaemon.php @@ -4,7 +4,41 @@ * Scaffolding for implementing robust background processing scripts. 
* * + * Autoscaling + * =========== + * + * Autoscaling automatically launches copies of a daemon when it is busy + * (scaling the pool up) and stops them when they're idle (scaling the pool + * down). This is appropriate for daemons which perform highly parallelizable + * work. + * + * To make a daemon support autoscaling, the implementation should look + * something like this: + * + * while (!$this->shouldExit()) { + * if (work_available()) { + * $this->willBeginWork(); + * do_work(); + * $this->sleep(0); + * } else { + * $this->willBeginIdle(); + * $this->sleep(1); + * } + * } + * + * In particular, call @{method:willBeginWork} before becoming busy, and + * @{method:willBeginIdle} when no work is available. If the daemon is launched + * into an autoscale pool, this will cause the pool to automatically scale up + * when busy and down when idle. + * + * See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple + * autoscaling daemon. + * + * Launching a daemon which does not make these callbacks into an autoscale + * pool will have no effect. 
+ * * @task overseer Communicating With the Overseer + * @task autoscale Autoscaling Daemon Pools * * @stable */ @@ -12,6 +46,12 @@ const MESSAGETYPE_STDOUT = 'stdout'; const MESSAGETYPE_HEARTBEAT = 'heartbeat'; + const MESSAGETYPE_BUSY = 'busy'; + const MESSAGETYPE_IDLE = 'idle'; + const MESSAGETYPE_DOWN = 'down'; + + const WORKSTATE_BUSY = 'busy'; + const WORKSTATE_IDLE = 'idle'; private $argv; private $traceMode; @@ -19,6 +59,9 @@ private $verbose; private $notifyReceived; private $inGracefulShutdown; + private $workState = null; + private $idleSince = null; + private $autoscaleProperties = array(); final public function setVerbose($verbose) { $this->verbose = $verbose; @@ -72,11 +115,46 @@ $this->notifyReceived = false; $this->willSleep($duration); $this->stillWorking(); + + $is_autoscale = $this->isClonedAutoscaleDaemon(); + $scale_down = $this->getAutoscaleDownDuration(); + + $max_sleep = 60; + if ($is_autoscale) { + $max_sleep = min($max_sleep, $scale_down); + } + + if ($is_autoscale) { + if ($this->workState == self::WORKSTATE_IDLE) { + $dur = (time() - $this->idleSince); + $this->log(pht('Idle for %s seconds.', $dur)); + } + } + while ($duration > 0 && !$this->notifyReceived && !$this->shouldExit()) { - sleep(min($duration, 60)); - $duration -= 60; + + // If this is an autoscaling clone and we've been idle for too long, + // we're going to scale the pool down by exiting and not restarting. The + // DOWN message tells the overseer that we don't want to be restarted. + if ($is_autoscale) { + if ($this->workState == self::WORKSTATE_IDLE) { + if ($this->idleSince && ($this->idleSince + $scale_down < time())) { + $this->inGracefulShutdown = true; + $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null); + $this->log( + pht( + 'Daemon was idle for more than %s seconds, scaling pool '. 
+ 'down.', + $scale_down)); + break; + } + } + } + + sleep(min($duration, $max_sleep)); + $duration -= $max_sleep; $this->stillWorking(); } } @@ -205,4 +283,109 @@ } } + +/* -( Autoscaling )-------------------------------------------------------- */ + + + /** + * Prepare to become busy. This may autoscale the pool up. + * + * This notifies the overseer that the daemon has become busy. If daemons + * that are part of an autoscale pool are continuously busy for a prolonged + * period of time, the overseer may scale up the pool. + * + * @return this + * @task autoscale + */ + protected function willBeginWork() { + if ($this->workState != self::WORKSTATE_BUSY) { + $this->workState = self::WORKSTATE_BUSY; + $this->idleSince = null; + $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null); + } + + return $this; + } + + + /** + * Prepare to idle. This may autoscale the pool down. + * + * This notifies the overseer that the daemon is no longer busy. If daemons + * that are part of an autoscale pool are idle for a prolonged period of time, + * they may exit to scale the pool down. + * + * @return this + * @task autoscale + */ + protected function willBeginIdle() { + if ($this->workState != self::WORKSTATE_IDLE) { + $this->workState = self::WORKSTATE_IDLE; + $this->idleSince = time(); + $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null); + } + + return $this; + } + + + /** + * Determine if this is a clone or the original daemon. + * + * @return bool True if this is a cloned autoscaling daemon. + * @task autoscale + */ + private function isClonedAutoscaleDaemon() { + return (bool)$this->getAutoscaleProperty('clone', false); + } + + + /** + * Get the duration (in seconds) which a daemon must be continuously idle + * for before it should exit to scale the pool down. + * + * @return int Duration, in seconds. 
+ * @task autoscale + */ + private function getAutoscaleDownDuration() { + return $this->getAutoscaleProperty('down', 15); + } + + + /** + * Configure autoscaling for this daemon. + * + * @param map Map of autoscale properties. + * @return this + * @task autoscale + */ + public function setAutoscaleProperties(array $autoscale_properties) { + PhutilTypeSpec::checkMap( + $autoscale_properties, + array( + 'group' => 'optional string', + 'up' => 'optional int', + 'down' => 'optional int', + 'pool' => 'optional int', + 'clone' => 'optional bool', + )); + + $this->autoscaleProperties = $autoscale_properties; + + return $this; + } + + + /** + * Read autoscaling configuration for this daemon. + * + * @param string Property to read. + * @param wild Default value to return if the property is not set. + * @return wild Property value, or `$default` if one is not set. + * @task autoscale + */ + private function getAutoscaleProperty($key, $default = null) { + return idx($this->autoscaleProperties, $key, $default); + } + } diff --git a/src/daemon/PhutilDaemonHandle.php b/src/daemon/PhutilDaemonHandle.php --- a/src/daemon/PhutilDaemonHandle.php +++ b/src/daemon/PhutilDaemonHandle.php @@ -18,6 +18,7 @@ private $stdoutBuffer; private $restartAt; private $silent; + private $shouldRestart = true; private $shouldShutdown; private $future; private $traceMemory; @@ -47,6 +48,10 @@ return (bool)$this->future; } + public function isDone() { + return (!$this->shouldRestart && !$this->isRunning()); + } + public function getFuture() { return $this->future; } @@ -73,6 +78,9 @@ $this->updateMemory(); if (!$this->isRunning()) { + if (!$this->shouldRestart) { + return; + } if (!$this->restartAt || (time() < $this->restartAt)) { return; } @@ -314,6 +322,18 @@ case PhutilDaemon::MESSAGETYPE_HEARTBEAT: $this->deadline = time() + $this->getRequiredHeartbeatFrequency(); break; + case PhutilDaemon::MESSAGETYPE_BUSY: + $this->overseer->didBeginWork($this); + break; + case PhutilDaemon::MESSAGETYPE_IDLE: 
+ $this->overseer->didBeginIdle($this); + break; + case PhutilDaemon::MESSAGETYPE_DOWN: + // The daemon is exiting because it doesn't have enough work and it + // is trying to scale the pool down. We should not restart it. + $this->shouldRestart = false; + $this->shouldShutdown = true; + break; default: // If we can't parse this or it isn't a message we understand, just // emit the raw message. diff --git a/src/daemon/PhutilDaemonOverseer.php b/src/daemon/PhutilDaemonOverseer.php --- a/src/daemon/PhutilDaemonOverseer.php +++ b/src/daemon/PhutilDaemonOverseer.php @@ -23,6 +23,7 @@ private $err = 0; private $lastPidfile; private $startEpoch; + private $autoscale = array(); public function __construct(array $argv) { PhutilServiceProfiler::getInstance()->enableDiscardMode(); @@ -157,6 +158,11 @@ $this->daemons = array(); foreach ($this->config['daemons'] as $config) { + $config += array( + 'argv' => array(), + 'autoscale' => array(), + ); + $daemon = new PhutilDaemonHandle( $this, $config['class'], @@ -165,15 +171,13 @@ 'log' => $this->log, 'argv' => $config['argv'], 'load' => $this->libraries, + 'autoscale' => $config['autoscale'], )); $daemon->setSilent((!$this->traceMode && !$this->verbose)); $daemon->setTraceMemory($this->traceMemory); - $this->daemons[] = array( - 'config' => $config, - 'handle' => $daemon, - ); + $this->addDaemon($daemon, $config); } while (true) { @@ -183,9 +187,14 @@ if ($daemon->isRunning()) { $futures[] = $daemon->getFuture(); } + + if ($daemon->isDone()) { + $this->removeDaemon($daemon); + } } $this->updatePidfile(); + $this->updateAutoscale(); if ($futures) { $iter = id(new FutureIterator($futures)) @@ -204,6 +213,111 @@ exit($this->err); } + private function addDaemon(PhutilDaemonHandle $daemon, array $config) { + $id = $daemon->getDaemonID(); + $this->daemons[$id] = array( + 'handle' => $daemon, + 'config' => $config, + ); + + $autoscale_group = $this->getAutoscaleGroup($daemon); + if ($autoscale_group) { 
+ $this->autoscale[$autoscale_group][$id] = true; + } + + return $this; + } + + private function removeDaemon(PhutilDaemonHandle $daemon) { + $id = $daemon->getDaemonID(); + + $autoscale_group = $this->getAutoscaleGroup($daemon); + if ($autoscale_group) { + unset($this->autoscale[$autoscale_group][$id]); + } + + unset($this->daemons[$id]); + + return $this; + } + + private function getAutoscaleGroup(PhutilDaemonHandle $daemon) { + return $this->getAutoscaleProperty($daemon, 'group'); + } + + private function getAutoscaleProperty( + PhutilDaemonHandle $daemon, + $key, + $default = null) { + + $id = $daemon->getDaemonID(); + $autoscale = $this->daemons[$id]['config']['autoscale']; + return idx($autoscale, $key, $default); + } + + public function didBeginWork(PhutilDaemonHandle $daemon) { + $id = $daemon->getDaemonID(); + $busy = idx($this->daemons[$daemon->getDaemonID()], 'busy'); + if (!$busy) { + $this->daemons[$id]['busy'] = time(); + } + } + + public function didBeginIdle(PhutilDaemonHandle $daemon) { + $id = $daemon->getDaemonID(); + unset($this->daemons[$id]['busy']); + } + + public function updateAutoscale() { + foreach ($this->autoscale as $group => $daemons) { + $daemon = $this->daemons[head_key($daemons)]['handle']; + $scaleup_duration = $this->getAutoscaleProperty($daemon, 'up', 2); + $max_pool_size = $this->getAutoscaleProperty($daemon, 'pool', 8); + + // Don't scale a group if it is already at the maximum pool size. + if (count($daemons) >= $max_pool_size) { + continue; + } + + $should_scale = true; + foreach ($daemons as $daemon_id => $ignored) { + $busy = idx($this->daemons[$daemon_id], 'busy'); + if (!$busy) { + // At least one daemon in the group hasn't reported that it has + // started work. 
+ $should_scale = false; + break; + } + + if ((time() - $busy) < $scaleup_duration) { + // At least one daemon in the group was idle recently, so we have + // not been continuously busy for long enough to scale the pool up. + $should_scale = false; + break; + } + } + + if ($should_scale) { + $config = $this->daemons[$daemon_id]['config']; + + $config['autoscale']['clone'] = true; + + $clone = new PhutilDaemonHandle( + $this, + $config['class'], + $this->argv, + array( + 'log' => $this->log, + 'argv' => $config['argv'], + 'load' => $this->libraries, + 'autoscale' => $config['autoscale'], + )); + + $this->addDaemon($clone, $config); + } + } + } + public function didReceiveNotifySignal($signo) { foreach ($this->getDaemonHandles() as $daemon) { $daemon->didReceiveNotifySignal($signo); diff --git a/src/daemon/torture/PhutilHighIntensityIntervalDaemon.php b/src/daemon/torture/PhutilHighIntensityIntervalDaemon.php new file mode 100644 --- /dev/null +++ b/src/daemon/torture/PhutilHighIntensityIntervalDaemon.php @@ -0,0 +1,24 @@ +shouldExit()) { + $m = (int)date('i'); + if ($m % 2) { + $this->willBeginWork(); + $this->log('Busy.'); + } else { + $this->willBeginIdle(); + $this->log('Idle.'); + } + $this->sleep(1); + } + } + +}