Page MenuHomePhabricator

Implement read streaming on HTTPSFutures
ClosedPublic

Authored by epriestley on Dec 6 2013, 12:36 AM.
Tags
None
Referenced Files
F14113048: D7724.diff
Thu, Nov 28, 3:39 AM
Unknown Object (File)
Tue, Nov 26, 8:05 PM
Unknown Object (File)
Tue, Nov 26, 8:05 PM
Unknown Object (File)
Tue, Nov 26, 8:05 PM
Unknown Object (File)
Tue, Nov 26, 7:55 PM
Unknown Object (File)
Sun, Nov 24, 5:56 PM
Unknown Object (File)
Tue, Nov 19, 11:54 PM
Unknown Object (File)
Fri, Nov 15, 5:49 PM
Subscribers
Tokens
"Evil Spooky Haunted Tree" token, awarded by chad.

Details

Summary

See D7723.

Test Plan
<?php

require_once 'scripts/__init_script__.php';

$future1 = new HTTPSFuture('https://www.google.com/');
$future2 = new HTTPSFuture('https://www.facebook.com/');

$wait = 0.01; // In seconds. This is very small for demo purposes, a value
               // like "1" is probably better in practice.

$futures = array('google' => $future1, 'facebook' => $future2);
foreach (Futures($futures)->setUpdateInterval($wait) as $key => $future) {

  // This might be a periodic update, in which case $future will be null.
  // It might also be a future exiting, in which case $future will not be null.
  // In either case, let's read any data we can first.
  foreach ($futures as $key => $future) {
    $new_data = $future->read();
    // Throw the buffered data away after we read it.
    $future->discardBuffers();

    $len = strlen($new_data);
    echo "Got {$len} bytes from {$key}.\n";
    if (strlen($new_data)) {
      // Do something interesting with the data.
    }
  }

  // Now, check if a future exited.
  if ($future !== null) {
    // This future has exited. We should sleep for a bit and then restart it,
    // or whatever.
    echo "Future {$key} exited!\n";
  }
}

Diff Detail

Lint
Lint Skipped
Unit
Tests Skipped

Event Timeline

>>> orbital ~/devtools/phabricator $ php -f future.php 
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 13924 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 2786 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 13930 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 15293 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 496 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 1624 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 11478 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 8143 bytes from facebook.
Future facebook exited!
Got 0 bytes from google.
Got 0 bytes from facebook.
Future facebook exited!

Not 100% sure this is correct, but that output looks pretty promising.

src/future/FutureIterator.php
237

This fixes a bug where setUpdateInterval() was not respected for non-socket futures (basically just HTTPSFuture).

This seems to be working for me. From your test script:

// Now, check if a future exited.
if ($future !== null) {

is that actually correct? ie shouldn't the future be === null if its exited?

I've basically slapped your test script into my bot, substituting in a Twitter public stream future and only having one in the array.

From https://secure.phabricator.com/D7723#comment-5 -- I'm having trouble getting my channel class to work, which is

final class NuanceTwitterChannel extends PhutilProtocolChannel {

/**
 * These messages are already encoded as json strings delimited by new lines.
 */
protected function encodeMessage($message) {
  return $message;
}

protected function decodeStream($data) {
  $objects = array();
  $json_objects = explode("\n", $data);
  foreach ($json_objects as $json_object) {
    $object = json_decode($json_object, true);
    $objects[] = $object;
  }

  return $objects;
}
}

I initialize it above the foreach(Futures) loop like

$raw_channel = new PhutilSocketChannel(
  fopen('php://stdin', 'r'),
  fopen('php://stdout', 'w'));
$this->channel = new NuanceTwitterChannel($raw_channel);

where you say

// Do something interesting with the data.

I now do

$channel->write($new_data);

and then at the end of the script, I added

$message = $this->channel->read();

but $message is always null...?

more on the message being null, my decodeStream function never gets called.

Pretty sure $future !== null is correct. If a future exited, the exited future is available in $future. If no future exited, $future will be null.

The normal form of this loop is:

foreach (Futures($futures) as $future) {
  // the body enters as each future exits
}

By doing ->setUpdateInterval(...), the loop still works like that, but you're saying "in addition to that, also yield a null every so often so I can do stream/additional processing.

Your protocol buffer needs to retain an internal buffer of data which it has seen but has not produced a message yet. For example, suppose twitter sends this message:

{"tweet":"hello"}\n

There's no guarantee you'll read() that off the future all in one go. It might happen in several reads -- for example:

{"twe
eet":"he
llo"}\n

The class shouldn't necessarily extend PhutilProtocolChannel, since the underlying datastream isn't a Channel -- it will just look similar to PhutilJSONProtocolChannel in that they'll both be stream-based parsers for JSON data with an internal buffer. It might be simpler to not extend it. The problem right now is that you're putting a protocol channel on top of stdin/stdout, so when you call write(), it means "write to stdout", not "receive this data". If you call $channel->update(), it should flush its write buffer and print the data to your console. That's not what you want, and there's no mechanism for forcing a channel to receive arbitrary out-of-band data since it doesn't normally make sense.

If you do want to extend it, we either need to write a PhutilHTTPSChannel or do this:

list($channel_in, $channel_out) = PhutilSocketChannel::newChannelPair();
$protocol_channel = new NuanceProtocolChannel($channel_out);

Then, when data arrives, do:

$channel_in->write($data);

To read, do:

$protocol_channel->update();
$message = $protocol_channel->read();

But unless you anticipate needing to use this same protocol parser on real channels later, it's probably much simpler to not extend.

Could you land this when you get a chance? Seems to work well for me. :)