Differential D15440 Diff 37216 src/applications/nuance/cursor/NuanceGitHubRepositoryImportCursor.php
Changeset View
Changeset View
Standalone View
Standalone View
src/applications/nuance/cursor/NuanceGitHubRepositoryImportCursor.php
Show All 31 Lines | if ($limit && ($limit >= $now)) { | ||||
new PhutilNumber(1 + ($limit - $now)))); | new PhutilNumber(1 + ($limit - $now)))); | ||||
return false; | return false; | ||||
} | } | ||||
return true; | return true; | ||||
} | } | ||||
protected function pullDataFromSource() { | protected function pullDataFromSource() { | ||||
$viewer = $this->getViewer(); | |||||
$now = PhabricatorTime::getNow(); | |||||
$source = $this->getSource(); | $source = $this->getSource(); | ||||
$user = $source->getSourceProperty('github.user'); | $user = $source->getSourceProperty('github.user'); | ||||
$repository = $source->getSourceProperty('github.repository'); | $repository = $source->getSourceProperty('github.repository'); | ||||
$api_token = $source->getSourceProperty('github.token'); | $api_token = $source->getSourceProperty('github.token'); | ||||
// This API only supports fetching 10 pages of 30 events each, for a total | |||||
// of 300 events. | |||||
$etag = null; | |||||
$new_items = array(); | |||||
$hit_known_items = false; | |||||
for ($page = 1; $page <= 10; $page++) { | |||||
$uri = "/repos/{$user}/{$repository}/events"; | $uri = "/repos/{$user}/{$repository}/events"; | ||||
$data = array(); | $data = array( | ||||
'page' => $page, | |||||
); | |||||
$future = id(new PhutilGitHubFuture()) | $future = id(new PhutilGitHubFuture()) | ||||
->setAccessToken($api_token) | ->setAccessToken($api_token) | ||||
->setRawGitHubQuery($uri, $data); | ->setRawGitHubQuery($uri, $data); | ||||
$etag = $this->getCursorProperty('github.poll.etag'); | if ($page == 1) { | ||||
if ($etag) { | $cursor_etag = $this->getCursorProperty('github.poll.etag'); | ||||
$future->addHeader('If-None-Match', $etag); | if ($cursor_etag) { | ||||
$future->addHeader('If-None-Match', $cursor_etag); | |||||
} | |||||
} | } | ||||
$this->logInfo( | $this->logInfo( | ||||
pht( | pht( | ||||
'Polling GitHub Repository API endpoint "%s".', | 'Polling GitHub Repository API endpoint "%s".', | ||||
$uri)); | $uri)); | ||||
$response = $future->resolve(); | $response = $future->resolve(); | ||||
// Do this first: if we hit the rate limit, we get a response but the | // Do this first: if we hit the rate limit, we get a response but the | ||||
// body isn't valid. | // body isn't valid. | ||||
$this->updateRateLimits($response); | $this->updateRateLimits($response); | ||||
// This means we hit a rate limit or a "Not Modified" because of the "ETag" | if ($response->getStatus()->getStatusCode() == 304) { | ||||
// header. In either case, we should bail out. | $this->logInfo( | ||||
pht( | |||||
'Received a 304 Not Modified from GitHub, no new events.')); | |||||
} | |||||
// This means we hit a rate limit or a "Not Modified" because of the | |||||
// "ETag" header. In either case, we should bail out. | |||||
if ($response->getStatus()->isError()) { | if ($response->getStatus()->isError()) { | ||||
// TODO: Save cursor data! | $this->updatePolling($response, $now, false); | ||||
$this->getCursorData()->save(); | |||||
return false; | return false; | ||||
} | } | ||||
$this->updateETag($response); | if ($page == 1) { | ||||
$etag = $response->getHeaderValue('ETag'); | |||||
} | |||||
$records = $response->getBody(); | |||||
foreach ($records as $record) { | |||||
$item = $this->newNuanceItemFromGitHubEvent($record); | |||||
$item_key = $item->getItemKey(); | |||||
$this->logInfo( | |||||
pht( | |||||
'Fetched event "%s".', | |||||
$item_key)); | |||||
$new_items[$item->getItemKey()] = $item; | |||||
} | |||||
if ($new_items) { | |||||
$existing = id(new NuanceItemQuery()) | |||||
->setViewer($viewer) | |||||
->withSourcePHIDs(array($source->getPHID())) | |||||
->withItemKeys(array_keys($new_items)) | |||||
->execute(); | |||||
$existing = mpull($existing, null, 'getItemKey'); | |||||
foreach ($new_items as $key => $new_item) { | |||||
if (isset($existing[$key])) { | |||||
unset($new_items[$key]); | |||||
$hit_known_items = true; | |||||
$this->logInfo( | |||||
pht( | |||||
'Event "%s" is previously known.', | |||||
$key)); | |||||
} | |||||
} | |||||
} | |||||
if ($hit_known_items) { | |||||
break; | |||||
} | |||||
if (count($records) < 30) { | |||||
break; | |||||
} | |||||
} | |||||
// TODO: When we go through the whole queue without hitting anything we | |||||
// have seen before, we should record some sort of global event so we | |||||
// can tell the user when the bridging started or was interrupted? | |||||
if (!$hit_known_items) { | |||||
$already_polled = $this->getCursorProperty('github.polled'); | |||||
if ($already_polled) { | |||||
// TODO: This is bad: we missed some items, maybe because too much | |||||
// stuff happened too fast or the daemons were broken for a long | |||||
// time. | |||||
} else { | |||||
// TODO: This is OK, we're doing the initial import. | |||||
} | |||||
} | |||||
if ($etag !== null) { | |||||
$this->updateETag($etag); | |||||
} | |||||
$this->updatePolling($response, $now, true); | |||||
$source->openTransaction(); | |||||
foreach ($new_items as $new_item) { | |||||
$new_item->save(); | |||||
} | |||||
$this->getCursorData()->save(); | |||||
$source->saveTransaction(); | |||||
foreach ($new_items as $new_item) { | |||||
PhabricatorWorker::scheduleTask( | |||||
'NuanceImportWorker', | |||||
array( | |||||
'itemPHID' => $new_item->getPHID(), | |||||
), | |||||
array( | |||||
'objectPHID' => $new_item->getPHID(), | |||||
)); | |||||
} | |||||
var_dump($response->getBody()); | return false; | ||||
} | } | ||||
private function updateRateLimits(PhutilGitHubResponse $response) { | private function updateRateLimits(PhutilGitHubResponse $response) { | ||||
$remaining = $response->getHeaderValue('X-RateLimit-Remaining'); | $remaining = $response->getHeaderValue('X-RateLimit-Remaining'); | ||||
$limit_reset = $response->getHeaderValue('X-RateLimit-Reset'); | $limit_reset = $response->getHeaderValue('X-RateLimit-Reset'); | ||||
$now = PhabricatorTime::getNow(); | $now = PhabricatorTime::getNow(); | ||||
$limit_ttl = null; | $limit_ttl = null; | ||||
Show All 9 Lines | private function updateRateLimits(PhutilGitHubResponse $response) { | ||||
$this->logInfo( | $this->logInfo( | ||||
pht( | pht( | ||||
'This key has %s remaining API request(s), '. | 'This key has %s remaining API request(s), '. | ||||
'limit resets in %s second(s).', | 'limit resets in %s second(s).', | ||||
new PhutilNumber($remaining), | new PhutilNumber($remaining), | ||||
new PhutilNumber($limit_reset - $now))); | new PhutilNumber($limit_reset - $now))); | ||||
} | } | ||||
private function updateETag(PhutilGitHubResponse $response) { | private function updateETag($etag) { | ||||
$etag = $response->getHeaderValue('ETag'); | |||||
$this->setCursorProperty('github.poll.etag', $etag); | $this->setCursorProperty('github.poll.etag', $etag); | ||||
$this->logInfo( | $this->logInfo( | ||||
pht( | pht( | ||||
'ETag for this request was "%s".', | 'ETag for this request was "%s".', | ||||
$etag)); | $etag)); | ||||
} | } | ||||
private function updatePolling( | |||||
PhutilGitHubResponse $response, | |||||
$start, | |||||
$success) { | |||||
if ($success) { | |||||
$this->setCursorProperty('github.polled', true); | |||||
} | |||||
$poll_interval = (int)$response->getHeaderValue('X-Poll-Interval'); | |||||
$poll_ttl = $start + $poll_interval; | |||||
$this->setCursorProperty('github.poll.ttl', $poll_ttl); | |||||
$now = PhabricatorTime::getNow(); | |||||
$this->logInfo( | |||||
pht( | |||||
'Set API poll TTL to +%s second(s) (%s second(s) from now).', | |||||
new PhutilNumber($poll_interval), | |||||
new PhutilNumber($poll_ttl - $now))); | |||||
} | |||||
private function newNuanceItemFromGitHubEvent(array $record) { | |||||
$source = $this->getSource(); | |||||
$id = $record['id']; | |||||
$item_key = "github.event.{$id}"; | |||||
$container_key = null; | |||||
$issue_id = idxv( | |||||
$record, | |||||
array( | |||||
'payload', | |||||
'issue', | |||||
'id', | |||||
)); | |||||
if ($issue_id) { | |||||
$container_key = "github.issue.{$issue_id}"; | |||||
} | |||||
return NuanceItem::initializeNewItem() | |||||
->setStatus(NuanceItem::STATUS_IMPORTING) | |||||
->setSourcePHID($source->getPHID()) | |||||
->setItemType('github.event') | |||||
->setItemKey($item_key) | |||||
->setItemContainerKey($container_key) | |||||
->setItemProperty('api.raw', $record); | |||||
} | |||||
} | } |