Changeset View
Changeset View
Standalone View
Standalone View
src/daemon/PhutilDaemon.php
<?php | <?php | ||||
/** | /** | ||||
* Scaffolding for implementing robust background processing scripts. | * Scaffolding for implementing robust background processing scripts. | ||||
* | * | ||||
* | * | ||||
* Autoscaling | |||||
* =========== | |||||
* | |||||
* Autoscaling automatically launches copies of a daemon when it is busy | |||||
* (scaling the pool up) and stops them when they're idle (scaling the pool | |||||
* down). This is appropriate for daemons which perform highly parallelizable | |||||
* work. | |||||
* | |||||
* To make a daemon support autoscaling, the implementation should look | |||||
* something like this: | |||||
* | |||||
* while (!$this->shouldExit()) { | |||||
* if (work_available()) { | |||||
* $this->willBeginWork(); | |||||
* do_work(); | |||||
* $this->sleep(0); | |||||
* } else { | |||||
* $this->willBeginIdle(); | |||||
* $this->sleep(1); | |||||
* } | |||||
* } | |||||
* | |||||
* In particular, call @{method:willBeginWork} before becoming busy, and | |||||
* @{method:willBeginIdle} when no work is available. If the daemon is launched | |||||
* into an autoscale pool, this will cause the pool to automatically scale up | |||||
* when busy and down when idle. | |||||
* | |||||
* See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple | |||||
* autoscaling daemon. | |||||
* | |||||
* Launching a daemon which does not make these callbacks into an autoscale | |||||
* pool will have no effect. | |||||
* | |||||
* @task overseer Communicating With the Overseer | * @task overseer Communicating With the Overseer | ||||
* @task autoscale Autoscaling Daemon Pools | |||||
* | * | ||||
* @stable | * @stable | ||||
*/ | */ | ||||
abstract class PhutilDaemon { | abstract class PhutilDaemon { | ||||
const MESSAGETYPE_STDOUT = 'stdout'; | const MESSAGETYPE_STDOUT = 'stdout'; | ||||
const MESSAGETYPE_HEARTBEAT = 'heartbeat'; | const MESSAGETYPE_HEARTBEAT = 'heartbeat'; | ||||
const MESSAGETYPE_BUSY = 'busy'; | |||||
const MESSAGETYPE_IDLE = 'idle'; | |||||
const MESSAGETYPE_DOWN = 'down'; | |||||
const WORKSTATE_BUSY = 'busy'; | |||||
const WORKSTATE_IDLE = 'idle'; | |||||
private $argv; | private $argv; | ||||
private $traceMode; | private $traceMode; | ||||
private $traceMemory; | private $traceMemory; | ||||
private $verbose; | private $verbose; | ||||
private $notifyReceived; | private $notifyReceived; | ||||
private $inGracefulShutdown; | private $inGracefulShutdown; | ||||
private $workState = null; | |||||
private $idleSince = null; | |||||
private $autoscaleProperties = array(); | |||||
final public function setVerbose($verbose) { | final public function setVerbose($verbose) { | ||||
$this->verbose = $verbose; | $this->verbose = $verbose; | ||||
return $this; | return $this; | ||||
} | } | ||||
final public function getVerbose() { | final public function getVerbose() { | ||||
return $this->verbose; | return $this->verbose; | ||||
Show All 37 Lines | abstract class PhutilDaemon { | ||||
final public function shouldExit() { | final public function shouldExit() { | ||||
return $this->inGracefulShutdown; | return $this->inGracefulShutdown; | ||||
} | } | ||||
final protected function sleep($duration) { | final protected function sleep($duration) { | ||||
$this->notifyReceived = false; | $this->notifyReceived = false; | ||||
$this->willSleep($duration); | $this->willSleep($duration); | ||||
$this->stillWorking(); | $this->stillWorking(); | ||||
$is_autoscale = $this->isClonedAutoscaleDaemon(); | |||||
$scale_down = $this->getAutoscaleDownDuration(); | |||||
$max_sleep = 60; | |||||
if ($is_autoscale) { | |||||
$max_sleep = min($max_sleep, $scale_down); | |||||
} | |||||
if ($is_autoscale) { | |||||
if ($this->workState == self::WORKSTATE_IDLE) { | |||||
$dur = (time() - $this->idleSince); | |||||
$this->log(pht('Idle for %s seconds.', $dur)); | |||||
} | |||||
} | |||||
while ($duration > 0 && | while ($duration > 0 && | ||||
!$this->notifyReceived && | !$this->notifyReceived && | ||||
!$this->shouldExit()) { | !$this->shouldExit()) { | ||||
sleep(min($duration, 60)); | |||||
$duration -= 60; | // If this is an autoscaling clone and we've been idle for too long, | ||||
// we're going to scale the pool down by exiting and not restarting. The | |||||
// DOWN message tells the overseer that we don't want to be restarted. | |||||
if ($is_autoscale) { | |||||
if ($this->workState == self::WORKSTATE_IDLE) { | |||||
if ($this->idleSince && ($this->idleSince + $scale_down < time())) { | |||||
$this->inGracefulShutdown = true; | |||||
$this->emitOverseerMessage(self::MESSAGETYPE_DOWN, null); | |||||
$this->log( | |||||
pht( | |||||
'Daemon was idle for more than %s seconds, scaling pool '. | |||||
'down.', | |||||
$scale_down)); | |||||
break; | |||||
} | |||||
} | |||||
} | |||||
sleep(min($duration, $max_sleep)); | |||||
$duration -= $max_sleep; | |||||
$this->stillWorking(); | $this->stillWorking(); | ||||
} | } | ||||
} | } | ||||
protected function willSleep($duration) { | protected function willSleep($duration) { | ||||
return; | return; | ||||
} | } | ||||
▲ Show 20 Lines • Show All 112 Lines • ▼ Show 20 Lines | if ($message) { | ||||
$console->writeErr("%s\n", $message); | $console->writeErr("%s\n", $message); | ||||
} | } | ||||
if (idx($metadata, 'trace')) { | if (idx($metadata, 'trace')) { | ||||
$trace = PhutilErrorHandler::formatStacktrace($metadata['trace']); | $trace = PhutilErrorHandler::formatStacktrace($metadata['trace']); | ||||
$console->writeErr("%s\n", $trace); | $console->writeErr("%s\n", $trace); | ||||
} | } | ||||
} | } | ||||
/* -( Autoscaling )-------------------------------------------------------- */ | |||||
/** | |||||
* Prepare to become busy. This may autoscale the pool up. | |||||
* | |||||
* This notifies the overseer that the daemon has become busy. If daemons | |||||
* that are part of an autoscale pool are continuously busy for a prolonged | |||||
* period of time, the overseer may scale up the pool. | |||||
* | |||||
* @return this | |||||
* @task autoscale | |||||
*/ | |||||
protected function willBeginWork() { | |||||
if ($this->workState != self::WORKSTATE_BUSY) { | |||||
$this->workState = self::WORKSTATE_BUSY; | |||||
$this->idleSince = null; | |||||
$this->emitOverseerMessage(self::MESSAGETYPE_BUSY, null); | |||||
} | |||||
return $this; | |||||
} | |||||
/** | |||||
* Prepare to idle. This may autoscale the pool down. | |||||
* | |||||
* This notifies the overseer that the daemon is no longer busy. If daemons | |||||
* that are part of an autoscale pool are idle for a prolonged period of time, | |||||
* they may exit to scale the pool down. | |||||
* | |||||
* @return this | |||||
* @task autoscale | |||||
*/ | |||||
protected function willBeginIdle() { | |||||
if ($this->workState != self::WORKSTATE_IDLE) { | |||||
$this->workState = self::WORKSTATE_IDLE; | |||||
$this->idleSince = time(); | |||||
$this->emitOverseerMessage(self::MESSAGETYPE_IDLE, null); | |||||
} | |||||
return $this; | |||||
} | |||||
/** | |||||
* Determine if this is a clone or the original daemon. | |||||
* | |||||
* @return bool True if this is an cloned autoscaling daemon. | |||||
* @task autoscale | |||||
*/ | |||||
private function isClonedAutoscaleDaemon() { | |||||
return (bool)$this->getAutoscaleProperty('clone', false); | |||||
} | |||||
/** | |||||
* Get the duration (in seconds) which a daemon must be continuously idle | |||||
* for before it should exit to scale the pool down. | |||||
* | |||||
* @return int Duration, in seconds. | |||||
* @task autoscale | |||||
*/ | |||||
private function getAutoscaleDownDuration() { | |||||
return $this->getAutoscaleProperty('down', 15); | |||||
} | |||||
/** | |||||
* Configure autoscaling for this daemon. | |||||
* | |||||
* @param map<string, wild> Map of autoscale properties. | |||||
* @return this | |||||
* @task autoscale | |||||
*/ | |||||
public function setAutoscaleProperties(array $autoscale_properties) { | |||||
PhutilTypeSpec::checkMap( | |||||
$autoscale_properties, | |||||
array( | |||||
'group' => 'optional string', | |||||
'up' => 'optional int', | |||||
'down' => 'optional int', | |||||
'pool' => 'optional int', | |||||
'clone' => 'optional bool', | |||||
)); | |||||
$this->autoscaleProperties = $autoscale_properties; | |||||
return $this; | |||||
} | |||||
/** | |||||
* Read autoscaling configuration for this daemon. | |||||
* | |||||
* @param string Property to read. | |||||
* @param wild Default value to return if the property is not set. | |||||
* @return wild Property value, or `$default` if one is not set. | |||||
* @task autoscale | |||||
*/ | |||||
private function getAutoscaleProperty($key, $default = null) { | |||||
return idx($this->autoscaleProperties, $key, $default); | |||||
} | |||||
} | } |