Differential D20712 Diff 49392 src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php
Changeset View
Changeset View
Standalone View
Standalone View
src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php
| Show First 20 Lines • Show All 87 Lines • ▼ Show 20 Lines | if ($this->isInitialImport($refs)) { | ||||
| new PhutilNumber(PhabricatorRepository::IMPORT_THRESHOLD))); | new PhutilNumber(PhabricatorRepository::IMPORT_THRESHOLD))); | ||||
| $repository->markImporting(); | $repository->markImporting(); | ||||
| } | } | ||||
| // Clear the working set cache. | // Clear the working set cache. | ||||
| $this->workingSet = array(); | $this->workingSet = array(); | ||||
| $task_priority = $this->getImportTaskPriority($repository, $refs); | |||||
| // Record discovered commits and mark them in the cache. | // Record discovered commits and mark them in the cache. | ||||
| foreach ($refs as $ref) { | foreach ($refs as $ref) { | ||||
| $this->recordCommit( | $this->recordCommit( | ||||
| $repository, | $repository, | ||||
| $ref->getIdentifier(), | $ref->getIdentifier(), | ||||
| $ref->getEpoch(), | $ref->getEpoch(), | ||||
| $ref->getCanCloseImmediately(), | $ref->getCanCloseImmediately(), | ||||
| $ref->getParents()); | $ref->getParents(), | ||||
| $task_priority); | |||||
| $this->commitCache[$ref->getIdentifier()] = true; | $this->commitCache[$ref->getIdentifier()] = true; | ||||
| } | } | ||||
| $this->markUnreachableCommits($repository); | $this->markUnreachableCommits($repository); | ||||
| $version = $this->getObservedVersion($repository); | $version = $this->getObservedVersion($repository); | ||||
| if ($version !== null) { | if ($version !== null) { | ||||
| ▲ Show 20 Lines • Show All 419 Lines • ▼ Show 20 Lines | /* -( Internals )---------------------------------------------------------- */ | ||||
| } | } | ||||
| private function recordCommit( | private function recordCommit( | ||||
| PhabricatorRepository $repository, | PhabricatorRepository $repository, | ||||
| $commit_identifier, | $commit_identifier, | ||||
| $epoch, | $epoch, | ||||
| $close_immediately, | $close_immediately, | ||||
| array $parents) { | array $parents, | ||||
| $task_priority) { | |||||
| $commit = new PhabricatorRepositoryCommit(); | $commit = new PhabricatorRepositoryCommit(); | ||||
| $conn_w = $repository->establishConnection('w'); | $conn_w = $repository->establishConnection('w'); | ||||
| // First, try to revive an existing unreachable commit (if one exists) by | // First, try to revive an existing unreachable commit (if one exists) by | ||||
| // removing the "unreachable" flag. If we succeed, we don't need to do | // removing the "unreachable" flag. If we succeed, we don't need to do | ||||
| // anything else: we already discovered this commit some time ago. | // anything else: we already discovered this commit some time ago. | ||||
| queryfx( | queryfx( | ||||
| $conn_w, | $conn_w, | ||||
| 'UPDATE %T SET importStatus = (importStatus & ~%d) | 'UPDATE %T SET importStatus = (importStatus & ~%d) | ||||
| WHERE repositoryID = %d AND commitIdentifier = %s', | WHERE repositoryID = %d AND commitIdentifier = %s', | ||||
| $commit->getTableName(), | $commit->getTableName(), | ||||
| PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE, | PhabricatorRepositoryCommit::IMPORTED_UNREACHABLE, | ||||
| $repository->getID(), | $repository->getID(), | ||||
| $commit_identifier); | $commit_identifier); | ||||
| if ($conn_w->getAffectedRows()) { | if ($conn_w->getAffectedRows()) { | ||||
| $commit = $commit->loadOneWhere( | $commit = $commit->loadOneWhere( | ||||
| 'repositoryID = %d AND commitIdentifier = %s', | 'repositoryID = %d AND commitIdentifier = %s', | ||||
| $repository->getID(), | $repository->getID(), | ||||
| $commit_identifier); | $commit_identifier); | ||||
| // After reviving a commit, schedule new daemons for it. | // After reviving a commit, schedule new daemons for it. | ||||
| $this->didDiscoverCommit($repository, $commit, $epoch); | $this->didDiscoverCommit($repository, $commit, $epoch, $task_priority); | ||||
| return; | return; | ||||
| } | } | ||||
| $commit->setRepositoryID($repository->getID()); | $commit->setRepositoryID($repository->getID()); | ||||
| $commit->setCommitIdentifier($commit_identifier); | $commit->setCommitIdentifier($commit_identifier); | ||||
| $commit->setEpoch($epoch); | $commit->setEpoch($epoch); | ||||
| if ($close_immediately) { | if ($close_immediately) { | ||||
| $commit->setImportStatus(PhabricatorRepositoryCommit::IMPORTED_CLOSEABLE); | $commit->setImportStatus(PhabricatorRepositoryCommit::IMPORTED_CLOSEABLE); | ||||
| ▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | try { | ||||
| 'INSERT IGNORE INTO %T (childCommitID, parentCommitID) | 'INSERT IGNORE INTO %T (childCommitID, parentCommitID) | ||||
| VALUES (%d, %d)', | VALUES (%d, %d)', | ||||
| PhabricatorRepository::TABLE_PARENTS, | PhabricatorRepository::TABLE_PARENTS, | ||||
| $commit->getID(), | $commit->getID(), | ||||
| $parent_id); | $parent_id); | ||||
| } | } | ||||
| $commit->saveTransaction(); | $commit->saveTransaction(); | ||||
| $this->didDiscoverCommit($repository, $commit, $epoch); | $this->didDiscoverCommit($repository, $commit, $epoch, $task_priority); | ||||
| if ($this->repairMode) { | if ($this->repairMode) { | ||||
| // Normally, the query should throw a duplicate key exception. If we | // Normally, the query should throw a duplicate key exception. If we | ||||
| // reach this in repair mode, we've actually performed a repair. | // reach this in repair mode, we've actually performed a repair. | ||||
| $this->log(pht('Repaired commit "%s".', $commit_identifier)); | $this->log(pht('Repaired commit "%s".', $commit_identifier)); | ||||
| } | } | ||||
| PhutilEventEngine::dispatchEvent( | PhutilEventEngine::dispatchEvent( | ||||
| Show All 11 Lines | try { | ||||
| // data inconsistency or cosmic radiation; in any case, we're still | // data inconsistency or cosmic radiation; in any case, we're still | ||||
| // in a good state if we ignore the failure. | // in a good state if we ignore the failure. | ||||
| } | } | ||||
| } | } | ||||
| private function didDiscoverCommit( | private function didDiscoverCommit( | ||||
| PhabricatorRepository $repository, | PhabricatorRepository $repository, | ||||
| PhabricatorRepositoryCommit $commit, | PhabricatorRepositoryCommit $commit, | ||||
| $epoch) { | $epoch, | ||||
| $task_priority) { | |||||
| $this->insertTask($repository, $commit); | $this->insertTask($repository, $commit, $task_priority); | ||||
| // Update the repository summary table. | // Update the repository summary table. | ||||
| queryfx( | queryfx( | ||||
| $commit->establishConnection('w'), | $commit->establishConnection('w'), | ||||
| 'INSERT INTO %T (repositoryID, size, lastCommitID, epoch) | 'INSERT INTO %T (repositoryID, size, lastCommitID, epoch) | ||||
| VALUES (%d, 1, %d, %d) | VALUES (%d, 1, %d, %d) | ||||
| ON DUPLICATE KEY UPDATE | ON DUPLICATE KEY UPDATE | ||||
| size = size + 1, | size = size + 1, | ||||
| Show All 10 Lines | private function didDiscoverRefs(array $refs) { | ||||
| foreach ($refs as $ref) { | foreach ($refs as $ref) { | ||||
| $this->workingSet[$ref->getIdentifier()] = true; | $this->workingSet[$ref->getIdentifier()] = true; | ||||
| } | } | ||||
| } | } | ||||
| private function insertTask( | private function insertTask( | ||||
| PhabricatorRepository $repository, | PhabricatorRepository $repository, | ||||
| PhabricatorRepositoryCommit $commit, | PhabricatorRepositoryCommit $commit, | ||||
| $task_priority, | |||||
| $data = array()) { | $data = array()) { | ||||
| $vcs = $repository->getVersionControlSystem(); | $vcs = $repository->getVersionControlSystem(); | ||||
| switch ($vcs) { | switch ($vcs) { | ||||
| case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: | case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: | ||||
| $class = 'PhabricatorRepositoryGitCommitMessageParserWorker'; | $class = 'PhabricatorRepositoryGitCommitMessageParserWorker'; | ||||
| break; | break; | ||||
| case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: | case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: | ||||
| $class = 'PhabricatorRepositorySvnCommitMessageParserWorker'; | $class = 'PhabricatorRepositorySvnCommitMessageParserWorker'; | ||||
| break; | break; | ||||
| case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: | case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: | ||||
| $class = 'PhabricatorRepositoryMercurialCommitMessageParserWorker'; | $class = 'PhabricatorRepositoryMercurialCommitMessageParserWorker'; | ||||
| break; | break; | ||||
| default: | default: | ||||
| throw new Exception(pht("Unknown repository type '%s'!", $vcs)); | throw new Exception(pht("Unknown repository type '%s'!", $vcs)); | ||||
| } | } | ||||
| $data['commitID'] = $commit->getID(); | $data['commitID'] = $commit->getID(); | ||||
| // If the repository is importing for the first time, we schedule tasks | |||||
| // at IMPORT priority, which is very low. Making progress on importing a | |||||
| // new repository for the first time is less important than any other | |||||
| // daemon task. | |||||
| // If the repository has finished importing and we're just catching up | |||||
| // on recent commits, we schedule discovery at COMMIT priority, which is | |||||
| // slightly below the default priority. | |||||
| // Note that followup tasks and triggered tasks (like those generated by | |||||
| // Herald or Harbormaster) will queue at DEFAULT priority, so that each | |||||
| // commit tends to fully import before we start the next one. This tends | |||||
| // to give imports fairly predictable progress. See T11677 for some | |||||
| // discussion. | |||||
| if ($repository->isImporting()) { | |||||
| $task_priority = PhabricatorWorker::PRIORITY_IMPORT; | |||||
| } else { | |||||
| $task_priority = PhabricatorWorker::PRIORITY_COMMIT; | |||||
| } | |||||
| $options = array( | $options = array( | ||||
| 'priority' => $task_priority, | 'priority' => $task_priority, | ||||
| ); | ); | ||||
| PhabricatorWorker::scheduleTask($class, $data, $options); | PhabricatorWorker::scheduleTask($class, $data, $options); | ||||
| } | } | ||||
| private function isInitialImport(array $refs) { | private function isInitialImport(array $refs) { | ||||
| ▲ Show 20 Lines • Show All 201 Lines • ▼ Show 20 Lines | queryfx( | ||||
| epoch = VALUES(epoch)', | epoch = VALUES(epoch)', | ||||
| PhabricatorRepository::TABLE_SUMMARY, | PhabricatorRepository::TABLE_SUMMARY, | ||||
| $repository->getID(), | $repository->getID(), | ||||
| $data['N'], | $data['N'], | ||||
| $data['id'], | $data['id'], | ||||
| $data['epoch']); | $data['epoch']); | ||||
| } | } | ||||
| private function getImportTaskPriority( | |||||
| PhabricatorRepository $repository, | |||||
| array $refs) { | |||||
| // If the repository is importing for the first time, we schedule tasks | |||||
| // at IMPORT priority, which is very low. Making progress on importing a | |||||
| // new repository for the first time is less important than any other | |||||
| // daemon task. | |||||
| // If the repository has finished importing and we're just catching up | |||||
| // on recent commits, we usually schedule discovery at COMMIT priority, | |||||
| // which is slightly below the default priority. | |||||
| // Note that followup tasks and triggered tasks (like those generated by | |||||
| // Herald or Harbormaster) will queue at DEFAULT priority, so that each | |||||
| // commit tends to fully import before we start the next one. This tends | |||||
| // to give imports fairly predictable progress. See T11677 for some | |||||
| // discussion. | |||||
| if ($repository->isImporting()) { | |||||
| $this->log( | |||||
| pht( | |||||
| 'Importing %s commit(s) at low priority ("PRIORITY_IMPORT") '. | |||||
| 'because this repository is still importing.', | |||||
| phutil_count($refs))); | |||||
| return PhabricatorWorker::PRIORITY_IMPORT; | |||||
| } | |||||
| // See T13369. If we've discovered a lot of commits at once, import them | |||||
| // at lower priority. | |||||
| // This is mostly aimed at reducing the impact that synchronizing thousands | |||||
| // of commits from a remote upstream has on other repositories. The queue | |||||
| // is "mostly FIFO", so queueing a thousand commit imports can stall other | |||||
| // repositories. | |||||
| // In a perfect world we'd probably give repositories round-robin queue | |||||
| // priority, but we don't currently have the primitives for this and there | |||||
| // isn't a strong case for building them. | |||||
| // Use "a whole lot of commits showed up at once" as a heuristic for | |||||
| // detecting "someone synchronized an upstream", and import them at a lower | |||||
| // priority to more closely approximate fair scheduling. | |||||
| if (count($refs) >= PhabricatorRepository::LOWPRI_THRESHOLD) { | |||||
| $this->log( | |||||
| pht( | |||||
| 'Importing %s commit(s) at low priority ("PRIORITY_IMPORT") '. | |||||
| 'because many commits were discovered at once.', | |||||
| phutil_count($refs))); | |||||
| return PhabricatorWorker::PRIORITY_IMPORT; | |||||
| } | |||||
| // Otherwise, import at normal priority. | |||||
| if ($refs) { | |||||
| $this->log( | |||||
| pht( | |||||
| 'Importing %s commit(s) at normal priority ("PRIORITY_COMMIT").', | |||||
| phutil_count($refs))); | |||||
| } | |||||
| return PhabricatorWorker::PRIORITY_COMMIT; | |||||
| } | |||||
| } | } | ||||