Differential D15986 Diff 38488 src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
Changeset View
Changeset View
Standalone View
Standalone View
src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
| Show First 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | public function synchronizeWorkingCopyAfterCreation() { | ||||
| return $this; | return $this; | ||||
| } | } | ||||
| /** | /** | ||||
| * @task sync | * @task sync | ||||
| */ | */ | ||||
| public function synchronizeWorkingCopyAfterHostingChange() { | |||||
| if (!$this->shouldEnableSynchronization()) { | |||||
| return; | |||||
| } | |||||
| $repository = $this->getRepository(); | |||||
| $repository_phid = $repository->getPHID(); | |||||
| $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( | |||||
| $repository_phid); | |||||
| $versions = mpull($versions, null, 'getDevicePHID'); | |||||
| // After converting a hosted repository to observed, or vice versa, we | |||||
| // need to reset version numbers because the clocks for observed and hosted | |||||
| // repositories run on different units. | |||||
| // We identify all the cluster leaders and reset their version to 0. | |||||
| // We identify all the cluster followers and demote them. | |||||
| // This allows the cluter to start over again at version 0 but keep the | |||||
| // same leaders. | |||||
| if ($versions) { | |||||
| $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | |||||
| foreach ($versions as $version) { | |||||
| $device_phid = $version->getDevicePHID(); | |||||
| if ($version->getRepositoryVersion() == $max_version) { | |||||
| PhabricatorRepositoryWorkingCopyVersion::updateVersion( | |||||
| $repository_phid, | |||||
| $device_phid, | |||||
| 0); | |||||
| } else { | |||||
| PhabricatorRepositoryWorkingCopyVersion::demoteDevice( | |||||
| $repository_phid, | |||||
| $device_phid); | |||||
| } | |||||
| } | |||||
| } | |||||
| return $this; | |||||
| } | |||||
| /** | |||||
| * @task sync | |||||
| */ | |||||
| public function synchronizeWorkingCopyBeforeRead() { | public function synchronizeWorkingCopyBeforeRead() { | ||||
| if (!$this->shouldEnableSynchronization()) { | if (!$this->shouldEnableSynchronization()) { | ||||
| return; | return; | ||||
| } | } | ||||
| $repository = $this->getRepository(); | $repository = $this->getRepository(); | ||||
| $repository_phid = $repository->getPHID(); | $repository_phid = $repository->getPHID(); | ||||
| ▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | public function synchronizeWorkingCopyBeforeRead() { | ||||
| if ($versions) { | if ($versions) { | ||||
| // This is the normal case, where we have some version information and | // This is the normal case, where we have some version information and | ||||
| // can identify which nodes are leaders. If the current node is not a | // can identify which nodes are leaders. If the current node is not a | ||||
| // leader, we want to fetch from a leader and then update our version. | // leader, we want to fetch from a leader and then update our version. | ||||
| $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | ||||
| if ($max_version > $this_version) { | if ($max_version > $this_version) { | ||||
| if ($repository->isHosted()) { | |||||
| $fetchable = array(); | $fetchable = array(); | ||||
| foreach ($versions as $version) { | foreach ($versions as $version) { | ||||
| if ($version->getRepositoryVersion() == $max_version) { | if ($version->getRepositoryVersion() == $max_version) { | ||||
| $fetchable[] = $version->getDevicePHID(); | $fetchable[] = $version->getDevicePHID(); | ||||
| } | } | ||||
| } | } | ||||
| $this->synchronizeWorkingCopyFromDevices($fetchable); | $this->synchronizeWorkingCopyFromDevices($fetchable); | ||||
| } else { | |||||
| $this->synchornizeWorkingCopyFromRemote(); | |||||
| } | |||||
| PhabricatorRepositoryWorkingCopyVersion::updateVersion( | PhabricatorRepositoryWorkingCopyVersion::updateVersion( | ||||
| $repository_phid, | $repository_phid, | ||||
| $device_phid, | $device_phid, | ||||
| $max_version); | $max_version); | ||||
| } else { | } else { | ||||
| $this->logLine( | $this->logLine( | ||||
| pht( | pht( | ||||
| ▲ Show 20 Lines • Show All 156 Lines • ▼ Show 20 Lines | PhabricatorRepositoryWorkingCopyVersion::willWrite( | ||||
| ), | ), | ||||
| $this->clusterWriteOwner); | $this->clusterWriteOwner); | ||||
| $this->clusterWriteVersion = $max_version; | $this->clusterWriteVersion = $max_version; | ||||
| $this->clusterWriteLock = $write_lock; | $this->clusterWriteLock = $write_lock; | ||||
| } | } | ||||
| public function synchronizeWorkingCopyAfterDiscovery($new_version) { | |||||
| if (!$this->shouldEnableSynchronization()) { | |||||
| return; | |||||
| } | |||||
| $repository = $this->getRepository(); | |||||
| $repository_phid = $repository->getPHID(); | |||||
| if ($repository->isHosted()) { | |||||
| return; | |||||
| } | |||||
| $viewer = $this->getViewer(); | |||||
| $device = AlmanacKeys::getLiveDevice(); | |||||
| $device_phid = $device->getPHID(); | |||||
| // NOTE: We are not holding a lock here because this method is only called | |||||
| // from PhabricatorRepositoryDiscoveryEngine, which already holds a device | |||||
| // lock. Even if we do race here and record an older version, the | |||||
| // consequences are mild: we only do extra work to correct it later. | |||||
| $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( | |||||
| $repository_phid); | |||||
| $versions = mpull($versions, null, 'getDevicePHID'); | |||||
| $this_version = idx($versions, $device_phid); | |||||
| if ($this_version) { | |||||
| $this_version = (int)$this_version->getRepositoryVersion(); | |||||
| } else { | |||||
| $this_version = -1; | |||||
| } | |||||
| if ($new_version > $this_version) { | |||||
| PhabricatorRepositoryWorkingCopyVersion::updateVersion( | |||||
| $repository_phid, | |||||
| $device_phid, | |||||
| $new_version); | |||||
| } | |||||
| } | |||||
| /** | /** | ||||
| * @task sync | * @task sync | ||||
| */ | */ | ||||
| public function synchronizeWorkingCopyAfterWrite() { | public function synchronizeWorkingCopyAfterWrite() { | ||||
| if (!$this->shouldEnableSynchronization()) { | if (!$this->shouldEnableSynchronization()) { | ||||
| return; | return; | ||||
| } | } | ||||
| ▲ Show 20 Lines • Show All 126 Lines • ▼ Show 20 Lines | if (!$service_phid) { | ||||
| return false; | return false; | ||||
| } | } | ||||
| // TODO: For now, this is only supported for Git. | // TODO: For now, this is only supported for Git. | ||||
| if (!$repository->isGit()) { | if (!$repository->isGit()) { | ||||
| return false; | return false; | ||||
| } | } | ||||
| // TODO: It may eventually make sense to try to version and synchronize | |||||
| // observed repositories (so that daemons don't do reads against out-of | |||||
| // date hosts), but don't bother for now. | |||||
| if (!$repository->isHosted()) { | |||||
| return false; | |||||
| } | |||||
| $device = AlmanacKeys::getLiveDevice(); | $device = AlmanacKeys::getLiveDevice(); | ||||
| if (!$device) { | if (!$device) { | ||||
| return false; | return false; | ||||
| } | } | ||||
| return true; | return true; | ||||
| } | } | ||||
| /** | /** | ||||
| * @task internal | * @task internal | ||||
| */ | */ | ||||
| private function synchornizeWorkingCopyFromRemote() { | |||||
| $repository = $this->getRepository(); | |||||
| $device = AlmanacKeys::getLiveDevice(); | |||||
| $local_path = $repository->getLocalPath(); | |||||
| $fetch_uri = $repository->getRemoteURIEnvelope(); | |||||
| if ($repository->isGit()) { | |||||
| $this->requireWorkingCopy(); | |||||
| $argv = array( | |||||
| 'fetch --prune -- %P %s', | |||||
| $fetch_uri, | |||||
| '+refs/*:refs/*', | |||||
| ); | |||||
| } else { | |||||
| throw new Exception(pht('Remote sync only supported for git!')); | |||||
| } | |||||
| $future = DiffusionCommandEngine::newCommandEngine($repository) | |||||
| ->setArgv($argv) | |||||
| ->setSudoAsDaemon(true) | |||||
| ->setCredentialPHID($repository->getCredentialPHID()) | |||||
| ->setProtocol($repository->getRemoteProtocol()) | |||||
| ->newFuture(); | |||||
| $future->setCWD($local_path); | |||||
| try { | |||||
| $future->resolvex(); | |||||
| } catch (Exception $ex) { | |||||
| $this->logLine( | |||||
| pht( | |||||
| 'Synchronization of "%s" from remote failed: %s', | |||||
| $device->getName(), | |||||
| $ex->getMessage())); | |||||
| throw $ex; | |||||
| } | |||||
| } | |||||
| /** | |||||
| * @task internal | |||||
| */ | |||||
| private function synchronizeWorkingCopyFromDevices(array $device_phids) { | private function synchronizeWorkingCopyFromDevices(array $device_phids) { | ||||
| $repository = $this->getRepository(); | $repository = $this->getRepository(); | ||||
| $service = $repository->loadAlmanacService(); | $service = $repository->loadAlmanacService(); | ||||
| if (!$service) { | if (!$service) { | ||||
| throw new Exception(pht('Failed to load repository cluster service.')); | throw new Exception(pht('Failed to load repository cluster service.')); | ||||
| } | } | ||||
| ▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | $this->logLine( | ||||
| 'read.', | 'read.', | ||||
| $device->getName(), | $device->getName(), | ||||
| $binding->getDevice()->getName())); | $binding->getDevice()->getName())); | ||||
| $fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding); | $fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding); | ||||
| $local_path = $repository->getLocalPath(); | $local_path = $repository->getLocalPath(); | ||||
| if ($repository->isGit()) { | if ($repository->isGit()) { | ||||
| if (!Filesystem::pathExists($local_path)) { | $this->requireWorkingCopy(); | ||||
| throw new Exception( | |||||
| pht( | |||||
| 'Repository "%s" does not have a working copy on this device '. | |||||
| 'yet, so it can not be synchronized. Wait for the daemons to '. | |||||
| 'construct one or run `bin/repository update %s` on this host '. | |||||
| '("%s") to build it explicitly.', | |||||
| $repository->getDisplayName(), | |||||
| $repository->getMonogram(), | |||||
| $device->getName())); | |||||
| } | |||||
| $argv = array( | $argv = array( | ||||
| 'fetch --prune -- %s %s', | 'fetch --prune -- %s %s', | ||||
| $fetch_uri, | $fetch_uri, | ||||
| '+refs/*:refs/*', | '+refs/*:refs/*', | ||||
| ); | ); | ||||
| } else { | } else { | ||||
| throw new Exception(pht('Binding sync only supported for git!')); | throw new Exception(pht('Binding sync only supported for git!')); | ||||
| Show All 35 Lines | /* -( Internals )---------------------------------------------------------- */ | ||||
| */ | */ | ||||
| private function logText($message) { | private function logText($message) { | ||||
| $log = $this->logger; | $log = $this->logger; | ||||
| if ($log) { | if ($log) { | ||||
| $log->writeClusterEngineLogMessage($message); | $log->writeClusterEngineLogMessage($message); | ||||
| } | } | ||||
| return $this; | return $this; | ||||
| } | } | ||||
| private function requireWorkingCopy() { | |||||
| $repository = $this->getRepository(); | |||||
| $local_path = $repository->getLocalPath(); | |||||
| if (!Filesystem::pathExists($local_path)) { | |||||
| $device = AlmanacKeys::getLiveDevice(); | |||||
| throw new Exception( | |||||
| pht( | |||||
| 'Repository "%s" does not have a working copy on this device '. | |||||
| 'yet, so it can not be synchronized. Wait for the daemons to '. | |||||
| 'construct one or run `bin/repository update %s` on this host '. | |||||
| '("%s") to build it explicitly.', | |||||
| $repository->getDisplayName(), | |||||
| $repository->getMonogram(), | |||||
| $device->getName())); | |||||
| } | |||||
| } | |||||
| } | } | ||||