Changeset View
Changeset View
Standalone View
Standalone View
src/applications/fact/daemon/PhabricatorFactDaemon.php
<?php | <?php | ||||
final class PhabricatorFactDaemon extends PhabricatorDaemon { | final class PhabricatorFactDaemon extends PhabricatorDaemon { | ||||
private $engines; | private $engines; | ||||
const RAW_FACT_BUFFER_LIMIT = 128; | |||||
protected function run() { | protected function run() { | ||||
$this->setEngines(PhabricatorFactEngine::loadAllEngines()); | $this->setEngines(PhabricatorFactEngine::loadAllEngines()); | ||||
while (!$this->shouldExit()) { | while (!$this->shouldExit()) { | ||||
PhabricatorCaches::destroyRequestCache(); | PhabricatorCaches::destroyRequestCache(); | ||||
$iterators = $this->getAllApplicationIterators(); | $iterators = $this->getAllApplicationIterators(); | ||||
foreach ($iterators as $iterator_name => $iterator) { | foreach ($iterators as $iterator_name => $iterator) { | ||||
$this->processIteratorWithCursor($iterator_name, $iterator); | $this->processIteratorWithCursor($iterator_name, $iterator); | ||||
} | } | ||||
$this->processAggregates(); | |||||
$this->log(pht('Zzz...')); | $this->log(pht('Zzz...')); | ||||
$this->sleep(60 * 5); | $this->sleep(60 * 5); | ||||
} | } | ||||
} | } | ||||
public static function getAllApplicationIterators() { | public static function getAllApplicationIterators() { | ||||
$apps = PhabricatorApplication::getAllInstalledApplications(); | $apps = PhabricatorApplication::getAllInstalledApplications(); | ||||
Show All 40 Lines | public function setEngines(array $engines) { | ||||
$this->engines = $engines; | $this->engines = $engines; | ||||
return $this; | return $this; | ||||
} | } | ||||
public function processIterator($iterator) { | public function processIterator($iterator) { | ||||
$result = null; | $result = null; | ||||
$raw_facts = array(); | $datapoints = array(); | ||||
foreach ($iterator as $key => $object) { | foreach ($iterator as $key => $object) { | ||||
$phid = $object->getPHID(); | $phid = $object->getPHID(); | ||||
$this->log(pht('Processing %s...', $phid)); | $this->log(pht('Processing %s...', $phid)); | ||||
$raw_facts[$phid] = $this->computeRawFacts($object); | $datapoints[$phid] = $this->newDatapoints($object); | ||||
if (count($raw_facts) > self::RAW_FACT_BUFFER_LIMIT) { | if (count($datapoints) > 1024) { | ||||
$this->updateRawFacts($raw_facts); | $this->updateDatapoints($datapoints); | ||||
$raw_facts = array(); | $datapoints = array(); | ||||
} | } | ||||
$result = $key; | $result = $key; | ||||
} | } | ||||
if ($raw_facts) { | if ($datapoints) { | ||||
$this->updateRawFacts($raw_facts); | $this->updateDatapoints($datapoints); | ||||
$raw_facts = array(); | $datapoints = array(); | ||||
} | } | ||||
return $result; | return $result; | ||||
} | } | ||||
public function processAggregates() { | private function newDatapoints(PhabricatorLiskDAO $object) { | ||||
$this->log(pht('Processing aggregates.')); | |||||
$facts = $this->computeAggregateFacts(); | |||||
$this->updateAggregateFacts($facts); | |||||
} | |||||
private function computeAggregateFacts() { | |||||
$facts = array(); | |||||
foreach ($this->engines as $engine) { | |||||
if (!$engine->shouldComputeAggregateFacts()) { | |||||
continue; | |||||
} | |||||
$facts[] = $engine->computeAggregateFacts(); | |||||
} | |||||
return array_mergev($facts); | |||||
} | |||||
private function computeRawFacts(PhabricatorLiskDAO $object) { | |||||
$facts = array(); | $facts = array(); | ||||
foreach ($this->engines as $engine) { | foreach ($this->engines as $engine) { | ||||
if (!$engine->shouldComputeRawFactsForObject($object)) { | if (!$engine->supportsDatapointsForObject($object)) { | ||||
continue; | continue; | ||||
} | } | ||||
$facts[] = $engine->computeRawFactsForObject($object); | $facts[] = $engine->newDatapointsForObject($object); | ||||
} | } | ||||
return array_mergev($facts); | return array_mergev($facts); | ||||
} | } | ||||
private function updateRawFacts(array $map) { | private function updateDatapoints(array $map) { | ||||
foreach ($map as $phid => $facts) { | foreach ($map as $phid => $facts) { | ||||
assert_instances_of($facts, 'PhabricatorFactRaw'); | assert_instances_of($facts, 'PhabricatorFactIntDatapoint'); | ||||
} | } | ||||
$phids = array_keys($map); | $phids = array_keys($map); | ||||
if (!$phids) { | if (!$phids) { | ||||
return; | return; | ||||
} | } | ||||
$table = new PhabricatorFactRaw(); | |||||
$fact_keys = array(); | |||||
$objects = array(); | |||||
foreach ($map as $phid => $facts) { | |||||
foreach ($facts as $fact) { | |||||
$fact_keys[$fact->getKey()] = true; | |||||
$object_phid = $fact->getObjectPHID(); | |||||
$objects[$object_phid] = $object_phid; | |||||
$dimension_phid = $fact->getDimensionPHID(); | |||||
if ($dimension_phid !== null) { | |||||
$objects[$dimension_phid] = $dimension_phid; | |||||
} | |||||
} | |||||
} | |||||
$key_map = id(new PhabricatorFactKeyDimension()) | |||||
->newDimensionMap(array_keys($fact_keys)); | |||||
$object_map = id(new PhabricatorFactObjectDimension()) | |||||
->newDimensionMap(array_keys($objects)); | |||||
$table = new PhabricatorFactIntDatapoint(); | |||||
$conn = $table->establishConnection('w'); | $conn = $table->establishConnection('w'); | ||||
$table_name = $table->getTableName(); | $table_name = $table->getTableName(); | ||||
$sql = array(); | $sql = array(); | ||||
foreach ($map as $phid => $facts) { | foreach ($map as $phid => $facts) { | ||||
foreach ($facts as $fact) { | foreach ($facts as $fact) { | ||||
$key_id = $key_map[$fact->getKey()]; | |||||
$object_id = $object_map[$fact->getObjectPHID()]; | |||||
$dimension_phid = $fact->getDimensionPHID(); | |||||
if ($dimension_phid !== null) { | |||||
$dimension_id = $object_map[$dimension_phid]; | |||||
} else { | |||||
$dimension_id = null; | |||||
} | |||||
$sql[] = qsprintf( | $sql[] = qsprintf( | ||||
$conn, | $conn, | ||||
'(%s, %s, %s, %d, %d, %d)', | '(%d, %d, %nd, %d, %d)', | ||||
$fact->getFactType(), | $key_id, | ||||
$fact->getObjectPHID(), | $object_id, | ||||
$fact->getObjectA(), | $dimension_id, | ||||
$fact->getValueX(), | $fact->getValue(), | ||||
$fact->getValueY(), | |||||
$fact->getEpoch()); | $fact->getEpoch()); | ||||
} | } | ||||
} | } | ||||
$rebuilt_ids = array_select_keys($object_map, $phids); | |||||
$table->openTransaction(); | $table->openTransaction(); | ||||
queryfx( | queryfx( | ||||
$conn, | $conn, | ||||
'DELETE FROM %T WHERE objectPHID IN (%Ls)', | 'DELETE FROM %T WHERE objectID IN (%Ld)', | ||||
$table_name, | $table_name, | ||||
$phids); | $rebuilt_ids); | ||||
if ($sql) { | if ($sql) { | ||||
foreach (array_chunk($sql, 256) as $chunk) { | foreach (PhabricatorLiskDAO::chunkSQL($sql) as $chunk) { | ||||
queryfx( | queryfx( | ||||
$conn, | $conn, | ||||
'INSERT INTO %T | 'INSERT INTO %T | ||||
(factType, objectPHID, objectA, valueX, valueY, epoch) | (keyID, objectID, dimensionID, value, epoch) | ||||
VALUES %Q', | VALUES %Q', | ||||
$table_name, | $table_name, | ||||
implode(', ', $chunk)); | $chunk); | ||||
} | } | ||||
} | } | ||||
$table->saveTransaction(); | $table->saveTransaction(); | ||||
} | } | ||||
private function updateAggregateFacts(array $facts) { | |||||
if (!$facts) { | |||||
return; | |||||
} | |||||
$table = new PhabricatorFactAggregate(); | |||||
$conn = $table->establishConnection('w'); | |||||
$table_name = $table->getTableName(); | |||||
$sql = array(); | |||||
foreach ($facts as $fact) { | |||||
$sql[] = qsprintf( | |||||
$conn, | |||||
'(%s, %s, %d)', | |||||
$fact->getFactType(), | |||||
$fact->getObjectPHID(), | |||||
$fact->getValueX()); | |||||
} | |||||
foreach (array_chunk($sql, 256) as $chunk) { | |||||
queryfx( | |||||
$conn, | |||||
'INSERT INTO %T (factType, objectPHID, valueX) VALUES %Q | |||||
ON DUPLICATE KEY UPDATE valueX = VALUES(valueX)', | |||||
$table_name, | |||||
implode(', ', $chunk)); | |||||
} | |||||
} | |||||
} | } |