Changeset View
Changeset View
Standalone View
Standalone View
src/infrastructure/daemon/workers/PhabricatorWorker.php
<?php | <?php | ||||
/** | /** | ||||
* @task config Configuring Retries and Failures | * @task config Configuring Retries and Failures | ||||
*/ | */ | ||||
abstract class PhabricatorWorker { | abstract class PhabricatorWorker { | ||||
private $data; | private $data; | ||||
private static $runAllTasksInProcess = false; | private static $runAllTasksInProcess = false; | ||||
private $queuedTasks = array(); | private $queuedTasks = array(); | ||||
const PRIORITY_ALERTS = 4000; | |||||
const PRIORITY_DEFAULT = 3000; | |||||
const PRIORITY_BULK = 2000; | |||||
const PRIORITY_IMPORT = 1000; | |||||
/* -( Configuring Retries and Failures )----------------------------------- */ | /* -( Configuring Retries and Failures )----------------------------------- */ | ||||
/** | /** | ||||
* Return the number of seconds this worker needs hold a lease on the task for | * Return the number of seconds this worker needs hold a lease on the task for | ||||
* while it performs work. For most tasks you can leave this at `null`, which | * while it performs work. For most tasks you can leave this at `null`, which | ||||
* will give you a default lease (currently 2 hours). | * will give you a default lease (currently 2 hours). | ||||
* | * | ||||
* For tasks which may take a very long time to complete, you should return | * For tasks which may take a very long time to complete, you should return | ||||
* an upper bound on the amount of time the task may require. | * an upper bound on the amount of time the task may require. | ||||
* | * | ||||
* @return int|null Number of seconds this task needs to remain leased for, | * @return int|null Number of seconds this task needs to remain leased for, | ||||
* or null for a default lease. | * or null for a default lease. | ||||
* | * | ||||
* @task config | * @task config | ||||
*/ | */ | ||||
public function getRequiredLeaseTime() { | public function getRequiredLeaseTime() { | ||||
return null; | return null; | ||||
} | } | ||||
/** | /** | ||||
* Return the maximum number of times this task may be retried before it | * Return the maximum number of times this task may be retried before it is | ||||
* is considered permanently failed. By default, tasks retry indefinitely. You | * considered permanently failed. By default, tasks retry indefinitely. You | ||||
* can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an | * can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an | ||||
* immediate permanent failure. | * immediate permanent failure. | ||||
* | * | ||||
* @return int|null Number of times the task will retry before permanent | * @return int|null Number of times the task will retry before permanent | ||||
* failure. Return `null` to retry indefinitely. | * failure. Return `null` to retry indefinitely. | ||||
* | * | ||||
* @task config | * @task config | ||||
*/ | */ | ||||
public function getMaximumRetryCount() { | public function getMaximumRetryCount() { | ||||
return null; | return null; | ||||
} | } | ||||
/** | /** | ||||
* Return the number of seconds a task should wait after a failure before | * Return the number of seconds a task should wait after a failure before | ||||
* retrying. For most tasks you can leave this at `null`, which will give you | * retrying. For most tasks you can leave this at `null`, which will give you | ||||
* a short default retry period (currently 60 seconds). | * a short default retry period (currently 60 seconds). | ||||
* | * | ||||
* @param PhabricatorWorkerTask The task itself. This object is probably | * @param PhabricatorWorkerTask The task itself. This object is probably | ||||
* useful mostly to examine the failure | * useful mostly to examine the failure count | ||||
* count if you want to implement staggered | * if you want to implement staggered retries, | ||||
* retries, or to examine the execution | * or to examine the execution exception if | ||||
* exception if you want to react to | * you want to react to different failures in | ||||
* different failures in different ways. | * different ways. | ||||
* @return int|null Number of seconds to wait between retries, | * @return int|null Number of seconds to wait between retries, | ||||
* or null for a default retry period | * or null for a default retry period | ||||
* (currently 60 seconds). | * (currently 60 seconds). | ||||
* | * | ||||
* @task config | * @task config | ||||
*/ | */ | ||||
public function getWaitBeforeRetry(PhabricatorWorkerTask $task) { | public function getWaitBeforeRetry(PhabricatorWorkerTask $task) { | ||||
return null; | return null; | ||||
} | } | ||||
abstract protected function doWork(); | abstract protected function doWork(); | ||||
final public function __construct($data) { | final public function __construct($data) { | ||||
$this->data = $data; | $this->data = $data; | ||||
} | } | ||||
final protected function getTaskData() { | final protected function getTaskData() { | ||||
return $this->data; | return $this->data; | ||||
} | } | ||||
final public function executeTask() { | final public function executeTask() { | ||||
$this->doWork(); | $this->doWork(); | ||||
} | } | ||||
final public static function scheduleTask($task_class, $data) { | final public static function scheduleTask( | ||||
$task_class, | |||||
$data, | |||||
$priority = null) { | |||||
if ($priority === null) { | |||||
$priority = self::PRIORITY_DEFAULT; | |||||
} | |||||
$task = id(new PhabricatorWorkerActiveTask()) | $task = id(new PhabricatorWorkerActiveTask()) | ||||
->setTaskClass($task_class) | ->setTaskClass($task_class) | ||||
->setData($data); | ->setData($data) | ||||
->setPriority($priority); | |||||
if (self::$runAllTasksInProcess) { | if (self::$runAllTasksInProcess) { | ||||
// Do the work in-process. | // Do the work in-process. | ||||
$worker = newv($task_class, array($data)); | $worker = newv($task_class, array($data)); | ||||
while (true) { | while (true) { | ||||
try { | try { | ||||
$worker->doWork(); | $worker->doWork(); | ||||
foreach ($worker->getQueuedTasks() as $queued_task) { | foreach ($worker->getQueuedTasks() as $queued_task) { | ||||
list($queued_class, $queued_data) = $queued_task; | list($queued_class, $queued_data, $queued_priority) = $queued_task; | ||||
self::scheduleTask($queued_class, $queued_data); | self::scheduleTask($queued_class, $queued_data, $queued_priority); | ||||
} | } | ||||
break; | break; | ||||
} catch (PhabricatorWorkerYieldException $ex) { | } catch (PhabricatorWorkerYieldException $ex) { | ||||
phlog( | phlog( | ||||
pht( | pht( | ||||
'In-process task "%s" yielded for %s seconds, sleeping...', | 'In-process task "%s" yielded for %s seconds, sleeping...', | ||||
$task_class, | $task_class, | ||||
$ex->getDuration())); | $ex->getDuration())); | ||||
sleep($ex->getDuration()); | sleep($ex->getDuration()); | ||||
} | } | ||||
} | } | ||||
// Now, save a task row and immediately archive it so we can return an | // Now, save a task row and immediately archive it so we can return an | ||||
// object with a valid ID. | // object with a valid ID. | ||||
$task->openTransaction(); | $task->openTransaction(); | ||||
$task->save(); | $task->save(); | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | final public static function waitForTasks(array $task_ids) { | ||||
} | } | ||||
$tasks = id(new PhabricatorWorkerArchiveTask())->loadAllWhere( | $tasks = id(new PhabricatorWorkerArchiveTask())->loadAllWhere( | ||||
'id IN (%Ld)', | 'id IN (%Ld)', | ||||
$task_ids); | $task_ids); | ||||
foreach ($tasks as $task) { | foreach ($tasks as $task) { | ||||
if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) { | if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) { | ||||
throw new Exception( | throw new Exception(pht('Task %d failed!', $task->getID())); | ||||
pht('Task %d failed!', $task->getID())); | |||||
} | } | ||||
} | } | ||||
} | } | ||||
public function renderForDisplay(PhabricatorUser $viewer) { | public function renderForDisplay(PhabricatorUser $viewer) { | ||||
$data = PhutilReadableSerializer::printableValue($this->data); | $data = PhutilReadableSerializer::printableValue($this->data); | ||||
return phutil_tag('pre', array(), $data); | return phutil_tag('pre', array(), $data); | ||||
} | } | ||||
/** | /** | ||||
* Set this flag to execute scheduled tasks synchronously, in the same | * Set this flag to execute scheduled tasks synchronously, in the same | ||||
* process. This is useful for debugging, and otherwise dramatically worse | * process. This is useful for debugging, and otherwise dramatically worse | ||||
* in every way imaginable. | * in every way imaginable. | ||||
*/ | */ | ||||
public static function setRunAllTasksInProcess($all) { | public static function setRunAllTasksInProcess($all) { | ||||
self::$runAllTasksInProcess = $all; | self::$runAllTasksInProcess = $all; | ||||
} | } | ||||
protected function log($pattern /* $args */) { | final protected function log($pattern /* , ... */) { | ||||
$console = PhutilConsole::getConsole(); | $console = PhutilConsole::getConsole(); | ||||
$argv = func_get_args(); | $argv = func_get_args(); | ||||
call_user_func_array(array($console, 'writeLog'), $argv); | call_user_func_array(array($console, 'writeLog'), $argv); | ||||
return $this; | return $this; | ||||
} | } | ||||
/** | /** | ||||
* Queue a task to be executed after this one succeeds. | * Queue a task to be executed after this one succeeds. | ||||
* | * | ||||
* The followup task will be queued only if this task completes cleanly. | * The followup task will be queued only if this task completes cleanly. | ||||
* | * | ||||
* @param string Task class to queue. | * @param string Task class to queue. | ||||
* @param array Data for the followup task. | * @param array Data for the followup task. | ||||
* @param int|null Priority for the followup task. | |||||
* @return this | * @return this | ||||
*/ | */ | ||||
protected function queueTask($class, array $data) { | final protected function queueTask($class, array $data, $priority = null) { | ||||
$this->queuedTasks[] = array($class, $data); | $this->queuedTasks[] = array($class, $data, $priority); | ||||
return $this; | return $this; | ||||
} | } | ||||
/** | /** | ||||
* Get tasks queued as followups by @{method:queueTask}. | * Get tasks queued as followups by @{method:queueTask}. | ||||
* | * | ||||
* @return list<pair<string, wild>> Queued task specifications. | * @return list<tuple<string, wild, int|null>> Queued task specifications. | ||||
*/ | */ | ||||
public function getQueuedTasks() { | final public function getQueuedTasks() { | ||||
return $this->queuedTasks; | return $this->queuedTasks; | ||||
} | } | ||||
} | } |