See D7723.
Details
- Reviewers: btrahan
- Commits: rPHU8182cd18afc6: Implement read streaming on HTTPSFutures
<?php
require_once 'scripts/__init_script__.php';

$future1 = new HTTPSFuture('https://www.google.com/');
$future2 = new HTTPSFuture('https://www.facebook.com/');

$wait = 0.01; // In seconds. This is very small for demo purposes, a value
              // like "1" is probably better in practice.

$futures = array('google' => $future1, 'facebook' => $future2);
foreach (Futures($futures)->setUpdateInterval($wait) as $key => $future) {
  // This might be a periodic update, in which case $future will be null.
  // It might also be a future exiting, in which case $future will not be null.

  // In either case, let's read any data we can first.
  foreach ($futures as $key => $future) {
    $new_data = $future->read();
    // Throw the buffered data away after we read it.
    $future->discardBuffers();
    $len = strlen($new_data);
    echo "Got {$len} bytes from {$key}.\n";
    if (strlen($new_data)) {
      // Do something interesting with the data.
    }
  }

  // Now, check if a future exited.
  if ($future !== null) {
    // This future has exited. We should sleep for a bit and then restart it,
    // or whatever.
    echo "Future {$key} exited!\n";
  }
}
Diff Detail
- Branch: httpstream
- Lint: Lint Passed
- Unit: Tests Passed
Event Timeline
>>> orbital ~/devtools/phabricator $ php -f future.php
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 13924 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 2786 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 13930 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 15293 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 496 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 1624 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 11478 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 8143 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Not 100% sure this is correct, but that output looks pretty promising.
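One plausible reason the output reports "Future facebook exited!" on every pass is that the test script's inner foreach reuses $key and $future as its loop variables, so after the inner loop they always hold the last entry of $futures. PHP loop variables are not block-scoped. The sketch below reproduces just that effect with plain arrays standing in for futures; $name, $stream, $clobbered and $preserved are illustrative names, not part of the script.

```php
<?php

// Plain strings stand in for futures; this only demonstrates variable reuse.
$futures = array('google' => 'G', 'facebook' => 'F');

// Pretend the outer loop just yielded a periodic update (no future exited).
$key = null;
$future = null;

// Reusing $key/$future as the inner loop variables overwrites the outer values.
foreach ($futures as $key => $future) {
  // (read from each stream here)
}
$clobbered = array($key, $future);

// Distinct inner names leave the outer values untouched.
$key = null;
$future = null;
foreach ($futures as $name => $stream) {
  // (read from each stream here)
}
$preserved = array($key, $future);
```

If this is what is happening, renaming the inner loop variables should make the "exited" message appear only when a future has actually resolved.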
src/future/FutureIterator.php:237 -- This fixes a bug where setUpdateInterval() was not respected for non-socket futures (basically just HTTPSFuture).
This seems to be working for me. From your test script:
// Now, check if a future exited.
if ($future !== null) {
Is that actually correct? I.e., shouldn't the future be === null if it's exited?
I've basically slapped your test script into my bot, substituting in a Twitter public stream future and only having one in the array.
From https://secure.phabricator.com/D7723#comment-5 -- I'm having trouble getting my channel class to work, which is
final class NuanceTwitterChannel extends PhutilProtocolChannel {

  /**
   * These messages are already encoded as json strings delimited by new lines.
   */
  protected function encodeMessage($message) {
    return $message;
  }

  protected function decodeStream($data) {
    $objects = array();
    $json_objects = explode("\n", $data);
    foreach ($json_objects as $json_object) {
      $object = json_decode($json_object, true);
      $objects[] = $object;
    }
    return $objects;
  }

}
I initialize it above the foreach(Futures) loop like
$raw_channel = new PhutilSocketChannel(
  fopen('php://stdin', 'r'),
  fopen('php://stdout', 'w'));
$this->channel = new NuanceTwitterChannel($raw_channel);
where you say
// Do something interesting with the data.
I now do
$channel->write($new_data);
and then at the end of the script, I added
$message = $this->channel->read();
but $message is always null...?
Pretty sure $future !== null is correct. If a future exited, the exited future is available in $future. If no future exited, $future will be null.
The normal form of this loop is:
foreach (Futures($futures) as $future) {
  // the body enters as each future exits
}
By doing ->setUpdateInterval(...), the loop still works like that, but you're saying "in addition to that, also yield a null every so often so I can do stream/additional processing".
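To make the contract concrete, here is a toy model of it. This is not libphutil's FutureIterator: toy_future_iterator() and its schedule format are hypothetical, and a generator stands in for the real iterator. It only shows the shape of what the loop body sees: a null on an idle interval, or the exited future under its key.

```php
<?php

// Toy model only: a hypothetical stand-in, not libphutil's FutureIterator.
// $exit_ticks maps a future's key to the tick on which it "exits".
function toy_future_iterator(array $exit_ticks, $total_ticks) {
  for ($tick = 1; $tick <= $total_ticks; $tick++) {
    $exited = array_keys($exit_ticks, $tick, true);
    if (!$exited) {
      // Nothing exited during this interval: yield a null "update".
      yield null;
      continue;
    }
    foreach ($exited as $key) {
      // A future exited: yield it under its key.
      yield $key => "future:{$key}";
    }
  }
}

$events = array();
$schedule = array('google' => 2, 'facebook' => 4);
foreach (toy_future_iterator($schedule, 4) as $key => $future) {
  if ($future === null) {
    $events[] = 'update';      // periodic tick: do stream processing here
  } else {
    $events[] = "exited {$key}"; // a future resolved
  }
}
```

With this schedule the loop body runs four times: an update, google exiting, another update, then facebook exiting.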
Your protocol parser needs to retain an internal buffer of data which it has seen but has not yet turned into a message. For example, suppose Twitter sends this message:
{"tweet":"hello"}\n
There's no guarantee you'll read() that off the future all in one go. It might happen in several reads -- for example:
{"twe
et":"he
llo"}\n
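A minimal sketch of a parser that copes with reads split like this, assuming newline-delimited JSON as described above. LineBufferedJSONParser and receive() are hypothetical names, not part of libphutil. The idea is to append each chunk to an internal buffer and only decode what precedes a newline.

```php
<?php

// Hypothetical helper, not part of libphutil: accumulates raw stream data and
// returns one decoded message per complete newline-terminated line.
final class LineBufferedJSONParser {

  private $buffer = '';

  public function receive($data) {
    $this->buffer .= $data;

    $messages = array();
    // Consume only complete lines; anything after the last "\n" stays buffered.
    while (($pos = strpos($this->buffer, "\n")) !== false) {
      $line = substr($this->buffer, 0, $pos);
      $this->buffer = (string)substr($this->buffer, $pos + 1);

      if (trim($line) === '') {
        continue; // Skip blank lines.
      }

      $message = json_decode($line, true);
      if ($message === null) {
        // Note: this also rejects a literal JSON "null" line.
        throw new Exception("Malformed JSON line: {$line}");
      }
      $messages[] = $message;
    }

    return $messages;
  }

}

// Feed the message in three partial reads, as in the example above.
$parser = new LineBufferedJSONParser();
$first = $parser->receive('{"twe');
$second = $parser->receive('et":"he');
$third = $parser->receive("llo\"}\n");
```

The first two calls return empty arrays because no newline has arrived yet; the third returns the single decoded message.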
The class shouldn't necessarily extend PhutilProtocolChannel, since the underlying datastream isn't a Channel -- it will just look similar to PhutilJSONProtocolChannel in that they'll both be stream-based parsers for JSON data with an internal buffer. It might be simpler to not extend it.
The problem right now is that you're putting a protocol channel on top of stdin/stdout, so when you call write(), it means "write to stdout", not "receive this data". If you call $channel->update(), it should flush its write buffer and print the data to your console. That's not what you want, and there's no mechanism for forcing a channel to receive arbitrary out-of-band data since it doesn't normally make sense.
If you do want to extend it, we either need to write a PhutilHTTPSChannel or do this:
list($channel_in, $channel_out) = PhutilSocketChannel::newChannelPair();
$protocol_channel = new NuanceProtocolChannel($channel_out);
Then, when data arrives, do:
$channel_in->write($data);
To read, do:
$protocol_channel->update();
$message = $protocol_channel->read();
But unless you anticipate needing to use this same protocol parser on real channels later, it's probably much simpler to not extend.