Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F15437739
D9871.id23750.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
21 KB
Referenced Files
None
Subscribers
None
D9871.id23750.diff
View Options
diff --git a/resources/sql/autopatches/20140710.workerpriority.sql b/resources/sql/autopatches/20140710.workerpriority.sql
new file mode 100644
--- /dev/null
+++ b/resources/sql/autopatches/20140710.workerpriority.sql
@@ -0,0 +1,11 @@
+ALTER TABLE {$NAMESPACE}_worker.worker_activetask
+ ADD COLUMN priority int unsigned NOT NULL;
+
+ALTER TABLE {$NAMESPACE}_worker.worker_activetask
+ ADD KEY (leaseOwner, priority, id);
+
+ALTER TABLE {$NAMESPACE}_worker.worker_archivetask
+ ADD COLUMN priority int unsigned NOT NULL;
+
+ALTER TABLE {$NAMESPACE}_worker.worker_archivetask
+ ADD KEY (leaseOwner, priority, id);
diff --git a/scripts/repository/reparse.php b/scripts/repository/reparse.php
--- a/scripts/repository/reparse.php
+++ b/scripts/repository/reparse.php
@@ -267,7 +267,10 @@
if ($all_from_repo && !$force_local) {
foreach ($classes as $class) {
- PhabricatorWorker::scheduleTask($class, $spec);
+ PhabricatorWorker::scheduleTask(
+ $class,
+ $spec,
+ PhabricatorWorker::PRIORITY_IMPORT);
$commit_name = 'r'.$callsign.$commit->getCommitIdentifier();
echo " Queued '{$class}' for commit '{$commit_name}'.\n";
diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php
--- a/src/__phutil_library_map__.php
+++ b/src/__phutil_library_map__.php
@@ -5,7 +5,6 @@
* @generated
* @phutil-library-version 2
*/
-
phutil_register_library_map(array(
'__library_version__' => 2,
'class' =>
diff --git a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
--- a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
+++ b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
@@ -136,6 +136,7 @@
$task->getTaskClass(),
$task->getLeaseOwner(),
$task->getLeaseExpires() - time(),
+ $task->getPriority(),
$task->getFailureCount(),
phutil_tag(
'a',
@@ -158,6 +159,7 @@
pht('Class'),
pht('Owner'),
pht('Expires'),
+ pht('Priority'),
pht('Failures'),
'',
));
@@ -168,6 +170,7 @@
'',
'',
'n',
+ 'n',
'action',
));
$leased_table->setNoDataString(pht('No tasks are leased by workers.'));
diff --git a/src/applications/diffusion/engine/DiffusionCommitHookEngine.php b/src/applications/diffusion/engine/DiffusionCommitHookEngine.php
--- a/src/applications/diffusion/engine/DiffusionCommitHookEngine.php
+++ b/src/applications/diffusion/engine/DiffusionCommitHookEngine.php
@@ -187,7 +187,8 @@
'eventPHID' => $event->getPHID(),
'emailPHIDs' => array_values($this->emailPHIDs),
'info' => $this->loadCommitInfoForWorker($all_updates),
- ));
+ ),
+ PhabricatorWorker::PRIORITY_ALERTS);
}
return 0;
diff --git a/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php b/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php
--- a/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php
+++ b/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php
@@ -49,7 +49,8 @@
$mailer_task = PhabricatorWorker::scheduleTask(
'PhabricatorMetaMTAWorker',
- $message->getID());
+ $message->getID(),
+ PhabricatorWorker::PRIORITY_ALERTS);
$console->writeOut(
"Queued message #%d for resend.\n",
diff --git a/src/applications/metamta/storage/PhabricatorMetaMTAMail.php b/src/applications/metamta/storage/PhabricatorMetaMTAMail.php
--- a/src/applications/metamta/storage/PhabricatorMetaMTAMail.php
+++ b/src/applications/metamta/storage/PhabricatorMetaMTAMail.php
@@ -300,7 +300,8 @@
// Queue a task to send this mail.
$mailer_task = PhabricatorWorker::scheduleTask(
'PhabricatorMetaMTAWorker',
- $this->getID());
+ $this->getID(),
+ PhabricatorWorker::PRIORITY_ALERTS);
$this->saveTransaction();
diff --git a/src/applications/search/index/PhabricatorSearchIndexer.php b/src/applications/search/index/PhabricatorSearchIndexer.php
--- a/src/applications/search/index/PhabricatorSearchIndexer.php
+++ b/src/applications/search/index/PhabricatorSearchIndexer.php
@@ -7,7 +7,8 @@
'PhabricatorSearchWorker',
array(
'documentPHID' => $phid,
- ));
+ ),
+ PhabricatorWorker::PRIORITY_IMPORT);
}
public function indexDocumentByPHID($phid) {
diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php
--- a/src/infrastructure/daemon/workers/PhabricatorWorker.php
+++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php
@@ -9,6 +9,11 @@
private static $runAllTasksInProcess = false;
private $queuedTasks = array();
+ const PRIORITY_ALERTS = 4000;
+ const PRIORITY_DEFAULT = 3000;
+ const PRIORITY_BULK = 2000;
+ const PRIORITY_IMPORT = 1000;
+
/* -( Configuring Retries and Failures )----------------------------------- */
@@ -32,13 +37,13 @@
/**
- * Return the maximum number of times this task may be retried before it
- * is considered permanently failed. By default, tasks retry indefinitely. You
+ * Return the maximum number of times this task may be retried before it is
+ * considered permanently failed. By default, tasks retry indefinitely. You
* can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an
* immediate permanent failure.
*
- * @return int|null Number of times the task will retry before permanent
- * failure. Return `null` to retry indefinitely.
+ * @return int|null Number of times the task will retry before permanent
+ * failure. Return `null` to retry indefinitely.
*
* @task config
*/
@@ -52,15 +57,15 @@
* retrying. For most tasks you can leave this at `null`, which will give you
* a short default retry period (currently 60 seconds).
*
- * @param PhabricatorWorkerTask The task itself. This object is probably
- * useful mostly to examine the failure
- * count if you want to implement staggered
- * retries, or to examine the execution
- * exception if you want to react to
- * different failures in different ways.
- * @return int|null Number of seconds to wait between retries,
- * or null for a default retry period
- * (currently 60 seconds).
+ * @param PhabricatorWorkerTask The task itself. This object is probably
+ * useful mostly to examine the failure count
+ * if you want to implement staggered retries,
+ * or to examine the execution exception if
+ * you want to react to different failures in
+ * different ways.
+ * @return int|null Number of seconds to wait between retries,
+ * or null for a default retry period
+ * (currently 60 seconds).
*
* @task config
*/
@@ -70,7 +75,6 @@
abstract protected function doWork();
-
final public function __construct($data) {
$this->data = $data;
}
@@ -83,10 +87,19 @@
$this->doWork();
}
- final public static function scheduleTask($task_class, $data) {
+ final public static function scheduleTask(
+ $task_class,
+ $data,
+ $priority = null) {
+
+ if ($priority === null) {
+ $priority = self::PRIORITY_DEFAULT;
+ }
+
$task = id(new PhabricatorWorkerActiveTask())
->setTaskClass($task_class)
- ->setData($data);
+ ->setData($data)
+ ->setPriority($priority);
if (self::$runAllTasksInProcess) {
// Do the work in-process.
@@ -96,8 +109,8 @@
try {
$worker->doWork();
foreach ($worker->getQueuedTasks() as $queued_task) {
- list($queued_class, $queued_data) = $queued_task;
- self::scheduleTask($queued_class, $queued_data);
+ list($queued_class, $queued_data, $queued_priority) = $queued_task;
+ self::scheduleTask($queued_class, $queued_data, $queued_priority);
}
break;
} catch (PhabricatorWorkerYieldException $ex) {
@@ -106,7 +119,6 @@
'In-process task "%s" yielded for %s seconds, sleeping...',
$task_class,
$ex->getDuration()));
-
sleep($ex->getDuration());
}
}
@@ -184,8 +196,7 @@
foreach ($tasks as $task) {
if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) {
- throw new Exception(
- pht('Task %d failed!', $task->getID()));
+ throw new Exception(pht('Task %d failed!', $task->getID()));
}
}
}
@@ -204,7 +215,7 @@
self::$runAllTasksInProcess = $all;
}
- protected function log($pattern /* $args */) {
+ final protected function log($pattern /* , ... */) {
$console = PhutilConsole::getConsole();
$argv = func_get_args();
call_user_func_array(array($console, 'writeLog'), $argv);
@@ -219,10 +230,11 @@
*
* @param string Task class to queue.
* @param array Data for the followup task.
+ * @param int Priority for the followup task.
* @return this
*/
- protected function queueTask($class, array $data) {
- $this->queuedTasks[] = array($class, $data);
+ final protected function queueTask($class, array $data, $priority = null) {
+ $this->queuedTasks[] = array($class, $data, $priority);
return $this;
}
@@ -230,9 +242,9 @@
/**
* Get tasks queued as followups by @{method:queueTask}.
*
- * @return list<pair<string, wild>> Queued task specifications.
+ * @return list<tuple<string, wild, int>> Queued task specifications.
*/
- public function getQueuedTasks() {
+ final public function getQueuedTasks() {
return $this->queuedTasks;
}
diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php
--- a/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php
+++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php
@@ -26,8 +26,7 @@
protected function doWork() {
switch (idx($this->getTaskData(), 'doWork')) {
case 'fail-temporary':
- throw new Exception(
- 'Temporary failure!');
+ throw new Exception('Temporary failure!');
case 'fail-permanent':
throw new PhabricatorWorkerPermanentFailureException(
'Permanent failure!');
diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php
--- a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php
+++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php
@@ -9,35 +9,30 @@
}
public function testLeaseTask() {
- // Leasing should work.
-
$task = $this->scheduleTask();
-
- $this->expectNextLease($task);
+ $this->expectNextLease($task, 'Leasing should work.');
}
public function testMultipleLease() {
- // We should not be able to lease a task multiple times.
-
$task = $this->scheduleTask();
$this->expectNextLease($task);
- $this->expectNextLease(null);
+ $this->expectNextLease(
+ null,
+ 'We should not be able to lease a task multiple times.');
}
public function testOldestFirst() {
- // Older tasks should lease first, all else being equal.
-
$task1 = $this->scheduleTask();
$task2 = $this->scheduleTask();
- $this->expectNextLease($task1);
+ $this->expectNextLease(
+ $task1,
+ 'Older tasks should lease first, all else being equal.');
$this->expectNextLease($task2);
}
public function testNewBeforeLeased() {
- // Tasks not previously leased should lease before previously leased tasks.
-
$task1 = $this->scheduleTask();
$task2 = $this->scheduleTask();
@@ -45,7 +40,10 @@
$task1->setLeaseExpires(time() - 100000);
$task1->forceSaveWithoutLease();
- $this->expectNextLease($task2);
+ $this->expectNextLease(
+ $task2,
+ 'Tasks not previously leased should lease before previously '.
+ 'leased tasks.');
$this->expectNextLease($task1);
}
@@ -138,15 +136,13 @@
public function testRequiredLeaseTime() {
$task = $this->scheduleAndExecuteTask(
array(
- 'getRequiredLeaseTime' => 1000000,
+ 'getRequiredLeaseTime' => 1000000,
));
$this->assertTrue(($task->getLeaseExpires() - time()) > 1000);
}
public function testLeasedIsOldestFirst() {
- // Tasks which expired earlier should lease first, all else being equal.
-
$task1 = $this->scheduleTask();
$task2 = $this->scheduleTask();
@@ -158,36 +154,59 @@
$task2->setLeaseExpires(time() - 200000);
$task2->forceSaveWithoutLease();
- $this->expectNextLease($task2);
+ $this->expectNextLease(
+ $task2,
+ 'Tasks which expired earlier should lease first, all else being equal.');
$this->expectNextLease($task1);
}
- private function expectNextLease($task) {
+ public function testLeasedIsHighestPriority() {
+ $task1 = $this->scheduleTask(array(), 2);
+ $task2 = $this->scheduleTask(array(), 1);
+ $task3 = $this->scheduleTask(array(), 1);
+
+ $this->expectNextLease(
+ $task1,
+ 'Tasks with a higher priority should be scheduled first.');
+ $this->expectNextLease(
+ $task2,
+ 'Tasks with the same priority should be FIFO.');
+ $this->expectNextLease($task3);
+ }
+
+ private function expectNextLease($task, $message = null) {
$leased = id(new PhabricatorWorkerLeaseQuery())
->setLimit(1)
->execute();
if ($task === null) {
- $this->assertEqual(0, count($leased));
+ $this->assertEqual(0, count($leased), $message);
return null;
} else {
- $this->assertEqual(1, count($leased));
+ $this->assertEqual(1, count($leased), $message);
$this->assertEqual(
(int)head($leased)->getID(),
- (int)$task->getID());
+ (int)$task->getID(),
+ $message);
return head($leased);
}
}
- private function scheduleAndExecuteTask(array $data = array()) {
- $task = $this->scheduleTask($data);
+ private function scheduleAndExecuteTask(
+ array $data = array(),
+ $priority = null) {
+
+ $task = $this->scheduleTask($data, $priority);
$task = $this->expectNextLease($task);
$task = $task->executeTask();
return $task;
}
- private function scheduleTask(array $data = array()) {
- return PhabricatorWorker::scheduleTask('PhabricatorTestWorker', $data);
+ private function scheduleTask(array $data = array(), $priority = null) {
+ return PhabricatorWorker::scheduleTask(
+ 'PhabricatorTestWorker',
+ $data,
+ $priority);
}
}
diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
--- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
+++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
@@ -52,7 +52,6 @@
$leased = 0;
foreach ($phases as $phase) {
-
// NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query
// goes very, very slowly. The `ORDER BY` triggers this, although we get
// the same apparent results without it. Without the ORDER BY, binary
@@ -126,7 +125,6 @@
}
private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) {
-
$where = array();
switch ($phase) {
@@ -141,10 +139,7 @@
}
if ($this->ids) {
- $where[] = qsprintf(
- $conn_w,
- 'id IN (%Ld)',
- $this->ids);
+ $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids);
}
return $this->formatWhereClause($where);
@@ -162,13 +157,8 @@
switch ($phase) {
case self::PHASE_UNLEASED:
- $where[] = qsprintf(
- $conn_w,
- 'leaseOwner IS NULL');
- $where[] = qsprintf(
- $conn_w,
- 'id IN (%Ld)',
- ipull($rows, 'id'));
+ $where[] = qsprintf($conn_w, 'leaseOwner IS NULL');
+ $where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id'));
break;
case self::PHASE_EXPIRED:
$in = array();
@@ -179,24 +169,20 @@
$row['id'],
$row['leaseOwner']);
}
- $where[] = qsprintf(
- $conn_w,
- '(%Q)',
- implode(' OR ', $in));
+ $where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in));
break;
default:
throw new Exception("Unknown phase '{$phase}'!");
}
return $this->formatWhereClause($where);
-
}
private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) {
switch ($phase) {
case self::PHASE_UNLEASED:
- // When selecting new tasks, we want to consume them in roughly
- // FIFO order, so we order by the task ID.
+ // When selecting new tasks, we want to consume them in order of
+ // decreasing priority (and then FIFO).
return qsprintf($conn_w, 'ORDER BY id ASC');
case self::PHASE_EXPIRED:
// When selecting failed tasks, we want to consume them in roughly
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
--- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
@@ -86,6 +86,7 @@
->setLeaseExpires($this->getLeaseExpires())
->setFailureCount($this->getFailureCount())
->setDataID($this->getDataID())
+ ->setPriority($this->getPriority())
->setResult($result)
->setDuration($duration);
@@ -164,12 +165,11 @@
if ($did_succeed) {
foreach ($worker->getQueuedTasks() as $task) {
list($class, $data) = $task;
- PhabricatorWorker::scheduleTask($class, $data);
+ PhabricatorWorker::scheduleTask($class, $data, $this->getPriority());
}
}
return $result;
}
-
}
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php
--- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php
@@ -11,8 +11,7 @@
public function save() {
if ($this->getID() === null) {
- throw new Exception(
- 'Trying to archive a task with no ID.');
+ throw new Exception('Trying to archive a task with no ID.');
}
$other = new PhabricatorWorkerActiveTask();
@@ -57,6 +56,7 @@
->setLeaseExpires(0)
->setFailureCount(0)
->setDataID($this->getDataID())
+ ->setPriority($this->getPriority())
->insert();
$this->setDataID(null);
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php
--- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php
@@ -9,33 +9,34 @@
protected $leaseExpires;
protected $failureCount;
protected $dataID;
+ protected $priority;
private $data;
private $executionException;
- public function setExecutionException(Exception $execution_exception) {
+ final public function setExecutionException(Exception $execution_exception) {
$this->executionException = $execution_exception;
return $this;
}
- public function getExecutionException() {
+ final public function getExecutionException() {
return $this->executionException;
}
- public function setData($data) {
+ final public function setData($data) {
$this->data = $data;
return $this;
}
- public function getData() {
+ final public function getData() {
return $this->data;
}
- public function isArchived() {
+ final public function isArchived() {
return ($this instanceof PhabricatorWorkerArchiveTask);
}
- public function getWorkerInstance() {
+ final public function getWorkerInstance() {
$id = $this->getID();
$class = $this->getTaskClass();
diff --git a/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php b/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php
--- a/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php
+++ b/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php
@@ -79,6 +79,8 @@
'PhabricatorSMSDemultiplexWorker',
array(
'toNumbers' => $to_numbers,
- 'body' => $body));
+ 'body' => $body,
+ ),
+ PhabricatorWorker::PRIORITY_ALERTS);
}
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Mar 26, 9:13 PM (1 w, 3 d ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7596897
Default Alt Text
D9871.id23750.diff (21 KB)
Attached To
Mode
D9871: Allow worker tasks to have priorities
Attached
Detach File
Event Timeline
Log In to Comment