diff --git a/resources/sql/autopatches/20180322.lock.02.wait.sql b/resources/sql/autopatches/20180322.lock.02.wait.sql new file mode 100644 index 0000000000..cba7cc64d0 --- /dev/null +++ b/resources/sql/autopatches/20180322.lock.02.wait.sql @@ -0,0 +1,8 @@ +ALTER TABLE {$NAMESPACE}_repository.repository_pushevent + ADD writeWait BIGINT UNSIGNED; + +ALTER TABLE {$NAMESPACE}_repository.repository_pushevent + ADD readWait BIGINT UNSIGNED; + +ALTER TABLE {$NAMESPACE}_repository.repository_pushevent + ADD hostWait BIGINT UNSIGNED; diff --git a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php index c855b6c532..d5ba74a30e 100644 --- a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php +++ b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php @@ -1,810 +1,824 @@ repository = $repository; return $this; } public function getRepository() { return $this->repository; } public function setViewer(PhabricatorUser $viewer) { $this->viewer = $viewer; return $this; } public function getViewer() { return $this->viewer; } public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) { $this->logger = $log; return $this; } /* -( Cluster Synchronization )-------------------------------------------- */ /** * Synchronize repository version information after creating a repository. * * This initializes working copy versions for all currently bound devices to * 0, so that we don't get stuck making an ambiguous choice about which * devices are leaders when we later synchronize before a read. * * @task sync */ public function synchronizeWorkingCopyAfterCreation() { if (!$this->shouldEnableSynchronization(false)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $service = $repository->loadAlmanacService(); if (!$service) { throw new Exception(pht('Failed to load repository cluster service.')); } $bindings = $service->getActiveBindings(); foreach ($bindings as $binding) { PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $binding->getDevicePHID(), 0); } return $this; } /** * @task sync */ public function synchronizeWorkingCopyAfterHostingChange() { if (!$this->shouldEnableSynchronization(false)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); $versions = mpull($versions, null, 'getDevicePHID'); // After converting a hosted repository to observed, or vice versa, we // need to reset version numbers because the clocks for observed and hosted // repositories run on different units. // We identify all the cluster leaders and reset their version to 0. // We identify all the cluster followers and demote them. // This allows the cluster to start over again at version 0 but keep the // same leaders. if ($versions) { $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); foreach ($versions as $version) { $device_phid = $version->getDevicePHID(); if ($version->getRepositoryVersion() == $max_version) { PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, 0); } else { PhabricatorRepositoryWorkingCopyVersion::demoteDevice( $repository_phid, $device_phid); } } } return $this; } /** * @task sync */ public function synchronizeWorkingCopyBeforeRead() { if (!$this->shouldEnableSynchronization(true)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock( $repository_phid, $device_phid); $lock_wait = phutil_units('2 minutes in seconds'); $this->logLine( pht( 'Acquiring read lock for repository "%s" on device "%s"...', $repository->getDisplayName(), $device->getName())); try { $start = PhabricatorTime::getNow(); $read_lock->lock($lock_wait); $waited = (PhabricatorTime::getNow() - $start); if ($waited) { $this->logLine( pht( 'Acquired read lock after %s second(s).', new PhutilNumber($waited))); } else { $this->logLine( pht( 'Acquired read lock immediately.')); } } catch (Exception $ex) { throw new PhutilProxyException( pht( 'Failed to acquire read lock after waiting %s second(s). You '. 'may be able to retry later.', new PhutilNumber($lock_wait)), $ex); } $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); $versions = mpull($versions, null, 'getDevicePHID'); $this_version = idx($versions, $device_phid); if ($this_version) { $this_version = (int)$this_version->getRepositoryVersion(); } else { $this_version = -1; } if ($versions) { // This is the normal case, where we have some version information and // can identify which nodes are leaders. If the current node is not a // leader, we want to fetch from a leader and then update our version. $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); if ($max_version > $this_version) { if ($repository->isHosted()) { $fetchable = array(); foreach ($versions as $version) { if ($version->getRepositoryVersion() == $max_version) { $fetchable[] = $version->getDevicePHID(); } } $this->synchronizeWorkingCopyFromDevices($fetchable); } else { $this->synchornizeWorkingCopyFromRemote(); } PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, $max_version); } else { $this->logLine( pht( 'Device "%s" is already a cluster leader and does not need '. 'to be synchronized.', $device->getName())); } $result_version = $max_version; } else { // If no version records exist yet, we need to be careful, because we // can not tell which nodes are leaders. // There might be several nodes with arbitrary existing data, and we have // no way to tell which one has the "right" data. If we pick wrong, we // might erase some or all of the data in the repository. // Since this is dangerous, we refuse to guess unless there is only one // device. If we're the only device in the group, we obviously must be // a leader. $service = $repository->loadAlmanacService(); if (!$service) { throw new Exception(pht('Failed to load repository cluster service.')); } $bindings = $service->getActiveBindings(); $device_map = array(); foreach ($bindings as $binding) { $device_map[$binding->getDevicePHID()] = true; } if (count($device_map) > 1) { throw new Exception( pht( 'Repository "%s" exists on more than one device, but no device '. 'has any repository version information. Phabricator can not '. 'guess which copy of the existing data is authoritative. Promote '. 'a device or see "Ambiguous Leaders" in the documentation.', $repository->getDisplayName())); } if (empty($device_map[$device->getPHID()])) { throw new Exception( pht( 'Repository "%s" is being synchronized on device "%s", but '. 'this device is not bound to the corresponding cluster '. 'service ("%s").', $repository->getDisplayName(), $device->getName(), $service->getName())); } // The current device is the only device in service, so it must be a // leader. We can safely have any future nodes which come online read // from it. PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, 0); $result_version = 0; } $read_lock->unlock(); return $result_version; } /** * @task sync */ public function synchronizeWorkingCopyBeforeWrite() { if (!$this->shouldEnableSynchronization(true)) { return; } $repository = $this->getRepository(); $viewer = $this->getViewer(); $repository_phid = $repository->getPHID(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); $table = new PhabricatorRepositoryWorkingCopyVersion(); $locked_connection = $table->establishConnection('w'); $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( $repository_phid); $write_lock->useSpecificConnection($locked_connection); $this->logLine( pht( 'Acquiring write lock for repository "%s"...', $repository->getDisplayName())); $lock_wait = phutil_units('2 minutes in seconds'); try { + $write_wait_start = microtime(true); + $start = PhabricatorTime::getNow(); $step_wait = 1; while (true) { try { $write_lock->lock((int)floor($step_wait)); + $write_wait_end = microtime(true); break; } catch (PhutilLockException $ex) { $waited = (PhabricatorTime::getNow() - $start); if ($waited > $lock_wait) { throw $ex; } $this->logActiveWriter($viewer, $repository); } // Wait a little longer before the next message we print. $step_wait = $step_wait + 0.5; $step_wait = min($step_wait, 3); } $waited = (PhabricatorTime::getNow() - $start); if ($waited) { $this->logLine( pht( 'Acquired write lock after %s second(s).', new PhutilNumber($waited))); } else { $this->logLine( pht( 'Acquired write lock immediately.')); } } catch (Exception $ex) { throw new PhutilProxyException( pht( 'Failed to acquire write lock after waiting %s second(s). You '. 'may be able to retry later.', new PhutilNumber($lock_wait)), $ex); } $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); foreach ($versions as $version) { if (!$version->getIsWriting()) { continue; } throw new Exception( pht( 'An previous write to this repository was interrupted; refusing '. 'new writes. This issue requires operator intervention to resolve, '. 'see "Write Interruptions" in the "Cluster: Repositories" in the '. 'documentation for instructions.')); } + $read_wait_start = microtime(true); try { $max_version = $this->synchronizeWorkingCopyBeforeRead(); } catch (Exception $ex) { $write_lock->unlock(); throw $ex; } + $read_wait_end = microtime(true); $pid = getmypid(); $hash = Filesystem::readRandomCharacters(12); $this->clusterWriteOwner = "{$pid}.{$hash}"; PhabricatorRepositoryWorkingCopyVersion::willWrite( $locked_connection, $repository_phid, $device_phid, array( 'userPHID' => $viewer->getPHID(), 'epoch' => PhabricatorTime::getNow(), 'devicePHID' => $device_phid, ), $this->clusterWriteOwner); $this->clusterWriteVersion = $max_version; $this->clusterWriteLock = $write_lock; + + $write_wait = ($write_wait_end - $write_wait_start); + $read_wait = ($read_wait_end - $read_wait_start); + + $log = $this->logger; + if ($log) { + $log->writeClusterEngineLogProperty('readWait', $read_wait); + $log->writeClusterEngineLogProperty('writeWait', $write_wait); + } } public function synchronizeWorkingCopyAfterDiscovery($new_version) { if (!$this->shouldEnableSynchronization(true)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); if ($repository->isHosted()) { return; } $viewer = $this->getViewer(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); // NOTE: We are not holding a lock here because this method is only called // from PhabricatorRepositoryDiscoveryEngine, which already holds a device // lock. Even if we do race here and record an older version, the // consequences are mild: we only do extra work to correct it later. $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); $versions = mpull($versions, null, 'getDevicePHID'); $this_version = idx($versions, $device_phid); if ($this_version) { $this_version = (int)$this_version->getRepositoryVersion(); } else { $this_version = -1; } if ($new_version > $this_version) { PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, $new_version); } } /** * @task sync */ public function synchronizeWorkingCopyAfterWrite() { if (!$this->shouldEnableSynchronization(true)) { return; } if (!$this->clusterWriteLock) { throw new Exception( pht( 'Trying to synchronize after write, but not holding a write '. 'lock!')); } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); // It is possible that we've lost the global lock while receiving the push. // For example, the master database may have been restarted between the // time we acquired the global lock and now, when the push has finished. // We wrote a durable lock while we were holding the the global lock, // essentially upgrading our lock. We can still safely release this upgraded // lock even if we're no longer holding the global lock. // If we fail to release the lock, the repository will be frozen until // an operator can figure out what happened, so we try pretty hard to // reconnect to the database and release the lock. $now = PhabricatorTime::getNow(); $duration = phutil_units('5 minutes in seconds'); $try_until = $now + $duration; $did_release = false; $already_failed = false; while (PhabricatorTime::getNow() <= $try_until) { try { // NOTE: This means we're still bumping the version when pushes fail. We // could select only un-rejected events instead to bump a little less // often. $new_log = id(new PhabricatorRepositoryPushEventQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withRepositoryPHIDs(array($repository_phid)) ->setLimit(1) ->executeOne(); $old_version = $this->clusterWriteVersion; if ($new_log) { $new_version = $new_log->getID(); } else { $new_version = $old_version; } PhabricatorRepositoryWorkingCopyVersion::didWrite( $repository_phid, $device_phid, $this->clusterWriteVersion, $new_version, $this->clusterWriteOwner); $did_release = true; break; } catch (AphrontConnectionQueryException $ex) { $connection_exception = $ex; } catch (AphrontConnectionLostQueryException $ex) { $connection_exception = $ex; } if (!$already_failed) { $already_failed = true; $this->logLine( pht('CRITICAL. Failed to release cluster write lock!')); $this->logLine( pht( 'The connection to the master database was lost while receiving '. 'the write.')); $this->logLine( pht( 'This process will spend %s more second(s) attempting to '. 'recover, then give up.', new PhutilNumber($duration))); } sleep(1); } if ($did_release) { if ($already_failed) { $this->logLine( pht('RECOVERED. Link to master database was restored.')); } $this->logLine(pht('Released cluster write lock.')); } else { throw new Exception( pht( 'Failed to reconnect to master database and release held write '. 'lock ("%s") on device "%s" for repository "%s" after trying '. 'for %s seconds(s). This repository will be frozen.', $this->clusterWriteOwner, $device->getName(), $this->getDisplayName(), new PhutilNumber($duration))); } // We can continue even if we've lost this lock, everything is still // consistent. try { $this->clusterWriteLock->unlock(); } catch (Exception $ex) { // Ignore. } $this->clusterWriteLock = null; $this->clusterWriteOwner = null; } /* -( Internals )---------------------------------------------------------- */ /** * @task internal */ private function shouldEnableSynchronization($require_device) { $repository = $this->getRepository(); $service_phid = $repository->getAlmanacServicePHID(); if (!$service_phid) { return false; } if (!$repository->supportsSynchronization()) { return false; } if ($require_device) { $device = AlmanacKeys::getLiveDevice(); if (!$device) { return false; } } return true; } /** * @task internal */ private function synchornizeWorkingCopyFromRemote() { $repository = $this->getRepository(); $device = AlmanacKeys::getLiveDevice(); $local_path = $repository->getLocalPath(); $fetch_uri = $repository->getRemoteURIEnvelope(); if ($repository->isGit()) { $this->requireWorkingCopy(); $argv = array( 'fetch --prune -- %P %s', $fetch_uri, '+refs/*:refs/*', ); } else { throw new Exception(pht('Remote sync only supported for git!')); } $future = DiffusionCommandEngine::newCommandEngine($repository) ->setArgv($argv) ->setSudoAsDaemon(true) ->setCredentialPHID($repository->getCredentialPHID()) ->setURI($repository->getRemoteURIObject()) ->newFuture(); $future->setCWD($local_path); try { $future->resolvex(); } catch (Exception $ex) { $this->logLine( pht( 'Synchronization of "%s" from remote failed: %s', $device->getName(), $ex->getMessage())); throw $ex; } } /** * @task internal */ private function synchronizeWorkingCopyFromDevices(array $device_phids) { $repository = $this->getRepository(); $service = $repository->loadAlmanacService(); if (!$service) { throw new Exception(pht('Failed to load repository cluster service.')); } $device_map = array_fuse($device_phids); $bindings = $service->getActiveBindings(); $fetchable = array(); foreach ($bindings as $binding) { // We can't fetch from nodes which don't have the newest version. $device_phid = $binding->getDevicePHID(); if (empty($device_map[$device_phid])) { continue; } // TODO: For now, only fetch over SSH. We could support fetching over // HTTP eventually. if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') { continue; } $fetchable[] = $binding; } if (!$fetchable) { throw new Exception( pht( 'Leader lost: no up-to-date nodes in repository cluster are '. 'fetchable.')); } $caught = null; foreach ($fetchable as $binding) { try { $this->synchronizeWorkingCopyFromBinding($binding); $caught = null; break; } catch (Exception $ex) { $caught = $ex; } } if ($caught) { throw $caught; } } /** * @task internal */ private function synchronizeWorkingCopyFromBinding($binding) { $repository = $this->getRepository(); $device = AlmanacKeys::getLiveDevice(); $this->logLine( pht( 'Synchronizing this device ("%s") from cluster leader ("%s") before '. 'read.', $device->getName(), $binding->getDevice()->getName())); $fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding); $local_path = $repository->getLocalPath(); if ($repository->isGit()) { $this->requireWorkingCopy(); $argv = array( 'fetch --prune -- %s %s', $fetch_uri, '+refs/*:refs/*', ); } else { throw new Exception(pht('Binding sync only supported for git!')); } $future = DiffusionCommandEngine::newCommandEngine($repository) ->setArgv($argv) ->setConnectAsDevice(true) ->setSudoAsDaemon(true) ->setURI($fetch_uri) ->newFuture(); $future->setCWD($local_path); try { $future->resolvex(); } catch (Exception $ex) { $this->logLine( pht( 'Synchronization of "%s" from leader "%s" failed: %s', $device->getName(), $binding->getDevice()->getName(), $ex->getMessage())); throw $ex; } } /** * @task internal */ private function logLine($message) { return $this->logText("# {$message}\n"); } /** * @task internal */ private function logText($message) { $log = $this->logger; if ($log) { $log->writeClusterEngineLogMessage($message); } return $this; } private function requireWorkingCopy() { $repository = $this->getRepository(); $local_path = $repository->getLocalPath(); if (!Filesystem::pathExists($local_path)) { $device = AlmanacKeys::getLiveDevice(); throw new Exception( pht( 'Repository "%s" does not have a working copy on this device '. 'yet, so it can not be synchronized. Wait for the daemons to '. 'construct one or run `bin/repository update %s` on this host '. '("%s") to build it explicitly.', $repository->getDisplayName(), $repository->getMonogram(), $device->getName())); } } private function logActiveWriter( PhabricatorUser $viewer, PhabricatorRepository $repository) { $writer = PhabricatorRepositoryWorkingCopyVersion::loadWriter( $repository->getPHID()); if (!$writer) { $this->logLine(pht('Waiting on another user to finish writing...')); return; } $user_phid = $writer->getWriteProperty('userPHID'); $device_phid = $writer->getWriteProperty('devicePHID'); $epoch = $writer->getWriteProperty('epoch'); $phids = array($user_phid, $device_phid); $handles = $viewer->loadHandles($phids); $duration = (PhabricatorTime::getNow() - $epoch) + 1; $this->logLine( pht( 'Waiting for %s to finish writing (on device "%s" for %ss)...', $handles[$user_phid]->getName(), $handles[$device_phid]->getName(), new PhutilNumber($duration))); } } diff --git a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngineLogInterface.php b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngineLogInterface.php index 9b1fe9a506..3e43d72779 100644 --- a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngineLogInterface.php +++ b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngineLogInterface.php @@ -1,7 +1,8 @@ setName('git-receive-pack'); $this->setArguments( array( array( 'name' => 'dir', 'wildcard' => true, ), )); } protected function executeRepositoryOperations() { + $host_wait_start = microtime(true); + $repository = $this->getRepository(); $viewer = $this->getSSHUser(); $device = AlmanacKeys::getLiveDevice(); // This is a write, and must have write access. $this->requireWriteAccess(); $cluster_engine = id(new DiffusionRepositoryClusterEngine()) ->setViewer($viewer) ->setRepository($repository) ->setLog($this); if ($this->shouldProxy()) { $command = $this->getProxyCommand(); $did_synchronize = false; if ($device) { $this->writeClusterEngineLogMessage( pht( "# Push received by \"%s\", forwarding to cluster host.\n", $device->getName())); } } else { $command = csprintf('git-receive-pack %s', $repository->getLocalPath()); $did_synchronize = true; $cluster_engine->synchronizeWorkingCopyBeforeWrite(); if ($device) { $this->writeClusterEngineLogMessage( pht( "# Ready to receive on cluster host \"%s\".\n", $device->getName())); } } $caught = null; try { $err = $this->executeRepositoryCommand($command); } catch (Exception $ex) { $caught = $ex; } // We've committed the write (or rejected it), so we can release the lock // without waiting for the client to receive the acknowledgement. if ($did_synchronize) { $cluster_engine->synchronizeWorkingCopyAfterWrite(); } if ($caught) { throw $caught; } if (!$err) { $repository->writeStatusMessage( PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE, PhabricatorRepositoryStatusMessage::CODE_OKAY); $this->waitForGitClient(); + + $host_wait_end = microtime(true); + + $this->updatePushLogWithTimingInformation( + $this->getClusterEngineLogProperty('writeWait'), + $this->getClusterEngineLogProperty('readWait'), + ($host_wait_end - $host_wait_start)); + } return $err; } private function executeRepositoryCommand($command) { $repository = $this->getRepository(); $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); $future = id(new ExecFuture('%C', $command)) ->setEnv($this->getEnvironment()); return $this->newPassthruCommand() ->setIOChannel($this->getIOChannel()) ->setCommandChannelFromExecFuture($future) ->execute(); } + private function updatePushLogWithTimingInformation( + $write_wait, + $read_wait, + $host_wait) { + + if ($write_wait !== null) { + $write_wait = (int)(1000000 * $write_wait); + } + + if ($read_wait !== null) { + $read_wait = (int)(1000000 * $read_wait); + } + + if ($host_wait !== null) { + $host_wait = (int)(1000000 * $host_wait); + } + + $identifier = $this->getRequestIdentifier(); + + $event = new PhabricatorRepositoryPushEvent(); + $conn = $event->establishConnection('w'); + + queryfx( + $conn, + 'UPDATE %T SET writeWait = %nd, readWait = %nd, hostWait = %nd + WHERE requestIdentifier = %s', + $event->getTableName(), + $write_wait, + $read_wait, + $host_wait, + $identifier); + } + } diff --git a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php index 6de16e723d..6bc56d767d 100644 --- a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php @@ -1,48 +1,58 @@ getErrorChannel()->update(); } + public function writeClusterEngineLogProperty($key, $value) { + $this->engineLogProperties[$key] = $value; + } + + protected function getClusterEngineLogProperty($key, $default = null) { + return idx($this->engineLogProperties, $key, $default); + } + protected function identifyRepository() { $args = $this->getArgs(); $path = head($args->getArg('dir')); return $this->loadRepositoryWithPath( $path, PhabricatorRepositoryType::REPOSITORY_TYPE_GIT); } protected function waitForGitClient() { $io_channel = $this->getIOChannel(); // If we don't wait for the client to close the connection, `git` will // consider it an early abort and fail. Sit around until Git is comfortable // that it really received all the data. while ($io_channel->isOpenForReading()) { $io_channel->update(); $this->getErrorChannel()->flush(); PhutilChannel::waitForAny(array($io_channel)); } } protected function raiseWrongVCSException( PhabricatorRepository $repository) { throw new Exception( pht( 'This repository ("%s") is not a Git repository. Use "%s" to '. 'interact with this repository.', $repository->getDisplayName(), $repository->getVersionControlSystem())); } } diff --git a/src/applications/repository/query/PhabricatorRepositoryPushLogSearchEngine.php b/src/applications/repository/query/PhabricatorRepositoryPushLogSearchEngine.php index 5bf2c84aeb..7b626fff32 100644 --- a/src/applications/repository/query/PhabricatorRepositoryPushLogSearchEngine.php +++ b/src/applications/repository/query/PhabricatorRepositoryPushLogSearchEngine.php @@ -1,243 +1,255 @@ newQuery(); if ($map['repositoryPHIDs']) { $query->withRepositoryPHIDs($map['repositoryPHIDs']); } if ($map['pusherPHIDs']) { $query->withPusherPHIDs($map['pusherPHIDs']); } if ($map['createdStart'] || $map['createdEnd']) { $query->withEpochBetween( $map['createdStart'], $map['createdEnd']); } return $query; } protected function buildCustomSearchFields() { return array( id(new PhabricatorSearchDatasourceField()) ->setDatasource(new DiffusionRepositoryDatasource()) ->setKey('repositoryPHIDs') ->setAliases(array('repository', 'repositories', 'repositoryPHID')) ->setLabel(pht('Repositories')) ->setDescription( pht('Search for pull logs for specific repositories.')), id(new PhabricatorUsersSearchField()) ->setKey('pusherPHIDs') ->setAliases(array('pusher', 'pushers', 'pusherPHID')) ->setLabel(pht('Pushers')) ->setDescription( pht('Search for pull logs by specific users.')), id(new PhabricatorSearchDateField()) ->setLabel(pht('Created After')) ->setKey('createdStart'), id(new PhabricatorSearchDateField()) ->setLabel(pht('Created Before')) ->setKey('createdEnd'), ); } protected function getURI($path) { return '/diffusion/pushlog/'.$path; } protected function getBuiltinQueryNames() { return array( 'all' => pht('All Push Logs'), ); } public function buildSavedQueryFromBuiltin($query_key) { $query = $this->newSavedQuery(); $query->setQueryKey($query_key); switch ($query_key) { case 'all': return $query; } return parent::buildSavedQueryFromBuiltin($query_key); } protected function renderResultList( array $logs, PhabricatorSavedQuery $query, array $handles) { $table = id(new DiffusionPushLogListView()) ->setViewer($this->requireViewer()) ->setLogs($logs); return id(new PhabricatorApplicationSearchResultView()) ->setTable($table); } protected function newExportFields() { $viewer = $this->requireViewer(); $fields = array( - $fields[] = id(new PhabricatorIDExportField()) + id(new PhabricatorIDExportField()) ->setKey('pushID') ->setLabel(pht('Push ID')), - $fields[] = id(new PhabricatorStringExportField()) + id(new PhabricatorStringExportField()) ->setKey('unique') ->setLabel(pht('Unique')), - $fields[] = id(new PhabricatorStringExportField()) + id(new PhabricatorStringExportField()) ->setKey('protocol') ->setLabel(pht('Protocol')), - $fields[] = id(new PhabricatorPHIDExportField()) + id(new PhabricatorPHIDExportField()) ->setKey('repositoryPHID') ->setLabel(pht('Repository PHID')), - $fields[] = id(new PhabricatorStringExportField()) + id(new PhabricatorStringExportField()) ->setKey('repository') ->setLabel(pht('Repository')), - $fields[] = id(new PhabricatorPHIDExportField()) + id(new PhabricatorPHIDExportField()) ->setKey('pusherPHID') ->setLabel(pht('Pusher PHID')), - $fields[] = id(new PhabricatorStringExportField()) + id(new PhabricatorStringExportField()) ->setKey('pusher') ->setLabel(pht('Pusher')), - $fields[] = id(new PhabricatorPHIDExportField()) + id(new PhabricatorPHIDExportField()) ->setKey('devicePHID') ->setLabel(pht('Device PHID')), - $fields[] = id(new PhabricatorStringExportField()) + id(new PhabricatorStringExportField()) ->setKey('device') ->setLabel(pht('Device')), - $fields[] = id(new PhabricatorStringExportField()) + id(new PhabricatorStringExportField()) ->setKey('type') ->setLabel(pht('Ref Type')), - $fields[] = id(new PhabricatorStringExportField()) + id(new PhabricatorStringExportField()) ->setKey('name') ->setLabel(pht('Ref Name')), - $fields[] = id(new PhabricatorStringExportField()) + id(new PhabricatorStringExportField()) ->setKey('old') ->setLabel(pht('Ref Old')), - $fields[] = id(new PhabricatorStringExportField()) + id(new PhabricatorStringExportField()) ->setKey('new') ->setLabel(pht('Ref New')), - $fields[] = id(new PhabricatorIntExportField()) + id(new PhabricatorIntExportField()) ->setKey('flags') ->setLabel(pht('Flags')), - $fields[] = id(new PhabricatorStringListExportField()) + id(new PhabricatorStringListExportField()) ->setKey('flagNames') ->setLabel(pht('Flag Names')), - $fields[] = id(new PhabricatorIntExportField()) + id(new PhabricatorIntExportField()) ->setKey('result') ->setLabel(pht('Result')), - $fields[] = id(new PhabricatorStringExportField()) + id(new PhabricatorStringExportField()) ->setKey('resultName') ->setLabel(pht('Result Name')), + id(new PhabricatorIntExportField()) + ->setKey('writeWait') + ->setLabel(pht('Write Wait (us)')), + id(new PhabricatorIntExportField()) + ->setKey('readWait') + ->setLabel(pht('Read Wait (us)')), + id(new PhabricatorIntExportField()) + ->setKey('hostWait') + ->setLabel(pht('Host Wait (us)')), ); if ($viewer->getIsAdmin()) { $fields[] = id(new PhabricatorStringExportField()) ->setKey('remoteAddress') ->setLabel(pht('Remote Address')); } return $fields; } protected function newExportData(array $logs) { $viewer = $this->requireViewer(); $phids = array(); foreach ($logs as $log) { $phids[] = $log->getPusherPHID(); $phids[] = $log->getDevicePHID(); $phids[] = $log->getPushEvent()->getRepositoryPHID(); } $handles = $viewer->loadHandles($phids); $flag_map = PhabricatorRepositoryPushLog::getFlagDisplayNames(); $reject_map = PhabricatorRepositoryPushLog::getRejectCodeDisplayNames(); $export = array(); foreach ($logs as $log) { $event = $log->getPushEvent(); $repository_phid = $event->getRepositoryPHID(); if ($repository_phid) { $repository_name = $handles[$repository_phid]->getName(); } else { $repository_name = null; } $pusher_phid = $log->getPusherPHID(); if ($pusher_phid) { $pusher_name = $handles[$pusher_phid]->getName(); } else { $pusher_name = null; } $device_phid = $log->getDevicePHID(); if ($device_phid) { $device_name = $handles[$device_phid]->getName(); } else { $device_name = null; } $flags = $log->getChangeFlags(); $flag_names = array(); foreach ($flag_map as $flag_key => $flag_name) { if (($flags & $flag_key) === $flag_key) { $flag_names[] = $flag_name; } } $result = $event->getRejectCode(); $result_name = idx($reject_map, $result, pht('Unknown ("%s")', $result)); $map = array( 'pushID' => $event->getID(), 'unique' => $event->getRequestIdentifier(), 'protocol' => $event->getRemoteProtocol(), 'repositoryPHID' => $repository_phid, 'repository' => $repository_name, 'pusherPHID' => $pusher_phid, 'pusher' => $pusher_name, 'devicePHID' => $device_phid, 'device' => $device_name, 'type' => $log->getRefType(), 'name' => $log->getRefName(), 'old' => $log->getRefOld(), 'new' => $log->getRefNew(), 'flags' => $flags, 'flagNames' => $flag_names, 'result' => $result, 'resultName' => $result_name, + 'writeWait' => $event->getWriteWait(), + 'readWait' => $event->getReadWait(), + 'hostWait' => $event->getHostWait(), ); if ($viewer->getIsAdmin()) { $map['remoteAddress'] = $event->getRemoteAddress(); } $export[] = $map; } return $export; } } diff --git a/src/applications/repository/storage/PhabricatorRepositoryPushEvent.php b/src/applications/repository/storage/PhabricatorRepositoryPushEvent.php index 369af15635..451f8acda5 100644 --- a/src/applications/repository/storage/PhabricatorRepositoryPushEvent.php +++ b/src/applications/repository/storage/PhabricatorRepositoryPushEvent.php @@ -1,98 +1,104 @@ setPusherPHID($viewer->getPHID()); } protected function getConfiguration() { return array( self::CONFIG_AUX_PHID => true, self::CONFIG_TIMESTAMPS => false, self::CONFIG_COLUMN_SCHEMA => array( 'requestIdentifier' => 'bytes12?', 'remoteAddress' => 'ipaddress?', 'remoteProtocol' => 'text32?', 'rejectCode' => 'uint32', 'rejectDetails' => 'text64?', + 'writeWait' => 'uint64?', + 'readWait' => 'uint64?', + 'hostWait' => 'uint64?', ), self::CONFIG_KEY_SCHEMA => array( 'key_repository' => array( 'columns' => array('repositoryPHID'), ), 'key_request' => array( 'columns' => array('requestIdentifier'), 'unique' => true, ), ), ) + parent::getConfiguration(); } public function generatePHID() { return PhabricatorPHID::generateNewPHID( PhabricatorRepositoryPushEventPHIDType::TYPECONST); } public function attachRepository(PhabricatorRepository $repository) { $this->repository = $repository; return $this; } public function getRepository() { return $this->assertAttached($this->repository); } public function attachLogs(array $logs) { $this->logs = $logs; return $this; } public function getLogs() { return $this->assertAttached($this->logs); } /* -( PhabricatorPolicyInterface )----------------------------------------- */ public function getCapabilities() { return array( PhabricatorPolicyCapability::CAN_VIEW, ); } public function getPolicy($capability) { return $this->getRepository()->getPolicy($capability); } public function hasAutomaticCapability($capability, PhabricatorUser $viewer) { return $this->getRepository()->hasAutomaticCapability($capability, $viewer); } public function describeAutomaticCapability($capability) { return pht( "A repository's push events are visible to users who can see the ". "repository."); } }