diff --git a/src/phage/agent/PhagePHPAgent.php b/src/phage/agent/PhagePHPAgent.php index 67552ec..39a74e8 100644 --- a/src/phage/agent/PhagePHPAgent.php +++ b/src/phage/agent/PhagePHPAgent.php @@ -1,103 +1,104 @@ stdin = $stdin; } public function execute() { while (true) { if ($this->exec) { $iterator = new FutureIterator($this->exec); $iterator->setUpdateInterval(0.050); foreach ($iterator as $key => $future) { if ($future === null) { break; } $this->resolveFuture($key, $future); break; } } else { PhutilChannel::waitForAny(array($this->getMaster())); } $this->processInput(); } } private function getMaster() { if (!$this->master) { $raw_channel = new PhutilSocketChannel( $this->stdin, fopen('php://stdout', 'w')); $json_channel = new PhutilJSONProtocolChannel($raw_channel); $this->master = $json_channel; } return $this->master; } private function processInput() { $channel = $this->getMaster(); $open = $channel->update(); if (!$open) { throw new Exception(pht('Channel closed!')); } while (true) { $command = $channel->read(); if ($command === null) { break; } $this->processCommand($command); } } private function processCommand(array $spec) { switch ($spec['type']) { case 'EXEC': $key = $spec['key']; $cmd = $spec['command']; $future = new ExecFuture('%C', $cmd); $this->exec[$key] = $future; break; case 'EXIT': $this->terminateAgent(); break; } } private function resolveFuture($key, Future $future) { $result = $future->resolve(); $master = $this->getMaster(); $master->write( array( 'type' => 'RSLV', 'key' => $key, 'err' => $result[0], 'stdout' => $result[1], 'stderr' => $result[2], )); + unset($this->exec[$key]); } public function __destruct() { $this->terminateAgent(); } private function terminateAgent() { foreach ($this->exec as $key => $future) { $future->resolveKill(); } exit(0); } }