diff --git a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php --- a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php @@ -14,42 +14,33 @@ } protected function executeRepositoryOperations() { + // This is a write, and must have write access. + $this->requireWriteAccess(); + + $is_proxy = $this->shouldProxy(); + if ($is_proxy) { + return $this->executeRepositoryProxyOperations($for_write = true); + } + $host_wait_start = microtime(true); $repository = $this->getRepository(); $viewer = $this->getSSHUser(); $device = AlmanacKeys::getLiveDevice(); - // This is a write, and must have write access. - $this->requireWriteAccess(); - $cluster_engine = id(new DiffusionRepositoryClusterEngine()) ->setViewer($viewer) ->setRepository($repository) ->setLog($this); - $is_proxy = $this->shouldProxy(); - if ($is_proxy) { - $command = $this->getProxyCommand(true); - $did_write = false; - - if ($device) { - $this->writeClusterEngineLogMessage( - pht( - "# Push received by \"%s\", forwarding to cluster host.\n", - $device->getName())); - } - } else { - $command = csprintf('git-receive-pack %s', $repository->getLocalPath()); - $did_write = true; - $cluster_engine->synchronizeWorkingCopyBeforeWrite(); - - if ($device) { - $this->writeClusterEngineLogMessage( - pht( - "# Ready to receive on cluster host \"%s\".\n", - $device->getName())); - } + $command = csprintf('git-receive-pack %s', $repository->getLocalPath()); + $cluster_engine->synchronizeWorkingCopyBeforeWrite(); + + if ($device) { + $this->writeClusterEngineLogMessage( + pht( + "# Ready to receive on cluster host \"%s\".\n", + $device->getName())); } $log = $this->newProtocolLog($is_proxy); @@ -71,9 +62,7 @@ // We've committed the write (or rejected it), so we can release the lock // without waiting for the client to receive the acknowledgement. - if ($did_write) { - $cluster_engine->synchronizeWorkingCopyAfterWrite(); - } + $cluster_engine->synchronizeWorkingCopyAfterWrite(); if ($caught) { throw $caught; @@ -85,18 +74,16 @@ // When a repository is clustered, we reach this cleanup code on both // the proxy and the actual final endpoint node. Don't do more cleanup // or logging than we need to. - if ($did_write) { - $repository->writeStatusMessage( - PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE, - PhabricatorRepositoryStatusMessage::CODE_OKAY); - - $host_wait_end = microtime(true); - - $this->updatePushLogWithTimingInformation( - $this->getClusterEngineLogProperty('writeWait'), - $this->getClusterEngineLogProperty('readWait'), - ($host_wait_end - $host_wait_start)); - } + $repository->writeStatusMessage( + PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE, + PhabricatorRepositoryStatusMessage::CODE_OKAY); + + $host_wait_end = microtime(true); + + $this->updatePushLogWithTimingInformation( + $this->getClusterEngineLogProperty('writeWait'), + $this->getClusterEngineLogProperty('readWait'), + ($host_wait_end - $host_wait_start)); } return $err; diff --git a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php --- a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php @@ -10,6 +10,8 @@ private $wireProtocol; private $ioBytesRead = 0; private $ioBytesWritten = 0; + private $requestAttempts = 0; + private $requestFailures = 0; protected function writeError($message) { // Git assumes we'll add our own newlines. @@ -146,4 +148,114 @@ return $this->ioBytesWritten; } + final protected function executeRepositoryProxyOperations($for_write) { + $device = AlmanacKeys::getLiveDevice(); + + $refs = $this->getAlmanacServiceRefs($for_write); + $err = 1; + + while (true) { + $ref = head($refs); + + $command = $this->getProxyCommandForServiceRef($ref); + + if ($device) { + $this->writeClusterEngineLogMessage( + pht( + "# Request received by \"%s\", forwarding to cluster ". + "host \"%s\".\n", + $device->getName(), + $ref->getDeviceName())); + } + + $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); + + $future = id(new ExecFuture('%C', $command)) + ->setEnv($this->getEnvironment()); + + $this->didBeginRequest(); + + $err = $this->newPassthruCommand() + ->setIOChannel($this->getIOChannel()) + ->setCommandChannelFromExecFuture($future) + ->execute(); + + // TODO: Currently, when proxying, we do not write an event log on the + // proxy. Perhaps we should write a "proxy log". This is not very useful + // for statistics or auditing, but could be useful for diagnostics. + // Marking the proxy logs as proxied (and recording devicePHID on all + // logs) would make differentiating between these use cases easier. + + if (!$err) { + $this->waitForGitClient(); + return $err; + } + + // Throw away this service: the request failed and we're treating the + // failure as persistent, so we don't want to retry another request to + // the same host. + array_shift($refs); + + $should_retry = $this->shouldRetryRequest($refs); + if (!$should_retry) { + return $err; + } + + // If we haven't bailed out yet, we'll retry the request with the next + // service. + } + + throw new Exception(pht('Reached an unreachable place.')); + } + + private function didBeginRequest() { + $this->requestAttempts++; + return $this; + } + + private function shouldRetryRequest(array $remaining_refs) { + $this->requestFailures++; + + if ($this->requestFailures > $this->requestAttempts) { + throw new Exception( + pht( + "Workflow has recorded more failures than attempts; there is a ". + "missing call to \"didBeginRequest()\".\n")); + } + + if (!$remaining_refs) { + $this->writeClusterEngineLogMessage( + pht( + "# All available services failed to serve the request, ". + "giving up.\n")); + return false; + } + + $read_len = $this->getIOBytesRead(); + if ($read_len) { + $this->writeClusterEngineLogMessage( + pht( + "# Client already read from service (%s bytes), unable to retry.\n", + new PhutilNumber($read_len))); + return false; + } + + $write_len = $this->getIOBytesWritten(); + if ($write_len) { + $this->writeClusterEngineLogMessage( + pht( + "# Client already wrote to service (%s bytes), unable to retry.\n", + new PhutilNumber($write_len))); + return false; + } + + $this->writeClusterEngineLogMessage( + pht( + "# Service request failed, retrying (making attempt %s of %s).\n", + new PhutilNumber($this->requestAttempts + 1), + new PhutilNumber($this->requestAttempts + count($remaining_refs)))); + + return true; + } + } diff --git a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php --- a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php @@ -3,9 +3,6 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow { - private $requestAttempts = 0; - private $requestFailures = 0; - protected function didConstruct() { $this->setName('git-upload-pack'); $this->setArguments( @@ -20,7 +17,7 @@ protected function executeRepositoryOperations() { $is_proxy = $this->shouldProxy(); if ($is_proxy) { - return $this->executeRepositoryProxyOperations(); + return $this->executeRepositoryProxyOperations($for_write = false); } $viewer = $this->getSSHUser(); @@ -94,114 +91,4 @@ return $err; } - private function executeRepositoryProxyOperations() { - $device = AlmanacKeys::getLiveDevice(); - $for_write = false; - - $refs = $this->getAlmanacServiceRefs($for_write); - $err = 1; - - while (true) { - $ref = head($refs); - - $command = $this->getProxyCommandForServiceRef($ref); - - if ($device) { - $this->writeClusterEngineLogMessage( - pht( - "# Fetch received by \"%s\", forwarding to cluster host \"%s\".\n", - $device->getName(), - $ref->getDeviceName())); - } - - $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); - - $future = id(new ExecFuture('%C', $command)) - ->setEnv($this->getEnvironment()); - - $this->didBeginRequest(); - - $err = $this->newPassthruCommand() - ->setIOChannel($this->getIOChannel()) - ->setCommandChannelFromExecFuture($future) - ->execute(); - - // TODO: Currently, when proxying, we do not write an event log on the - // proxy. Perhaps we should write a "proxy log". This is not very useful - // for statistics or auditing, but could be useful for diagnostics. - // Marking the proxy logs as proxied (and recording devicePHID on all - // logs) would make differentiating between these use cases easier. - - if (!$err) { - $this->waitForGitClient(); - return $err; - } - - // Throw away this service: the request failed and we're treating the - // failure as persistent, so we don't want to retry another request to - // the same host. - array_shift($refs); - - $should_retry = $this->shouldRetryRequest($refs); - if (!$should_retry) { - return $err; - } - - // If we haven't bailed out yet, we'll retry the request with the next - // service. - } - - throw new Exception(pht('Reached an unreachable place.')); - } - - private function didBeginRequest() { - $this->requestAttempts++; - return $this; - } - - private function shouldRetryRequest(array $remaining_refs) { - $this->requestFailures++; - - if ($this->requestFailures > $this->requestAttempts) { - throw new Exception( - pht( - "Workflow has recorded more failures than attempts; there is a ". - "missing call to \"didBeginRequest()\".\n")); - } - - if (!$remaining_refs) { - $this->writeClusterEngineLogMessage( - pht( - "# All available services failed to serve the request, ". - "giving up.\n")); - return false; - } - - $read_len = $this->getIOBytesRead(); - if ($read_len) { - $this->writeClusterEngineLogMessage( - pht( - "# Client already read from service (%s bytes), unable to retry.\n", - new PhutilNumber($read_len))); - return false; - } - - $write_len = $this->getIOBytesWritten(); - if ($write_len) { - $this->writeClusterEngineLogMessage( - pht( - "# Client already wrote to service (%s bytes), unable to retry.\n", - new PhutilNumber($write_len))); - return false; - } - - $this->writeClusterEngineLogMessage( - pht( - "# Service request failed, retrying (making attempt %s of %s).\n", - new PhutilNumber($this->requestAttempts + 1), - new PhutilNumber($this->requestAttempts + count($remaining_refs)))); - - return true; - } - }