diff --git a/resources/sql/autopatches/20140710.workerpriority.sql b/resources/sql/autopatches/20140710.workerpriority.sql new file mode 100644 --- /dev/null +++ b/resources/sql/autopatches/20140710.workerpriority.sql @@ -0,0 +1,11 @@ +ALTER TABLE {$NAMESPACE}_worker.worker_activetask + ADD COLUMN priority bigint unsigned NOT NULL; + +ALTER TABLE {$NAMESPACE}_worker.worker_activetask + ADD KEY (leaseOwner, priority, id); + +ALTER TABLE {$NAMESPACE}_worker.worker_archivetask + ADD COLUMN priority bigint unsigned NOT NULL; + +ALTER TABLE {$NAMESPACE}_worker.worker_archivetask + ADD KEY (leaseOwner, priority, id); diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php --- a/src/__phutil_library_map__.php +++ b/src/__phutil_library_map__.php @@ -710,6 +710,8 @@ 'DrydockSSHCommandInterface' => 'applications/drydock/interface/command/DrydockSSHCommandInterface.php', 'DrydockWebrootInterface' => 'applications/drydock/interface/webroot/DrydockWebrootInterface.php', 'DrydockWorkingCopyBlueprintImplementation' => 'applications/drydock/blueprint/DrydockWorkingCopyBlueprintImplementation.php', + 'DummyWorker' => 'applications/daemon/worker/DummyWorker.php', + 'FailureWorker' => 'applications/daemon/worker/FailureWorker.php', 'FeedPublisherHTTPWorker' => 'applications/feed/worker/FeedPublisherHTTPWorker.php', 'FeedPublisherWorker' => 'applications/feed/worker/FeedPublisherWorker.php', 'FeedPushWorker' => 'applications/feed/worker/FeedPushWorker.php', @@ -3426,6 +3428,8 @@ 'DrydockSSHCommandInterface' => 'DrydockCommandInterface', 'DrydockWebrootInterface' => 'DrydockInterface', 'DrydockWorkingCopyBlueprintImplementation' => 'DrydockBlueprintImplementation', + 'DummyWorker' => 'PhabricatorWorker', + 'FailureWorker' => 'PhabricatorWorker', 'FeedPublisherHTTPWorker' => 'FeedPushWorker', 'FeedPublisherWorker' => 'FeedPushWorker', 'FeedPushWorker' => 'PhabricatorWorker', diff --git a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php --- a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php +++ b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php @@ -136,6 +136,7 @@ $task->getTaskClass(), $task->getLeaseOwner(), $task->getLeaseExpires() - time(), + $task->getPriority(), $task->getFailureCount(), phutil_tag( 'a', @@ -158,6 +159,7 @@ pht('Class'), pht('Owner'), pht('Expires'), + pht('Priority'), pht('Failures'), '', )); @@ -168,6 +170,7 @@ '', '', 'n', + 'n', 'action', )); $leased_table->setNoDataString(pht('No tasks are leased by workers.')); diff --git a/src/applications/daemon/worker/DummyWorker.php b/src/applications/daemon/worker/DummyWorker.php new file mode 100644 --- /dev/null +++ b/src/applications/daemon/worker/DummyWorker.php @@ -0,0 +1,9 @@ +data = $data; } @@ -83,10 +84,15 @@ $this->doWork(); } - final public static function scheduleTask($task_class, $data) { + final public static function scheduleTask( + $task_class, + $data, + $priority = self::DEFAULT_PRIORITY) { + $task = id(new PhabricatorWorkerActiveTask()) ->setTaskClass($task_class) - ->setData($data); + ->setData($data) + ->setPriority($priority); if (self::$runAllTasksInProcess) { // Do the work in-process. @@ -96,8 +102,8 @@ try { $worker->doWork(); foreach ($worker->getQueuedTasks() as $queued_task) { - list($queued_class, $queued_data) = $queued_task; - self::scheduleTask($queued_class, $queued_data); + list($queued_class, $queued_data, $queued_priority) = $queued_task; + self::scheduleTask($queued_class, $queued_data, $queued_priority); } break; } catch (PhabricatorWorkerYieldException $ex) { @@ -106,7 +112,6 @@ 'In-process task "%s" yielded for %s seconds, sleeping...', $task_class, $ex->getDuration())); - sleep($ex->getDuration()); } } @@ -184,8 +189,7 @@ foreach ($tasks as $task) { if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) { - throw new Exception( - pht('Task %d failed!', $task->getID())); + throw new Exception(pht('Task %d failed!', $task->getID())); } } } @@ -204,7 +208,7 @@ self::$runAllTasksInProcess = $all; } - protected function log($pattern /* $args */) { + protected function log($pattern /* , ... */) { $console = PhutilConsole::getConsole(); $argv = func_get_args(); call_user_func_array(array($console, 'writeLog'), $argv); @@ -219,10 +223,15 @@ * * @param string Task class to queue. * @param array Data for the followup task. + * @param int Priority for the followup task * @return this */ - protected function queueTask($class, array $data) { - $this->queuedTasks[] = array($class, $data); + protected function queueTask( + $class, + array $data, + $priority = self::DEFAULT_PRIORITY) { + + $this->queuedTasks[] = array($class, $data, $priority); return $this; } @@ -230,7 +239,7 @@ /** * Get tasks queued as followups by @{method:queueTask}. * - * @return list> Queued task specifications. + * @return list> Queued task specifications. */ public function getQueuedTasks() { return $this->queuedTasks; diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php --- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php +++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php @@ -52,7 +52,6 @@ $leased = 0; foreach ($phases as $phase) { - // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query // goes very, very slowly. The `ORDER BY` triggers this, although we get // the same apparent results without it. Without the ORDER BY, binary @@ -126,7 +125,6 @@ } private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) { - $where = array(); switch ($phase) { @@ -141,10 +139,7 @@ } if ($this->ids) { - $where[] = qsprintf( - $conn_w, - 'id IN (%Ld)', - $this->ids); + $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids); } return $this->formatWhereClause($where); @@ -162,13 +157,8 @@ switch ($phase) { case self::PHASE_UNLEASED: - $where[] = qsprintf( - $conn_w, - 'leaseOwner IS NULL'); - $where[] = qsprintf( - $conn_w, - 'id IN (%Ld)', - ipull($rows, 'id')); + $where[] = qsprintf($conn_w, 'leaseOwner IS NULL'); + $where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id')); break; case self::PHASE_EXPIRED: $in = array(); @@ -179,25 +169,20 @@ $row['id'], $row['leaseOwner']); } - $where[] = qsprintf( - $conn_w, - '(%Q)', - implode(' OR ', $in)); + $where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in)); break; default: throw new Exception("Unknown phase '{$phase}'!"); } return $this->formatWhereClause($where); - } private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) { switch ($phase) { case self::PHASE_UNLEASED: - // When selecting new tasks, we want to consume them in roughly - // FIFO order, so we order by the task ID. - return qsprintf($conn_w, 'ORDER BY id ASC'); + // TODO: Comment goes here. + return qsprintf($conn_w, 'ORDER BY priority DESC, id ASC'); case self::PHASE_EXPIRED: // When selecting failed tasks, we want to consume them in roughly // FIFO order of their failures, which is not necessarily their original diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php @@ -86,6 +86,7 @@ ->setLeaseExpires($this->getLeaseExpires()) ->setFailureCount($this->getFailureCount()) ->setDataID($this->getDataID()) + ->setPriority($this->getPriority()) ->setResult($result) ->setDuration($duration); @@ -164,12 +165,11 @@ if ($did_succeed) { foreach ($worker->getQueuedTasks() as $task) { list($class, $data) = $task; - PhabricatorWorker::scheduleTask($class, $data); + PhabricatorWorker::scheduleTask($class, $data, $this->getPriority()); } } return $result; } - } diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php @@ -11,8 +11,7 @@ public function save() { if ($this->getID() === null) { - throw new Exception( - 'Trying to archive a task with no ID.'); + throw new Exception('Trying to archive a task with no ID.'); } $other = new PhabricatorWorkerActiveTask(); @@ -57,6 +56,7 @@ ->setLeaseExpires(0) ->setFailureCount(0) ->setDataID($this->getDataID()) + ->setPriority($this->getPriority()) ->insert(); $this->setDataID(null); diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php @@ -9,33 +9,34 @@ protected $leaseExpires; protected $failureCount; protected $dataID; + protected $priority; private $data; private $executionException; - public function setExecutionException(Exception $execution_exception) { + final public function setExecutionException(Exception $execution_exception) { $this->executionException = $execution_exception; return $this; } - public function getExecutionException() { + final public function getExecutionException() { return $this->executionException; } - public function setData($data) { + final public function setData($data) { $this->data = $data; return $this; } - public function getData() { + final public function getData() { return $this->data; } - public function isArchived() { + final public function isArchived() { return ($this instanceof PhabricatorWorkerArchiveTask); } - public function getWorkerInstance() { + final public function getWorkerInstance() { $id = $this->getID(); $class = $this->getTaskClass();