Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F14046604
D8785.id.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
12 KB
Referenced Files
None
Subscribers
None
D8785.id.diff
View Options
diff --git a/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php b/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php
--- a/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php
+++ b/src/applications/repository/daemon/PhabricatorRepositoryPullLocalDaemon.php
@@ -23,8 +23,6 @@
* repository).
*
* @task pull Pulling Repositories
- * @task git Git Implementation
- * @task hg Mercurial Implementation
*/
final class PhabricatorRepositoryPullLocalDaemon
extends PhabricatorDaemon {
@@ -60,8 +58,8 @@
));
$no_discovery = $args->getArg('no-discovery');
- $repo_names = $args->getArg('repositories');
- $exclude_names = $args->getArg('not');
+ $include = $args->getArg('repositories');
+ $exclude = $args->getArg('not');
// Each repository has an individual pull frequency; after we pull it,
// wait that long to pull it again. When we start up, try to pull everything
@@ -69,24 +67,27 @@
$retry_after = array();
$min_sleep = 15;
+ $max_futures = 4;
+ $futures = array();
+ $queue = array();
while (true) {
- $repositories = $this->loadRepositories($repo_names);
- if ($exclude_names) {
- $exclude = $this->loadRepositories($exclude_names);
- $repositories = array_diff_key($repositories, $exclude);
- }
-
- // Shuffle the repositories, then re-key the array since shuffle()
- // discards keys. This is mostly for startup, we'll use soft priorities
- // later.
- shuffle($repositories);
- $repositories = mpull($repositories, null, 'getID');
+ $pullable = $this->loadPullableRepositories($include, $exclude);
// If any repositories have the NEEDS_UPDATE flag set, pull them
// as soon as possible.
$need_update_messages = $this->loadRepositoryUpdateMessages();
foreach ($need_update_messages as $message) {
+ $repo = idx($pullable, $message->getRepositoryID());
+ if (!$repo) {
+ continue;
+ }
+
+ $this->log(
+ pht(
+ 'Got an update message for repository "%s"!',
+ $repo->getMonogram()));
+
$retry_after[$message->getRepositoryID()] = time();
}
@@ -95,102 +96,190 @@
// causes us to sleep for the minimum amount of time.
$retry_after = array_select_keys(
$retry_after,
- array_keys($repositories));
+ array_keys($pullable));
- // Assign soft priorities to repositories based on how frequently they
- // should pull again.
- asort($retry_after);
- $repositories = array_select_keys(
- $repositories,
- array_keys($retry_after)) + $repositories;
- foreach ($repositories as $id => $repository) {
- $after = idx($retry_after, $id, 0);
- if ($after > time()) {
+ // Figure out which repositories we need to queue for an update.
+ foreach ($pullable as $id => $repository) {
+ $monogram = $repository->getMonogram();
+
+ if (isset($futures[$id])) {
+ $this->log(pht('Repository "%s" is currently updating.', $monogram));
+ continue;
+ }
+
+ if (isset($queue[$id])) {
+ $this->log(pht('Repository "%s" is already queued.', $monogram));
continue;
}
- $tracked = $repository->isTracked();
- if (!$tracked) {
+ $after = idx($retry_after, $id, 0);
+ if ($after > time()) {
+ $this->log(
+ pht(
+ 'Repository "%s" is not due for an update for %s second(s).',
+ $monogram,
+ new PhutilNumber($after - time())));
continue;
}
- $callsign = $repository->getCallsign();
+ if (!$after) {
+ $this->log(
+ pht(
+ 'Scheduling repository "%s" for an initial update.',
+ $monogram));
+ } else {
+ $this->log(
+ pht(
+ 'Scheduling repository "%s" for an update (%s seconds overdue).',
+ $monogram,
+ new PhutilNumber(time() - $after)));
+ }
- try {
- $this->log("Updating repository '{$callsign}'.");
+ $queue[$id] = $after;
+ }
- $bin_dir = dirname(phutil_get_library_root('phabricator')).'/bin';
- $flags = array();
- if ($no_discovery) {
- $flags[] = '--no-discovery';
+ // Process repositories in the order they became candidates for updates.
+ asort($queue);
+
+ // Dequeue repositories until we hit maximum parallelism.
+ while ($queue && (count($futures) < $max_futures)) {
+ foreach ($queue as $id => $time) {
+ // Dequeue first so a stale entry can't make this loop spin forever.
+ unset($queue[$id]);
+ $repository = idx($pullable, $id);
+ if (!$repository) {
+ $this->log(
+ pht('Repository %s is no longer pullable; skipping.', $id));
+ continue;
}
- list($stdout, $stderr) = execx(
- '%s/repository update %Ls -- %s',
- $bin_dir,
- $flags,
- $callsign);
-
- if (strlen($stderr)) {
- $stderr_msg = pht(
- 'Unexpected output while updating the %s repository: %s',
- $callsign,
- $stderr);
- phlog($stderr_msg);
- }
+ $monogram = $repository->getMonogram();
+ $this->log(pht('Starting update for repository "%s".', $monogram));
- $sleep_for = $repository->getDetail('pull-frequency', $min_sleep);
- $retry_after[$id] = time() + $sleep_for;
- } catch (Exception $ex) {
- $retry_after[$id] = time() + $min_sleep;
+ $futures[$id] = $this->buildUpdateFuture(
+ $repository,
+ $no_discovery);
- $proxy = new PhutilProxyException(
- "Error while fetching changes to the '{$callsign}' repository.",
- $ex);
- phlog($proxy);
+ break;
}
-
- $this->stillWorking();
}
- if ($retry_after) {
- $sleep_until = max(min($retry_after), time() + $min_sleep);
- } else {
- $sleep_until = time() + $min_sleep;
+ if ($queue) {
+ $this->log(
+ pht(
+ 'Not enough process slots to schedule the other %s '.
+ 'repository(s) for updates yet.',
+ new PhutilNumber(count($queue))));
}
- while (($sleep_until - time()) > 0) {
- $this->sleep(1);
- if ($this->loadRepositoryUpdateMessages()) {
+ if ($futures) {
+ $iterator = id(new FutureIterator($futures))
+ ->setUpdateInterval($min_sleep);
+
+ foreach ($iterator as $id => $future) {
+ $this->stillWorking();
+
+ if ($future === null) {
+ $this->log(pht('Waiting for updates to complete...'));
+
+ if ($this->loadRepositoryUpdateMessages()) {
+ $this->log(pht('Interrupted by pending updates!'));
+ break;
+ }
+
+ continue;
+ }
+
+ unset($futures[$id]);
+ $retry_after[$id] = $this->resolveUpdateFuture(
+ $pullable[$id],
+ $future,
+ $min_sleep);
+
+ // We have a free slot now, so go try to fill it.
break;
}
+
+ // Jump back into prioritization if we had any futures to deal with.
+ continue;
}
+
+ $this->waitForUpdates($min_sleep, $retry_after);
+ }
+
}
+
+ /**
+ * Build a timeout-limited `bin/repository update` future for a repository.
+ * @task pull
+ */
+ private function buildUpdateFuture(
+ PhabricatorRepository $repository,
+ $no_discovery) {
+
+ $bin = dirname(phutil_get_library_root('phabricator')).'/bin/repository';
+
+ $flags = array();
+ if ($no_discovery) {
+ $flags[] = '--no-discovery';
+ }
+
+ $callsign = $repository->getCallsign();
+
+ $future = new ExecFuture('%s update %Ls -- %s', $bin, $flags, $callsign);
+
+ // Sometimes, the underlying VCS commands will hang indefinitely. We've
+ // observed this occasionally with GitHub, and other users have observed
+ // it with other VCS servers.
+
+ // To limit the damage this can cause, kill the update after a
+ // reasonable amount of time, under the assumption that it has hung.
+
+ // Since it's hard to know what a "reasonable" amount of time is given that
+ // users may be downloading a repository full of pirated movies over a
+ // potato, these limits are fairly generous. Repositories exceeding these
+ // limits can be manually pulled with `bin/repository update X`, which can
+ // just run for as long as it wants.
+
+ if ($repository->isImporting()) {
+ $timeout = phutil_units('4 hours in seconds');
+ } else {
+ $timeout = phutil_units('15 minutes in seconds');
}
+
+ $future->setTimeout($timeout);
+
+ return $future;
}
+
+ /**
+ * @task pull
+ */
private function loadRepositoryUpdateMessages() {
$type_need_update = PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE;
return id(new PhabricatorRepositoryStatusMessage())
->loadAllWhere('statusType = %s', $type_need_update);
}
+
/**
* @task pull
*/
- protected function loadRepositories(array $names) {
+ private function loadPullableRepositories(array $include, array $exclude) {
$query = id(new PhabricatorRepositoryQuery())
->setViewer($this->getViewer());
- if ($names) {
- $query->withCallsigns($names);
+ if ($include) {
+ $query->withCallsigns($include);
}
- $repos = $query->execute();
+ $repositories = $query->execute();
- if ($names) {
- $by_callsign = mpull($repos, null, 'getCallsign');
- foreach ($names as $name) {
+ if ($include) {
+ $by_callsign = mpull($repositories, null, 'getCallsign');
+ foreach ($include as $name) {
if (empty($by_callsign[$name])) {
throw new Exception(
"No repository exists with callsign '{$name}'!");
@@ -198,7 +287,103 @@
}
}
- return $repos;
+ if ($exclude) {
+ $exclude = array_fuse($exclude);
+ foreach ($repositories as $key => $repository) {
+ if (isset($exclude[$repository->getCallsign()])) {
+ unset($repositories[$key]);
+ }
+ }
+ }
+
+ foreach ($repositories as $key => $repository) {
+ if (!$repository->isTracked()) {
+ unset($repositories[$key]);
+ }
+ }
+
+ // Shuffle the repositories, then re-key the array since shuffle()
+ // discards keys. This mostly matters at startup, when every repository is
+ // equally due; afterward, updates are ordered by when they became due.
+ shuffle($repositories);
+ $repositories = mpull($repositories, null, 'getID');
+
+ return $repositories;
+ }
+
+ /**
+ * Resolve an update future and return when the repository is next due.
+ * @task pull
+ */
+ private function resolveUpdateFuture(
+ PhabricatorRepository $repository,
+ ExecFuture $future,
+ $min_sleep) {
+
+ $monogram = $repository->getMonogram();
+
+ $this->log(pht('Resolving update for "%s".', $monogram));
+
+ try {
+ list($stdout, $stderr) = $future->resolvex();
+ } catch (Exception $ex) {
+ $proxy = new PhutilProxyException(
+ pht(
+ 'Error while updating the "%s" repository.',
+ $monogram),
+ $ex);
+ phlog($proxy);
+
+ return time() + $min_sleep;
+ }
+
+ if (strlen($stderr)) {
+ $stderr_msg = pht(
+ 'Unexpected output while updating repository "%s": %s',
+ $monogram,
+ $stderr);
+ phlog($stderr_msg);
+ }
+
+ $sleep_for = (int)$repository->getDetail('pull-frequency', $min_sleep);
+ if ($sleep_for < $min_sleep) {
+ $sleep_for = $min_sleep;
+ }
+
+ return time() + $sleep_for;
+ }
+
+
+
+ /**
+ * Sleep until the earliest time in `$retry_after`, but for no more than
+ * about `$min_sleep` seconds, waking early if update messages are pending.
+ *
+ * @task pull
+ */
+ private function waitForUpdates($min_sleep, array $retry_after) {
+ $this->log(
+ pht('No repositories need updates right now, sleeping...'));
+
+ $sleep_until = time() + $min_sleep;
+ if ($retry_after) {
+ $sleep_until = min($sleep_until, min($retry_after));
+ }
+
+ while (($sleep_until - time()) > 0) {
+ $sleep_duration = ($sleep_until - time());
+
+ $this->log(
+ pht(
+ 'Sleeping for %s more second(s)...',
+ new PhutilNumber($sleep_duration)));
+
+ $this->sleep(1);
+ if ($this->loadRepositoryUpdateMessages()) {
+ $this->log(pht('Awakened from sleep by pending updates!'));
+ break;
+ }
+ }
}
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Nov 14, 10:19 PM (4 d, 9 h ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
6719461
Default Alt Text
D8785.id.diff (12 KB)
Attached To
Mode
D8785: Make PullLocal daemon more flexible and transparent about scheduling
Attached
Detach File
Event Timeline
Log In to Comment