Changeset View
Changeset View
Standalone View
Standalone View
src/infrastructure/daemon/workers/PhabricatorTriggerDaemon.php
<?php | <?php | ||||
/** | /** | ||||
* Schedule and execute event triggers, which run code at specific times. | * Schedule and execute event triggers, which run code at specific times. | ||||
* | |||||
* Also performs garbage collection of old logs, caches, etc. | |||||
* | |||||
* @task garbage Garbage Collection | |||||
*/ | */ | ||||
final class PhabricatorTriggerDaemon | final class PhabricatorTriggerDaemon | ||||
extends PhabricatorDaemon { | extends PhabricatorDaemon { | ||||
const COUNTER_VERSION = 'trigger.version'; | const COUNTER_VERSION = 'trigger.version'; | ||||
const COUNTER_CURSOR = 'trigger.cursor'; | const COUNTER_CURSOR = 'trigger.cursor'; | ||||
private $garbageCollectors; | |||||
private $nextCollection; | |||||
protected function run() { | protected function run() { | ||||
// The trigger daemon is a low-level infrastructure daemon which schedules | // The trigger daemon is a low-level infrastructure daemon which schedules | ||||
// and executes chronological events. Examples include a subscription which | // and executes chronological events. Examples include a subscription which | ||||
// generates a bill on the 12th of every month, or a reminder email 15 | // generates a bill on the 12th of every month, or a reminder email 15 | ||||
// minutes before a meeting. | // minutes before a meeting. | ||||
// Only one trigger daemon can run at a time, and very little work should | // Only one trigger daemon can run at a time, and very little work should | ||||
Show All 29 Lines | protected function run() { | ||||
// | // | ||||
// The major goal of this design is to deliver on the guarantee that events | // The major goal of this design is to deliver on the guarantee that events | ||||
// will execute exactly once. It prevents race conditions in scheduling | // will execute exactly once. It prevents race conditions in scheduling | ||||
// and execution by ensuring there is only one writer for either of these | // and execution by ensuring there is only one writer for either of these | ||||
// phases. Without this separation of responsibilities, web processes | // phases. Without this separation of responsibilities, web processes | ||||
// trying to reschedule events after an update could race with other web | // trying to reschedule events after an update could race with other web | ||||
// processes or the daemon. | // processes or the daemon. | ||||
// We want to start the first GC cycle right away, not wait 4 hours. | |||||
$this->nextCollection = PhabricatorTime::getNow(); | |||||
do { | do { | ||||
$lock = PhabricatorGlobalLock::newLock('trigger'); | $lock = PhabricatorGlobalLock::newLock('trigger'); | ||||
try { | try { | ||||
$lock->lock(5); | $lock->lock(5); | ||||
} catch (PhutilLockException $ex) { | } catch (PhutilLockException $ex) { | ||||
throw new PhutilProxyException( | throw new PhutilProxyException( | ||||
pht( | pht( | ||||
Show All 16 Lines | do { | ||||
} | } | ||||
// Run the execution phase. This finds events which are due to execute | // Run the execution phase. This finds events which are due to execute | ||||
// and runs them. | // and runs them. | ||||
$this->executeTriggers(); | $this->executeTriggers(); | ||||
$lock->unlock(); | $lock->unlock(); | ||||
$this->sleep($this->getSleepDuration()); | $sleep_duration = $this->getSleepDuration(); | ||||
$sleep_duration = $this->runGarbageCollection($sleep_duration); | |||||
$this->sleep($sleep_duration); | |||||
} while (!$this->shouldExit()); | } while (!$this->shouldExit()); | ||||
} | } | ||||
/** | /** | ||||
* Process all of the triggers which have been updated since the last time | * Process all of the triggers which have been updated since the last time | ||||
* the daemon ran, scheduling them into the event table. | * the daemon ran, scheduling them into the event table. | ||||
* | * | ||||
▲ Show 20 Lines • Show All 145 Lines • ▼ Show 20 Lines | final class PhabricatorTriggerDaemon | ||||
* phase. | * phase. | ||||
* | * | ||||
* If no events are scheduled soon, we'll sleep for 60 seconds. Otherwise, | * If no events are scheduled soon, we'll sleep for 60 seconds. Otherwise, | ||||
* we'll sleep until the next scheduled event. | * we'll sleep until the next scheduled event. | ||||
* | * | ||||
* @return int Number of seconds to sleep for. | * @return int Number of seconds to sleep for. | ||||
*/ | */ | ||||
private function getSleepDuration() { | private function getSleepDuration() { | ||||
$sleep = 60; | $sleep = 5; | ||||
$next_triggers = id(new PhabricatorWorkerTriggerQuery()) | $next_triggers = id(new PhabricatorWorkerTriggerQuery()) | ||||
->setViewer($this->getViewer()) | ->setViewer($this->getViewer()) | ||||
->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION) | ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION) | ||||
->setLimit(1) | ->setLimit(1) | ||||
->needEvents(true) | ->needEvents(true) | ||||
->execute(); | ->execute(); | ||||
if ($next_triggers) { | if ($next_triggers) { | ||||
Show All 25 Lines | LiskDAO::overwriteCounterValue( | ||||
$value); | $value); | ||||
} | } | ||||
private function loadCurrentCounter($counter_name) { | private function loadCurrentCounter($counter_name) { | ||||
return (int)LiskDAO::loadCurrentCounterValue( | return (int)LiskDAO::loadCurrentCounterValue( | ||||
id(new PhabricatorWorkerTrigger())->establishConnection('w'), | id(new PhabricatorWorkerTrigger())->establishConnection('w'), | ||||
$counter_name); | $counter_name); | ||||
} | } | ||||
/* -( Garbage Collection )------------------------------------------------- */ | |||||
/** | |||||
* Run the garbage collector for up to a specified number of seconds. | |||||
* | |||||
* @param int Number of seconds the GC may run for. | |||||
* @return int Number of seconds remaining in the time budget. | |||||
* @task garbage | |||||
*/ | |||||
private function runGarbageCollection($duration) { | |||||
$run_until = (PhabricatorTime::getNow() + $duration); | |||||
// NOTE: We always run at least one GC cycle to make sure the GC can make | |||||
// progress even if the trigger queue is busy. | |||||
do { | |||||
$more_garbage = $this->updateGarbageCollection(); | |||||
if (!$more_garbage) { | |||||
// If we don't have any more collection work to perform, we're all | |||||
// done. | |||||
break; | |||||
} | |||||
} while (PhabricatorTime::getNow() <= $run_until); | |||||
$remaining = max(0, $run_until - PhabricatorTime::getNow()); | |||||
return $remaining; | |||||
} | |||||
/** | |||||
* Update garbage collection, possibly collecting a small amount of garbage. | |||||
* | |||||
* @return bool True if there is more garbage to collect. | |||||
* @task garbage | |||||
*/ | |||||
private function updateGarbageCollection() { | |||||
// If we're ready to start the next collection cycle, load all the | |||||
// collectors. | |||||
$next = $this->nextCollection; | |||||
if ($next && (PhabricatorTime::getNow() >= $next)) { | |||||
$this->nextCollection = null; | |||||
$this->garbageCollectors = $this->loadGarbageCollectors(); | |||||
} | |||||
// If we're in a collection cycle, continue collection. | |||||
if ($this->garbageCollectors) { | |||||
foreach ($this->garbageCollectors as $key => $collector) { | |||||
$more_garbage = $collector->collectGarbage(); | |||||
if (!$more_garbage) { | |||||
unset($this->garbageCollectors[$key]); | |||||
} | |||||
// We only run one collection per call, to prevent triggers from being | |||||
// thrown too far off schedule if there's a lot of garbage to collect. | |||||
break; | |||||
} | |||||
if ($this->garbageCollectors) { | |||||
// If we have more work to do, return true. | |||||
return true; | |||||
} | |||||
// Otherwise, reschedule another cycle in 4 hours. | |||||
$now = PhabricatorTime::getNow(); | |||||
$wait = phutil_units('4 hours in seconds'); | |||||
$this->nextCollection = $now + $wait; | |||||
} | |||||
return false; | |||||
} | |||||
/** | |||||
* Load all of the available garbage collectors. | |||||
* | |||||
* @return list<PhabricatorGarbageCollector> Garbage collectors. | |||||
* @task garbage | |||||
*/ | |||||
private function loadGarbageCollectors() { | |||||
return id(new PhutilSymbolLoader()) | |||||
->setAncestorClass('PhabricatorGarbageCollector') | |||||
->loadObjects(); | |||||
} | |||||
} | } |