diff --git a/src/applications/search/worker/PhabricatorSearchWorker.php b/src/applications/search/worker/PhabricatorSearchWorker.php index f93df63981..361d9c9b6f 100644 --- a/src/applications/search/worker/PhabricatorSearchWorker.php +++ b/src/applications/search/worker/PhabricatorSearchWorker.php @@ -1,94 +1,94 @@ $phid, 'parameters' => $parameters, ), array( - 'priority' => parent::PRIORITY_IMPORT, + 'priority' => parent::PRIORITY_INDEX, 'objectPHID' => $phid, )); } protected function doWork() { $data = $this->getTaskData(); $object_phid = idx($data, 'documentPHID'); $object = $this->loadObjectForIndexing($object_phid); $engine = id(new PhabricatorIndexEngine()) ->setObject($object); $parameters = idx($data, 'parameters', array()); $engine->setParameters($parameters); if (!$engine->shouldIndexObject()) { return; } $key = "index.{$object_phid}"; $lock = PhabricatorGlobalLock::newLock($key); try { $lock->lock(1); } catch (PhutilLockException $ex) { // If we fail to acquire the lock, just yield. It's expected that we may // contend on this lock occasionally if a large object receives many // updates in a short period of time, and it's appropriate to just retry // rebuilding the index later. throw new PhabricatorWorkerYieldException(15); } try { // Reload the object now that we have a lock, to make sure we have the // most current version. $object = $this->loadObjectForIndexing($object->getPHID()); $engine->setObject($object); $engine->indexObject(); } catch (Exception $ex) { $lock->unlock(); if (!($ex instanceof PhabricatorWorkerPermanentFailureException)) { $ex = new PhabricatorWorkerPermanentFailureException( pht( 'Failed to update search index for document "%s": %s', $object_phid, $ex->getMessage())); } throw $ex; } $lock->unlock(); } private function loadObjectForIndexing($phid) { $viewer = PhabricatorUser::getOmnipotentUser(); $object = id(new PhabricatorObjectQuery()) ->setViewer($viewer) ->withPHIDs(array($phid)) ->executeOne(); if (!$object) { throw new PhabricatorWorkerPermanentFailureException( pht( 'Unable to load object "%s" to rebuild indexes.', $phid)); } return $object; } } diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php index 1b9821b68d..f055544b7b 100644 --- a/src/infrastructure/daemon/workers/PhabricatorWorker.php +++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php @@ -1,314 +1,315 @@ currentWorkerTask = $task; return $this; } public function getCurrentWorkerTask() { return $this->currentWorkerTask; } public function getCurrentWorkerTaskID() { $task = $this->getCurrentWorkerTask(); if (!$task) { return null; } return $task->getID(); } abstract protected function doWork(); final public function __construct($data) { $this->data = $data; } final protected function getTaskData() { return $this->data; } final protected function getTaskDataValue($key, $default = null) { $data = $this->getTaskData(); if (!is_array($data)) { throw new PhabricatorWorkerPermanentFailureException( pht('Expected task data to be a dictionary.')); } return idx($data, $key, $default); } final public function executeTask() { $this->doWork(); } final public static function scheduleTask( $task_class, $data, $options = array()) { PhutilTypeSpec::checkMap( $options, array( 'priority' => 'optional int|null', 'objectPHID' => 'optional string|null', 'delayUntil' => 'optional int|null', )); $priority = idx($options, 'priority'); if ($priority === null) { $priority = self::PRIORITY_DEFAULT; } $object_phid = idx($options, 'objectPHID'); $task = id(new PhabricatorWorkerActiveTask()) ->setTaskClass($task_class) ->setData($data) ->setPriority($priority) ->setObjectPHID($object_phid); $delay = idx($options, 'delayUntil'); if ($delay) { $task->setLeaseExpires($delay); } if (self::$runAllTasksInProcess) { // Do the work in-process. $worker = newv($task_class, array($data)); while (true) { try { $worker->executeTask(); $worker->flushTaskQueue(); break; } catch (PhabricatorWorkerYieldException $ex) { phlog( pht( 'In-process task "%s" yielded for %s seconds, sleeping...', $task_class, $ex->getDuration())); sleep($ex->getDuration()); } } // Now, save a task row and immediately archive it so we can return an // object with a valid ID. $task->openTransaction(); $task->save(); $archived = $task->archiveTask( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, 0); $task->saveTransaction(); return $archived; } else { $task->save(); return $task; } } public function renderForDisplay(PhabricatorUser $viewer) { return null; } /** * Set this flag to execute scheduled tasks synchronously, in the same * process. This is useful for debugging, and otherwise dramatically worse * in every way imaginable. */ public static function setRunAllTasksInProcess($all) { self::$runAllTasksInProcess = $all; } final protected function log($pattern /* , ... */) { $console = PhutilConsole::getConsole(); $argv = func_get_args(); call_user_func_array(array($console, 'writeLog'), $argv); return $this; } /** * Queue a task to be executed after this one succeeds. * * The followup task will be queued only if this task completes cleanly. * * @param string Task class to queue. * @param array Data for the followup task. * @param array Options for the followup task. * @return this */ final protected function queueTask( $class, array $data, array $options = array()) { $this->queuedTasks[] = array($class, $data, $options); return $this; } /** * Get tasks queued as followups by @{method:queueTask}. * * @return list> Queued task specifications. */ final protected function getQueuedTasks() { return $this->queuedTasks; } /** * Schedule any queued tasks, then empty the task queue. * * By default, the queue is flushed only if a task succeeds. You can call * this method to force the queue to flush before failing (for example, if * you are using queues to improve locking behavior). * * @param map Optional default options. * @return this */ final public function flushTaskQueue($defaults = array()) { foreach ($this->getQueuedTasks() as $task) { list($class, $data, $options) = $task; $options = $options + $defaults; self::scheduleTask($class, $data, $options); } $this->queuedTasks = array(); } /** * Awaken tasks that have yielded. * * Reschedules the specified tasks if they are currently queued in a yielded, * unleased, unretried state so they'll execute sooner. This can let the * queue avoid unnecessary waits. * * This method does not provide any assurances about when these tasks will * execute, or even guarantee that it will have any effect at all. * * @param list List of task IDs to try to awaken. * @return void */ final public static function awakenTaskIDs(array $ids) { if (!$ids) { return; } $table = new PhabricatorWorkerActiveTask(); $conn_w = $table->establishConnection('w'); // NOTE: At least for now, we're keeping these tasks yielded, just // pretending that they threw a shorter yield than they really did. // Overlap the windows here to handle minor client/server time differences // and because it's likely correct to push these tasks to the head of their // respective priorities. There is a good chance they are ready to execute. $window = phutil_units('1 hour in seconds'); $epoch_ago = (PhabricatorTime::getNow() - $window); queryfx( $conn_w, 'UPDATE %T SET leaseExpires = %d WHERE id IN (%Ld) AND leaseOwner = %s AND leaseExpires > %d AND failureCount = 0', $table->getTableName(), $epoch_ago, $ids, self::YIELD_OWNER, $epoch_ago); } protected function newContentSource() { return PhabricatorContentSource::newForSource( PhabricatorDaemonContentSource::SOURCECONST); } }