Page MenuHomePhabricator

D9871.id23765.diff
No OneTemporary

D9871.id23765.diff

diff --git a/resources/sql/autopatches/20140711.workerpriority.sql b/resources/sql/autopatches/20140711.workerpriority.sql
new file mode 100644
--- /dev/null
+++ b/resources/sql/autopatches/20140711.workerpriority.sql
@@ -0,0 +1,11 @@
+ALTER TABLE {$NAMESPACE}_worker.worker_activetask
+ ADD COLUMN priority int unsigned NOT NULL;
+
+ALTER TABLE {$NAMESPACE}_worker.worker_activetask
+ ADD KEY (leaseOwner, priority, id);
+
+ALTER TABLE {$NAMESPACE}_worker.worker_archivetask
+ ADD COLUMN priority int unsigned NOT NULL;
+
+ALTER TABLE {$NAMESPACE}_worker.worker_archivetask
+ ADD KEY (leaseOwner, priority, id);
diff --git a/scripts/repository/reparse.php b/scripts/repository/reparse.php
--- a/scripts/repository/reparse.php
+++ b/scripts/repository/reparse.php
@@ -267,7 +267,10 @@
if ($all_from_repo && !$force_local) {
foreach ($classes as $class) {
- PhabricatorWorker::scheduleTask($class, $spec);
+ PhabricatorWorker::scheduleTask(
+ $class,
+ $spec,
+ PhabricatorWorker::PRIORITY_IMPORT);
$commit_name = 'r'.$callsign.$commit->getCommitIdentifier();
echo " Queued '{$class}' for commit '{$commit_name}'.\n";
diff --git a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
--- a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
+++ b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
@@ -136,6 +136,7 @@
$task->getTaskClass(),
$task->getLeaseOwner(),
$task->getLeaseExpires() - time(),
+ $task->getPriority(),
$task->getFailureCount(),
phutil_tag(
'a',
@@ -158,6 +159,7 @@
pht('Class'),
pht('Owner'),
pht('Expires'),
+ pht('Priority'),
pht('Failures'),
'',
));
@@ -168,6 +170,7 @@
'',
'',
'n',
+ 'n',
'action',
));
$leased_table->setNoDataString(pht('No tasks are leased by workers.'));
diff --git a/src/applications/diffusion/engine/DiffusionCommitHookEngine.php b/src/applications/diffusion/engine/DiffusionCommitHookEngine.php
--- a/src/applications/diffusion/engine/DiffusionCommitHookEngine.php
+++ b/src/applications/diffusion/engine/DiffusionCommitHookEngine.php
@@ -187,7 +187,8 @@
'eventPHID' => $event->getPHID(),
'emailPHIDs' => array_values($this->emailPHIDs),
'info' => $this->loadCommitInfoForWorker($all_updates),
- ));
+ ),
+ PhabricatorWorker::PRIORITY_ALERTS);
}
return 0;
diff --git a/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php b/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php
--- a/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php
+++ b/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php
@@ -49,7 +49,8 @@
$mailer_task = PhabricatorWorker::scheduleTask(
'PhabricatorMetaMTAWorker',
- $message->getID());
+ $message->getID(),
+ PhabricatorWorker::PRIORITY_ALERTS);
$console->writeOut(
"Queued message #%d for resend.\n",
diff --git a/src/applications/metamta/storage/PhabricatorMetaMTAMail.php b/src/applications/metamta/storage/PhabricatorMetaMTAMail.php
--- a/src/applications/metamta/storage/PhabricatorMetaMTAMail.php
+++ b/src/applications/metamta/storage/PhabricatorMetaMTAMail.php
@@ -300,7 +300,8 @@
// Queue a task to send this mail.
$mailer_task = PhabricatorWorker::scheduleTask(
'PhabricatorMetaMTAWorker',
- $this->getID());
+ $this->getID(),
+ PhabricatorWorker::PRIORITY_ALERTS);
$this->saveTransaction();
diff --git a/src/applications/search/index/PhabricatorSearchIndexer.php b/src/applications/search/index/PhabricatorSearchIndexer.php
--- a/src/applications/search/index/PhabricatorSearchIndexer.php
+++ b/src/applications/search/index/PhabricatorSearchIndexer.php
@@ -7,7 +7,8 @@
'PhabricatorSearchWorker',
array(
'documentPHID' => $phid,
- ));
+ ),
+ PhabricatorWorker::PRIORITY_IMPORT);
}
public function indexDocumentByPHID($phid) {
diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php
--- a/src/infrastructure/daemon/workers/PhabricatorWorker.php
+++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php
@@ -9,6 +9,11 @@
private static $runAllTasksInProcess = false;
private $queuedTasks = array();
+ const PRIORITY_ALERTS = 4000;
+ const PRIORITY_DEFAULT = 3000;
+ const PRIORITY_BULK = 2000;
+ const PRIORITY_IMPORT = 1000;
+
/* -( Configuring Retries and Failures )----------------------------------- */
@@ -32,13 +37,13 @@
/**
- * Return the maximum number of times this task may be retried before it
- * is considered permanently failed. By default, tasks retry indefinitely. You
+ * Return the maximum number of times this task may be retried before it is
+ * considered permanently failed. By default, tasks retry indefinitely. You
* can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an
* immediate permanent failure.
*
- * @return int|null Number of times the task will retry before permanent
- * failure. Return `null` to retry indefinitely.
+ * @return int|null Number of times the task will retry before permanent
+ * failure. Return `null` to retry indefinitely.
*
* @task config
*/
@@ -52,15 +57,15 @@
* retrying. For most tasks you can leave this at `null`, which will give you
* a short default retry period (currently 60 seconds).
*
- * @param PhabricatorWorkerTask The task itself. This object is probably
- * useful mostly to examine the failure
- * count if you want to implement staggered
- * retries, or to examine the execution
- * exception if you want to react to
- * different failures in different ways.
- * @return int|null Number of seconds to wait between retries,
- * or null for a default retry period
- * (currently 60 seconds).
+ * @param PhabricatorWorkerTask The task itself. This object is probably
+ * useful mostly to examine the failure count
+ * if you want to implement staggered retries,
+ * or to examine the execution exception if
+ * you want to react to different failures in
+ * different ways.
+ * @return int|null Number of seconds to wait between retries,
+ * or null for a default retry period
+ * (currently 60 seconds).
*
* @task config
*/
@@ -70,7 +75,6 @@
abstract protected function doWork();
-
final public function __construct($data) {
$this->data = $data;
}
@@ -83,10 +87,19 @@
$this->doWork();
}
- final public static function scheduleTask($task_class, $data) {
+ final public static function scheduleTask(
+ $task_class,
+ $data,
+ $priority = null) {
+
+ if ($priority === null) {
+ $priority = self::PRIORITY_DEFAULT;
+ }
+
$task = id(new PhabricatorWorkerActiveTask())
->setTaskClass($task_class)
- ->setData($data);
+ ->setData($data)
+ ->setPriority($priority);
if (self::$runAllTasksInProcess) {
// Do the work in-process.
@@ -96,8 +109,8 @@
try {
$worker->doWork();
foreach ($worker->getQueuedTasks() as $queued_task) {
- list($queued_class, $queued_data) = $queued_task;
- self::scheduleTask($queued_class, $queued_data);
+ list($queued_class, $queued_data, $queued_priority) = $queued_task;
+ self::scheduleTask($queued_class, $queued_data, $queued_priority);
}
break;
} catch (PhabricatorWorkerYieldException $ex) {
@@ -106,7 +119,6 @@
'In-process task "%s" yielded for %s seconds, sleeping...',
$task_class,
$ex->getDuration()));
-
sleep($ex->getDuration());
}
}
@@ -184,8 +196,7 @@
foreach ($tasks as $task) {
if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) {
- throw new Exception(
- pht('Task %d failed!', $task->getID()));
+ throw new Exception(pht('Task %d failed!', $task->getID()));
}
}
}
@@ -204,7 +215,7 @@
self::$runAllTasksInProcess = $all;
}
- protected function log($pattern /* $args */) {
+ final protected function log($pattern /* , ... */) {
$console = PhutilConsole::getConsole();
$argv = func_get_args();
call_user_func_array(array($console, 'writeLog'), $argv);
@@ -217,12 +228,13 @@
*
* The followup task will be queued only if this task completes cleanly.
*
- * @param string Task class to queue.
- * @param array Data for the followup task.
+ * @param string Task class to queue.
+ * @param array Data for the followup task.
+ * @param int|null Priority for the followup task.
* @return this
*/
- protected function queueTask($class, array $data) {
- $this->queuedTasks[] = array($class, $data);
+ final protected function queueTask($class, array $data, $priority = null) {
+ $this->queuedTasks[] = array($class, $data, $priority);
return $this;
}
@@ -230,9 +242,9 @@
/**
* Get tasks queued as followups by @{method:queueTask}.
*
- * @return list<pair<string, wild>> Queued task specifications.
+ * @return list<tuple<string, wild, int|null>> Queued task specifications.
*/
- public function getQueuedTasks() {
+ final public function getQueuedTasks() {
return $this->queuedTasks;
}
diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php
--- a/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php
+++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php
@@ -26,8 +26,7 @@
protected function doWork() {
switch (idx($this->getTaskData(), 'doWork')) {
case 'fail-temporary':
- throw new Exception(
- 'Temporary failure!');
+ throw new Exception('Temporary failure!');
case 'fail-permanent':
throw new PhabricatorWorkerPermanentFailureException(
'Permanent failure!');
diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php
--- a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php
+++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php
@@ -9,35 +9,30 @@
}
public function testLeaseTask() {
- // Leasing should work.
-
$task = $this->scheduleTask();
-
- $this->expectNextLease($task);
+ $this->expectNextLease($task, 'Leasing should work.');
}
public function testMultipleLease() {
- // We should not be able to lease a task multiple times.
-
$task = $this->scheduleTask();
$this->expectNextLease($task);
- $this->expectNextLease(null);
+ $this->expectNextLease(
+ null,
+ 'We should not be able to lease a task multiple times.');
}
public function testOldestFirst() {
- // Older tasks should lease first, all else being equal.
-
$task1 = $this->scheduleTask();
$task2 = $this->scheduleTask();
- $this->expectNextLease($task1);
+ $this->expectNextLease(
+ $task1,
+ 'Older tasks should lease first, all else being equal.');
$this->expectNextLease($task2);
}
public function testNewBeforeLeased() {
- // Tasks not previously leased should lease before previously leased tasks.
-
$task1 = $this->scheduleTask();
$task2 = $this->scheduleTask();
@@ -45,7 +40,10 @@
$task1->setLeaseExpires(time() - 100000);
$task1->forceSaveWithoutLease();
- $this->expectNextLease($task2);
+ $this->expectNextLease(
+ $task2,
+ 'Tasks not previously leased should lease before previously '.
+ 'leased tasks.');
$this->expectNextLease($task1);
}
@@ -138,15 +136,13 @@
public function testRequiredLeaseTime() {
$task = $this->scheduleAndExecuteTask(
array(
- 'getRequiredLeaseTime' => 1000000,
+ 'getRequiredLeaseTime' => 1000000,
));
$this->assertTrue(($task->getLeaseExpires() - time()) > 1000);
}
public function testLeasedIsOldestFirst() {
- // Tasks which expired earlier should lease first, all else being equal.
-
$task1 = $this->scheduleTask();
$task2 = $this->scheduleTask();
@@ -158,36 +154,59 @@
$task2->setLeaseExpires(time() - 200000);
$task2->forceSaveWithoutLease();
- $this->expectNextLease($task2);
+ $this->expectNextLease(
+ $task2,
+ 'Tasks which expired earlier should lease first, all else being equal.');
$this->expectNextLease($task1);
}
- private function expectNextLease($task) {
+ public function testLeasedIsHighestPriority() {
+ $task1 = $this->scheduleTask(array(), 1);
+ $task2 = $this->scheduleTask(array(), 2);
+ $task3 = $this->scheduleTask(array(), 1);
+ // task2 has a later ID than task1, so ID order alone cannot pass this.
+ $this->expectNextLease(
+ $task2,
+ 'Tasks with a higher priority should be scheduled first.');
+ $this->expectNextLease(
+ $task1,
+ 'Tasks with the same priority should be FIFO.');
+ $this->expectNextLease($task3);
+ }
+
+ private function expectNextLease($task, $message = null) {
$leased = id(new PhabricatorWorkerLeaseQuery())
->setLimit(1)
->execute();
if ($task === null) {
- $this->assertEqual(0, count($leased));
+ $this->assertEqual(0, count($leased), $message);
return null;
} else {
- $this->assertEqual(1, count($leased));
+ $this->assertEqual(1, count($leased), $message);
$this->assertEqual(
(int)head($leased)->getID(),
- (int)$task->getID());
+ (int)$task->getID(),
+ $message);
return head($leased);
}
}
- private function scheduleAndExecuteTask(array $data = array()) {
- $task = $this->scheduleTask($data);
+ private function scheduleAndExecuteTask(
+ array $data = array(),
+ $priority = null) {
+
+ $task = $this->scheduleTask($data, $priority);
$task = $this->expectNextLease($task);
$task = $task->executeTask();
return $task;
}
- private function scheduleTask(array $data = array()) {
- return PhabricatorWorker::scheduleTask('PhabricatorTestWorker', $data);
+ private function scheduleTask(array $data = array(), $priority = null) {
+ return PhabricatorWorker::scheduleTask(
+ 'PhabricatorTestWorker',
+ $data,
+ $priority);
}
}
diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
--- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
+++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
@@ -52,7 +52,6 @@
$leased = 0;
foreach ($phases as $phase) {
-
// NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query
// goes very, very slowly. The `ORDER BY` triggers this, although we get
// the same apparent results without it. Without the ORDER BY, binary
@@ -126,7 +125,6 @@
}
private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) {
-
$where = array();
switch ($phase) {
@@ -141,10 +139,7 @@
}
if ($this->ids) {
- $where[] = qsprintf(
- $conn_w,
- 'id IN (%Ld)',
- $this->ids);
+ $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids);
}
return $this->formatWhereClause($where);
@@ -162,13 +157,8 @@
switch ($phase) {
case self::PHASE_UNLEASED:
- $where[] = qsprintf(
- $conn_w,
- 'leaseOwner IS NULL');
- $where[] = qsprintf(
- $conn_w,
- 'id IN (%Ld)',
- ipull($rows, 'id'));
+ $where[] = qsprintf($conn_w, 'leaseOwner IS NULL');
+ $where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id'));
break;
case self::PHASE_EXPIRED:
$in = array();
@@ -179,24 +169,20 @@
$row['id'],
$row['leaseOwner']);
}
- $where[] = qsprintf(
- $conn_w,
- '(%Q)',
- implode(' OR ', $in));
+ $where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in));
break;
default:
throw new Exception("Unknown phase '{$phase}'!");
}
return $this->formatWhereClause($where);
-
}
private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) {
switch ($phase) {
case self::PHASE_UNLEASED:
- // When selecting new tasks, we want to consume them in roughly
- // FIFO order, so we order by the task ID.
- return qsprintf($conn_w, 'ORDER BY id ASC');
+ // When selecting new tasks, we want to consume them in order of
+ // decreasing priority (and then FIFO).
+ return qsprintf($conn_w, 'ORDER BY priority DESC, id ASC');
case self::PHASE_EXPIRED:
// When selecting failed tasks, we want to consume them in roughly
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
--- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
@@ -86,6 +86,7 @@
->setLeaseExpires($this->getLeaseExpires())
->setFailureCount($this->getFailureCount())
->setDataID($this->getDataID())
+ ->setPriority($this->getPriority())
->setResult($result)
->setDuration($duration);
@@ -164,12 +165,16 @@
if ($did_succeed) {
foreach ($worker->getQueuedTasks() as $task) {
- list($class, $data) = $task;
- PhabricatorWorker::scheduleTask($class, $data);
+ // An explicit priority passed to queueTask() wins; otherwise the
+ // followup inherits the priority of the task which queued it.
+ list($class, $data, $priority) = $task;
+ if ($priority === null) {
+ $priority = $this->getPriority();
+ }
+ PhabricatorWorker::scheduleTask($class, $data, $priority);
}
}
return $result;
}
-
}
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php
--- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php
@@ -11,8 +11,7 @@
public function save() {
if ($this->getID() === null) {
- throw new Exception(
- 'Trying to archive a task with no ID.');
+ throw new Exception('Trying to archive a task with no ID.');
}
$other = new PhabricatorWorkerActiveTask();
@@ -57,6 +56,7 @@
->setLeaseExpires(0)
->setFailureCount(0)
->setDataID($this->getDataID())
+ ->setPriority($this->getPriority())
->insert();
$this->setDataID(null);
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php
--- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php
@@ -9,33 +9,34 @@
protected $leaseExpires;
protected $failureCount;
protected $dataID;
+ protected $priority;
private $data;
private $executionException;
- public function setExecutionException(Exception $execution_exception) {
+ final public function setExecutionException(Exception $execution_exception) {
$this->executionException = $execution_exception;
return $this;
}
- public function getExecutionException() {
+ final public function getExecutionException() {
return $this->executionException;
}
- public function setData($data) {
+ final public function setData($data) {
$this->data = $data;
return $this;
}
- public function getData() {
+ final public function getData() {
return $this->data;
}
- public function isArchived() {
+ final public function isArchived() {
return ($this instanceof PhabricatorWorkerArchiveTask);
}
- public function getWorkerInstance() {
+ final public function getWorkerInstance() {
$id = $this->getID();
$class = $this->getTaskClass();
diff --git a/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php b/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php
--- a/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php
+++ b/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php
@@ -79,6 +79,8 @@
'PhabricatorSMSDemultiplexWorker',
array(
'toNumbers' => $to_numbers,
- 'body' => $body));
+ 'body' => $body,
+ ),
+ PhabricatorWorker::PRIORITY_ALERTS);
}
}

File Metadata

Mime Type
text/plain
Expires
Fri, Mar 28, 3:51 PM (1 w, 2 d ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7388407
Default Alt Text
D9871.id23765.diff (21 KB)

Event Timeline