diff --git a/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php b/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php index 78fa62eff3..3081a9c5a3 100644 --- a/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php +++ b/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php @@ -1,929 +1,928 @@ repairMode = $repair_mode; return $this; } public function getRepairMode() { return $this->repairMode; } /** * @task discovery */ public function discoverCommits() { $repository = $this->getRepository(); $lock = $this->newRepositoryLock($repository, 'repo.look', false); try { $lock->lock(); } catch (PhutilLockException $ex) { throw new DiffusionDaemonLockException( pht( 'Another process is currently discovering repository "%s", '. 'skipping discovery.', $repository->getDisplayName())); } try { $result = $this->discoverCommitsWithLock(); } catch (Exception $ex) { $lock->unlock(); throw $ex; } $lock->unlock(); return $result; } private function discoverCommitsWithLock() { $repository = $this->getRepository(); $viewer = $this->getViewer(); $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: $refs = $this->discoverSubversionCommits(); break; case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: $refs = $this->discoverMercurialCommits(); break; case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: $refs = $this->discoverGitCommits(); break; default: throw new Exception(pht("Unknown VCS '%s'!", $vcs)); } if ($this->isInitialImport($refs)) { $this->log( pht( 'Discovered more than %s commit(s) in an empty repository, '. 'marking repository as importing.', new PhutilNumber(PhabricatorRepository::IMPORT_THRESHOLD))); $repository->markImporting(); } // Clear the working set cache. 
$this->workingSet = array(); $task_priority = $this->getImportTaskPriority($repository, $refs); // Record discovered commits and mark them in the cache. foreach ($refs as $ref) { $this->recordCommit( $repository, $ref->getIdentifier(), $ref->getEpoch(), $ref->getIsPermanent(), $ref->getParents(), $task_priority); $this->commitCache[$ref->getIdentifier()] = true; } $this->markUnreachableCommits($repository); $version = $this->getObservedVersion($repository); if ($version !== null) { id(new DiffusionRepositoryClusterEngine()) ->setViewer($viewer) ->setRepository($repository) ->synchronizeWorkingCopyAfterDiscovery($version); } return $refs; } /* -( Discovering Git Repositories )--------------------------------------- */ /** * @task git */ private function discoverGitCommits() { $repository = $this->getRepository(); $publisher = $repository->newPublisher(); $heads = id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) ->execute(); if (!$heads) { // This repository has no heads at all, so we don't need to do // anything. Generally, this means the repository is empty. return array(); } $this->log( pht( 'Discovering commits in repository "%s".', $repository->getDisplayName())); $ref_lists = array(); $head_groups = $this->getRefGroupsForDiscovery($heads); foreach ($head_groups as $head_group) { $group_identifiers = mpull($head_group, 'getCommitIdentifier'); $group_identifiers = array_fuse($group_identifiers); $this->fillCommitCache($group_identifiers); foreach ($head_group as $ref) { $name = $ref->getShortName(); $commit = $ref->getCommitIdentifier(); $this->log( pht( 'Examining "%s" (%s) at "%s".', $name, $ref->getRefType(), $commit)); if (!$repository->shouldTrackRef($ref)) { $this->log(pht('Skipping, ref is untracked.')); continue; } if ($this->isKnownCommit($commit)) { $this->log(pht('Skipping, HEAD is known.')); continue; } // In Git, it's possible to tag anything. We just skip tags that don't // point to a commit. See T11301. 
$fields = $ref->getRawFields(); $ref_type = idx($fields, 'objecttype'); $tag_type = idx($fields, '*objecttype'); if ($ref_type != 'commit' && $tag_type != 'commit') { $this->log(pht('Skipping, this is not a commit.')); continue; } $this->log(pht('Looking for new commits.')); $head_refs = $this->discoverStreamAncestry( new PhabricatorGitGraphStream($repository, $commit), $commit, $publisher->isPermanentRef($ref)); $this->didDiscoverRefs($head_refs); $ref_lists[] = $head_refs; } } $refs = array_mergev($ref_lists); return $refs; } /** * @task git */ private function getRefGroupsForDiscovery(array $heads) { $heads = $this->sortRefs($heads); // See T13593. We hold a commit cache with a fixed maximum size. Split the // refs into chunks no larger than the cache size, so we don't overflow the // cache when testing them. $array_iterator = new ArrayIterator($heads); $chunk_iterator = new PhutilChunkedIterator( $array_iterator, self::MAX_COMMIT_CACHE_SIZE); return $chunk_iterator; } /* -( Discovering Subversion Repositories )-------------------------------- */ /** * @task svn */ private function discoverSubversionCommits() { $repository = $this->getRepository(); if (!$repository->isHosted()) { $this->verifySubversionRoot($repository); } $upper_bound = null; $limit = 1; $refs = array(); do { // Find all the unknown commits on this path. Note that we permit // importing an SVN subdirectory rather than the entire repository, so // commits may be nonsequential. if ($upper_bound === null) { $at_rev = 'HEAD'; } else { $at_rev = ($upper_bound - 1); } try { list($xml, $stderr) = $repository->execxRemoteCommand( 'log --xml --quiet --limit %d %s', $limit, $repository->getSubversionBaseURI($at_rev)); } catch (CommandException $ex) { $stderr = $ex->getStderr(); if (preg_match('/(path|File) not found/', $stderr)) { // We've gone all the way back through history and this path was not // affected by earlier commits. 
break; } throw $ex; } $xml = phutil_utf8ize($xml); $log = new SimpleXMLElement($xml); foreach ($log->logentry as $entry) { $identifier = (int)$entry['revision']; $epoch = (int)strtotime((string)$entry->date[0]); $refs[$identifier] = id(new PhabricatorRepositoryCommitRef()) ->setIdentifier($identifier) ->setEpoch($epoch) ->setIsPermanent(true); if ($upper_bound === null) { $upper_bound = $identifier; } else { $upper_bound = min($upper_bound, $identifier); } } // Discover 2, 4, 8, ... 256 logs at a time. This allows us to initially // import large repositories fairly quickly, while pulling only as much // data as we need in the common case (when we've already imported the // repository and are just grabbing one commit at a time). $limit = min($limit * 2, 256); } while ($upper_bound > 1 && !$this->isKnownCommit($upper_bound)); krsort($refs); while ($refs && $this->isKnownCommit(last($refs)->getIdentifier())) { array_pop($refs); } $refs = array_reverse($refs); $this->didDiscoverRefs($refs); return $refs; } private function verifySubversionRoot(PhabricatorRepository $repository) { list($xml) = $repository->execxRemoteCommand( 'info --xml %s', $repository->getSubversionPathURI()); $xml = phutil_utf8ize($xml); $xml = new SimpleXMLElement($xml); $remote_root = (string)($xml->entry[0]->repository[0]->root[0]); $expect_root = $repository->getSubversionPathURI(); $normal_type_svn = ArcanistRepositoryURINormalizer::TYPE_SVN; $remote_normal = id(new ArcanistRepositoryURINormalizer( $normal_type_svn, $remote_root))->getNormalizedPath(); $expect_normal = id(new ArcanistRepositoryURINormalizer( $normal_type_svn, $expect_root))->getNormalizedPath(); if ($remote_normal != $expect_normal) { throw new Exception( pht( 'Repository "%s" does not have a correctly configured remote URI. '. 'The remote URI for a Subversion repository MUST point at the '. 'repository root. The root for this repository is "%s", but the '. 'configured URI is "%s". To resolve this error, set the remote URI '. 
'to point at the repository root. If you want to import only part '. 'of a Subversion repository, use the "Import Only" option.', $repository->getDisplayName(), $remote_root, $expect_root)); } } /* -( Discovering Mercurial Repositories )--------------------------------- */ /** * @task hg */ private function discoverMercurialCommits() { $repository = $this->getRepository(); $branches = id(new DiffusionLowLevelMercurialBranchesQuery()) ->setRepository($repository) ->execute(); $this->fillCommitCache(mpull($branches, 'getCommitIdentifier')); $refs = array(); foreach ($branches as $branch) { // NOTE: Mercurial branches may have multiple heads, so the names may // not be unique. $name = $branch->getShortName(); $commit = $branch->getCommitIdentifier(); $this->log(pht('Examining branch "%s" head "%s".', $name, $commit)); if (!$repository->shouldTrackBranch($name)) { $this->log(pht('Skipping, branch is untracked.')); continue; } if ($this->isKnownCommit($commit)) { $this->log(pht('Skipping, this head is a known commit.')); continue; } $this->log(pht('Looking for new commits.')); $branch_refs = $this->discoverStreamAncestry( new PhabricatorMercurialGraphStream($repository, $commit), $commit, $is_permanent = true); $this->didDiscoverRefs($branch_refs); $refs[] = $branch_refs; } return array_mergev($refs); } /* -( Internals )---------------------------------------------------------- */ private function discoverStreamAncestry( PhabricatorRepositoryGraphStream $stream, $commit, $is_permanent) { $discover = array($commit); $graph = array(); $seen = array(); // Find all the reachable, undiscovered commits. Build a graph of the // edges. 
while ($discover) { $target = array_pop($discover); if (empty($graph[$target])) { $graph[$target] = array(); } $parents = $stream->getParents($target); foreach ($parents as $parent) { if ($this->isKnownCommit($parent)) { continue; } $graph[$target][$parent] = true; if (empty($seen[$parent])) { $seen[$parent] = true; $discover[] = $parent; } } } // Now, sort them topologically. $commits = $this->reduceGraph($graph); $refs = array(); foreach ($commits as $commit) { $epoch = $stream->getCommitDate($commit); // If the epoch doesn't fit into a uint32, treat it as though it stores // the current time. For discussion, see T11537. if ($epoch > 0xFFFFFFFF) { $epoch = PhabricatorTime::getNow(); } // If the epoch is not present at all, treat it as though it stores the // value "0". For discussion, see T12062. This behavior is consistent // with the behavior of "git show". if (!strlen($epoch)) { $epoch = 0; } $refs[] = id(new PhabricatorRepositoryCommitRef()) ->setIdentifier($commit) ->setEpoch($epoch) ->setIsPermanent($is_permanent) ->setParents($stream->getParents($commit)); } return $refs; } private function reduceGraph(array $edges) { foreach ($edges as $commit => $parents) { $edges[$commit] = array_keys($parents); } $graph = new PhutilDirectedScalarGraph(); $graph->addNodes($edges); $commits = $graph->getNodesInTopologicalOrder(); // NOTE: We want the most ancestral nodes first, so we need to reverse the // list we get out of AbstractDirectedGraph. $commits = array_reverse($commits); return $commits; } private function isKnownCommit($identifier) { if (isset($this->commitCache[$identifier])) { return true; } if (isset($this->workingSet[$identifier])) { return true; } $this->fillCommitCache(array($identifier)); return isset($this->commitCache[$identifier]); } private function fillCommitCache(array $identifiers) { if (!$identifiers) { return; } if ($this->repairMode) { // In repair mode, rediscover the entire repository, ignoring the // database state. 
The engine still maintains a local cache (the // "Working Set") but we just give up before looking in the database. return; } $max_size = self::MAX_COMMIT_CACHE_SIZE; // If we're filling more identifiers than would fit in the cache, ignore // the ones that don't fit. Because the cache is FIFO, overfilling it can // cause the entire cache to miss. See T12296. if (count($identifiers) > $max_size) { $identifiers = array_slice($identifiers, 0, $max_size); } // When filling the cache we ignore commits which have been marked as // unreachable, treating them as though they do not exist. When recording // commits later we'll revive commits that exist but are unreachable. $commits = id(new PhabricatorRepositoryCommit())->loadAllWhere( 'repositoryID = %d AND commitIdentifier IN (%Ls) AND (importStatus & %d) != %d', $this->getRepository()->getID(), $identifiers, PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE, PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE); foreach ($commits as $commit) { $this->commitCache[$commit->getCommitIdentifier()] = true; } while (count($this->commitCache) > $max_size) { array_shift($this->commitCache); } } /** * Sort refs so we process permanent refs first. This makes the whole import * process a little cheaper, since we can publish these commits the first * time through rather than catching them in the refs step. * * @task internal * * @param list<DiffusionRepositoryRef> List of refs. * @return list<DiffusionRepositoryRef> Sorted list of refs. 
*/ private function sortRefs(array $refs) { $repository = $this->getRepository(); $publisher = $repository->newPublisher(); $head_refs = array(); $tail_refs = array(); foreach ($refs as $ref) { if ($publisher->isPermanentRef($ref)) { $head_refs[] = $ref; } else { $tail_refs[] = $ref; } } return array_merge($head_refs, $tail_refs); } private function recordCommit( PhabricatorRepository $repository, $commit_identifier, $epoch, $is_permanent, array $parents, $task_priority) { $commit = new PhabricatorRepositoryCommit(); $conn_w = $repository->establishConnection('w'); // First, try to revive an existing unreachable commit (if one exists) by // removing the "unreachable" flag. If we succeed, we don't need to do // anything else: we already discovered this commit some time ago. queryfx( $conn_w, 'UPDATE %T SET importStatus = (importStatus & ~%d) WHERE repositoryID = %d AND commitIdentifier = %s', $commit->getTableName(), PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE, $repository->getID(), $commit_identifier); if ($conn_w->getAffectedRows()) { $commit = $commit->loadOneWhere( 'repositoryID = %d AND commitIdentifier = %s', $repository->getID(), $commit_identifier); // After reviving a commit, schedule new daemons for it. $this->didDiscoverCommit($repository, $commit, $epoch, $task_priority); return; } $commit->setRepositoryID($repository->getID()); $commit->setCommitIdentifier($commit_identifier); $commit->setEpoch($epoch); if ($is_permanent) { $commit->setImportStatus(PhabricatorRepositoryCommit::IMPORTED_PERMANENT); } $data = new PhabricatorRepositoryCommitData(); try { // If this commit has parents, look up their IDs. The parent commits // should always exist already. 
$parent_ids = array(); if ($parents) { $parent_rows = queryfx_all( $conn_w, 'SELECT id, commitIdentifier FROM %T WHERE commitIdentifier IN (%Ls) AND repositoryID = %d', $commit->getTableName(), $parents, $repository->getID()); $parent_map = ipull($parent_rows, 'id', 'commitIdentifier'); foreach ($parents as $parent) { if (empty($parent_map[$parent])) { throw new Exception( pht('Unable to identify parent "%s"!', $parent)); } $parent_ids[] = $parent_map[$parent]; } } else { // Write an explicit 0 so we can distinguish between "really no // parents" and "data not available". if (!$repository->isSVN()) { $parent_ids = array(0); } } $commit->openTransaction(); $commit->save(); $data->setCommitID($commit->getID()); $data->save(); foreach ($parent_ids as $parent_id) { queryfx( $conn_w, 'INSERT IGNORE INTO %T (childCommitID, parentCommitID) VALUES (%d, %d)', PhabricatorRepository::TABLE_PARENTS, $commit->getID(), $parent_id); } $commit->saveTransaction(); $this->didDiscoverCommit($repository, $commit, $epoch, $task_priority); if ($this->repairMode) { // Normally, the query should throw a duplicate key exception. If we // reach this in repair mode, we've actually performed a repair. $this->log(pht('Repaired commit "%s".', $commit_identifier)); } PhutilEventEngine::dispatchEvent( new PhabricatorEvent( PhabricatorEventType::TYPE_DIFFUSION_DIDDISCOVERCOMMIT, array( 'repository' => $repository, 'commit' => $commit, ))); } catch (AphrontDuplicateKeyQueryException $ex) { $commit->killTransaction(); // Ignore. This can happen because we discover the same new commit // more than once when looking at history, or because of races or // data inconsistency or cosmic radiation; in any case, we're still // in a good state if we ignore the failure. 
} } private function didDiscoverCommit( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit, $epoch, $task_priority) { $this->queueCommitImportTask( $repository, - $commit->getID(), $commit->getPHID(), $task_priority, $via = 'discovery'); // Update the repository summary table. queryfx( $commit->establishConnection('w'), 'INSERT INTO %T (repositoryID, size, lastCommitID, epoch) VALUES (%d, 1, %d, %d) ON DUPLICATE KEY UPDATE size = size + 1, lastCommitID = IF(VALUES(epoch) > epoch, VALUES(lastCommitID), lastCommitID), epoch = IF(VALUES(epoch) > epoch, VALUES(epoch), epoch)', PhabricatorRepository::TABLE_SUMMARY, $repository->getID(), $commit->getID(), $epoch); } private function didDiscoverRefs(array $refs) { foreach ($refs as $ref) { $this->workingSet[$ref->getIdentifier()] = true; } } private function isInitialImport(array $refs) { $commit_count = count($refs); if ($commit_count <= PhabricatorRepository::IMPORT_THRESHOLD) { // If we fetched a small number of commits, assume it's an initial // commit or a stack of a few initial commits. return false; } $viewer = $this->getViewer(); $repository = $this->getRepository(); $any_commits = id(new DiffusionCommitQuery()) ->setViewer($viewer) ->withRepository($repository) ->setLimit(1) ->execute(); if ($any_commits) { // If the repository already has commits, this isn't an import. return false; } return true; } private function getObservedVersion(PhabricatorRepository $repository) { if ($repository->isHosted()) { return null; } if ($repository->isGit()) { return $this->getGitObservedVersion($repository); } return null; } private function getGitObservedVersion(PhabricatorRepository $repository) { $refs = id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) ->execute(); if (!$refs) { return null; } // In Git, the observed version is the most recently discovered commit // at any repository HEAD. It's possible for this to regress temporarily // if a branch is pushed and then deleted. 
This is acceptable because it // doesn't do anything meaningfully bad and will fix itself on the next // push. $ref_identifiers = mpull($refs, 'getCommitIdentifier'); $ref_identifiers = array_fuse($ref_identifiers); $version = queryfx_one( $repository->establishConnection('w'), 'SELECT MAX(id) version FROM %T WHERE repositoryID = %d AND commitIdentifier IN (%Ls)', id(new PhabricatorRepositoryCommit())->getTableName(), $repository->getID(), $ref_identifiers); if (!$version) { return null; } return (int)$version['version']; } private function markUnreachableCommits(PhabricatorRepository $repository) { // For now, this is only supported for Git. if (!$repository->isGit()) { return; } // Find older versions of refs which we haven't processed yet. We're going // to make sure their commits are still reachable. $old_refs = id(new PhabricatorRepositoryOldRef())->loadAllWhere( 'repositoryPHID = %s', $repository->getPHID()); // If we don't have any refs to update, bail out before building a graph // stream. In particular, this improves behavior in empty repositories, // where `git log` exits with an error. if (!$old_refs) { return; } // We can share a single graph stream across all the checks we need to do. $stream = new PhabricatorGitGraphStream($repository); foreach ($old_refs as $old_ref) { $identifier = $old_ref->getCommitIdentifier(); $this->markUnreachableFrom($repository, $stream, $identifier); // If nothing threw an exception, we're all done with this ref. 
$old_ref->delete(); } } private function markUnreachableFrom( PhabricatorRepository $repository, PhabricatorGitGraphStream $stream, $identifier) { $unreachable = array(); $commit = id(new PhabricatorRepositoryCommit())->loadOneWhere( 'repositoryID = %s AND commitIdentifier = %s', $repository->getID(), $identifier); if (!$commit) { return; } $look = array($commit); $seen = array(); while ($look) { $target = array_pop($look); // If we've already checked this commit (for example, because history // branches and then merges) we don't need to check it again. $target_identifier = $target->getCommitIdentifier(); if (isset($seen[$target_identifier])) { continue; } $seen[$target_identifier] = true; // See PHI1688. If this commit is already marked as unreachable, we don't // need to consider its ancestors. This may skip a lot of work if many // branches with a lot of shared ancestry are deleted at the same time. if ($target->isUnreachable()) { continue; } try { $stream->getCommitDate($target_identifier); $reachable = true; } catch (Exception $ex) { $reachable = false; } if ($reachable) { // This commit is reachable, so we don't need to go any further // down this road. continue; } $unreachable[] = $target; // Find the commit's parents and check them for reachability, too. We // have to look in the database since we may no longer have the commit // in the repository. 
$rows = queryfx_all( $commit->establishConnection('w'), 'SELECT commit.* FROM %T commit JOIN %T parents ON commit.id = parents.parentCommitID WHERE parents.childCommitID = %d', $commit->getTableName(), PhabricatorRepository::TABLE_PARENTS, $target->getID()); if (!$rows) { continue; } $parents = id(new PhabricatorRepositoryCommit()) ->loadAllFromArray($rows); foreach ($parents as $parent) { $look[] = $parent; } } $unreachable = array_reverse($unreachable); $flag = PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE; foreach ($unreachable as $unreachable_commit) { $unreachable_commit->writeImportStatusFlag($flag); } // If anything was unreachable, just rebuild the whole summary table. // We can't really update it incrementally when a commit becomes // unreachable. if ($unreachable) { $this->rebuildSummaryTable($repository); } } private function rebuildSummaryTable(PhabricatorRepository $repository) { $conn_w = $repository->establishConnection('w'); $data = queryfx_one( $conn_w, 'SELECT COUNT(*) N, MAX(id) id, MAX(epoch) epoch FROM %T WHERE repositoryID = %d AND (importStatus & %d) != %d', id(new PhabricatorRepositoryCommit())->getTableName(), $repository->getID(), PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE, PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE); queryfx( $conn_w, 'INSERT INTO %T (repositoryID, size, lastCommitID, epoch) VALUES (%d, %d, %d, %d) ON DUPLICATE KEY UPDATE size = VALUES(size), lastCommitID = VALUES(lastCommitID), epoch = VALUES(epoch)', PhabricatorRepository::TABLE_SUMMARY, $repository->getID(), $data['N'], $data['id'], $data['epoch']); } } diff --git a/src/applications/repository/engine/PhabricatorRepositoryEngine.php b/src/applications/repository/engine/PhabricatorRepositoryEngine.php index 4d89d03140..edf54d1d2b 100644 --- a/src/applications/repository/engine/PhabricatorRepositoryEngine.php +++ b/src/applications/repository/engine/PhabricatorRepositoryEngine.php @@ -1,197 +1,196 @@ repository = $repository; return $this; } /** * @task 
config */ protected function getRepository() { if ($this->repository === null) { throw new PhutilInvalidStateException('setRepository'); } return $this->repository; } /** * @task config */ public function setVerbose($verbose) { $this->verbose = $verbose; return $this; } /** * @task config */ public function getVerbose() { return $this->verbose; } public function getViewer() { return PhabricatorUser::getOmnipotentUser(); } protected function newRepositoryLock( PhabricatorRepository $repository, $lock_key, $lock_device_only) { $lock_parts = array( 'repositoryPHID' => $repository->getPHID(), ); if ($lock_device_only) { $device = AlmanacKeys::getLiveDevice(); if ($device) { $lock_parts['devicePHID'] = $device->getPHID(); } } return PhabricatorGlobalLock::newLock($lock_key, $lock_parts); } /** * @task internal */ protected function log($pattern /* ... */) { if ($this->getVerbose()) { $console = PhutilConsole::getConsole(); $argv = func_get_args(); array_unshift($argv, "%s\n"); call_user_func_array(array($console, 'writeOut'), $argv); } return $this; } final protected function queueCommitImportTask( PhabricatorRepository $repository, - $commit_id, $commit_phid, $task_priority, - $via = null) { + $via) { $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: $class = 'PhabricatorRepositoryGitCommitMessageParserWorker'; break; case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: $class = 'PhabricatorRepositorySvnCommitMessageParserWorker'; break; case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: $class = 'PhabricatorRepositoryMercurialCommitMessageParserWorker'; break; default: throw new Exception( pht( 'Unknown repository type "%s"!', $vcs)); } $data = array( - 'commitID' => $commit_id, + 'commitPHID' => $commit_phid, ); if ($via !== null) { $data['via'] = $via; } $options = array( 'priority' => $task_priority, 'objectPHID' => $commit_phid, 'containerPHID' => $repository->getPHID(), ); 
PhabricatorWorker::scheduleTask($class, $data, $options); } final protected function getImportTaskPriority( PhabricatorRepository $repository, array $refs) { assert_instances_of($refs, 'PhabricatorRepositoryCommitRef'); // If the repository is importing for the first time, we schedule tasks // at IMPORT priority, which is very low. Making progress on importing a // new repository for the first time is less important than any other // daemon task. // If the repository has finished importing and we're just catching up // on recent commits, we usually schedule discovery at COMMIT priority, // which is slightly below the default priority. // Note that followup tasks and triggered tasks (like those generated by // Herald or Harbormaster) will queue at DEFAULT priority, so that each // commit tends to fully import before we start the next one. This tends // to give imports fairly predictable progress. See T11677 for some // discussion. if ($repository->isImporting()) { $this->log( pht( 'Importing %s commit(s) at low priority ("PRIORITY_IMPORT") '. 'because this repository is still importing.', phutil_count($refs))); return PhabricatorWorker::PRIORITY_IMPORT; } // See T13369. If we've discovered a lot of commits at once, import them // at lower priority. // This is mostly aimed at reducing the impact that synchronizing thousands // of commits from a remote upstream has on other repositories. The queue // is "mostly FIFO", so queueing a thousand commit imports can stall other // repositories. // In a perfect world we'd probably give repositories round-robin queue // priority, but we don't currently have the primitives for this and there // isn't a strong case for building them. // Use "a whole lot of commits showed up at once" as a heuristic for // detecting "someone synchronized an upstream", and import them at a lower // priority to more closely approximate fair scheduling. 
if (count($refs) >= PhabricatorRepository::LOWPRI_THRESHOLD) { $this->log( pht( 'Importing %s commit(s) at low priority ("PRIORITY_IMPORT") '. 'because many commits were discovered at once.', phutil_count($refs))); return PhabricatorWorker::PRIORITY_IMPORT; } // Otherwise, import at normal priority. if ($refs) { $this->log( pht( 'Importing %s commit(s) at normal priority ("PRIORITY_COMMIT").', phutil_count($refs))); } return PhabricatorWorker::PRIORITY_COMMIT; } } diff --git a/src/applications/repository/engine/PhabricatorRepositoryRefEngine.php b/src/applications/repository/engine/PhabricatorRepositoryRefEngine.php index aafea1f81a..60a96578a3 100644 --- a/src/applications/repository/engine/PhabricatorRepositoryRefEngine.php +++ b/src/applications/repository/engine/PhabricatorRepositoryRefEngine.php @@ -1,731 +1,730 @@ rebuild = $rebuild; return $this; } public function getRebuild() { return $this->rebuild; } public function updateRefs() { $this->newPositions = array(); $this->deadPositions = array(); $this->permanentCommits = array(); $repository = $this->getRepository(); $viewer = $this->getViewer(); $branches_may_close = false; $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: // No meaningful refs of any type in Subversion. $maps = array(); break; case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: $branches = $this->loadMercurialBranchPositions($repository); $bookmarks = $this->loadMercurialBookmarkPositions($repository); $maps = array( PhabricatorRepositoryRefCursor::TYPE_BRANCH => $branches, PhabricatorRepositoryRefCursor::TYPE_BOOKMARK => $bookmarks, ); $branches_may_close = true; break; case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: $maps = $this->loadGitRefPositions($repository); break; default: throw new Exception(pht('Unknown VCS "%s"!', $vcs)); } // Fill in any missing types with empty lists. 
$maps = $maps + array( PhabricatorRepositoryRefCursor::TYPE_BRANCH => array(), PhabricatorRepositoryRefCursor::TYPE_TAG => array(), PhabricatorRepositoryRefCursor::TYPE_BOOKMARK => array(), PhabricatorRepositoryRefCursor::TYPE_REF => array(), ); $all_cursors = id(new PhabricatorRepositoryRefCursorQuery()) ->setViewer($viewer) ->withRepositoryPHIDs(array($repository->getPHID())) ->needPositions(true) ->execute(); $cursor_groups = mgroup($all_cursors, 'getRefType'); // Find all the heads of permanent refs. $all_closing_heads = array(); foreach ($all_cursors as $cursor) { // See T13284. Note that we're considering whether this ref was a // permanent ref or not the last time we updated refs for this // repository. This allows us to handle things properly when a ref // is reconfigured from non-permanent to permanent. $was_permanent = $cursor->getIsPermanent(); if (!$was_permanent) { continue; } foreach ($cursor->getPositionIdentifiers() as $identifier) { $all_closing_heads[] = $identifier; } } $all_closing_heads = array_unique($all_closing_heads); $all_closing_heads = $this->removeMissingCommits($all_closing_heads); foreach ($maps as $type => $refs) { $cursor_group = idx($cursor_groups, $type, array()); $this->updateCursors($cursor_group, $refs, $type, $all_closing_heads); } if ($this->permanentCommits) { $this->setPermanentFlagOnCommits($this->permanentCommits); } $save_cursors = $this->getCursorsForUpdate($repository, $all_cursors); if ($this->newPositions || $this->deadPositions || $save_cursors) { $repository->openTransaction(); $this->saveNewPositions(); $this->deleteDeadPositions(); foreach ($save_cursors as $cursor) { $cursor->save(); } $repository->saveTransaction(); } $branches = $maps[PhabricatorRepositoryRefCursor::TYPE_BRANCH]; if ($branches && $branches_may_close) { $this->updateBranchStates($repository, $branches); } } private function getCursorsForUpdate( PhabricatorRepository $repository, array $cursors) { assert_instances_of($cursors, 
'PhabricatorRepositoryRefCursor'); $publisher = $repository->newPublisher(); $results = array(); foreach ($cursors as $cursor) { $diffusion_ref = $cursor->newDiffusionRepositoryRef(); $is_permanent = $publisher->isPermanentRef($diffusion_ref); if ($is_permanent == $cursor->getIsPermanent()) { continue; } $cursor->setIsPermanent((int)$is_permanent); $results[] = $cursor; } return $results; } private function updateBranchStates( PhabricatorRepository $repository, array $branches) { assert_instances_of($branches, 'DiffusionRepositoryRef'); $viewer = $this->getViewer(); $all_cursors = id(new PhabricatorRepositoryRefCursorQuery()) ->setViewer($viewer) ->withRepositoryPHIDs(array($repository->getPHID())) ->needPositions(true) ->execute(); $state_map = array(); $type_branch = PhabricatorRepositoryRefCursor::TYPE_BRANCH; foreach ($all_cursors as $cursor) { if ($cursor->getRefType() !== $type_branch) { continue; } $raw_name = $cursor->getRefNameRaw(); foreach ($cursor->getPositions() as $position) { $hash = $position->getCommitIdentifier(); $state_map[$raw_name][$hash] = $position; } } $updates = array(); foreach ($branches as $branch) { $position = idx($state_map, $branch->getShortName(), array()); $position = idx($position, $branch->getCommitIdentifier()); if (!$position) { continue; } $fields = $branch->getRawFields(); $position_state = (bool)$position->getIsClosed(); $branch_state = (bool)idx($fields, 'closed'); if ($position_state != $branch_state) { $updates[$position->getID()] = (int)$branch_state; } } if ($updates) { $position_table = id(new PhabricatorRepositoryRefPosition()); $conn = $position_table->establishConnection('w'); $position_table->openTransaction(); foreach ($updates as $position_id => $branch_state) { queryfx( $conn, 'UPDATE %T SET isClosed = %d WHERE id = %d', $position_table->getTableName(), $branch_state, $position_id); } $position_table->saveTransaction(); } } private function markPositionNew( PhabricatorRepositoryRefPosition $position) { 
$this->newPositions[] = $position; return $this; } private function markPositionDead( PhabricatorRepositoryRefPosition $position) { $this->deadPositions[] = $position; return $this; } private function markPermanentCommits(array $identifiers) { foreach ($identifiers as $identifier) { $this->permanentCommits[$identifier] = $identifier; } return $this; } /** * Remove commits which no longer exist in the repository from a list. * * After a force push and garbage collection, we may have branch cursors which * point at commits which no longer exist. This can make commands issued later * fail. See T5839 for discussion. * * @param list List of commit identifiers. * @return list List with nonexistent identifiers removed. */ private function removeMissingCommits(array $identifiers) { if (!$identifiers) { return array(); } $resolved = id(new DiffusionLowLevelResolveRefsQuery()) ->setRepository($this->getRepository()) ->withRefs($identifiers) ->execute(); foreach ($identifiers as $key => $identifier) { if (empty($resolved[$identifier])) { unset($identifiers[$key]); } } return $identifiers; } private function updateCursors( array $cursors, array $new_refs, $ref_type, array $all_closing_heads) { $repository = $this->getRepository(); $publisher = $repository->newPublisher(); // NOTE: Mercurial branches may have multiple branch heads; this logic // is complex primarily to account for that. $cursors = mpull($cursors, null, 'getRefNameRaw'); // Group all the new ref values by their name. As above, these groups may // have multiple members in Mercurial. $ref_groups = mgroup($new_refs, 'getShortName'); foreach ($ref_groups as $name => $refs) { $new_commits = mpull($refs, 'getCommitIdentifier', 'getCommitIdentifier'); $ref_cursor = idx($cursors, $name); if ($ref_cursor) { $old_positions = $ref_cursor->getPositions(); } else { $old_positions = array(); } // We're going to delete all the cursors pointing at commits which are // no longer associated with the refs. 
This primarily makes the Mercurial // multiple head case easier, and means that when we update a ref we // delete the old one and write a new one. foreach ($old_positions as $old_position) { $hash = $old_position->getCommitIdentifier(); if (isset($new_commits[$hash])) { // This ref previously pointed at this commit, and still does. $this->log( pht( 'Ref %s "%s" still points at %s.', $ref_type, $name, $hash)); continue; } // This ref previously pointed at this commit, but no longer does. $this->log( pht( 'Ref %s "%s" no longer points at %s.', $ref_type, $name, $hash)); // Nuke the obsolete cursor. $this->markPositionDead($old_position); } // Now, we're going to insert new cursors for all the commits which are // associated with this ref that don't currently have cursors. $old_commits = mpull($old_positions, 'getCommitIdentifier'); $old_commits = array_fuse($old_commits); $added_commits = array_diff_key($new_commits, $old_commits); foreach ($added_commits as $identifier) { $this->log( pht( 'Ref %s "%s" now points at %s.', $ref_type, $name, $identifier)); if (!$ref_cursor) { // If this is the first time we've seen a particular ref (for // example, a new branch) we need to insert a RefCursor record // for it before we can insert a RefPosition. $ref_cursor = $this->newRefCursor( $repository, $ref_type, $name); } $new_position = id(new PhabricatorRepositoryRefPosition()) ->setCursorID($ref_cursor->getID()) ->setCommitIdentifier($identifier) ->setIsClosed(0); $this->markPositionNew($new_position); } if ($publisher->isPermanentRef(head($refs))) { // See T13284. If this cursor was already marked as permanent, we // only need to publish the newly created ref positions. However, if // this cursor was not previously permanent but has become permanent, // we need to publish all the ref positions. // This corresponds to users reconfiguring a branch to make it // permanent without pushing any new commits to it. 
$is_rebuild = $this->getRebuild(); $was_permanent = $ref_cursor->getIsPermanent(); if ($is_rebuild || !$was_permanent) { $update_all = true; } else { $update_all = false; } if ($update_all) { $update_commits = $new_commits; } else { $update_commits = $added_commits; } if ($is_rebuild) { $exclude = array(); } else { $exclude = $all_closing_heads; } foreach ($update_commits as $identifier) { $new_identifiers = $this->loadNewCommitIdentifiers( $identifier, $exclude); $this->markPermanentCommits($new_identifiers); } } } // Find any cursors for refs which no longer exist. This happens when a // branch, tag or bookmark is deleted. foreach ($cursors as $name => $cursor) { if (!empty($ref_groups[$name])) { // This ref still has some positions, so we don't need to wipe it // out. Try the next one. continue; } foreach ($cursor->getPositions() as $position) { $this->log( pht( 'Ref %s "%s" no longer exists.', $cursor->getRefType(), $cursor->getRefName())); $this->markPositionDead($position); } } } /** * Find all ancestors of a new closing branch head which are not ancestors * of any old closing branch head. */ private function loadNewCommitIdentifiers( $new_head, array $all_closing_heads) { $repository = $this->getRepository(); $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: if ($all_closing_heads) { $parts = array(); foreach ($all_closing_heads as $head) { $parts[] = hgsprintf('%s', $head); } // See T5896. Mercurial can not parse an "X or Y or ..." rev list // with more than about 300 items, because it exceeds the maximum // allowed recursion depth. Split all the heads into chunks of // 256, and build a query like this: // // ((1 or 2 or ... or 255) or (256 or 257 or ... 511)) // // If we have more than 65535 heads, we'll do that again: // // (((1 or ...) or ...) or ((65536 or ...) 
or ...)) $chunk_size = 256; while (count($parts) > $chunk_size) { $chunks = array_chunk($parts, $chunk_size); foreach ($chunks as $key => $chunk) { $chunks[$key] = '('.implode(' or ', $chunk).')'; } $parts = array_values($chunks); } $parts = '('.implode(' or ', $parts).')'; list($stdout) = $this->getRepository()->execxLocalCommand( 'log --template %s --rev %s', '{node}\n', hgsprintf('%s', $new_head).' - '.$parts); } else { list($stdout) = $this->getRepository()->execxLocalCommand( 'log --template %s --rev %s', '{node}\n', hgsprintf('%s', $new_head)); } $stdout = trim($stdout); if (!strlen($stdout)) { return array(); } return phutil_split_lines($stdout, $retain_newlines = false); case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: if ($all_closing_heads) { // See PHI1474. This length of list may exceed the maximum size of // a command line argument list, so pipe the list in using "--stdin" // instead. $ref_list = array(); $ref_list[] = $new_head; foreach ($all_closing_heads as $old_head) { $ref_list[] = '^'.$old_head; } $ref_list[] = '--'; $ref_list = implode("\n", $ref_list)."\n"; $future = $this->getRepository()->getLocalCommandFuture( 'log %s --stdin --', '--format=%H'); list($stdout) = $future ->write($ref_list) ->resolvex(); } else { list($stdout) = $this->getRepository()->execxLocalCommand( 'log %s %s --', '--format=%H', gitsprintf('%s', $new_head)); } $stdout = trim($stdout); if (!strlen($stdout)) { return array(); } return phutil_split_lines($stdout, $retain_newlines = false); default: throw new Exception(pht('Unsupported VCS "%s"!', $vcs)); } } /** * Mark a list of commits as permanent, and queue workers for those commits * which don't already have the flag. 
*/ private function setPermanentFlagOnCommits(array $identifiers) { $repository = $this->getRepository(); $commit_table = new PhabricatorRepositoryCommit(); $conn = $commit_table->establishConnection('w'); $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: $class = 'PhabricatorRepositoryGitCommitMessageParserWorker'; break; case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: $class = 'PhabricatorRepositorySvnCommitMessageParserWorker'; break; case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: $class = 'PhabricatorRepositoryMercurialCommitMessageParserWorker'; break; default: throw new Exception(pht("Unknown repository type '%s'!", $vcs)); } $identifier_tokens = array(); foreach ($identifiers as $identifier) { $identifier_tokens[] = qsprintf( $conn, '%s', $identifier); } $all_commits = array(); foreach (PhabricatorLiskDAO::chunkSQL($identifier_tokens) as $chunk) { $rows = queryfx_all( $conn, 'SELECT id, phid, commitIdentifier, importStatus FROM %T WHERE repositoryID = %d AND commitIdentifier IN (%LQ)', $commit_table->getTableName(), $repository->getID(), $chunk); foreach ($rows as $row) { $all_commits[] = $row; } } $commit_refs = array(); foreach ($identifiers as $identifier) { // See T13591. This construction is a bit ad-hoc, but the priority // function currently only cares about the number of refs we have // discovered, so we'll get the right result even without filling // these records out in detail. 
$commit_refs[] = id(new PhabricatorRepositoryCommitRef()) ->setIdentifier($identifier); } $task_priority = $this->getImportTaskPriority( $repository, $commit_refs); $permanent_flag = PhabricatorRepositoryCommit::IMPORTED_PERMANENT; $published_flag = PhabricatorRepositoryCommit::IMPORTED_PUBLISH; $all_commits = ipull($all_commits, null, 'commitIdentifier'); foreach ($identifiers as $identifier) { $row = idx($all_commits, $identifier); if (!$row) { throw new Exception( pht( 'Commit "%s" has not been discovered yet! Run discovery before '. 'updating refs.', $identifier)); } $import_status = $row['importStatus']; if (!($import_status & $permanent_flag)) { // Set the "permanent" flag. $import_status = ($import_status | $permanent_flag); // See T13580. Clear the "published" flag, so publishing executes // again. We may have previously performed a no-op "publish" on the // commit to make sure it has all bits in the "IMPORTED_ALL" bitmask. $import_status = ($import_status & ~$published_flag); queryfx( $conn, 'UPDATE %T SET importStatus = %d WHERE id = %d', $commit_table->getTableName(), $import_status, $row['id']); $this->queueCommitImportTask( $repository, - $row['id'], $row['phid'], $task_priority, $via = 'ref'); } } return $this; } private function newRefCursor( PhabricatorRepository $repository, $ref_type, $ref_name) { $cursor = id(new PhabricatorRepositoryRefCursor()) ->setRepositoryPHID($repository->getPHID()) ->setRefType($ref_type) ->setRefName($ref_name); $publisher = $repository->newPublisher(); $diffusion_ref = $cursor->newDiffusionRepositoryRef(); $is_permanent = $publisher->isPermanentRef($diffusion_ref); $cursor->setIsPermanent((int)$is_permanent); try { return $cursor->save(); } catch (AphrontDuplicateKeyQueryException $ex) { // If we raced another daemon to create this position and lost the race, // load the cursor the other daemon created instead. 
} $viewer = $this->getViewer(); $cursor = id(new PhabricatorRepositoryRefCursorQuery()) ->setViewer($viewer) ->withRepositoryPHIDs(array($repository->getPHID())) ->withRefTypes(array($ref_type)) ->withRefNames(array($ref_name)) ->needPositions(true) ->executeOne(); if (!$cursor) { throw new Exception( pht( 'Failed to create a new ref cursor (for "%s", of type "%s", in '. 'repository "%s") because it collided with an existing cursor, '. 'but then failed to load that cursor.', $ref_name, $ref_type, $repository->getDisplayName())); } return $cursor; } private function saveNewPositions() { $positions = $this->newPositions; foreach ($positions as $position) { try { $position->save(); } catch (AphrontDuplicateKeyQueryException $ex) { // We may race another daemon to create this position. If we do, and // we lose the race, that's fine: the other daemon did our work for // us and we can continue. } } $this->newPositions = array(); } private function deleteDeadPositions() { $positions = $this->deadPositions; $repository = $this->getRepository(); foreach ($positions as $position) { // Shove this ref into the old refs table so the discovery engine // can check if any commits have been rendered unreachable. 
id(new PhabricatorRepositoryOldRef()) ->setRepositoryPHID($repository->getPHID()) ->setCommitIdentifier($position->getCommitIdentifier()) ->save(); $position->delete(); } $this->deadPositions = array(); } /* -( Updating Git Refs )-------------------------------------------------- */ /** * @task git */ private function loadGitRefPositions(PhabricatorRepository $repository) { $refs = id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) ->execute(); return mgroup($refs, 'getRefType'); } /* -( Updating Mercurial Refs )-------------------------------------------- */ /** * @task hg */ private function loadMercurialBranchPositions( PhabricatorRepository $repository) { return id(new DiffusionLowLevelMercurialBranchesQuery()) ->setRepository($repository) ->execute(); } /** * @task hg */ private function loadMercurialBookmarkPositions( PhabricatorRepository $repository) { // TODO: Implement support for Mercurial bookmarks. return array(); } } diff --git a/src/applications/repository/management/PhabricatorRepositoryManagementReparseWorkflow.php b/src/applications/repository/management/PhabricatorRepositoryManagementReparseWorkflow.php index e9e1cd1d6c..fe83d271ab 100644 --- a/src/applications/repository/management/PhabricatorRepositoryManagementReparseWorkflow.php +++ b/src/applications/repository/management/PhabricatorRepositoryManagementReparseWorkflow.php @@ -1,286 +1,286 @@ setName('reparse') ->setExamples('**reparse** [options] __commit__') ->setSynopsis( pht( '**reparse** __what__ __which_parts__ [--trace] [--force]'."\n\n". 'Rerun the Diffusion parser on specific commits and repositories. '. 'Mostly useful for debugging changes to Diffusion.'."\n\n". 'e.g. do same but exclude before yesterday (local time):'."\n". 'repository reparse --all TEST --change --min-date yesterday'."\n". 'repository reparse --all TEST --change --min-date "today -1 day".'. "\n\n". 'e.g. do same but exclude before 03/31/2013 (local time):'."\n". 
'repository reparse --all TEST --change --min-date "03/31/2013"')) ->setArguments( array( array( 'name' => 'revision', 'wildcard' => true, ), array( 'name' => 'all', 'param' => 'repository', 'help' => pht( 'Reparse all commits in the specified repository.'), ), array( 'name' => 'min-date', 'param' => 'date', 'help' => pht( "Must be used with __%s__, this will exclude commits which ". "are earlier than __date__.\n". "Valid examples:\n". " 'today', 'today 2pm', '-1 hour', '-2 hours', '-24 hours',\n". " 'yesterday', 'today -1 day', 'yesterday 2pm', '2pm -1 day',\n". " 'last Monday', 'last Monday 14:00', 'last Monday 2pm',\n". " '31 March 2013', '31 Mar', '03/31', '03/31/2013',\n". "See __%s__ for more.", '--all', 'http://www.php.net/manual/en/datetime.formats.php'), ), array( 'name' => 'message', 'help' => pht('Reparse commit messages.'), ), array( 'name' => 'change', 'help' => pht('Reparse source changes.'), ), array( 'name' => 'publish', 'help' => pht( 'Publish changes: send email, publish Feed stories, run '. 'Herald rules, etc.'), ), array( 'name' => 'force', 'short' => 'f', 'help' => pht('Act noninteractively, without prompting.'), ), array( 'name' => 'background', 'help' => pht( 'Queue tasks for the daemons instead of running them in the '. 
'foreground.'), ), array( 'name' => 'importing', 'help' => pht('Reparse all steps which have not yet completed.'), ), )); } public function execute(PhutilArgumentParser $args) { $console = PhutilConsole::getConsole(); $all_from_repo = $args->getArg('all'); $reparse_message = $args->getArg('message'); $reparse_change = $args->getArg('change'); $reparse_publish = $args->getArg('publish'); $reparse_what = $args->getArg('revision'); $force = $args->getArg('force'); $background = $args->getArg('background'); $min_date = $args->getArg('min-date'); $importing = $args->getArg('importing'); if (!$all_from_repo && !$reparse_what) { throw new PhutilArgumentUsageException( pht('Specify a commit or repository to reparse.')); } if ($all_from_repo && $reparse_what) { $commits = implode(', ', $reparse_what); throw new PhutilArgumentUsageException( pht( "Specify a commit or repository to reparse, not both:\n". "All from repo: %s\n". "Commit(s) to reparse: %s", $all_from_repo, $commits)); } $any_step = ($reparse_message || $reparse_change || $reparse_publish); if ($any_step && $importing) { throw new PhutilArgumentUsageException( pht( 'Choosing steps with "--importing" conflicts with flags which '. 'select specific steps.')); } else if ($any_step) { // OK. } else if ($importing) { // OK. } else if (!$any_step && !$importing) { throw new PhutilArgumentUsageException( pht( 'Specify which steps to reparse with "--message", "--change", '. 'and/or "--publish"; or "--importing" to run all missing steps.')); } $min_timestamp = false; if ($min_date) { $min_timestamp = strtotime($min_date); if (!$all_from_repo) { throw new PhutilArgumentUsageException( pht( 'You must use "--all" if you specify "--min-date".')); } // previous to PHP 5.1.0 you would compare with -1, instead of false if (false === $min_timestamp) { throw new PhutilArgumentUsageException( pht( "Supplied --min-date is not valid. See help for valid examples.\n". 
"Supplied value: '%s'\n", $min_date)); } } $commits = array(); if ($all_from_repo) { $repository = id(new PhabricatorRepositoryQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withIdentifiers(array($all_from_repo)) ->executeOne(); if (!$repository) { throw new PhutilArgumentUsageException( pht('Unknown repository "%s"!', $all_from_repo)); } $query = id(new DiffusionCommitQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withRepository($repository); if ($min_timestamp) { $query->withEpochRange($min_timestamp, null); } if ($importing) { $query->withImporting(true); } $commits = $query->execute(); if (!$commits) { throw new PhutilArgumentUsageException( pht( 'No commits have been discovered in the "%s" repository!', $repository->getDisplayName())); } } else { $commits = $this->loadNamedCommits($reparse_what); } if (!$background) { PhabricatorWorker::setRunAllTasksInProcess(true); } $progress = new PhutilConsoleProgressBar(); $progress->setTotal(count($commits)); $tasks = array(); foreach ($commits as $commit) { $repository = $commit->getRepository(); if ($importing) { $status = $commit->getImportStatus(); // Find the first missing import step and queue that up. 
$reparse_message = false; $reparse_change = false; $reparse_publish = false; if (!($status & PhabricatorRepositoryCommit::IMPORTED_MESSAGE)) { $reparse_message = true; } else if (!($status & PhabricatorRepositoryCommit::IMPORTED_CHANGE)) { $reparse_change = true; } else if (!($status & PhabricatorRepositoryCommit::IMPORTED_PUBLISH)) { $reparse_publish = true; } else { continue; } } $classes = array(); switch ($repository->getVersionControlSystem()) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: if ($reparse_message) { $classes[] = 'PhabricatorRepositoryGitCommitMessageParserWorker'; } if ($reparse_change) { $classes[] = 'PhabricatorRepositoryGitCommitChangeParserWorker'; } break; case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: if ($reparse_message) { $classes[] = 'PhabricatorRepositoryMercurialCommitMessageParserWorker'; } if ($reparse_change) { $classes[] = 'PhabricatorRepositoryMercurialCommitChangeParserWorker'; } break; case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: if ($reparse_message) { $classes[] = 'PhabricatorRepositorySvnCommitMessageParserWorker'; } if ($reparse_change) { $classes[] = 'PhabricatorRepositorySvnCommitChangeParserWorker'; } break; } if ($reparse_publish) { $classes[] = 'PhabricatorRepositoryCommitPublishWorker'; } // NOTE: With "--importing", we queue the first unparsed step and let // it queue the other ones normally. Without "--importing", we queue // all the requested steps explicitly. $spec = array( - 'commitID' => $commit->getID(), + 'commitPHID' => $commit->getPHID(), 'only' => !$importing, 'via' => 'reparse', ); foreach ($classes as $class) { try { PhabricatorWorker::scheduleTask( $class, $spec, array( 'priority' => PhabricatorWorker::PRIORITY_IMPORT, 'objectPHID' => $commit->getPHID(), 'containerPHID' => $repository->getPHID(), )); } catch (PhabricatorWorkerPermanentFailureException $ex) { // See T13315. 
We expect some reparse steps to occasionally raise // permanent failures: for example, because they are no longer // reachable. This is a routine condition, not a catastrophic // failure, so let the user know something happened but continue // reparsing any remaining commits. echo tsprintf( "** %s ** %s\n", pht('WARN'), $ex->getMessage()); } } $progress->update(1); } $progress->done(); return 0; } } diff --git a/src/applications/repository/worker/PhabricatorRepositoryCommitParserWorker.php b/src/applications/repository/worker/PhabricatorRepositoryCommitParserWorker.php index 5fe8e03f75..a8cfec5977 100644 --- a/src/applications/repository/worker/PhabricatorRepositoryCommitParserWorker.php +++ b/src/applications/repository/worker/PhabricatorRepositoryCommitParserWorker.php @@ -1,184 +1,205 @@ commit) { return $this->commit; } - $commit_id = idx($this->getTaskData(), 'commitID'); - if (!$commit_id) { + $viewer = $this->getViewer(); + $task_data = $this->getTaskData(); + + $commit_query = id(new DiffusionCommitQuery()) + ->setViewer($viewer); + + $commit_phid = idx($task_data, 'commitPHID'); + + // TODO: See T13591. This supports execution of legacy tasks and can + // eventually be removed. Newer tasks use "commitPHID" instead of + // "commitID". 
+ if (!$commit_phid) { + $commit_id = idx($task_data, 'commitID'); + if ($commit_id) { + $legacy_commit = id(clone $commit_query) + ->withIDs(array($commit_id)) + ->executeOne(); + if ($legacy_commit) { + $commit_phid = $legacy_commit->getPHID(); + } + } + } + + if (!$commit_phid) { throw new PhabricatorWorkerPermanentFailureException( - pht('No "%s" in task data.', 'commitID')); + pht('Task data has no "commitPHID".')); } - $commit = id(new DiffusionCommitQuery()) - ->setViewer(PhabricatorUser::getOmnipotentUser()) - ->withIDs(array($commit_id)) + $commit = id(clone $commit_query) + ->withPHIDs(array($commit_phid)) ->executeOne(); if (!$commit) { throw new PhabricatorWorkerPermanentFailureException( - pht('Commit "%s" does not exist.', $commit_id)); + pht('Commit "%s" does not exist.', $commit_phid)); } if ($commit->isUnreachable()) { throw new PhabricatorWorkerPermanentFailureException( pht( - 'Commit "%s" (with internal ID "%s") is no longer reachable from '. - 'any branch, tag, or ref in this repository, so it will not be '. + 'Commit "%s" (with PHID "%s") is no longer reachable from any '. + 'branch, tag, or ref in this repository, so it will not be '. 'imported. This usually means that the branch the commit was on '. 
'was deleted or overwritten.', $commit->getMonogram(), - $commit_id)); + $commit_phid)); } $this->commit = $commit; return $commit; } final protected function doWork() { $commit = $this->loadCommit(); $repository = $commit->getRepository(); $this->repository = $repository; $this->parseCommit($repository, $this->commit); } private function shouldQueueFollowupTasks() { return !idx($this->getTaskData(), 'only'); } final protected function queueCommitTask($task_class) { if (!$this->shouldQueueFollowupTasks()) { return; } $commit = $this->loadCommit(); $repository = $commit->getRepository(); $data = array( - 'commitID' => $commit->getID(), + 'commitPHID' => $commit->getPHID(), ); $task_data = $this->getTaskData(); if (isset($task_data['via'])) { $data['via'] = $task_data['via']; } $options = array( // We queue followup tasks at default priority so that the queue finishes // work it has started before starting more work. If followups are queued // at the same priority level, we do all message parses first, then all // change parses, etc. This makes progress uneven. See T11677 for // discussion. 'priority' => parent::PRIORITY_DEFAULT, 'objectPHID' => $commit->getPHID(), 'containerPHID' => $repository->getPHID(), ); $this->queueTask($task_class, $data, $options); } protected function getImportStepFlag() { return null; } final protected function shouldSkipImportStep() { // If this step has already been performed and this is a "natural" task // which was queued by the normal daemons, decline to do the work again. // This mitigates races if commits are rapidly deleted and revived. $flag = $this->getImportStepFlag(); if (!$flag) { // This step doesn't have an associated flag. return false; } $commit = $this->commit; if (!$commit->isPartiallyImported($flag)) { // This commit doesn't have the flag set yet. return false; } if (!$this->shouldQueueFollowupTasks()) { // This task was queued by administrative tools, so do the work even // if it duplicates existing work. 
return false; } $this->log( "%s\n", pht( 'Skipping import step; this step was previously completed for '. 'this commit.')); return true; } abstract protected function parseCommit( PhabricatorRepository $repository, PhabricatorRepositoryCommit $commit); protected function loadCommitHint(PhabricatorRepositoryCommit $commit) { $viewer = PhabricatorUser::getOmnipotentUser(); $repository = $commit->getRepository(); return id(new DiffusionCommitHintQuery()) ->setViewer($viewer) ->withRepositoryPHIDs(array($repository->getPHID())) ->withOldCommitIdentifiers(array($commit->getCommitIdentifier())) ->executeOne(); } public function renderForDisplay(PhabricatorUser $viewer) { $suffix = parent::renderForDisplay($viewer); $commit = id(new DiffusionCommitQuery()) ->setViewer($viewer) - ->withIDs(array(idx($this->getTaskData(), 'commitID'))) + ->withPHIDs(array(idx($this->getTaskData(), 'commitPHID'))) ->executeOne(); if (!$commit) { return $suffix; } $link = DiffusionView::linkCommit( $commit->getRepository(), $commit->getCommitIdentifier()); return array($link, $suffix); } final protected function loadCommitData(PhabricatorRepositoryCommit $commit) { if ($commit->hasCommitData()) { return $commit->getCommitData(); } $commit_id = $commit->getID(); $data = id(new PhabricatorRepositoryCommitData())->loadOneWhere( 'commitID = %d', $commit_id); if (!$data) { $data = id(new PhabricatorRepositoryCommitData()) ->setCommitID($commit_id); } $commit->attachCommitData($data); return $data; } final public function getViewer() { return PhabricatorUser::getOmnipotentUser(); } }