Page Menu | Home | Phabricator

D9871.id23699.diff
No One | Temporary

D9871.id23699.diff

diff --git a/resources/sql/autopatches/20140710.workerpriority.sql b/resources/sql/autopatches/20140710.workerpriority.sql
new file mode 100644
--- /dev/null
+++ b/resources/sql/autopatches/20140710.workerpriority.sql
@@ -0,0 +1,11 @@
+ALTER TABLE {$NAMESPACE}_worker.worker_activetask
+ ADD COLUMN priority bigint unsigned NOT NULL;
+
+ALTER TABLE {$NAMESPACE}_worker.worker_activetask
+ ADD KEY (leaseOwner, priority, id);
+
+ALTER TABLE {$NAMESPACE}_worker.worker_archivetask
+ ADD COLUMN priority bigint unsigned NOT NULL;
+
+ALTER TABLE {$NAMESPACE}_worker.worker_archivetask
+ ADD KEY (leaseOwner, priority, id);
diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php
--- a/src/__phutil_library_map__.php
+++ b/src/__phutil_library_map__.php
@@ -710,6 +710,8 @@
'DrydockSSHCommandInterface' => 'applications/drydock/interface/command/DrydockSSHCommandInterface.php',
'DrydockWebrootInterface' => 'applications/drydock/interface/webroot/DrydockWebrootInterface.php',
'DrydockWorkingCopyBlueprintImplementation' => 'applications/drydock/blueprint/DrydockWorkingCopyBlueprintImplementation.php',
+ 'DummyWorker' => 'applications/daemon/worker/DummyWorker.php',
+ 'FailureWorker' => 'applications/daemon/worker/FailureWorker.php',
'FeedPublisherHTTPWorker' => 'applications/feed/worker/FeedPublisherHTTPWorker.php',
'FeedPublisherWorker' => 'applications/feed/worker/FeedPublisherWorker.php',
'FeedPushWorker' => 'applications/feed/worker/FeedPushWorker.php',
@@ -3426,6 +3428,8 @@
'DrydockSSHCommandInterface' => 'DrydockCommandInterface',
'DrydockWebrootInterface' => 'DrydockInterface',
'DrydockWorkingCopyBlueprintImplementation' => 'DrydockBlueprintImplementation',
+ 'DummyWorker' => 'PhabricatorWorker',
+ 'FailureWorker' => 'PhabricatorWorker',
'FeedPublisherHTTPWorker' => 'FeedPushWorker',
'FeedPublisherWorker' => 'FeedPushWorker',
'FeedPushWorker' => 'PhabricatorWorker',
diff --git a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
--- a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
+++ b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
@@ -136,6 +136,7 @@
$task->getTaskClass(),
$task->getLeaseOwner(),
$task->getLeaseExpires() - time(),
+ $task->getPriority(),
$task->getFailureCount(),
phutil_tag(
'a',
@@ -158,6 +159,7 @@
pht('Class'),
pht('Owner'),
pht('Expires'),
+ pht('Priority'),
pht('Failures'),
'',
));
@@ -168,6 +170,7 @@
'',
'',
'n',
+ 'n',
'action',
));
$leased_table->setNoDataString(pht('No tasks are leased by workers.'));
diff --git a/src/applications/daemon/worker/DummyWorker.php b/src/applications/daemon/worker/DummyWorker.php
new file mode 100644
--- /dev/null
+++ b/src/applications/daemon/worker/DummyWorker.php
@@ -0,0 +1,9 @@
+<?php
+
+final class DummyWorker extends PhabricatorWorker {
+
+ protected function doWork() {
+ sleep(60);
+ }
+
+}
diff --git a/src/applications/daemon/worker/FailureWorker.php b/src/applications/daemon/worker/FailureWorker.php
new file mode 100644
--- /dev/null
+++ b/src/applications/daemon/worker/FailureWorker.php
@@ -0,0 +1,10 @@
+<?php
+
+final class FailureWorker extends PhabricatorWorker {
+
+ protected function doWork() {
+ sleep(60);
+ throw new Exception('Oops');
+ }
+
+}
diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php
--- a/src/infrastructure/daemon/workers/PhabricatorWorker.php
+++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php
@@ -9,6 +9,8 @@
private static $runAllTasksInProcess = false;
private $queuedTasks = array();
+ const DEFAULT_PRIORITY = 3000;
+
/* -( Configuring Retries and Failures )----------------------------------- */
@@ -32,13 +34,13 @@
/**
- * Return the maximum number of times this task may be retried before it
- * is considered permanently failed. By default, tasks retry indefinitely. You
+ * Return the maximum number of times this task may be retried before it is
+ * considered permanently failed. By default, tasks retry indefinitely. You
* can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an
* immediate permanent failure.
*
- * @return int|null Number of times the task will retry before permanent
- * failure. Return `null` to retry indefinitely.
+ * @return int|null Number of times the task will retry before permanent
+ * failure. Return `null` to retry indefinitely.
*
* @task config
*/
@@ -52,15 +54,15 @@
* retrying. For most tasks you can leave this at `null`, which will give you
* a short default retry period (currently 60 seconds).
*
- * @param PhabricatorWorkerTask The task itself. This object is probably
- * useful mostly to examine the failure
- * count if you want to implement staggered
- * retries, or to examine the execution
- * exception if you want to react to
- * different failures in different ways.
- * @return int|null Number of seconds to wait between retries,
- * or null for a default retry period
- * (currently 60 seconds).
+ * @param PhabricatorWorkerTask The task itself. This object is probably
+ * useful mostly to examine the failure count
+ * if you want to implement staggered retries,
+ * or to examine the execution exception if
+ * you want to react to different failures in
+ * different ways.
+ * @return int|null Number of seconds to wait between retries,
+ * or null for a default retry period
+ * (currently 60 seconds).
*
* @task config
*/
@@ -70,7 +72,6 @@
abstract protected function doWork();
-
final public function __construct($data) {
$this->data = $data;
}
@@ -83,10 +84,15 @@
$this->doWork();
}
- final public static function scheduleTask($task_class, $data) {
+ final public static function scheduleTask(
+ $task_class,
+ $data,
+ $priority = self::DEFAULT_PRIORITY) {
+
$task = id(new PhabricatorWorkerActiveTask())
->setTaskClass($task_class)
- ->setData($data);
+ ->setData($data)
+ ->setPriority($priority);
if (self::$runAllTasksInProcess) {
// Do the work in-process.
@@ -96,8 +102,8 @@
try {
$worker->doWork();
foreach ($worker->getQueuedTasks() as $queued_task) {
- list($queued_class, $queued_data) = $queued_task;
- self::scheduleTask($queued_class, $queued_data);
+ list($queued_class, $queued_data, $queued_priority) = $queued_task;
+ self::scheduleTask($queued_class, $queued_data, $queued_priority);
}
break;
} catch (PhabricatorWorkerYieldException $ex) {
@@ -106,7 +112,6 @@
'In-process task "%s" yielded for %s seconds, sleeping...',
$task_class,
$ex->getDuration()));
-
sleep($ex->getDuration());
}
}
@@ -184,8 +189,7 @@
foreach ($tasks as $task) {
if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) {
- throw new Exception(
- pht('Task %d failed!', $task->getID()));
+ throw new Exception(pht('Task %d failed!', $task->getID()));
}
}
}
@@ -204,7 +208,7 @@
self::$runAllTasksInProcess = $all;
}
- protected function log($pattern /* $args */) {
+ protected function log($pattern /* , ... */) {
$console = PhutilConsole::getConsole();
$argv = func_get_args();
call_user_func_array(array($console, 'writeLog'), $argv);
@@ -219,10 +223,15 @@
*
* @param string Task class to queue.
* @param array Data for the followup task.
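+ * @param int Priority for the followup task. Defaults to
+ * DEFAULT_PRIORITY. NOTE(review): after a successful run,
+ * PhabricatorWorkerActiveTask reschedules queued followups with
+ * its own priority and does not read this value, while the
+ * in-process path in scheduleTask() does; confirm which behavior
+ * is intended.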
* @return this
*/
- protected function queueTask($class, array $data) {
- $this->queuedTasks[] = array($class, $data);
+ protected function queueTask(
+ $class,
+ array $data,
+ $priority = self::DEFAULT_PRIORITY) {
+
+ $this->queuedTasks[] = array($class, $data, $priority);
return $this;
}
@@ -230,7 +239,7 @@
/**
* Get tasks queued as followups by @{method:queueTask}.
*
- * @return list<pair<string, wild>> Queued task specifications.
+ * @return list<tuple<string, wild, int>> Queued task specifications.
*/
public function getQueuedTasks() {
return $this->queuedTasks;
diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
--- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
+++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
@@ -52,7 +52,6 @@
$leased = 0;
foreach ($phases as $phase) {
-
// NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query
// goes very, very slowly. The `ORDER BY` triggers this, although we get
// the same apparent results without it. Without the ORDER BY, binary
@@ -126,7 +125,6 @@
}
private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) {
-
$where = array();
switch ($phase) {
@@ -141,10 +139,7 @@
}
if ($this->ids) {
- $where[] = qsprintf(
- $conn_w,
- 'id IN (%Ld)',
- $this->ids);
+ $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids);
}
return $this->formatWhereClause($where);
@@ -162,13 +157,8 @@
switch ($phase) {
case self::PHASE_UNLEASED:
- $where[] = qsprintf(
- $conn_w,
- 'leaseOwner IS NULL');
- $where[] = qsprintf(
- $conn_w,
- 'id IN (%Ld)',
- ipull($rows, 'id'));
+ $where[] = qsprintf($conn_w, 'leaseOwner IS NULL');
+ $where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id'));
break;
case self::PHASE_EXPIRED:
$in = array();
@@ -179,25 +169,20 @@
$row['id'],
$row['leaseOwner']);
}
- $where[] = qsprintf(
- $conn_w,
- '(%Q)',
- implode(' OR ', $in));
+ $where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in));
break;
default:
throw new Exception("Unknown phase '{$phase}'!");
}
return $this->formatWhereClause($where);
-
}
private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) {
switch ($phase) {
case self::PHASE_UNLEASED:
- // When selecting new tasks, we want to consume them in roughly
- // FIFO order, so we order by the task ID.
- return qsprintf($conn_w, 'ORDER BY id ASC');
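+ // When selecting new tasks, we want to consume tasks with the largest
+ // priority value first and, among tasks of equal priority, in roughly
+ // FIFO order, so we order by priority and then by the task ID.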
+ return qsprintf($conn_w, 'ORDER BY priority DESC, id ASC');
case self::PHASE_EXPIRED:
// When selecting failed tasks, we want to consume them in roughly
// FIFO order of their failures, which is not necessarily their original
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
--- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
@@ -86,6 +86,7 @@
->setLeaseExpires($this->getLeaseExpires())
->setFailureCount($this->getFailureCount())
->setDataID($this->getDataID())
+ ->setPriority($this->getPriority())
->setResult($result)
->setDuration($duration);
@@ -164,12 +165,11 @@
if ($did_succeed) {
foreach ($worker->getQueuedTasks() as $task) {
list($class, $data) = $task;
- PhabricatorWorker::scheduleTask($class, $data);
+ PhabricatorWorker::scheduleTask($class, $data, $this->getPriority());
}
}
return $result;
}
-
}
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php
--- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php
@@ -11,8 +11,7 @@
public function save() {
if ($this->getID() === null) {
- throw new Exception(
- 'Trying to archive a task with no ID.');
+ throw new Exception('Trying to archive a task with no ID.');
}
$other = new PhabricatorWorkerActiveTask();
@@ -57,6 +56,7 @@
->setLeaseExpires(0)
->setFailureCount(0)
->setDataID($this->getDataID())
+ ->setPriority($this->getPriority())
->insert();
$this->setDataID(null);
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php
--- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php
@@ -9,33 +9,34 @@
protected $leaseExpires;
protected $failureCount;
protected $dataID;
+ protected $priority;
private $data;
private $executionException;
- public function setExecutionException(Exception $execution_exception) {
+ final public function setExecutionException(Exception $execution_exception) {
$this->executionException = $execution_exception;
return $this;
}
- public function getExecutionException() {
+ final public function getExecutionException() {
return $this->executionException;
}
- public function setData($data) {
+ final public function setData($data) {
$this->data = $data;
return $this;
}
- public function getData() {
+ final public function getData() {
return $this->data;
}
- public function isArchived() {
+ final public function isArchived() {
return ($this instanceof PhabricatorWorkerArchiveTask);
}
- public function getWorkerInstance() {
+ final public function getWorkerInstance() {
$id = $this->getID();
$class = $this->getTaskClass();

File Metadata

Mime Type
text/plain
Expires
Sat, Mar 15, 10:31 PM (3 w, 1 d ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7704211
Default Alt Text
D9871.id23699.diff (14 KB)

Event Timeline