diff --git a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php index e7ac2b992c..f51d558de1 100644 --- a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php +++ b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php @@ -1,236 +1,258 @@ getRequest(); $user = $request->getUser(); $window_start = (time() - (60 * 15)); // Assume daemons spend about 250ms second in overhead per task acquiring // leases and doing other bookkeeping. This is probably an over-estimation, // but we'd rather show that utilization is too high than too low. $lease_overhead = 0.250; $completed = id(new PhabricatorWorkerArchiveTask())->loadAllWhere( 'dateModified > %d', $window_start); $failed = id(new PhabricatorWorkerActiveTask())->loadAllWhere( 'failureTime > %d', $window_start); $usage_total = 0; $usage_start = PHP_INT_MAX; $completed_info = array(); foreach ($completed as $completed_task) { $class = $completed_task->getTaskClass(); if (empty($completed_info[$class])) { $completed_info[$class] = array( 'n' => 0, 'duration' => 0, ); } $completed_info[$class]['n']++; $duration = $completed_task->getDuration(); $completed_info[$class]['duration'] += $duration; // NOTE: Duration is in microseconds, but we're just using seconds to // compute utilization. $usage_total += $lease_overhead + ($duration / 1000000); $usage_start = min($usage_start, $completed_task->getDateModified()); } $completed_info = isort($completed_info, 'n'); $rows = array(); foreach ($completed_info as $class => $info) { $rows[] = array( $class, number_format($info['n']), number_format((int)($info['duration'] / $info['n'])).' us', ); } if ($failed) { // Add the time it takes to restart the daemons. This includes a guess // about other overhead of 2X. $usage_total += PhutilDaemonOverseer::RESTART_WAIT * count($failed) * 2; foreach ($failed as $failed_task) { $usage_start = min($usage_start, $failed_task->getFailureTime()); } $rows[] = array( phutil_tag('em', array(), pht('Temporary Failures')), count($failed), null, ); } $logs = id(new PhabricatorDaemonLogQuery()) ->setViewer($user) ->withStatus(PhabricatorDaemonLogQuery::STATUS_ALIVE) ->setAllowStatusWrites(true) ->execute(); $taskmasters = 0; foreach ($logs as $log) { if ($log->getDaemon() == 'PhabricatorTaskmasterDaemon') { $taskmasters++; } } if ($taskmasters && $usage_total) { // Total number of wall-time seconds the daemons have been running since // the oldest event. For very short times round up to 15s so we don't // render any ridiculous numbers if you reload the page immediately after // restarting the daemons. $available_time = $taskmasters * max(15, (time() - $usage_start)); // Percentage of those wall-time seconds we can account for, which the // daemons spent doing work: $used_time = ($usage_total / $available_time); $rows[] = array( phutil_tag('em', array(), pht('Queue Utilization (Approximate)')), sprintf('%.1f%%', 100 * $used_time), null, ); } $completed_table = new AphrontTableView($rows); $completed_table->setNoDataString( pht('No tasks have completed in the last 15 minutes.')); $completed_table->setHeaders( array( pht('Class'), pht('Count'), pht('Avg'), )); $completed_table->setColumnClasses( array( 'wide', 'n', 'n', )); $completed_panel = new PHUIObjectBoxView(); $completed_panel->setHeaderText( pht('Recently Completed Tasks (Last 15m)')); $completed_panel->appendChild($completed_table); $daemon_table = new PhabricatorDaemonLogListView(); $daemon_table->setUser($user); $daemon_table->setDaemonLogs($logs); - $tasks = id(new PhabricatorWorkerActiveTask())->loadAllWhere( - 'leaseOwner IS NOT NULL'); - - $rows = array(); - foreach ($tasks as $task) { - $rows[] = array( - $task->getID(), - $task->getTaskClass(), - $task->getLeaseOwner(), - $task->getLeaseExpires() - time(), - $task->getPriority(), - $task->getFailureCount(), - phutil_tag( - 'a', - array( - 'href' => '/daemon/task/'.$task->getID().'/', - 'class' => 'button small grey', - ), - pht('View Task')), - ); - } - $daemon_panel = new PHUIObjectBoxView(); $daemon_panel->setHeaderText(pht('Active Daemons')); $daemon_panel->appendChild($daemon_table); - $leased_table = new AphrontTableView($rows); - $leased_table->setHeaders( - array( - pht('ID'), - pht('Class'), - pht('Owner'), - pht('Expires'), - pht('Priority'), - pht('Failures'), - '', - )); - $leased_table->setColumnClasses( - array( - 'n', - 'wide', - '', - '', - 'n', - 'n', - 'action', - )); - $leased_table->setNoDataString(pht('No tasks are leased by workers.')); - $leased_panel = new PHUIObjectBoxView(); - $leased_panel->setHeaderText(pht('Leased Tasks')); - $leased_panel->appendChild($leased_table); + $tasks = id(new PhabricatorWorkerActiveTask())->loadAllWhere( + 'leaseOwner IS NOT NULL'); + + $tasks_table = $this->renderTasksTable( + $tasks, + pht('No tasks are leased by workers.')); + + $leased_panel = id(new PHUIObjectBoxView()) + ->setHeaderText(pht('Leased Tasks')) + ->appendChild($tasks_table); $task_table = new PhabricatorWorkerActiveTask(); $queued = queryfx_all( $task_table->establishConnection('r'), 'SELECT taskClass, count(*) N FROM %T GROUP BY taskClass ORDER BY N DESC', $task_table->getTableName()); $rows = array(); foreach ($queued as $row) { $rows[] = array( $row['taskClass'], number_format($row['N']), ); } $queued_table = new AphrontTableView($rows); $queued_table->setHeaders( array( pht('Class'), pht('Count'), )); $queued_table->setColumnClasses( array( 'wide', 'n', )); $queued_table->setNoDataString(pht('Task queue is empty.')); $queued_panel = new PHUIObjectBoxView(); $queued_panel->setHeaderText(pht('Queued Tasks')); $queued_panel->appendChild($queued_table); + $upcoming = id(new PhabricatorWorkerLeaseQuery()) + ->setLimit(10) + ->setSkipLease(true) + ->execute(); + + $upcoming_panel = id(new PHUIObjectBoxView()) + ->setHeaderText(pht('Next In Queue')) + ->appendChild( + $this->renderTasksTable($upcoming, pht('Task queue is empty.'))); + $crumbs = $this->buildApplicationCrumbs(); $crumbs->addTextCrumb(pht('Console')); $nav = $this->buildSideNavView(); $nav->selectFilter('/'); $nav->appendChild( array( $crumbs, $completed_panel, $daemon_panel, $queued_panel, $leased_panel, + $upcoming_panel, )); return $this->buildApplicationPage( $nav, array( 'title' => pht('Console'), 'device' => false, )); } + private function renderTasksTable(array $tasks, $nodata) { + $rows = array(); + foreach ($tasks as $task) { + $rows[] = array( + $task->getID(), + $task->getTaskClass(), + $task->getLeaseOwner(), + $task->getLeaseExpires() + ? phutil_format_relative_time($task->getLeaseExpires() - time()) + : '-', + $task->getPriority(), + $task->getFailureCount(), + phutil_tag( + 'a', + array( + 'href' => '/daemon/task/'.$task->getID().'/', + 'class' => 'button small grey', + ), + pht('View Task')), + ); + } + + $table = new AphrontTableView($rows); + $table->setHeaders( + array( + pht('ID'), + pht('Class'), + pht('Owner'), + pht('Expires'), + pht('Priority'), + pht('Failures'), + '', + )); + $table->setColumnClasses( + array( + 'n', + 'wide', + '', + '', + 'n', + 'n', + 'action', + )); + $table->setNoDataString($nodata); + + return $table; + } + } diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php index 47e229ec62..659a83821e 100644 --- a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php +++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php @@ -1,212 +1,212 @@ true, ); } public function testLeaseTask() { $task = $this->scheduleTask(); $this->expectNextLease($task, 'Leasing should work.'); } public function testMultipleLease() { $task = $this->scheduleTask(); $this->expectNextLease($task); $this->expectNextLease( null, 'We should not be able to lease a task multiple times.'); } public function testOldestFirst() { $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); $this->expectNextLease( $task1, 'Older tasks should lease first, all else being equal.'); $this->expectNextLease($task2); } public function testNewBeforeLeased() { $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); $task1->setLeaseOwner('test'); $task1->setLeaseExpires(time() - 100000); $task1->forceSaveWithoutLease(); $this->expectNextLease( $task2, 'Tasks not previously leased should lease before previously '. 'leased tasks.'); $this->expectNextLease($task1); } public function testExecuteTask() { $task = $this->scheduleAndExecuteTask(); $this->assertEqual(true, $task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, $task->getResult()); } public function testPermanentTaskFailure() { $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-permanent', )); $this->assertEqual(true, $task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_FAILURE, $task->getResult()); } public function testTemporaryTaskFailure() { $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-temporary', )); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); } public function testTooManyTaskFailures() { // Expect temporary failures, then a permanent failure. $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-temporary', 'getMaximumRetryCount' => 3, 'getWaitBeforeRetry' => -60, )); // Temporary... $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(1, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(2, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(3, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(4, $task->getFailureCount()); // Permanent. $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertTrue($task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_FAILURE, $task->getResult()); } public function testWaitBeforeRetry() { $task = $this->scheduleTask( array( 'doWork' => 'fail-temporary', 'getWaitBeforeRetry' => 1000000, )); $this->expectNextLease($task)->executeTask(); $this->expectNextLease(null); } public function testRequiredLeaseTime() { $task = $this->scheduleAndExecuteTask( array( 'getRequiredLeaseTime' => 1000000, )); $this->assertTrue(($task->getLeaseExpires() - time()) > 1000); } public function testLeasedIsOldestFirst() { $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); $task1->setLeaseOwner('test'); $task1->setLeaseExpires(time() - 100000); $task1->forceSaveWithoutLease(); $task2->setLeaseOwner('test'); $task2->setLeaseExpires(time() - 200000); $task2->forceSaveWithoutLease(); $this->expectNextLease( $task2, 'Tasks which expired earlier should lease first, all else being equal.'); $this->expectNextLease($task1); } public function testLeasedIsHighestPriority() { - $task1 = $this->scheduleTask(array(), 2); + $task1 = $this->scheduleTask(array(), 1); $task2 = $this->scheduleTask(array(), 1); - $task3 = $this->scheduleTask(array(), 1); + $task3 = $this->scheduleTask(array(), 2); $this->expectNextLease( - $task1, + $task3, 'Tasks with a higher priority should be scheduled first.'); $this->expectNextLease( - $task2, + $task1, 'Tasks with the same priority should be FIFO.'); - $this->expectNextLease($task3); + $this->expectNextLease($task2); } private function expectNextLease($task, $message = null) { $leased = id(new PhabricatorWorkerLeaseQuery()) ->setLimit(1) ->execute(); if ($task === null) { $this->assertEqual(0, count($leased), $message); return null; } else { $this->assertEqual(1, count($leased), $message); $this->assertEqual( (int)head($leased)->getID(), (int)$task->getID(), $message); return head($leased); } } private function scheduleAndExecuteTask( array $data = array(), $priority = null) { $task = $this->scheduleTask($data, $priority); $task = $this->expectNextLease($task); $task = $task->executeTask(); return $task; } private function scheduleTask(array $data = array(), $priority = null) { return PhabricatorWorker::scheduleTask( 'PhabricatorTestWorker', $data, $priority); } } diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php index bc2dd22cc8..fff5e85fa2 100644 --- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php +++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php @@ -1,220 +1,253 @@ skipLease = $skip; + return $this; + } + public function withIDs(array $ids) { $this->ids = $ids; return $this; } public function setLimit($limit) { $this->limit = $limit; return $this; } public function execute() { if (!$this->limit) { throw new Exception('You must setLimit() when leasing tasks.'); } $task_table = new PhabricatorWorkerActiveTask(); $taskdata_table = new PhabricatorWorkerTaskData(); $lease_ownership_name = $this->getLeaseOwnershipName(); $conn_w = $task_table->establishConnection('w'); // Try to satisfy the request from new, unleased tasks first. If we don't // find enough tasks, try tasks with expired leases (i.e., tasks which have // previously failed). $phases = array( self::PHASE_UNLEASED, self::PHASE_EXPIRED, ); $limit = $this->limit; $leased = 0; + $task_ids = array(); foreach ($phases as $phase) { // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query // goes very, very slowly. The `ORDER BY` triggers this, although we get // the same apparent results without it. Without the ORDER BY, binary // read slaves complain that the query isn't repeatable. To avoid both // problems, do a SELECT and then an UPDATE. $rows = queryfx_all( $conn_w, 'SELECT id, leaseOwner FROM %T %Q %Q %Q', $task_table->getTableName(), $this->buildWhereClause($conn_w, $phase), $this->buildOrderClause($conn_w, $phase), $this->buildLimitClause($conn_w, $limit - $leased)); // NOTE: Sometimes, we'll race with another worker and they'll grab // this task before we do. We could reduce how often this happens by // selecting more tasks than we need, then shuffling them and trying // to lock only the number we're actually after. However, the amount // of time workers spend here should be very small relative to their // total runtime, so keep it simple for the moment. if ($rows) { - queryfx( - $conn_w, - 'UPDATE %T task - SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d - %Q', - $task_table->getTableName(), - $lease_ownership_name, - self::getDefaultLeaseDuration(), - $this->buildUpdateWhereClause($conn_w, $phase, $rows)); - - $leased += $conn_w->getAffectedRows(); + if ($this->skipLease) { + $leased += count($rows); + $task_ids += array_fuse(ipull($rows, 'id')); + } else { + queryfx( + $conn_w, + 'UPDATE %T task + SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d + %Q', + $task_table->getTableName(), + $lease_ownership_name, + self::getDefaultLeaseDuration(), + $this->buildUpdateWhereClause($conn_w, $phase, $rows)); + + $leased += $conn_w->getAffectedRows(); + } + if ($leased == $limit) { break; } } } if (!$leased) { return array(); } + if ($this->skipLease) { + $selection_condition = qsprintf( + $conn_w, + 'task.id IN (%Ld)', + $task_ids); + } else { + $selection_condition = qsprintf( + $conn_w, + 'task.leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP()', + $lease_ownership_name); + } + $data = queryfx_all( $conn_w, 'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime FROM %T task LEFT JOIN %T taskdata ON taskdata.id = task.dataID - WHERE leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP() - %Q %Q', + WHERE %Q %Q %Q', $task_table->getTableName(), $taskdata_table->getTableName(), - $lease_ownership_name, + $selection_condition, $this->buildOrderClause($conn_w, $phase), $this->buildLimitClause($conn_w, $limit)); $tasks = $task_table->loadAllFromArray($data); $tasks = mpull($tasks, null, 'getID'); foreach ($data as $row) { $tasks[$row['id']]->setServerTime($row['_serverTime']); if ($row['_taskData']) { $task_data = json_decode($row['_taskData'], true); } else { $task_data = null; } $tasks[$row['id']]->setData($task_data); } return $tasks; } private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) { $where = array(); switch ($phase) { case self::PHASE_UNLEASED: $where[] = 'leaseOwner IS NULL'; break; case self::PHASE_EXPIRED: $where[] = 'leaseExpires < UNIX_TIMESTAMP()'; break; default: throw new Exception("Unknown phase '{$phase}'!"); } if ($this->ids) { $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids); } return $this->formatWhereClause($where); } private function buildUpdateWhereClause( AphrontDatabaseConnection $conn_w, $phase, array $rows) { $where = array(); // NOTE: This is basically working around the MySQL behavior that // `IN (NULL)` doesn't match NULL. switch ($phase) { case self::PHASE_UNLEASED: $where[] = qsprintf($conn_w, 'leaseOwner IS NULL'); $where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id')); break; case self::PHASE_EXPIRED: $in = array(); foreach ($rows as $row) { $in[] = qsprintf( $conn_w, '(id = %d AND leaseOwner = %s)', $row['id'], $row['leaseOwner']); } $where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in)); break; default: throw new Exception("Unknown phase '{$phase}'!"); } return $this->formatWhereClause($where); } private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) { switch ($phase) { case self::PHASE_UNLEASED: // When selecting new tasks, we want to consume them in order of // decreasing priority (and then FIFO). - return qsprintf($conn_w, 'ORDER BY id ASC'); + return qsprintf($conn_w, 'ORDER BY priority DESC, id ASC'); case self::PHASE_EXPIRED: // When selecting failed tasks, we want to consume them in roughly // FIFO order of their failures, which is not necessarily their original // queue order. // Particularly, this is important for tasks which use soft failures to // indicate that they are waiting on other tasks to complete: we need to // push them to the end of the queue after they fail, at least on // average, so we don't deadlock retrying the same blocked task over // and over again. return qsprintf($conn_w, 'ORDER BY leaseExpires ASC'); default: throw new Exception(pht('Unknown phase "%s"!', $phase)); } } private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) { return qsprintf($conn_w, 'LIMIT %d', $limit); } private function getLeaseOwnershipName() { static $sequence = 0; $parts = array( getmypid(), time(), php_uname('n'), ++$sequence, ); return implode(':', $parts); } }