Changeset View
Changeset View
Standalone View
Standalone View
src/infrastructure/daemon/PhutilDaemonHandle.php
- This file was added.
<?php | |||||
final class PhutilDaemonHandle extends Phobject { | |||||
const EVENT_DID_LAUNCH = 'daemon.didLaunch'; | |||||
const EVENT_DID_LOG = 'daemon.didLogMessage'; | |||||
const EVENT_DID_HEARTBEAT = 'daemon.didHeartbeat'; | |||||
const EVENT_WILL_GRACEFUL = 'daemon.willGraceful'; | |||||
const EVENT_WILL_EXIT = 'daemon.willExit'; | |||||
private $pool; | |||||
private $properties; | |||||
private $future; | |||||
private $argv; | |||||
private $restartAt; | |||||
private $busyEpoch; | |||||
private $pid; | |||||
private $daemonID; | |||||
private $deadline; | |||||
private $heartbeat; | |||||
private $stdoutBuffer; | |||||
private $shouldRestart = true; | |||||
private $shouldShutdown; | |||||
private $hibernating = false; | |||||
private $shouldSendExitEvent = false; | |||||
private function __construct() { | |||||
// <empty> | |||||
} | |||||
public static function newFromConfig(array $config) { | |||||
PhutilTypeSpec::checkMap( | |||||
$config, | |||||
array( | |||||
'class' => 'string', | |||||
'argv' => 'optional list<string>', | |||||
'load' => 'optional list<string>', | |||||
'log' => 'optional string|null', | |||||
'down' => 'optional int', | |||||
)); | |||||
$config = $config + array( | |||||
'argv' => array(), | |||||
'load' => array(), | |||||
'log' => null, | |||||
'down' => 15, | |||||
); | |||||
$daemon = new self(); | |||||
$daemon->properties = $config; | |||||
$daemon->daemonID = $daemon->generateDaemonID(); | |||||
return $daemon; | |||||
} | |||||
public function setDaemonPool(PhutilDaemonPool $daemon_pool) { | |||||
$this->pool = $daemon_pool; | |||||
return $this; | |||||
} | |||||
public function getDaemonPool() { | |||||
return $this->pool; | |||||
} | |||||
public function getBusyEpoch() { | |||||
return $this->busyEpoch; | |||||
} | |||||
public function getDaemonClass() { | |||||
return $this->getProperty('class'); | |||||
} | |||||
private function getProperty($key) { | |||||
return idx($this->properties, $key); | |||||
} | |||||
public function setCommandLineArguments(array $arguments) { | |||||
$this->argv = $arguments; | |||||
return $this; | |||||
} | |||||
public function getCommandLineArguments() { | |||||
return $this->argv; | |||||
} | |||||
public function getDaemonArguments() { | |||||
return $this->getProperty('argv'); | |||||
} | |||||
public function didLaunch() { | |||||
$this->restartAt = time(); | |||||
$this->shouldSendExitEvent = true; | |||||
$this->dispatchEvent( | |||||
self::EVENT_DID_LAUNCH, | |||||
array( | |||||
'argv' => $this->getCommandLineArguments(), | |||||
'explicitArgv' => $this->getDaemonArguments(), | |||||
)); | |||||
return $this; | |||||
} | |||||
public function isRunning() { | |||||
return (bool)$this->future; | |||||
} | |||||
public function isHibernating() { | |||||
return | |||||
!$this->isRunning() && | |||||
!$this->isDone() && | |||||
$this->hibernating; | |||||
} | |||||
public function wakeFromHibernation() { | |||||
if (!$this->isHibernating()) { | |||||
return $this; | |||||
} | |||||
$this->logMessage( | |||||
'WAKE', | |||||
pht( | |||||
'Process is being awakened from hibernation.')); | |||||
$this->restartAt = time(); | |||||
$this->update(); | |||||
return $this; | |||||
} | |||||
public function isDone() { | |||||
return (!$this->shouldRestart && !$this->isRunning()); | |||||
} | |||||
public function getFuture() { | |||||
return $this->future; | |||||
} | |||||
public function update() { | |||||
if (!$this->isRunning()) { | |||||
if (!$this->shouldRestart) { | |||||
return; | |||||
} | |||||
if (!$this->restartAt || (time() < $this->restartAt)) { | |||||
return; | |||||
} | |||||
if ($this->shouldShutdown) { | |||||
return; | |||||
} | |||||
$this->startDaemonProcess(); | |||||
} | |||||
$future = $this->future; | |||||
$result = null; | |||||
if ($future->isReady()) { | |||||
$result = $future->resolve(); | |||||
} | |||||
list($stdout, $stderr) = $future->read(); | |||||
$future->discardBuffers(); | |||||
if (strlen($stdout)) { | |||||
$this->didReadStdout($stdout); | |||||
} | |||||
$stderr = trim($stderr); | |||||
if (strlen($stderr)) { | |||||
foreach (phutil_split_lines($stderr, false) as $line) { | |||||
$this->logMessage('STDE', $line); | |||||
} | |||||
} | |||||
if ($result !== null) { | |||||
list($err) = $result; | |||||
if ($err) { | |||||
$this->logMessage('FAIL', pht('Process exited with error %s.', $err)); | |||||
} else { | |||||
$this->logMessage('DONE', pht('Process exited normally.')); | |||||
} | |||||
$this->future = null; | |||||
if ($this->shouldShutdown) { | |||||
$this->restartAt = null; | |||||
} else { | |||||
$this->scheduleRestart(); | |||||
} | |||||
} | |||||
$this->updateHeartbeatEvent(); | |||||
$this->updateHangDetection(); | |||||
} | |||||
private function updateHeartbeatEvent() { | |||||
if ($this->heartbeat > time()) { | |||||
return; | |||||
} | |||||
$this->heartbeat = time() + $this->getHeartbeatEventFrequency(); | |||||
$this->dispatchEvent(self::EVENT_DID_HEARTBEAT); | |||||
} | |||||
private function updateHangDetection() { | |||||
if (!$this->isRunning()) { | |||||
return; | |||||
} | |||||
if (time() > $this->deadline) { | |||||
$this->logMessage('HANG', pht('Hang detected. Restarting process.')); | |||||
$this->annihilateProcessGroup(); | |||||
$this->scheduleRestart(); | |||||
} | |||||
} | |||||
private function scheduleRestart() { | |||||
// Wait a minimum of a few sceconds before restarting, but we may wait | |||||
// longer if the daemon has initiated hibernation. | |||||
$default_restart = time() + self::getWaitBeforeRestart(); | |||||
if ($default_restart >= $this->restartAt) { | |||||
$this->restartAt = $default_restart; | |||||
} | |||||
$this->logMessage( | |||||
'WAIT', | |||||
pht( | |||||
'Waiting %s second(s) to restart process.', | |||||
new PhutilNumber($this->restartAt - time()))); | |||||
} | |||||
/** | |||||
* Generate a unique ID for this daemon. | |||||
* | |||||
* @return string A unique daemon ID. | |||||
*/ | |||||
private function generateDaemonID() { | |||||
return substr(getmypid().':'.Filesystem::readRandomCharacters(12), 0, 12); | |||||
} | |||||
public function getDaemonID() { | |||||
return $this->daemonID; | |||||
} | |||||
public function getPID() { | |||||
return $this->pid; | |||||
} | |||||
private function getCaptureBufferSize() { | |||||
return 65535; | |||||
} | |||||
private function getRequiredHeartbeatFrequency() { | |||||
return 86400; | |||||
} | |||||
public static function getWaitBeforeRestart() { | |||||
return 5; | |||||
} | |||||
public static function getHeartbeatEventFrequency() { | |||||
return 120; | |||||
} | |||||
private function getKillDelay() { | |||||
return 3; | |||||
} | |||||
private function getDaemonCWD() { | |||||
$root = dirname(phutil_get_library_root('phabricator')); | |||||
return $root.'/scripts/daemon/exec/'; | |||||
} | |||||
private function newExecFuture() { | |||||
$class = $this->getDaemonClass(); | |||||
$argv = $this->getCommandLineArguments(); | |||||
$buffer_size = $this->getCaptureBufferSize(); | |||||
// NOTE: PHP implements proc_open() by running 'sh -c'. On most systems this | |||||
// is bash, but on Ubuntu it's dash. When you proc_open() using bash, you | |||||
// get one new process (the command you ran). When you proc_open() using | |||||
// dash, you get two new processes: the command you ran and a parent | |||||
// "dash -c" (or "sh -c") process. This means that the child process's PID | |||||
// is actually the 'dash' PID, not the command's PID. To avoid this, use | |||||
// 'exec' to replace the shell process with the real process; without this, | |||||
// the child will call posix_getppid(), be given the pid of the 'sh -c' | |||||
// process, and send it SIGUSR1 to keepalive which will terminate it | |||||
// immediately. We also won't be able to do process group management because | |||||
// the shell process won't properly posix_setsid() so the pgid of the child | |||||
// won't be meaningful. | |||||
$config = $this->properties; | |||||
unset($config['class']); | |||||
$config = phutil_json_encode($config); | |||||
return id(new ExecFuture('exec ./exec_daemon.php %s %Ls', $class, $argv)) | |||||
->setCWD($this->getDaemonCWD()) | |||||
->setStdoutSizeLimit($buffer_size) | |||||
->setStderrSizeLimit($buffer_size) | |||||
->write($config); | |||||
} | |||||
/** | |||||
* Dispatch an event to event listeners. | |||||
* | |||||
* @param string Event type. | |||||
* @param dict Event parameters. | |||||
* @return void | |||||
*/ | |||||
private function dispatchEvent($type, array $params = array()) { | |||||
$data = array( | |||||
'id' => $this->getDaemonID(), | |||||
'daemonClass' => $this->getDaemonClass(), | |||||
'childPID' => $this->getPID(), | |||||
) + $params; | |||||
$event = new PhutilEvent($type, $data); | |||||
try { | |||||
PhutilEventEngine::dispatchEvent($event); | |||||
} catch (Exception $ex) { | |||||
phlog($ex); | |||||
} | |||||
} | |||||
private function annihilateProcessGroup() { | |||||
$pid = $this->getPID(); | |||||
$pgid = posix_getpgid($pid); | |||||
if ($pid && $pgid) { | |||||
posix_kill(-$pgid, SIGTERM); | |||||
sleep($this->getKillDelay()); | |||||
posix_kill(-$pgid, SIGKILL); | |||||
$this->pid = null; | |||||
} | |||||
} | |||||
private function startDaemonProcess() { | |||||
$this->logMessage('INIT', pht('Starting process.')); | |||||
$this->deadline = time() + $this->getRequiredHeartbeatFrequency(); | |||||
$this->heartbeat = time() + self::getHeartbeatEventFrequency(); | |||||
$this->stdoutBuffer = ''; | |||||
$this->hibernating = false; | |||||
$this->future = $this->newExecFuture(); | |||||
$this->future->start(); | |||||
$this->pid = $this->future->getPID(); | |||||
} | |||||
private function didReadStdout($data) { | |||||
$this->stdoutBuffer .= $data; | |||||
while (true) { | |||||
$pos = strpos($this->stdoutBuffer, "\n"); | |||||
if ($pos === false) { | |||||
break; | |||||
} | |||||
$message = substr($this->stdoutBuffer, 0, $pos); | |||||
$this->stdoutBuffer = substr($this->stdoutBuffer, $pos + 1); | |||||
try { | |||||
$structure = phutil_json_decode($message); | |||||
} catch (PhutilJSONParserException $ex) { | |||||
$structure = array(); | |||||
} | |||||
switch (idx($structure, 0)) { | |||||
case PhutilDaemon::MESSAGETYPE_STDOUT: | |||||
$this->logMessage('STDO', idx($structure, 1)); | |||||
break; | |||||
case PhutilDaemon::MESSAGETYPE_HEARTBEAT: | |||||
$this->deadline = time() + $this->getRequiredHeartbeatFrequency(); | |||||
break; | |||||
case PhutilDaemon::MESSAGETYPE_BUSY: | |||||
if (!$this->busyEpoch) { | |||||
$this->busyEpoch = time(); | |||||
} | |||||
break; | |||||
case PhutilDaemon::MESSAGETYPE_IDLE: | |||||
$this->busyEpoch = null; | |||||
break; | |||||
case PhutilDaemon::MESSAGETYPE_DOWN: | |||||
// The daemon is exiting because it doesn't have enough work and it | |||||
// is trying to scale the pool down. We should not restart it. | |||||
$this->shouldRestart = false; | |||||
$this->shouldShutdown = true; | |||||
break; | |||||
case PhutilDaemon::MESSAGETYPE_HIBERNATE: | |||||
$config = idx($structure, 1); | |||||
$duration = (int)idx($config, 'duration', 0); | |||||
$this->restartAt = time() + $duration; | |||||
$this->hibernating = true; | |||||
$this->busyEpoch = null; | |||||
$this->logMessage( | |||||
'ZZZZ', | |||||
pht( | |||||
'Process is preparing to hibernate for %s second(s).', | |||||
new PhutilNumber($duration))); | |||||
break; | |||||
default: | |||||
// If we can't parse this or it isn't a message we understand, just | |||||
// emit the raw message. | |||||
$this->logMessage('STDO', pht('<Malformed> %s', $message)); | |||||
break; | |||||
} | |||||
} | |||||
} | |||||
public function didReceiveNotifySignal($signo) { | |||||
$pid = $this->getPID(); | |||||
if ($pid) { | |||||
posix_kill($pid, $signo); | |||||
} | |||||
} | |||||
public function didReceiveReloadSignal($signo) { | |||||
$signame = phutil_get_signal_name($signo); | |||||
if ($signame) { | |||||
$sigmsg = pht( | |||||
'Reloading in response to signal %d (%s).', | |||||
$signo, | |||||
$signame); | |||||
} else { | |||||
$sigmsg = pht( | |||||
'Reloading in response to signal %d.', | |||||
$signo); | |||||
} | |||||
$this->logMessage('RELO', $sigmsg, $signo); | |||||
// This signal means "stop the current process gracefully, then launch | |||||
// a new identical process once it exits". This can be used to update | |||||
// daemons after code changes (the new processes will run the new code) | |||||
// without aborting any running tasks. | |||||
// We SIGINT the daemon but don't set the shutdown flag, so it will | |||||
// naturally be restarted after it exits, as though it had exited after an | |||||
// unhandled exception. | |||||
posix_kill($this->getPID(), SIGINT); | |||||
} | |||||
public function didReceiveGracefulSignal($signo) { | |||||
$this->shouldShutdown = true; | |||||
$this->shouldRestart = false; | |||||
$signame = phutil_get_signal_name($signo); | |||||
if ($signame) { | |||||
$sigmsg = pht( | |||||
'Graceful shutdown in response to signal %d (%s).', | |||||
$signo, | |||||
$signame); | |||||
} else { | |||||
$sigmsg = pht( | |||||
'Graceful shutdown in response to signal %d.', | |||||
$signo); | |||||
} | |||||
$this->logMessage('DONE', $sigmsg, $signo); | |||||
posix_kill($this->getPID(), SIGINT); | |||||
} | |||||
public function didReceiveTerminateSignal($signo) { | |||||
$this->shouldShutdown = true; | |||||
$this->shouldRestart = false; | |||||
$signame = phutil_get_signal_name($signo); | |||||
if ($signame) { | |||||
$sigmsg = pht( | |||||
'Shutting down in response to signal %s (%s).', | |||||
$signo, | |||||
$signame); | |||||
} else { | |||||
$sigmsg = pht('Shutting down in response to signal %s.', $signo); | |||||
} | |||||
$this->logMessage('EXIT', $sigmsg, $signo); | |||||
$this->annihilateProcessGroup(); | |||||
} | |||||
private function logMessage($type, $message, $context = null) { | |||||
$this->getDaemonPool()->logMessage($type, $message, $context); | |||||
$this->dispatchEvent( | |||||
self::EVENT_DID_LOG, | |||||
array( | |||||
'type' => $type, | |||||
'message' => $message, | |||||
'context' => $context, | |||||
)); | |||||
} | |||||
public function didExit() { | |||||
if ($this->shouldSendExitEvent) { | |||||
$this->dispatchEvent(self::EVENT_WILL_EXIT); | |||||
$this->shouldSendExitEvent = false; | |||||
} | |||||
return $this; | |||||
} | |||||
} |