Differential D15792 Diff 38039 src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
Changeset View
Changeset View
Standalone View
Standalone View
src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
| <?php | <?php | ||||
| /** | /** | ||||
| * Manages repository synchronization for cluster repositories. | * Manages repository synchronization for cluster repositories. | ||||
| * | * | ||||
| * @task config Configuring Synchronization | * @task config Configuring Synchronization | ||||
| * @task sync Cluster Synchronization | * @task sync Cluster Synchronization | ||||
| * @task internal Internals | * @task internal Internals | ||||
| */ | */ | ||||
| final class DiffusionRepositoryClusterEngine extends Phobject { | final class DiffusionRepositoryClusterEngine extends Phobject { | ||||
| private $repository; | private $repository; | ||||
| private $viewer; | private $viewer; | ||||
| private $logger; | |||||
| private $clusterWriteLock; | private $clusterWriteLock; | ||||
| private $clusterWriteVersion; | private $clusterWriteVersion; | ||||
| private $logger; | private $clusterWriteOwner; | ||||
| /* -( Configuring Synchronization )---------------------------------------- */ | /* -( Configuring Synchronization )---------------------------------------- */ | ||||
| public function setRepository(PhabricatorRepository $repository) { | public function setRepository(PhabricatorRepository $repository) { | ||||
| $this->repository = $repository; | $this->repository = $repository; | ||||
| return $this; | return $this; | ||||
| ▲ Show 20 Lines • Show All 217 Lines • ▼ Show 20 Lines | public function synchronizeWorkingCopyBeforeWrite() { | ||||
| $repository = $this->getRepository(); | $repository = $this->getRepository(); | ||||
| $viewer = $this->getViewer(); | $viewer = $this->getViewer(); | ||||
| $repository_phid = $repository->getPHID(); | $repository_phid = $repository->getPHID(); | ||||
| $device = AlmanacKeys::getLiveDevice(); | $device = AlmanacKeys::getLiveDevice(); | ||||
| $device_phid = $device->getPHID(); | $device_phid = $device->getPHID(); | ||||
| $table = new PhabricatorRepositoryWorkingCopyVersion(); | |||||
| $locked_connection = $table->establishConnection('w'); | |||||
| $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( | $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( | ||||
| $repository_phid); | $repository_phid); | ||||
| $write_lock->useSpecificConnection($locked_connection); | |||||
| $lock_wait = phutil_units('2 minutes in seconds'); | $lock_wait = phutil_units('2 minutes in seconds'); | ||||
| $this->logLine( | $this->logLine( | ||||
| pht( | pht( | ||||
| 'Waiting up to %s second(s) for a cluster write lock...', | 'Waiting up to %s second(s) for a cluster write lock...', | ||||
| new PhutilNumber($lock_wait))); | new PhutilNumber($lock_wait))); | ||||
| try { | try { | ||||
| Show All 24 Lines | public function synchronizeWorkingCopyBeforeWrite() { | ||||
| foreach ($versions as $version) { | foreach ($versions as $version) { | ||||
| if (!$version->getIsWriting()) { | if (!$version->getIsWriting()) { | ||||
| continue; | continue; | ||||
| } | } | ||||
| throw new Exception( | throw new Exception( | ||||
| pht( | pht( | ||||
| 'An previous write to this repository was interrupted; refusing '. | 'An previous write to this repository was interrupted; refusing '. | ||||
| 'new writes. This issue resolves operator intervention to resolve, '. | 'new writes. This issue requires operator intervention to resolve, '. | ||||
| 'see "Write Interruptions" in the "Cluster: Repositories" in the '. | 'see "Write Interruptions" in the "Cluster: Repositories" in the '. | ||||
| 'documentation for instructions.')); | 'documentation for instructions.')); | ||||
| } | } | ||||
| try { | try { | ||||
| $max_version = $this->synchronizeWorkingCopyBeforeRead(); | $max_version = $this->synchronizeWorkingCopyBeforeRead(); | ||||
| } catch (Exception $ex) { | } catch (Exception $ex) { | ||||
| $write_lock->unlock(); | $write_lock->unlock(); | ||||
| throw $ex; | throw $ex; | ||||
| } | } | ||||
| $pid = getmypid(); | |||||
| $hash = Filesystem::readRandomCharacters(12); | |||||
| $this->clusterWriteOwner = "{$pid}.{$hash}"; | |||||
| PhabricatorRepositoryWorkingCopyVersion::willWrite( | PhabricatorRepositoryWorkingCopyVersion::willWrite( | ||||
| $locked_connection, | |||||
| $repository_phid, | $repository_phid, | ||||
| $device_phid, | $device_phid, | ||||
| array( | array( | ||||
| 'userPHID' => $viewer->getPHID(), | 'userPHID' => $viewer->getPHID(), | ||||
| 'epoch' => PhabricatorTime::getNow(), | 'epoch' => PhabricatorTime::getNow(), | ||||
| 'devicePHID' => $device_phid, | 'devicePHID' => $device_phid, | ||||
| )); | ), | ||||
| $this->clusterWriteOwner); | |||||
| $this->clusterWriteVersion = $max_version; | $this->clusterWriteVersion = $max_version; | ||||
| $this->clusterWriteLock = $write_lock; | $this->clusterWriteLock = $write_lock; | ||||
| } | } | ||||
| /** | /** | ||||
| * @task sync | * @task sync | ||||
| Show All 11 Lines | public function synchronizeWorkingCopyAfterWrite() { | ||||
| } | } | ||||
| $repository = $this->getRepository(); | $repository = $this->getRepository(); | ||||
| $repository_phid = $repository->getPHID(); | $repository_phid = $repository->getPHID(); | ||||
| $device = AlmanacKeys::getLiveDevice(); | $device = AlmanacKeys::getLiveDevice(); | ||||
| $device_phid = $device->getPHID(); | $device_phid = $device->getPHID(); | ||||
| // It is possible that we've lost the global lock while receiving the push. | |||||
| // For example, the master database may have been restarted between the | |||||
| // time we acquired the global lock and now, when the push has finished. | |||||
| // We wrote a durable lock while we were holding the global lock, | |||||
| // essentially upgrading our lock. We can still safely release this upgraded | |||||
| // lock even if we're no longer holding the global lock. | |||||
| // If we fail to release the lock, the repository will be frozen until | |||||
| // an operator can figure out what happened, so we try pretty hard to | |||||
| // reconnect to the database and release the lock. | |||||
| $now = PhabricatorTime::getNow(); | |||||
| $duration = phutil_units('5 minutes in seconds'); | |||||
| $try_until = $now + $duration; | |||||
| $did_release = false; | |||||
| $already_failed = false; | |||||
| while (PhabricatorTime::getNow() <= $try_until) { | |||||
| try { | |||||
| // NOTE: This means we're still bumping the version when pushes fail. We | // NOTE: This means we're still bumping the version when pushes fail. We | ||||
| // could select only un-rejected events instead to bump a little less | // could select only un-rejected events instead to bump a little less | ||||
| // often. | // often. | ||||
| $new_log = id(new PhabricatorRepositoryPushEventQuery()) | $new_log = id(new PhabricatorRepositoryPushEventQuery()) | ||||
| ->setViewer(PhabricatorUser::getOmnipotentUser()) | ->setViewer(PhabricatorUser::getOmnipotentUser()) | ||||
| ->withRepositoryPHIDs(array($repository_phid)) | ->withRepositoryPHIDs(array($repository_phid)) | ||||
| ->setLimit(1) | ->setLimit(1) | ||||
| ->executeOne(); | ->executeOne(); | ||||
| $old_version = $this->clusterWriteVersion; | $old_version = $this->clusterWriteVersion; | ||||
| if ($new_log) { | if ($new_log) { | ||||
| $new_version = $new_log->getID(); | $new_version = $new_log->getID(); | ||||
| } else { | } else { | ||||
| $new_version = $old_version; | $new_version = $old_version; | ||||
| } | } | ||||
| PhabricatorRepositoryWorkingCopyVersion::didWrite( | PhabricatorRepositoryWorkingCopyVersion::didWrite( | ||||
| $repository_phid, | $repository_phid, | ||||
| $device_phid, | $device_phid, | ||||
| $this->clusterWriteVersion, | $this->clusterWriteVersion, | ||||
| $new_log->getID()); | $new_log->getID(), | ||||
| $this->clusterWriteOwner); | |||||
| $did_release = true; | |||||
| break; | |||||
| } catch (AphrontConnectionQueryException $ex) { | |||||
| $connection_exception = $ex; | |||||
| } catch (AphrontConnectionLostQueryException $ex) { | |||||
| $connection_exception = $ex; | |||||
| } | |||||
| if (!$already_failed) { | |||||
| $already_failed = true; | |||||
| $this->logLine( | |||||
| pht('CRITICAL. Failed to release cluster write lock!')); | |||||
| $this->logLine( | |||||
| pht( | |||||
| 'The connection to the master database was lost while receiving '. | |||||
| 'the write.')); | |||||
| $this->logLine( | |||||
| pht( | |||||
| 'This process will spend %s more second(s) attempting to '. | |||||
| 'recover, then give up.', | |||||
| new PhutilNumber($duration))); | |||||
| } | |||||
| sleep(1); | |||||
| } | |||||
| if ($did_release) { | |||||
| if ($already_failed) { | |||||
| $this->logLine( | |||||
| pht('RECOVERED. Link to master database was restored.')); | |||||
| } | |||||
| $this->logLine(pht('Released cluster write lock.')); | |||||
| } else { | |||||
| throw new Exception( | |||||
| pht( | |||||
| 'Failed to reconnect to master database and release held write '. | |||||
| 'lock ("%s") on device "%s" for repository "%s" after trying '. | |||||
| 'for %s seconds(s). This repository will be frozen.', | |||||
| $this->clusterWriteOwner, | |||||
| $device->getName(), | |||||
| $this->getDisplayName(), | |||||
| new PhutilNumber($duration))); | |||||
| } | |||||
| // We can continue even if we've lost this lock, everything is still | |||||
| // consistent. | |||||
| try { | |||||
| $this->clusterWriteLock->unlock(); | $this->clusterWriteLock->unlock(); | ||||
| } catch (Exception $ex) { | |||||
| // Ignore. | |||||
| } | |||||
| $this->clusterWriteLock = null; | $this->clusterWriteLock = null; | ||||
| $this->clusterWriteOwner = null; | |||||
| } | } | ||||
| /* -( Internals )---------------------------------------------------------- */ | /* -( Internals )---------------------------------------------------------- */ | ||||
| /** | /** | ||||
| * @task internal | * @task internal | ||||
| ▲ Show 20 Lines • Show All 166 Lines • Show Last 20 Lines | |||||