Differential D15682 Diff 37800 src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php
Changeset View
Changeset View
Standalone View
Standalone View
src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php
Show First 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | protected function run() { | ||||
$min_sleep = 15; | $min_sleep = 15; | ||||
$max_futures = 4; | $max_futures = 4; | ||||
$futures = array(); | $futures = array(); | ||||
$queue = array(); | $queue = array(); | ||||
while (!$this->shouldExit()) { | while (!$this->shouldExit()) { | ||||
PhabricatorCaches::destroyRequestCache(); | PhabricatorCaches::destroyRequestCache(); | ||||
$pullable = $this->loadPullableRepositories($include, $exclude); | $device = AlmanacKeys::getLiveDevice(); | ||||
$pullable = $this->loadPullableRepositories($include, $exclude, $device); | |||||
// If any repositories have the NEEDS_UPDATE flag set, pull them | // If any repositories have the NEEDS_UPDATE flag set, pull them | ||||
// as soon as possible. | // as soon as possible. | ||||
$need_update_messages = $this->loadRepositoryUpdateMessages(true); | $need_update_messages = $this->loadRepositoryUpdateMessages(true); | ||||
foreach ($need_update_messages as $message) { | foreach ($need_update_messages as $message) { | ||||
$repo = idx($pullable, $message->getRepositoryID()); | $repo = idx($pullable, $message->getRepositoryID()); | ||||
if (!$repo) { | if (!$repo) { | ||||
continue; | continue; | ||||
▲ Show 20 Lines • Show All 206 Lines • ▼ Show 20 Lines | private function loadRepositoryUpdateMessages($consume = false) { | ||||
return $messages; | return $messages; | ||||
} | } | ||||
/** | /** | ||||
* @task pull | * @task pull | ||||
*/ | */ | ||||
private function loadPullableRepositories(array $include, array $exclude) { | private function loadPullableRepositories( | ||||
array $include, | |||||
array $exclude, | |||||
AlmanacDevice $device = null) { | |||||
$query = id(new PhabricatorRepositoryQuery()) | $query = id(new PhabricatorRepositoryQuery()) | ||||
->setViewer($this->getViewer()); | ->setViewer($this->getViewer()); | ||||
if ($include) { | if ($include) { | ||||
$query->withIdentifiers($include); | $query->withIdentifiers($include); | ||||
} | } | ||||
$repositories = $query->execute(); | $repositories = $query->execute(); | ||||
Show All 34 Lines | private function loadPullableRepositories( | ||||
} | } | ||||
foreach ($repositories as $key => $repository) { | foreach ($repositories as $key => $repository) { | ||||
if (!$repository->isTracked()) { | if (!$repository->isTracked()) { | ||||
unset($repositories[$key]); | unset($repositories[$key]); | ||||
} | } | ||||
} | } | ||||
$service_phids = array(); | |||||
foreach ($repositories as $key => $repository) { | |||||
$service_phid = $repository->getAlmanacServicePHID(); | |||||
// If the repository is bound to a service but this host is not a | |||||
// recognized device, or vice versa, don't pull the repository. | |||||
$is_cluster_repo = (bool)$service_phid; | |||||
$is_cluster_device = (bool)$device; | |||||
if ($is_cluster_repo != $is_cluster_device) { | |||||
if ($is_cluster_device) { | |||||
$this->log( | |||||
pht( | |||||
'Repository "%s" is not a cluster repository, but the current '. | |||||
'host is a cluster device ("%s"), so the repository will not '. | |||||
'be updated on this host.', | |||||
$repository->getDisplayName(), | |||||
$device->getName())); | |||||
} else { | |||||
$this->log( | |||||
pht( | |||||
'Repository "%s" is a cluster repository, but the current '. | |||||
'host is not a cluster device (it has no device ID), so the '. | |||||
'repository will not be updated on this host.', | |||||
$repository->getDisplayName())); | |||||
} | |||||
unset($repositories[$key]); | |||||
continue; | |||||
} | |||||
if ($service_phid) { | |||||
$service_phids[] = $service_phid; | |||||
} | |||||
} | |||||
if ($device) { | |||||
$device_phid = $device->getPHID(); | |||||
if ($service_phids) { | |||||
// We could include `withDevicePHIDs()` here to pull a smaller result | |||||
// set, but we can provide more helpful diagnostic messages below if | |||||
// we fetch a little more data. | |||||
$services = id(new AlmanacServiceQuery()) | |||||
->setViewer($this->getViewer()) | |||||
->withPHIDs($service_phids) | |||||
->needBindings(true) | |||||
->execute(); | |||||
$services = mpull($services, null, 'getPHID'); | |||||
} else { | |||||
$services = array(); | |||||
} | |||||
foreach ($repositories as $key => $repository) { | |||||
$service_phid = $repository->getAlmanacServicePHID(); | |||||
$service = idx($services, $service_phid); | |||||
if (!$service) { | |||||
$this->log( | |||||
pht( | |||||
'Repository "%s" is on cluster service "%s", but that service '. | |||||
'could not be loaded, so the repository will not be updated '. | |||||
'on this host.', | |||||
$repository->getDisplayName(), | |||||
$service_phid)); | |||||
unset($repositories[$key]); | |||||
continue; | |||||
} | |||||
$bindings = $service->getBindings(); | |||||
$bindings = mpull($bindings, null, 'getDevicePHID'); | |||||
$binding = idx($bindings, $device_phid); | |||||
if (!$binding) { | |||||
$this->log( | |||||
pht( | |||||
'Repository "%s" is on cluster service "%s", but that service '. | |||||
'is not bound to this device ("%s"), so the repository will '. | |||||
'not be updated on this host.', | |||||
$repository->getDisplayName(), | |||||
$service->getName(), | |||||
$device->getName())); | |||||
unset($repositories[$key]); | |||||
continue; | |||||
} | |||||
if ($binding->getIsDisabled()) { | |||||
$this->log( | |||||
pht( | |||||
'Repository "%s" is on cluster service "%s", but the binding '. | |||||
'between that service and this device ("%s") is disabled, so '. | |||||
'the not be updated on this host.', | |||||
$repository->getDisplayName(), | |||||
$service->getName(), | |||||
$device->getName())); | |||||
unset($repositories[$key]); | |||||
continue; | |||||
} | |||||
// We have a valid service that is actively bound to the current host | |||||
// device, so we're good to go. | |||||
} | |||||
} | |||||
// Shuffle the repositories, then re-key the array since shuffle() | // Shuffle the repositories, then re-key the array since shuffle() | ||||
// discards keys. This is mostly for startup, we'll use soft priorities | // discards keys. This is mostly for startup, we'll use soft priorities | ||||
// later. | // later. | ||||
shuffle($repositories); | shuffle($repositories); | ||||
$repositories = mpull($repositories, null, 'getID'); | $repositories = mpull($repositories, null, 'getID'); | ||||
return $repositories; | return $repositories; | ||||
} | } | ||||
▲ Show 20 Lines • Show All 87 Lines • Show Last 20 Lines |