diff --git a/src/applications/diffusion/engine/DiffusionCommitHookEngine.php b/src/applications/diffusion/engine/DiffusionCommitHookEngine.php index 0f6b05b578..c172f27466 100644 --- a/src/applications/diffusion/engine/DiffusionCommitHookEngine.php +++ b/src/applications/diffusion/engine/DiffusionCommitHookEngine.php @@ -1,1479 +1,1473 @@ remoteProtocol = $remote_protocol; return $this; } public function getRemoteProtocol() { return $this->remoteProtocol; } public function setRemoteAddress($remote_address) { $this->remoteAddress = $remote_address; return $this; } public function getRemoteAddress() { return $this->remoteAddress; } public function setRequestIdentifier($request_identifier) { $this->requestIdentifier = $request_identifier; return $this; } public function getRequestIdentifier() { return $this->requestIdentifier; } public function setStartTime($start_time) { $this->startTime = $start_time; return $this; } public function getStartTime() { return $this->startTime; } public function setSubversionTransactionInfo($transaction, $repository) { $this->subversionTransaction = $transaction; $this->subversionRepository = $repository; return $this; } public function setStdin($stdin) { $this->stdin = $stdin; return $this; } public function getStdin() { return $this->stdin; } public function setOriginalArgv(array $original_argv) { $this->originalArgv = $original_argv; return $this; } public function getOriginalArgv() { return $this->originalArgv; } public function setRepository(PhabricatorRepository $repository) { $this->repository = $repository; return $this; } public function getRepository() { return $this->repository; } public function setViewer(PhabricatorUser $viewer) { $this->viewer = $viewer; return $this; } public function getViewer() { return $this->viewer; } public function setMercurialHook($mercurial_hook) { $this->mercurialHook = $mercurial_hook; return $this; } public function getMercurialHook() { return $this->mercurialHook; } /* -( Hook Execution )----------------------------------------------------- */ public function execute() { $ref_updates = $this->findRefUpdates(); $all_updates = $ref_updates; $caught = null; try { try { $this->rejectDangerousChanges($ref_updates); } catch (DiffusionCommitHookRejectException $ex) { // If we're rejecting dangerous changes, flag everything that we've // seen as rejected so it's clear that none of it was accepted. $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_DANGEROUS; throw $ex; } $content_updates = $this->findContentUpdates($ref_updates); $all_updates = array_merge($ref_updates, $content_updates); // If this is an "initial import" (a sizable push to a previously empty // repository) we'll allow enormous changes and disable Herald rules. // These rulesets can consume a large amount of time and memory and are // generally not relevant when importing repository history. $is_initial_import = $this->isInitialImport($all_updates); if (!$is_initial_import) { $this->applyHeraldRefRules($ref_updates); } try { if (!$is_initial_import) { $this->rejectOversizedFiles($content_updates); } } catch (DiffusionCommitHookRejectException $ex) { // If we're rejecting oversized files, flag everything. $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_OVERSIZED; throw $ex; } try { if (!$is_initial_import) { $this->rejectCommitsAffectingTooManyPaths($content_updates); } } catch (DiffusionCommitHookRejectException $ex) { $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_TOUCHES; throw $ex; } try { if (!$is_initial_import) { $this->rejectEnormousChanges($content_updates); } } catch (DiffusionCommitHookRejectException $ex) { // If we're rejecting enormous changes, flag everything. $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_ENORMOUS; throw $ex; } if (!$is_initial_import) { $this->applyHeraldContentRules($content_updates); } // Run custom scripts in `hook.d/` directories. $this->applyCustomHooks($all_updates); // If we make it this far, we're accepting these changes. Mark all the // logs as accepted. $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_ACCEPT; } catch (Exception $ex) { // We'll throw this again in a minute, but we want to save all the logs // first. $caught = $ex; } // Save all the logs no matter what the outcome was. $event = $this->newPushEvent(); $event->setRejectCode($this->rejectCode); $event->setRejectDetails($this->rejectDetails); - $event->openTransaction(); - $event->save(); - foreach ($all_updates as $update) { - $update->setPushEventPHID($event->getPHID()); - $update->save(); - } - $event->saveTransaction(); + $event->saveWithLogs($all_updates); if ($caught) { throw $caught; } // If this went through cleanly and was an import, set the importing flag // on the repository. It will be cleared once we fully process everything. if ($is_initial_import) { $repository = $this->getRepository(); $repository->markImporting(); } if ($this->emailPHIDs) { // If Herald rules triggered email to users, queue a worker to send the // mail. We do this out-of-process so that we block pushes as briefly // as possible. // (We do need to pull some commit info here because the commit objects // may not exist yet when this worker runs, which could be immediately.) PhabricatorWorker::scheduleTask( 'PhabricatorRepositoryPushMailWorker', array( 'eventPHID' => $event->getPHID(), 'emailPHIDs' => array_values($this->emailPHIDs), 'info' => $this->loadCommitInfoForWorker($all_updates), ), array( 'priority' => PhabricatorWorker::PRIORITY_ALERTS, )); } return 0; } private function findRefUpdates() { $type = $this->getRepository()->getVersionControlSystem(); switch ($type) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: return $this->findGitRefUpdates(); case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: return $this->findMercurialRefUpdates(); case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: return $this->findSubversionRefUpdates(); default: throw new Exception(pht('Unsupported repository type "%s"!', $type)); } } private function rejectDangerousChanges(array $ref_updates) { assert_instances_of($ref_updates, 'PhabricatorRepositoryPushLog'); $repository = $this->getRepository(); if ($repository->shouldAllowDangerousChanges()) { return; } $flag_dangerous = PhabricatorRepositoryPushLog::CHANGEFLAG_DANGEROUS; foreach ($ref_updates as $ref_update) { if (!$ref_update->hasChangeFlags($flag_dangerous)) { // This is not a dangerous change. continue; } // We either have a branch deletion or a non fast-forward branch update. // Format a message and reject the push. $message = pht( "DANGEROUS CHANGE: %s\n". "Dangerous change protection is enabled for this repository.\n". "Edit the repository configuration before making dangerous changes.", $ref_update->getDangerousChangeDescription()); throw new DiffusionCommitHookRejectException($message); } } private function findContentUpdates(array $ref_updates) { assert_instances_of($ref_updates, 'PhabricatorRepositoryPushLog'); $type = $this->getRepository()->getVersionControlSystem(); switch ($type) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: return $this->findGitContentUpdates($ref_updates); case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: return $this->findMercurialContentUpdates($ref_updates); case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: return $this->findSubversionContentUpdates($ref_updates); default: throw new Exception(pht('Unsupported repository type "%s"!', $type)); } } /* -( Herald )------------------------------------------------------------- */ private function applyHeraldRefRules(array $ref_updates) { $this->applyHeraldRules( $ref_updates, new HeraldPreCommitRefAdapter()); } private function applyHeraldContentRules(array $content_updates) { $this->applyHeraldRules( $content_updates, new HeraldPreCommitContentAdapter()); } private function applyHeraldRules( array $updates, HeraldAdapter $adapter_template) { if (!$updates) { return; } $viewer = $this->getViewer(); $adapter_template ->setHookEngine($this) ->setActingAsPHID($viewer->getPHID()); $engine = new HeraldEngine(); $rules = null; $blocking_effect = null; $blocked_update = null; $blocking_xscript = null; foreach ($updates as $update) { $adapter = id(clone $adapter_template) ->setPushLog($update); if ($rules === null) { $rules = $engine->loadRulesForAdapter($adapter); } $effects = $engine->applyRules($rules, $adapter); $engine->applyEffects($effects, $adapter, $rules); $xscript = $engine->getTranscript(); // Store any PHIDs we want to send email to for later. foreach ($adapter->getEmailPHIDs() as $email_phid) { $this->emailPHIDs[$email_phid] = $email_phid; } $block_action = DiffusionBlockHeraldAction::ACTIONCONST; if ($blocking_effect === null) { foreach ($effects as $effect) { if ($effect->getAction() == $block_action) { $blocking_effect = $effect; $blocked_update = $update; $blocking_xscript = $xscript; break; } } } } if ($blocking_effect) { $rule = $blocking_effect->getRule(); $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_HERALD; $this->rejectDetails = $rule->getPHID(); $message = $blocking_effect->getTarget(); if (!strlen($message)) { $message = pht('(None.)'); } $blocked_ref_name = coalesce( $blocked_update->getRefName(), $blocked_update->getRefNewShort()); $blocked_name = $blocked_update->getRefType().'/'.$blocked_ref_name; throw new DiffusionCommitHookRejectException( pht( "This push was rejected by Herald push rule %s.\n". " Change: %s\n". " Rule: %s\n". " Reason: %s\n". "Transcript: %s", $rule->getMonogram(), $blocked_name, $rule->getName(), $message, PhabricatorEnv::getProductionURI( '/herald/transcript/'.$blocking_xscript->getID().'/'))); } } public function loadViewerProjectPHIDsForHerald() { // This just caches the viewer's projects so we don't need to load them // over and over again when applying Herald rules. if ($this->heraldViewerProjects === null) { $this->heraldViewerProjects = id(new PhabricatorProjectQuery()) ->setViewer($this->getViewer()) ->withMemberPHIDs(array($this->getViewer()->getPHID())) ->execute(); } return mpull($this->heraldViewerProjects, 'getPHID'); } /* -( Git )---------------------------------------------------------------- */ private function findGitRefUpdates() { $ref_updates = array(); // First, parse stdin, which lists all the ref changes. The input looks // like this: // // $stdin = $this->getStdin(); $lines = phutil_split_lines($stdin, $retain_endings = false); foreach ($lines as $line) { $parts = explode(' ', $line, 3); if (count($parts) != 3) { throw new Exception(pht('Expected "old new ref", got "%s".', $line)); } $ref_old = $parts[0]; $ref_new = $parts[1]; $ref_raw = $parts[2]; if (preg_match('(^refs/heads/)', $ref_raw)) { $ref_type = PhabricatorRepositoryPushLog::REFTYPE_BRANCH; $ref_raw = substr($ref_raw, strlen('refs/heads/')); } else if (preg_match('(^refs/tags/)', $ref_raw)) { $ref_type = PhabricatorRepositoryPushLog::REFTYPE_TAG; $ref_raw = substr($ref_raw, strlen('refs/tags/')); } else { $ref_type = PhabricatorRepositoryPushLog::REFTYPE_REF; } $ref_update = $this->newPushLog() ->setRefType($ref_type) ->setRefName($ref_raw) ->setRefOld($ref_old) ->setRefNew($ref_new); $ref_updates[] = $ref_update; } $this->findGitMergeBases($ref_updates); $this->findGitChangeFlags($ref_updates); return $ref_updates; } private function findGitMergeBases(array $ref_updates) { assert_instances_of($ref_updates, 'PhabricatorRepositoryPushLog'); $futures = array(); foreach ($ref_updates as $key => $ref_update) { // If the old hash is "00000...", the ref is being created (either a new // branch, or a new tag). If the new hash is "00000...", the ref is being // deleted. If both are nonempty, the ref is being updated. For updates, // we'll figure out the `merge-base` of the old and new objects here. This // lets us reject non-FF changes cheaply; later, we'll figure out exactly // which commits are new. $ref_old = $ref_update->getRefOld(); $ref_new = $ref_update->getRefNew(); if (($ref_old === self::EMPTY_HASH) || ($ref_new === self::EMPTY_HASH)) { continue; } $futures[$key] = $this->getRepository()->getLocalCommandFuture( 'merge-base %s %s', $ref_old, $ref_new); } $futures = id(new FutureIterator($futures)) ->limit(8); foreach ($futures as $key => $future) { // If 'old' and 'new' have no common ancestors (for example, a force push // which completely rewrites a ref), `git merge-base` will exit with // an error and no output. It would be nice to find a positive test // for this instead, but I couldn't immediately come up with one. See // T4224. Assume this means there are no ancestors. list($err, $stdout) = $future->resolve(); if ($err) { $merge_base = null; } else { $merge_base = rtrim($stdout, "\n"); } $ref_update = $ref_updates[$key]; $ref_update->setMergeBase($merge_base); } return $ref_updates; } private function findGitChangeFlags(array $ref_updates) { assert_instances_of($ref_updates, 'PhabricatorRepositoryPushLog'); foreach ($ref_updates as $key => $ref_update) { $ref_old = $ref_update->getRefOld(); $ref_new = $ref_update->getRefNew(); $ref_type = $ref_update->getRefType(); $ref_flags = 0; $dangerous = null; if (($ref_old === self::EMPTY_HASH) && ($ref_new === self::EMPTY_HASH)) { // This happens if you try to delete a tag or branch which does not // exist by pushing directly to the ref. Git will warn about it but // allow it. Just call it a delete, without flagging it as dangerous. $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DELETE; } else if ($ref_old === self::EMPTY_HASH) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_ADD; } else if ($ref_new === self::EMPTY_HASH) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DELETE; if ($ref_type == PhabricatorRepositoryPushLog::REFTYPE_BRANCH) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DANGEROUS; $dangerous = pht( "The change you're attempting to push deletes the branch '%s'.", $ref_update->getRefName()); } } else { $merge_base = $ref_update->getMergeBase(); if ($merge_base == $ref_old) { // This is a fast-forward update to an existing branch. // These are safe. $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_APPEND; } else { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_REWRITE; // For now, we don't consider deleting or moving tags to be a // "dangerous" update. It's way harder to get wrong and should be easy // to recover from once we have better logging. Only add the dangerous // flag if this ref is a branch. if ($ref_type == PhabricatorRepositoryPushLog::REFTYPE_BRANCH) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DANGEROUS; $dangerous = pht( "The change you're attempting to push updates the branch '%s' ". "from '%s' to '%s', but this is not a fast-forward. Pushes ". "which rewrite published branch history are dangerous.", $ref_update->getRefName(), $ref_update->getRefOldShort(), $ref_update->getRefNewShort()); } } } $ref_update->setChangeFlags($ref_flags); if ($dangerous !== null) { $ref_update->attachDangerousChangeDescription($dangerous); } } return $ref_updates; } private function findGitContentUpdates(array $ref_updates) { $flag_delete = PhabricatorRepositoryPushLog::CHANGEFLAG_DELETE; $futures = array(); foreach ($ref_updates as $key => $ref_update) { if ($ref_update->hasChangeFlags($flag_delete)) { // Deleting a branch or tag can never create any new commits. continue; } // NOTE: This piece of magic finds all new commits, by walking backward // from the new value to the value of *any* existing ref in the // repository. Particularly, this will cover the cases of a new branch, a // completely moved tag, etc. $futures[$key] = $this->getRepository()->getLocalCommandFuture( 'log %s %s --not --all --', '--format=%H', gitsprintf('%s', $ref_update->getRefNew())); } $content_updates = array(); $futures = id(new FutureIterator($futures)) ->limit(8); foreach ($futures as $key => $future) { list($stdout) = $future->resolvex(); if (!strlen(trim($stdout))) { // This change doesn't have any new commits. One common case of this // is creating a new tag which points at an existing commit. continue; } $commits = phutil_split_lines($stdout, $retain_newlines = false); // If we're looking at a branch, mark all of the new commits as on that // branch. It's only possible for these commits to be on updated branches, // since any other branch heads are necessarily behind them. $branch_name = null; $ref_update = $ref_updates[$key]; $type_branch = PhabricatorRepositoryPushLog::REFTYPE_BRANCH; if ($ref_update->getRefType() == $type_branch) { $branch_name = $ref_update->getRefName(); } foreach ($commits as $commit) { if ($branch_name) { $this->gitCommits[$commit][] = $branch_name; } $content_updates[$commit] = $this->newPushLog() ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_COMMIT) ->setRefNew($commit) ->setChangeFlags(PhabricatorRepositoryPushLog::CHANGEFLAG_ADD); } } return $content_updates; } /* -( Custom )------------------------------------------------------------- */ private function applyCustomHooks(array $updates) { $args = $this->getOriginalArgv(); $stdin = $this->getStdin(); $console = PhutilConsole::getConsole(); $env = array( self::ENV_REPOSITORY => $this->getRepository()->getPHID(), self::ENV_USER => $this->getViewer()->getUsername(), self::ENV_REQUEST => $this->getRequestIdentifier(), self::ENV_REMOTE_PROTOCOL => $this->getRemoteProtocol(), self::ENV_REMOTE_ADDRESS => $this->getRemoteAddress(), ); $repository = $this->getRepository(); $env += $repository->getPassthroughEnvironmentalVariables(); $directories = $repository->getHookDirectories(); foreach ($directories as $directory) { $hooks = $this->getExecutablesInDirectory($directory); sort($hooks); foreach ($hooks as $hook) { // NOTE: We're explicitly running the hooks in sequential order to // make this more predictable. $future = id(new ExecFuture('%s %Ls', $hook, $args)) ->setEnv($env, $wipe_process_env = false) ->write($stdin); list($err, $stdout, $stderr) = $future->resolve(); if (!$err) { // This hook ran OK, but echo its output in case there was something // informative. $console->writeOut('%s', $stdout); $console->writeErr('%s', $stderr); continue; } $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_EXTERNAL; $this->rejectDetails = basename($hook); throw new DiffusionCommitHookRejectException( pht( "This push was rejected by custom hook script '%s':\n\n%s%s", basename($hook), $stdout, $stderr)); } } } private function getExecutablesInDirectory($directory) { $executables = array(); if (!Filesystem::pathExists($directory)) { return $executables; } foreach (Filesystem::listDirectory($directory) as $path) { $full_path = $directory.DIRECTORY_SEPARATOR.$path; if (!is_executable($full_path)) { // Don't include non-executable files. continue; } if (basename($full_path) == 'README') { // Don't include README, even if it is marked as executable. It almost // certainly got caught in the crossfire of a sweeping `chmod`, since // users do this with some frequency. continue; } $executables[] = $full_path; } return $executables; } /* -( Mercurial )---------------------------------------------------------- */ private function findMercurialRefUpdates() { $hook = $this->getMercurialHook(); switch ($hook) { case 'pretxnchangegroup': return $this->findMercurialChangegroupRefUpdates(); case 'prepushkey': return $this->findMercurialPushKeyRefUpdates(); default: throw new Exception(pht('Unrecognized hook "%s"!', $hook)); } } private function findMercurialChangegroupRefUpdates() { $hg_node = getenv('HG_NODE'); if (!$hg_node) { throw new Exception( pht( 'Expected %s in environment!', 'HG_NODE')); } // NOTE: We need to make sure this is passed to subprocesses, or they won't // be able to see new commits. Mercurial uses this as a marker to determine // whether the pending changes are visible or not. $_ENV['HG_PENDING'] = getenv('HG_PENDING'); $repository = $this->getRepository(); $futures = array(); foreach (array('old', 'new') as $key) { $futures[$key] = $repository->getLocalCommandFuture( 'heads --template %s', '{node}\1{branch}\2'); } // Wipe HG_PENDING out of the old environment so we see the pre-commit // state of the repository. $futures['old']->updateEnv('HG_PENDING', null); $futures['commits'] = $repository->getLocalCommandFuture( 'log --rev %s --template %s', hgsprintf('%s:%s', $hg_node, 'tip'), '{node}\1{branch}\2'); // Resolve all of the futures now. We don't need the 'commits' future yet, // but it simplifies the logic to just get it out of the way. foreach (new FutureIterator($futures) as $future) { $future->resolve(); } list($commit_raw) = $futures['commits']->resolvex(); $commit_map = $this->parseMercurialCommits($commit_raw); $this->mercurialCommits = $commit_map; // NOTE: `hg heads` exits with an error code and no output if the repository // has no heads. Most commonly this happens on a new repository. We know // we can run `hg` successfully since the `hg log` above didn't error, so // just ignore the error code. list($err, $old_raw) = $futures['old']->resolve(); $old_refs = $this->parseMercurialHeads($old_raw); list($err, $new_raw) = $futures['new']->resolve(); $new_refs = $this->parseMercurialHeads($new_raw); $all_refs = array_keys($old_refs + $new_refs); $ref_updates = array(); foreach ($all_refs as $ref) { $old_heads = idx($old_refs, $ref, array()); $new_heads = idx($new_refs, $ref, array()); sort($old_heads); sort($new_heads); if (!$old_heads && !$new_heads) { // This should never be possible, as it makes no sense. Explode. throw new Exception( pht( 'Mercurial repository has no new or old heads for branch "%s" '. 'after push. This makes no sense; rejecting change.', $ref)); } if ($old_heads === $new_heads) { // No changes to this branch, so skip it. continue; } $stray_heads = array(); $head_map = array(); if ($old_heads && !$new_heads) { // This is a branch deletion with "--close-branch". foreach ($old_heads as $old_head) { $head_map[$old_head] = array(self::EMPTY_HASH); } } else if (count($old_heads) > 1) { // HORRIBLE: In Mercurial, branches can have multiple heads. If the // old branch had multiple heads, we need to figure out which new // heads descend from which old heads, so we can tell whether you're // actively creating new heads (dangerous) or just working in a // repository that's already full of garbage (strongly discouraged but // not as inherently dangerous). These cases should be very uncommon. // NOTE: We're only looking for heads on the same branch. The old // tip of the branch may be the branchpoint for other branches, but that // is OK. $dfutures = array(); foreach ($old_heads as $old_head) { $dfutures[$old_head] = $repository->getLocalCommandFuture( 'log --branch %s --rev %s --template %s', $ref, hgsprintf('(descendants(%s) and head())', $old_head), '{node}\1'); } foreach (new FutureIterator($dfutures) as $future_head => $dfuture) { list($stdout) = $dfuture->resolvex(); $descendant_heads = array_filter(explode("\1", $stdout)); if ($descendant_heads) { // This old head has at least one descendant in the push. $head_map[$future_head] = $descendant_heads; } else { // This old head has no descendants, so it is being deleted. $head_map[$future_head] = array(self::EMPTY_HASH); } } // Now, find all the new stray heads this push creates, if any. These // are new heads which do not descend from the old heads. $seen = array_fuse(array_mergev($head_map)); foreach ($new_heads as $new_head) { if ($new_head === self::EMPTY_HASH) { // If a branch head is being deleted, don't insert it as an add. continue; } if (empty($seen[$new_head])) { $head_map[self::EMPTY_HASH][] = $new_head; } } } else if ($old_heads) { $head_map[head($old_heads)] = $new_heads; } else { $head_map[self::EMPTY_HASH] = $new_heads; } foreach ($head_map as $old_head => $child_heads) { foreach ($child_heads as $new_head) { if ($new_head === $old_head) { continue; } $ref_flags = 0; $dangerous = null; if ($old_head == self::EMPTY_HASH) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_ADD; } else { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_APPEND; } $deletes_existing_head = ($new_head == self::EMPTY_HASH); $splits_existing_head = (count($child_heads) > 1); $creates_duplicate_head = ($old_head == self::EMPTY_HASH) && (count($head_map) > 1); if ($splits_existing_head || $creates_duplicate_head) { $readable_child_heads = array(); foreach ($child_heads as $child_head) { $readable_child_heads[] = substr($child_head, 0, 12); } $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DANGEROUS; if ($splits_existing_head) { // We're splitting an existing head into two or more heads. // This is dangerous, and a super bad idea. Note that we're only // raising this if you're actively splitting a branch head. If a // head split in the past, we don't consider appends to it // to be dangerous. $dangerous = pht( "The change you're attempting to push splits the head of ". "branch '%s' into multiple heads: %s. This is inadvisable ". "and dangerous.", $ref, implode(', ', $readable_child_heads)); } else { // We're adding a second (or more) head to a branch. The new // head is not a descendant of any old head. $dangerous = pht( "The change you're attempting to push creates new, divergent ". "heads for the branch '%s': %s. This is inadvisable and ". "dangerous.", $ref, implode(', ', $readable_child_heads)); } } if ($deletes_existing_head) { // TODO: Somewhere in here we should be setting CHANGEFLAG_REWRITE // if we are also creating at least one other head to replace // this one. // NOTE: In Git, this is a dangerous change, but it is not dangerous // in Mercurial. Mercurial branches are version controlled, and // Mercurial does not prompt you for any special flags when pushing // a `--close-branch` commit by default. $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DELETE; } $ref_update = $this->newPushLog() ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_BRANCH) ->setRefName($ref) ->setRefOld($old_head) ->setRefNew($new_head) ->setChangeFlags($ref_flags); if ($dangerous !== null) { $ref_update->attachDangerousChangeDescription($dangerous); } $ref_updates[] = $ref_update; } } } return $ref_updates; } private function findMercurialPushKeyRefUpdates() { $key_namespace = getenv('HG_NAMESPACE'); if ($key_namespace === 'phases') { // Mercurial changes commit phases as part of normal push operations. We // just ignore these, as they don't seem to represent anything // interesting. return array(); } $key_name = getenv('HG_KEY'); $key_old = getenv('HG_OLD'); if (!strlen($key_old)) { $key_old = null; } $key_new = getenv('HG_NEW'); if (!strlen($key_new)) { $key_new = null; } if ($key_namespace !== 'bookmarks') { throw new Exception( pht( "Unknown Mercurial key namespace '%s', with key '%s' (%s -> %s). ". "Rejecting push.", $key_namespace, $key_name, coalesce($key_old, pht('null')), coalesce($key_new, pht('null')))); } if ($key_old === $key_new) { // We get a callback when the bookmark doesn't change. Just ignore this, // as it's a no-op. return array(); } $ref_flags = 0; $merge_base = null; if ($key_old === null) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_ADD; } else if ($key_new === null) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DELETE; } else { list($merge_base_raw) = $this->getRepository()->execxLocalCommand( 'log --template %s --rev %s', '{node}', hgsprintf('ancestor(%s, %s)', $key_old, $key_new)); if (strlen(trim($merge_base_raw))) { $merge_base = trim($merge_base_raw); } if ($merge_base && ($merge_base === $key_old)) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_APPEND; } else { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_REWRITE; } } $ref_update = $this->newPushLog() ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_BOOKMARK) ->setRefName($key_name) ->setRefOld(coalesce($key_old, self::EMPTY_HASH)) ->setRefNew(coalesce($key_new, self::EMPTY_HASH)) ->setChangeFlags($ref_flags); return array($ref_update); } private function findMercurialContentUpdates(array $ref_updates) { $content_updates = array(); foreach ($this->mercurialCommits as $commit => $branches) { $content_updates[$commit] = $this->newPushLog() ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_COMMIT) ->setRefNew($commit) ->setChangeFlags(PhabricatorRepositoryPushLog::CHANGEFLAG_ADD); } return $content_updates; } private function parseMercurialCommits($raw) { $commits_lines = explode("\2", $raw); $commits_lines = array_filter($commits_lines); $commit_map = array(); foreach ($commits_lines as $commit_line) { list($node, $branch) = explode("\1", $commit_line); $commit_map[$node] = array($branch); } return $commit_map; } private function parseMercurialHeads($raw) { $heads_map = $this->parseMercurialCommits($raw); $heads = array(); foreach ($heads_map as $commit => $branches) { foreach ($branches as $branch) { $heads[$branch][] = $commit; } } return $heads; } /* -( Subversion )--------------------------------------------------------- */ private function findSubversionRefUpdates() { // Subversion doesn't have any kind of mutable ref metadata. return array(); } private function findSubversionContentUpdates(array $ref_updates) { list($youngest) = execx( 'svnlook youngest %s', $this->subversionRepository); $ref_new = (int)$youngest + 1; $ref_flags = 0; $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_ADD; $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_APPEND; $ref_content = $this->newPushLog() ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_COMMIT) ->setRefNew($ref_new) ->setChangeFlags($ref_flags); return array($ref_content); } /* -( Internals )---------------------------------------------------------- */ private function newPushLog() { // NOTE: We generate PHIDs up front so the Herald transcripts can pick them // up. $phid = id(new PhabricatorRepositoryPushLog())->generatePHID(); $device = AlmanacKeys::getLiveDevice(); if ($device) { $device_phid = $device->getPHID(); } else { $device_phid = null; } return PhabricatorRepositoryPushLog::initializeNewLog($this->getViewer()) ->setPHID($phid) ->setDevicePHID($device_phid) ->setRepositoryPHID($this->getRepository()->getPHID()) ->attachRepository($this->getRepository()) ->setEpoch(PhabricatorTime::getNow()); } private function newPushEvent() { $viewer = $this->getViewer(); $hook_start = $this->getStartTime(); $event = PhabricatorRepositoryPushEvent::initializeNewEvent($viewer) ->setRepositoryPHID($this->getRepository()->getPHID()) ->setRemoteAddress($this->getRemoteAddress()) ->setRemoteProtocol($this->getRemoteProtocol()) ->setEpoch(PhabricatorTime::getNow()) ->setHookWait(phutil_microseconds_since($hook_start)); $identifier = $this->getRequestIdentifier(); if (strlen($identifier)) { $event->setRequestIdentifier($identifier); } return $event; } private function rejectEnormousChanges(array $content_updates) { $repository = $this->getRepository(); if ($repository->shouldAllowEnormousChanges()) { return; } // See T13142. Don't cache more than 64MB of changesets. For normal small // pushes, caching everything here can let us hit the cache from Herald if // we need to run content rules, which speeds things up a bit. For large // pushes, we may not be able to hold everything in memory. $cache_limit = 1024 * 1024 * 64; foreach ($content_updates as $update) { $identifier = $update->getRefNew(); try { $info = $this->loadChangesetsForCommit($identifier); list($changesets, $size) = $info; if ($this->changesetsSize + $size <= $cache_limit) { $this->changesets[$identifier] = $changesets; $this->changesetsSize += $size; } } catch (Exception $ex) { $this->changesets[$identifier] = $ex; $message = pht( 'ENORMOUS CHANGE'. "\n". 'Enormous change protection is enabled for this repository, but '. 'you are pushing an enormous change ("%s"). Edit the repository '. 'configuration before making enormous changes.'. "\n\n". "Content Exception: %s", $identifier, $ex->getMessage()); throw new DiffusionCommitHookRejectException($message); } } } private function loadChangesetsForCommit($identifier) { $byte_limit = HeraldCommitAdapter::getEnormousByteLimit(); $time_limit = HeraldCommitAdapter::getEnormousTimeLimit(); $vcs = $this->getRepository()->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: // For git and hg, we can use normal commands. $drequest = DiffusionRequest::newFromDictionary( array( 'repository' => $this->getRepository(), 'user' => $this->getViewer(), 'commit' => $identifier, )); $raw_diff = DiffusionRawDiffQuery::newFromDiffusionRequest($drequest) ->setTimeout($time_limit) ->setByteLimit($byte_limit) ->setLinesOfContext(0) ->executeInline(); break; case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: // TODO: This diff has 3 lines of context, which produces slightly // incorrect "added file content" and "removed file content" results. // This may also choke on binaries, but "svnlook diff" does not support // the "--diff-cmd" flag. // For subversion, we need to use `svnlook`. $future = new ExecFuture( 'svnlook diff -t %s %s', $this->subversionTransaction, $this->subversionRepository); $future->setTimeout($time_limit); $future->setStdoutSizeLimit($byte_limit); $future->setStderrSizeLimit($byte_limit); list($raw_diff) = $future->resolvex(); break; default: throw new Exception(pht("Unknown VCS '%s!'", $vcs)); } if (strlen($raw_diff) >= $byte_limit) { throw new Exception( pht( 'The raw text of this change ("%s") is enormous (larger than %s '. 'bytes).', $identifier, new PhutilNumber($byte_limit))); } if (!strlen($raw_diff)) { // If the commit is actually empty, just return no changesets. return array(array(), 0); } $parser = new ArcanistDiffParser(); $changes = $parser->parseDiff($raw_diff); $diff = DifferentialDiff::newEphemeralFromRawChanges( $changes); $changesets = $diff->getChangesets(); $size = strlen($raw_diff); return array($changesets, $size); } public function getChangesetsForCommit($identifier) { if (isset($this->changesets[$identifier])) { $cached = $this->changesets[$identifier]; if ($cached instanceof Exception) { throw $cached; } return $cached; } $info = $this->loadChangesetsForCommit($identifier); list($changesets, $size) = $info; return $changesets; } private function rejectOversizedFiles(array $content_updates) { $repository = $this->getRepository(); $limit = $repository->getFilesizeLimit(); if (!$limit) { return; } foreach ($content_updates as $update) { $identifier = $update->getRefNew(); $sizes = $this->getFileSizesForCommit($identifier); foreach ($sizes as $path => $size) { if ($size <= $limit) { continue; } $message = pht( 'OVERSIZED FILE'. "\n". 'This repository ("%s") is configured with a maximum individual '. 'file size limit, but you are pushing a change ("%s") which causes '. 'the size of a file ("%s") to exceed the limit. The commit makes '. 'the file %s bytes long, but the limit for this repository is '. '%s bytes.', $repository->getDisplayName(), $identifier, $path, new PhutilNumber($size), new PhutilNumber($limit)); throw new DiffusionCommitHookRejectException($message); } } } private function rejectCommitsAffectingTooManyPaths(array $content_updates) { $repository = $this->getRepository(); $limit = $repository->getTouchLimit(); if (!$limit) { return; } foreach ($content_updates as $update) { $identifier = $update->getRefNew(); $sizes = $this->getFileSizesForCommit($identifier); if (count($sizes) > $limit) { $message = pht( 'COMMIT AFFECTS TOO MANY PATHS'. "\n". 'This repository ("%s") is configured with a touched files limit '. 'that caps the maximum number of paths any single commit may '. 'affect. You are pushing a change ("%s") which exceeds this '. 'limit: it affects %s paths, but the largest number of paths any '. 'commit may affect is %s paths.', $repository->getDisplayName(), $identifier, phutil_count($sizes), new PhutilNumber($limit)); throw new DiffusionCommitHookRejectException($message); } } } public function getFileSizesForCommit($identifier) { if (!isset($this->filesizeCache[$identifier])) { $file_sizes = $this->loadFileSizesForCommit($identifier); $this->filesizeCache[$identifier] = $file_sizes; } return $this->filesizeCache[$identifier]; } private function loadFileSizesForCommit($identifier) { $repository = $this->getRepository(); return id(new DiffusionLowLevelFilesizeQuery()) ->setRepository($repository) ->withIdentifier($identifier) ->execute(); } public function loadCommitRefForCommit($identifier) { $repository = $this->getRepository(); $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: return id(new DiffusionLowLevelCommitQuery()) ->setRepository($repository) ->withIdentifier($identifier) ->execute(); case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: // For subversion, we need to use `svnlook`. list($message) = execx( 'svnlook log -t %s %s', $this->subversionTransaction, $this->subversionRepository); return id(new DiffusionCommitRef()) ->setMessage($message); break; default: throw new Exception(pht("Unknown VCS '%s!'", $vcs)); } } public function loadBranches($identifier) { $repository = $this->getRepository(); $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: return idx($this->gitCommits, $identifier, array()); case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: // NOTE: This will be "the branch the commit was made to", not // "a list of all branch heads which descend from the commit". // This is consistent with Mercurial, but possibly confusing. return idx($this->mercurialCommits, $identifier, array()); case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: // Subversion doesn't have branches. return array(); } } private function loadCommitInfoForWorker(array $all_updates) { $type_commit = PhabricatorRepositoryPushLog::REFTYPE_COMMIT; $map = array(); foreach ($all_updates as $update) { if ($update->getRefType() != $type_commit) { continue; } $map[$update->getRefNew()] = array(); } foreach ($map as $identifier => $info) { $ref = $this->loadCommitRefForCommit($identifier); $map[$identifier] += array( 'summary' => $ref->getSummary(), 'branches' => $this->loadBranches($identifier), ); } return $map; } private function isInitialImport(array $all_updates) { $repository = $this->getRepository(); $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: // There is no meaningful way to import history into Subversion by // pushing. return false; default: break; } // Now, apply a heuristic to guess whether this is a normal commit or // an initial import. We guess something is an initial import if: // // - the repository is currently empty; and // - it pushes more than 7 commits at once. // // The number "7" is chosen arbitrarily as seeming reasonable. We could // also look at author data (do the commits come from multiple different // authors?) and commit date data (is the oldest commit more than 48 hours // old), but we don't have immediate access to those and this simple // heuristic might be good enough. $commit_count = 0; $type_commit = PhabricatorRepositoryPushLog::REFTYPE_COMMIT; foreach ($all_updates as $update) { if ($update->getRefType() != $type_commit) { continue; } $commit_count++; } if ($commit_count <= PhabricatorRepository::IMPORT_THRESHOLD) { // If this pushes a very small number of commits, assume it's an // initial commit or stack of a few initial commits. return false; } $any_commits = id(new DiffusionCommitQuery()) ->setViewer($this->getViewer()) ->withRepository($repository) ->setLimit(1) ->execute(); if ($any_commits) { // If the repository already has commits, this isn't an import. return false; } return true; } } diff --git a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php index a364828397..0372e44f4b 100644 --- a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php +++ b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php @@ -1,904 +1,941 @@ repository = $repository; return $this; } public function getRepository() { return $this->repository; } public function setViewer(PhabricatorUser $viewer) { $this->viewer = $viewer; return $this; } public function getViewer() { return $this->viewer; } public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) { $this->logger = $log; return $this; } public function setActingAsPHID($acting_as_phid) { $this->actingAsPHID = $acting_as_phid; return $this; } public function getActingAsPHID() { return $this->actingAsPHID; } private function getEffectiveActingAsPHID() { if ($this->actingAsPHID) { return $this->actingAsPHID; } return $this->getViewer()->getPHID(); } /* -( Cluster Synchronization )-------------------------------------------- */ /** * Synchronize repository version information after creating a repository. * * This initializes working copy versions for all currently bound devices to * 0, so that we don't get stuck making an ambiguous choice about which * devices are leaders when we later synchronize before a read. * * @task sync */ public function synchronizeWorkingCopyAfterCreation() { if (!$this->shouldEnableSynchronization(false)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $service = $repository->loadAlmanacService(); if (!$service) { throw new Exception(pht('Failed to load repository cluster service.')); } $bindings = $service->getActiveBindings(); foreach ($bindings as $binding) { PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $binding->getDevicePHID(), 0); } return $this; } /** * @task sync */ public function synchronizeWorkingCopyAfterHostingChange() { if (!$this->shouldEnableSynchronization(false)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); $versions = mpull($versions, null, 'getDevicePHID'); // After converting a hosted repository to observed, or vice versa, we // need to reset version numbers because the clocks for observed and hosted // repositories run on different units. // We identify all the cluster leaders and reset their version to 0. // We identify all the cluster followers and demote them. // This allows the cluster to start over again at version 0 but keep the // same leaders. if ($versions) { $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); foreach ($versions as $version) { $device_phid = $version->getDevicePHID(); if ($version->getRepositoryVersion() == $max_version) { PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, 0); } else { PhabricatorRepositoryWorkingCopyVersion::demoteDevice( $repository_phid, $device_phid); } } } return $this; } /** * @task sync */ public function synchronizeWorkingCopyBeforeRead() { if (!$this->shouldEnableSynchronization(true)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock( $repository_phid, $device_phid); $lock_wait = phutil_units('2 minutes in seconds'); $this->logLine( pht( 'Acquiring read lock for repository "%s" on device "%s"...', $repository->getDisplayName(), $device->getName())); try { $start = PhabricatorTime::getNow(); $read_lock->lock($lock_wait); $waited = (PhabricatorTime::getNow() - $start); if ($waited) { $this->logLine( pht( 'Acquired read lock after %s second(s).', new PhutilNumber($waited))); } else { $this->logLine( pht( 'Acquired read lock immediately.')); } } catch (PhutilLockException $ex) { throw new PhutilProxyException( pht( 'Failed to acquire read lock after waiting %s second(s). You '. 'may be able to retry later. (%s)', new PhutilNumber($lock_wait), $ex->getHint()), $ex); } $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); $versions = mpull($versions, null, 'getDevicePHID'); $this_version = idx($versions, $device_phid); if ($this_version) { $this_version = (int)$this_version->getRepositoryVersion(); } else { $this_version = null; } if ($versions) { // This is the normal case, where we have some version information and // can identify which nodes are leaders. If the current node is not a // leader, we want to fetch from a leader and then update our version. $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); if (($this_version === null) || ($max_version > $this_version)) { if ($repository->isHosted()) { $fetchable = array(); foreach ($versions as $version) { if ($version->getRepositoryVersion() == $max_version) { $fetchable[] = $version->getDevicePHID(); } } $this->synchronizeWorkingCopyFromDevices( $fetchable, $this_version, $max_version); } else { $this->synchronizeWorkingCopyFromRemote(); } PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, $max_version); } else { $this->logLine( pht( 'Device "%s" is already a cluster leader and does not need '. 'to be synchronized.', $device->getName())); } $result_version = $max_version; } else { // If no version records exist yet, we need to be careful, because we // can not tell which nodes are leaders. // There might be several nodes with arbitrary existing data, and we have // no way to tell which one has the "right" data. If we pick wrong, we // might erase some or all of the data in the repository. // Since this is dangerous, we refuse to guess unless there is only one // device. If we're the only device in the group, we obviously must be // a leader. $service = $repository->loadAlmanacService(); if (!$service) { throw new Exception(pht('Failed to load repository cluster service.')); } $bindings = $service->getActiveBindings(); $device_map = array(); foreach ($bindings as $binding) { $device_map[$binding->getDevicePHID()] = true; } if (count($device_map) > 1) { throw new Exception( pht( 'Repository "%s" exists on more than one device, but no device '. 'has any repository version information. Phabricator can not '. 'guess which copy of the existing data is authoritative. Promote '. 'a device or see "Ambiguous Leaders" in the documentation.', $repository->getDisplayName())); } if (empty($device_map[$device->getPHID()])) { throw new Exception( pht( 'Repository "%s" is being synchronized on device "%s", but '. 'this device is not bound to the corresponding cluster '. 'service ("%s").', $repository->getDisplayName(), $device->getName(), $service->getName())); } // The current device is the only device in service, so it must be a // leader. We can safely have any future nodes which come online read // from it. PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, 0); $result_version = 0; } $read_lock->unlock(); return $result_version; } /** * @task sync */ public function synchronizeWorkingCopyBeforeWrite() { if (!$this->shouldEnableSynchronization(true)) { return; } $repository = $this->getRepository(); $viewer = $this->getViewer(); $repository_phid = $repository->getPHID(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); $table = new PhabricatorRepositoryWorkingCopyVersion(); $locked_connection = $table->establishConnection('w'); $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( $repository_phid); $write_lock->setExternalConnection($locked_connection); $this->logLine( pht( 'Acquiring write lock for repository "%s"...', $repository->getDisplayName())); // See T13590. On the HTTP pathway, it's possible for us to hit the script // time limit while holding the durable write lock if a user makes a big // push. Remove the time limit before we acquire the durable lock. set_time_limit(0); $lock_wait = phutil_units('2 minutes in seconds'); try { $write_wait_start = microtime(true); $start = PhabricatorTime::getNow(); $step_wait = 1; while (true) { try { $write_lock->lock((int)floor($step_wait)); $write_wait_end = microtime(true); break; } catch (PhutilLockException $ex) { $waited = (PhabricatorTime::getNow() - $start); if ($waited > $lock_wait) { throw $ex; } $this->logActiveWriter($viewer, $repository); } // Wait a little longer before the next message we print. $step_wait = $step_wait + 0.5; $step_wait = min($step_wait, 3); } $waited = (PhabricatorTime::getNow() - $start); if ($waited) { $this->logLine( pht( 'Acquired write lock after %s second(s).', new PhutilNumber($waited))); } else { $this->logLine( pht( 'Acquired write lock immediately.')); } } catch (PhutilLockException $ex) { throw new PhutilProxyException( pht( 'Failed to acquire write lock after waiting %s second(s). You '. 'may be able to retry later. (%s)', new PhutilNumber($lock_wait), $ex->getHint()), $ex); } $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); foreach ($versions as $version) { if (!$version->getIsWriting()) { continue; } throw new Exception( pht( 'An previous write to this repository was interrupted; refusing '. 'new writes. This issue requires operator intervention to resolve, '. 'see "Write Interruptions" in the "Cluster: Repositories" in the '. 'documentation for instructions.')); } $read_wait_start = microtime(true); try { $max_version = $this->synchronizeWorkingCopyBeforeRead(); } catch (Exception $ex) { $write_lock->unlock(); throw $ex; } $read_wait_end = microtime(true); $pid = getmypid(); $hash = Filesystem::readRandomCharacters(12); $this->clusterWriteOwner = "{$pid}.{$hash}"; PhabricatorRepositoryWorkingCopyVersion::willWrite( $locked_connection, $repository_phid, $device_phid, array( 'userPHID' => $this->getEffectiveActingAsPHID(), 'epoch' => PhabricatorTime::getNow(), 'devicePHID' => $device_phid, ), $this->clusterWriteOwner); $this->clusterWriteVersion = $max_version; $this->clusterWriteLock = $write_lock; $write_wait = ($write_wait_end - $write_wait_start); $read_wait = ($read_wait_end - $read_wait_start); $log = $this->logger; if ($log) { $log->writeClusterEngineLogProperty('writeWait', $write_wait); $log->writeClusterEngineLogProperty('readWait', $read_wait); } } public function synchronizeWorkingCopyAfterDiscovery($new_version) { if (!$this->shouldEnableSynchronization(true)) { return; } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); if ($repository->isHosted()) { return; } $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); // NOTE: We are not holding a lock here because this method is only called // from PhabricatorRepositoryDiscoveryEngine, which already holds a device // lock. Even if we do race here and record an older version, the // consequences are mild: we only do extra work to correct it later. $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( $repository_phid); $versions = mpull($versions, null, 'getDevicePHID'); $this_version = idx($versions, $device_phid); if ($this_version) { $this_version = (int)$this_version->getRepositoryVersion(); } else { $this_version = null; } if (($this_version === null) || ($new_version > $this_version)) { PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, $device_phid, $new_version); } } /** * @task sync */ public function synchronizeWorkingCopyAfterWrite() { if (!$this->shouldEnableSynchronization(true)) { return; } if (!$this->clusterWriteLock) { throw new Exception( pht( 'Trying to synchronize after write, but not holding a write '. 'lock!')); } $repository = $this->getRepository(); $repository_phid = $repository->getPHID(); $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); // It is possible that we've lost the global lock while receiving the push. // For example, the master database may have been restarted between the // time we acquired the global lock and now, when the push has finished. // We wrote a durable lock while we were holding the the global lock, // essentially upgrading our lock. We can still safely release this upgraded // lock even if we're no longer holding the global lock. // If we fail to release the lock, the repository will be frozen until // an operator can figure out what happened, so we try pretty hard to // reconnect to the database and release the lock. $now = PhabricatorTime::getNow(); $duration = phutil_units('5 minutes in seconds'); $try_until = $now + $duration; $did_release = false; $already_failed = false; while (PhabricatorTime::getNow() <= $try_until) { try { // NOTE: This means we're still bumping the version when pushes fail. We // could select only un-rejected events instead to bump a little less // often. $new_log = id(new PhabricatorRepositoryPushEventQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withRepositoryPHIDs(array($repository_phid)) ->setLimit(1) ->executeOne(); $old_version = $this->clusterWriteVersion; if ($new_log) { $new_version = $new_log->getID(); } else { $new_version = $old_version; } PhabricatorRepositoryWorkingCopyVersion::didWrite( $repository_phid, $device_phid, $this->clusterWriteVersion, $new_version, $this->clusterWriteOwner); $did_release = true; break; } catch (AphrontConnectionQueryException $ex) { $connection_exception = $ex; } catch (AphrontConnectionLostQueryException $ex) { $connection_exception = $ex; } if (!$already_failed) { $already_failed = true; $this->logLine( pht('CRITICAL. Failed to release cluster write lock!')); $this->logLine( pht( 'The connection to the master database was lost while receiving '. 'the write.')); $this->logLine( pht( 'This process will spend %s more second(s) attempting to '. 'recover, then give up.', new PhutilNumber($duration))); } sleep(1); } if ($did_release) { if ($already_failed) { $this->logLine( pht('RECOVERED. Link to master database was restored.')); } $this->logLine(pht('Released cluster write lock.')); } else { throw new Exception( pht( 'Failed to reconnect to master database and release held write '. 'lock ("%s") on device "%s" for repository "%s" after trying '. 'for %s seconds(s). This repository will be frozen.', $this->clusterWriteOwner, $device->getName(), $this->getDisplayName(), new PhutilNumber($duration))); } // We can continue even if we've lost this lock, everything is still // consistent. try { $this->clusterWriteLock->unlock(); } catch (Exception $ex) { // Ignore. } $this->clusterWriteLock = null; $this->clusterWriteOwner = null; } /* -( Internals )---------------------------------------------------------- */ /** * @task internal */ private function shouldEnableSynchronization($require_device) { $repository = $this->getRepository(); $service_phid = $repository->getAlmanacServicePHID(); if (!$service_phid) { return false; } if (!$repository->supportsSynchronization()) { return false; } if ($require_device) { $device = AlmanacKeys::getLiveDevice(); if (!$device) { return false; } } return true; } /** * @task internal */ private function synchronizeWorkingCopyFromRemote() { $repository = $this->getRepository(); $device = AlmanacKeys::getLiveDevice(); $local_path = $repository->getLocalPath(); $fetch_uri = $repository->getRemoteURIEnvelope(); if ($repository->isGit()) { $this->requireWorkingCopy(); $argv = array( 'fetch --prune -- %P %s', $fetch_uri, '+refs/*:refs/*', ); } else { throw new Exception(pht('Remote sync only supported for git!')); } $future = DiffusionCommandEngine::newCommandEngine($repository) ->setArgv($argv) ->setSudoAsDaemon(true) ->setCredentialPHID($repository->getCredentialPHID()) ->setURI($repository->getRemoteURIObject()) ->newFuture(); $future->setCWD($local_path); try { $future->resolvex(); } catch (Exception $ex) { $this->logLine( pht( 'Synchronization of "%s" from remote failed: %s', $device->getName(), $ex->getMessage())); throw $ex; } } /** * @task internal */ private function synchronizeWorkingCopyFromDevices( array $device_phids, $local_version, $remote_version) { $repository = $this->getRepository(); $service = $repository->loadAlmanacService(); if (!$service) { throw new Exception(pht('Failed to load repository cluster service.')); } $device_map = array_fuse($device_phids); $bindings = $service->getActiveBindings(); $fetchable = array(); foreach ($bindings as $binding) { // We can't fetch from nodes which don't have the newest version. $device_phid = $binding->getDevicePHID(); if (empty($device_map[$device_phid])) { continue; } // TODO: For now, only fetch over SSH. We could support fetching over // HTTP eventually. if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') { continue; } $fetchable[] = $binding; } if (!$fetchable) { throw new Exception( pht( 'Leader lost: no up-to-date nodes in repository cluster are '. 'fetchable.')); } // If we can synchronize from multiple sources, choose one at random. shuffle($fetchable); $caught = null; foreach ($fetchable as $binding) { try { $this->synchronizeWorkingCopyFromBinding( $binding, $local_version, $remote_version); $caught = null; break; } catch (Exception $ex) { $caught = $ex; } } if ($caught) { throw $caught; } } /** * @task internal */ private function synchronizeWorkingCopyFromBinding( AlmanacBinding $binding, $local_version, $remote_version) { $repository = $this->getRepository(); $device = AlmanacKeys::getLiveDevice(); $this->logLine( pht( 'Synchronizing this device ("%s") from cluster leader ("%s").', $device->getName(), $binding->getDevice()->getName())); $fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding); $local_path = $repository->getLocalPath(); if ($repository->isGit()) { $this->requireWorkingCopy(); $argv = array( 'fetch --prune -- %s %s', $fetch_uri, '+refs/*:refs/*', ); } else { throw new Exception(pht('Binding sync only supported for git!')); } $future = DiffusionCommandEngine::newCommandEngine($repository) ->setArgv($argv) ->setConnectAsDevice(true) ->setSudoAsDaemon(true) ->setURI($fetch_uri) ->newFuture(); $future->setCWD($local_path); $log = PhabricatorRepositorySyncEvent::initializeNewEvent() ->setRepositoryPHID($repository->getPHID()) ->setEpoch(PhabricatorTime::getNow()) ->setDevicePHID($device->getPHID()) ->setFromDevicePHID($binding->getDevice()->getPHID()) ->setDeviceVersion($local_version) ->setFromDeviceVersion($remote_version); $sync_start = microtime(true); try { $future->resolvex(); } catch (Exception $ex) { $log->setSyncWait(phutil_microseconds_since($sync_start)); if ($ex instanceof CommandException) { if ($future->getWasKilledByTimeout()) { $result_type = PhabricatorRepositorySyncEvent::RESULT_TIMEOUT; } else { $result_type = PhabricatorRepositorySyncEvent::RESULT_ERROR; } $log ->setResultCode($ex->getError()) ->setResultType($result_type) ->setProperty('stdout', $ex->getStdout()) ->setProperty('stderr', $ex->getStderr()); } else { $log ->setResultCode(1) ->setResultType(PhabricatorRepositorySyncEvent::RESULT_EXCEPTION) ->setProperty('message', $ex->getMessage()); } $log->save(); $this->logLine( pht( 'Synchronization of "%s" from leader "%s" failed: %s', $device->getName(), $binding->getDevice()->getName(), $ex->getMessage())); throw $ex; } $log ->setSyncWait(phutil_microseconds_since($sync_start)) ->setResultCode(0) ->setResultType(PhabricatorRepositorySyncEvent::RESULT_SYNC) ->save(); } /** * @task internal */ private function logLine($message) { return $this->logText("# {$message}\n"); } /** * @task internal */ private function logText($message) { $log = $this->logger; if ($log) { $log->writeClusterEngineLogMessage($message); } return $this; } private function requireWorkingCopy() { $repository = $this->getRepository(); $local_path = $repository->getLocalPath(); if (!Filesystem::pathExists($local_path)) { $device = AlmanacKeys::getLiveDevice(); throw new Exception( pht( 'Repository "%s" does not have a working copy on this device '. 'yet, so it can not be synchronized. Wait for the daemons to '. 'construct one or run `bin/repository update %s` on this host '. '("%s") to build it explicitly.', $repository->getDisplayName(), $repository->getMonogram(), $device->getName())); } } private function logActiveWriter( PhabricatorUser $viewer, PhabricatorRepository $repository) { $writer = PhabricatorRepositoryWorkingCopyVersion::loadWriter( $repository->getPHID()); if (!$writer) { $this->logLine(pht('Waiting on another user to finish writing...')); return; } $user_phid = $writer->getWriteProperty('userPHID'); $device_phid = $writer->getWriteProperty('devicePHID'); $epoch = $writer->getWriteProperty('epoch'); $phids = array($user_phid, $device_phid); $handles = $viewer->loadHandles($phids); $duration = (PhabricatorTime::getNow() - $epoch) + 1; $this->logLine( pht( 'Waiting for %s to finish writing (on device "%s" for %ss)...', $handles[$user_phid]->getName(), $handles[$device_phid]->getName(), new PhutilNumber($duration))); } + public function newMaintenanceEvent() { + $viewer = $this->getViewer(); + $repository = $this->getRepository(); + $now = PhabricatorTime::getNow(); + + $event = PhabricatorRepositoryPushEvent::initializeNewEvent($viewer) + ->setRepositoryPHID($repository->getPHID()) + ->setEpoch($now) + ->setPusherPHID($this->getEffectiveActingAsPHID()) + ->setRejectCode(PhabricatorRepositoryPushLog::REJECT_ACCEPT); + + return $event; + } + + public function newMaintenanceLog() { + $viewer = $this->getViewer(); + $repository = $this->getRepository(); + $now = PhabricatorTime::getNow(); + + $device = AlmanacKeys::getLiveDevice(); + if ($device) { + $device_phid = $device->getPHID(); + } else { + $device_phid = null; + } + + return PhabricatorRepositoryPushLog::initializeNewLog($viewer) + ->setDevicePHID($device_phid) + ->setRepositoryPHID($repository->getPHID()) + ->attachRepository($repository) + ->setEpoch($now) + ->setPusherPHID($this->getEffectiveActingAsPHID()) + ->setChangeFlags(PhabricatorRepositoryPushLog::CHANGEFLAG_MAINTENANCE) + ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_MAINTENANCE) + ->setRefNew(''); + } + } diff --git a/src/applications/repository/storage/PhabricatorRepositoryPushEvent.php b/src/applications/repository/storage/PhabricatorRepositoryPushEvent.php index ac97aa2bcf..c0936e9c6e 100644 --- a/src/applications/repository/storage/PhabricatorRepositoryPushEvent.php +++ b/src/applications/repository/storage/PhabricatorRepositoryPushEvent.php @@ -1,108 +1,123 @@ setPusherPHID($viewer->getPHID()); } protected function getConfiguration() { return array( self::CONFIG_AUX_PHID => true, self::CONFIG_TIMESTAMPS => false, self::CONFIG_COLUMN_SCHEMA => array( 'requestIdentifier' => 'bytes12?', 'remoteAddress' => 'ipaddress?', 'remoteProtocol' => 'text32?', 'rejectCode' => 'uint32', 'rejectDetails' => 'text64?', 'writeWait' => 'uint64?', 'readWait' => 'uint64?', 'hostWait' => 'uint64?', 'hookWait' => 'uint64?', ), self::CONFIG_KEY_SCHEMA => array( 'key_repository' => array( 'columns' => array('repositoryPHID'), ), 'key_identifier' => array( 'columns' => array('requestIdentifier'), ), 'key_reject' => array( 'columns' => array('rejectCode', 'rejectDetails'), ), ), ) + parent::getConfiguration(); } public function generatePHID() { return PhabricatorPHID::generateNewPHID( PhabricatorRepositoryPushEventPHIDType::TYPECONST); } public function attachRepository(PhabricatorRepository $repository) { $this->repository = $repository; return $this; } public function getRepository() { return $this->assertAttached($this->repository); } public function attachLogs(array $logs) { $this->logs = $logs; return $this; } public function getLogs() { return $this->assertAttached($this->logs); } + public function saveWithLogs(array $logs) { + assert_instances_of($logs, 'PhabricatorRepositoryPushLog'); + + $this->openTransaction(); + $this->save(); + foreach ($logs as $log) { + $log->setPushEventPHID($this->getPHID()); + $log->save(); + } + $this->saveTransaction(); + + $this->attachLogs($logs); + + return $this; + } /* -( PhabricatorPolicyInterface )----------------------------------------- */ public function getCapabilities() { return array( PhabricatorPolicyCapability::CAN_VIEW, ); } public function getPolicy($capability) { return $this->getRepository()->getPolicy($capability); } public function hasAutomaticCapability($capability, PhabricatorUser $viewer) { return $this->getRepository()->hasAutomaticCapability($capability, $viewer); } public function describeAutomaticCapability($capability) { return pht( "A repository's push events are visible to users who can see the ". "repository."); } } diff --git a/src/applications/repository/storage/PhabricatorRepositoryPushLog.php b/src/applications/repository/storage/PhabricatorRepositoryPushLog.php index 095c227994..d9892b2dff 100644 --- a/src/applications/repository/storage/PhabricatorRepositoryPushLog.php +++ b/src/applications/repository/storage/PhabricatorRepositoryPushLog.php @@ -1,240 +1,243 @@ setPusherPHID($viewer->getPHID()); } public static function getFlagDisplayNames() { return array( self::CHANGEFLAG_ADD => pht('Create'), self::CHANGEFLAG_DELETE => pht('Delete'), self::CHANGEFLAG_APPEND => pht('Append'), self::CHANGEFLAG_REWRITE => pht('Rewrite'), self::CHANGEFLAG_DANGEROUS => pht('Dangerous'), self::CHANGEFLAG_ENORMOUS => pht('Enormous'), self::CHANGEFLAG_OVERSIZED => pht('Oversized'), self::CHANGEFLAG_TOUCHES => pht('Touches Too Many Paths'), + self::CHANGEFLAG_MAINTENANCE => pht('Maintenance'), ); } public static function getRejectCodeDisplayNames() { return array( self::REJECT_ACCEPT => pht('Accepted'), self::REJECT_DANGEROUS => pht('Rejected: Dangerous'), self::REJECT_HERALD => pht('Rejected: Herald'), self::REJECT_EXTERNAL => pht('Rejected: External Hook'), self::REJECT_BROKEN => pht('Rejected: Broken'), self::REJECT_ENORMOUS => pht('Rejected: Enormous'), self::REJECT_OVERSIZED => pht('Rejected: Oversized File'), self::REJECT_TOUCHES => pht('Rejected: Touches Too Many Paths'), ); } public static function getHeraldChangeFlagConditionOptions() { return array( self::CHANGEFLAG_ADD => pht('change creates ref'), self::CHANGEFLAG_DELETE => pht('change deletes ref'), self::CHANGEFLAG_REWRITE => pht('change rewrites ref'), self::CHANGEFLAG_DANGEROUS => pht('dangerous change'), ); } protected function getConfiguration() { return array( self::CONFIG_AUX_PHID => true, self::CONFIG_TIMESTAMPS => false, self::CONFIG_BINARY => array( 'refNameRaw' => true, ), self::CONFIG_COLUMN_SCHEMA => array( 'refType' => 'text12', 'refNameHash' => 'bytes12?', 'refNameRaw' => 'bytes?', 'refNameEncoding' => 'text16?', 'refOld' => 'text40?', 'refNew' => 'text40', 'mergeBase' => 'text40?', 'changeFlags' => 'uint32', 'devicePHID' => 'phid?', ), self::CONFIG_KEY_SCHEMA => array( 'key_repository' => array( 'columns' => array('repositoryPHID'), ), 'key_ref' => array( 'columns' => array('repositoryPHID', 'refNew'), ), 'key_name' => array( 'columns' => array('repositoryPHID', 'refNameHash'), ), 'key_event' => array( 'columns' => array('pushEventPHID'), ), 'key_pusher' => array( 'columns' => array('pusherPHID'), ), 'key_epoch' => array( 'columns' => array('epoch'), ), ), ) + parent::getConfiguration(); } public function generatePHID() { return PhabricatorPHID::generateNewPHID( PhabricatorRepositoryPushLogPHIDType::TYPECONST); } public function attachPushEvent(PhabricatorRepositoryPushEvent $push_event) { $this->pushEvent = $push_event; return $this; } public function getPushEvent() { return $this->assertAttached($this->pushEvent); } public function getRefName() { return $this->getUTF8StringFromStorage( $this->getRefNameRaw(), $this->getRefNameEncoding()); } public function setRefName($ref_raw) { $this->setRefNameRaw($ref_raw); $this->setRefNameHash(PhabricatorHash::digestForIndex($ref_raw)); $this->setRefNameEncoding($this->detectEncodingForStorage($ref_raw)); return $this; } public function getRefOldShort() { if ($this->getRepository()->isSVN()) { return $this->getRefOld(); } return substr($this->getRefOld(), 0, 12); } public function getRefNewShort() { if ($this->getRepository()->isSVN()) { return $this->getRefNew(); } return substr($this->getRefNew(), 0, 12); } public function hasChangeFlags($mask) { return ($this->changeFlags & $mask); } public function attachDangerousChangeDescription($description) { $this->dangerousChangeDescription = $description; return $this; } public function getDangerousChangeDescription() { return $this->assertAttached($this->dangerousChangeDescription); } public function attachRepository(PhabricatorRepository $repository) { // NOTE: Some gymnastics around this because of object construction order // in the hook engine. Particularly, web build the logs before we build // their push event. $this->repository = $repository; return $this; } public function getRepository() { if ($this->repository == self::ATTACHABLE) { return $this->getPushEvent()->getRepository(); } return $this->assertAttached($this->repository); } /* -( PhabricatorPolicyInterface )----------------------------------------- */ public function getCapabilities() { return array( PhabricatorPolicyCapability::CAN_VIEW, ); } public function getPolicy($capability) { // NOTE: We're passing through the repository rather than the push event // mostly because we need to do policy checks in Herald before we create // the event. The two approaches are equivalent in practice. return $this->getRepository()->getPolicy($capability); } public function hasAutomaticCapability($capability, PhabricatorUser $viewer) { return $this->getRepository()->hasAutomaticCapability($capability, $viewer); } public function describeAutomaticCapability($capability) { return pht( "A repository's push logs are visible to users who can see the ". "repository."); } }