Page MenuHomePhabricator
Paste P1945

A still not quite right idea
ActivePublic

Authored by yelirekim on Feb 16 2016, 1:43 AM.
Tags
None
Referenced Files
F1113481: A still not quite right idea
Feb 16 2016, 1:43 AM
Subscribers
None
<?php
final class CITransactionProcessingEngine extends Phobject {
private $batchSize;
private $startTime;
public function setBatchSize($batch) {
$this->batchSize = $batch;
return $this;
}
public function setStartTime($start) {
$this->startTime = $start;
return $this;
}
public function poll() {
$templates = [];
foreach (CITransactionProcessor::getAllProcessors() as $processor_class => $processor) {
foreach ($processor->getProcessableTransactionClasses() as $transaction_class) {
$templates[$transaction_class][$processor_class] = $processor;
}
}
foreach ($templates as $class => $processors) {
$transaction = newv($class, []);
$newest_cursor = (new CITransactionsProcessingCursorQuery())
->setLimit(1)
->withTransactionPHIDType($transaction->getApplicationTransactionType())
->execute();
$conn = $transaction->establishConnection('r');
if ($newest_cursor = head($newest_cursor)) {
$lowest_id = $newest_cursor->getTransactionID() + 1;
$where_clause = qsprintf($conn, 'id >= %d', $lowest_id);
} else {
$where_clause = qsprintf($conn, 'dateCreated > %d', $this->startTime);
}
$transaction_records = queryfx_all(
$conn,
'SELECT t.* FROM %T t WHERE %Q ORDER BY id ASC LIMIT %d',
$transaction->getTableName(),
$where_clause,
$this->batchSize);
if (!$transaction_records) {
continue;
}
$transactions = $transaction->loadAllFromArray($transaction_records);
$transactions = mpull($transactions, null, 'getPHID');
$to_process = [];
foreach ($transactions as $transaction) {
CITransactionsProcessingCursor::initializeNewCursor($transaction)
->save();
foreach ($processors as $processor_class => $processor) {
if ($processor->canProcessTransaction($transaction)) {
$to_process[$processor_class][] = $transaction;
}
}
}
foreach ($to_process as $processor_class => $transactions) {
$processor = $processors[$processor_class];
$processor->processTransactions($transactions);
}
}
}
}