Differential D15986 Diff 38488 src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
Changeset View
Changeset View
Standalone View
Standalone View
src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
Show First 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | public function synchronizeWorkingCopyAfterCreation() { | ||||
return $this; | return $this; | ||||
} | } | ||||
/** | /** | ||||
* @task sync | * @task sync | ||||
*/ | */ | ||||
public function synchronizeWorkingCopyAfterHostingChange() { | |||||
if (!$this->shouldEnableSynchronization()) { | |||||
return; | |||||
} | |||||
$repository = $this->getRepository(); | |||||
$repository_phid = $repository->getPHID(); | |||||
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( | |||||
$repository_phid); | |||||
$versions = mpull($versions, null, 'getDevicePHID'); | |||||
// After converting a hosted repository to observed, or vice versa, we | |||||
// need to reset version numbers because the clocks for observed and hosted | |||||
// repositories run on different units. | |||||
// We identify all the cluster leaders and reset their version to 0. | |||||
// We identify all the cluster followers and demote them. | |||||
// This allows the cluter to start over again at version 0 but keep the | |||||
// same leaders. | |||||
if ($versions) { | |||||
$max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | |||||
foreach ($versions as $version) { | |||||
$device_phid = $version->getDevicePHID(); | |||||
if ($version->getRepositoryVersion() == $max_version) { | |||||
PhabricatorRepositoryWorkingCopyVersion::updateVersion( | |||||
$repository_phid, | |||||
$device_phid, | |||||
0); | |||||
} else { | |||||
PhabricatorRepositoryWorkingCopyVersion::demoteDevice( | |||||
$repository_phid, | |||||
$device_phid); | |||||
} | |||||
} | |||||
} | |||||
return $this; | |||||
} | |||||
/** | |||||
* @task sync | |||||
*/ | |||||
public function synchronizeWorkingCopyBeforeRead() { | public function synchronizeWorkingCopyBeforeRead() { | ||||
if (!$this->shouldEnableSynchronization()) { | if (!$this->shouldEnableSynchronization()) { | ||||
return; | return; | ||||
} | } | ||||
$repository = $this->getRepository(); | $repository = $this->getRepository(); | ||||
$repository_phid = $repository->getPHID(); | $repository_phid = $repository->getPHID(); | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | public function synchronizeWorkingCopyBeforeRead() { | ||||
if ($versions) { | if ($versions) { | ||||
// This is the normal case, where we have some version information and | // This is the normal case, where we have some version information and | ||||
// can identify which nodes are leaders. If the current node is not a | // can identify which nodes are leaders. If the current node is not a | ||||
// leader, we want to fetch from a leader and then update our version. | // leader, we want to fetch from a leader and then update our version. | ||||
$max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | ||||
if ($max_version > $this_version) { | if ($max_version > $this_version) { | ||||
if ($repository->isHosted()) { | |||||
$fetchable = array(); | $fetchable = array(); | ||||
foreach ($versions as $version) { | foreach ($versions as $version) { | ||||
if ($version->getRepositoryVersion() == $max_version) { | if ($version->getRepositoryVersion() == $max_version) { | ||||
$fetchable[] = $version->getDevicePHID(); | $fetchable[] = $version->getDevicePHID(); | ||||
} | } | ||||
} | } | ||||
$this->synchronizeWorkingCopyFromDevices($fetchable); | $this->synchronizeWorkingCopyFromDevices($fetchable); | ||||
} else { | |||||
$this->synchornizeWorkingCopyFromRemote(); | |||||
} | |||||
PhabricatorRepositoryWorkingCopyVersion::updateVersion( | PhabricatorRepositoryWorkingCopyVersion::updateVersion( | ||||
$repository_phid, | $repository_phid, | ||||
$device_phid, | $device_phid, | ||||
$max_version); | $max_version); | ||||
} else { | } else { | ||||
$this->logLine( | $this->logLine( | ||||
pht( | pht( | ||||
▲ Show 20 Lines • Show All 156 Lines • ▼ Show 20 Lines | PhabricatorRepositoryWorkingCopyVersion::willWrite( | ||||
), | ), | ||||
$this->clusterWriteOwner); | $this->clusterWriteOwner); | ||||
$this->clusterWriteVersion = $max_version; | $this->clusterWriteVersion = $max_version; | ||||
$this->clusterWriteLock = $write_lock; | $this->clusterWriteLock = $write_lock; | ||||
} | } | ||||
public function synchronizeWorkingCopyAfterDiscovery($new_version) { | |||||
if (!$this->shouldEnableSynchronization()) { | |||||
return; | |||||
} | |||||
$repository = $this->getRepository(); | |||||
$repository_phid = $repository->getPHID(); | |||||
if ($repository->isHosted()) { | |||||
return; | |||||
} | |||||
$viewer = $this->getViewer(); | |||||
$device = AlmanacKeys::getLiveDevice(); | |||||
$device_phid = $device->getPHID(); | |||||
// NOTE: We are not holding a lock here because this method is only called | |||||
// from PhabricatorRepositoryDiscoveryEngine, which already holds a device | |||||
// lock. Even if we do race here and record an older version, the | |||||
// consequences are mild: we only do extra work to correct it later. | |||||
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( | |||||
$repository_phid); | |||||
$versions = mpull($versions, null, 'getDevicePHID'); | |||||
$this_version = idx($versions, $device_phid); | |||||
if ($this_version) { | |||||
$this_version = (int)$this_version->getRepositoryVersion(); | |||||
} else { | |||||
$this_version = -1; | |||||
} | |||||
if ($new_version > $this_version) { | |||||
PhabricatorRepositoryWorkingCopyVersion::updateVersion( | |||||
$repository_phid, | |||||
$device_phid, | |||||
$new_version); | |||||
} | |||||
} | |||||
/** | /** | ||||
* @task sync | * @task sync | ||||
*/ | */ | ||||
public function synchronizeWorkingCopyAfterWrite() { | public function synchronizeWorkingCopyAfterWrite() { | ||||
if (!$this->shouldEnableSynchronization()) { | if (!$this->shouldEnableSynchronization()) { | ||||
return; | return; | ||||
} | } | ||||
▲ Show 20 Lines • Show All 126 Lines • ▼ Show 20 Lines | if (!$service_phid) { | ||||
return false; | return false; | ||||
} | } | ||||
// TODO: For now, this is only supported for Git. | // TODO: For now, this is only supported for Git. | ||||
if (!$repository->isGit()) { | if (!$repository->isGit()) { | ||||
return false; | return false; | ||||
} | } | ||||
// TODO: It may eventually make sense to try to version and synchronize | |||||
// observed repositories (so that daemons don't do reads against out-of | |||||
// date hosts), but don't bother for now. | |||||
if (!$repository->isHosted()) { | |||||
return false; | |||||
} | |||||
$device = AlmanacKeys::getLiveDevice(); | $device = AlmanacKeys::getLiveDevice(); | ||||
if (!$device) { | if (!$device) { | ||||
return false; | return false; | ||||
} | } | ||||
return true; | return true; | ||||
} | } | ||||
/** | /** | ||||
* @task internal | * @task internal | ||||
*/ | */ | ||||
private function synchornizeWorkingCopyFromRemote() { | |||||
$repository = $this->getRepository(); | |||||
$device = AlmanacKeys::getLiveDevice(); | |||||
$local_path = $repository->getLocalPath(); | |||||
$fetch_uri = $repository->getRemoteURIEnvelope(); | |||||
if ($repository->isGit()) { | |||||
$this->requireWorkingCopy(); | |||||
$argv = array( | |||||
'fetch --prune -- %P %s', | |||||
$fetch_uri, | |||||
'+refs/*:refs/*', | |||||
); | |||||
} else { | |||||
throw new Exception(pht('Remote sync only supported for git!')); | |||||
} | |||||
$future = DiffusionCommandEngine::newCommandEngine($repository) | |||||
->setArgv($argv) | |||||
->setSudoAsDaemon(true) | |||||
->setCredentialPHID($repository->getCredentialPHID()) | |||||
->setProtocol($repository->getRemoteProtocol()) | |||||
->newFuture(); | |||||
$future->setCWD($local_path); | |||||
try { | |||||
$future->resolvex(); | |||||
} catch (Exception $ex) { | |||||
$this->logLine( | |||||
pht( | |||||
'Synchronization of "%s" from remote failed: %s', | |||||
$device->getName(), | |||||
$ex->getMessage())); | |||||
throw $ex; | |||||
} | |||||
} | |||||
/** | |||||
* @task internal | |||||
*/ | |||||
private function synchronizeWorkingCopyFromDevices(array $device_phids) { | private function synchronizeWorkingCopyFromDevices(array $device_phids) { | ||||
$repository = $this->getRepository(); | $repository = $this->getRepository(); | ||||
$service = $repository->loadAlmanacService(); | $service = $repository->loadAlmanacService(); | ||||
if (!$service) { | if (!$service) { | ||||
throw new Exception(pht('Failed to load repository cluster service.')); | throw new Exception(pht('Failed to load repository cluster service.')); | ||||
} | } | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | $this->logLine( | ||||
'read.', | 'read.', | ||||
$device->getName(), | $device->getName(), | ||||
$binding->getDevice()->getName())); | $binding->getDevice()->getName())); | ||||
$fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding); | $fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding); | ||||
$local_path = $repository->getLocalPath(); | $local_path = $repository->getLocalPath(); | ||||
if ($repository->isGit()) { | if ($repository->isGit()) { | ||||
if (!Filesystem::pathExists($local_path)) { | $this->requireWorkingCopy(); | ||||
throw new Exception( | |||||
pht( | |||||
'Repository "%s" does not have a working copy on this device '. | |||||
'yet, so it can not be synchronized. Wait for the daemons to '. | |||||
'construct one or run `bin/repository update %s` on this host '. | |||||
'("%s") to build it explicitly.', | |||||
$repository->getDisplayName(), | |||||
$repository->getMonogram(), | |||||
$device->getName())); | |||||
} | |||||
$argv = array( | $argv = array( | ||||
'fetch --prune -- %s %s', | 'fetch --prune -- %s %s', | ||||
$fetch_uri, | $fetch_uri, | ||||
'+refs/*:refs/*', | '+refs/*:refs/*', | ||||
); | ); | ||||
} else { | } else { | ||||
throw new Exception(pht('Binding sync only supported for git!')); | throw new Exception(pht('Binding sync only supported for git!')); | ||||
Show All 35 Lines | /* -( Internals )---------------------------------------------------------- */ | ||||
*/ | */ | ||||
private function logText($message) { | private function logText($message) { | ||||
$log = $this->logger; | $log = $this->logger; | ||||
if ($log) { | if ($log) { | ||||
$log->writeClusterEngineLogMessage($message); | $log->writeClusterEngineLogMessage($message); | ||||
} | } | ||||
return $this; | return $this; | ||||
} | } | ||||
private function requireWorkingCopy() { | |||||
$repository = $this->getRepository(); | |||||
$local_path = $repository->getLocalPath(); | |||||
if (!Filesystem::pathExists($local_path)) { | |||||
$device = AlmanacKeys::getLiveDevice(); | |||||
throw new Exception( | |||||
pht( | |||||
'Repository "%s" does not have a working copy on this device '. | |||||
'yet, so it can not be synchronized. Wait for the daemons to '. | |||||
'construct one or run `bin/repository update %s` on this host '. | |||||
'("%s") to build it explicitly.', | |||||
$repository->getDisplayName(), | |||||
$repository->getMonogram(), | |||||
$device->getName())); | |||||
} | |||||
} | |||||
} | } |