Changeset View
Changeset View
Standalone View
Standalone View
src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php
<?php | <?php | ||||
abstract class DiffusionGitSSHWorkflow | abstract class DiffusionGitSSHWorkflow | ||||
extends DiffusionSSHWorkflow | extends DiffusionSSHWorkflow | ||||
implements DiffusionRepositoryClusterEngineLogInterface { | implements DiffusionRepositoryClusterEngineLogInterface { | ||||
private $engineLogProperties = array(); | private $engineLogProperties = array(); | ||||
private $protocolLog; | private $protocolLog; | ||||
private $wireProtocol; | private $wireProtocol; | ||||
private $ioBytesRead = 0; | private $ioBytesRead = 0; | ||||
private $ioBytesWritten = 0; | private $ioBytesWritten = 0; | ||||
private $requestAttempts = 0; | |||||
private $requestFailures = 0; | |||||
protected function writeError($message) { | protected function writeError($message) { | ||||
// Git assumes we'll add our own newlines. | // Git assumes we'll add our own newlines. | ||||
return parent::writeError($message."\n"); | return parent::writeError($message."\n"); | ||||
} | } | ||||
public function writeClusterEngineLogMessage($message) { | public function writeClusterEngineLogMessage($message) { | ||||
parent::writeError($message); | parent::writeError($message); | ||||
▲ Show 20 Lines • Show All 120 Lines • ▼ Show 20 Lines | abstract class DiffusionGitSSHWorkflow | ||||
final protected function getIOBytesRead() { | final protected function getIOBytesRead() { | ||||
return $this->ioBytesRead; | return $this->ioBytesRead; | ||||
} | } | ||||
final protected function getIOBytesWritten() { | final protected function getIOBytesWritten() { | ||||
return $this->ioBytesWritten; | return $this->ioBytesWritten; | ||||
} | } | ||||
final protected function executeRepositoryProxyOperations($for_write) { | |||||
$device = AlmanacKeys::getLiveDevice(); | |||||
$refs = $this->getAlmanacServiceRefs($for_write); | |||||
$err = 1; | |||||
while (true) { | |||||
$ref = head($refs); | |||||
$command = $this->getProxyCommandForServiceRef($ref); | |||||
if ($device) { | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# Request received by \"%s\", forwarding to cluster ". | |||||
"host \"%s\".\n", | |||||
$device->getName(), | |||||
$ref->getDeviceName())); | |||||
} | |||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); | |||||
$future = id(new ExecFuture('%C', $command)) | |||||
->setEnv($this->getEnvironment()); | |||||
$this->didBeginRequest(); | |||||
$err = $this->newPassthruCommand() | |||||
->setIOChannel($this->getIOChannel()) | |||||
->setCommandChannelFromExecFuture($future) | |||||
->execute(); | |||||
// TODO: Currently, when proxying, we do not write an event log on the | |||||
// proxy. Perhaps we should write a "proxy log". This is not very useful | |||||
// for statistics or auditing, but could be useful for diagnostics. | |||||
// Marking the proxy logs as proxied (and recording devicePHID on all | |||||
// logs) would make differentiating between these use cases easier. | |||||
if (!$err) { | |||||
$this->waitForGitClient(); | |||||
return $err; | |||||
} | |||||
// Throw away this service: the request failed and we're treating the | |||||
// failure as persistent, so we don't want to retry another request to | |||||
// the same host. | |||||
array_shift($refs); | |||||
$should_retry = $this->shouldRetryRequest($refs); | |||||
if (!$should_retry) { | |||||
return $err; | |||||
} | |||||
// If we haven't bailed out yet, we'll retry the request with the next | |||||
// service. | |||||
} | |||||
throw new Exception(pht('Reached an unreachable place.')); | |||||
} | |||||
private function didBeginRequest() { | |||||
$this->requestAttempts++; | |||||
return $this; | |||||
} | |||||
private function shouldRetryRequest(array $remaining_refs) { | |||||
$this->requestFailures++; | |||||
if ($this->requestFailures > $this->requestAttempts) { | |||||
throw new Exception( | |||||
pht( | |||||
"Workflow has recorded more failures than attempts; there is a ". | |||||
"missing call to \"didBeginRequest()\".\n")); | |||||
} | |||||
if (!$remaining_refs) { | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# All available services failed to serve the request, ". | |||||
"giving up.\n")); | |||||
return false; | |||||
} | |||||
$read_len = $this->getIOBytesRead(); | |||||
if ($read_len) { | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# Client already read from service (%s bytes), unable to retry.\n", | |||||
new PhutilNumber($read_len))); | |||||
return false; | |||||
} | |||||
$write_len = $this->getIOBytesWritten(); | |||||
if ($write_len) { | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# Client already wrote to service (%s bytes), unable to retry.\n", | |||||
new PhutilNumber($write_len))); | |||||
return false; | |||||
} | |||||
$this->writeClusterEngineLogMessage( | |||||
pht( | |||||
"# Service request failed, retrying (making attempt %s of %s).\n", | |||||
new PhutilNumber($this->requestAttempts + 1), | |||||
new PhutilNumber($this->requestAttempts + count($remaining_refs)))); | |||||
return true; | |||||
} | |||||
} | } |