diff --git a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php index d9cc8063d5..d8d0116017 100644 --- a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php @@ -1,131 +1,149 @@ getErrorChannel()->update(); } public function writeClusterEngineLogProperty($key, $value) { $this->engineLogProperties[$key] = $value; } protected function getClusterEngineLogProperty($key, $default = null) { return idx($this->engineLogProperties, $key, $default); } protected function identifyRepository() { $args = $this->getArgs(); $path = head($args->getArg('dir')); return $this->loadRepositoryWithPath( $path, PhabricatorRepositoryType::REPOSITORY_TYPE_GIT); } protected function waitForGitClient() { $io_channel = $this->getIOChannel(); // If we don't wait for the client to close the connection, `git` will // consider it an early abort and fail. Sit around until Git is comfortable // that it really received all the data. while ($io_channel->isOpenForReading()) { $io_channel->update(); $this->getErrorChannel()->flush(); PhutilChannel::waitForAny(array($io_channel)); } } protected function raiseWrongVCSException( PhabricatorRepository $repository) { throw new Exception( pht( 'This repository ("%s") is not a Git repository. Use "%s" to '. 
'interact with this repository.', $repository->getDisplayName(), $repository->getVersionControlSystem())); } protected function newPassthruCommand() { return parent::newPassthruCommand() ->setWillWriteCallback(array($this, 'willWriteMessageCallback')) ->setWillReadCallback(array($this, 'willReadMessageCallback')); } protected function newProtocolLog($is_proxy) { if ($is_proxy) { return null; } // While developing, do this to write a full protocol log to disk: // // return new PhabricatorProtocolLog('/tmp/git-protocol.log'); return null; } final protected function getProtocolLog() { return $this->protocolLog; } final protected function setProtocolLog(PhabricatorProtocolLog $log) { $this->protocolLog = $log; } final protected function getWireProtocol() { return $this->wireProtocol; } final protected function setWireProtocol( DiffusionGitWireProtocol $protocol) { $this->wireProtocol = $protocol; return $this; } public function willWriteMessageCallback( PhabricatorSSHPassthruCommand $command, $message) { + $this->ioBytesWritten += strlen($message); + $log = $this->getProtocolLog(); if ($log) { $log->didWriteBytes($message); } $protocol = $this->getWireProtocol(); if ($protocol) { $message = $protocol->willWriteBytes($message); } return $message; } public function willReadMessageCallback( PhabricatorSSHPassthruCommand $command, $message) { $log = $this->getProtocolLog(); if ($log) { $log->didReadBytes($message); } $protocol = $this->getWireProtocol(); if ($protocol) { $message = $protocol->willReadBytes($message); } + // Note that bytes aren't counted until they're emitted by the protocol + // layer. This means the underlying command might emit bytes, but if they + // are buffered by the protocol layer they won't count as read bytes yet. 
+ + $this->ioBytesRead += strlen($message); + return $message; } + final protected function getIOBytesRead() { + return $this->ioBytesRead; + } + + final protected function getIOBytesWritten() { + return $this->ioBytesWritten; + } + } diff --git a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php index 7e1f4a4f33..3e8186190a 100644 --- a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php @@ -1,109 +1,216 @@ setName('git-upload-pack'); $this->setArguments( array( array( 'name' => 'dir', 'wildcard' => true, ), )); } protected function executeRepositoryOperations() { - $repository = $this->getRepository(); + $is_proxy = $this->shouldProxy(); + if ($is_proxy) { + return $this->executeRepositoryProxyOperations(); + } + $viewer = $this->getSSHUser(); + $repository = $this->getRepository(); $device = AlmanacKeys::getLiveDevice(); $skip_sync = $this->shouldSkipReadSynchronization(); - $is_proxy = $this->shouldProxy(); - if ($is_proxy) { - $command = $this->getProxyCommand(false); + $command = csprintf('git-upload-pack -- %s', $repository->getLocalPath()); + if (!$skip_sync) { + $cluster_engine = id(new DiffusionRepositoryClusterEngine()) + ->setViewer($viewer) + ->setRepository($repository) + ->setLog($this) + ->synchronizeWorkingCopyBeforeRead(); if ($device) { $this->writeClusterEngineLogMessage( pht( - "# Fetch received by \"%s\", forwarding to cluster host.\n", + "# Cleared to fetch on cluster host \"%s\".\n", $device->getName())); } - } else { - $command = csprintf('git-upload-pack -- %s', $repository->getLocalPath()); - if (!$skip_sync) { - $cluster_engine = id(new DiffusionRepositoryClusterEngine()) - ->setViewer($viewer) - ->setRepository($repository) - ->setLog($this) - ->synchronizeWorkingCopyBeforeRead(); - - if ($device) { - $this->writeClusterEngineLogMessage( - pht( - "# Cleared 
to fetch on cluster host \"%s\".\n", - $device->getName())); - } - } } + $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); $pull_event = $this->newPullEvent(); $future = id(new ExecFuture('%C', $command)) ->setEnv($this->getEnvironment()); $log = $this->newProtocolLog($is_proxy); if ($log) { $this->setProtocolLog($log); $log->didStartSession($command); } - if (!$is_proxy) { - if (PhabricatorEnv::getEnvConfig('phabricator.show-prototypes')) { - $protocol = new DiffusionGitUploadPackWireProtocol(); - if ($log) { - $protocol->setProtocolLog($log); - } - $this->setWireProtocol($protocol); + if (PhabricatorEnv::getEnvConfig('phabricator.show-prototypes')) { + $protocol = new DiffusionGitUploadPackWireProtocol(); + if ($log) { + $protocol->setProtocolLog($log); } + $this->setWireProtocol($protocol); } $err = $this->newPassthruCommand() ->setIOChannel($this->getIOChannel()) ->setCommandChannelFromExecFuture($future) ->execute(); if ($log) { $log->didEndSession(); } if ($err) { $pull_event ->setResultType(PhabricatorRepositoryPullEvent::RESULT_ERROR) ->setResultCode($err); } else { $pull_event ->setResultType(PhabricatorRepositoryPullEvent::RESULT_PULL) ->setResultCode(0); } - // TODO: Currently, when proxying, we do not write a log on the proxy. - // Perhaps we should write a "proxy log". This is not very useful for - // statistics or auditing, but could be useful for diagnostics. Marking - // the proxy logs as proxied (and recording devicePHID on all logs) would - // make differentiating between these use cases easier. 
- - if (!$is_proxy) { - $pull_event->save(); - } + $pull_event->save(); if (!$err) { $this->waitForGitClient(); } return $err; } + /** + * Forward this fetch to the cluster, trying each service ref returned by + * getAlmanacServiceRefs() in turn until one succeeds, no refs remain, or + * shouldRetryRequest() refuses another attempt. + * + * @return int Error code from the last passthru command; a falsy code is + * treated as success. + */ + private function executeRepositoryProxyOperations() { + $device = AlmanacKeys::getLiveDevice(); + $for_write = false; + + $refs = $this->getAlmanacServiceRefs($for_write); + $err = 1; + + while (true) { + $ref = head($refs); + + $command = $this->getProxyCommandForServiceRef($ref); + + if ($device) { + $this->writeClusterEngineLogMessage( + pht( + "# Fetch received by \"%s\", forwarding to cluster host \"%s\".\n", + $device->getName(), + $ref->getDeviceName())); + } + + $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); + + $future = id(new ExecFuture('%C', $command)) + ->setEnv($this->getEnvironment()); + + $this->didBeginRequest(); + + $err = $this->newPassthruCommand() + ->setIOChannel($this->getIOChannel()) + ->setCommandChannelFromExecFuture($future) + ->execute(); + + // TODO: Currently, when proxying, we do not write an event log on the + // proxy. Perhaps we should write a "proxy log". This is not very useful + // for statistics or auditing, but could be useful for diagnostics. + // Marking the proxy logs as proxied (and recording devicePHID on all + // logs) would make differentiating between these use cases easier. + + if (!$err) { + $this->waitForGitClient(); + return $err; + } + + // Throw away this service: the request failed and we're treating the + // failure as persistent, so we don't want to retry another request to + // the same host. + array_shift($refs); + + // Check if we have more services we can try. If we do, we'll make an + // effort to fall back to them below. If not, we can't do anything to + // recover so just bail out. + if (!$refs) { + return $err; + } + + $should_retry = $this->shouldRetryRequest(); + if (!$should_retry) { + return $err; + } + + // If we haven't bailed out yet, we'll retry the request with the next + // service. 
+ } + + throw new Exception(pht('Reached an unreachable place.')); + } + + /** + * Record that a proxied request attempt is starting. + * + * @return $this + */ + private function didBeginRequest() { + $this->requestAttempts++; + return $this; + } + + /** + * Record a failed attempt and decide whether another service may be tried. + * + * Retrying is refused once three failures have been recorded, or once + * getIOBytesRead() or getIOBytesWritten() reports a nonzero byte count. + * + * @return bool True if the request should be retried. + * @throws Exception If more failures than attempts have been recorded. + */ + private function shouldRetryRequest() { + $this->requestFailures++; + + if ($this->requestFailures > $this->requestAttempts) { + throw new Exception( + pht( + "Workflow has recorded more failures than attempts; there is a ". + "missing call to \"didBeginRequest()\".\n")); + } + + $max_failures = 3; + if ($this->requestFailures >= $max_failures) { + $this->writeClusterEngineLogMessage( + pht( + "# Reached maximum number of retry attempts, giving up.\n")); + return false; + } + + $read_len = $this->getIOBytesRead(); + if ($read_len) { + $this->writeClusterEngineLogMessage( + pht( + "# Client already read from service (%s bytes), unable to retry.\n", + new PhutilNumber($read_len))); + return false; + } + + $write_len = $this->getIOBytesWritten(); + if ($write_len) { + $this->writeClusterEngineLogMessage( + pht( + "# Client already wrote to service (%s bytes), unable to retry.\n", + new PhutilNumber($write_len))); + return false; + } + + $this->writeClusterEngineLogMessage( + pht( + "# Service request failed, retrying (making attempt %s of %s).\n", + new PhutilNumber($this->requestAttempts + 1), + new PhutilNumber($max_failures))); + + return true; + } + }