diff --git a/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php b/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php index db7fe9222f..f3627c2e9c 100644 --- a/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php +++ b/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php @@ -1,204 +1,389 @@ getArgv(); array_unshift($argv, __CLASS__); $args = new PhutilArgumentParser($argv); $args->parse( array( array( 'name' => 'no-discovery', 'help' => 'Pull only, without discovering commits.', ), array( 'name' => 'not', 'param' => 'repository', 'repeat' => true, 'help' => 'Do not pull __repository__.', ), array( 'name' => 'repositories', 'wildcard' => true, 'help' => 'Pull specific __repositories__ instead of all.', ), )); $no_discovery = $args->getArg('no-discovery'); - $repo_names = $args->getArg('repositories'); - $exclude_names = $args->getArg('not'); + $include = $args->getArg('repositories'); + $exclude = $args->getArg('not'); // Each repository has an individual pull frequency; after we pull it, // wait that long to pull it again. When we start up, try to pull everything // serially. $retry_after = array(); $min_sleep = 15; + $max_futures = 4; + $futures = array(); + $queue = array(); while (true) { - $repositories = $this->loadRepositories($repo_names); - if ($exclude_names) { - $exclude = $this->loadRepositories($exclude_names); - $repositories = array_diff_key($repositories, $exclude); - } - - // Shuffle the repositories, then re-key the array since shuffle() - // discards keys. This is mostly for startup, we'll use soft priorities - // later. - shuffle($repositories); - $repositories = mpull($repositories, null, 'getID'); + $pullable = $this->loadPullableRepositories($include, $exclude); // If any repositories have the NEEDS_UPDATE flag set, pull them // as soon as possible. 
$need_update_messages = $this->loadRepositoryUpdateMessages(); foreach ($need_update_messages as $message) { + $repo = idx($pullable, $message->getRepositoryID()); + if (!$repo) { + continue; + } + + $this->log( + pht( + 'Got an update message for repository "%s"!', + $repo->getMonogram())); + $retry_after[$message->getRepositoryID()] = time(); } // If any repositories were deleted, remove them from the retry timer map // so we don't end up with a retry timer that never gets updated and // causes us to sleep for the minimum amount of time. $retry_after = array_select_keys( $retry_after, - array_keys($repositories)); + array_keys($pullable)); - // Assign soft priorities to repositories based on how frequently they - // should pull again. - asort($retry_after); - $repositories = array_select_keys( - $repositories, - array_keys($retry_after)) + $repositories; - foreach ($repositories as $id => $repository) { - $after = idx($retry_after, $id, 0); - if ($after > time()) { + // Figure out which repositories we need to queue for an update. 
+ foreach ($pullable as $id => $repository) { + $monogram = $repository->getMonogram(); + + if (isset($futures[$id])) { + $this->log(pht('Repository "%s" is currently updating.', $monogram)); + continue; + } + + if (isset($queue[$id])) { + $this->log(pht('Repository "%s" is already queued.', $monogram)); continue; } - $tracked = $repository->isTracked(); - if (!$tracked) { + $after = idx($retry_after, $id, 0); + if ($after > time()) { + $this->log( + pht( + 'Repository "%s" is not due for an update for %s second(s).', + $monogram, + new PhutilNumber($after - time()))); continue; } - $callsign = $repository->getCallsign(); + if (!$after) { + $this->log( + pht( + 'Scheduling repository "%s" for an initial update.', + $monogram)); + } else { + $this->log( + pht( + 'Scheduling repository "%s" for an update (%s seconds overdue).', + $monogram, + new PhutilNumber(time() - $after))); + } - try { - $this->log("Updating repository '{$callsign}'."); + $queue[$id] = $after; + } - $bin_dir = dirname(phutil_get_library_root('phabricator')).'/bin'; - $flags = array(); - if ($no_discovery) { - $flags[] = '--no-discovery'; + // Process repositories in the order they became candidates for updates. + asort($queue); + + // Dequeue repositories until we hit maximum parallelism. 
+ while ($queue && (count($futures) < $max_futures)) { + foreach ($queue as $id => $time) { + $repository = idx($pullable, $id); + if (!$repository) { + $this->log( + pht('Repository %s is no longer pullable; skipping.', $id)); + break; } - list($stdout, $stderr) = execx( - '%s/repository update %Ls -- %s', - $bin_dir, - $flags, - $callsign); - - if (strlen($stderr)) { - $stderr_msg = pht( - 'Unexpected output while updating the %s repository: %s', - $callsign, - $stderr); - phlog($stderr_msg); - } + $monogram = $repository->getMonogram(); + $this->log(pht('Starting update for repository "%s".', $monogram)); - $sleep_for = $repository->getDetail('pull-frequency', $min_sleep); - $retry_after[$id] = time() + $sleep_for; - } catch (Exception $ex) { - $retry_after[$id] = time() + $min_sleep; + unset($queue[$id]); + $futures[$id] = $this->buildUpdateFuture( + $repository, + $no_discovery); - $proxy = new PhutilProxyException( - "Error while fetching changes to the '{$callsign}' repository.", - $ex); - phlog($proxy); + break; } - - $this->stillWorking(); } - if ($retry_after) { - $sleep_until = max(min($retry_after), time() + $min_sleep); - } else { - $sleep_until = time() + $min_sleep; + if ($queue) { + $this->log( + pht( + 'Not enough process slots to schedule the other %s '. 
+ 'repository(s) for updates yet.', + new PhutilNumber(count($queue)))); } - while (($sleep_until - time()) > 0) { - $this->sleep(1); - if ($this->loadRepositoryUpdateMessages()) { + if ($futures) { + $iterator = id(new FutureIterator($futures)) + ->setUpdateInterval($min_sleep); + + foreach ($iterator as $id => $future) { + $this->stillWorking(); + + if ($future === null) { + $this->log(pht('Waiting for updates to complete...')); + $this->stillWorking(); + + if ($this->loadRepositoryUpdateMessages()) { + $this->log(pht('Interrupted by pending updates!')); + break; + } + + continue; + } + + unset($futures[$id]); + $retry_after[$id] = $this->resolveUpdateFuture( + $pullable[$id], + $future, + $min_sleep); + + // We have a free slot now, so go try to fill it. break; } + + // Jump back into prioritization if we had any futures to deal with. + continue; } + + $this->waitForUpdates($min_sleep, $retry_after); + } + + } + + + /** + * @task pull + */ + private function buildUpdateFuture( + PhabricatorRepository $repository, + $no_discovery) { + + $bin = dirname(phutil_get_library_root('phabricator')).'/bin/repository'; + + $flags = array(); + if ($no_discovery) { + $flags[] = '--no-discovery'; + } + + $callsign = $repository->getCallsign(); + + $future = new ExecFuture('%s update %Ls -- %s', $bin, $flags, $callsign); + + // Sometimes, the underlying VCS commands will hang indefinitely. We've + // observed this occasionally with GitHub, and other users have observed + // it with other VCS servers. + + // To limit the damage this can cause, kill the update after a + // reasonable amount of time, under the assumption that it has hung. + + // Since it's hard to know what a "reasonable" amount of time is given that + // users may be downloading a repository full of pirated movies over a + // potato, these limits are fairly generous.
Repositories exceeding these + // limits can be manually pulled with `bin/repository update X`, which can + // just run for as long as it wants. + + if ($repository->isImporting()) { + $timeout = phutil_units('4 hours in seconds'); + } else { + $timeout = phutil_units('15 minutes in seconds'); } + + $future->setTimeout($timeout); + + return $future; } + + /** + * @task pull + */ private function loadRepositoryUpdateMessages() { $type_need_update = PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE; return id(new PhabricatorRepositoryStatusMessage()) ->loadAllWhere('statusType = %s', $type_need_update); } + /** * @task pull */ - protected function loadRepositories(array $names) { + private function loadPullableRepositories(array $include, array $exclude) { $query = id(new PhabricatorRepositoryQuery()) ->setViewer($this->getViewer()); - if ($names) { - $query->withCallsigns($names); + if ($include) { + $query->withCallsigns($include); } - $repos = $query->execute(); + $repositories = $query->execute(); - if ($names) { - $by_callsign = mpull($repos, null, 'getCallsign'); - foreach ($names as $name) { + if ($include) { + $by_callsign = mpull($repositories, null, 'getCallsign'); + foreach ($include as $name) { if (empty($by_callsign[$name])) { throw new Exception( "No repository exists with callsign '{$name}'!"); } } } - return $repos; + if ($exclude) { + $exclude = array_fuse($exclude); + foreach ($repositories as $key => $repository) { + if (isset($exclude[$repository->getCallsign()])) { + unset($repositories[$key]); + } + } + } + + foreach ($repositories as $key => $repository) { + if (!$repository->isTracked()) { + unset($repositories[$key]); + } + } + + // Shuffle the repositories, then re-key the array since shuffle() + // discards keys. This is mostly for startup, we'll use soft priorities + // later. 
+ shuffle($repositories); + $repositories = mpull($repositories, null, 'getID'); + + return $repositories; + } + + + /** + * @task pull + */ + private function resolveUpdateFuture( + PhabricatorRepository $repository, + ExecFuture $future, + $min_sleep) { + + $monogram = $repository->getMonogram(); + + $this->log(pht('Resolving update for "%s".', $monogram)); + + try { + list($stdout, $stderr) = $future->resolvex(); + } catch (Exception $ex) { + $proxy = new PhutilProxyException( + pht( + 'Error while updating the "%s" repository.', + $repository->getMonogram()), + $ex); + phlog($proxy); + + return time() + $min_sleep; + } + + if (strlen($stderr)) { + $stderr_msg = pht( + 'Unexpected output while updating repository "%s": %s', + $monogram, + $stderr); + phlog($stderr_msg); + } + + $sleep_for = (int)$repository->getDetail('pull-frequency', $min_sleep); + if ($sleep_for < $min_sleep) { + $sleep_for = $min_sleep; + } + + return time() + $sleep_for; + } + + + + /** + * Sleep for a short period of time, waiting for repository update messages + * to arrive. Wakes early if any update messages are pending. + * + * @task pull + */ + private function waitForUpdates($min_sleep, array $retry_after) { + $this->log( + pht('No repositories need updates right now, sleeping...')); + + $sleep_until = time() + $min_sleep; + if ($retry_after) { + $sleep_until = min($sleep_until, min($retry_after)); + } + + while (($sleep_until - time()) > 0) { + $sleep_duration = ($sleep_until - time()); + + $this->log( + pht( + 'Sleeping for %s more second(s)...', + new PhutilNumber($sleep_duration))); + + $this->sleep(1); + if ($this->loadRepositoryUpdateMessages()) { + $this->log(pht('Awakened from sleep by pending updates!')); + break; + } + } } }