diff --git a/resources/sql/autopatches/20140711.workerpriority.sql b/resources/sql/autopatches/20140711.workerpriority.sql new file mode 100644 --- /dev/null +++ b/resources/sql/autopatches/20140711.workerpriority.sql @@ -0,0 +1,11 @@ +ALTER TABLE {$NAMESPACE}_worker.worker_activetask + ADD COLUMN priority int unsigned NOT NULL; + +ALTER TABLE {$NAMESPACE}_worker.worker_activetask + ADD KEY (leaseOwner, priority, id); + +ALTER TABLE {$NAMESPACE}_worker.worker_archivetask + ADD COLUMN priority int unsigned NOT NULL; + +ALTER TABLE {$NAMESPACE}_worker.worker_archivetask + ADD KEY (leaseOwner, priority, id); diff --git a/scripts/repository/reparse.php b/scripts/repository/reparse.php --- a/scripts/repository/reparse.php +++ b/scripts/repository/reparse.php @@ -267,7 +267,10 @@ if ($all_from_repo && !$force_local) { foreach ($classes as $class) { - PhabricatorWorker::scheduleTask($class, $spec); + PhabricatorWorker::scheduleTask( + $class, + $spec, + PhabricatorWorker::PRIORITY_IMPORT); $commit_name = 'r'.$callsign.$commit->getCommitIdentifier(); echo " Queued '{$class}' for commit '{$commit_name}'.\n"; diff --git a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php --- a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php +++ b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php @@ -136,6 +136,7 @@ $task->getTaskClass(), $task->getLeaseOwner(), $task->getLeaseExpires() - time(), + $task->getPriority(), $task->getFailureCount(), phutil_tag( 'a', @@ -158,6 +159,7 @@ pht('Class'), pht('Owner'), pht('Expires'), + pht('Priority'), pht('Failures'), '', )); @@ -168,6 +170,7 @@ '', '', 'n', + 'n', 'action', )); $leased_table->setNoDataString(pht('No tasks are leased by workers.')); diff --git a/src/applications/diffusion/engine/DiffusionCommitHookEngine.php b/src/applications/diffusion/engine/DiffusionCommitHookEngine.php --- a/src/applications/diffusion/engine/DiffusionCommitHookEngine.php +++ b/src/applications/diffusion/engine/DiffusionCommitHookEngine.php @@ -187,7 +187,8 @@ 'eventPHID' => $event->getPHID(), 'emailPHIDs' => array_values($this->emailPHIDs), 'info' => $this->loadCommitInfoForWorker($all_updates), - )); + ), + PhabricatorWorker::PRIORITY_ALERTS); } return 0; diff --git a/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php b/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php --- a/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php +++ b/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php @@ -49,7 +49,8 @@ $mailer_task = PhabricatorWorker::scheduleTask( 'PhabricatorMetaMTAWorker', - $message->getID()); + $message->getID(), + PhabricatorWorker::PRIORITY_ALERTS); $console->writeOut( "Queued message #%d for resend.\n", diff --git a/src/applications/metamta/storage/PhabricatorMetaMTAMail.php b/src/applications/metamta/storage/PhabricatorMetaMTAMail.php --- a/src/applications/metamta/storage/PhabricatorMetaMTAMail.php +++ b/src/applications/metamta/storage/PhabricatorMetaMTAMail.php @@ -300,7 +300,8 @@ // Queue a task to send this mail. $mailer_task = PhabricatorWorker::scheduleTask( 'PhabricatorMetaMTAWorker', - $this->getID()); + $this->getID(), + PhabricatorWorker::PRIORITY_ALERTS); $this->saveTransaction(); diff --git a/src/applications/search/index/PhabricatorSearchIndexer.php b/src/applications/search/index/PhabricatorSearchIndexer.php --- a/src/applications/search/index/PhabricatorSearchIndexer.php +++ b/src/applications/search/index/PhabricatorSearchIndexer.php @@ -7,7 +7,8 @@ 'PhabricatorSearchWorker', array( 'documentPHID' => $phid, - )); + ), + PhabricatorWorker::PRIORITY_IMPORT); } public function indexDocumentByPHID($phid) { diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php --- a/src/infrastructure/daemon/workers/PhabricatorWorker.php +++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php @@ -9,6 +9,11 @@ private static $runAllTasksInProcess = false; private $queuedTasks = array(); + const PRIORITY_ALERTS = 4000; + const PRIORITY_DEFAULT = 3000; + const PRIORITY_BULK = 2000; + const PRIORITY_IMPORT = 1000; + /* -( Configuring Retries and Failures )----------------------------------- */ @@ -32,13 +37,13 @@ /** - * Return the maximum number of times this task may be retried before it - * is considered permanently failed. By default, tasks retry indefinitely. You + * Return the maximum number of times this task may be retried before it is + * considered permanently failed. By default, tasks retry indefinitely. You * can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an * immediate permanent failure. * - * @return int|null Number of times the task will retry before permanent - * failure. Return `null` to retry indefinitely. + * @return int|null Number of times the task will retry before permanent + * failure. Return `null` to retry indefinitely. * * @task config */ @@ -52,15 +57,15 @@ * retrying. For most tasks you can leave this at `null`, which will give you * a short default retry period (currently 60 seconds). * - * @param PhabricatorWorkerTask The task itself. This object is probably - * useful mostly to examine the failure - * count if you want to implement staggered - * retries, or to examine the execution - * exception if you want to react to - * different failures in different ways. - * @return int|null Number of seconds to wait between retries, - * or null for a default retry period - * (currently 60 seconds). + * @param PhabricatorWorkerTask The task itself. This object is probably + * useful mostly to examine the failure count + * if you want to implement staggered retries, + * or to examine the execution exception if + * you want to react to different failures in + * different ways. + * @return int|null Number of seconds to wait between retries, + * or null for a default retry period + * (currently 60 seconds). * * @task config */ @@ -70,7 +75,6 @@ abstract protected function doWork(); - final public function __construct($data) { $this->data = $data; } @@ -83,10 +87,19 @@ $this->doWork(); } - final public static function scheduleTask($task_class, $data) { + final public static function scheduleTask( + $task_class, + $data, + $priority = null) { + + if ($priority === null) { + $priority = self::PRIORITY_DEFAULT; + } + $task = id(new PhabricatorWorkerActiveTask()) ->setTaskClass($task_class) - ->setData($data); + ->setData($data) + ->setPriority($priority); if (self::$runAllTasksInProcess) { // Do the work in-process. @@ -96,8 +109,8 @@ try { $worker->doWork(); foreach ($worker->getQueuedTasks() as $queued_task) { - list($queued_class, $queued_data) = $queued_task; - self::scheduleTask($queued_class, $queued_data); + list($queued_class, $queued_data, $queued_priority) = $queued_task; + self::scheduleTask($queued_class, $queued_data, $queued_priority); } break; } catch (PhabricatorWorkerYieldException $ex) { @@ -106,7 +119,6 @@ 'In-process task "%s" yielded for %s seconds, sleeping...', $task_class, $ex->getDuration())); - sleep($ex->getDuration()); } } @@ -184,8 +196,7 @@ foreach ($tasks as $task) { if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) { - throw new Exception( - pht('Task %d failed!', $task->getID())); + throw new Exception(pht('Task %d failed!', $task->getID())); } } } @@ -204,7 +215,7 @@ self::$runAllTasksInProcess = $all; } - protected function log($pattern /* $args */) { + final protected function log($pattern /* , ... */) { $console = PhutilConsole::getConsole(); $argv = func_get_args(); call_user_func_array(array($console, 'writeLog'), $argv); @@ -217,12 +228,13 @@ * * The followup task will be queued only if this task completes cleanly. * - * @param string Task class to queue. - * @param array Data for the followup task. + * @param string Task class to queue. + * @param array Data for the followup task. + * @param int|null Priority for the followup task. * @return this */ - protected function queueTask($class, array $data) { - $this->queuedTasks[] = array($class, $data); + final protected function queueTask($class, array $data, $priority = null) { + $this->queuedTasks[] = array($class, $data, $priority); return $this; } @@ -230,9 +242,9 @@ /** * Get tasks queued as followups by @{method:queueTask}. * - * @return list> Queued task specifications. + * @return list> Queued task specifications. */ - public function getQueuedTasks() { + final public function getQueuedTasks() { return $this->queuedTasks; } diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php --- a/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php +++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php @@ -26,8 +26,7 @@ protected function doWork() { switch (idx($this->getTaskData(), 'doWork')) { case 'fail-temporary': - throw new Exception( - 'Temporary failure!'); + throw new Exception('Temporary failure!'); case 'fail-permanent': throw new PhabricatorWorkerPermanentFailureException( 'Permanent failure!'); diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php --- a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php +++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php @@ -9,35 +9,30 @@ } public function testLeaseTask() { - // Leasing should work. - $task = $this->scheduleTask(); - - $this->expectNextLease($task); + $this->expectNextLease($task, 'Leasing should work.'); } public function testMultipleLease() { - // We should not be able to lease a task multiple times. - $task = $this->scheduleTask(); $this->expectNextLease($task); - $this->expectNextLease(null); + $this->expectNextLease( + null, + 'We should not be able to lease a task multiple times.'); } public function testOldestFirst() { - // Older tasks should lease first, all else being equal. - $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); - $this->expectNextLease($task1); + $this->expectNextLease( + $task1, + 'Older tasks should lease first, all else being equal.'); $this->expectNextLease($task2); } public function testNewBeforeLeased() { - // Tasks not previously leased should lease before previously leased tasks. - $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); @@ -45,7 +40,10 @@ $task1->setLeaseExpires(time() - 100000); $task1->forceSaveWithoutLease(); - $this->expectNextLease($task2); + $this->expectNextLease( + $task2, + 'Tasks not previously leased should lease before previously '. + 'leased tasks.'); $this->expectNextLease($task1); } @@ -138,15 +136,13 @@ public function testRequiredLeaseTime() { $task = $this->scheduleAndExecuteTask( array( - 'getRequiredLeaseTime' => 1000000, + 'getRequiredLeaseTime' => 1000000, )); $this->assertTrue(($task->getLeaseExpires() - time()) > 1000); } public function testLeasedIsOldestFirst() { - // Tasks which expired earlier should lease first, all else being equal. - $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); @@ -158,36 +154,59 @@ $task2->setLeaseExpires(time() - 200000); $task2->forceSaveWithoutLease(); - $this->expectNextLease($task2); + $this->expectNextLease( + $task2, + 'Tasks which expired earlier should lease first, all else being equal.'); $this->expectNextLease($task1); } - private function expectNextLease($task) { + public function testLeasedIsHighestPriority() { + $task1 = $this->scheduleTask(array(), 2); + $task2 = $this->scheduleTask(array(), 1); + $task3 = $this->scheduleTask(array(), 1); + + $this->expectNextLease( + $task1, + 'Tasks with a higher priority should be scheduled first.'); + $this->expectNextLease( + $task2, + 'Tasks with the same priority should be FIFO.'); + $this->expectNextLease($task3); + } + + private function expectNextLease($task, $message = null) { $leased = id(new PhabricatorWorkerLeaseQuery()) ->setLimit(1) ->execute(); if ($task === null) { - $this->assertEqual(0, count($leased)); + $this->assertEqual(0, count($leased), $message); return null; } else { - $this->assertEqual(1, count($leased)); + $this->assertEqual(1, count($leased), $message); $this->assertEqual( (int)head($leased)->getID(), - (int)$task->getID()); + (int)$task->getID(), + $message); return head($leased); } } - private function scheduleAndExecuteTask(array $data = array()) { - $task = $this->scheduleTask($data); + private function scheduleAndExecuteTask( + array $data = array(), + $priority = null) { + + $task = $this->scheduleTask($data, $priority); $task = $this->expectNextLease($task); $task = $task->executeTask(); return $task; } - private function scheduleTask(array $data = array()) { - return PhabricatorWorker::scheduleTask('PhabricatorTestWorker', $data); + private function scheduleTask(array $data = array(), $priority = null) { + return PhabricatorWorker::scheduleTask( + 'PhabricatorTestWorker', + $data, + $priority); } } diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php --- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php +++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php @@ -52,7 +52,6 @@ $leased = 0; foreach ($phases as $phase) { - // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query // goes very, very slowly. The `ORDER BY` triggers this, although we get // the same apparent results without it. Without the ORDER BY, binary @@ -126,7 +125,6 @@ } private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) { - $where = array(); switch ($phase) { @@ -141,10 +139,7 @@ } if ($this->ids) { - $where[] = qsprintf( - $conn_w, - 'id IN (%Ld)', - $this->ids); + $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids); } return $this->formatWhereClause($where); @@ -162,13 +157,8 @@ switch ($phase) { case self::PHASE_UNLEASED: - $where[] = qsprintf( - $conn_w, - 'leaseOwner IS NULL'); - $where[] = qsprintf( - $conn_w, - 'id IN (%Ld)', - ipull($rows, 'id')); + $where[] = qsprintf($conn_w, 'leaseOwner IS NULL'); + $where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id')); break; case self::PHASE_EXPIRED: $in = array(); @@ -179,24 +169,20 @@ $row['id'], $row['leaseOwner']); } - $where[] = qsprintf( - $conn_w, - '(%Q)', - implode(' OR ', $in)); + $where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in)); break; default: throw new Exception("Unknown phase '{$phase}'!"); } return $this->formatWhereClause($where); - } private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) { switch ($phase) { case self::PHASE_UNLEASED: - // When selecting new tasks, we want to consume them in roughly - // FIFO order, so we order by the task ID. + // When selecting new tasks, we want to consume them in order of + // decreasing priority (and then FIFO). return qsprintf($conn_w, 'ORDER BY id ASC'); case self::PHASE_EXPIRED: // When selecting failed tasks, we want to consume them in roughly diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php @@ -86,6 +86,7 @@ ->setLeaseExpires($this->getLeaseExpires()) ->setFailureCount($this->getFailureCount()) ->setDataID($this->getDataID()) + ->setPriority($this->getPriority()) ->setResult($result) ->setDuration($duration); @@ -164,12 +165,11 @@ if ($did_succeed) { foreach ($worker->getQueuedTasks() as $task) { list($class, $data) = $task; - PhabricatorWorker::scheduleTask($class, $data); + PhabricatorWorker::scheduleTask($class, $data, $this->getPriority()); } } return $result; } - } diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php @@ -11,8 +11,7 @@ public function save() { if ($this->getID() === null) { - throw new Exception( - 'Trying to archive a task with no ID.'); + throw new Exception('Trying to archive a task with no ID.'); } $other = new PhabricatorWorkerActiveTask(); @@ -57,6 +56,7 @@ ->setLeaseExpires(0) ->setFailureCount(0) ->setDataID($this->getDataID()) + ->setPriority($this->getPriority()) ->insert(); $this->setDataID(null); diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php @@ -9,33 +9,34 @@ protected $leaseExpires; protected $failureCount; protected $dataID; + protected $priority; private $data; private $executionException; - public function setExecutionException(Exception $execution_exception) { + final public function setExecutionException(Exception $execution_exception) { $this->executionException = $execution_exception; return $this; } - public function getExecutionException() { + final public function getExecutionException() { return $this->executionException; } - public function setData($data) { + final public function setData($data) { $this->data = $data; return $this; } - public function getData() { + final public function getData() { return $this->data; } - public function isArchived() { + final public function isArchived() { return ($this instanceof PhabricatorWorkerArchiveTask); } - public function getWorkerInstance() { + final public function getWorkerInstance() { $id = $this->getID(); $class = $this->getTaskClass(); diff --git a/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php b/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php --- a/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php +++ b/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php @@ -79,6 +79,8 @@ 'PhabricatorSMSDemultiplexWorker', array( 'toNumbers' => $to_numbers, - 'body' => $body)); + 'body' => $body, + ), + PhabricatorWorker::PRIORITY_ALERTS); } }