Changeset View
Changeset View
Standalone View
Standalone View
src/applications/repository/storage/PhabricatorRepository.php
Show First 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | final class PhabricatorRepository extends PhabricatorRepositoryDAO | ||||
protected $almanacServicePHID; | protected $almanacServicePHID; | ||||
protected $spacePHID; | protected $spacePHID; | ||||
private $commitCount = self::ATTACHABLE; | private $commitCount = self::ATTACHABLE; | ||||
private $mostRecentCommit = self::ATTACHABLE; | private $mostRecentCommit = self::ATTACHABLE; | ||||
private $projectPHIDs = self::ATTACHABLE; | private $projectPHIDs = self::ATTACHABLE; | ||||
private $uris = self::ATTACHABLE; | private $uris = self::ATTACHABLE; | ||||
private $clusterWriteLock; | |||||
private $clusterWriteVersion; | |||||
public static function initializeNewRepository(PhabricatorUser $actor) { | public static function initializeNewRepository(PhabricatorUser $actor) { | ||||
$app = id(new PhabricatorApplicationQuery()) | $app = id(new PhabricatorApplicationQuery()) | ||||
->setViewer($actor) | ->setViewer($actor) | ||||
->withClasses(array('PhabricatorDiffusionApplication')) | ->withClasses(array('PhabricatorDiffusionApplication')) | ||||
->executeOne(); | ->executeOne(); | ||||
$view_policy = $app->getPolicy(DiffusionDefaultViewCapability::CAPABILITY); | $view_policy = $app->getPolicy(DiffusionDefaultViewCapability::CAPABILITY); | ||||
▲ Show 20 Lines • Show All 2,106 Lines • ▼ Show 20 Lines | foreach ($protocol_map as $protocol => $proto_supported) { | ||||
->setIsDisabled(!$proto_supported || !$id_supported); | ->setIsDisabled(!$proto_supported || !$id_supported); | ||||
} | } | ||||
} | } | ||||
return $uris; | return $uris; | ||||
} | } | ||||
/* -( Cluster Synchronization )-------------------------------------------- */ | public function getClusterRepositoryURIFromBinding( | ||||
private function shouldEnableSynchronization() { | |||||
$service_phid = $this->getAlmanacServicePHID(); | |||||
if (!$service_phid) { | |||||
return false; | |||||
} | |||||
// TODO: For now, this is only supported for Git. | |||||
if (!$this->isGit()) { | |||||
return false; | |||||
} | |||||
// TODO: It may eventually make sense to try to version and synchronize | |||||
// observed repositories (so that daemons don't do reads against out-of | |||||
// date hosts), but don't bother for now. | |||||
if (!$this->isHosted()) { | |||||
return false; | |||||
} | |||||
$device = AlmanacKeys::getLiveDevice(); | |||||
if (!$device) { | |||||
return false; | |||||
} | |||||
return true; | |||||
} | |||||
/** | |||||
* Synchronize repository version information after creating a repository. | |||||
* | |||||
* This initializes working copy versions for all currently bound devices to | |||||
* 0, so that we don't get stuck making an ambiguous choice about which | |||||
* devices are leaders when we later synchronize before a read. | |||||
* | |||||
* @task sync | |||||
*/ | |||||
public function synchronizeWorkingCopyAfterCreation() { | |||||
if (!$this->shouldEnableSynchronization()) { | |||||
return; | |||||
} | |||||
$repository_phid = $this->getPHID(); | |||||
$service = $this->loadAlmanacService(); | |||||
if (!$service) { | |||||
throw new Exception(pht('Failed to load repository cluster service.')); | |||||
} | |||||
$bindings = $service->getActiveBindings(); | |||||
foreach ($bindings as $binding) { | |||||
PhabricatorRepositoryWorkingCopyVersion::updateVersion( | |||||
$repository_phid, | |||||
$binding->getDevicePHID(), | |||||
0); | |||||
} | |||||
} | |||||
/** | |||||
* @task sync | |||||
*/ | |||||
public function synchronizeWorkingCopyBeforeRead() { | |||||
if (!$this->shouldEnableSynchronization()) { | |||||
return; | |||||
} | |||||
$repository_phid = $this->getPHID(); | |||||
$device = AlmanacKeys::getLiveDevice(); | |||||
$device_phid = $device->getPHID(); | |||||
$read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock( | |||||
$repository_phid, | |||||
$device_phid); | |||||
// TODO: Raise a more useful exception if we fail to grab this lock. | |||||
$read_lock->lock(phutil_units('2 minutes in seconds')); | |||||
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( | |||||
$repository_phid); | |||||
$versions = mpull($versions, null, 'getDevicePHID'); | |||||
$this_version = idx($versions, $device_phid); | |||||
if ($this_version) { | |||||
$this_version = (int)$this_version->getRepositoryVersion(); | |||||
} else { | |||||
$this_version = -1; | |||||
} | |||||
if ($versions) { | |||||
// This is the normal case, where we have some version information and | |||||
// can identify which nodes are leaders. If the current node is not a | |||||
// leader, we want to fetch from a leader and then update our version. | |||||
$max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | |||||
if ($max_version > $this_version) { | |||||
$fetchable = array(); | |||||
foreach ($versions as $version) { | |||||
if ($version->getRepositoryVersion() == $max_version) { | |||||
$fetchable[] = $version->getDevicePHID(); | |||||
} | |||||
} | |||||
$this->synchronizeWorkingCopyFromDevices($fetchable); | |||||
PhabricatorRepositoryWorkingCopyVersion::updateVersion( | |||||
$repository_phid, | |||||
$device_phid, | |||||
$max_version); | |||||
} | |||||
$result_version = $max_version; | |||||
} else { | |||||
// If no version records exist yet, we need to be careful, because we | |||||
// can not tell which nodes are leaders. | |||||
// There might be several nodes with arbitrary existing data, and we have | |||||
// no way to tell which one has the "right" data. If we pick wrong, we | |||||
// might erase some or all of the data in the repository. | |||||
// Since this is dangeorus, we refuse to guess unless there is only one | |||||
// device. If we're the only device in the group, we obviously must be | |||||
// a leader. | |||||
$service = $this->loadAlmanacService(); | |||||
if (!$service) { | |||||
throw new Exception(pht('Failed to load repository cluster service.')); | |||||
} | |||||
$bindings = $service->getActiveBindings(); | |||||
$device_map = array(); | |||||
foreach ($bindings as $binding) { | |||||
$device_map[$binding->getDevicePHID()] = true; | |||||
} | |||||
if (count($device_map) > 1) { | |||||
throw new Exception( | |||||
pht( | |||||
'Repository "%s" exists on more than one device, but no device '. | |||||
'has any repository version information. Phabricator can not '. | |||||
'guess which copy of the existing data is authoritative. Remove '. | |||||
'all but one device from service to mark the remaining device '. | |||||
'as the authority.', | |||||
$this->getDisplayName())); | |||||
} | |||||
if (empty($device_map[$device->getPHID()])) { | |||||
throw new Exception( | |||||
pht( | |||||
'Repository "%s" is being synchronized on device "%s", but '. | |||||
'this device is not bound to the corresponding cluster '. | |||||
'service ("%s").', | |||||
$this->getDisplayName(), | |||||
$device->getName(), | |||||
$service->getName())); | |||||
} | |||||
// The current device is the only device in service, so it must be a | |||||
// leader. We can safely have any future nodes which come online read | |||||
// from it. | |||||
PhabricatorRepositoryWorkingCopyVersion::updateVersion( | |||||
$repository_phid, | |||||
$device_phid, | |||||
0); | |||||
$result_version = 0; | |||||
} | |||||
$read_lock->unlock(); | |||||
return $result_version; | |||||
} | |||||
/** | |||||
* @task sync | |||||
*/ | |||||
public function synchronizeWorkingCopyBeforeWrite( | |||||
PhabricatorUser $actor) { | |||||
if (!$this->shouldEnableSynchronization()) { | |||||
return; | |||||
} | |||||
$repository_phid = $this->getPHID(); | |||||
$device = AlmanacKeys::getLiveDevice(); | |||||
$device_phid = $device->getPHID(); | |||||
$write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( | |||||
$repository_phid); | |||||
// TODO: Raise a more useful exception if we fail to grab this lock. | |||||
$write_lock->lock(phutil_units('2 minutes in seconds')); | |||||
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( | |||||
$repository_phid); | |||||
foreach ($versions as $version) { | |||||
if (!$version->getIsWriting()) { | |||||
continue; | |||||
} | |||||
throw new Exception( | |||||
pht( | |||||
'An previous write to this repository was interrupted; refusing '. | |||||
'new writes. This issue resolves operator intervention to resolve, '. | |||||
'see "Write Interruptions" in the "Cluster: Repositories" in the '. | |||||
'documentation for instructions.')); | |||||
} | |||||
try { | |||||
$max_version = $this->synchronizeWorkingCopyBeforeRead(); | |||||
} catch (Exception $ex) { | |||||
$write_lock->unlock(); | |||||
throw $ex; | |||||
} | |||||
PhabricatorRepositoryWorkingCopyVersion::willWrite( | |||||
$repository_phid, | |||||
$device_phid, | |||||
array( | |||||
'userPHID' => $actor->getPHID(), | |||||
'epoch' => PhabricatorTime::getNow(), | |||||
'devicePHID' => $device_phid, | |||||
)); | |||||
$this->clusterWriteVersion = $max_version; | |||||
$this->clusterWriteLock = $write_lock; | |||||
} | |||||
/** | |||||
* @task sync | |||||
*/ | |||||
public function synchronizeWorkingCopyAfterWrite() { | |||||
if (!$this->shouldEnableSynchronization()) { | |||||
return; | |||||
} | |||||
if (!$this->clusterWriteLock) { | |||||
throw new Exception( | |||||
pht( | |||||
'Trying to synchronize after write, but not holding a write '. | |||||
'lock!')); | |||||
} | |||||
$repository_phid = $this->getPHID(); | |||||
$device = AlmanacKeys::getLiveDevice(); | |||||
$device_phid = $device->getPHID(); | |||||
// NOTE: This means we're still bumping the version when pushes fail. We | |||||
// could select only un-rejected events instead to bump a little less | |||||
// often. | |||||
$new_log = id(new PhabricatorRepositoryPushEventQuery()) | |||||
->setViewer(PhabricatorUser::getOmnipotentUser()) | |||||
->withRepositoryPHIDs(array($repository_phid)) | |||||
->setLimit(1) | |||||
->executeOne(); | |||||
$old_version = $this->clusterWriteVersion; | |||||
if ($new_log) { | |||||
$new_version = $new_log->getID(); | |||||
} else { | |||||
$new_version = $old_version; | |||||
} | |||||
PhabricatorRepositoryWorkingCopyVersion::didWrite( | |||||
$repository_phid, | |||||
$device_phid, | |||||
$this->clusterWriteVersion, | |||||
$new_log->getID()); | |||||
$this->clusterWriteLock->unlock(); | |||||
$this->clusterWriteLock = null; | |||||
} | |||||
/** | |||||
* @task sync | |||||
*/ | |||||
private function synchronizeWorkingCopyFromDevices(array $device_phids) { | |||||
$service = $this->loadAlmanacService(); | |||||
if (!$service) { | |||||
throw new Exception(pht('Failed to load repository cluster service.')); | |||||
} | |||||
$device_map = array_fuse($device_phids); | |||||
$bindings = $service->getActiveBindings(); | |||||
$fetchable = array(); | |||||
foreach ($bindings as $binding) { | |||||
// We can't fetch from nodes which don't have the newest version. | |||||
$device_phid = $binding->getDevicePHID(); | |||||
if (empty($device_map[$device_phid])) { | |||||
continue; | |||||
} | |||||
// TODO: For now, only fetch over SSH. We could support fetching over | |||||
// HTTP eventually. | |||||
if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') { | |||||
continue; | |||||
} | |||||
$fetchable[] = $binding; | |||||
} | |||||
if (!$fetchable) { | |||||
throw new Exception( | |||||
pht( | |||||
'Leader lost: no up-to-date nodes in repository cluster are '. | |||||
'fetchable.')); | |||||
} | |||||
$caught = null; | |||||
foreach ($fetchable as $binding) { | |||||
try { | |||||
$this->synchronizeWorkingCopyFromBinding($binding); | |||||
$caught = null; | |||||
break; | |||||
} catch (Exception $ex) { | |||||
$caught = $ex; | |||||
} | |||||
} | |||||
if ($caught) { | |||||
throw $caught; | |||||
} | |||||
} | |||||
private function synchronizeWorkingCopyFromBinding($binding) { | |||||
$fetch_uri = $this->getClusterRepositoryURIFromBinding($binding); | |||||
$local_path = $this->getLocalPath(); | |||||
if ($this->isGit()) { | |||||
if (!Filesystem::pathExists($local_path)) { | |||||
$device = AlmanacKeys::getLiveDevice(); | |||||
throw new Exception( | |||||
pht( | |||||
'Repository "%s" does not have a working copy on this device '. | |||||
'yet, so it can not be synchronized. Wait for the daemons to '. | |||||
'construct one or run `bin/repository update %s` on this host '. | |||||
'("%s") to build it explicitly.', | |||||
$this->getDisplayName(), | |||||
$this->getMonogram(), | |||||
$device->getName())); | |||||
} | |||||
$argv = array( | |||||
'fetch --prune -- %s %s', | |||||
$fetch_uri, | |||||
'+refs/*:refs/*', | |||||
); | |||||
} else { | |||||
throw new Exception(pht('Binding sync only supported for git!')); | |||||
} | |||||
$future = DiffusionCommandEngine::newCommandEngine($this) | |||||
->setArgv($argv) | |||||
->setConnectAsDevice(true) | |||||
->setSudoAsDaemon(true) | |||||
->setProtocol($fetch_uri->getProtocol()) | |||||
->newFuture(); | |||||
$future->setCWD($local_path); | |||||
$future->resolvex(); | |||||
} | |||||
private function getClusterRepositoryURIFromBinding( | |||||
AlmanacBinding $binding) { | AlmanacBinding $binding) { | ||||
$protocol = $binding->getAlmanacPropertyValue('protocol'); | $protocol = $binding->getAlmanacPropertyValue('protocol'); | ||||
if ($protocol === null) { | if ($protocol === null) { | ||||
$protocol = 'https'; | $protocol = 'https'; | ||||
} | } | ||||
$iface = $binding->getInterface(); | $iface = $binding->getInterface(); | ||||
$address = $iface->renderDisplayAddress(); | $address = $iface->renderDisplayAddress(); | ||||
Show All 31 Lines | if (!($service_type instanceof AlmanacClusterRepositoryServiceType)) { | ||||
'The Almanac service for this repository does not have the correct '. | 'The Almanac service for this repository does not have the correct '. | ||||
'service type.')); | 'service type.')); | ||||
} | } | ||||
return $service; | return $service; | ||||
} | } | ||||
/* -( Symbols )-------------------------------------------------------------*/ | /* -( Symbols )-------------------------------------------------------------*/ | ||||
public function getSymbolSources() { | public function getSymbolSources() { | ||||
return $this->getDetail('symbol-sources', array()); | return $this->getDetail('symbol-sources', array()); | ||||
} | } | ||||
public function getSymbolLanguages() { | public function getSymbolLanguages() { | ||||
return $this->getDetail('symbol-languages', array()); | return $this->getDetail('symbol-languages', array()); | ||||
▲ Show 20 Lines • Show All 223 Lines • Show Last 20 Lines |