Differential D20078 Diff 47948 src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php
Changeset View
Changeset View
Standalone View
Standalone View
src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php
Show First 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | protected function run() { | ||||
$retry_after = array(); | $retry_after = array(); | ||||
$min_sleep = 15; | $min_sleep = 15; | ||||
$max_sleep = phutil_units('5 minutes in seconds'); | $max_sleep = phutil_units('5 minutes in seconds'); | ||||
$max_futures = 4; | $max_futures = 4; | ||||
$futures = array(); | $futures = array(); | ||||
$queue = array(); | $queue = array(); | ||||
$sync_wait = phutil_units('2 minutes in seconds'); | |||||
$last_sync = array(); | |||||
while (!$this->shouldExit()) { | while (!$this->shouldExit()) { | ||||
PhabricatorCaches::destroyRequestCache(); | PhabricatorCaches::destroyRequestCache(); | ||||
$device = AlmanacKeys::getLiveDevice(); | $device = AlmanacKeys::getLiveDevice(); | ||||
$pullable = $this->loadPullableRepositories($include, $exclude, $device); | $pullable = $this->loadPullableRepositories($include, $exclude, $device); | ||||
// If any repositories have the NEEDS_UPDATE flag set, pull them | // If any repositories have the NEEDS_UPDATE flag set, pull them | ||||
// as soon as possible. | // as soon as possible. | ||||
$need_update_messages = $this->loadRepositoryUpdateMessages(true); | $need_update_messages = $this->loadRepositoryUpdateMessages(true); | ||||
foreach ($need_update_messages as $message) { | foreach ($need_update_messages as $message) { | ||||
$repo = idx($pullable, $message->getRepositoryID()); | $repo = idx($pullable, $message->getRepositoryID()); | ||||
if (!$repo) { | if (!$repo) { | ||||
continue; | continue; | ||||
} | } | ||||
$this->log( | $this->log( | ||||
pht( | pht( | ||||
'Got an update message for repository "%s"!', | 'Got an update message for repository "%s"!', | ||||
$repo->getMonogram())); | $repo->getMonogram())); | ||||
$retry_after[$message->getRepositoryID()] = time(); | $retry_after[$message->getRepositoryID()] = time(); | ||||
} | } | ||||
if ($device) { | |||||
$unsynchronized = $this->loadUnsynchronizedRepositories($device); | |||||
$now = PhabricatorTime::getNow(); | |||||
foreach ($unsynchronized as $repository) { | |||||
$id = $repository->getID(); | |||||
$this->log( | |||||
pht( | |||||
'Cluster repository ("%s") is out of sync on this node ("%s").', | |||||
$repository->getDisplayName(), | |||||
$device->getName())); | |||||
// Don't let out-of-sync conditions trigger updates too frequently, | |||||
// since we don't want to get trapped in a death spiral if sync is | |||||
// failing. | |||||
$sync_at = idx($last_sync, $id, 0); | |||||
$wait_duration = ($now - $sync_at); | |||||
if ($wait_duration < $sync_wait) { | |||||
$this->log( | |||||
pht( | |||||
'Skipping forced out-of-sync update because the last update '. | |||||
'was too recent (%s seconds ago).', | |||||
$wait_duration)); | |||||
continue; | |||||
} | |||||
$last_sync[$id] = $now; | |||||
$retry_after[$id] = $now; | |||||
} | |||||
} | |||||
// If any repositories were deleted, remove them from the retry timer map | // If any repositories were deleted, remove them from the retry timer map | ||||
// so we don't end up with a retry timer that never gets updated and | // so we don't end up with a retry timer that never gets updated and | ||||
// causes us to sleep for the minimum amount of time. | // causes us to sleep for the minimum amount of time. | ||||
$retry_after = array_select_keys( | $retry_after = array_select_keys( | ||||
$retry_after, | $retry_after, | ||||
array_keys($pullable)); | array_keys($pullable)); | ||||
// Figure out which repositories we need to queue for an update. | // Figure out which repositories we need to queue for an update. | ||||
▲ Show 20 Lines • Show All 409 Lines • ▼ Show 20 Lines | while (($sleep_until - time()) > 0) { | ||||
$this->log(pht('Awakened from sleep by pending updates!')); | $this->log(pht('Awakened from sleep by pending updates!')); | ||||
break; | break; | ||||
} | } | ||||
} | } | ||||
return false; | return false; | ||||
} | } | ||||
private function loadUnsynchronizedRepositories(AlmanacDevice $device) { | |||||
$viewer = $this->getViewer(); | |||||
$table = new PhabricatorRepositoryWorkingCopyVersion(); | |||||
$conn = $table->establishConnection('r'); | |||||
$our_versions = queryfx_all( | |||||
$conn, | |||||
'SELECT repositoryPHID, repositoryVersion FROM %R WHERE devicePHID = %s', | |||||
$table, | |||||
$device->getPHID()); | |||||
$our_versions = ipull($our_versions, 'repositoryVersion', 'repositoryPHID'); | |||||
$max_versions = queryfx_all( | |||||
$conn, | |||||
'SELECT repositoryPHID, MAX(repositoryVersion) maxVersion FROM %R | |||||
GROUP BY repositoryPHID', | |||||
$table); | |||||
$max_versions = ipull($max_versions, 'maxVersion', 'repositoryPHID'); | |||||
$unsynchronized_phids = array(); | |||||
foreach ($max_versions as $repository_phid => $max_version) { | |||||
$our_version = idx($our_versions, $repository_phid); | |||||
if (($our_version === null) || ($our_version < $max_version)) { | |||||
$unsynchronized_phids[] = $repository_phid; | |||||
} | |||||
} | |||||
if (!$unsynchronized_phids) { | |||||
return array(); | |||||
} | |||||
return id(new PhabricatorRepositoryQuery()) | |||||
->setViewer($viewer) | |||||
->withPHIDs($unsynchronized_phids) | |||||
->execute(); | |||||
} | |||||
} | } |