Changeset View
Changeset View
Standalone View
Standalone View
src/applications/files/engine/PhabricatorFileChunkIterator.php
<?php

/**
 * Iterates over the storage chunks which make up a large Phabricator file,
 * yielding the raw data of each chunk in byte order, optionally restricted
 * to a byte range.
 */
final class PhabricatorFileChunkIterator
  extends Phobject
  implements Iterator {

  // Chunk objects for the file, sorted by byte start (see __construct).
  private $chunks;

  // Iteration cursor; NOTE(review): usage not visible in this view — confirm
  // against the key()/next()/rewind() implementations below the fold.
  private $cursor;

  // First byte offset to emit, or null to start at the beginning of the file.
  private $begin;

  // Last byte offset to emit, or null to read through to the end of the file.
  private $end;

  // Buffered data; NOTE(review): usage not visible in this view.
  private $data;

  // Chunk keys not yet submitted for prefetch, in reverse order so the next
  // key can be taken cheaply with array_pop().
  private $queue;

  // Pending prefetch futures for chunk data, keyed by chunk key.
  private $futures = array();
/**
 * @param array $chunks Storage chunk objects for the file; will be sorted
 *   by byte start before use.
 * @param int|null $begin Optional first byte offset to emit.
 * @param int|null $end Optional last byte offset to emit.
 */
public function __construct(array $chunks, $begin = null, $end = null) {
  // Sort chunks by starting offset so we can stream them sequentially and
  // trim leading/trailing chunks against the requested byte range.
  $chunks = msort($chunks, 'getByteStart');

  if ($begin !== null) {
    // Discard chunks which end before the requested range begins. The list
    // is sorted, so we can stop at the first chunk which reaches "$begin";
    // it and everything after it are retained.
    foreach ($chunks as $key => $chunk) {
      if ($chunk->getByteEnd() >= $begin) {
        break;
      }
      unset($chunks[$key]);
    }
    $this->begin = $begin;
  }

  if ($end !== null) {
    // Discard chunks which start after the requested range ends; chunks
    // which begin at or before "$end" are retained.
    foreach ($chunks as $key => $chunk) {
      if ($chunk->getByteStart() <= $end) {
        continue;
      }
      unset($chunks[$key]);
    }
    $this->end = $end;
  }

  // Store the trimmed list (not the pre-trim list) so iteration only visits
  // chunks which overlap the requested range.
  $this->chunks = $chunks;

  // Reverse the remaining keys so the prefetch logic in current() can pull
  // the next key off the end with array_pop().
  $this->queue = array_reverse(array_keys($chunks));
}
public function current() { | public function current() { | ||||
$chunk = head($this->chunks); | // If the underlying storage engine where the chunk data actually lives | ||||
// supports futures, we'll try to pre-fetch the next few chunks. If the | |||||
// engine has relatively high latency relative to its bandwidth (like | |||||
// Amazon S3), this can improve total throughput rate significantly. | |||||
$limit = 8; | |||||
while ($this->queue) { | |||||
if (count($this->futures) >= $limit) { | |||||
break; | |||||
} | |||||
$key = array_pop($this->queue); | |||||
$chunk = $this->chunks[$key]; | |||||
$future = $chunk->getDataFile()->newFileDataIteratorFuture(); | |||||
if ($future) { | |||||
$this->futures[$key] = $future; | |||||
} | |||||
} | |||||
$next_key = head_key($this->chunks); | |||||
if (isset($this->futures[$next_key])) { | |||||
foreach (new FutureIterator($this->futures) as $future_key => $future) { | |||||
if ($future_key === $next_key) { | |||||
break; | |||||
} | |||||
// NOTE: We don't remove any futures which have already resolved, | |||||
// because we want to use only a constant amount of memory. If we free | |||||
// up the slots of resolved futures, we might get more throughput, but | |||||
// could end up with the entire file buffered in memory if the storage | |||||
// engine can send data to us faster than the client can read it. | |||||
} | |||||
} | |||||
$chunk = $this->chunks[$next_key]; | |||||
$chunk_future = idx($this->futures, $next_key); | |||||
unset($this->futures[$next_key]); | |||||
if ($chunk_future) { | |||||
$data = ''; | |||||
foreach ($chunk_future->resolve() as $piece) { | |||||
$data .= $piece; | |||||
} | |||||
} else { | |||||
$data = $chunk->getDataFile()->loadFileData(); | $data = $chunk->getDataFile()->loadFileData(); | ||||
} | |||||
if ($this->end !== null) { | if ($this->end !== null) { | ||||
if ($chunk->getByteEnd() > $this->end) { | if ($chunk->getByteEnd() > $this->end) { | ||||
$data = substr($data, 0, ($this->end - $chunk->getByteStart())); | $data = substr($data, 0, ($this->end - $chunk->getByteStart())); | ||||
} | } | ||||
} | } | ||||
if ($this->begin !== null) { | if ($this->begin !== null) { | ||||
Show All 25 Lines |