diff --git a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php index 2d815917a4..9df37c7fb5 100644 --- a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php +++ b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php @@ -1,888 +1,888 @@ repository = $repository; return $this; } public function getRepository() { return $this->repository; } public function setViewer(PhabricatorUser $viewer) { $this->viewer = $viewer; return $this; } public function getViewer() { return $this->viewer; } public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) { $this->logger = $log; return $this; } /* -( Cluster Synchronization )-------------------------------------------- */ /** * Synchronize repository version information after creating a repository. * * This initializes working copy versions for all currently bound devices to * 0, so that we don't get stuck making an ambiguous choice about which * devices are leaders when we later synchronize before a read. * * @task sync */ public function synchronizeWorkingCopyAfterCreation() { if (!$this->shouldEnableSynchronization(false)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $service = $repository->loadAlmanacService(); if (!$service) { throw new Exception(pht('Failed to load repository cluster service.')); } $bindings = $service->getActiveBindings(); foreach ($bindings as $binding) { PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $binding->getDevicePHID(), 0); } return $this; } /** * @task sync */ public function synchronizeWorkingCopyAfterHostingChange() { if (!$this->shouldEnableSynchronization(false)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); $versions = mpull($versions, null, 'getDevicePHID'); // After converting a hosted repository to observed, or vice versa, we // need to reset version numbers because the clocks for observed and hosted // repositories run on different units. // We identify all the cluster leaders and reset their version to 0. // We identify all the cluster followers and demote them. // This allows the cluster to start over again at version 0 but keep the // same leaders. if ($versions) { $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); foreach ($versions as $version) { $device_phid = $version->getDevicePHID(); if ($version->getRepositoryVersion() == $max_version) { PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, 0); } else { PhabricatorRepositoryWorkingCopyVersion::demoteDevice( $repository_phid, $device_phid); } } } return $this; } /** * @task sync */ public function synchronizeWorkingCopyBeforeRead() { if (!$this->shouldEnableSynchronization(true)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock( $repository_phid, $device_phid); $lock_wait = phutil_units('2 minutes in seconds'); $this->logLine( pht( 'Acquiring read lock for repository "%s" on device "%s"...', $repository->getDisplayName(), $device->getName())); try { $start = PhabricatorTime::getNow(); $read_lock->lock($lock_wait); $waited = (PhabricatorTime::getNow() - $start); if ($waited) { $this->logLine( pht( 'Acquired read lock after %s second(s).', new PhutilNumber($waited))); } else { $this->logLine( pht( 'Acquired read lock immediately.')); } } catch (PhutilLockException $ex) { throw new PhutilProxyException( pht( 'Failed to acquire read lock after waiting %s second(s). You '. 'may be able to retry later. (%s)', new PhutilNumber($lock_wait), $ex->getHint()), $ex); } $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); $versions = mpull($versions, null, 'getDevicePHID'); $this_version = idx($versions, $device_phid); if ($this_version) { $this_version = (int)$this_version->getRepositoryVersion(); } else { $this_version = null; } if ($versions) { // This is the normal case, where we have some version information and // can identify which nodes are leaders. If the current node is not a // leader, we want to fetch from a leader and then update our version. $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); if (($this_version === null) || ($max_version > $this_version)) { if ($repository->isHosted()) { $fetchable = array(); foreach ($versions as $version) { if ($version->getRepositoryVersion() == $max_version) { $fetchable[] = $version->getDevicePHID(); } } $this->synchronizeWorkingCopyFromDevices( $fetchable, $this_version, $max_version); } else { $this->synchronizeWorkingCopyFromRemote(); } PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, $max_version); } else { $this->logLine( pht( 'Device "%s" is already a cluster leader and does not need '. 'to be synchronized.', $device->getName())); } $result_version = $max_version; } else { // If no version records exist yet, we need to be careful, because we // can not tell which nodes are leaders. // There might be several nodes with arbitrary existing data, and we have // no way to tell which one has the "right" data. If we pick wrong, we // might erase some or all of the data in the repository. // Since this is dangerous, we refuse to guess unless there is only one // device. If we're the only device in the group, we obviously must be // a leader. $service = $repository->loadAlmanacService(); if (!$service) { throw new Exception(pht('Failed to load repository cluster service.')); } $bindings = $service->getActiveBindings(); $device_map = array(); foreach ($bindings as $binding) { $device_map[$binding->getDevicePHID()] = true; } if (count($device_map) > 1) { throw new Exception( pht( 'Repository "%s" exists on more than one device, but no device '. 'has any repository version information. Phabricator can not '. 'guess which copy of the existing data is authoritative. Promote '. 'a device or see "Ambiguous Leaders" in the documentation.', $repository->getDisplayName())); } if (empty($device_map[$device->getPHID()])) { throw new Exception( pht( 'Repository "%s" is being synchronized on device "%s", but '. 'this device is not bound to the corresponding cluster '. 'service ("%s").', $repository->getDisplayName(), $device->getName(), $service->getName())); } // The current device is the only device in service, so it must be a // leader. We can safely have any future nodes which come online read // from it. PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, 0); $result_version = 0; } $read_lock->unlock(); return $result_version; } /** * @task sync */ public function synchronizeWorkingCopyBeforeWrite() { if (!$this->shouldEnableSynchronization(true)) { return; } $repository = $this->getRepository(); $viewer = $this->getViewer(); $repository_phid = $repository->getPHID(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); $table = new PhabricatorRepositoryWorkingCopyVersion(); $locked_connection = $table->establishConnection('w'); $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( $repository_phid); - $write_lock->useSpecificConnection($locked_connection); + $write_lock->setExternalConnection($locked_connection); $this->logLine( pht( 'Acquiring write lock for repository "%s"...', $repository->getDisplayName())); // See T13590. On the HTTP pathway, it's possible for us to hit the script // time limit while holding the durable write lock if a user makes a big // push. Remove the time limit before we acquire the durable lock. set_time_limit(0); $lock_wait = phutil_units('2 minutes in seconds'); try { $write_wait_start = microtime(true); $start = PhabricatorTime::getNow(); $step_wait = 1; while (true) { try { $write_lock->lock((int)floor($step_wait)); $write_wait_end = microtime(true); break; } catch (PhutilLockException $ex) { $waited = (PhabricatorTime::getNow() - $start); if ($waited > $lock_wait) { throw $ex; } $this->logActiveWriter($viewer, $repository); } // Wait a little longer before the next message we print. $step_wait = $step_wait + 0.5; $step_wait = min($step_wait, 3); } $waited = (PhabricatorTime::getNow() - $start); if ($waited) { $this->logLine( pht( 'Acquired write lock after %s second(s).', new PhutilNumber($waited))); } else { $this->logLine( pht( 'Acquired write lock immediately.')); } } catch (PhutilLockException $ex) { throw new PhutilProxyException( pht( 'Failed to acquire write lock after waiting %s second(s). You '. 'may be able to retry later. (%s)', new PhutilNumber($lock_wait), $ex->getHint()), $ex); } $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); foreach ($versions as $version) { if (!$version->getIsWriting()) { continue; } throw new Exception( pht( 'An previous write to this repository was interrupted; refusing '. 'new writes. This issue requires operator intervention to resolve, '. 'see "Write Interruptions" in the "Cluster: Repositories" in the '. 'documentation for instructions.')); } $read_wait_start = microtime(true); try { $max_version = $this->synchronizeWorkingCopyBeforeRead(); } catch (Exception $ex) { $write_lock->unlock(); throw $ex; } $read_wait_end = microtime(true); $pid = getmypid(); $hash = Filesystem::readRandomCharacters(12); $this->clusterWriteOwner = "{$pid}.{$hash}"; PhabricatorRepositoryWorkingCopyVersion::willWrite( $locked_connection, $repository_phid, $device_phid, array( 'userPHID' => $viewer->getPHID(), 'epoch' => PhabricatorTime::getNow(), 'devicePHID' => $device_phid, ), $this->clusterWriteOwner); $this->clusterWriteVersion = $max_version; $this->clusterWriteLock = $write_lock; $write_wait = ($write_wait_end - $write_wait_start); $read_wait = ($read_wait_end - $read_wait_start); $log = $this->logger; if ($log) { $log->writeClusterEngineLogProperty('writeWait', $write_wait); $log->writeClusterEngineLogProperty('readWait', $read_wait); } } public function synchronizeWorkingCopyAfterDiscovery($new_version) { if (!$this->shouldEnableSynchronization(true)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); if ($repository->isHosted()) { return; } $viewer = $this->getViewer(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); // NOTE: We are not holding a lock here because this method is only called // from PhabricatorRepositoryDiscoveryEngine, which already holds a device // lock. Even if we do race here and record an older version, the // consequences are mild: we only do extra work to correct it later. $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); $versions = mpull($versions, null, 'getDevicePHID'); $this_version = idx($versions, $device_phid); if ($this_version) { $this_version = (int)$this_version->getRepositoryVersion(); } else { $this_version = null; } if (($this_version === null) || ($new_version > $this_version)) { PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, $new_version); } } /** * @task sync */ public function synchronizeWorkingCopyAfterWrite() { if (!$this->shouldEnableSynchronization(true)) { return; } if (!$this->clusterWriteLock) { throw new Exception( pht( 'Trying to synchronize after write, but not holding a write '. 'lock!')); } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); // It is possible that we've lost the global lock while receiving the push. // For example, the master database may have been restarted between the // time we acquired the global lock and now, when the push has finished. // We wrote a durable lock while we were holding the the global lock, // essentially upgrading our lock. We can still safely release this upgraded // lock even if we're no longer holding the global lock. // If we fail to release the lock, the repository will be frozen until // an operator can figure out what happened, so we try pretty hard to // reconnect to the database and release the lock. $now = PhabricatorTime::getNow(); $duration = phutil_units('5 minutes in seconds'); $try_until = $now + $duration; $did_release = false; $already_failed = false; while (PhabricatorTime::getNow() <= $try_until) { try { // NOTE: This means we're still bumping the version when pushes fail. We // could select only un-rejected events instead to bump a little less // often. $new_log = id(new PhabricatorRepositoryPushEventQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withRepositoryPHIDs(array($repository_phid)) ->setLimit(1) ->executeOne(); $old_version = $this->clusterWriteVersion; if ($new_log) { $new_version = $new_log->getID(); } else { $new_version = $old_version; } PhabricatorRepositoryWorkingCopyVersion::didWrite( $repository_phid, $device_phid, $this->clusterWriteVersion, $new_version, $this->clusterWriteOwner); $did_release = true; break; } catch (AphrontConnectionQueryException $ex) { $connection_exception = $ex; } catch (AphrontConnectionLostQueryException $ex) { $connection_exception = $ex; } if (!$already_failed) { $already_failed = true; $this->logLine( pht('CRITICAL. Failed to release cluster write lock!')); $this->logLine( pht( 'The connection to the master database was lost while receiving '. 'the write.')); $this->logLine( pht( 'This process will spend %s more second(s) attempting to '. 'recover, then give up.', new PhutilNumber($duration))); } sleep(1); } if ($did_release) { if ($already_failed) { $this->logLine( pht('RECOVERED. Link to master database was restored.')); } $this->logLine(pht('Released cluster write lock.')); } else { throw new Exception( pht( 'Failed to reconnect to master database and release held write '. 'lock ("%s") on device "%s" for repository "%s" after trying '. 'for %s seconds(s). This repository will be frozen.', $this->clusterWriteOwner, $device->getName(), $this->getDisplayName(), new PhutilNumber($duration))); } // We can continue even if we've lost this lock, everything is still // consistent. try { $this->clusterWriteLock->unlock(); } catch (Exception $ex) { // Ignore. } $this->clusterWriteLock = null; $this->clusterWriteOwner = null; } /* -( Internals )---------------------------------------------------------- */ /** * @task internal */ private function shouldEnableSynchronization($require_device) { $repository = $this->getRepository(); $service_phid = $repository->getAlmanacServicePHID(); if (!$service_phid) { return false; } if (!$repository->supportsSynchronization()) { return false; } if ($require_device) { $device = AlmanacKeys::getLiveDevice(); if (!$device) { return false; } } return true; } /** * @task internal */ private function synchronizeWorkingCopyFromRemote() { $repository = $this->getRepository(); $device = AlmanacKeys::getLiveDevice(); $local_path = $repository->getLocalPath(); $fetch_uri = $repository->getRemoteURIEnvelope(); if ($repository->isGit()) { $this->requireWorkingCopy(); $argv = array( 'fetch --prune -- %P %s', $fetch_uri, '+refs/*:refs/*', ); } else { throw new Exception(pht('Remote sync only supported for git!')); } $future = DiffusionCommandEngine::newCommandEngine($repository) ->setArgv($argv) ->setSudoAsDaemon(true) ->setCredentialPHID($repository->getCredentialPHID()) ->setURI($repository->getRemoteURIObject()) ->newFuture(); $future->setCWD($local_path); try { $future->resolvex(); } catch (Exception $ex) { $this->logLine( pht( 'Synchronization of "%s" from remote failed: %s', $device->getName(), $ex->getMessage())); throw $ex; } } /** * @task internal */ private function synchronizeWorkingCopyFromDevices( array $device_phids, $local_version, $remote_version) { $repository = $this->getRepository(); $service = $repository->loadAlmanacService(); if (!$service) { throw new Exception(pht('Failed to load repository cluster service.')); } $device_map = array_fuse($device_phids); $bindings = $service->getActiveBindings(); $fetchable = array(); foreach ($bindings as $binding) { // We can't fetch from nodes which don't have the newest version. $device_phid = $binding->getDevicePHID(); if (empty($device_map[$device_phid])) { continue; } // TODO: For now, only fetch over SSH. We could support fetching over // HTTP eventually. if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') { continue; } $fetchable[] = $binding; } if (!$fetchable) { throw new Exception( pht( 'Leader lost: no up-to-date nodes in repository cluster are '. 'fetchable.')); } // If we can synchronize from multiple sources, choose one at random. shuffle($fetchable); $caught = null; foreach ($fetchable as $binding) { try { $this->synchronizeWorkingCopyFromBinding( $binding, $local_version, $remote_version); $caught = null; break; } catch (Exception $ex) { $caught = $ex; } } if ($caught) { throw $caught; } } /** * @task internal */ private function synchronizeWorkingCopyFromBinding( AlmanacBinding $binding, $local_version, $remote_version) { $repository = $this->getRepository(); $device = AlmanacKeys::getLiveDevice(); $this->logLine( pht( 'Synchronizing this device ("%s") from cluster leader ("%s").', $device->getName(), $binding->getDevice()->getName())); $fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding); $local_path = $repository->getLocalPath(); if ($repository->isGit()) { $this->requireWorkingCopy(); $argv = array( 'fetch --prune -- %s %s', $fetch_uri, '+refs/*:refs/*', ); } else { throw new Exception(pht('Binding sync only supported for git!')); } $future = DiffusionCommandEngine::newCommandEngine($repository) ->setArgv($argv) ->setConnectAsDevice(true) ->setSudoAsDaemon(true) ->setURI($fetch_uri) ->newFuture(); $future->setCWD($local_path); $log = PhabricatorRepositorySyncEvent::initializeNewEvent() ->setRepositoryPHID($repository->getPHID()) ->setEpoch(PhabricatorTime::getNow()) ->setDevicePHID($device->getPHID()) ->setFromDevicePHID($binding->getDevice()->getPHID()) ->setDeviceVersion($local_version) ->setFromDeviceVersion($remote_version); $sync_start = microtime(true); try { $future->resolvex(); } catch (Exception $ex) { $log->setSyncWait(phutil_microseconds_since($sync_start)); if ($ex instanceof CommandException) { if ($future->getWasKilledByTimeout()) { $result_type = PhabricatorRepositorySyncEvent::RESULT_TIMEOUT; } else { $result_type = PhabricatorRepositorySyncEvent::RESULT_ERROR; } $log ->setResultCode($ex->getError()) ->setResultType($result_type) ->setProperty('stdout', $ex->getStdout()) ->setProperty('stderr', $ex->getStderr()); } else { $log ->setResultCode(1) ->setResultType(PhabricatorRepositorySyncEvent::RESULT_EXCEPTION) ->setProperty('message', $ex->getMessage()); } $log->save(); $this->logLine( pht( 'Synchronization of "%s" from leader "%s" failed: %s', $device->getName(), $binding->getDevice()->getName(), $ex->getMessage())); throw $ex; } $log ->setSyncWait(phutil_microseconds_since($sync_start)) ->setResultCode(0) ->setResultType(PhabricatorRepositorySyncEvent::RESULT_SYNC) ->save(); } /** * @task internal */ private function logLine($message) { return $this->logText("# {$message}\n"); } /** * @task internal */ private function logText($message) { $log = $this->logger; if ($log) { $log->writeClusterEngineLogMessage($message); } return $this; } private function requireWorkingCopy() { $repository = $this->getRepository(); $local_path = $repository->getLocalPath(); if (!Filesystem::pathExists($local_path)) { $device = AlmanacKeys::getLiveDevice(); throw new Exception( pht( 'Repository "%s" does not have a working copy on this device '. 'yet, so it can not be synchronized. Wait for the daemons to '. 'construct one or run `bin/repository update %s` on this host '. '("%s") to build it explicitly.', $repository->getDisplayName(), $repository->getMonogram(), $device->getName())); } } private function logActiveWriter( PhabricatorUser $viewer, PhabricatorRepository $repository) { $writer = PhabricatorRepositoryWorkingCopyVersion::loadWriter( $repository->getPHID()); if (!$writer) { $this->logLine(pht('Waiting on another user to finish writing...')); return; } $user_phid = $writer->getWriteProperty('userPHID'); $device_phid = $writer->getWriteProperty('devicePHID'); $epoch = $writer->getWriteProperty('epoch'); $phids = array($user_phid, $device_phid); $handles = $viewer->loadHandles($phids); $duration = (PhabricatorTime::getNow() - $epoch) + 1; $this->logLine( pht( 'Waiting for %s to finish writing (on device "%s" for %ss)...', $handles[$user_phid]->getName(), $handles[$device_phid]->getName(), new PhutilNumber($duration))); } } diff --git a/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php b/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php index 490036e36d..71f6374b2a 100644 --- a/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php +++ b/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php @@ -1,1287 +1,1287 @@ apis = $apis; return $this; } final public function getAnyAPI() { return head($this->getAPIs()); } final public function getMasterAPIs() { $apis = $this->getAPIs(); $results = array(); foreach ($apis as $api) { if ($api->getRef()->getIsMaster()) { $results[] = $api; } } if (!$results) { throw new PhutilArgumentUsageException( pht( 'This command only operates on database masters, but the selected '. 'database hosts do not include any masters.')); } return $results; } final public function getSingleAPI() { $apis = $this->getAPIs(); if (count($apis) == 1) { return head($apis); } throw new PhutilArgumentUsageException( pht( 'Phabricator is configured in cluster mode, with multiple database '. 'hosts. Use "--host" to specify which host you want to operate on.')); } final public function getAPIs() { return $this->apis; } final protected function isDryRun() { return $this->dryRun; } final protected function setDryRun($dry_run) { $this->dryRun = $dry_run; return $this; } final protected function isForce() { return $this->force; } final protected function setForce($force) { $this->force = $force; return $this; } public function getPatches() { return $this->patches; } public function setPatches(array $patches) { assert_instances_of($patches, 'PhabricatorStoragePatch'); $this->patches = $patches; return $this; } protected function isReadOnlyWorkflow() { return false; } public function execute(PhutilArgumentParser $args) { $this->setDryRun($args->getArg('dryrun')); $this->setForce($args->getArg('force')); if (!$this->isReadOnlyWorkflow()) { if (PhabricatorEnv::isReadOnly()) { if ($this->isForce()) { PhabricatorEnv::setReadOnly(false, null); } else { throw new PhutilArgumentUsageException( pht( 'Phabricator is currently in read-only mode. Use --force to '. 'override this mode.')); } } } return $this->didExecute($args); } public function didExecute(PhutilArgumentParser $args) {} private function loadSchemata(PhabricatorStorageManagementAPI $api) { $query = id(new PhabricatorConfigSchemaQuery()); $ref = $api->getRef(); $ref_key = $ref->getRefKey(); $query->setAPIs(array($api)); $query->setRefs(array($ref)); $actual = $query->loadActualSchemata(); $expect = $query->loadExpectedSchemata(); $comp = $query->buildComparisonSchemata($expect, $actual); return array( $comp[$ref_key], $expect[$ref_key], $actual[$ref_key], ); } final protected function adjustSchemata( PhabricatorStorageManagementAPI $api, $unsafe) { $lock = $this->lock($api); try { $err = $this->doAdjustSchemata($api, $unsafe); // Analyze tables if we're not doing a dry run and adjustments are either // all clear or have minor errors like surplus tables. if (!$this->dryRun) { $should_analyze = (($err == 0) || ($err == 2)); if ($should_analyze) { $this->analyzeTables($api); } } } catch (Exception $ex) { $lock->unlock(); throw $ex; } $lock->unlock(); return $err; } private function doAdjustSchemata( PhabricatorStorageManagementAPI $api, $unsafe) { $console = PhutilConsole::getConsole(); $console->writeOut( "%s\n", pht( 'Verifying database schemata on "%s"...', $api->getRef()->getRefKey())); list($adjustments, $errors) = $this->findAdjustments($api); if (!$adjustments) { $console->writeOut( "%s\n", pht('Found no adjustments for schemata.')); return $this->printErrors($errors, 0); } if (!$this->force && !$api->isCharacterSetAvailable('utf8mb4')) { $message = pht( "You have an old version of MySQL (older than 5.5) which does not ". "support the utf8mb4 character set. We strongly recommend upgrading ". "to 5.5 or newer.\n\n". "If you apply adjustments now and later update MySQL to 5.5 or newer, ". "you'll need to apply adjustments again (and they will take a long ". "time).\n\n". "You can exit this workflow, update MySQL now, and then run this ". "workflow again. This is recommended, but may cause a lot of downtime ". "right now.\n\n". "You can exit this workflow, continue using Phabricator without ". "applying adjustments, update MySQL at a later date, and then run ". "this workflow again. This is also a good approach, and will let you ". "delay downtime until later.\n\n". "You can proceed with this workflow, and then optionally update ". "MySQL at a later date. After you do, you'll need to apply ". "adjustments again.\n\n". "For more information, see \"Managing Storage Adjustments\" in ". "the documentation."); $console->writeOut( "\n** %s **\n\n%s\n", pht('OLD MySQL VERSION'), phutil_console_wrap($message)); $prompt = pht('Continue with old MySQL version?'); if (!phutil_console_confirm($prompt, $default_no = true)) { return; } } $table = id(new PhutilConsoleTable()) ->addColumn('database', array('title' => pht('Database'))) ->addColumn('table', array('title' => pht('Table'))) ->addColumn('name', array('title' => pht('Name'))) ->addColumn('info', array('title' => pht('Issues'))); foreach ($adjustments as $adjust) { $info = array(); foreach ($adjust['issues'] as $issue) { $info[] = PhabricatorConfigStorageSchema::getIssueName($issue); } $table->addRow(array( 'database' => $adjust['database'], 'table' => idx($adjust, 'table'), 'name' => idx($adjust, 'name'), 'info' => implode(', ', $info), )); } $console->writeOut("\n\n"); $table->draw(); if ($this->dryRun) { $console->writeOut( "%s\n", pht('DRYRUN: Would apply adjustments.')); return 0; } else if ($this->didInitialize) { // If we just initialized the database, continue without prompting. This // is nicer for first-time setup and there's no reasonable reason any // user would ever answer "no" to the prompt against an empty schema. } else if (!$this->force) { $console->writeOut( "\n%s\n", pht( "Found %s adjustment(s) to apply, detailed above.\n\n". "You can review adjustments in more detail from the web interface, ". "in Config > Database Status. To better understand the adjustment ". "workflow, see \"Managing Storage Adjustments\" in the ". "documentation.\n\n". "MySQL needs to copy table data to make some adjustments, so these ". "migrations may take some time.", phutil_count($adjustments))); $prompt = pht('Apply these schema adjustments?'); if (!phutil_console_confirm($prompt, $default_no = true)) { return 1; } } $console->writeOut( "%s\n", pht('Applying schema adjustments...')); $conn = $api->getConn(null); if ($unsafe) { queryfx($conn, 'SET SESSION sql_mode = %s', ''); } else { queryfx($conn, 'SET SESSION sql_mode = %s', 'STRICT_ALL_TABLES'); } $failed = array(); // We make changes in several phases. $phases = array( // Drop surplus autoincrements. This allows us to drop primary keys on // autoincrement columns. 'drop_auto', // Drop all keys we're going to adjust. This prevents them from // interfering with column changes. 'drop_keys', // Apply all database, table, and column changes. 'main', // Restore adjusted keys. 'add_keys', // Add missing autoincrements. 'add_auto', ); $bar = id(new PhutilConsoleProgressBar()) ->setTotal(count($adjustments) * count($phases)); foreach ($phases as $phase) { foreach ($adjustments as $adjust) { try { switch ($adjust['kind']) { case 'database': if ($phase == 'main') { queryfx( $conn, 'ALTER DATABASE %T CHARACTER SET = %s COLLATE = %s', $adjust['database'], $adjust['charset'], $adjust['collation']); } break; case 'table': if ($phase == 'main') { queryfx( $conn, 'ALTER TABLE %T.%T COLLATE = %s, ENGINE = %s', $adjust['database'], $adjust['table'], $adjust['collation'], $adjust['engine']); } break; case 'column': $apply = false; $auto = false; $new_auto = idx($adjust, 'auto'); if ($phase == 'drop_auto') { if ($new_auto === false) { $apply = true; $auto = false; } } else if ($phase == 'main') { $apply = true; if ($new_auto === false) { $auto = false; } else { $auto = $adjust['is_auto']; } } else if ($phase == 'add_auto') { if ($new_auto === true) { $apply = true; $auto = true; } } if ($apply) { $parts = array(); if ($auto) { $parts[] = qsprintf( $conn, 'AUTO_INCREMENT'); } if ($adjust['charset']) { switch ($adjust['charset']) { case 'binary': $charset_value = qsprintf($conn, 'binary'); break; case 'utf8': $charset_value = qsprintf($conn, 'utf8'); break; case 'utf8mb4': $charset_value = qsprintf($conn, 'utf8mb4'); break; default: throw new Exception( pht( 'Unsupported character set "%s".', $adjust['charset'])); } switch ($adjust['collation']) { case 'binary': $collation_value = qsprintf($conn, 'binary'); break; case 'utf8_general_ci': $collation_value = qsprintf($conn, 'utf8_general_ci'); break; case 'utf8mb4_bin': $collation_value = qsprintf($conn, 'utf8mb4_bin'); break; case 'utf8mb4_unicode_ci': $collation_value = qsprintf($conn, 'utf8mb4_unicode_ci'); break; default: throw new Exception( pht( 'Unsupported collation set "%s".', $adjust['collation'])); } $parts[] = qsprintf( $conn, 'CHARACTER SET %Q COLLATE %Q', $charset_value, $collation_value); } if ($parts) { $parts = qsprintf($conn, '%LJ', $parts); } else { $parts = qsprintf($conn, ''); } if ($adjust['nullable']) { $nullable = qsprintf($conn, 'NULL'); } else { $nullable = qsprintf($conn, 'NOT NULL'); } // TODO: We're using "%Z" here for the column type, which is // technically unsafe. It would be nice to be able to use "%Q" // instead, but this requires a fair amount of legwork to // enumerate all column types. queryfx( $conn, 'ALTER TABLE %T.%T MODIFY %T %Z %Q %Q', $adjust['database'], $adjust['table'], $adjust['name'], $adjust['type'], $parts, $nullable); } break; case 'key': if (($phase == 'drop_keys') && $adjust['exists']) { if ($adjust['name'] == 'PRIMARY') { $key_name = qsprintf($conn, 'PRIMARY KEY'); } else { $key_name = qsprintf($conn, 'KEY %T', $adjust['name']); } queryfx( $conn, 'ALTER TABLE %T.%T DROP %Q', $adjust['database'], $adjust['table'], $key_name); } if (($phase == 'add_keys') && $adjust['keep']) { // Different keys need different creation syntax. Notable // special cases are primary keys and fulltext keys. if ($adjust['name'] == 'PRIMARY') { $key_name = qsprintf($conn, 'PRIMARY KEY'); } else if ($adjust['indexType'] == 'FULLTEXT') { $key_name = qsprintf($conn, 'FULLTEXT %T', $adjust['name']); } else { if ($adjust['unique']) { $key_name = qsprintf( $conn, 'UNIQUE KEY %T', $adjust['name']); } else { $key_name = qsprintf( $conn, '/* NONUNIQUE */ KEY %T', $adjust['name']); } } queryfx( $conn, 'ALTER TABLE %T.%T ADD %Q (%LK)', $adjust['database'], $adjust['table'], $key_name, $adjust['columns']); } break; default: throw new Exception( pht('Unknown schema adjustment kind "%s"!', $adjust['kind'])); } } catch (AphrontQueryException $ex) { $failed[] = array($adjust, $ex); } $bar->update(1); } } $bar->done(); if (!$failed) { $console->writeOut( "%s\n", pht('Completed applying all schema adjustments.')); $err = 0; } else { $table = id(new PhutilConsoleTable()) ->addColumn('target', array('title' => pht('Target'))) ->addColumn('error', array('title' => pht('Error'))); foreach ($failed as $failure) { list($adjust, $ex) = $failure; $pieces = array_select_keys( $adjust, array('database', 'table', 'name')); $pieces = array_filter($pieces); $target = implode('.', $pieces); $table->addRow( array( 'target' => $target, 'error' => $ex->getMessage(), )); } $console->writeOut("\n"); $table->draw(); $console->writeOut( "\n%s\n", pht('Failed to make some schema adjustments, detailed above.')); $console->writeOut( "%s\n", pht( 'For help troubleshooting adjustments, see "Managing Storage '. 'Adjustments" in the documentation.')); $err = 1; } return $this->printErrors($errors, $err); } private function findAdjustments( PhabricatorStorageManagementAPI $api) { list($comp, $expect, $actual) = $this->loadSchemata($api); $issue_charset = PhabricatorConfigStorageSchema::ISSUE_CHARSET; $issue_collation = PhabricatorConfigStorageSchema::ISSUE_COLLATION; $issue_columntype = PhabricatorConfigStorageSchema::ISSUE_COLUMNTYPE; $issue_surpluskey = PhabricatorConfigStorageSchema::ISSUE_SURPLUSKEY; $issue_missingkey = PhabricatorConfigStorageSchema::ISSUE_MISSINGKEY; $issue_columns = PhabricatorConfigStorageSchema::ISSUE_KEYCOLUMNS; $issue_unique = PhabricatorConfigStorageSchema::ISSUE_UNIQUE; $issue_longkey = PhabricatorConfigStorageSchema::ISSUE_LONGKEY; $issue_auto = PhabricatorConfigStorageSchema::ISSUE_AUTOINCREMENT; $issue_engine = PhabricatorConfigStorageSchema::ISSUE_ENGINE; $adjustments = array(); $errors = array(); foreach ($comp->getDatabases() as $database_name => $database) { foreach ($this->findErrors($database) as $issue) { $errors[] = array( 'database' => $database_name, 'issue' => $issue, ); } $expect_database = $expect->getDatabase($database_name); $actual_database = $actual->getDatabase($database_name); if (!$expect_database || !$actual_database) { // If there's a real issue here, skip this stuff. continue; } if ($actual_database->getAccessDenied()) { // If we can't access the database, we can't access the tables either. continue; } $issues = array(); if ($database->hasIssue($issue_charset)) { $issues[] = $issue_charset; } if ($database->hasIssue($issue_collation)) { $issues[] = $issue_collation; } if ($issues) { $adjustments[] = array( 'kind' => 'database', 'database' => $database_name, 'issues' => $issues, 'charset' => $expect_database->getCharacterSet(), 'collation' => $expect_database->getCollation(), ); } foreach ($database->getTables() as $table_name => $table) { foreach ($this->findErrors($table) as $issue) { $errors[] = array( 'database' => $database_name, 'table' => $table_name, 'issue' => $issue, ); } $expect_table = $expect_database->getTable($table_name); $actual_table = $actual_database->getTable($table_name); if (!$expect_table || !$actual_table) { continue; } $issues = array(); if ($table->hasIssue($issue_collation)) { $issues[] = $issue_collation; } if ($table->hasIssue($issue_engine)) { $issues[] = $issue_engine; } if ($issues) { $adjustments[] = array( 'kind' => 'table', 'database' => $database_name, 'table' => $table_name, 'issues' => $issues, 'collation' => $expect_table->getCollation(), 'engine' => $expect_table->getEngine(), ); } foreach ($table->getColumns() as $column_name => $column) { foreach ($this->findErrors($column) as $issue) { $errors[] = array( 'database' => $database_name, 'table' => $table_name, 'name' => $column_name, 'issue' => $issue, ); } $expect_column = $expect_table->getColumn($column_name); $actual_column = $actual_table->getColumn($column_name); if (!$expect_column || !$actual_column) { continue; } $issues = array(); if ($column->hasIssue($issue_collation)) { $issues[] = $issue_collation; } if ($column->hasIssue($issue_charset)) { $issues[] = $issue_charset; } if ($column->hasIssue($issue_columntype)) { $issues[] = $issue_columntype; } if ($column->hasIssue($issue_auto)) { $issues[] = $issue_auto; } if ($issues) { if ($expect_column->getCharacterSet() === null) { // For non-text columns, we won't be specifying a collation or // character set. $charset = null; $collation = null; } else { $charset = $expect_column->getCharacterSet(); $collation = $expect_column->getCollation(); } $adjustment = array( 'kind' => 'column', 'database' => $database_name, 'table' => $table_name, 'name' => $column_name, 'issues' => $issues, 'collation' => $collation, 'charset' => $charset, 'type' => $expect_column->getColumnType(), // NOTE: We don't adjust column nullability because it is // dangerous, so always use the current nullability. 'nullable' => $actual_column->getNullable(), // NOTE: This always stores the current value, because we have // to make these updates separately. 'is_auto' => $actual_column->getAutoIncrement(), ); if ($column->hasIssue($issue_auto)) { $adjustment['auto'] = $expect_column->getAutoIncrement(); } $adjustments[] = $adjustment; } } foreach ($table->getKeys() as $key_name => $key) { foreach ($this->findErrors($key) as $issue) { $errors[] = array( 'database' => $database_name, 'table' => $table_name, 'name' => $key_name, 'issue' => $issue, ); } $expect_key = $expect_table->getKey($key_name); $actual_key = $actual_table->getKey($key_name); $issues = array(); $keep_key = true; if ($key->hasIssue($issue_surpluskey)) { $issues[] = $issue_surpluskey; $keep_key = false; } if ($key->hasIssue($issue_missingkey)) { $issues[] = $issue_missingkey; } if ($key->hasIssue($issue_columns)) { $issues[] = $issue_columns; } if ($key->hasIssue($issue_unique)) { $issues[] = $issue_unique; } // NOTE: We can't really fix this, per se, but we may need to remove // the key to change the column type. In the best case, the new // column type won't be overlong and recreating the key really will // fix the issue. In the worst case, we get the right column type and // lose the key, which is still better than retaining the key having // the wrong column type. if ($key->hasIssue($issue_longkey)) { $issues[] = $issue_longkey; } if ($issues) { $adjustment = array( 'kind' => 'key', 'database' => $database_name, 'table' => $table_name, 'name' => $key_name, 'issues' => $issues, 'exists' => (bool)$actual_key, 'keep' => $keep_key, ); if ($keep_key) { $adjustment += array( 'columns' => $expect_key->getColumnNames(), 'unique' => $expect_key->getUnique(), 'indexType' => $expect_key->getIndexType(), ); } $adjustments[] = $adjustment; } } } } return array($adjustments, $errors); } private function findErrors(PhabricatorConfigStorageSchema $schema) { $result = array(); foreach ($schema->getLocalIssues() as $issue) { $status = PhabricatorConfigStorageSchema::getIssueStatus($issue); if ($status == PhabricatorConfigStorageSchema::STATUS_FAIL) { $result[] = $issue; } } return $result; } private function printErrors(array $errors, $default_return) { if (!$errors) { return $default_return; } $console = PhutilConsole::getConsole(); $table = id(new PhutilConsoleTable()) ->addColumn('target', array('title' => pht('Target'))) ->addColumn('error', array('title' => pht('Error'))); $any_surplus = false; $all_surplus = true; $any_access = false; $all_access = true; foreach ($errors as $error) { $pieces = array_select_keys( $error, array('database', 'table', 'name')); $pieces = array_filter($pieces); $target = implode('.', $pieces); $name = PhabricatorConfigStorageSchema::getIssueName($error['issue']); $issue = $error['issue']; if ($issue === PhabricatorConfigStorageSchema::ISSUE_SURPLUS) { $any_surplus = true; } else { $all_surplus = false; } if ($issue === PhabricatorConfigStorageSchema::ISSUE_ACCESSDENIED) { $any_access = true; } else { $all_access = false; } $table->addRow( array( 'target' => $target, 'error' => $name, )); } $console->writeOut("\n"); $table->draw(); $console->writeOut("\n"); $message = array(); if ($all_surplus) { $message[] = pht( 'You have surplus schemata (extra tables or columns which Phabricator '. 'does not expect). For information on resolving these '. 'issues, see the "Surplus Schemata" section in the "Managing Storage '. 'Adjustments" article in the documentation.'); } else if ($all_access) { $message[] = pht( 'The user you are connecting to MySQL with does not have the correct '. 'permissions, and can not access some databases or tables that it '. 'needs to be able to access. GRANT the user additional permissions.'); } else { $message[] = pht( 'The schemata have errors (detailed above) which the adjustment '. 'workflow can not fix.'); if ($any_access) { $message[] = pht( 'Some of these errors are caused by access control problems. '. 'The user you are connecting with does not have permission to see '. 'all of the database or tables that Phabricator uses. You need to '. 'GRANT the user more permission, or use a different user.'); } if ($any_surplus) { $message[] = pht( 'Some of these errors are caused by surplus schemata (extra '. 'tables or columns which Phabricator does not expect). These are '. 'not serious. For information on resolving these issues, see the '. '"Surplus Schemata" section in the "Managing Storage Adjustments" '. 'article in the documentation.'); } $message[] = pht( 'If you are not developing Phabricator itself, report this issue to '. 'the upstream.'); $message[] = pht( 'If you are developing Phabricator, these errors usually indicate '. 'that your schema specifications do not agree with the schemata your '. 'code actually builds.'); } $message = implode("\n\n", $message); if ($all_surplus) { $console->writeOut( "** %s **\n\n%s\n", pht('SURPLUS SCHEMATA'), phutil_console_wrap($message)); } else if ($all_access) { $console->writeOut( "** %s **\n\n%s\n", pht('ACCESS DENIED'), phutil_console_wrap($message)); } else { $console->writeOut( "** %s **\n\n%s\n", pht('SCHEMATA ERRORS'), phutil_console_wrap($message)); } return 2; } final protected function upgradeSchemata( array $apis, $apply_only = null, $no_quickstart = false, $init_only = false) { $locks = array(); foreach ($apis as $api) { $locks[] = $this->lock($api); } try { $this->doUpgradeSchemata($apis, $apply_only, $no_quickstart, $init_only); } catch (Exception $ex) { foreach ($locks as $lock) { $lock->unlock(); } throw $ex; } foreach ($locks as $lock) { $lock->unlock(); } } private function doUpgradeSchemata( array $apis, $apply_only, $no_quickstart, $init_only) { $patches = $this->patches; $is_dryrun = $this->dryRun; // We expect that patches should already be sorted properly. However, // phase behavior will be wrong if they aren't, so make sure. $patches = msortv($patches, 'newSortVector'); $api_map = array(); foreach ($apis as $api) { $api_map[$api->getRef()->getRefKey()] = $api; } foreach ($api_map as $ref_key => $api) { $applied = $api->getAppliedPatches(); $needs_init = ($applied === null); if (!$needs_init) { continue; } if ($is_dryrun) { echo tsprintf( "%s\n", pht( 'DRYRUN: Storage on host "%s" does not exist yet, so it '. 'would be created.', $ref_key)); continue; } if ($apply_only) { throw new PhutilArgumentUsageException( pht( 'Storage on host "%s" has not been initialized yet. You must '. 'initialize storage before selectively applying patches.', $ref_key)); } // If we're initializing storage for the first time on any host, track // it so that we can give the user a nicer experience during the // subsequent adjustment phase. $this->didInitialize = true; $legacy = $api->getLegacyPatches($patches); if ($legacy || $no_quickstart || $init_only) { // If we have legacy patches, we can't quickstart. $api->createDatabase('meta_data'); $api->createTable( 'meta_data', 'patch_status', array( 'patch VARCHAR(255) NOT NULL PRIMARY KEY COLLATE utf8_general_ci', 'applied INT UNSIGNED NOT NULL', )); foreach ($legacy as $patch) { $api->markPatchApplied($patch); } } else { echo tsprintf( "%s\n", pht( 'Loading quickstart template onto "%s"...', $ref_key)); $root = dirname(phutil_get_library_root('phabricator')); $sql = $root.'/resources/sql/quickstart.sql'; $api->applyPatchSQL($sql); } } if ($init_only) { echo pht('Storage initialized.')."\n"; return 0; } $applied_map = array(); $state_map = array(); foreach ($api_map as $ref_key => $api) { $applied = $api->getAppliedPatches(); // If we still have nothing applied, this is a dry run and we didn't // actually initialize storage. Here, just do nothing. if ($applied === null) { if ($is_dryrun) { continue; } else { throw new Exception( pht( 'Database initialization on host "%s" applied no patches!', $ref_key)); } } $applied = array_fuse($applied); $state_map[$ref_key] = $applied; if ($apply_only) { if (isset($applied[$apply_only])) { if (!$this->force && !$is_dryrun) { echo phutil_console_wrap( pht( 'Patch "%s" has already been applied on host "%s". Are you '. 'sure you want to apply it again? This may put your storage '. 'in a state that the upgrade scripts can not automatically '. 'manage.', $apply_only, $ref_key)); if (!phutil_console_confirm(pht('Apply patch again?'))) { echo pht('Cancelled.')."\n"; return 1; } } // Mark this patch as not yet applied on this host. unset($applied[$apply_only]); } } $applied_map[$ref_key] = $applied; } // If we're applying only a specific patch, select just that patch. if ($apply_only) { $patches = array_select_keys($patches, array($apply_only)); } // Apply each patch to each database. We apply patches patch-by-patch, // not database-by-database: for each patch we apply it to every database, // then move to the next patch. // We must do this because ".php" patches may depend on ".sql" patches // being up to date on all masters, and that will work fine if we put each // patch on every host before moving on. If we try to bring database hosts // up to date one at a time we can end up in a big mess. $duration_map = array(); // First, find any global patches which have been applied to ANY database. // We are just going to mark these as applied without actually running // them. Otherwise, adding new empty masters to an existing cluster will // try to apply them against invalid states. foreach ($patches as $key => $patch) { if ($patch->getIsGlobalPatch()) { foreach ($applied_map as $ref_key => $applied) { if (isset($applied[$key])) { $duration_map[$key] = 1; } } } } while (true) { $applied_something = false; foreach ($patches as $key => $patch) { // First, check if any databases need this patch. We can just skip it // if it has already been applied everywhere. $need_patch = array(); foreach ($applied_map as $ref_key => $applied) { if (isset($applied[$key])) { continue; } $need_patch[] = $ref_key; } if (!$need_patch) { unset($patches[$key]); continue; } // Check if we can apply this patch yet. Before we can apply a patch, // all of the dependencies for the patch must have been applied on all // databases. Requiring that all databases stay in sync prevents one // database from racing ahead if it happens to get a patch that nothing // else has yet. $missing_patch = null; foreach ($patch->getAfter() as $after) { foreach ($applied_map as $ref_key => $applied) { if (isset($applied[$after])) { // This database already has the patch. We can apply it to // other databases but don't need to apply it here. continue; } $missing_patch = $after; break 2; } } if ($missing_patch) { if ($apply_only) { echo tsprintf( "%s\n", pht( 'Unable to apply patch "%s" because it depends on patch '. '"%s", which has not been applied on some hosts: %s.', $apply_only, $missing_patch, implode(', ', $need_patch))); return 1; } else { // Some databases are missing the dependencies, so keep trying // other patches instead. If everything goes right, we'll apply the // dependencies and then come back and apply this patch later. continue; } } $is_global = $patch->getIsGlobalPatch(); $patch_apis = array_select_keys($api_map, $need_patch); foreach ($patch_apis as $ref_key => $api) { if ($is_global) { // If this is a global patch which we previously applied, just // read the duration from the map without actually applying // the patch. $duration = idx($duration_map, $key); } else { $duration = null; } if ($duration === null) { if ($is_dryrun) { echo tsprintf( "%s\n", pht( 'DRYRUN: Would apply patch "%s" to host "%s".', $key, $ref_key)); } else { echo tsprintf( "%s\n", pht( 'Applying patch "%s" to host "%s"...', $key, $ref_key)); } $t_begin = microtime(true); if (!$is_dryrun) { $api->applyPatch($patch); } $t_end = microtime(true); $duration = ($t_end - $t_begin); $duration_map[$key] = $duration; } // If we're explicitly reapplying this patch, we don't need to // mark it as applied. if (!isset($state_map[$ref_key][$key])) { if (!$is_dryrun) { $api->markPatchApplied($key, ($t_end - $t_begin)); } $applied_map[$ref_key][$key] = true; } } // We applied this everywhere, so we're done with the patch. unset($patches[$key]); $applied_something = true; } if (!$applied_something) { if ($patches) { throw new Exception( pht( 'Some patches could not be applied: %s', implode(', ', array_keys($patches)))); } else if (!$is_dryrun && !$apply_only) { echo pht( 'Storage is up to date. Use "%s" for details.', 'storage status')."\n"; } break; } } } final protected function getBareHostAndPort($host) { // Split out port information, since the command-line client requires a // separate flag for the port. $uri = new PhutilURI('mysql://'.$host); if ($uri->getPort()) { $port = $uri->getPort(); $bare_hostname = $uri->getDomain(); } else { $port = null; $bare_hostname = $host; } return array($bare_hostname, $port); } /** * Acquires a @{class:PhabricatorGlobalLock}. * * @return PhabricatorGlobalLock */ final protected function lock(PhabricatorStorageManagementAPI $api) { // Although we're holding this lock on different databases so it could // have the same name on each as far as the database is concerned, the // locks would be the same within this process. $parameters = array( 'refKey' => $api->getRef()->getRefKey(), ); // We disable logging for this lock because we may not have created the // log table yet, or may need to adjust it. return PhabricatorGlobalLock::newLock('adjust', $parameters) - ->useSpecificConnection($api->getConn(null)) + ->setExternalConnection($api->getConn(null)) ->setDisableLogging(true) ->lock(); } final protected function analyzeTables( PhabricatorStorageManagementAPI $api) { // Analyzing tables can sometimes have a significant effect on query // performance, particularly for the fulltext ngrams tables. See T12819 // for some specific examples. $conn = $api->getConn(null); $patches = $this->getPatches(); $databases = $api->getDatabaseList($patches, true); $this->logInfo( pht('ANALYZE'), pht('Analyzing tables...')); $targets = array(); foreach ($databases as $database) { queryfx($conn, 'USE %C', $database); $tables = queryfx_all($conn, 'SHOW TABLE STATUS'); foreach ($tables as $table) { $table_name = $table['Name']; $targets[] = array( 'database' => $database, 'table' => $table_name, ); } } $bar = id(new PhutilConsoleProgressBar()) ->setTotal(count($targets)); foreach ($targets as $target) { queryfx( $conn, 'ANALYZE TABLE %T.%T', $target['database'], $target['table']); $bar->update(1); } $bar->done(); $this->logOkay( pht('ANALYZED'), pht( 'Analyzed %d table(s).', count($targets))); } } diff --git a/src/infrastructure/util/PhabricatorGlobalLock.php b/src/infrastructure/util/PhabricatorGlobalLock.php index cf380c3726..3bf5ab0c54 100644 --- a/src/infrastructure/util/PhabricatorGlobalLock.php +++ b/src/infrastructure/util/PhabricatorGlobalLock.php @@ -1,396 +1,407 @@ lock(); * do_contentious_things(); * $lock->unlock(); * * NOTE: This lock is not completely global; it is namespaced to the active * storage namespace so that unit tests running in separate table namespaces * are isolated from one another. * * @task construct Constructing Locks * @task impl Implementation */ final class PhabricatorGlobalLock extends PhutilLock { private $parameters; private $conn; private $externalConnection; private $log; private $disableLogging; private static $pool = array(); /* -( Constructing Locks )------------------------------------------------- */ public static function newLock($name, $parameters = array()) { $namespace = PhabricatorLiskDAO::getStorageNamespace(); $namespace = PhabricatorHash::digestToLength($namespace, 20); $parts = array(); ksort($parameters); foreach ($parameters as $key => $parameter) { if (!preg_match('/^[a-zA-Z0-9]+\z/', $key)) { throw new Exception( pht( 'Lock parameter key "%s" must be alphanumeric.', $key)); } if (!is_scalar($parameter) && !is_null($parameter)) { throw new Exception( pht( 'Lock parameter for key "%s" must be a scalar.', $key)); } $value = phutil_json_encode($parameter); $parts[] = "{$key}={$value}"; } $parts = implode(', ', $parts); $local = "{$name}({$parts})"; $local = PhabricatorHash::digestToLength($local, 20); $full_name = "ph:{$namespace}:{$local}"; $lock = self::getLock($full_name); if (!$lock) { $lock = new PhabricatorGlobalLock($full_name); self::registerLock($lock); $lock->parameters = $parameters; } return $lock; } /** * Use a specific database connection for locking. * * By default, `PhabricatorGlobalLock` will lock on the "repository" database * (somewhat arbitrarily). In most cases this is fine, but this method can * be used to lock on a specific connection. * * @param AphrontDatabaseConnection * @return this */ - public function useSpecificConnection(AphrontDatabaseConnection $conn) { - $this->conn = $conn; + public function setExternalConnection(AphrontDatabaseConnection $conn) { + if ($this->conn) { + throw new Exception( + pht( + 'Lock is already held, and must be released before the '. + 'connection may be changed.')); + } $this->externalConnection = $conn; return $this; } public function setDisableLogging($disable) { $this->disableLogging = $disable; return $this; } /* -( Connection Pool )---------------------------------------------------- */ public static function getConnectionPoolSize() { return count(self::$pool); } public static function clearConnectionPool() { self::$pool = array(); } /* -( Implementation )----------------------------------------------------- */ protected function doLock($wait) { $conn = $this->conn; + if (!$conn) { + if ($this->externalConnection) { + $conn = $this->externalConnection; + } + } + if (!$conn) { // Try to reuse a connection from the connection pool. $conn = array_pop(self::$pool); } if (!$conn) { // NOTE: Using the 'repository' database somewhat arbitrarily, mostly // because the first client of locks is the repository daemons. We must // always use the same database for all locks, but don't access any // tables so we could use any valid database. We could build a // database-free connection instead, but that's kind of messy and we // might forget about it in the future if we vertically partition the // application. $dao = new PhabricatorRepository(); // NOTE: Using "force_new" to make sure each lock is on its own // connection. $conn = $dao->establishConnection('w', $force_new = true); } // NOTE: Since MySQL will disconnect us if we're idle for too long, we set // the wait_timeout to an enormous value, to allow us to hold the // connection open indefinitely (or, at least, for 24 days). $max_allowed_timeout = 2147483; queryfx($conn, 'SET wait_timeout = %d', $max_allowed_timeout); $lock_name = $this->getName(); $result = queryfx_one( $conn, 'SELECT GET_LOCK(%s, %f)', $lock_name, $wait); $ok = head($result); if (!$ok) { // See PHI1794. We failed to acquire the lock, but the connection itself // is still good. We're done with it, so add it to the pool, just as we // would if we were releasing the lock. // If we don't do this, we may establish a huge number of connections // very rapidly if many workers try to acquire a lock at once. For // example, this can happen if there are a large number of webhook tasks // in the queue. self::$pool[] = $conn; throw id(new PhutilLockException($lock_name)) ->setHint($this->newHint($lock_name, $wait)); } $conn->rememberLock($lock_name); $this->conn = $conn; if ($this->shouldLogLock()) { $lock_context = $this->newLockContext(); $log = id(new PhabricatorDaemonLockLog()) ->setLockName($lock_name) ->setLockParameters($this->parameters) ->setLockContext($lock_context) ->save(); $this->log = $log; } } protected function doUnlock() { $lock_name = $this->getName(); $conn = $this->conn; try { $result = queryfx_one( $conn, 'SELECT RELEASE_LOCK(%s)', $lock_name); $conn->forgetLock($lock_name); } catch (Exception $ex) { $result = array(null); } $ok = head($result); if (!$ok) { // TODO: We could throw here, but then this lock doesn't get marked // unlocked and we throw again later when exiting. It also doesn't // particularly matter for any current applications. For now, just // swallow the error. } $this->conn = null; if (!$this->externalConnection) { $conn->close(); self::$pool[] = $conn; } if ($this->log) { $log = $this->log; $this->log = null; $conn = $log->establishConnection('w'); queryfx( $conn, 'UPDATE %T SET lockReleased = UNIX_TIMESTAMP() WHERE id = %d', $log->getTableName(), $log->getID()); } } private function shouldLogLock() { if ($this->disableLogging) { return false; } $policy = id(new PhabricatorDaemonLockLogGarbageCollector()) ->getRetentionPolicy(); if (!$policy) { return false; } return true; } private function newLockContext() { $context = array( 'pid' => getmypid(), 'host' => php_uname('n'), 'sapi' => php_sapi_name(), ); global $argv; if ($argv) { $context['argv'] = $argv; } $access_log = null; // TODO: There's currently no cohesive way to get the parameterized access // log for the current request across different request types. Web requests // have an "AccessLog", SSH requests have an "SSHLog", and other processes // (like scripts) have no log. But there's no method to say "give me any // log you've got". For now, just test if we have a web request and use the // "AccessLog" if we do, since that's the only one we actually read any // parameters from. // NOTE: "PhabricatorStartup" is only available from web requests, not // from CLI scripts. if (class_exists('PhabricatorStartup', false)) { $access_log = PhabricatorAccessLog::getLog(); } if ($access_log) { $controller = $access_log->getData('C'); if ($controller) { $context['controller'] = $controller; } $method = $access_log->getData('m'); if ($method) { $context['method'] = $method; } } return $context; } private function newHint($lock_name, $wait) { if (!$this->shouldLogLock()) { return pht( 'Enable the lock log for more detailed information about '. 'which process is holding this lock.'); } $now = PhabricatorTime::getNow(); // First, look for recent logs. If other processes have been acquiring and // releasing this lock while we've been waiting, this is more likely to be // a contention/throughput issue than an issue with something hung while // holding the lock. $limit = 100; $logs = id(new PhabricatorDaemonLockLog())->loadAllWhere( 'lockName = %s AND dateCreated >= %d ORDER BY id ASC LIMIT %d', $lock_name, ($now - $wait), $limit); if ($logs) { if (count($logs) === $limit) { return pht( 'During the last %s second(s) spent waiting for the lock, more '. 'than %s other process(es) acquired it, so this is likely a '. 'bottleneck. Use "bin/lock log --name %s" to review log activity.', new PhutilNumber($wait), new PhutilNumber($limit), $lock_name); } else { return pht( 'During the last %s second(s) spent waiting for the lock, %s '. 'other process(es) acquired it, so this is likely a '. 'bottleneck. Use "bin/lock log --name %s" to review log activity.', new PhutilNumber($wait), phutil_count($logs), $lock_name); } } $last_log = id(new PhabricatorDaemonLockLog())->loadOneWhere( 'lockName = %s ORDER BY id DESC LIMIT 1', $lock_name); if ($last_log) { $info = array(); $acquired = $last_log->getDateCreated(); $context = $last_log->getLockContext(); $process_info = array(); $pid = idx($context, 'pid'); if ($pid) { $process_info[] = 'pid='.$pid; } $host = idx($context, 'host'); if ($host) { $process_info[] = 'host='.$host; } $sapi = idx($context, 'sapi'); if ($sapi) { $process_info[] = 'sapi='.$sapi; } $argv = idx($context, 'argv'); if ($argv) { $process_info[] = 'argv='.(string)csprintf('%LR', $argv); } $controller = idx($context, 'controller'); if ($controller) { $process_info[] = 'controller='.$controller; } $method = idx($context, 'method'); if ($method) { $process_info[] = 'method='.$method; } $process_info = implode(', ', $process_info); $info[] = pht( 'This lock was most recently acquired by a process (%s) '. '%s second(s) ago.', $process_info, new PhutilNumber($now - $acquired)); $released = $last_log->getLockReleased(); if ($released) { $info[] = pht( 'This lock was released %s second(s) ago.', new PhutilNumber($now - $released)); } else { $info[] = pht('There is no record of this lock being released.'); } return implode(' ', $info); } return pht( 'Found no records of processes acquiring or releasing this lock.'); } } diff --git a/src/infrastructure/util/__tests__/PhabricatorGlobalLockTestCase.php b/src/infrastructure/util/__tests__/PhabricatorGlobalLockTestCase.php index 4e24cdfed4..8d5176f391 100644 --- a/src/infrastructure/util/__tests__/PhabricatorGlobalLockTestCase.php +++ b/src/infrastructure/util/__tests__/PhabricatorGlobalLockTestCase.php @@ -1,92 +1,116 @@ true, ); } public function testConnectionPoolWithDefaultConnection() { PhabricatorGlobalLock::clearConnectionPool(); $this->assertEqual( 0, PhabricatorGlobalLock::getConnectionPoolSize(), pht('Clear Connection Pool')); $lock_name = $this->newLockName(); $lock = PhabricatorGlobalLock::newLock($lock_name); $lock->lock(); $this->assertEqual( 0, PhabricatorGlobalLock::getConnectionPoolSize(), pht('Connection Pool With Lock')); $lock->unlock(); $this->assertEqual( 1, PhabricatorGlobalLock::getConnectionPoolSize(), pht('Connection Pool With Lock Released')); PhabricatorGlobalLock::clearConnectionPool(); } public function testConnectionPoolWithSpecificConnection() { $conn = id(new HarbormasterScratchTable()) ->establishConnection('w'); PhabricatorGlobalLock::clearConnectionPool(); $this->assertEqual( 0, PhabricatorGlobalLock::getConnectionPoolSize(), pht('Clear Connection Pool')); $this->assertEqual( false, $conn->isHoldingAnyLock(), pht('Specific Connection, No Lock')); $lock_name = $this->newLockName(); $lock = PhabricatorGlobalLock::newLock($lock_name); - $lock->useSpecificConnection($conn); + $lock->setExternalConnection($conn); $lock->lock(); $this->assertEqual( 0, PhabricatorGlobalLock::getConnectionPoolSize(), pht('Connection Pool + Specific, With Lock')); $this->assertEqual( true, $conn->isHoldingAnyLock(), pht('Specific Connection, Holding Lock')); $lock->unlock(); // The specific connection provided should NOT be returned to the // connection pool. $this->assertEqual( 0, PhabricatorGlobalLock::getConnectionPoolSize(), pht('Connection Pool + Specific, With Lock Released')); $this->assertEqual( false, $conn->isHoldingAnyLock(), pht('Specific Connection, No Lock')); PhabricatorGlobalLock::clearConnectionPool(); } + public function testExternalConnectionMutationScope() { + $conn = id(new HarbormasterScratchTable()) + ->establishConnection('w'); + + $lock_name = $this->newLockName(); + $lock = PhabricatorGlobalLock::newLock($lock_name); + $lock->lock(); + + $caught = null; + try { + $lock->setExternalConnection($conn); + } catch (Exception $ex) { + $caught = $ex; + } catch (Throwable $ex) { + $caught = $ex; + } + + $lock->unlock(); + + $this->assertTrue( + ($caught instanceof Exception), + pht('Changing connection while locked is forbidden.')); + } + private function newLockName() { return 'testlock-'.Filesystem::readRandomCharacters(16); } }