diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php --- a/src/__phutil_library_map__.php +++ b/src/__phutil_library_map__.php @@ -3107,6 +3107,7 @@ 'PhabricatorFileNameTransaction' => 'applications/files/xaction/PhabricatorFileNameTransaction.php', 'PhabricatorFileQuery' => 'applications/files/query/PhabricatorFileQuery.php', 'PhabricatorFileROT13StorageFormat' => 'applications/files/format/PhabricatorFileROT13StorageFormat.php', + 'PhabricatorFileRawDataIteratorFuture' => 'applications/files/data/PhabricatorFileRawDataIteratorFuture.php', 'PhabricatorFileRawStorageFormat' => 'applications/files/format/PhabricatorFileRawStorageFormat.php', 'PhabricatorFileSchemaSpec' => 'applications/files/storage/PhabricatorFileSchemaSpec.php', 'PhabricatorFileSearchConduitAPIMethod' => 'applications/files/conduit/PhabricatorFileSearchConduitAPIMethod.php', @@ -8835,6 +8836,7 @@ 'PhabricatorFileNameTransaction' => 'PhabricatorFileTransactionType', 'PhabricatorFileQuery' => 'PhabricatorCursorPagedPolicyAwareQuery', 'PhabricatorFileROT13StorageFormat' => 'PhabricatorFileStorageFormat', + 'PhabricatorFileRawDataIteratorFuture' => 'FutureProxy', 'PhabricatorFileRawStorageFormat' => 'PhabricatorFileStorageFormat', 'PhabricatorFileSchemaSpec' => 'PhabricatorConfigSchemaSpec', 'PhabricatorFileSearchConduitAPIMethod' => 'PhabricatorSearchEngineAPIMethod', diff --git a/src/applications/files/data/PhabricatorFileRawDataIteratorFuture.php b/src/applications/files/data/PhabricatorFileRawDataIteratorFuture.php new file mode 100644 --- /dev/null +++ b/src/applications/files/data/PhabricatorFileRawDataIteratorFuture.php @@ -0,0 +1,67 @@ +engine = $engine; + $this->file = $file; + $this->begin = $begin; + $this->end = $end; + $this->format = $format; + } + + protected function didReceiveResult($formatted_data) { + $file = $this->file; + $engine = $this->engine; + + $begin = $this->begin; + $end = $this->end; + $format = $this->format; + + $known_integrity = 
$file->getIntegrityHash(); + if ($known_integrity !== null) { + $new_integrity = $engine->newIntegrityHash($formatted_data, $format); + if (!phutil_hashes_are_identical($known_integrity, $new_integrity)) { + throw new PhabricatorFileIntegrityException( + pht( + 'File data integrity check failed. Dark forces have corrupted '. + 'or tampered with this file. The file data can not be read.')); + } + } + + $formatted_data = array($formatted_data); + + $data = ''; + $format_iterator = $format->newReadIterator($formatted_data); + foreach ($format_iterator as $raw_chunk) { + $data .= $raw_chunk; + } + + if ($begin !== null && $end !== null) { + $data = substr($data, $begin, ($end - $begin)); + } else if ($begin !== null) { + $data = substr($data, $begin); + } else if ($end !== null) { + $data = substr($data, 0, $end); + } + + return array($data); + } + +} diff --git a/src/applications/files/engine/PhabricatorChunkedFileStorageEngine.php b/src/applications/files/engine/PhabricatorChunkedFileStorageEngine.php --- a/src/applications/files/engine/PhabricatorChunkedFileStorageEngine.php +++ b/src/applications/files/engine/PhabricatorChunkedFileStorageEngine.php @@ -174,6 +174,21 @@ return (4 * 1024 * 1024); } + public function newRawFileDataIteratorFuture( + PhabricatorFile $file, + $begin, + $end, + PhabricatorFileStorageFormat $format) { + + $iterator = $this->getRawFileDataIterator( + $file, + $begin, + $end, + $format); + + return new ImmediateFuture($iterator); + } + public function getRawFileDataIterator( PhabricatorFile $file, $begin, diff --git a/src/applications/files/engine/PhabricatorFileChunkIterator.php b/src/applications/files/engine/PhabricatorFileChunkIterator.php --- a/src/applications/files/engine/PhabricatorFileChunkIterator.php +++ b/src/applications/files/engine/PhabricatorFileChunkIterator.php @@ -10,6 +10,9 @@ private $end; private $data; + private $queue; + private $futures = array(); + public function __construct(array $chunks, $begin = null, $end = 
null) { $chunks = msort($chunks, 'getByteStart'); $this->chunks = $chunks; @@ -32,11 +35,59 @@ } $this->end = $end; } + + $this->queue = array_reverse(array_keys($chunks)); } public function current() { - $chunk = head($this->chunks); - $data = $chunk->getDataFile()->loadFileData(); + // If the underlying storage engine where the chunk data actually lives + // supports futures, we'll try to pre-fetch the next few chunks. If the + // engine has relatively high latency relative to its bandwidth (like + // Amazon S3), this can improve total throughput rate significantly. + + $limit = 8; + while ($this->queue) { + if (count($this->futures) >= $limit) { + break; + } + + $key = array_pop($this->queue); + $chunk = $this->chunks[$key]; + + $future = $chunk->getDataFile()->newFileDataIteratorFuture(); + if ($future) { + $this->futures[$key] = $future; + } + } + + $next_key = head_key($this->chunks); + if (isset($this->futures[$next_key])) { + foreach (new FutureIterator($this->futures) as $future_key => $future) { + if ($future_key === $next_key) { + break; + } + + // NOTE: We don't remove any futures which have already resolved, + // because we want to use only a constant amount of memory. If we free + // up the slots of resolved futures, we might get more throughput, but + // could end up with the entire file buffered in memory if the storage + // engine can send data to us faster than the client can read it. 
+ + } + } + + $chunk = $this->chunks[$next_key]; + $chunk_future = idx($this->futures, $next_key); + unset($this->futures[$next_key]); + + if ($chunk_future) { + $data = ''; + foreach ($chunk_future->resolve() as $piece) { + $data .= $piece; + } + } else { + $data = $chunk->getDataFile()->loadFileData(); + } if ($this->end !== null) { if ($chunk->getByteEnd() > $this->end) { diff --git a/src/applications/files/engine/PhabricatorFileStorageEngine.php b/src/applications/files/engine/PhabricatorFileStorageEngine.php --- a/src/applications/files/engine/PhabricatorFileStorageEngine.php +++ b/src/applications/files/engine/PhabricatorFileStorageEngine.php @@ -179,7 +179,6 @@ */ abstract public function readFile($handle); - /** * Delete the data for a file previously written by @{method:writeFile}. * @@ -327,42 +326,38 @@ return $engine->getChunkSize(); } - public function getRawFileDataIterator( + public function newRawFileDataIteratorFuture( PhabricatorFile $file, $begin, $end, PhabricatorFileStorageFormat $format) { - $formatted_data = $this->readFile($file->getStorageHandle()); - - $known_integrity = $file->getIntegrityHash(); - if ($known_integrity !== null) { - $new_integrity = $this->newIntegrityHash($formatted_data, $format); - if (!phutil_hashes_are_identical($known_integrity, $new_integrity)) { - throw new PhabricatorFileIntegrityException( - pht( - 'File data integrity check failed. Dark forces have corrupted '. - 'or tampered with this file. 
The file data can not be read.')); - } - } + return new PhabricatorFileRawDataIteratorFuture( + $this, + $file, + $begin, + $end, + $format, + $this->newRawReadFuture($file->getStorageHandle())); + } - $formatted_data = array($formatted_data); + public function getRawFileDataIterator( + PhabricatorFile $file, + $begin, + $end, + PhabricatorFileStorageFormat $format) { - $data = ''; - $format_iterator = $format->newReadIterator($formatted_data); - foreach ($format_iterator as $raw_chunk) { - $data .= $raw_chunk; - } + $future = $this->newRawFileDataIteratorFuture( + $file, + $begin, + $end, + $format); - if ($begin !== null && $end !== null) { - $data = substr($data, $begin, ($end - $begin)); - } else if ($begin !== null) { - $data = substr($data, $begin); - } else if ($end !== null) { - $data = substr($data, 0, $end); - } + return $future->resolve(); + } - return array($data); + protected function newRawReadFuture($handle) { + return new ImmediateFuture($this->readFile($handle)); } public function newIntegrityHash( diff --git a/src/applications/files/engine/PhabricatorS3FileStorageEngine.php b/src/applications/files/engine/PhabricatorS3FileStorageEngine.php --- a/src/applications/files/engine/PhabricatorS3FileStorageEngine.php +++ b/src/applications/files/engine/PhabricatorS3FileStorageEngine.php @@ -89,24 +89,16 @@ * Load a stored blob from Amazon S3. */ public function readFile($handle) { - $s3 = $this->newS3API(); - - $profiler = PhutilServiceProfiler::getInstance(); - $call_id = $profiler->beginServiceCall( - array( - 'type' => 's3', - 'method' => 'getObject', - )); - - $result = $s3 - ->setParametersForGetObject($handle) - ->resolve(); - - $profiler->endServiceCall($call_id, array()); - - return $result; + return $this->newRawReadFuture($handle)->resolve(); } + /** + * Build (without resolving) the S3 GetObject request for a stored blob. + */ + protected function newRawReadFuture($handle) { + return $this->newS3API() + ->setParametersForGetObject($handle); + } /** * Delete a blob from Amazon S3. 
diff --git a/src/applications/files/storage/PhabricatorFile.php b/src/applications/files/storage/PhabricatorFile.php --- a/src/applications/files/storage/PhabricatorFile.php +++ b/src/applications/files/storage/PhabricatorFile.php @@ -786,6 +786,18 @@ return $this->loadDataFromIterator($iterator); } + /** + * Build a future which resolves to an iterable of this file's data. + */ + public function newFileDataIteratorFuture($begin = null, $end = null) { + $storage_engine = $this->instantiateStorageEngine(); + $storage_format = $this->newStorageFormat(); + + $future = $storage_engine->newRawFileDataIteratorFuture( + $this, $begin, $end, $storage_format); + + return $future; + } /** * Return an iterable which emits file content bytes. @@ -795,17 +807,8 @@ * @return Iterable Iterable object which emits requested data. */ public function getFileDataIterator($begin = null, $end = null) { - $engine = $this->instantiateStorageEngine(); - - $format = $this->newStorageFormat(); - - $iterator = $engine->getRawFileDataIterator( - $this, - $begin, - $end, - $format); - - return $iterator; + $future = $this->newFileDataIteratorFuture($begin, $end); + return $future->resolve(); } public function getURI() {