Page MenuHomePhabricator

D11859.id28570.diff
No OneTemporary

D11859.id28570.diff

diff --git a/scripts/daemon/exec/exec_daemon.php b/scripts/daemon/exec/exec_daemon.php
--- a/scripts/daemon/exec/exec_daemon.php
+++ b/scripts/daemon/exec/exec_daemon.php
@@ -60,6 +60,7 @@
'log' => 'optional string|null',
'argv' => 'optional list<wild>',
'load' => 'optional list<string>',
+ 'autoscale' => 'optional wild',
));
$log = idx($config, 'log');
@@ -110,4 +111,9 @@
$daemon->setVerbose(true);
}
+$autoscale = idx($config, 'autoscale');
+if ($autoscale) {
+ $daemon->setAutoscaleProperties($autoscale);
+}
+
$daemon->execute();
diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php
--- a/src/__phutil_library_map__.php
+++ b/src/__phutil_library_map__.php
@@ -183,6 +183,7 @@
'PhutilHangForeverDaemon' => 'daemon/torture/PhutilHangForeverDaemon.php',
'PhutilHelpArgumentWorkflow' => 'parser/argument/workflow/PhutilHelpArgumentWorkflow.php',
'PhutilHgsprintfTestCase' => 'xsprintf/__tests__/PhutilHgsprintfTestCase.php',
+ 'PhutilHighIntensityIntervalDaemon' => 'daemon/torture/PhutilHighIntensityIntervalDaemon.php',
'PhutilINIParserException' => 'parser/exception/PhutilINIParserException.php',
'PhutilIPAddress' => 'ip/PhutilIPAddress.php',
'PhutilIPAddressTestCase' => 'ip/__tests__/PhutilIPAddressTestCase.php',
@@ -614,6 +615,7 @@
'PhutilHangForeverDaemon' => 'PhutilTortureTestDaemon',
'PhutilHelpArgumentWorkflow' => 'PhutilArgumentWorkflow',
'PhutilHgsprintfTestCase' => 'PhutilTestCase',
+ 'PhutilHighIntensityIntervalDaemon' => 'PhutilTortureTestDaemon',
'PhutilINIParserException' => 'Exception',
'PhutilIPAddress' => 'Phobject',
'PhutilIPAddressTestCase' => 'PhutilTestCase',
diff --git a/src/daemon/PhutilDaemon.php b/src/daemon/PhutilDaemon.php
--- a/src/daemon/PhutilDaemon.php
+++ b/src/daemon/PhutilDaemon.php
@@ -4,7 +4,41 @@
* Scaffolding for implementing robust background processing scripts.
*
*
+ * Autoscaling
+ * ===========
+ *
+ * Autoscaling automatically launches copies of a daemon when it is busy
+ * (scaling the pool up) and stops them when they're idle (scaling the pool
+ * down). This is appropriate for daemons which perform highly parallelizable
+ * work.
+ *
+ * To make a daemon support autoscaling, the implementation should look
+ * something like this:
+ *
+ * while (!$this->shouldExit()) {
+ * if (work_available()) {
+ * $this->willBeginWork();
+ * do_work();
+ * $this->sleep(0);
+ * } else {
+ * $this->willBeginIdle();
+ * $this->sleep(1);
+ * }
+ * }
+ *
+ * In particular, call @{method:willBeginWork} before becoming busy, and
+ * @{method:willBeginIdle} when no work is available. If the daemon is launched
+ * into an autoscale pool, this will cause the pool to automatically scale up
+ * when busy and down when idle.
+ *
+ * See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple
+ * autoscaling daemon.
+ *
+ * Launching a daemon which does not make these callbacks into an autoscale
+ * pool will have no effect.
+ *
* @task overseer Communicating With the Overseer
+ * @task autoscale Autoscaling Daemon Pools
*
* @stable
*/
@@ -12,6 +46,12 @@
const MESSAGETYPE_STDOUT = 'stdout';
const MESSAGETYPE_HEARTBEAT = 'heartbeat';
+ const MESSAGETYPE_BUSY = 'busy';
+ const MESSAGETYPE_IDLE = 'idle';
+ const MESSAGETYPE_DOWN = 'down';
+
+ const WORKSTATE_BUSY = 'busy';
+ const WORKSTATE_IDLE = 'idle';
private $argv;
private $traceMode;
@@ -19,6 +59,9 @@
private $verbose;
private $notifyReceived;
private $inGracefulShutdown;
+ private $workState = null;
+ private $idleSince = null;
+ private $autoscaleProperties = array();
final public function setVerbose($verbose) {
$this->verbose = $verbose;
@@ -72,11 +115,46 @@
$this->notifyReceived = false;
$this->willSleep($duration);
$this->stillWorking();
+
+ $is_autoscale = $this->isClonedAutoscaleDaemon();
+ $scale_down = $this->getAutoscaleDownDuration();
+
+ $max_sleep = 60;
+ if ($is_autoscale) {
+ $max_sleep = min($max_sleep, $scale_down);
+ }
+
+ if ($is_autoscale) {
+ if ($this->workState == self::WORKSTATE_IDLE) {
+ $dur = (time() - $this->idleSince);
+ $this->log(pht('Idle for %s seconds.', $dur));
+ }
+ }
+
while ($duration > 0 &&
!$this->notifyReceived &&
!$this->shouldExit()) {
- sleep(min($duration, 60));
- $duration -= 60;
+
+ // If this is an autoscaling clone and we've been idle for too long,
+ // we're going to scale the pool down by exiting and not restarting. The
+ // DOWN message tells the overseer that we don't want to be restarted.
+ if ($is_autoscale) {
+ if ($this->workState == self::WORKSTATE_IDLE) {
+ if ($this->idleSince && ($this->idleSince + $scale_down < time())) {
+ $this->inGracefulShutdown = true;
+ $this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null);
+ $this->log(
+ pht(
+ 'Daemon was idle for more than %s seconds, scaling pool '.
+ 'down.',
+ $scale_down));
+ break;
+ }
+ }
+ }
+
+ sleep(min($duration, $max_sleep));
+ $duration -= $max_sleep;
$this->stillWorking();
}
}
@@ -205,4 +283,109 @@
}
}
+
+/* -( Autoscaling )-------------------------------------------------------- */
+
+
+ /**
+ * Prepare to become busy. This may autoscale the pool up.
+ *
+ * This notifies the overseer that the daemon has become busy. If daemons
+ * that are part of an autoscale pool are continuously busy for a prolonged
+ * period of time, the overseer may scale up the pool.
+ *
+ * @return this
+ * @task autoscale
+ */
+ protected function willBeginWork() {
+ if ($this->workState != self::WORKSTATE_BUSY) {
+ $this->workState = self::WORKSTATE_BUSY;
+ $this->idleSince = null;
+ $this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null);
+ }
+
+ return $this;
+ }
+
+
+ /**
+ * Prepare to idle. This may autoscale the pool down.
+ *
+ * This notifies the overseer that the daemon is no longer busy. If daemons
+ * that are part of an autoscale pool are idle for a prolonged period of time,
+ * they may exit to scale the pool down.
+ *
+ * @return this
+ * @task autoscale
+ */
+ protected function willBeginIdle() {
+ if ($this->workState != self::WORKSTATE_IDLE) {
+ $this->workState = self::WORKSTATE_IDLE;
+ $this->idleSince = time();
+ $this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null);
+ }
+
+ return $this;
+ }
+
+
+ /**
+ * Determine if this is a clone or the original daemon.
+ *
+ * @return bool True if this is an cloned autoscaling daemon.
+ * @task autoscale
+ */
+ private function isClonedAutoscaleDaemon() {
+ return (bool)$this->getAutoscaleProperty('clone', false);
+ }
+
+
+ /**
+ * Get the duration (in seconds) which a daemon must be continuously idle
+ * for before it should exit to scale the pool down.
+ *
+ * @return int Duration, in seconds.
+ * @task autoscale
+ */
+ private function getAutoscaleDownDuration() {
+ return $this->getAutoscaleProperty('down', 15);
+ }
+
+
+ /**
+ * Configure autoscaling for this daemon.
+ *
+ * @param map<string, wild> Map of autoscale properties.
+ * @return this
+ * @task autoscale
+ */
+ public function setAutoscaleProperties(array $autoscale_properties) {
+ PhutilTypeSpec::checkMap(
+ $autoscale_properties,
+ array(
+ 'group' => 'optional string',
+ 'up' => 'optional int',
+ 'down' => 'optional int',
+ 'pool' => 'optional int',
+ 'clone' => 'optional bool',
+ ));
+
+ $this->autoscaleProperties = $autoscale_properties;
+
+ return $this;
+ }
+
+
+ /**
+ * Read autoscaling configuration for this daemon.
+ *
+ * @param string Property to read.
+ * @param wild Default value to return if the property is not set.
+ * @return wild Property value, or `$default` if one is not set.
+ * @task autoscale
+ */
+ private function getAutoscaleProperty($key, $default = null) {
+ return idx($this->autoscaleProperties, $key, $default);
+ }
+
}
diff --git a/src/daemon/PhutilDaemonHandle.php b/src/daemon/PhutilDaemonHandle.php
--- a/src/daemon/PhutilDaemonHandle.php
+++ b/src/daemon/PhutilDaemonHandle.php
@@ -18,6 +18,7 @@
private $stdoutBuffer;
private $restartAt;
private $silent;
+ private $shouldRestart = true;
private $shouldShutdown;
private $future;
private $traceMemory;
@@ -47,6 +48,10 @@
return (bool)$this->future;
}
+ public function isDone() {
+ return (!$this->shouldRestart && !$this->isRunning());
+ }
+
public function getFuture() {
return $this->future;
}
@@ -73,6 +78,9 @@
$this->updateMemory();
if (!$this->isRunning()) {
+ if (!$this->shouldRestart) {
+ return;
+ }
if (!$this->restartAt || (time() < $this->restartAt)) {
return;
}
@@ -314,6 +322,18 @@
case PhutilDaemon::MESSAGETYPE_HEARTBEAT:
$this->deadline = time() + $this->getRequiredHeartbeatFrequency();
break;
+ case PhutilDaemon::MESSAGETYPE_BUSY:
+ $this->overseer->didBeginWork($this);
+ break;
+ case PhutilDaemon::MESSAGETYPE_IDLE:
+ $this->overseer->didBeginIdle($this);
+ break;
+ case PhutilDaemon::MESSAGETYPE_DOWN:
+ // The daemon is exiting because it doesn't have enough work and it
+ // is trying to scale the pool down. We should not restart it.
+ $this->shouldRestart = false;
+ $this->shouldShutdown = true;
+ break;
default:
// If we can't parse this or it isn't a message we understand, just
// emit the raw message.
diff --git a/src/daemon/PhutilDaemonOverseer.php b/src/daemon/PhutilDaemonOverseer.php
--- a/src/daemon/PhutilDaemonOverseer.php
+++ b/src/daemon/PhutilDaemonOverseer.php
@@ -23,6 +23,7 @@
private $err = 0;
private $lastPidfile;
private $startEpoch;
+ private $autoscale = array();
public function __construct(array $argv) {
PhutilServiceProfiler::getInstance()->enableDiscardMode();
@@ -157,6 +158,11 @@
$this->daemons = array();
foreach ($this->config['daemons'] as $config) {
+ $config += array(
+ 'argv' => array(),
+ 'autoscale' => array(),
+ );
+
$daemon = new PhutilDaemonHandle(
$this,
$config['class'],
@@ -165,15 +171,13 @@
'log' => $this->log,
'argv' => $config['argv'],
'load' => $this->libraries,
+ 'autoscale' => $config['autoscale'],
));
$daemon->setSilent((!$this->traceMode && !$this->verbose));
$daemon->setTraceMemory($this->traceMemory);
- $this->daemons[] = array(
- 'config' => $config,
- 'handle' => $daemon,
- );
+ $this->addDaemon($daemon, $config);
}
while (true) {
@@ -183,9 +187,14 @@
if ($daemon->isRunning()) {
$futures[] = $daemon->getFuture();
}
+
+ if ($daemon->isDone()) {
+ $this->removeDaemon($daemon);
+ }
}
$this->updatePidfile();
+ $this->updateAutoscale();
if ($futures) {
$iter = id(new FutureIterator($futures))
@@ -204,6 +213,111 @@
exit($this->err);
}
+ private function addDaemon(PhutilDaemonHandle $daemon, array $config) {
+ $id = $daemon->getDaemonID();
+ $this->daemons[$id] = array(
+ 'handle' => $daemon,
+ 'config' => $config,
+ );
+
+ $autoscale_group = $this->getAutoscaleGroup($daemon);
+ if ($autoscale_group) {
+ $this->autoscale[$autoscale_group][$id] = true;
+ }
+
+ return $this;
+ }
+
+ private function removeDaemon(PhutilDaemonHandle $daemon) {
+ $id = $daemon->getDaemonID();
+
+ $autoscale_group = $this->getAutoscaleGroup($daemon);
+ if ($autoscale_group) {
+ unset($this->autoscale[$autoscale_group][$id]);
+ }
+
+ unset($this->daemons[$id]);
+
+ return $this;
+ }
+
+ private function getAutoscaleGroup(PhutilDaemonHandle $daemon) {
+ return $this->getAutoscaleProperty($daemon, 'group');
+ }
+
+ private function getAutoscaleProperty(
+ PhutilDaemonHandle $daemon,
+ $key,
+ $default = null) {
+
+ $id = $daemon->getDaemonID();
+ $autoscale = $this->daemons[$id]['config']['autoscale'];
+ return idx($autoscale, $key, $default);
+ }
+
+ public function didBeginWork(PhutilDaemonHandle $daemon) {
+ $id = $daemon->getDaemonID();
+ $busy = idx($this->daemons[$daemon->getDaemonID()], 'busy');
+ if (!$busy) {
+ $this->daemons[$id]['busy'] = time();
+ }
+ }
+
+ public function didBeginIdle(PhutilDaemonHandle $daemon) {
+ $id = $daemon->getDaemonID();
+ unset($this->daemons[$id]['busy']);
+ }
+
+ public function updateAutoscale() {
+ foreach ($this->autoscale as $group => $daemons) {
+ $daemon = $this->daemons[head_key($daemons)]['handle'];
+ $scaleup_duration = $this->getAutoscaleProperty($daemon, 'up', 2);
+ $max_pool_size = $this->getAutoscaleProperty($daemon, 'pool', 8);
+
+ // Don't scale a group if it is already at the maximum pool size.
+ if (count($daemons) >= $max_pool_size) {
+ continue;
+ }
+
+ $should_scale = true;
+ foreach ($daemons as $daemon_id => $ignored) {
+ $busy = idx($this->daemons[$daemon_id], 'busy');
+ if (!$busy) {
+ // At least one daemon in the group hasn't reported that it has
+ // started work.
+ $should_scale = false;
+ break;
+ }
+
+ if ((time() - $busy) < $scaleup_duration) {
+ // At least one daemon in the group was idle recently, so we have
+ // not fullly
+ $should_scale = false;
+ break;
+ }
+ }
+
+ if ($should_scale) {
+ $config = $this->daemons[$daemon_id]['config'];
+
+ $config['autoscale']['clone'] = true;
+
+ $clone = new PhutilDaemonHandle(
+ $this,
+ $config['class'],
+ $this->argv,
+ array(
+ 'log' => $this->log,
+ 'argv' => $config['argv'],
+ 'load' => $this->libraries,
+ 'autoscale' => $config['autoscale'],
+ ));
+
+ $this->addDaemon($clone, $config);
+ }
+ }
+ }
+
public function didReceiveNotifySignal($signo) {
foreach ($this->getDaemonHandles() as $daemon) {
$daemon->didReceiveNotifySignal($signo);
diff --git a/src/daemon/torture/PhutilHighIntensityIntervalDaemon.php b/src/daemon/torture/PhutilHighIntensityIntervalDaemon.php
new file mode 100644
--- /dev/null
+++ b/src/daemon/torture/PhutilHighIntensityIntervalDaemon.php
@@ -0,0 +1,24 @@
+<?php
+
+/**
+ * Daemon which is very busy every other minute. This will cause it to
+ * autoscale up and down.
+ *
+ */
+final class PhutilHighIntensityIntervalDaemon extends PhutilTortureTestDaemon {
+
+ protected function run() {
+ while (!$this->shouldExit()) {
+ $m = (int)date('i');
+ if ($m % 2) {
+ $this->willBeginWork();
+ $this->log('Busy.');
+ } else {
+ $this->willBeginIdle();
+ $this->log('Idle.');
+ }
+ $this->sleep(1);
+ }
+ }
+
+}

File Metadata

Mime Type
text/plain
Expires
Thu, Mar 20, 3:19 AM (2 d, 23 h ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7712447
Default Alt Text
D11859.id28570.diff (15 KB)

Event Timeline