Changeset View
Changeset View
Standalone View
Standalone View
src/applications/repository/storage/PhabricatorRepository.php
| Show First 20 Lines • Show All 1,836 Lines • ▼ Show 20 Lines | /* -( Repository URI Management )------------------------------------------ */ | ||||
| * @param PhabricatorUser Viewing user. | * @param PhabricatorUser Viewing user. | ||||
| * @param map<string, wild> Constraints on selectable services. | * @param map<string, wild> Constraints on selectable services. | ||||
| * @return string|null URI, or `null` for local repositories. | * @return string|null URI, or `null` for local repositories. | ||||
| */ | */ | ||||
| public function getAlmanacServiceURI( | public function getAlmanacServiceURI( | ||||
| PhabricatorUser $viewer, | PhabricatorUser $viewer, | ||||
| array $options) { | array $options) { | ||||
| $refs = $this->getAlmanacServiceRefs($viewer, $options); | |||||
| if (!$refs) { | |||||
| return null; | |||||
| } | |||||
| $ref = head($refs); | |||||
| return $ref->getURI(); | |||||
| } | |||||
| public function getAlmanacServiceRefs( | |||||
| PhabricatorUser $viewer, | |||||
| array $options) { | |||||
| PhutilTypeSpec::checkMap( | PhutilTypeSpec::checkMap( | ||||
| $options, | $options, | ||||
| array( | array( | ||||
| 'neverProxy' => 'bool', | 'neverProxy' => 'bool', | ||||
| 'protocols' => 'list<string>', | 'protocols' => 'list<string>', | ||||
| 'writable' => 'optional bool', | 'writable' => 'optional bool', | ||||
| )); | )); | ||||
| $never_proxy = $options['neverProxy']; | $never_proxy = $options['neverProxy']; | ||||
| $protocols = $options['protocols']; | $protocols = $options['protocols']; | ||||
| $writable = idx($options, 'writable', false); | $writable = idx($options, 'writable', false); | ||||
| $cache_key = $this->getAlmanacServiceCacheKey(); | $cache_key = $this->getAlmanacServiceCacheKey(); | ||||
| if (!$cache_key) { | if (!$cache_key) { | ||||
| return null; | return array(); | ||||
| } | } | ||||
| $cache = PhabricatorCaches::getMutableStructureCache(); | $cache = PhabricatorCaches::getMutableStructureCache(); | ||||
| $uris = $cache->getKey($cache_key, false); | $uris = $cache->getKey($cache_key, false); | ||||
| // If we haven't built the cache yet, build it now. | // If we haven't built the cache yet, build it now. | ||||
| if ($uris === false) { | if ($uris === false) { | ||||
| $uris = $this->buildAlmanacServiceURIs(); | $uris = $this->buildAlmanacServiceURIs(); | ||||
| $cache->setKey($cache_key, $uris); | $cache->setKey($cache_key, $uris); | ||||
| } | } | ||||
| if ($uris === null) { | if ($uris === null) { | ||||
| return null; | return array(); | ||||
| } | } | ||||
| $local_device = AlmanacKeys::getDeviceID(); | $local_device = AlmanacKeys::getDeviceID(); | ||||
| if ($never_proxy && !$local_device) { | if ($never_proxy && !$local_device) { | ||||
| throw new Exception( | throw new Exception( | ||||
| pht( | pht( | ||||
| 'Unable to handle proxied service request. This device is not '. | 'Unable to handle proxied service request. This device is not '. | ||||
| 'registered, so it can not identify local services. Register '. | 'registered, so it can not identify local services. Register '. | ||||
| 'this device before sending requests here.')); | 'this device before sending requests here.')); | ||||
| } | } | ||||
| $protocol_map = array_fuse($protocols); | $protocol_map = array_fuse($protocols); | ||||
| $results = array(); | $results = array(); | ||||
| foreach ($uris as $uri) { | foreach ($uris as $uri) { | ||||
| // If we're never proxying this and it's locally satisfiable, return | // If we're never proxying this and it's locally satisfiable, return | ||||
| // `null` to tell the caller to handle it locally. If we're allowed to | // `null` to tell the caller to handle it locally. If we're allowed to | ||||
| // proxy, we skip this check and may proxy the request to ourselves. | // proxy, we skip this check and may proxy the request to ourselves. | ||||
| // (That proxied request will end up here with proxying forbidden, | // (That proxied request will end up here with proxying forbidden, | ||||
| // return `null`, and then the request will actually run.) | // return `null`, and then the request will actually run.) | ||||
| if ($local_device && $never_proxy) { | if ($local_device && $never_proxy) { | ||||
| if ($uri['device'] == $local_device) { | if ($uri['device'] == $local_device) { | ||||
| return null; | return array(); | ||||
| } | } | ||||
| } | } | ||||
| if (isset($protocol_map[$uri['protocol']])) { | if (isset($protocol_map[$uri['protocol']])) { | ||||
| $results[] = $uri; | $results[] = $uri; | ||||
| } | } | ||||
| } | } | ||||
| ▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | if (count($results) > 1) { | ||||
| 'Repository "%s" is bound to multiple active repository hosts, '. | 'Repository "%s" is bound to multiple active repository hosts, '. | ||||
| 'but this repository does not support cluster synchronization. '. | 'but this repository does not support cluster synchronization. '. | ||||
| 'Declusterize this repository or move it to a service with only '. | 'Declusterize this repository or move it to a service with only '. | ||||
| 'one host.', | 'one host.', | ||||
| $this->getDisplayName())); | $this->getDisplayName())); | ||||
| } | } | ||||
| } | } | ||||
| $refs = array(); | |||||
| foreach ($results as $result) { | |||||
| $refs[] = DiffusionServiceRef::newFromDictionary($result); | |||||
| } | |||||
| // If we require a writable device, remove URIs which aren't writable. | // If we require a writable device, remove URIs which aren't writable. | ||||
| if ($writable) { | if ($writable) { | ||||
| foreach ($results as $key => $uri) { | foreach ($refs as $key => $ref) { | ||||
| if (!$uri['writable']) { | if (!$ref->isWritable()) { | ||||
| unset($results[$key]); | unset($results[$key]); | ||||
epriestley: This variable should have been renamed. See T13611. | |||||
| } | } | ||||
| } | } | ||||
| if (!$results) { | if (!$refs) { | ||||
| throw new Exception( | throw new Exception( | ||||
| pht( | pht( | ||||
| 'This repository ("%s") is not writable with the given '. | 'This repository ("%s") is not writable with the given '. | ||||
| 'protocols (%s). The Almanac service for this repository has no '. | 'protocols (%s). The Almanac service for this repository has no '. | ||||
| 'writable bindings that support these protocols.', | 'writable bindings that support these protocols.', | ||||
| $this->getDisplayName(), | $this->getDisplayName(), | ||||
| implode(', ', $protocols))); | implode(', ', $protocols))); | ||||
| } | } | ||||
| } | } | ||||
| if ($writable) { | if ($writable) { | ||||
| $results = $this->sortWritableAlmanacServiceURIs($results); | $refs = $this->sortWritableAlmanacServiceRefs($refs); | ||||
| } else { | } else { | ||||
| shuffle($results); | $refs = $this->sortReadableAlmanacServiceRefs($refs); | ||||
| } | } | ||||
| $result = head($results); | return array_values($refs); | ||||
| return $result['uri']; | |||||
| } | } | ||||
| private function sortWritableAlmanacServiceURIs(array $results) { | private function sortReadableAlmanacServiceRefs(array $refs) { | ||||
| assert_instances_of($refs, 'DiffusionServiceRef'); | |||||
| shuffle($refs); | |||||
| return $refs; | |||||
| } | |||||
| private function sortWritableAlmanacServiceRefs(array $refs) { | |||||
| assert_instances_of($refs, 'DiffusionServiceRef'); | |||||
| // See T13109 for discussion of how this method routes requests. | // See T13109 for discussion of how this method routes requests. | ||||
| // In the absence of other rules, we'll send traffic to devices randomly. | // In the absence of other rules, we'll send traffic to devices randomly. | ||||
| // We also want to select randomly among nodes which are equally good | // We also want to select randomly among nodes which are equally good | ||||
| // candidates to receive the write, and accomplish that by shuffling the | // candidates to receive the write, and accomplish that by shuffling the | ||||
| // list up front. | // list up front. | ||||
| shuffle($results); | shuffle($refs); | ||||
| $order = array(); | $order = array(); | ||||
| // If some device is currently holding the write lock, send all requests | // If some device is currently holding the write lock, send all requests | ||||
| // to that device. We're trying to queue writes on a single device so they | // to that device. We're trying to queue writes on a single device so they | ||||
| // do not need to wait for read synchronization after earlier writes | // do not need to wait for read synchronization after earlier writes | ||||
| // complete. | // complete. | ||||
| $writer = PhabricatorRepositoryWorkingCopyVersion::loadWriter( | $writer = PhabricatorRepositoryWorkingCopyVersion::loadWriter( | ||||
| $this->getPHID()); | $this->getPHID()); | ||||
| if ($writer) { | if ($writer) { | ||||
| $device_phid = $writer->getWriteProperty('devicePHID'); | $device_phid = $writer->getWriteProperty('devicePHID'); | ||||
| foreach ($results as $key => $result) { | foreach ($refs as $key => $ref) { | ||||
| if ($result['devicePHID'] === $device_phid) { | if ($ref->getDevicePHID() === $device_phid) { | ||||
| $order[] = $key; | $order[] = $key; | ||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| // If no device is currently holding the write lock, try to send requests | // If no device is currently holding the write lock, try to send requests | ||||
| // to a device which is already up to date and will not need to synchronize | // to a device which is already up to date and will not need to synchronize | ||||
| // before it can accept the write. | // before it can accept the write. | ||||
| $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( | $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( | ||||
| $this->getPHID()); | $this->getPHID()); | ||||
| if ($versions) { | if ($versions) { | ||||
| $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); | ||||
| $max_devices = array(); | $max_devices = array(); | ||||
| foreach ($versions as $version) { | foreach ($versions as $version) { | ||||
| if ($version->getRepositoryVersion() == $max_version) { | if ($version->getRepositoryVersion() == $max_version) { | ||||
| $max_devices[] = $version->getDevicePHID(); | $max_devices[] = $version->getDevicePHID(); | ||||
| } | } | ||||
| } | } | ||||
| $max_devices = array_fuse($max_devices); | $max_devices = array_fuse($max_devices); | ||||
| foreach ($results as $key => $result) { | foreach ($refs as $key => $ref) { | ||||
| if (isset($max_devices[$result['devicePHID']])) { | if (isset($max_devices[$ref->getDevicePHID()])) { | ||||
| $order[] = $key; | $order[] = $key; | ||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| // Reorder the results, putting any we've selected as preferred targets for | // Reorder the results, putting any we've selected as preferred targets for | ||||
| // the write at the head of the list. | // the write at the head of the list. | ||||
| $results = array_select_keys($results, $order) + $results; | $refs = array_select_keys($refs, $order) + $refs; | ||||
| return $results; | return $refs; | ||||
| } | } | ||||
| public function supportsSynchronization() { | public function supportsSynchronization() { | ||||
| // TODO: For now, this is only supported for Git. | // TODO: For now, this is only supported for Git. | ||||
| if (!$this->isGit()) { | if (!$this->isGit()) { | ||||
| return false; | return false; | ||||
| } | } | ||||
| ▲ Show 20 Lines • Show All 752 Lines • Show Last 20 Lines | |||||
This variable should have been renamed. See T13611.