Changeset View
Changeset View
Standalone View
Standalone View
src/applications/repository/storage/PhabricatorRepository.php
Show First 20 Lines • Show All 1,926 Lines • ▼ Show 20 Lines | public function getAlmanacServiceURI( | ||||
$never_proxy, | $never_proxy, | ||||
array $protocols) { | array $protocols) { | ||||
$service = $this->loadAlmanacService(); | $service = $this->loadAlmanacService(); | ||||
if (!$service) { | if (!$service) { | ||||
return null; | return null; | ||||
} | } | ||||
$bindings = $service->getBindings(); | $bindings = $service->getActiveBindings(); | ||||
if (!$bindings) { | if (!$bindings) { | ||||
throw new Exception( | throw new Exception( | ||||
pht( | pht( | ||||
'The Almanac service for this repository is not bound to any '. | 'The Almanac service for this repository is not bound to any '. | ||||
'interfaces.')); | 'interfaces.')); | ||||
} | } | ||||
$local_device = AlmanacKeys::getDeviceID(); | $local_device = AlmanacKeys::getDeviceID(); | ||||
if ($never_proxy && !$local_device) { | if ($never_proxy && !$local_device) { | ||||
throw new Exception( | throw new Exception( | ||||
pht( | pht( | ||||
'Unable to handle proxied service request. This device is not '. | 'Unable to handle proxied service request. This device is not '. | ||||
'registered, so it can not identify local services. Register '. | 'registered, so it can not identify local services. Register '. | ||||
'this device before sending requests here.')); | 'this device before sending requests here.')); | ||||
} | } | ||||
$protocol_map = array_fuse($protocols); | $protocol_map = array_fuse($protocols); | ||||
$uris = array(); | $uris = array(); | ||||
foreach ($bindings as $binding) { | foreach ($bindings as $binding) { | ||||
if ($binding->getIsDisabled()) { | |||||
continue; | |||||
} | |||||
$iface = $binding->getInterface(); | $iface = $binding->getInterface(); | ||||
// If we're never proxying this and it's locally satisfiable, return | // If we're never proxying this and it's locally satisfiable, return | ||||
// `null` to tell the caller to handle it locally. If we're allowed to | // `null` to tell the caller to handle it locally. If we're allowed to | ||||
// proxy, we skip this check and may proxy the request to ourselves. | // proxy, we skip this check and may proxy the request to ourselves. | ||||
// (That proxied request will end up here with proxying forbidden, | // (That proxied request will end up here with proxying forbidden, | ||||
// return `null`, and then the request will actually run.) | // return `null`, and then the request will actually run.) | ||||
▲ Show 20 Lines • Show All 254 Lines • ▼ Show 20 Lines | if (!$device) { | ||||
return false; | return false; | ||||
} | } | ||||
return true; | return true; | ||||
} | } | ||||
/** | /** | ||||
* Synchronize repository version information after creating a repository. | |||||
* | |||||
* This initializes working copy versions for all currently bound devices to | |||||
* 0, so that we don't get stuck making an ambiguous choice about which | |||||
* devices are leaders when we later synchronize before a read. | |||||
* | |||||
* @task sync | |||||
*/ | |||||
public function synchronizeWorkingCopyAfterCreation() { | |||||
if (!$this->shouldEnableSynchronization()) { | |||||
return; | |||||
} | |||||
$repository_phid = $this->getPHID(); | |||||
$service = $this->loadAlmanacService(); | |||||
if (!$service) { | |||||
throw new Exception(pht('Failed to load repository cluster service.')); | |||||
} | |||||
$bindings = $service->getActiveBindings(); | |||||
foreach ($bindings as $binding) { | |||||
PhabricatorRepositoryWorkingCopyVersion::updateVersion( | |||||
$repository_phid, | |||||
$binding->getDevicePHID(), | |||||
0); | |||||
} | |||||
} | |||||
/** | |||||
* @task sync | * @task sync | ||||
*/ | */ | ||||
public function synchronizeWorkingCopyBeforeRead() { | public function synchronizeWorkingCopyBeforeRead() { | ||||
if (!$this->shouldEnableSynchronization()) { | if (!$this->shouldEnableSynchronization()) { | ||||
return; | return; | ||||
} | } | ||||
$repository_phid = $this->getPHID(); | $repository_phid = $this->getPHID(); | ||||
Show All 11 Lines | public function synchronizeWorkingCopyBeforeRead() { | ||||
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( | $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( | ||||
$repository_phid); | $repository_phid); | ||||
$versions = mpull($versions, null, 'getDevicePHID'); | $versions = mpull($versions, null, 'getDevicePHID'); | ||||
$this_version = idx($versions, $device_phid); | $this_version = idx($versions, $device_phid); | ||||
if ($this_version) { | if ($this_version) { | ||||
$this_version = (int)$this_version->getRepositoryVersion(); | $this_version = (int)$this_version->getRepositoryVersion(); | ||||
} else { | } else { | ||||
$this_version = 0; | $this_version = -1; | ||||
} | } | ||||
if ($versions) { | if ($versions) { | ||||
$max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | // This is the normal case, where we have some version information and | ||||
} else { | // can identify which nodes are leaders. If the current node is not a | ||||
$max_version = 0; | // leader, we want to fetch from a leader and then update our version. | ||||
} | |||||
$max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | |||||
if ($max_version > $this_version) { | if ($max_version > $this_version) { | ||||
$fetchable = array(); | $fetchable = array(); | ||||
foreach ($versions as $version) { | foreach ($versions as $version) { | ||||
if ($version->getRepositoryVersion() == $max_version) { | if ($version->getRepositoryVersion() == $max_version) { | ||||
$fetchable[] = $version->getDevicePHID(); | $fetchable[] = $version->getDevicePHID(); | ||||
} | } | ||||
} | } | ||||
$this->synchronizeWorkingCopyFromDevices($fetchable); | $this->synchronizeWorkingCopyFromDevices($fetchable); | ||||
PhabricatorRepositoryWorkingCopyVersion::updateVersion( | PhabricatorRepositoryWorkingCopyVersion::updateVersion( | ||||
$repository_phid, | $repository_phid, | ||||
$device_phid, | $device_phid, | ||||
$max_version); | $max_version); | ||||
} | } | ||||
$result_version = $max_version; | |||||
} else { | |||||
// If no version records exist yet, we need to be careful, because we | |||||
// can not tell which nodes are leaders. | |||||
// There might be several nodes with arbitrary existing data, and we have | |||||
// no way to tell which one has the "right" data. If we pick wrong, we | |||||
// might erase some or all of the data in the repository. | |||||
// Since this is dangeorus, we refuse to guess unless there is only one | |||||
// device. If we're the only device in the group, we obviously must be | |||||
// a leader. | |||||
$service = $this->loadAlmanacService(); | |||||
if (!$service) { | |||||
throw new Exception(pht('Failed to load repository cluster service.')); | |||||
} | |||||
$bindings = $service->getActiveBindings(); | |||||
$device_map = array(); | |||||
foreach ($bindings as $binding) { | |||||
$device_map[$binding->getDevicePHID()] = true; | |||||
} | |||||
if (count($device_map) > 1) { | |||||
throw new Exception( | |||||
pht( | |||||
'Repository "%s" exists on more than one device, but no device '. | |||||
'has any repository version information. Phabricator can not '. | |||||
'guess which copy of the existing data is authoritative. Remove '. | |||||
'all but one device from service to mark the remaining device '. | |||||
'as the authority.', | |||||
$this->getDisplayName())); | |||||
} | |||||
if (empty($device_map[$device->getPHID()])) { | |||||
throw new Exception( | |||||
pht( | |||||
'Repository "%s" is being synchronized on device "%s", but '. | |||||
'this device is not bound to the corresponding cluster '. | |||||
'service ("%s").', | |||||
$this->getDisplayName(), | |||||
$device->getName(), | |||||
$service->getName())); | |||||
} | |||||
// The current device is the only device in service, so it must be a | |||||
// leader. We can safely have any future nodes which come online read | |||||
// from it. | |||||
PhabricatorRepositoryWorkingCopyVersion::updateVersion( | |||||
$repository_phid, | |||||
$device_phid, | |||||
0); | |||||
$result_version = 0; | |||||
} | |||||
$read_lock->unlock(); | $read_lock->unlock(); | ||||
return $max_version; | return $result_version; | ||||
} | } | ||||
/** | /** | ||||
* @task sync | * @task sync | ||||
*/ | */ | ||||
public function synchronizeWorkingCopyBeforeWrite( | public function synchronizeWorkingCopyBeforeWrite( | ||||
PhabricatorUser $actor) { | PhabricatorUser $actor) { | ||||
▲ Show 20 Lines • Show All 100 Lines • ▼ Show 20 Lines | /* -( Cluster Synchronization )-------------------------------------------- */ | ||||
*/ | */ | ||||
private function synchronizeWorkingCopyFromDevices(array $device_phids) { | private function synchronizeWorkingCopyFromDevices(array $device_phids) { | ||||
$service = $this->loadAlmanacService(); | $service = $this->loadAlmanacService(); | ||||
if (!$service) { | if (!$service) { | ||||
throw new Exception(pht('Failed to load repository cluster service.')); | throw new Exception(pht('Failed to load repository cluster service.')); | ||||
} | } | ||||
$device_map = array_fuse($device_phids); | $device_map = array_fuse($device_phids); | ||||
$bindings = $service->getBindings(); | $bindings = $service->getActiveBindings(); | ||||
$fetchable = array(); | $fetchable = array(); | ||||
foreach ($bindings as $binding) { | foreach ($bindings as $binding) { | ||||
// We can't fetch from disabled nodes. | |||||
if ($binding->getIsDisabled()) { | |||||
continue; | |||||
} | |||||
// We can't fetch from nodes which don't have the newest version. | // We can't fetch from nodes which don't have the newest version. | ||||
$device_phid = $binding->getDevicePHID(); | $device_phid = $binding->getDevicePHID(); | ||||
if (empty($device_map[$device_phid])) { | if (empty($device_map[$device_phid])) { | ||||
continue; | continue; | ||||
} | } | ||||
// TODO: For now, only fetch over SSH. We could support fetching over | // TODO: For now, only fetch over SSH. We could support fetching over | ||||
// HTTP eventually. | // HTTP eventually. | ||||
▲ Show 20 Lines • Show All 336 Lines • Show Last 20 Lines |