Changeset View
Changeset View
Standalone View
Standalone View
src/daemon/PhutilDaemonOverseer.php
<?php | <?php | ||||
/** | /** | ||||
* Oversees a daemon and restarts it if it fails. | * Oversees a daemon and restarts it if it fails. | ||||
* | |||||
* @task signals Signal Handling | |||||
*/ | */ | ||||
final class PhutilDaemonOverseer extends Phobject { | final class PhutilDaemonOverseer extends Phobject { | ||||
private $argv; | private $argv; | ||||
private $moreArgs; | |||||
private $inAbruptShutdown; | |||||
private $inGracefulShutdown; | |||||
private static $instance; | private static $instance; | ||||
private $config; | private $config; | ||||
private $daemons = array(); | private $pools = array(); | ||||
private $traceMode; | private $traceMode; | ||||
private $traceMemory; | private $traceMemory; | ||||
private $daemonize; | private $daemonize; | ||||
private $piddir; | private $piddir; | ||||
private $log; | private $log; | ||||
private $libraries = array(); | private $libraries = array(); | ||||
private $modules = array(); | private $modules = array(); | ||||
private $verbose; | private $verbose; | ||||
private $err = 0; | |||||
private $lastPidfile; | private $lastPidfile; | ||||
private $startEpoch; | private $startEpoch; | ||||
private $autoscale = array(); | private $autoscale = array(); | ||||
private $autoscaleConfig = array(); | private $autoscaleConfig = array(); | ||||
const SIGNAL_NOTIFY = 'signal/notify'; | |||||
const SIGNAL_RELOAD = 'signal/reload'; | |||||
const SIGNAL_GRACEFUL = 'signal/graceful'; | |||||
const SIGNAL_TERMINATE = 'signal/terminate'; | |||||
private $err = 0; | |||||
private $inAbruptShutdown; | |||||
private $inGracefulShutdown; | |||||
public function __construct(array $argv) { | public function __construct(array $argv) { | ||||
PhutilServiceProfiler::getInstance()->enableDiscardMode(); | PhutilServiceProfiler::getInstance()->enableDiscardMode(); | ||||
$args = new PhutilArgumentParser($argv); | $args = new PhutilArgumentParser($argv); | ||||
$args->setTagline(pht('daemon overseer')); | $args->setTagline(pht('daemon overseer')); | ||||
$args->setSynopsis(<<<EOHELP | $args->setSynopsis(<<<EOHELP | ||||
**launch_daemon.php** [__options__] __daemon__ | **launch_daemon.php** [__options__] __daemon__ | ||||
Launch and oversee an instance of __daemon__. | Launch and oversee an instance of __daemon__. | ||||
▲ Show 20 Lines • Show All 106 Lines • ▼ Show 20 Lines | if ($this->daemonize) { | ||||
throw new Exception(pht('Unable to fork!')); | throw new Exception(pht('Unable to fork!')); | ||||
} else if ($pid) { | } else if ($pid) { | ||||
exit(0); | exit(0); | ||||
} | } | ||||
} | } | ||||
$this->modules = PhutilDaemonOverseerModule::getAllModules(); | $this->modules = PhutilDaemonOverseerModule::getAllModules(); | ||||
pcntl_signal(SIGUSR2, array($this, 'didReceiveNotifySignal')); | $this->installSignalHandlers(); | ||||
pcntl_signal(SIGHUP, array($this, 'didReceiveReloadSignal')); | |||||
pcntl_signal(SIGINT, array($this, 'didReceiveGracefulSignal')); | |||||
pcntl_signal(SIGTERM, array($this, 'didReceiveTerminalSignal')); | |||||
} | } | ||||
public function addLibrary($library) { | public function addLibrary($library) { | ||||
$this->libraries[] = $library; | $this->libraries[] = $library; | ||||
return $this; | return $this; | ||||
} | } | ||||
public function run() { | public function run() { | ||||
$this->daemons = array(); | $this->createDaemonPools(); | ||||
foreach ($this->config['daemons'] as $config) { | |||||
$config += array( | |||||
'argv' => array(), | |||||
'autoscale' => array(), | |||||
); | |||||
$daemon = new PhutilDaemonHandle( | |||||
$this, | |||||
$config['class'], | |||||
$this->argv, | |||||
array( | |||||
'log' => $this->log, | |||||
'argv' => $config['argv'], | |||||
'load' => $this->libraries, | |||||
'autoscale' => $config['autoscale'], | |||||
)); | |||||
$daemon->setTraceMemory($this->traceMemory); | |||||
$this->addDaemon($daemon, $config); | |||||
$group = idx($config['autoscale'], 'group'); | |||||
if (strlen($group)) { | |||||
if (isset($this->autoscaleConfig[$group])) { | |||||
throw new Exception( | |||||
pht( | |||||
'Two daemons are part of the same autoscale group ("%s"). '. | |||||
'Each daemon autoscale group must be unique.', | |||||
$group)); | |||||
} | |||||
$this->autoscaleConfig[$group] = $config; | |||||
} | |||||
} | |||||
$should_reload = false; | |||||
while (true) { | while (true) { | ||||
foreach ($this->modules as $module) { | if ($this->shouldReloadDaemons()) { | ||||
try { | $this->didReceiveSignal(SIGHUP); | ||||
if ($module->shouldReloadDaemons()) { | |||||
$this->logMessage( | |||||
'RELO', | |||||
pht( | |||||
'Reloading daemons (triggered by overseer module "%s").', | |||||
get_class($module))); | |||||
$should_reload = true; | |||||
} | |||||
} catch (Exception $ex) { | |||||
phlog($ex); | |||||
} | |||||
} | |||||
if ($should_reload) { | |||||
$this->didReceiveReloadSignal(SIGHUP); | |||||
$should_reload = false; | |||||
} | } | ||||
$futures = array(); | $futures = array(); | ||||
foreach ($this->getDaemonHandles() as $daemon) { | foreach ($this->getDaemonPools() as $pool) { | ||||
$daemon->update(); | $pool->updatePool(); | ||||
if ($daemon->isRunning()) { | |||||
$futures[] = $daemon->getFuture(); | |||||
} | |||||
if ($daemon->isDone()) { | foreach ($pool->getFutures() as $future) { | ||||
$this->removeDaemon($daemon); | $futures[] = $future; | ||||
} | } | ||||
} | } | ||||
$this->updatePidfile(); | $this->updatePidfile(); | ||||
$this->updateAutoscale(); | $this->updateMemory(); | ||||
if ($futures) { | $this->waitForDaemonFutures($futures); | ||||
$iter = id(new FutureIterator($futures)) | |||||
->setUpdateInterval(1); | if (!$futures) { | ||||
foreach ($iter as $future) { | |||||
break; | |||||
} | |||||
} else { | |||||
if ($this->inGracefulShutdown) { | if ($this->inGracefulShutdown) { | ||||
break; | break; | ||||
} | } | ||||
sleep(1); | |||||
} | } | ||||
} | } | ||||
exit($this->err); | exit($this->err); | ||||
} | } | ||||
private function addDaemon(PhutilDaemonHandle $daemon, array $config) { | |||||
$id = $daemon->getDaemonID(); | |||||
$this->daemons[$id] = array( | |||||
'handle' => $daemon, | |||||
'config' => $config, | |||||
); | |||||
$autoscale_group = $this->getAutoscaleGroup($daemon); | |||||
if ($autoscale_group) { | |||||
$this->autoscale[$autoscale_group][$id] = true; | |||||
} | |||||
return $this; | |||||
} | |||||
private function removeDaemon(PhutilDaemonHandle $daemon) { | |||||
$id = $daemon->getDaemonID(); | |||||
$autoscale_group = $this->getAutoscaleGroup($daemon); | |||||
if ($autoscale_group) { | |||||
unset($this->autoscale[$autoscale_group][$id]); | |||||
} | |||||
unset($this->daemons[$id]); | |||||
$daemon->didRemoveDaemon(); | |||||
return $this; | |||||
} | |||||
private function getAutoscaleGroup(PhutilDaemonHandle $daemon) { | |||||
$id = $daemon->getDaemonID(); | |||||
$autoscale = $this->daemons[$id]['config']['autoscale']; | |||||
return idx($autoscale, 'group'); | |||||
} | |||||
private function getAutoscaleProperty($group_key, $key, $default = null) { | |||||
$config = $this->autoscaleConfig[$group_key]['autoscale']; | |||||
return idx($config, $key, $default); | |||||
} | |||||
public function didBeginWork(PhutilDaemonHandle $daemon) { | |||||
$id = $daemon->getDaemonID(); | |||||
$busy = idx($this->daemons[$daemon->getDaemonID()], 'busy'); | |||||
if (!$busy) { | |||||
$this->daemons[$id]['busy'] = time(); | |||||
} | |||||
} | |||||
public function didBeginIdle(PhutilDaemonHandle $daemon) { | |||||
$id = $daemon->getDaemonID(); | |||||
unset($this->daemons[$id]['busy']); | |||||
} | |||||
public function updateAutoscale() { | |||||
if ($this->inGracefulShutdown) { | |||||
return; | |||||
} | |||||
foreach ($this->autoscale as $group => $daemons) { | private function waitForDaemonFutures(array $futures) { | ||||
$scaleup_duration = $this->getAutoscaleProperty($group, 'up', 2); | assert_instances_of($futures, 'ExecFuture'); | ||||
$max_pool_size = $this->getAutoscaleProperty($group, 'pool', 8); | |||||
$reserve = $this->getAutoscaleProperty($group, 'reserve', 0); | |||||
// Don't scale a group if it is already at the maximum pool size. | if ($futures) { | ||||
if (count($daemons) >= $max_pool_size) { | // TODO: This only wakes if any daemons actually exit. It would be a bit | ||||
Lint: TODO Comment: This comment has a TODO. | |||||
continue; | // cleaner to wait on any I/O with Channels. | ||||
} | $iter = id(new FutureIterator($futures)) | ||||
->setUpdateInterval(1); | |||||
$should_scale = true; | foreach ($iter as $future) { | ||||
foreach ($daemons as $daemon_id => $ignored) { | |||||
$busy = idx($this->daemons[$daemon_id], 'busy'); | |||||
if (!$busy) { | |||||
// At least one daemon in the group hasn't reported that it has | |||||
// started work. | |||||
$should_scale = false; | |||||
break; | |||||
} | |||||
if ((time() - $busy) < $scaleup_duration) { | |||||
// At least one daemon in the group was idle recently, so we have | |||||
// not fullly | |||||
$should_scale = false; | |||||
break; | break; | ||||
} | } | ||||
} else { | |||||
if (!$this->inGracefulShutdown) { | |||||
sleep(1); | |||||
} | } | ||||
// If we have a configured memory reserve for this pool, it tells us that | |||||
// we should not scale up unless there's at least that much memory left | |||||
// on the system (for example, a reserve of 0.25 means that 25% of system | |||||
// memory must be free to autoscale). | |||||
if ($should_scale && $reserve) { | |||||
// On some systems this may be slightly more expensive than other | |||||
// checks, so only do it once we're prepared to scale up. | |||||
$memory = PhutilSystem::getSystemMemoryInformation(); | |||||
$free_ratio = ($memory['free'] / $memory['total']); | |||||
// If we don't have enough free memory, don't scale. | |||||
if ($free_ratio <= $reserve) { | |||||
continue; | |||||
} | } | ||||
} | } | ||||
if ($should_scale) { | private function createDaemonPools() { | ||||
$config = $this->autoscaleConfig[$group]; | $configs = $this->config['daemons']; | ||||
$config['autoscale']['clone'] = true; | |||||
$clone = new PhutilDaemonHandle( | $forced_options = array( | ||||
$this, | |||||
$config['class'], | |||||
$this->argv, | |||||
array( | |||||
'log' => $this->log, | |||||
'argv' => $config['argv'], | |||||
'load' => $this->libraries, | 'load' => $this->libraries, | ||||
'autoscale' => $config['autoscale'], | 'log' => $this->log, | ||||
)); | ); | ||||
$this->logMessage( | |||||
'AUTO', | |||||
pht( | |||||
'Scaling pool "%s" up to %s daemon(s).', | |||||
$group, | |||||
new PhutilNumber(count($daemons) + 1))); | |||||
$this->addDaemon($clone, $config); | |||||
// Don't scale more than one pool up per iteration. Otherwise, we could | |||||
// break the memory barrier if we have a lot of pools and scale them | |||||
// all up at once. | |||||
return; | |||||
} | |||||
} | |||||
} | |||||
public function didReceiveNotifySignal($signo) { | |||||
foreach ($this->getDaemonHandles() as $daemon) { | |||||
$daemon->didReceiveNotifySignal($signo); | |||||
} | |||||
} | |||||
public function didReceiveReloadSignal($signo) { | foreach ($configs as $config) { | ||||
foreach ($this->getDaemonHandles() as $daemon) { | $config = $forced_options + $config; | ||||
$daemon->didReceiveReloadSignal($signo); | |||||
} | |||||
} | |||||
public function didReceiveGracefulSignal($signo) { | $pool = PhutilDaemonPool::newFromConfig($config) | ||||
// If we receive SIGINT more than once, interpret it like SIGTERM. | ->setOverseer($this) | ||||
if ($this->inGracefulShutdown) { | ->setCommandLineArguments($this->argv); | ||||
return $this->didReceiveTerminalSignal($signo); | |||||
} | |||||
$this->inGracefulShutdown = true; | |||||
foreach ($this->getDaemonHandles() as $daemon) { | $this->pools[] = $pool; | ||||
$daemon->didReceiveGracefulSignal($signo); | |||||
} | } | ||||
} | } | ||||
public function didReceiveTerminalSignal($signo) { | private function getDaemonPools() { | ||||
$this->err = 128 + $signo; | return $this->pools; | ||||
if ($this->inAbruptShutdown) { | |||||
exit($this->err); | |||||
} | } | ||||
$this->inAbruptShutdown = true; | |||||
foreach ($this->getDaemonHandles() as $daemon) { | |||||
$daemon->didReceiveTerminalSignal($signo); | |||||
} | |||||
} | |||||
private function getDaemonHandles() { | |||||
return ipull($this->daemons, 'handle'); | |||||
} | |||||
/** | /** | ||||
* Identify running daemons by examining the process table. This isn't | * Identify running daemons by examining the process table. This isn't | ||||
* completely reliable, but can be used as a fallback if the pid files fail | * completely reliable, but can be used as a fallback if the pid files fail | ||||
* or we end up with stray daemons by other means. | * or we end up with stray daemons by other means. | ||||
* | * | ||||
* Example output (array keys are process IDs): | * Example output (array keys are process IDs): | ||||
* | * | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | public static function findRunningDaemons() { | ||||
return $results; | return $results; | ||||
} | } | ||||
private function updatePidfile() { | private function updatePidfile() { | ||||
if (!$this->piddir) { | if (!$this->piddir) { | ||||
return; | return; | ||||
} | } | ||||
$daemons = array(); | $pidfile = $this->toDictionary(); | ||||
foreach ($this->daemons as $daemon) { | if ($pidfile !== $this->lastPidfile) { | ||||
$handle = $daemon['handle']; | $this->lastPidfile = $pidfile; | ||||
$config = $daemon['config']; | $pidfile_path = $this->piddir.'/daemon.'.getmypid(); | ||||
Filesystem::writeFile($pidfile_path, phutil_json_encode($pidfile)); | |||||
} | |||||
} | |||||
if (!$handle->isRunning()) { | public function toDictionary() { | ||||
$daemons = array(); | |||||
foreach ($this->getDaemonPools() as $pool) { | |||||
foreach ($pool->getDaemons() as $daemon) { | |||||
if (!$daemon->isRunning()) { | |||||
continue; | continue; | ||||
} | } | ||||
$daemons[] = array( | $daemons[] = $daemon->toDictionary(); | ||||
'pid' => $handle->getPID(), | } | ||||
'id' => $handle->getDaemonID(), | |||||
'config' => $config, | |||||
); | |||||
} | } | ||||
$pidfile = array( | return array( | ||||
'pid' => getmypid(), | 'pid' => getmypid(), | ||||
'start' => $this->startEpoch, | 'start' => $this->startEpoch, | ||||
'config' => $this->config, | 'config' => $this->config, | ||||
'daemons' => $daemons, | 'daemons' => $daemons, | ||||
); | ); | ||||
} | |||||
if ($pidfile !== $this->lastPidfile) { | private function updateMemory() { | ||||
$this->lastPidfile = $pidfile; | if (!$this->traceMemory) { | ||||
$pidfile_path = $this->piddir.'/daemon.'.getmypid(); | return; | ||||
Filesystem::writeFile($pidfile_path, json_encode($pidfile)); | |||||
} | } | ||||
$this->logMessage( | |||||
'RAMS', | |||||
pht( | |||||
'Overseer Memory Usage: %s KB', | |||||
new PhutilNumber(memory_get_usage() / 1024, 1))); | |||||
} | } | ||||
public function logMessage($type, $message, $context = null) { | public function logMessage($type, $message, $context = null) { | ||||
if ($this->traceMode || $this->verbose) { | if ($this->traceMode || $this->verbose) { | ||||
error_log(date('Y-m-d g:i:s A').' ['.$type.'] '.$message); | error_log(date('Y-m-d g:i:s A').' ['.$type.'] '.$message); | ||||
} | } | ||||
} | } | ||||
/* -( Signal Handling )---------------------------------------------------- */ | |||||
/** | |||||
* @task signals | |||||
*/ | |||||
private function installSignalHandlers() { | |||||
$signals = array( | |||||
SIGUSR2, | |||||
SIGHUP, | |||||
SIGINT, | |||||
SIGTERM, | |||||
); | |||||
foreach ($signals as $signal) { | |||||
pcntl_signal($signal, array($this, 'didReceiveSignal')); | |||||
} | |||||
} | |||||
/** | |||||
* @task signals | |||||
*/ | |||||
public function didReceiveSignal($signo) { | |||||
switch ($signo) { | |||||
case SIGUSR2: | |||||
$signal_type = self::SIGNAL_NOTIFY; | |||||
break; | |||||
case SIGHUP: | |||||
$signal_type = self::SIGNAL_RELOAD; | |||||
break; | |||||
case SIGINT: | |||||
// If we receive SIGINT more than once, interpret it like SIGTERM. | |||||
if ($this->inGracefulShutdown) { | |||||
return $this->didReceiveSignal(SIGTERM); | |||||
} | |||||
$this->inGracefulShutdown = true; | |||||
$signal_type = self::SIGNAL_GRACEFUL; | |||||
break; | |||||
case SIGTERM: | |||||
// If we receive SIGTERM more than once, terminate abruptly. | |||||
$this->err = 128 + $signo; | |||||
if ($this->inAbruptShutdown) { | |||||
exit($this->err); | |||||
} | |||||
$this->inAbruptShutdown = true; | |||||
$signal_type = self::SIGNAL_TERMINATE; | |||||
break; | |||||
default: | |||||
throw new Exception( | |||||
pht( | |||||
'Signal handler called with unknown signal type ("%d")!', | |||||
$signo)); | |||||
} | |||||
foreach ($this->getDaemonPools() as $pool) { | |||||
$pool->didReceiveSignal($signal_type, $signo); | |||||
} | |||||
} | |||||
/* -( Daemon Modules )----------------------------------------------------- */ | |||||
private function getModules() { | |||||
return $this->modules; | |||||
} | |||||
private function shouldReloadDaemons() { | |||||
$modules = $this->getModules(); | |||||
$should_reload = false; | |||||
foreach ($modules as $module) { | |||||
try { | |||||
// NOTE: Even if one module tells us to reload, we call the method on | |||||
// each module anyway to make calls a little more predictable. | |||||
if ($module->shouldReloadDaemons()) { | |||||
$this->logMessage( | |||||
'RELO', | |||||
pht( | |||||
'Reloading daemons (triggered by overseer module "%s").', | |||||
get_class($module))); | |||||
$should_reload = true; | |||||
} | |||||
} catch (Exception $ex) { | |||||
phlog($ex); | |||||
} | |||||
} | |||||
return $should_reload; | |||||
} | |||||
} | } |
This comment has a TODO.