Changeset View
Changeset View
Standalone View
Standalone View
src/channel/PhutilChannel.php
- This file was added.
<?php | |||||
/** | |||||
* Wrapper around streams, pipes, and other things that have basic read/write | |||||
* I/O characteristics. | |||||
* | |||||
* Channels include buffering, so you can do fire-and-forget writes and reads | |||||
* without worrying about network/pipe buffers. Use @{method:read} and | |||||
* @{method:write} to read and write. | |||||
* | |||||
* Channels are nonblocking and provide a select()-oriented interface so you | |||||
* can reasonably write server-like and daemon-like things with them. Use | |||||
* @{method:waitForAny} to select channels. | |||||
* | |||||
* Channel operations other than @{method:update} generally operate on buffers. | |||||
* Writes and reads affect buffers, while @{method:update} flushes output | |||||
* buffers and fills input buffers. | |||||
* | |||||
* Channels are either "open" or "closed". You can detect that a channel has | |||||
* closed by calling @{method:isOpen} or examining the return value of | |||||
* @{method:update}. | |||||
* | |||||
* NOTE: Channels are new (as of June 2012) and subject to interface changes. | |||||
* | |||||
* @task io Reading and Writing | |||||
* @task wait Waiting for Activity | |||||
* @task update Responding to Activity | |||||
* @task impl Channel Implementation | |||||
*/ | |||||
abstract class PhutilChannel extends Phobject { | |||||
private $ibuf = ''; | |||||
private $obuf; | |||||
private $name; | |||||
private $readBufferSize; | |||||
public function __construct() { | |||||
$this->obuf = new PhutilRope(); | |||||
} | |||||
/* -( Reading and Writing )------------------------------------------------ */ | |||||
/** | |||||
* Read from the channel. A channel defines the format of data that is read | |||||
* from it, so this method may return strings, objects, or anything else. | |||||
* | |||||
* The default implementation returns bytes. | |||||
* | |||||
* @return wild Data from the channel, normally bytes. | |||||
* | |||||
* @task io | |||||
*/ | |||||
public function read() { | |||||
$result = $this->ibuf; | |||||
$this->ibuf = ''; | |||||
return $result; | |||||
} | |||||
/** | |||||
* Write to the channel. A channel defines what data format it accepts, | |||||
* so this method may take strings, objects, or anything else. | |||||
* | |||||
* The default implementation accepts bytes. | |||||
* | |||||
* @param wild Data to write to the channel, normally bytes. | |||||
* @return this | |||||
* | |||||
* @task io | |||||
*/ | |||||
public function write($bytes) { | |||||
if (!is_scalar($bytes)) { | |||||
throw new Exception( | |||||
pht( | |||||
'%s may only write strings!', | |||||
__METHOD__.'()')); | |||||
} | |||||
$this->obuf->append($bytes); | |||||
return $this; | |||||
} | |||||
/* -( Waiting for Activity )----------------------------------------------- */ | |||||
/** | |||||
* Wait for any activity on a list of channels. Convenience wrapper around | |||||
* @{method:waitForActivity}. | |||||
* | |||||
* @param list<PhutilChannel> A list of channels to wait for. | |||||
* @param dict Options, see above. | |||||
* @return void | |||||
* | |||||
* @task wait | |||||
*/ | |||||
public static function waitForAny(array $channels, array $options = array()) { | |||||
return self::waitForActivity($channels, $channels, $options); | |||||
} | |||||
/** | |||||
* Wait (using select()) for channels to become ready for reads or writes. | |||||
* This method blocks until some channel is ready to be updated. | |||||
* | |||||
* It does not provide a way to determine which channels are ready to be | |||||
* updated. The expectation is that you'll just update every channel. This | |||||
* might change eventually. | |||||
* | |||||
* Available options are: | |||||
* | |||||
* - 'read' (list<stream>) Additional streams to select for read. | |||||
* - 'write' (list<stream>) Additional streams to select for write. | |||||
* - 'except' (list<stream>) Additional streams to select for except. | |||||
* - 'timeout' (float) Select timeout, defaults to 1. | |||||
* | |||||
* NOTE: Extra streams must be //streams//, not //sockets//, because this | |||||
* method uses `stream_select()`, not `socket_select()`. | |||||
* | |||||
* @param list<PhutilChannel> List of channels to wait for reads on. | |||||
* @param list<PhutilChannel> List of channels to wait for writes on. | |||||
* @return void | |||||
* | |||||
* @task wait | |||||
*/ | |||||
public static function waitForActivity( | |||||
array $reads, | |||||
array $writes, | |||||
array $options = array()) { | |||||
assert_instances_of($reads, __CLASS__); | |||||
assert_instances_of($writes, __CLASS__); | |||||
$read = idx($options, 'read', array()); | |||||
$write = idx($options, 'write', array()); | |||||
$except = idx($options, 'except', array()); | |||||
$wait = idx($options, 'timeout', 1); | |||||
// TODO: It would be nice to just be able to categorically reject these as | |||||
// unselectable. | |||||
foreach (array($reads, $writes) as $channels) { | |||||
foreach ($channels as $channel) { | |||||
$r_sockets = $channel->getReadSockets(); | |||||
$w_sockets = $channel->getWriteSockets(); | |||||
// If any channel has no read sockets and no write sockets, assume it | |||||
// isn't selectable and return immediately (effectively degrading to a | |||||
// busy wait). | |||||
if (!$r_sockets && !$w_sockets) { | |||||
return false; | |||||
} | |||||
} | |||||
} | |||||
foreach ($reads as $channel) { | |||||
// If any of the read channels have data in read buffers, return | |||||
// immediately. If we don't, we risk running select() on a bunch of | |||||
// sockets which won't become readable because the data the application | |||||
// expects is already in a read buffer. | |||||
if (!$channel->isReadBufferEmpty()) { | |||||
return; | |||||
} | |||||
$r_sockets = $channel->getReadSockets(); | |||||
foreach ($r_sockets as $socket) { | |||||
$read[] = $socket; | |||||
$except[] = $socket; | |||||
} | |||||
} | |||||
foreach ($writes as $channel) { | |||||
if ($channel->isWriteBufferEmpty()) { | |||||
// If the channel's write buffer is empty, don't select the write | |||||
// sockets, since they're writable immediately. | |||||
$w_sockets = array(); | |||||
} else { | |||||
$w_sockets = $channel->getWriteSockets(); | |||||
} | |||||
foreach ($w_sockets as $socket) { | |||||
$write[] = $socket; | |||||
$except[] = $socket; | |||||
} | |||||
} | |||||
if (!$read && !$write && !$except) { | |||||
return false; | |||||
} | |||||
$wait_sec = (int)$wait; | |||||
$wait_usec = 1000000 * ($wait - $wait_sec); | |||||
@stream_select($read, $write, $except, $wait_sec, $wait_usec); | |||||
} | |||||
/* -( Responding to Activity )--------------------------------------------- */ | |||||
/** | |||||
* Updates the channel, filling input buffers and flushing output buffers. | |||||
* Returns false if the channel has closed. | |||||
* | |||||
* @return bool True if the channel is still open. | |||||
* | |||||
* @task update | |||||
*/ | |||||
public function update() { | |||||
$maximum_read = PHP_INT_MAX; | |||||
if ($this->readBufferSize !== null) { | |||||
$maximum_read = ($this->readBufferSize - strlen($this->ibuf)); | |||||
} | |||||
while ($maximum_read > 0) { | |||||
$in = $this->readBytes($maximum_read); | |||||
if (!strlen($in)) { | |||||
// Reading is blocked for now. | |||||
break; | |||||
} | |||||
$this->ibuf .= $in; | |||||
$maximum_read -= strlen($in); | |||||
} | |||||
while ($this->obuf->getByteLength()) { | |||||
$len = $this->writeBytes($this->obuf->getAnyPrefix()); | |||||
if (!$len) { | |||||
// Writing is blocked for now. | |||||
break; | |||||
} | |||||
$this->obuf->removeBytesFromHead($len); | |||||
} | |||||
return $this->isOpen(); | |||||
} | |||||
/* -( Channel Implementation )--------------------------------------------- */ | |||||
/** | |||||
* Set a channel name. This is primarily intended to allow you to debug | |||||
* channel code more easily, by naming channels something meaningful. | |||||
* | |||||
* @param string Channel name. | |||||
* @return this | |||||
* | |||||
* @task impl | |||||
*/ | |||||
public function setName($name) { | |||||
$this->name = $name; | |||||
return $this; | |||||
} | |||||
/** | |||||
* Get the channel name, as set by @{method:setName}. | |||||
* | |||||
* @return string Name of the channel. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
public function getName() { | |||||
return coalesce($this->name, get_class($this)); | |||||
} | |||||
/** | |||||
* Test if the channel is open: active, can be read from and written to, etc. | |||||
* | |||||
* @return bool True if the channel is open. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
abstract public function isOpen(); | |||||
/** | |||||
* Close the channel for writing. | |||||
* | |||||
* @return void | |||||
* @task impl | |||||
*/ | |||||
abstract public function closeWriteChannel(); | |||||
/** | |||||
* Test if the channel is open for reading. | |||||
* | |||||
* @return bool True if the channel is open for reading. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
public function isOpenForReading() { | |||||
return $this->isOpen(); | |||||
} | |||||
/** | |||||
* Test if the channel is open for writing. | |||||
* | |||||
* @return bool True if the channel is open for writing. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
public function isOpenForWriting() { | |||||
return $this->isOpen(); | |||||
} | |||||
/** | |||||
* Read from the channel's underlying I/O. | |||||
* | |||||
* @param int Maximum number of bytes to read. | |||||
* @return string Bytes, if available. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
abstract protected function readBytes($length); | |||||
/** | |||||
* Write to the channel's underlying I/O. | |||||
* | |||||
* @param string Bytes to write. | |||||
* @return int Number of bytes written. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
abstract protected function writeBytes($bytes); | |||||
/** | |||||
* Get sockets to select for reading. | |||||
* | |||||
* @return list<stream> Read sockets. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
protected function getReadSockets() { | |||||
return array(); | |||||
} | |||||
/** | |||||
* Get sockets to select for writing. | |||||
* | |||||
* @return list<stream> Write sockets. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
protected function getWriteSockets() { | |||||
return array(); | |||||
} | |||||
/** | |||||
* Set the maximum size of the channel's read buffer. Reads will artificially | |||||
* block once the buffer reaches this size until the in-process buffer is | |||||
* consumed. | |||||
* | |||||
* @param int|null Maximum read buffer size, or `null` for a limitless buffer. | |||||
* @return this | |||||
* @task impl | |||||
*/ | |||||
public function setReadBufferSize($size) { | |||||
$this->readBufferSize = $size; | |||||
return $this; | |||||
} | |||||
/** | |||||
* Test state of the read buffer. | |||||
* | |||||
* @return bool True if the read buffer is empty. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
public function isReadBufferEmpty() { | |||||
return (strlen($this->ibuf) == 0); | |||||
} | |||||
/** | |||||
* Test state of the write buffer. | |||||
* | |||||
* @return bool True if the write buffer is empty. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
public function isWriteBufferEmpty() { | |||||
return !$this->getWriteBufferSize(); | |||||
} | |||||
/** | |||||
* Get the number of bytes we're currently waiting to write. | |||||
* | |||||
* @return int Number of waiting bytes. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
public function getWriteBufferSize() { | |||||
return $this->obuf->getByteLength(); | |||||
} | |||||
/** | |||||
* Wait for any buffered writes to complete. This is a blocking call. When | |||||
* the call returns, the write buffer will be empty. | |||||
* | |||||
* @task impl | |||||
*/ | |||||
public function flush() { | |||||
while (!$this->isWriteBufferEmpty()) { | |||||
self::waitForAny(array($this)); | |||||
if (!$this->update()) { | |||||
throw new Exception(pht('Channel closed while flushing output!')); | |||||
} | |||||
} | |||||
return $this; | |||||
} | |||||
} |