diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php index e58650371f..7e5c5f9416 100644 --- a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php +++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php @@ -1,186 +1,203 @@ true, ); } public function testLeaseTask() { // Leasing should work. $task = $this->scheduleTask(); $this->expectNextLease($task); } public function testMultipleLease() { // We should not be able to lease a task multiple times. $task = $this->scheduleTask(); $this->expectNextLease($task); $this->expectNextLease(null); } public function testOldestFirst() { // Older tasks should lease first, all else being equal. $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); $this->expectNextLease($task1); $this->expectNextLease($task2); } public function testNewBeforeLeased() { // Tasks not previously leased should lease before previously leased tasks. $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); $task1->setLeaseOwner('test'); $task1->setLeaseExpires(time() - 100000); $task1->forceSaveWithoutLease(); $this->expectNextLease($task2); $this->expectNextLease($task1); } - public function testExecuteTask() { $task = $this->scheduleAndExecuteTask(); $this->assertEqual(true, $task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, $task->getResult()); } public function testPermanentTaskFailure() { $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-permanent', )); $this->assertEqual(true, $task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_FAILURE, $task->getResult()); } public function testTemporaryTaskFailure() { $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-temporary', )); $this->assertEqual(false, $task->isArchived()); $this->assertEqual( true, ($task->getExecutionException() instanceof Exception)); } public function testTooManyTaskFailures() { // Expect temporary failures, then a permanent failure. $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-temporary', 'getMaximumRetryCount' => 3, 'getWaitBeforeRetry' => -60, )); // Temporary... $this->assertEqual(false, $task->isArchived()); $this->assertEqual( true, ($task->getExecutionException() instanceof Exception)); $this->assertEqual(1, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertEqual(false, $task->isArchived()); $this->assertEqual( true, ($task->getExecutionException() instanceof Exception)); $this->assertEqual(2, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertEqual(false, $task->isArchived()); $this->assertEqual( true, ($task->getExecutionException() instanceof Exception)); $this->assertEqual(3, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertEqual(false, $task->isArchived()); $this->assertEqual( true, ($task->getExecutionException() instanceof Exception)); $this->assertEqual(4, $task->getFailureCount()); // Permanent. $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertEqual(true, $task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_FAILURE, $task->getResult()); } public function testWaitBeforeRetry() { $task = $this->scheduleTask( array( 'doWork' => 'fail-temporary', 'getWaitBeforeRetry' => 1000000, )); $this->expectNextLease($task)->executeTask(); $this->expectNextLease(null); } public function testRequiredLeaseTime() { $task = $this->scheduleAndExecuteTask( array( 'getRequiredLeaseTime' => 1000000, )); $this->assertEqual(true, ($task->getLeaseExpires() - time()) > 1000); } + public function testLeasedIsOldestFirst() { + // Tasks which expired earlier should lease first, all else being equal. + + $task1 = $this->scheduleTask(); + $task2 = $this->scheduleTask(); + + $task1->setLeaseOwner('test'); + $task1->setLeaseExpires(time() - 100000); + $task1->forceSaveWithoutLease(); + + $task2->setLeaseOwner('test'); + $task2->setLeaseExpires(time() - 200000); + $task2->forceSaveWithoutLease(); + + $this->expectNextLease($task2); + $this->expectNextLease($task1); + } + private function expectNextLease($task) { $leased = id(new PhabricatorWorkerLeaseQuery()) ->setLimit(1) ->execute(); if ($task === null) { $this->assertEqual(0, count($leased)); return null; } else { $this->assertEqual(1, count($leased)); $this->assertEqual( (int)head($leased)->getID(), (int)$task->getID()); return head($leased); } } private function scheduleAndExecuteTask(array $data = array()) { $task = $this->scheduleTask($data); $task = $this->expectNextLease($task); $task = $task->executeTask(); return $task; } private function scheduleTask(array $data = array()) { return PhabricatorWorker::scheduleTask('PhabricatorTestWorker', $data); } } diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php index e7b07ba844..507a8b6c44 100644 --- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php +++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php @@ -1,212 +1,230 @@ ids = $ids; return $this; } public function setLimit($limit) { $this->limit = $limit; return $this; } public function execute() { if (!$this->limit) { throw new Exception("You must setLimit() when leasing tasks."); } $task_table = new PhabricatorWorkerActiveTask(); $taskdata_table = new PhabricatorWorkerTaskData(); $lease_ownership_name = $this->getLeaseOwnershipName(); $conn_w = $task_table->establishConnection('w'); // Try to satisfy the request from new, unleased tasks first. If we don't // find enough tasks, try tasks with expired leases (i.e., tasks which have // previously failed). $phases = array( self::PHASE_UNLEASED, self::PHASE_EXPIRED, ); $limit = $this->limit; $leased = 0; foreach ($phases as $phase) { // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query // goes very, very slowly. The `ORDER BY` triggers this, although we get // the same apparent results without it. Without the ORDER BY, binary // read slaves complain that the query isn't repeatable. To avoid both // problems, do a SELECT and then an UPDATE. $rows = queryfx_all( $conn_w, 'SELECT id, leaseOwner FROM %T %Q %Q %Q', $task_table->getTableName(), $this->buildWhereClause($conn_w, $phase), - $this->buildOrderClause($conn_w), + $this->buildOrderClause($conn_w, $phase), $this->buildLimitClause($conn_w, $limit - $leased)); // NOTE: Sometimes, we'll race with another worker and they'll grab // this task before we do. We could reduce how often this happens by // selecting more tasks than we need, then shuffling them and trying // to lock only the number we're actually after. However, the amount // of time workers spend here should be very small relative to their // total runtime, so keep it simple for the moment. if ($rows) { queryfx( $conn_w, 'UPDATE %T task SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d %Q', $task_table->getTableName(), $lease_ownership_name, self::DEFAULT_LEASE_DURATION, $this->buildUpdateWhereClause($conn_w, $phase, $rows)); $leased += $conn_w->getAffectedRows(); if ($leased == $limit) { break; } } } if (!$leased) { return array(); } $data = queryfx_all( $conn_w, 'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime FROM %T task LEFT JOIN %T taskdata ON taskdata.id = task.dataID WHERE leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP() %Q %Q', $task_table->getTableName(), $taskdata_table->getTableName(), $lease_ownership_name, - $this->buildOrderClause($conn_w), + $this->buildOrderClause($conn_w, $phase), $this->buildLimitClause($conn_w, $limit)); $tasks = $task_table->loadAllFromArray($data); $tasks = mpull($tasks, null, 'getID'); foreach ($data as $row) { $tasks[$row['id']]->setServerTime($row['_serverTime']); if ($row['_taskData']) { $task_data = json_decode($row['_taskData'], true); } else { $task_data = null; } $tasks[$row['id']]->setData($task_data); } return $tasks; } private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) { $where = array(); switch ($phase) { case self::PHASE_UNLEASED: $where[] = 'leaseOwner IS NULL'; break; case self::PHASE_EXPIRED: $where[] = 'leaseExpires < UNIX_TIMESTAMP()'; break; default: throw new Exception("Unknown phase '{$phase}'!"); } if ($this->ids) { $where[] = qsprintf( $conn_w, 'id IN (%Ld)', $this->ids); } return $this->formatWhereClause($where); } private function buildUpdateWhereClause( AphrontDatabaseConnection $conn_w, $phase, array $rows) { $where = array(); // NOTE: This is basically working around the MySQL behavior that // `IN (NULL)` doesn't match NULL. switch ($phase) { case self::PHASE_UNLEASED: $where[] = qsprintf( $conn_w, 'leaseOwner IS NULL'); $where[] = qsprintf( $conn_w, 'id IN (%Ld)', ipull($rows, 'id')); break; case self::PHASE_EXPIRED: $in = array(); foreach ($rows as $row) { $in[] = qsprintf( $conn_w, '(id = %d AND leaseOwner = %s)', $row['id'], $row['leaseOwner']); } $where[] = qsprintf( $conn_w, '(%Q)', implode(' OR ', $in)); break; default: throw new Exception("Unknown phase '{$phase}'!"); } return $this->formatWhereClause($where); } - private function buildOrderClause(AphrontDatabaseConnection $conn_w) { - return qsprintf($conn_w, 'ORDER BY id ASC'); + private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) { + switch ($phase) { + case self::PHASE_UNLEASED: + // When selecting new tasks, we want to consume them in roughly + // FIFO order, so we order by the task ID. + return qsprintf($conn_w, 'ORDER BY id ASC'); + case self::PHASE_EXPIRED: + // When selecting failed tasks, we want to consume them in roughly + // FIFO order of their failures, which is not necessarily their original + // queue order. + + // Particularly, this is important for tasks which use soft failures to + // indicate that they are waiting on other tasks to complete: we need to + // push them to the end of the queue after they fail, at least on + // average, so we don't deadlock retrying the same blocked task over + // and over again. + return qsprintf($conn_w, 'ORDER BY leaseExpires ASC'); + default: + throw new Exception(pht('Unknown phase "%s"!', $phase)); + } } private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) { return qsprintf($conn_w, 'LIMIT %d', $limit); } private function getLeaseOwnershipName() { static $sequence = 0; $parts = array( getmypid(), time(), php_uname('n'), ++$sequence, ); return implode(':', $parts); } }