Differential D9871 Diff 23765 src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
Changeset View
Changeset View
Standalone View
Standalone View
src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php
Show First 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | public function execute() { | ||||
$phases = array( | $phases = array( | ||||
self::PHASE_UNLEASED, | self::PHASE_UNLEASED, | ||||
self::PHASE_EXPIRED, | self::PHASE_EXPIRED, | ||||
); | ); | ||||
$limit = $this->limit; | $limit = $this->limit; | ||||
$leased = 0; | $leased = 0; | ||||
foreach ($phases as $phase) { | foreach ($phases as $phase) { | ||||
// NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query | // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query | ||||
// goes very, very slowly. The `ORDER BY` triggers this, although we get | // goes very, very slowly. The `ORDER BY` triggers this, although we get | ||||
// the same apparent results without it. Without the ORDER BY, binary | // the same apparent results without it. Without the ORDER BY, binary | ||||
// read slaves complain that the query isn't repeatable. To avoid both | // read slaves complain that the query isn't repeatable. To avoid both | ||||
// problems, do a SELECT and then an UPDATE. | // problems, do a SELECT and then an UPDATE. | ||||
$rows = queryfx_all( | $rows = queryfx_all( | ||||
$conn_w, | $conn_w, | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | foreach ($data as $row) { | ||||
} | } | ||||
$tasks[$row['id']]->setData($task_data); | $tasks[$row['id']]->setData($task_data); | ||||
} | } | ||||
return $tasks; | return $tasks; | ||||
} | } | ||||
private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) { | private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) { | ||||
$where = array(); | $where = array(); | ||||
switch ($phase) { | switch ($phase) { | ||||
case self::PHASE_UNLEASED: | case self::PHASE_UNLEASED: | ||||
$where[] = 'leaseOwner IS NULL'; | $where[] = 'leaseOwner IS NULL'; | ||||
break; | break; | ||||
case self::PHASE_EXPIRED: | case self::PHASE_EXPIRED: | ||||
$where[] = 'leaseExpires < UNIX_TIMESTAMP()'; | $where[] = 'leaseExpires < UNIX_TIMESTAMP()'; | ||||
break; | break; | ||||
default: | default: | ||||
throw new Exception("Unknown phase '{$phase}'!"); | throw new Exception("Unknown phase '{$phase}'!"); | ||||
} | } | ||||
if ($this->ids) { | if ($this->ids) { | ||||
$where[] = qsprintf( | $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids); | ||||
$conn_w, | |||||
'id IN (%Ld)', | |||||
$this->ids); | |||||
} | } | ||||
return $this->formatWhereClause($where); | return $this->formatWhereClause($where); | ||||
} | } | ||||
private function buildUpdateWhereClause( | private function buildUpdateWhereClause( | ||||
AphrontDatabaseConnection $conn_w, | AphrontDatabaseConnection $conn_w, | ||||
$phase, | $phase, | ||||
array $rows) { | array $rows) { | ||||
$where = array(); | $where = array(); | ||||
// NOTE: This is basically working around the MySQL behavior that | // NOTE: This is basically working around the MySQL behavior that | ||||
// `IN (NULL)` doesn't match NULL. | // `IN (NULL)` doesn't match NULL. | ||||
switch ($phase) { | switch ($phase) { | ||||
case self::PHASE_UNLEASED: | case self::PHASE_UNLEASED: | ||||
$where[] = qsprintf( | $where[] = qsprintf($conn_w, 'leaseOwner IS NULL'); | ||||
$conn_w, | $where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id')); | ||||
'leaseOwner IS NULL'); | |||||
$where[] = qsprintf( | |||||
$conn_w, | |||||
'id IN (%Ld)', | |||||
ipull($rows, 'id')); | |||||
break; | break; | ||||
case self::PHASE_EXPIRED: | case self::PHASE_EXPIRED: | ||||
$in = array(); | $in = array(); | ||||
foreach ($rows as $row) { | foreach ($rows as $row) { | ||||
$in[] = qsprintf( | $in[] = qsprintf( | ||||
$conn_w, | $conn_w, | ||||
'(id = %d AND leaseOwner = %s)', | '(id = %d AND leaseOwner = %s)', | ||||
$row['id'], | $row['id'], | ||||
$row['leaseOwner']); | $row['leaseOwner']); | ||||
} | } | ||||
$where[] = qsprintf( | $where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in)); | ||||
$conn_w, | |||||
'(%Q)', | |||||
implode(' OR ', $in)); | |||||
break; | break; | ||||
default: | default: | ||||
throw new Exception("Unknown phase '{$phase}'!"); | throw new Exception("Unknown phase '{$phase}'!"); | ||||
} | } | ||||
return $this->formatWhereClause($where); | return $this->formatWhereClause($where); | ||||
} | } | ||||
private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) { | private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) { | ||||
switch ($phase) { | switch ($phase) { | ||||
case self::PHASE_UNLEASED: | case self::PHASE_UNLEASED: | ||||
// When selecting new tasks, we want to consume them in roughly | // When selecting new tasks, we want to consume them in order of | ||||
// FIFO order, so we order by the task ID. | // decreasing priority (and then FIFO). | ||||
return qsprintf($conn_w, 'ORDER BY id ASC'); | return qsprintf($conn_w, 'ORDER BY id ASC'); | ||||
epriestley: We never actually made this order by priority. | |||||
case self::PHASE_EXPIRED: | case self::PHASE_EXPIRED: | ||||
// When selecting failed tasks, we want to consume them in roughly | // When selecting failed tasks, we want to consume them in roughly | ||||
// FIFO order of their failures, which is not necessarily their original | // FIFO order of their failures, which is not necessarily their original | ||||
// queue order. | // queue order. | ||||
// Particularly, this is important for tasks which use soft failures to | // Particularly, this is important for tasks which use soft failures to | ||||
// indicate that they are waiting on other tasks to complete: we need to | // indicate that they are waiting on other tasks to complete: we need to | ||||
// push them to the end of the queue after they fail, at least on | // push them to the end of the queue after they fail, at least on | ||||
Show All 26 Lines |
We never actually made this order by priority.