Changeset View
Changeset View
Standalone View
Standalone View
src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php
<?php | <?php | ||||
final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow { | final class DiffusionGitUploadPackSSHWorkflow | ||||
extends DiffusionGitSSHWorkflow { | |||||
private $requestAttempts = 0; | |||||
private $requestFailures = 0; | |||||
protected function didConstruct() { | protected function didConstruct() { | ||||
$this->setName('git-upload-pack'); | $this->setName('git-upload-pack'); | ||||
$this->setArguments( | $this->setArguments( | ||||
array( | array( | ||||
array( | array( | ||||
'name' => 'dir', | 'name' => 'dir', | ||||
'wildcard' => true, | 'wildcard' => true, | ||||
), | ), | ||||
)); | )); | ||||
} | } | ||||
protected function executeRepositoryOperations() { | protected function executeRepositoryOperations() { | ||||
$repository = $this->getRepository(); | $is_proxy = $this->shouldProxy(); | ||||
if ($is_proxy) { | |||||
return $this->executeRepositoryProxyOperations(); | |||||
} | |||||
$viewer = $this->getSSHUser(); | $viewer = $this->getSSHUser(); | ||||
$repository = $this->getRepository(); | |||||
$device = AlmanacKeys::getLiveDevice(); | $device = AlmanacKeys::getLiveDevice(); | ||||
$skip_sync = $this->shouldSkipReadSynchronization(); | $skip_sync = $this->shouldSkipReadSynchronization(); | ||||
$is_proxy = $this->shouldProxy(); | |||||
if ($is_proxy) { | |||||
$command = $this->getProxyCommand(false); | |||||
if ($device) { | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# Fetch received by \"%s\", forwarding to cluster host.\n", | |||||
$device->getName())); | |||||
} | |||||
} else { | |||||
$command = csprintf('git-upload-pack -- %s', $repository->getLocalPath()); | $command = csprintf('git-upload-pack -- %s', $repository->getLocalPath()); | ||||
if (!$skip_sync) { | if (!$skip_sync) { | ||||
$cluster_engine = id(new DiffusionRepositoryClusterEngine()) | $cluster_engine = id(new DiffusionRepositoryClusterEngine()) | ||||
->setViewer($viewer) | ->setViewer($viewer) | ||||
->setRepository($repository) | ->setRepository($repository) | ||||
->setLog($this) | ->setLog($this) | ||||
->synchronizeWorkingCopyBeforeRead(); | ->synchronizeWorkingCopyBeforeRead(); | ||||
if ($device) { | if ($device) { | ||||
$this->writeClusterEngineLogMessage( | $this->writeClusterEngineLogMessage( | ||||
pht( | pht( | ||||
"# Cleared to fetch on cluster host \"%s\".\n", | "# Cleared to fetch on cluster host \"%s\".\n", | ||||
$device->getName())); | $device->getName())); | ||||
} | } | ||||
} | } | ||||
} | |||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); | $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); | ||||
$pull_event = $this->newPullEvent(); | $pull_event = $this->newPullEvent(); | ||||
$future = id(new ExecFuture('%C', $command)) | $future = id(new ExecFuture('%C', $command)) | ||||
->setEnv($this->getEnvironment()); | ->setEnv($this->getEnvironment()); | ||||
$log = $this->newProtocolLog($is_proxy); | $log = $this->newProtocolLog($is_proxy); | ||||
if ($log) { | if ($log) { | ||||
$this->setProtocolLog($log); | $this->setProtocolLog($log); | ||||
$log->didStartSession($command); | $log->didStartSession($command); | ||||
} | } | ||||
if (!$is_proxy) { | |||||
if (PhabricatorEnv::getEnvConfig('phabricator.show-prototypes')) { | if (PhabricatorEnv::getEnvConfig('phabricator.show-prototypes')) { | ||||
$protocol = new DiffusionGitUploadPackWireProtocol(); | $protocol = new DiffusionGitUploadPackWireProtocol(); | ||||
if ($log) { | if ($log) { | ||||
$protocol->setProtocolLog($log); | $protocol->setProtocolLog($log); | ||||
} | } | ||||
$this->setWireProtocol($protocol); | $this->setWireProtocol($protocol); | ||||
} | } | ||||
} | |||||
$err = $this->newPassthruCommand() | $err = $this->newPassthruCommand() | ||||
->setIOChannel($this->getIOChannel()) | ->setIOChannel($this->getIOChannel()) | ||||
->setCommandChannelFromExecFuture($future) | ->setCommandChannelFromExecFuture($future) | ||||
->execute(); | ->execute(); | ||||
if ($log) { | if ($log) { | ||||
$log->didEndSession(); | $log->didEndSession(); | ||||
} | } | ||||
if ($err) { | if ($err) { | ||||
$pull_event | $pull_event | ||||
->setResultType(PhabricatorRepositoryPullEvent::RESULT_ERROR) | ->setResultType(PhabricatorRepositoryPullEvent::RESULT_ERROR) | ||||
->setResultCode($err); | ->setResultCode($err); | ||||
} else { | } else { | ||||
$pull_event | $pull_event | ||||
->setResultType(PhabricatorRepositoryPullEvent::RESULT_PULL) | ->setResultType(PhabricatorRepositoryPullEvent::RESULT_PULL) | ||||
->setResultCode(0); | ->setResultCode(0); | ||||
} | } | ||||
// TODO: Currently, when proxying, we do not write a log on the proxy. | |||||
// Perhaps we should write a "proxy log". This is not very useful for | |||||
// statistics or auditing, but could be useful for diagnostics. Marking | |||||
// the proxy logs as proxied (and recording devicePHID on all logs) would | |||||
// make differentiating between these use cases easier. | |||||
if (!$is_proxy) { | |||||
$pull_event->save(); | $pull_event->save(); | ||||
if (!$err) { | |||||
$this->waitForGitClient(); | |||||
} | |||||
return $err; | |||||
} | |||||
private function executeRepositoryProxyOperations() { | |||||
$device = AlmanacKeys::getLiveDevice(); | |||||
$for_write = false; | |||||
$refs = $this->getAlmanacServiceRefs($for_write); | |||||
$err = 1; | |||||
while (true) { | |||||
$ref = head($refs); | |||||
$command = $this->getProxyCommandForServiceRef($ref); | |||||
if ($device) { | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# Fetch received by \"%s\", forwarding to cluster host \"%s\".\n", | |||||
$device->getName(), | |||||
$ref->getDeviceName())); | |||||
} | } | ||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); | |||||
$future = id(new ExecFuture('%C', $command)) | |||||
->setEnv($this->getEnvironment()); | |||||
$this->didBeginRequest(); | |||||
$err = $this->newPassthruCommand() | |||||
->setIOChannel($this->getIOChannel()) | |||||
->setCommandChannelFromExecFuture($future) | |||||
->execute(); | |||||
$err = 1; | |||||
// TODO: Currently, when proxying, we do not write an event log on the | |||||
// proxy. Perhaps we should write a "proxy log". This is not very useful | |||||
// for statistics or auditing, but could be useful for diagnostics. | |||||
// Marking the proxy logs as proxied (and recording devicePHID on all | |||||
// logs) would make differentiating between these use cases easier. | |||||
if (!$err) { | if (!$err) { | ||||
$this->waitForGitClient(); | $this->waitForGitClient(); | ||||
return $err; | |||||
} | } | ||||
// Throw away this service: the request failed and we're treating the | |||||
// failure as persistent, so we don't want to retry another request to | |||||
// the same host. | |||||
array_shift($refs); | |||||
// Check if we have more services we can try. If we do, we'll make an | |||||
// effort to fall back to them below. If not, we can't do anything to | |||||
// recover so just bail out. | |||||
if (!$refs) { | |||||
return $err; | return $err; | ||||
} | } | ||||
$should_retry = $this->shouldRetryRequest(); | |||||
if (!$should_retry) { | |||||
return $err; | |||||
} | |||||
// If we haven't bailed out yet, we'll retry the request with the next | |||||
// service. | |||||
} | |||||
throw new Exception(pht('Reached an unreachable place.')); | |||||
} | |||||
private function didBeginRequest() { | |||||
$this->requestAttempts++; | |||||
return $this; | |||||
} | |||||
private function shouldRetryRequest() { | |||||
$this->requestFailures++; | |||||
if ($this->requestFailures > $this->requestAttempts) { | |||||
throw new Exception( | |||||
pht( | |||||
"Workflow has recorded more failures than attempts; there is a ". | |||||
"missing call to \"didBeginRequest()\".\n")); | |||||
} | |||||
$max_failures = 3; | |||||
if ($this->requestFailures >= $max_failures) { | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# Reached maximum number of retry attempts, giving up.\n")); | |||||
return false; | |||||
} | |||||
$read_len = $this->getIOBytesRead(); | |||||
if ($read_len) { | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# Client already read from service (%s bytes), unable to retry.\n", | |||||
new PhutilNumber($read_len))); | |||||
return false; | |||||
} | |||||
$write_len = $this->getIOBytesWritten(); | |||||
if ($write_len) { | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# Client already wrote to service (%s bytes), unable to retry.\n", | |||||
new PhutilNumber($write_len))); | |||||
return false; | |||||
} | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# Service request failed, retrying (making attempt %s of %s).\n", | |||||
new PhutilNumber($this->requestAttempts + 1), | |||||
new PhutilNumber($max_failures))); | |||||
return true; | |||||
} | |||||
} | } |