Changeset View
Changeset View
Standalone View
Standalone View
src/phage/agent/PhagePHPAgent.php
| <?php | <?php | ||||
| final class PhagePHPAgent extends Phobject { | final class PhagePHPAgent extends Phobject { | ||||
| private $stdin; | private $stdin; | ||||
| private $master; | private $master; | ||||
| private $exec = array(); | private $futurePool; | ||||
| public function __construct($stdin) { | public function __construct($stdin) { | ||||
| $this->stdin = $stdin; | $this->stdin = $stdin; | ||||
| } | } | ||||
| public function execute() { | public function execute() { | ||||
| $future_pool = $this->getFuturePool(); | |||||
| while (true) { | while (true) { | ||||
| if ($this->exec) { | if ($future_pool->hasFutures()) { | ||||
| $iterator = new FutureIterator($this->exec); | while ($future_pool->hasFutures()) { | ||||
| $iterator->setUpdateInterval(0.050); | $future = $future_pool->resolve(); | ||||
| foreach ($iterator as $key => $future) { | |||||
| if ($future === null) { | if ($future === null) { | ||||
| foreach ($this->exec as $read_key => $read_future) { | foreach ($future_pool->getFutures() as $read_future) { | ||||
| $this->readFuture($read_key, $read_future); | $this->readFuture($read_future); | ||||
| } | } | ||||
| break; | break; | ||||
| } else { | |||||
| $this->resolveFuture($key, $future); | |||||
| } | } | ||||
| $this->resolveFuture($future); | |||||
| } | } | ||||
| } else { | } else { | ||||
| PhutilChannel::waitForAny(array($this->getMaster())); | PhutilChannel::waitForAny(array($this->getMaster())); | ||||
| } | } | ||||
| $this->processInput(); | $this->processInput(); | ||||
| } | } | ||||
| } | } | ||||
| private function getFuturePool() { | |||||
| if (!$this->futurePool) { | |||||
| $this->futurePool = $this->newFuturePool(); | |||||
| } | |||||
| return $this->futurePool; | |||||
| } | |||||
| private function newFuturePool() { | |||||
| $future_pool = new FuturePool(); | |||||
| $future_pool->getIteratorTemplate() | |||||
| ->setUpdateInterval(0.050); | |||||
| return $future_pool; | |||||
| } | |||||
| private function getMaster() { | private function getMaster() { | ||||
| if (!$this->master) { | if (!$this->master) { | ||||
| $raw_channel = new PhutilSocketChannel( | $raw_channel = new PhutilSocketChannel( | ||||
| $this->stdin, | $this->stdin, | ||||
| fopen('php://stdout', 'w')); | fopen('php://stdout', 'w')); | ||||
| $json_channel = new PhutilJSONProtocolChannel($raw_channel); | $json_channel = new PhutilJSONProtocolChannel($raw_channel); | ||||
| $this->master = $json_channel; | $this->master = $json_channel; | ||||
| Show All 27 Lines | switch ($spec['type']) { | ||||
| $future = new ExecFuture('%C', $cmd); | $future = new ExecFuture('%C', $cmd); | ||||
| $timeout = $spec['timeout']; | $timeout = $spec['timeout']; | ||||
| if ($timeout) { | if ($timeout) { | ||||
| $future->setTimeout(ceil($timeout)); | $future->setTimeout(ceil($timeout)); | ||||
| } | } | ||||
| $future->isReady(); | $future->setFutureKey($key); | ||||
| $this->exec[$key] = $future; | $this->getFuturePool() | ||||
| ->addFuture($future); | |||||
| break; | break; | ||||
| case 'EXIT': | case 'EXIT': | ||||
| $this->terminateAgent(); | $this->terminateAgent(); | ||||
| break; | break; | ||||
| } | } | ||||
| } | } | ||||
| private function readFuture($key, ExecFuture $future) { | private function readFuture(ExecFuture $future) { | ||||
| $master = $this->getMaster(); | $master = $this->getMaster(); | ||||
| $key = $future->getFutureKey(); | |||||
| list($stdout, $stderr) = $future->read(); | list($stdout, $stderr) = $future->read(); | ||||
| $future->discardBuffers(); | $future->discardBuffers(); | ||||
| if (strlen($stdout)) { | if (strlen($stdout)) { | ||||
| $master->write( | $master->write( | ||||
| array( | array( | ||||
| 'type' => 'TEXT', | 'type' => 'TEXT', | ||||
| Show All 9 Lines | if (strlen($stderr)) { | ||||
| 'type' => 'TEXT', | 'type' => 'TEXT', | ||||
| 'key' => $key, | 'key' => $key, | ||||
| 'kind' => 'stderr', | 'kind' => 'stderr', | ||||
| 'text' => $stderr, | 'text' => $stderr, | ||||
| )); | )); | ||||
| } | } | ||||
| } | } | ||||
| private function resolveFuture($key, ExecFuture $future) { | private function resolveFuture(ExecFuture $future) { | ||||
| $key = $future->getFutureKey(); | |||||
| $result = $future->resolve(); | $result = $future->resolve(); | ||||
| $master = $this->getMaster(); | $master = $this->getMaster(); | ||||
| $master->write( | $master->write( | ||||
| array( | array( | ||||
| 'type' => 'RSLV', | 'type' => 'RSLV', | ||||
| 'key' => $key, | 'key' => $key, | ||||
| 'err' => $result[0], | 'err' => $result[0], | ||||
| 'stdout' => $result[1], | 'stdout' => $result[1], | ||||
| 'stderr' => $result[2], | 'stderr' => $result[2], | ||||
| 'timeout' => (bool)$future->getWasKilledByTimeout(), | 'timeout' => (bool)$future->getWasKilledByTimeout(), | ||||
| )); | )); | ||||
| unset($this->exec[$key]); | |||||
| } | } | ||||
| public function __destruct() { | public function __destruct() { | ||||
| $this->terminateAgent(); | $this->terminateAgent(); | ||||
| } | } | ||||
| private function terminateAgent() { | private function terminateAgent() { | ||||
| foreach ($this->exec as $key => $future) { | $pool = $this->getFuturePool(); | ||||
| foreach ($pool->getFutures() as $future) { | |||||
| $future->resolveKill(); | $future->resolveKill(); | ||||
| } | } | ||||
| exit(0); | exit(0); | ||||
| } | } | ||||
| } | } | ||||