Page MenuHomePhabricator

D8785.id.diff
No OneTemporary

D8785.id.diff

diff --git a/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php b/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php
--- a/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php
+++ b/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php
@@ -23,8 +23,6 @@
* repository).
*
* @task pull Pulling Repositories
- * @task git Git Implementation
- * @task hg Mercurial Implementation
*/
final class PhabricatorRepositoryPullLocalDaemon
extends PhabricatorDaemon {
@@ -60,8 +58,8 @@
));
$no_discovery = $args->getArg('no-discovery');
- $repo_names = $args->getArg('repositories');
- $exclude_names = $args->getArg('not');
+ $include = $args->getArg('repositories');
+ $exclude = $args->getArg('not');
// Each repository has an individual pull frequency; after we pull it,
// wait that long to pull it again. When we start up, try to pull everything
@@ -69,24 +67,27 @@
$retry_after = array();
$min_sleep = 15;
+ $max_futures = 4;
+ $futures = array();
+ $queue = array();
while (true) {
- $repositories = $this->loadRepositories($repo_names);
- if ($exclude_names) {
- $exclude = $this->loadRepositories($exclude_names);
- $repositories = array_diff_key($repositories, $exclude);
- }
-
- // Shuffle the repositories, then re-key the array since shuffle()
- // discards keys. This is mostly for startup, we'll use soft priorities
- // later.
- shuffle($repositories);
- $repositories = mpull($repositories, null, 'getID');
+ $pullable = $this->loadPullableRepositories($include, $exclude);
// If any repositories have the NEEDS_UPDATE flag set, pull them
// as soon as possible.
$need_update_messages = $this->loadRepositoryUpdateMessages();
foreach ($need_update_messages as $message) {
+ $repo = idx($pullable, $message->getRepositoryID());
+ if (!$repo) {
+ continue;
+ }
+
+ $this->log(
+ pht(
+ 'Got an update message for repository "%s"!',
+ $repo->getMonogram()));
+
$retry_after[$message->getRepositoryID()] = time();
}
@@ -95,102 +96,190 @@
// causes us to sleep for the minimum amount of time.
$retry_after = array_select_keys(
$retry_after,
- array_keys($repositories));
+ array_keys($pullable));
- // Assign soft priorities to repositories based on how frequently they
- // should pull again.
- asort($retry_after);
- $repositories = array_select_keys(
- $repositories,
- array_keys($retry_after)) + $repositories;
- foreach ($repositories as $id => $repository) {
- $after = idx($retry_after, $id, 0);
- if ($after > time()) {
+ // Figure out which repositories we need to queue for an update.
+ foreach ($pullable as $id => $repository) {
+ $monogram = $repository->getMonogram();
+
+ if (isset($futures[$id])) {
+ $this->log(pht('Repository "%s" is currently updating.', $monogram));
+ continue;
+ }
+
+ if (isset($queue[$id])) {
+ $this->log(pht('Repository "%s" is already queued.', $monogram));
continue;
}
- $tracked = $repository->isTracked();
- if (!$tracked) {
+ $after = idx($retry_after, $id, 0);
+ if ($after > time()) {
+ $this->log(
+ pht(
+ 'Repository "%s" is not due for an update for %s second(s).',
+ $monogram,
+ new PhutilNumber($after - time())));
continue;
}
- $callsign = $repository->getCallsign();
+ if (!$after) {
+ $this->log(
+ pht(
+ 'Scheduling repository "%s" for an initial update.',
+ $monogram));
+ } else {
+ $this->log(
+ pht(
+ 'Scheduling repository "%s" for an update (%s seconds overdue).',
+ $monogram,
+ new PhutilNumber(time() - $after)));
+ }
- try {
- $this->log("Updating repository '{$callsign}'.");
+ $queue[$id] = $after;
+ }
- $bin_dir = dirname(phutil_get_library_root('phabricator')).'/bin';
- $flags = array();
- if ($no_discovery) {
- $flags[] = '--no-discovery';
+ // Process repositories in the order they became candidates for updates.
+ asort($queue);
+
+ // Dequeue repositories until we hit maximum parallelism.
+ while ($queue && (count($futures) < $max_futures)) {
+ foreach ($queue as $id => $time) {
+ $repository = idx($pullable, $id);
+ if (!$repository) {
+ // This ID was queued on an earlier pass but is missing from the
+ // freshly loaded pullable set. Drop it from the queue before
+ // leaving the inner loop: otherwise the enclosing
+ // `while ($queue && ...)` loop selects the same stale ID on every
+ // pass and never terminates.
+ $this->log(
+ pht('Repository %s is no longer pullable; skipping.', $id));
+ unset($queue[$id]);
+ break;
+ }
- list($stdout, $stderr) = execx(
- '%s/repository update %Ls -- %s',
- $bin_dir,
- $flags,
- $callsign);
-
- if (strlen($stderr)) {
- $stderr_msg = pht(
- 'Unexpected output while updating the %s repository: %s',
- $callsign,
- $stderr);
- phlog($stderr_msg);
- }
+ $monogram = $repository->getMonogram();
+ $this->log(pht('Starting update for repository "%s".', $monogram));
- $sleep_for = $repository->getDetail('pull-frequency', $min_sleep);
- $retry_after[$id] = time() + $sleep_for;
- } catch (Exception $ex) {
- $retry_after[$id] = time() + $min_sleep;
+ unset($queue[$id]);
+ $futures[$id] = $this->buildUpdateFuture(
+ $repository,
+ $no_discovery);
- $proxy = new PhutilProxyException(
- "Error while fetching changes to the '{$callsign}' repository.",
- $ex);
- phlog($proxy);
+ break;
}
-
- $this->stillWorking();
}
- if ($retry_after) {
- $sleep_until = max(min($retry_after), time() + $min_sleep);
- } else {
- $sleep_until = time() + $min_sleep;
+ if ($queue) {
+ $this->log(
+ pht(
+ 'Not enough process slots to schedule the other %s '.
+ 'repository(s) for updates yet.',
+ new PhutilNumber(count($queue))));
}
- while (($sleep_until - time()) > 0) {
- $this->sleep(1);
- if ($this->loadRepositoryUpdateMessages()) {
+ if ($futures) {
+ $iterator = id(new FutureIterator($futures))
+ ->setUpdateInterval($min_sleep);
+
+ foreach ($iterator as $id => $future) {
+ $this->stillWorking();
+
+ if ($future === null) {
+ $this->log(pht('Waiting for updates to complete...'));
+ $this->stillWorking();
+
+ if ($this->loadRepositoryUpdateMessages()) {
+ $this->log(pht('Interrupted by pending updates!'));
+ break;
+ }
+
+ continue;
+ }
+
+ unset($futures[$id]);
+ $retry_after[$id] = $this->resolveUpdateFuture(
+ $pullable[$id],
+ $future,
+ $min_sleep);
+
+ // We have a free slot now, so go try to fill it.
break;
}
+
+ // Jump back into prioritization if we had any futures to deal with.
+ continue;
}
+
+ $this->waitForUpdates($min_sleep, $retry_after);
+ }
+
+ }
+
+
+ /**
+ * Build the subprocess future which runs `bin/repository update` for a
+ * single repository, with a timeout applied so a hung update is killed.
+ *
+ * @param PhabricatorRepository Repository to update; its callsign is passed
+ * to the command.
+ * @param wild If truthy, `--no-discovery` is passed to the update command.
+ * @return ExecFuture Future for the update command with its timeout set.
+ * @task pull
+ */
+ private function buildUpdateFuture(
+ PhabricatorRepository $repository,
+ $no_discovery) {
+
+ $bin = dirname(phutil_get_library_root('phabricator')).'/bin/repository';
+
+ $flags = array();
+ if ($no_discovery) {
+ $flags[] = '--no-discovery';
+ }
+
+ $callsign = $repository->getCallsign();
+
+ $future = new ExecFuture('%s update %Ls -- %s', $bin, $flags, $callsign);
+
+ // Sometimes, the underlying VCS commands will hang indefinitely. We've
+ // observed this occasionally with GitHub, and other users have observed
+ // it with other VCS servers.
+
+ // To limit the damage this can cause, kill the update after a
+ // reasonable amount of time, under the assumption that it has hung.
+
+ // Since it's hard to know what a "reasonable" amount of time is given that
+ // users may be downloading a repository full of pirated movies over a
+ // potato, these limits are fairly generous. Repositories exceeding these
+ // limits can be manually pulled with `bin/repository update X`, which can
+ // just run for as long as it wants.
+
+ // Repositories which are still importing get the longer limit.
+ if ($repository->isImporting()) {
+ $timeout = phutil_units('4 hours in seconds');
+ } else {
+ $timeout = phutil_units('15 minutes in seconds');
}
+
+ $future->setTimeout($timeout);
+
+ return $future;
}
+
+ /**
+ * Load every repository status message of type `TYPE_NEEDS_UPDATE`.
+ *
+ * Callers use a non-empty result as the signal that some repository has
+ * a pending update request.
+ *
+ * @return wild Result of `loadAllWhere()` for the matching status messages.
+ * @task pull
+ */
private function loadRepositoryUpdateMessages() {
$type_need_update = PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE;
return id(new PhabricatorRepositoryStatusMessage())
->loadAllWhere('statusType = %s', $type_need_update);
}
+
/**
* @task pull
*/
- protected function loadRepositories(array $names) {
+ private function loadPullableRepositories(array $include, array $exclude) {
$query = id(new PhabricatorRepositoryQuery())
->setViewer($this->getViewer());
- if ($names) {
- $query->withCallsigns($names);
+ if ($include) {
+ $query->withCallsigns($include);
}
- $repos = $query->execute();
+ $repositories = $query->execute();
- if ($names) {
- $by_callsign = mpull($repos, null, 'getCallsign');
- foreach ($names as $name) {
+ if ($include) {
+ $by_callsign = mpull($repositories, null, 'getCallsign');
+ foreach ($include as $name) {
if (empty($by_callsign[$name])) {
throw new Exception(
"No repository exists with callsign '{$name}'!");
@@ -198,7 +287,103 @@
}
}
- return $repos;
+ if ($exclude) {
+ $exclude = array_fuse($exclude);
+ foreach ($repositories as $key => $repository) {
+ if (isset($exclude[$repository->getCallsign()])) {
+ unset($repositories[$key]);
+ }
+ }
+ }
+
+ foreach ($repositories as $key => $repository) {
+ if (!$repository->isTracked()) {
+ unset($repositories[$key]);
+ }
+ }
+
+ // Shuffle the repositories, then re-key the array since shuffle()
+ // discards keys. This is mostly for startup, we'll use soft priorities
+ // later.
+ shuffle($repositories);
+ $repositories = mpull($repositories, null, 'getID');
+
+ return $repositories;
+ }
+
+
+ /**
+ * Resolve a finished (or failed) update future for a repository and work
+ * out when that repository should next be pulled.
+ *
+ * Failures are logged with `phlog()` rather than rethrown, so one broken
+ * repository does not stop the daemon.
+ *
+ * @param PhabricatorRepository Repository the future was updating.
+ * @param ExecFuture Future running the update command.
+ * @param int Minimum number of seconds to wait before the next pull.
+ * @return int Timestamp (based on `time()`) before which the repository
+ * should not be pulled again.
+ * @task pull
+ */
+ private function resolveUpdateFuture(
+ PhabricatorRepository $repository,
+ ExecFuture $future,
+ $min_sleep) {
+
+ $monogram = $repository->getMonogram();
+
+ $this->log(pht('Resolving update for "%s".', $monogram));
+
+ try {
+ list($stdout, $stderr) = $future->resolvex();
+ } catch (Exception $ex) {
+ $proxy = new PhutilProxyException(
+ pht(
+ 'Error while updating the "%s" repository.',
+ $repository->getMonogram()),
+ $ex);
+ phlog($proxy);
+
+ // On failure, retry after the minimum delay instead of the
+ // repository's configured pull frequency.
+ return time() + $min_sleep;
+ }
+
+ // The command succeeded, but anything written to stderr is still
+ // reported.
+ if (strlen($stderr)) {
+ $stderr_msg = pht(
+ 'Unexpected output while updating repository "%s": %s',
+ $monogram,
+ $stderr);
+ phlog($stderr_msg);
+ }
+
+ // Use the repository's "pull-frequency" detail, but never less than the
+ // minimum sleep.
+ $sleep_for = (int)$repository->getDetail('pull-frequency', $min_sleep);
+ if ($sleep_for < $min_sleep) {
+ $sleep_for = $min_sleep;
+ }
+
+ return time() + $sleep_for;
+ }
+
+
+
+ /**
+ * Sleep for a short period of time, waking early if any repository has a
+ * pending update message.
+ *
+ * The sleep ends at the earlier of `$min_sleep` seconds from now and the
+ * smallest timestamp in `$retry_after`. It proceeds in one-second steps,
+ * checking for update messages after each step.
+ *
+ * @param int Maximum number of seconds to sleep.
+ * @param map<int, int> Map of repository IDs to the timestamps at which
+ * they are next due for an update.
+ * @return void
+ * @task pull
+ */
+ private function waitForUpdates($min_sleep, array $retry_after) {
+ $this->log(
+ pht('No repositories need updates right now, sleeping...'));
+
+ $sleep_until = time() + $min_sleep;
+ if ($retry_after) {
+ // Wake sooner if some repository becomes due before the default
+ // deadline.
+ $sleep_until = min($sleep_until, min($retry_after));
+ }
+
+ while (($sleep_until - time()) > 0) {
+ $sleep_duration = ($sleep_until - time());
+
+ $this->log(
+ pht(
+ 'Sleeping for %s more second(s)...',
+ new PhutilNumber($sleep_duration)));
+
+ $this->sleep(1);
+ if ($this->loadRepositoryUpdateMessages()) {
+ $this->log(pht('Awakened from sleep by pending updates!'));
+ break;
+ }
+ }
}
}

File Metadata

Mime Type
text/plain
Expires
Thu, Nov 14, 10:19 PM (4 d, 9 h ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
6719461
Default Alt Text
D8785.id.diff (12 KB)

Event Timeline