diff --git a/resources/sql/autopatches/20140711.workerpriority.sql b/resources/sql/autopatches/20140711.workerpriority.sql new file mode 100644 index 0000000000..3fabc545c3 --- /dev/null +++ b/resources/sql/autopatches/20140711.workerpriority.sql @@ -0,0 +1,11 @@ +ALTER TABLE {$NAMESPACE}_worker.worker_activetask + ADD COLUMN priority int unsigned NOT NULL; + +ALTER TABLE {$NAMESPACE}_worker.worker_activetask + ADD KEY (leaseOwner, priority, id); + +ALTER TABLE {$NAMESPACE}_worker.worker_archivetask + ADD COLUMN priority int unsigned NOT NULL; + +ALTER TABLE {$NAMESPACE}_worker.worker_archivetask + ADD KEY (leaseOwner, priority, id); diff --git a/scripts/repository/reparse.php b/scripts/repository/reparse.php index 9e4e86e9ea..258ef28807 100755 --- a/scripts/repository/reparse.php +++ b/scripts/repository/reparse.php @@ -1,291 +1,294 @@ #!/usr/bin/env php setSynopsis(<<parseStandardArguments(); $args->parse( array( // what array( 'name' => 'revision', 'wildcard' => true, ), array( 'name' => 'all', 'param' => 'callsign or phid', 'help' => 'Reparse all commits in the specified repository. This '. 'mode queues parsers into the task queue; you must run '. 'taskmasters to actually do the parses. Use with '. '__--force-local__ to run the tasks locally instead of '. 'with taskmasters.', ), array( 'name' => 'min-date', 'param' => 'date', 'help' => 'Must be used with __--all__, this will exclude commits '. 'which are earlier than __date__.'. "\n".$min_date_usage_examples, ), // which parts array( 'name' => 'message', 'help' => 'Reparse commit messages.', ), array( 'name' => 'change', 'help' => 'Reparse changes.', ), array( 'name' => 'herald', 'help' => 'Reevaluate Herald rules (may send huge amounts of email!)', ), array( 'name' => 'owners', 'help' => 'Reevaluate related commits for owners packages (may '. 'delete existing relationship entries between your '. 'package and some old commits!)', ), array( 'name' => 'harbormaster', 'help' => 'EXPERIMENTAL. Execute Harbormaster.', ), // misc options array( 'name' => 'force', 'short' => 'f', 'help' => 'Act noninteractively, without prompting.', ), array( 'name' => 'force-local', 'help' => 'Only used with __--all__, use this to run the tasks '. 'locally instead of deferring them to taskmaster daemons.', ), )); $all_from_repo = $args->getArg('all'); $reparse_message = $args->getArg('message'); $reparse_change = $args->getArg('change'); $reparse_herald = $args->getArg('herald'); $reparse_owners = $args->getArg('owners'); $reparse_harbormaster = $args->getArg('harbormaster'); $reparse_what = $args->getArg('revision'); $force = $args->getArg('force'); $force_local = $args->getArg('force-local'); $min_date = $args->getArg('min-date'); if (!$all_from_repo && !$reparse_what) { usage('Specify a commit or repository to reparse.'); } if ($all_from_repo && $reparse_what) { $commits = implode(', ', $reparse_what); usage( "Specify a commit or repository to reparse, not both:\n". "All from repo: ".$all_from_repo."\n". "Commit(s) to reparse: ".$commits); } if (!$reparse_message && !$reparse_change && !$reparse_herald && !$reparse_owners && !$reparse_harbormaster) { usage('Specify what information to reparse with --message, --change, '. '--herald, --harbormaster, and/or --owners'); } $min_timestamp = false; if ($min_date) { $min_timestamp = strtotime($min_date); if (!$all_from_repo) { usage( "You must use --all if you specify --min-date\n". "e.g.\n". " ./reparse.php --all TEST --owners --min-date yesterday"); } // previous to PHP 5.1.0 you would compare with -1, instead of false if (false === $min_timestamp) { usage( "Supplied --min-date is not valid\n". "Supplied value: '".$min_date."'\n". $min_date_usage_examples); } } if ($reparse_owners && !$force) { echo phutil_console_wrap( 'You are about to recreate the relationship entries between the commits '. 'and the packages they touch. This might delete some existing '. 'relationship entries for some old commits.'); if (!phutil_console_confirm('Are you ready to continue?')) { echo "Cancelled.\n"; exit(1); } } $commits = array(); if ($all_from_repo) { $repository = id(new PhabricatorRepository())->loadOneWhere( 'callsign = %s OR phid = %s', $all_from_repo, $all_from_repo); if (!$repository) { throw new Exception("Unknown repository {$all_from_repo}!"); } $constraint = ''; if ($min_timestamp) { echo "Excluding entries before UNIX timestamp: ".$min_timestamp."\n"; $table = new PhabricatorRepositoryCommit(); $conn_r = $table->establishConnection('r'); $constraint = qsprintf( $conn_r, 'AND epoch >= %d', $min_timestamp); } $commits = id(new PhabricatorRepositoryCommit())->loadAllWhere( 'repositoryID = %d %Q', $repository->getID(), $constraint); $callsign = $repository->getCallsign(); if (!$commits) { echo "No commits have been discovered in {$callsign} repository!\n"; exit; } } else { $commits = array(); foreach ($reparse_what as $identifier) { $matches = null; if (!preg_match('/r([A-Z]+)([a-z0-9]+)/', $identifier, $matches)) { throw new Exception("Can't parse commit identifier!"); } $callsign = $matches[1]; $commit_identifier = $matches[2]; $repository = id(new PhabricatorRepository())->loadOneWhere( 'callsign = %s', $callsign); if (!$repository) { throw new Exception("No repository with callsign '{$callsign}'!"); } $commit = id(new PhabricatorRepositoryCommit())->loadOneWhere( 'repositoryID = %d AND commitIdentifier = %s', $repository->getID(), $commit_identifier); if (!$commit) { throw new Exception( "No matching commit '{$commit_identifier}' in repository ". "'{$callsign}'. (For git and mercurial repositories, you must specify ". "the entire commit hash.)"); } $commits[] = $commit; } } if ($all_from_repo && !$force_local) { echo phutil_console_format( '**NOTE**: This script will queue tasks to reparse the data. Once the '. 'tasks have been queued, you need to run Taskmaster daemons to execute '. 'them.'); echo "\n\n"; echo "QUEUEING TASKS (".number_format(count($commits))." Commits):\n"; } $tasks = array(); foreach ($commits as $commit) { $classes = array(); switch ($repository->getVersionControlSystem()) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: if ($reparse_message) { $classes[] = 'PhabricatorRepositoryGitCommitMessageParserWorker'; } if ($reparse_change) { $classes[] = 'PhabricatorRepositoryGitCommitChangeParserWorker'; } break; case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: if ($reparse_message) { $classes[] = 'PhabricatorRepositoryMercurialCommitMessageParserWorker'; } if ($reparse_change) { $classes[] = 'PhabricatorRepositoryMercurialCommitChangeParserWorker'; } break; case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: if ($reparse_message) { $classes[] = 'PhabricatorRepositorySvnCommitMessageParserWorker'; } if ($reparse_change) { $classes[] = 'PhabricatorRepositorySvnCommitChangeParserWorker'; } break; } if ($reparse_herald) { $classes[] = 'PhabricatorRepositoryCommitHeraldWorker'; } if ($reparse_owners) { $classes[] = 'PhabricatorRepositoryCommitOwnersWorker'; } if ($reparse_harbormaster) { $classes[] = 'HarbormasterRunnerWorker'; } $spec = array( 'commitID' => $commit->getID(), 'only' => true, ); if ($all_from_repo && !$force_local) { foreach ($classes as $class) { - PhabricatorWorker::scheduleTask($class, $spec); + PhabricatorWorker::scheduleTask( + $class, + $spec, + PhabricatorWorker::PRIORITY_IMPORT); $commit_name = 'r'.$callsign.$commit->getCommitIdentifier(); echo " Queued '{$class}' for commit '{$commit_name}'.\n"; } } else { foreach ($classes as $class) { $worker = newv($class, array($spec)); echo "Running '{$class}'...\n"; $worker->executeTask(); } } } echo "\nDone.\n"; function usage($message) { echo phutil_console_format( '**Usage Exception:** '.$message."\n". "Use __--help__ to display full help\n"); exit(1); } diff --git a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php index c810b9a3a7..e7ac2b992c 100644 --- a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php +++ b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php @@ -1,233 +1,236 @@ getRequest(); $user = $request->getUser(); $window_start = (time() - (60 * 15)); // Assume daemons spend about 250ms second in overhead per task acquiring // leases and doing other bookkeeping. This is probably an over-estimation, // but we'd rather show that utilization is too high than too low. $lease_overhead = 0.250; $completed = id(new PhabricatorWorkerArchiveTask())->loadAllWhere( 'dateModified > %d', $window_start); $failed = id(new PhabricatorWorkerActiveTask())->loadAllWhere( 'failureTime > %d', $window_start); $usage_total = 0; $usage_start = PHP_INT_MAX; $completed_info = array(); foreach ($completed as $completed_task) { $class = $completed_task->getTaskClass(); if (empty($completed_info[$class])) { $completed_info[$class] = array( 'n' => 0, 'duration' => 0, ); } $completed_info[$class]['n']++; $duration = $completed_task->getDuration(); $completed_info[$class]['duration'] += $duration; // NOTE: Duration is in microseconds, but we're just using seconds to // compute utilization. $usage_total += $lease_overhead + ($duration / 1000000); $usage_start = min($usage_start, $completed_task->getDateModified()); } $completed_info = isort($completed_info, 'n'); $rows = array(); foreach ($completed_info as $class => $info) { $rows[] = array( $class, number_format($info['n']), number_format((int)($info['duration'] / $info['n'])).' us', ); } if ($failed) { // Add the time it takes to restart the daemons. This includes a guess // about other overhead of 2X. $usage_total += PhutilDaemonOverseer::RESTART_WAIT * count($failed) * 2; foreach ($failed as $failed_task) { $usage_start = min($usage_start, $failed_task->getFailureTime()); } $rows[] = array( phutil_tag('em', array(), pht('Temporary Failures')), count($failed), null, ); } $logs = id(new PhabricatorDaemonLogQuery()) ->setViewer($user) ->withStatus(PhabricatorDaemonLogQuery::STATUS_ALIVE) ->setAllowStatusWrites(true) ->execute(); $taskmasters = 0; foreach ($logs as $log) { if ($log->getDaemon() == 'PhabricatorTaskmasterDaemon') { $taskmasters++; } } if ($taskmasters && $usage_total) { // Total number of wall-time seconds the daemons have been running since // the oldest event. For very short times round up to 15s so we don't // render any ridiculous numbers if you reload the page immediately after // restarting the daemons. $available_time = $taskmasters * max(15, (time() - $usage_start)); // Percentage of those wall-time seconds we can account for, which the // daemons spent doing work: $used_time = ($usage_total / $available_time); $rows[] = array( phutil_tag('em', array(), pht('Queue Utilization (Approximate)')), sprintf('%.1f%%', 100 * $used_time), null, ); } $completed_table = new AphrontTableView($rows); $completed_table->setNoDataString( pht('No tasks have completed in the last 15 minutes.')); $completed_table->setHeaders( array( pht('Class'), pht('Count'), pht('Avg'), )); $completed_table->setColumnClasses( array( 'wide', 'n', 'n', )); $completed_panel = new PHUIObjectBoxView(); $completed_panel->setHeaderText( pht('Recently Completed Tasks (Last 15m)')); $completed_panel->appendChild($completed_table); $daemon_table = new PhabricatorDaemonLogListView(); $daemon_table->setUser($user); $daemon_table->setDaemonLogs($logs); $tasks = id(new PhabricatorWorkerActiveTask())->loadAllWhere( 'leaseOwner IS NOT NULL'); $rows = array(); foreach ($tasks as $task) { $rows[] = array( $task->getID(), $task->getTaskClass(), $task->getLeaseOwner(), $task->getLeaseExpires() - time(), + $task->getPriority(), $task->getFailureCount(), phutil_tag( 'a', array( 'href' => '/daemon/task/'.$task->getID().'/', 'class' => 'button small grey', ), pht('View Task')), ); } $daemon_panel = new PHUIObjectBoxView(); $daemon_panel->setHeaderText(pht('Active Daemons')); $daemon_panel->appendChild($daemon_table); $leased_table = new AphrontTableView($rows); $leased_table->setHeaders( array( pht('ID'), pht('Class'), pht('Owner'), pht('Expires'), + pht('Priority'), pht('Failures'), '', )); $leased_table->setColumnClasses( array( 'n', 'wide', '', '', 'n', + 'n', 'action', )); $leased_table->setNoDataString(pht('No tasks are leased by workers.')); $leased_panel = new PHUIObjectBoxView(); $leased_panel->setHeaderText(pht('Leased Tasks')); $leased_panel->appendChild($leased_table); $task_table = new PhabricatorWorkerActiveTask(); $queued = queryfx_all( $task_table->establishConnection('r'), 'SELECT taskClass, count(*) N FROM %T GROUP BY taskClass ORDER BY N DESC', $task_table->getTableName()); $rows = array(); foreach ($queued as $row) { $rows[] = array( $row['taskClass'], number_format($row['N']), ); } $queued_table = new AphrontTableView($rows); $queued_table->setHeaders( array( pht('Class'), pht('Count'), )); $queued_table->setColumnClasses( array( 'wide', 'n', )); $queued_table->setNoDataString(pht('Task queue is empty.')); $queued_panel = new PHUIObjectBoxView(); $queued_panel->setHeaderText(pht('Queued Tasks')); $queued_panel->appendChild($queued_table); $crumbs = $this->buildApplicationCrumbs(); $crumbs->addTextCrumb(pht('Console')); $nav = $this->buildSideNavView(); $nav->selectFilter('/'); $nav->appendChild( array( $crumbs, $completed_panel, $daemon_panel, $queued_panel, $leased_panel, )); return $this->buildApplicationPage( $nav, array( 'title' => pht('Console'), 'device' => false, )); } } diff --git a/src/applications/diffusion/engine/DiffusionCommitHookEngine.php b/src/applications/diffusion/engine/DiffusionCommitHookEngine.php index ae1423cc21..09a65f3c3d 100644 --- a/src/applications/diffusion/engine/DiffusionCommitHookEngine.php +++ b/src/applications/diffusion/engine/DiffusionCommitHookEngine.php @@ -1,1185 +1,1186 @@ remoteProtocol = $remote_protocol; return $this; } public function getRemoteProtocol() { return $this->remoteProtocol; } public function setRemoteAddress($remote_address) { $this->remoteAddress = $remote_address; return $this; } public function getRemoteAddress() { return $this->remoteAddress; } private function getRemoteAddressForLog() { // If whatever we have here isn't a valid IPv4 address, just store `null`. // Older versions of PHP return `-1` on failure instead of `false`. $remote_address = $this->getRemoteAddress(); $remote_address = max(0, ip2long($remote_address)); $remote_address = nonempty($remote_address, null); return $remote_address; } public function setSubversionTransactionInfo($transaction, $repository) { $this->subversionTransaction = $transaction; $this->subversionRepository = $repository; return $this; } public function setStdin($stdin) { $this->stdin = $stdin; return $this; } public function getStdin() { return $this->stdin; } public function setOriginalArgv(array $original_argv) { $this->originalArgv = $original_argv; return $this; } public function getOriginalArgv() { return $this->originalArgv; } public function setRepository(PhabricatorRepository $repository) { $this->repository = $repository; return $this; } public function getRepository() { return $this->repository; } public function setViewer(PhabricatorUser $viewer) { $this->viewer = $viewer; return $this; } public function getViewer() { return $this->viewer; } public function setMercurialHook($mercurial_hook) { $this->mercurialHook = $mercurial_hook; return $this; } public function getMercurialHook() { return $this->mercurialHook; } /* -( Hook Execution )----------------------------------------------------- */ public function execute() { $ref_updates = $this->findRefUpdates(); $all_updates = $ref_updates; $caught = null; try { try { $this->rejectDangerousChanges($ref_updates); } catch (DiffusionCommitHookRejectException $ex) { // If we're rejecting dangerous changes, flag everything that we've // seen as rejected so it's clear that none of it was accepted. $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_DANGEROUS; throw $ex; } $this->applyHeraldRefRules($ref_updates, $all_updates); $content_updates = $this->findContentUpdates($ref_updates); $all_updates = array_merge($all_updates, $content_updates); $this->applyHeraldContentRules($content_updates, $all_updates); // Run custom scripts in `hook.d/` directories. $this->applyCustomHooks($all_updates); // If we make it this far, we're accepting these changes. Mark all the // logs as accepted. $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_ACCEPT; } catch (Exception $ex) { // We'll throw this again in a minute, but we want to save all the logs // first. $caught = $ex; } // Save all the logs no matter what the outcome was. $event = $this->newPushEvent(); $event->setRejectCode($this->rejectCode); $event->setRejectDetails($this->rejectDetails); $event->openTransaction(); $event->save(); foreach ($all_updates as $update) { $update->setPushEventPHID($event->getPHID()); $update->save(); } $event->saveTransaction(); if ($caught) { throw $caught; } if ($this->emailPHIDs) { // If Herald rules triggered email to users, queue a worker to send the // mail. We do this out-of-process so that we block pushes as briefly // as possible. // (We do need to pull some commit info here because the commit objects // may not exist yet when this worker runs, which could be immediately.) PhabricatorWorker::scheduleTask( 'PhabricatorRepositoryPushMailWorker', array( 'eventPHID' => $event->getPHID(), 'emailPHIDs' => array_values($this->emailPHIDs), 'info' => $this->loadCommitInfoForWorker($all_updates), - )); + ), + PhabricatorWorker::PRIORITY_ALERTS); } return 0; } private function findRefUpdates() { $type = $this->getRepository()->getVersionControlSystem(); switch ($type) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: return $this->findGitRefUpdates(); case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: return $this->findMercurialRefUpdates(); case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: return $this->findSubversionRefUpdates(); default: throw new Exception(pht('Unsupported repository type "%s"!', $type)); } } private function rejectDangerousChanges(array $ref_updates) { assert_instances_of($ref_updates, 'PhabricatorRepositoryPushLog'); $repository = $this->getRepository(); if ($repository->shouldAllowDangerousChanges()) { return; } $flag_dangerous = PhabricatorRepositoryPushLog::CHANGEFLAG_DANGEROUS; foreach ($ref_updates as $ref_update) { if (!$ref_update->hasChangeFlags($flag_dangerous)) { // This is not a dangerous change. continue; } // We either have a branch deletion or a non fast-forward branch update. // Format a message and reject the push. $message = pht( "DANGEROUS CHANGE: %s\n". "Dangerous change protection is enabled for this repository.\n". "Edit the repository configuration before making dangerous changes.", $ref_update->getDangerousChangeDescription()); throw new DiffusionCommitHookRejectException($message); } } private function findContentUpdates(array $ref_updates) { assert_instances_of($ref_updates, 'PhabricatorRepositoryPushLog'); $type = $this->getRepository()->getVersionControlSystem(); switch ($type) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: return $this->findGitContentUpdates($ref_updates); case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: return $this->findMercurialContentUpdates($ref_updates); case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: return $this->findSubversionContentUpdates($ref_updates); default: throw new Exception(pht('Unsupported repository type "%s"!', $type)); } } /* -( Herald )------------------------------------------------------------- */ private function applyHeraldRefRules( array $ref_updates, array $all_updates) { $this->applyHeraldRules( $ref_updates, new HeraldPreCommitRefAdapter(), $all_updates); } private function applyHeraldContentRules( array $content_updates, array $all_updates) { $this->applyHeraldRules( $content_updates, new HeraldPreCommitContentAdapter(), $all_updates); } private function applyHeraldRules( array $updates, HeraldAdapter $adapter_template, array $all_updates) { if (!$updates) { return; } $adapter_template->setHookEngine($this); $engine = new HeraldEngine(); $rules = null; $blocking_effect = null; $blocked_update = null; foreach ($updates as $update) { $adapter = id(clone $adapter_template) ->setPushLog($update); if ($rules === null) { $rules = $engine->loadRulesForAdapter($adapter); } $effects = $engine->applyRules($rules, $adapter); $engine->applyEffects($effects, $adapter, $rules); $xscript = $engine->getTranscript(); // Store any PHIDs we want to send email to for later. foreach ($adapter->getEmailPHIDs() as $email_phid) { $this->emailPHIDs[$email_phid] = $email_phid; } if ($blocking_effect === null) { foreach ($effects as $effect) { if ($effect->getAction() == HeraldAdapter::ACTION_BLOCK) { $blocking_effect = $effect; $blocked_update = $update; break; } } } } if ($blocking_effect) { $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_HERALD; $this->rejectDetails = $blocking_effect->getRulePHID(); $message = $blocking_effect->getTarget(); if (!strlen($message)) { $message = pht('(None.)'); } $rules = mpull($rules, null, 'getID'); $rule = idx($rules, $effect->getRuleID()); if ($rule && strlen($rule->getName())) { $rule_name = $rule->getName(); } else { $rule_name = pht('Unnamed Herald Rule'); } $blocked_ref_name = coalesce( $blocked_update->getRefName(), $blocked_update->getRefNewShort()); $blocked_name = $blocked_update->getRefType().'/'.$blocked_ref_name; throw new DiffusionCommitHookRejectException( pht( "This push was rejected by Herald push rule %s.\n". "Change: %s\n". " Rule: %s\n". "Reason: %s", 'H'.$blocking_effect->getRuleID(), $blocked_name, $rule_name, $message)); } } public function loadViewerProjectPHIDsForHerald() { // This just caches the viewer's projects so we don't need to load them // over and over again when applying Herald rules. if ($this->heraldViewerProjects === null) { $this->heraldViewerProjects = id(new PhabricatorProjectQuery()) ->setViewer($this->getViewer()) ->withMemberPHIDs(array($this->getViewer()->getPHID())) ->execute(); } return mpull($this->heraldViewerProjects, 'getPHID'); } /* -( Git )---------------------------------------------------------------- */ private function findGitRefUpdates() { $ref_updates = array(); // First, parse stdin, which lists all the ref changes. The input looks // like this: // // $stdin = $this->getStdin(); $lines = phutil_split_lines($stdin, $retain_endings = false); foreach ($lines as $line) { $parts = explode(' ', $line, 3); if (count($parts) != 3) { throw new Exception(pht('Expected "old new ref", got "%s".', $line)); } $ref_old = $parts[0]; $ref_new = $parts[1]; $ref_raw = $parts[2]; if (preg_match('(^refs/heads/)', $ref_raw)) { $ref_type = PhabricatorRepositoryPushLog::REFTYPE_BRANCH; $ref_raw = substr($ref_raw, strlen('refs/heads/')); } else if (preg_match('(^refs/tags/)', $ref_raw)) { $ref_type = PhabricatorRepositoryPushLog::REFTYPE_TAG; $ref_raw = substr($ref_raw, strlen('refs/tags/')); } else { throw new Exception( pht( "Unable to identify the reftype of '%s'. Rejecting push.", $ref_raw)); } $ref_update = $this->newPushLog() ->setRefType($ref_type) ->setRefName($ref_raw) ->setRefOld($ref_old) ->setRefNew($ref_new); $ref_updates[] = $ref_update; } $this->findGitMergeBases($ref_updates); $this->findGitChangeFlags($ref_updates); return $ref_updates; } private function findGitMergeBases(array $ref_updates) { assert_instances_of($ref_updates, 'PhabricatorRepositoryPushLog'); $futures = array(); foreach ($ref_updates as $key => $ref_update) { // If the old hash is "00000...", the ref is being created (either a new // branch, or a new tag). If the new hash is "00000...", the ref is being // deleted. If both are nonempty, the ref is being updated. For updates, // we'll figure out the `merge-base` of the old and new objects here. This // lets us reject non-FF changes cheaply; later, we'll figure out exactly // which commits are new. $ref_old = $ref_update->getRefOld(); $ref_new = $ref_update->getRefNew(); if (($ref_old === self::EMPTY_HASH) || ($ref_new === self::EMPTY_HASH)) { continue; } $futures[$key] = $this->getRepository()->getLocalCommandFuture( 'merge-base %s %s', $ref_old, $ref_new); } foreach (Futures($futures)->limit(8) as $key => $future) { // If 'old' and 'new' have no common ancestors (for example, a force push // which completely rewrites a ref), `git merge-base` will exit with // an error and no output. It would be nice to find a positive test // for this instead, but I couldn't immediately come up with one. See // T4224. Assume this means there are no ancestors. list($err, $stdout) = $future->resolve(); if ($err) { $merge_base = null; } else { $merge_base = rtrim($stdout, "\n"); } $ref_update = $ref_updates[$key]; $ref_update->setMergeBase($merge_base); } return $ref_updates; } private function findGitChangeFlags(array $ref_updates) { assert_instances_of($ref_updates, 'PhabricatorRepositoryPushLog'); foreach ($ref_updates as $key => $ref_update) { $ref_old = $ref_update->getRefOld(); $ref_new = $ref_update->getRefNew(); $ref_type = $ref_update->getRefType(); $ref_flags = 0; $dangerous = null; if (($ref_old === self::EMPTY_HASH) && ($ref_new === self::EMPTY_HASH)) { // This happens if you try to delete a tag or branch which does not // exist by pushing directly to the ref. Git will warn about it but // allow it. Just call it a delete, without flagging it as dangerous. $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DELETE; } else if ($ref_old === self::EMPTY_HASH) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_ADD; } else if ($ref_new === self::EMPTY_HASH) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DELETE; if ($ref_type == PhabricatorRepositoryPushLog::REFTYPE_BRANCH) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DANGEROUS; $dangerous = pht( "The change you're attempting to push deletes the branch '%s'.", $ref_update->getRefName()); } } else { $merge_base = $ref_update->getMergeBase(); if ($merge_base == $ref_old) { // This is a fast-forward update to an existing branch. // These are safe. $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_APPEND; } else { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_REWRITE; // For now, we don't consider deleting or moving tags to be a // "dangerous" update. It's way harder to get wrong and should be easy // to recover from once we have better logging. Only add the dangerous // flag if this ref is a branch. if ($ref_type == PhabricatorRepositoryPushLog::REFTYPE_BRANCH) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DANGEROUS; $dangerous = pht( "The change you're attempting to push updates the branch '%s' ". "from '%s' to '%s', but this is not a fast-forward. Pushes ". "which rewrite published branch history are dangerous.", $ref_update->getRefName(), $ref_update->getRefOldShort(), $ref_update->getRefNewShort()); } } } $ref_update->setChangeFlags($ref_flags); if ($dangerous !== null) { $ref_update->attachDangerousChangeDescription($dangerous); } } return $ref_updates; } private function findGitContentUpdates(array $ref_updates) { $flag_delete = PhabricatorRepositoryPushLog::CHANGEFLAG_DELETE; $futures = array(); foreach ($ref_updates as $key => $ref_update) { if ($ref_update->hasChangeFlags($flag_delete)) { // Deleting a branch or tag can never create any new commits. continue; } // NOTE: This piece of magic finds all new commits, by walking backward // from the new value to the value of *any* existing ref in the // repository. Particularly, this will cover the cases of a new branch, a // completely moved tag, etc. $futures[$key] = $this->getRepository()->getLocalCommandFuture( 'log --format=%s %s --not --all', '%H', $ref_update->getRefNew()); } $content_updates = array(); foreach (Futures($futures)->limit(8) as $key => $future) { list($stdout) = $future->resolvex(); if (!strlen(trim($stdout))) { // This change doesn't have any new commits. One common case of this // is creating a new tag which points at an existing commit. continue; } $commits = phutil_split_lines($stdout, $retain_newlines = false); // If we're looking at a branch, mark all of the new commits as on that // branch. It's only possible for these commits to be on updated branches, // since any other branch heads are necessarily behind them. $branch_name = null; $ref_update = $ref_updates[$key]; $type_branch = PhabricatorRepositoryPushLog::REFTYPE_BRANCH; if ($ref_update->getRefType() == $type_branch) { $branch_name = $ref_update->getRefName(); } foreach ($commits as $commit) { if ($branch_name) { $this->gitCommits[$commit][] = $branch_name; } $content_updates[$commit] = $this->newPushLog() ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_COMMIT) ->setRefNew($commit) ->setChangeFlags(PhabricatorRepositoryPushLog::CHANGEFLAG_ADD); } } return $content_updates; } /* -( Custom )------------------------------------------------------------- */ private function applyCustomHooks(array $updates) { $args = $this->getOriginalArgv(); $stdin = $this->getStdin(); $console = PhutilConsole::getConsole(); $env = array( 'PHABRICATOR_REPOSITORY' => $this->getRepository()->getCallsign(), self::ENV_USER => $this->getViewer()->getUsername(), self::ENV_REMOTE_PROTOCOL => $this->getRemoteProtocol(), self::ENV_REMOTE_ADDRESS => $this->getRemoteAddress(), ); $directories = $this->getRepository()->getHookDirectories(); foreach ($directories as $directory) { $hooks = $this->getExecutablesInDirectory($directory); sort($hooks); foreach ($hooks as $hook) { // NOTE: We're explicitly running the hooks in sequential order to // make this more predictable. $future = id(new ExecFuture('%s %Ls', $hook, $args)) ->setEnv($env, $wipe_process_env = false) ->write($stdin); list($err, $stdout, $stderr) = $future->resolve(); if (!$err) { // This hook ran OK, but echo its output in case there was something // informative. $console->writeOut('%s', $stdout); $console->writeErr('%s', $stderr); continue; } $this->rejectCode = PhabricatorRepositoryPushLog::REJECT_EXTERNAL; $this->rejectDetails = basename($hook); throw new DiffusionCommitHookRejectException( pht( "This push was rejected by custom hook script '%s':\n\n%s%s", basename($hook), $stdout, $stderr)); } } } private function getExecutablesInDirectory($directory) { $executables = array(); if (!Filesystem::pathExists($directory)) { return $executables; } foreach (Filesystem::listDirectory($directory) as $path) { $full_path = $directory.DIRECTORY_SEPARATOR.$path; if (!is_executable($full_path)) { // Don't include non-executable files. continue; } if (basename($full_path) == 'README') { // Don't include README, even if it is marked as executable. It almost // certainly got caught in the crossfire of a sweeping `chmod`, since // users do this with some frequency. continue; } $executables[] = $full_path; } return $executables; } /* -( Mercurial )---------------------------------------------------------- */ private function findMercurialRefUpdates() { $hook = $this->getMercurialHook(); switch ($hook) { case 'pretxnchangegroup': return $this->findMercurialChangegroupRefUpdates(); case 'prepushkey': return $this->findMercurialPushKeyRefUpdates(); default: throw new Exception(pht('Unrecognized hook "%s"!', $hook)); } } private function findMercurialChangegroupRefUpdates() { $hg_node = getenv('HG_NODE'); if (!$hg_node) { throw new Exception(pht('Expected HG_NODE in environment!')); } // NOTE: We need to make sure this is passed to subprocesses, or they won't // be able to see new commits. Mercurial uses this as a marker to determine // whether the pending changes are visible or not. $_ENV['HG_PENDING'] = getenv('HG_PENDING'); $repository = $this->getRepository(); $futures = array(); foreach (array('old', 'new') as $key) { $futures[$key] = $repository->getLocalCommandFuture( 'heads --template %s', '{node}\1{branch}\2'); } // Wipe HG_PENDING out of the old environment so we see the pre-commit // state of the repository. $futures['old']->updateEnv('HG_PENDING', null); $futures['commits'] = $repository->getLocalCommandFuture( 'log --rev %s --template %s', hgsprintf('%s:%s', $hg_node, 'tip'), '{node}\1{branch}\2'); // Resolve all of the futures now. We don't need the 'commits' future yet, // but it simplifies the logic to just get it out of the way. foreach (Futures($futures) as $future) { $future->resolve(); } list($commit_raw) = $futures['commits']->resolvex(); $commit_map = $this->parseMercurialCommits($commit_raw); $this->mercurialCommits = $commit_map; // NOTE: `hg heads` exits with an error code and no output if the repository // has no heads. Most commonly this happens on a new repository. We know // we can run `hg` successfully since the `hg log` above didn't error, so // just ignore the error code. list($err, $old_raw) = $futures['old']->resolve(); $old_refs = $this->parseMercurialHeads($old_raw); list($err, $new_raw) = $futures['new']->resolve(); $new_refs = $this->parseMercurialHeads($new_raw); $all_refs = array_keys($old_refs + $new_refs); $ref_updates = array(); foreach ($all_refs as $ref) { $old_heads = idx($old_refs, $ref, array()); $new_heads = idx($new_refs, $ref, array()); sort($old_heads); sort($new_heads); if (!$old_heads && !$new_heads) { // This should never be possible, as it makes no sense. Explode. throw new Exception( pht( 'Mercurial repository has no new or old heads for branch "%s" '. 'after push. This makes no sense; rejecting change.', $ref)); } if ($old_heads === $new_heads) { // No changes to this branch, so skip it. continue; } $stray_heads = array(); if ($old_heads && !$new_heads) { // This is a branch deletion with "--close-branch". $head_map = array(); foreach ($old_heads as $old_head) { $head_map[$old_head] = array(self::EMPTY_HASH); } } else if (count($old_heads) > 1) { // HORRIBLE: In Mercurial, branches can have multiple heads. If the // old branch had multiple heads, we need to figure out which new // heads descend from which old heads, so we can tell whether you're // actively creating new heads (dangerous) or just working in a // repository that's already full of garbage (strongly discouraged but // not as inherently dangerous). These cases should be very uncommon. // NOTE: We're only looking for heads on the same branch. The old // tip of the branch may be the branchpoint for other branches, but that // is OK. $dfutures = array(); foreach ($old_heads as $old_head) { $dfutures[$old_head] = $repository->getLocalCommandFuture( 'log --branch %s --rev %s --template %s', $ref, hgsprintf('(descendants(%s) and head())', $old_head), '{node}\1'); } $head_map = array(); foreach (Futures($dfutures) as $future_head => $dfuture) { list($stdout) = $dfuture->resolvex(); $descendant_heads = array_filter(explode("\1", $stdout)); if ($descendant_heads) { // This old head has at least one descendant in the push. $head_map[$future_head] = $descendant_heads; } else { // This old head has no descendants, so it is being deleted. $head_map[$future_head] = array(self::EMPTY_HASH); } } // Now, find all the new stray heads this push creates, if any. These // are new heads which do not descend from the old heads. $seen = array_fuse(array_mergev($head_map)); foreach ($new_heads as $new_head) { if ($new_head === self::EMPTY_HASH) { // If a branch head is being deleted, don't insert it as an add. continue; } if (empty($seen[$new_head])) { $head_map[self::EMPTY_HASH][] = $new_head; } } } else if ($old_heads) { $head_map[head($old_heads)] = $new_heads; } else { $head_map[self::EMPTY_HASH] = $new_heads; } foreach ($head_map as $old_head => $child_heads) { foreach ($child_heads as $new_head) { if ($new_head === $old_head) { continue; } $ref_flags = 0; $dangerous = null; if ($old_head == self::EMPTY_HASH) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_ADD; } else { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_APPEND; } $deletes_existing_head = ($new_head == self::EMPTY_HASH); $splits_existing_head = (count($child_heads) > 1); $creates_duplicate_head = ($old_head == self::EMPTY_HASH) && (count($head_map) > 1); if ($splits_existing_head || $creates_duplicate_head) { $readable_child_heads = array(); foreach ($child_heads as $child_head) { $readable_child_heads[] = substr($child_head, 0, 12); } $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DANGEROUS; if ($splits_existing_head) { // We're splitting an existing head into two or more heads. // This is dangerous, and a super bad idea. Note that we're only // raising this if you're actively splitting a branch head. If a // head split in the past, we don't consider appends to it // to be dangerous. $dangerous = pht( "The change you're attempting to push splits the head of ". "branch '%s' into multiple heads: %s. This is inadvisable ". "and dangerous.", $ref, implode(', ', $readable_child_heads)); } else { // We're adding a second (or more) head to a branch. The new // head is not a descendant of any old head. $dangerous = pht( "The change you're attempting to push creates new, divergent ". "heads for the branch '%s': %s. This is inadvisable and ". "dangerous.", $ref, implode(', ', $readable_child_heads)); } } if ($deletes_existing_head) { // TODO: Somewhere in here we should be setting CHANGEFLAG_REWRITE // if we are also creating at least one other head to replace // this one. // NOTE: In Git, this is a dangerous change, but it is not dangerous // in Mercurial. Mercurial branches are version controlled, and // Mercurial does not prompt you for any special flags when pushing // a `--close-branch` commit by default. $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DELETE; } $ref_update = $this->newPushLog() ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_BRANCH) ->setRefName($ref) ->setRefOld($old_head) ->setRefNew($new_head) ->setChangeFlags($ref_flags); if ($dangerous !== null) { $ref_update->attachDangerousChangeDescription($dangerous); } $ref_updates[] = $ref_update; } } } return $ref_updates; } private function findMercurialPushKeyRefUpdates() { $key_namespace = getenv('HG_NAMESPACE'); if ($key_namespace === 'phases') { // Mercurial changes commit phases as part of normal push operations. We // just ignore these, as they don't seem to represent anything // interesting. return array(); } $key_name = getenv('HG_KEY'); $key_old = getenv('HG_OLD'); if (!strlen($key_old)) { $key_old = null; } $key_new = getenv('HG_NEW'); if (!strlen($key_new)) { $key_new = null; } if ($key_namespace !== 'bookmarks') { throw new Exception( pht( "Unknown Mercurial key namespace '%s', with key '%s' (%s -> %s). ". "Rejecting push.", $key_namespace, $key_name, coalesce($key_old, pht('null')), coalesce($key_new, pht('null')))); } if ($key_old === $key_new) { // We get a callback when the bookmark doesn't change. Just ignore this, // as it's a no-op. return array(); } $ref_flags = 0; $merge_base = null; if ($key_old === null) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_ADD; } else if ($key_new === null) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_DELETE; } else { list($merge_base_raw) = $this->getRepository()->execxLocalCommand( 'log --template %s --rev %s', '{node}', hgsprintf('ancestor(%s, %s)', $key_old, $key_new)); if (strlen(trim($merge_base_raw))) { $merge_base = trim($merge_base_raw); } if ($merge_base && ($merge_base === $key_old)) { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_APPEND; } else { $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_REWRITE; } } $ref_update = $this->newPushLog() ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_BOOKMARK) ->setRefName($key_name) ->setRefOld(coalesce($key_old, self::EMPTY_HASH)) ->setRefNew(coalesce($key_new, self::EMPTY_HASH)) ->setChangeFlags($ref_flags); return array($ref_update); } private function findMercurialContentUpdates(array $ref_updates) { $content_updates = array(); foreach ($this->mercurialCommits as $commit => $branches) { $content_updates[$commit] = $this->newPushLog() ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_COMMIT) ->setRefNew($commit) ->setChangeFlags(PhabricatorRepositoryPushLog::CHANGEFLAG_ADD); } return $content_updates; } private function parseMercurialCommits($raw) { $commits_lines = explode("\2", $raw); $commits_lines = array_filter($commits_lines); $commit_map = array(); foreach ($commits_lines as $commit_line) { list($node, $branch) = explode("\1", $commit_line); $commit_map[$node] = array($branch); } return $commit_map; } private function parseMercurialHeads($raw) { $heads_map = $this->parseMercurialCommits($raw); $heads = array(); foreach ($heads_map as $commit => $branches) { foreach ($branches as $branch) { $heads[$branch][] = $commit; } } return $heads; } /* -( Subversion )--------------------------------------------------------- */ private function findSubversionRefUpdates() { // Subversion doesn't have any kind of mutable ref metadata. return array(); } private function findSubversionContentUpdates(array $ref_updates) { list($youngest) = execx( 'svnlook youngest %s', $this->subversionRepository); $ref_new = (int)$youngest + 1; $ref_flags = 0; $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_ADD; $ref_flags |= PhabricatorRepositoryPushLog::CHANGEFLAG_APPEND; $ref_content = $this->newPushLog() ->setRefType(PhabricatorRepositoryPushLog::REFTYPE_COMMIT) ->setRefNew($ref_new) ->setChangeFlags($ref_flags); return array($ref_content); } /* -( Internals )---------------------------------------------------------- */ private function newPushLog() { // NOTE: We generate PHIDs up front so the Herald transcripts can pick them // up. $phid = id(new PhabricatorRepositoryPushLog())->generatePHID(); return PhabricatorRepositoryPushLog::initializeNewLog($this->getViewer()) ->setPHID($phid) ->setRepositoryPHID($this->getRepository()->getPHID()) ->attachRepository($this->getRepository()) ->setEpoch(time()); } private function newPushEvent() { $viewer = $this->getViewer(); return PhabricatorRepositoryPushEvent::initializeNewEvent($viewer) ->setRepositoryPHID($this->getRepository()->getPHID()) ->setRemoteAddress($this->getRemoteAddressForLog()) ->setRemoteProtocol($this->getRemoteProtocol()) ->setEpoch(time()); } public function loadChangesetsForCommit($identifier) { $byte_limit = HeraldCommitAdapter::getEnormousByteLimit(); $time_limit = HeraldCommitAdapter::getEnormousTimeLimit(); $vcs = $this->getRepository()->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: // For git and hg, we can use normal commands. $drequest = DiffusionRequest::newFromDictionary( array( 'repository' => $this->getRepository(), 'user' => $this->getViewer(), 'commit' => $identifier, )); $raw_diff = DiffusionRawDiffQuery::newFromDiffusionRequest($drequest) ->setTimeout($time_limit) ->setByteLimit($byte_limit) ->setLinesOfContext(0) ->loadRawDiff(); break; case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: // TODO: This diff has 3 lines of context, which produces slightly // incorrect "added file content" and "removed file content" results. // This may also choke on binaries, but "svnlook diff" does not support // the "--diff-cmd" flag. // For subversion, we need to use `svnlook`. $future = new ExecFuture( 'svnlook diff -t %s %s', $this->subversionTransaction, $this->subversionRepository); $future->setTimeout($time_limit); $future->setStdoutSizeLimit($byte_limit); $future->setStderrSizeLimit($byte_limit); list($raw_diff) = $future->resolvex(); break; default: throw new Exception(pht("Unknown VCS '%s!'", $vcs)); } if (strlen($raw_diff) >= $byte_limit) { throw new Exception( pht( 'The raw text of this change is enormous (larger than %d '. 'bytes). Herald can not process it.', $byte_limit)); } if (!strlen($raw_diff)) { // If the commit is actually empty, just return no changesets. return array(); } $parser = new ArcanistDiffParser(); $changes = $parser->parseDiff($raw_diff); $diff = DifferentialDiff::newFromRawChanges($changes); return $diff->getChangesets(); } public function loadCommitRefForCommit($identifier) { $repository = $this->getRepository(); $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: return id(new DiffusionLowLevelCommitQuery()) ->setRepository($repository) ->withIdentifier($identifier) ->execute(); case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: // For subversion, we need to use `svnlook`. list($message) = execx( 'svnlook log -t %s %s', $this->subversionTransaction, $this->subversionRepository); return id(new DiffusionCommitRef()) ->setMessage($message); break; default: throw new Exception(pht("Unknown VCS '%s!'", $vcs)); } } public function loadBranches($identifier) { $repository = $this->getRepository(); $vcs = $repository->getVersionControlSystem(); switch ($vcs) { case PhabricatorRepositoryType::REPOSITORY_TYPE_GIT: return idx($this->gitCommits, $identifier, array()); case PhabricatorRepositoryType::REPOSITORY_TYPE_MERCURIAL: // NOTE: This will be "the branch the commit was made to", not // "a list of all branch heads which descend from the commit". // This is consistent with Mercurial, but possibly confusing. return idx($this->mercurialCommits, $identifier, array()); case PhabricatorRepositoryType::REPOSITORY_TYPE_SVN: // Subversion doesn't have branches. return array(); } } private function loadCommitInfoForWorker(array $all_updates) { $type_commit = PhabricatorRepositoryPushLog::REFTYPE_COMMIT; $map = array(); foreach ($all_updates as $update) { if ($update->getRefType() != $type_commit) { continue; } $map[$update->getRefNew()] = array(); } foreach ($map as $identifier => $info) { $ref = $this->loadCommitRefForCommit($identifier); $map[$identifier] += array( 'summary' => $ref->getSummary(), 'branches' => $this->loadBranches($identifier), ); } return $map; } } diff --git a/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php b/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php index 952ec44f33..36d462e927 100644 --- a/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php +++ b/src/applications/metamta/management/PhabricatorMailManagementResendWorkflow.php @@ -1,60 +1,61 @@ setName('resend') ->setSynopsis('Send mail again.') ->setExamples( '**resend** --id 1 --id 2') ->setArguments( array( array( 'name' => 'id', 'param' => 'id', 'help' => 'Send mail with a given ID again.', 'repeat' => true, ), )); } public function execute(PhutilArgumentParser $args) { $console = PhutilConsole::getConsole(); $ids = $args->getArg('id'); if (!$ids) { throw new PhutilArgumentUsageException( "Use the '--id' flag to specify one or more messages to resend."); } $messages = id(new PhabricatorMetaMTAMail())->loadAllWhere( 'id IN (%Ld)', $ids); if ($ids) { $ids = array_fuse($ids); $missing = array_diff_key($ids, $messages); if ($missing) { throw new PhutilArgumentUsageException( 'Some specified messages do not exist: '. implode(', ', array_keys($missing))); } } foreach ($messages as $message) { $message->setStatus(PhabricatorMetaMTAMail::STATUS_QUEUE); $message->save(); $mailer_task = PhabricatorWorker::scheduleTask( 'PhabricatorMetaMTAWorker', - $message->getID()); + $message->getID(), + PhabricatorWorker::PRIORITY_ALERTS); $console->writeOut( "Queued message #%d for resend.\n", $message->getID()); } } } diff --git a/src/applications/metamta/storage/PhabricatorMetaMTAMail.php b/src/applications/metamta/storage/PhabricatorMetaMTAMail.php index ded773c79a..545ad9c822 100644 --- a/src/applications/metamta/storage/PhabricatorMetaMTAMail.php +++ b/src/applications/metamta/storage/PhabricatorMetaMTAMail.php @@ -1,889 +1,890 @@ status = self::STATUS_QUEUE; $this->parameters = array(); parent::__construct(); } public function getConfiguration() { return array( self::CONFIG_SERIALIZATION => array( 'parameters' => self::SERIALIZATION_JSON, ), ) + parent::getConfiguration(); } protected function setParam($param, $value) { $this->parameters[$param] = $value; return $this; } protected function getParam($param, $default = null) { return idx($this->parameters, $param, $default); } /** * Set tags (@{class:MetaMTANotificationType} constants) which identify the * content of this mail in a general way. These tags are used to allow users * to opt out of receiving certain types of mail, like updates when a task's * projects change. * * @param list List of @{class:MetaMTANotificationType} constants. * @return this */ public function setMailTags(array $tags) { $this->setParam('mailtags', array_unique($tags)); return $this; } public function getMailTags() { return $this->getParam('mailtags', array()); } /** * In Gmail, conversations will be broken if you reply to a thread and the * server sends back a response without referencing your Message-ID, even if * it references a Message-ID earlier in the thread. To avoid this, use the * parent email's message ID explicitly if it's available. This overwrites the * "In-Reply-To" and "References" headers we would otherwise generate. This * needs to be set whenever an action is triggered by an email message. See * T251 for more details. * * @param string The "Message-ID" of the email which precedes this one. * @return this */ public function setParentMessageID($id) { $this->setParam('parent-message-id', $id); return $this; } public function getParentMessageID() { return $this->getParam('parent-message-id'); } public function getSubject() { return $this->getParam('subject'); } public function addTos(array $phids) { $phids = array_unique($phids); $this->setParam('to', $phids); return $this; } public function addRawTos(array $raw_email) { $this->setParam('raw-to', $raw_email); return $this; } public function addCCs(array $phids) { $phids = array_unique($phids); $this->setParam('cc', $phids); return $this; } public function setExcludeMailRecipientPHIDs($exclude) { $this->excludePHIDs = $exclude; return $this; } private function getExcludeMailRecipientPHIDs() { return $this->excludePHIDs; } public function getOverrideNoSelfMailPreference() { return $this->overrideNoSelfMail; } public function setOverrideNoSelfMailPreference($override) { $this->overrideNoSelfMail = $override; return $this; } public function getTranslation(array $objects) { $default_translation = PhabricatorEnv::getEnvConfig('translation.provider'); $return = null; $recipients = array_merge( idx($this->parameters, 'to', array()), idx($this->parameters, 'cc', array())); foreach (array_select_keys($objects, $recipients) as $object) { $translation = null; if ($object instanceof PhabricatorUser) { $translation = $object->getTranslation(); } if (!$translation) { $translation = $default_translation; } if ($return && $translation != $return) { return $default_translation; } $return = $translation; } if (!$return) { $return = $default_translation; } return $return; } public function addPHIDHeaders($name, array $phids) { foreach ($phids as $phid) { $this->addHeader($name, '<'.$phid.'>'); } return $this; } public function addHeader($name, $value) { $this->parameters['headers'][] = array($name, $value); return $this; } public function addAttachment(PhabricatorMetaMTAAttachment $attachment) { $this->parameters['attachments'][] = $attachment->toDictionary(); return $this; } public function getAttachments() { $dicts = $this->getParam('attachments'); $result = array(); foreach ($dicts as $dict) { $result[] = PhabricatorMetaMTAAttachment::newFromDictionary($dict); } return $result; } public function setAttachments(array $attachments) { assert_instances_of($attachments, 'PhabricatorMetaMTAAttachment'); $this->setParam('attachments', mpull($attachments, 'toDictionary')); return $this; } public function setFrom($from) { $this->setParam('from', $from); return $this; } public function setReplyTo($reply_to) { $this->setParam('reply-to', $reply_to); return $this; } public function setSubject($subject) { $this->setParam('subject', $subject); return $this; } public function setSubjectPrefix($prefix) { $this->setParam('subject-prefix', $prefix); return $this; } public function setVarySubjectPrefix($prefix) { $this->setParam('vary-subject-prefix', $prefix); return $this; } public function setBody($body) { $this->setParam('body', $body); return $this; } public function getBody() { return $this->getParam('body'); } public function setIsHTML($html) { $this->setParam('is-html', $html); return $this; } public function setIsErrorEmail($is_error) { $this->setParam('is-error', $is_error); return $this; } public function getIsErrorEmail() { return $this->getParam('is-error', false); } public function getToPHIDs() { return $this->getParam('to', array()); } public function getRawToAddresses() { return $this->getParam('raw-to', array()); } public function getCcPHIDs() { return $this->getParam('cc', array()); } /** * Flag that this is an auto-generated bulk message and should have bulk * headers added to it if appropriate. Broadly, this means some flavor of * "Precedence: bulk" or similar, but is implementation and configuration * dependent. * * @param bool True if the mail is automated bulk mail. * @return this */ public function setIsBulk($is_bulk) { $this->setParam('is-bulk', $is_bulk); return $this; } /** * Use this method to set an ID used for message threading. MetaMTA will * set appropriate headers (Message-ID, In-Reply-To, References and * Thread-Index) based on the capabilities of the underlying mailer. * * @param string Unique identifier, appropriate for use in a Message-ID, * In-Reply-To or References headers. * @param bool If true, indicates this is the first message in the thread. * @return this */ public function setThreadID($thread_id, $is_first_message = false) { $this->setParam('thread-id', $thread_id); $this->setParam('is-first-message', $is_first_message); return $this; } /** * Save a newly created mail to the database. The mail will eventually be * delivered by the MetaMTA daemon. * * @return this */ public function saveAndSend() { return $this->save(); } public function save() { if ($this->getID()) { return parent::save(); } // NOTE: When mail is sent from CLI scripts that run tasks in-process, we // may re-enter this method from within scheduleTask(). The implementation // is intended to avoid anything awkward if we end up reentering this // method. $this->openTransaction(); // Save to generate a task ID. $result = parent::save(); // Queue a task to send this mail. $mailer_task = PhabricatorWorker::scheduleTask( 'PhabricatorMetaMTAWorker', - $this->getID()); + $this->getID(), + PhabricatorWorker::PRIORITY_ALERTS); $this->saveTransaction(); return $result; } public function buildDefaultMailer() { return PhabricatorEnv::newObjectFromConfig('metamta.mail-adapter'); } /** * Attempt to deliver an email immediately, in this process. * * @param bool Try to deliver this email even if it has already been * delivered or is in backoff after a failed delivery attempt. * @param PhabricatorMailImplementationAdapter Use a specific mail adapter, * instead of the default. * * @return void */ public function sendNow( $force_send = false, PhabricatorMailImplementationAdapter $mailer = null) { if ($mailer === null) { $mailer = $this->buildDefaultMailer(); } if (!$force_send) { if ($this->getStatus() != self::STATUS_QUEUE) { throw new Exception('Trying to send an already-sent mail!'); } } try { $params = $this->parameters; $actors = $this->loadAllActors(); $deliverable_actors = $this->filterDeliverableActors($actors); $default_from = PhabricatorEnv::getEnvConfig('metamta.default-address'); if (empty($params['from'])) { $mailer->setFrom($default_from); } $is_first = idx($params, 'is-first-message'); unset($params['is-first-message']); $is_threaded = (bool)idx($params, 'thread-id'); $reply_to_name = idx($params, 'reply-to-name', ''); unset($params['reply-to-name']); $add_cc = array(); $add_to = array(); foreach ($params as $key => $value) { switch ($key) { case 'from': $from = $value; $actor_email = null; $actor_name = null; $actor = idx($actors, $from); if ($actor) { $actor_email = $actor->getEmailAddress(); $actor_name = $actor->getName(); } $can_send_as_user = $actor_email && PhabricatorEnv::getEnvConfig('metamta.can-send-as-user'); if ($can_send_as_user) { $mailer->setFrom($actor_email, $actor_name); } else { $from_email = coalesce($actor_email, $default_from); $from_name = coalesce($actor_name, pht('Phabricator')); if (empty($params['reply-to'])) { $params['reply-to'] = $from_email; $params['reply-to-name'] = $from_name; } $mailer->setFrom($default_from, $from_name); } break; case 'reply-to': $mailer->addReplyTo($value, $reply_to_name); break; case 'to': $to_phids = $this->expandRecipients($value); $to_actors = array_select_keys($deliverable_actors, $to_phids); $add_to = array_merge( $add_to, mpull($to_actors, 'getEmailAddress')); break; case 'raw-to': $add_to = array_merge($add_to, $value); break; case 'cc': $cc_phids = $this->expandRecipients($value); $cc_actors = array_select_keys($deliverable_actors, $cc_phids); $add_cc = array_merge( $add_cc, mpull($cc_actors, 'getEmailAddress')); break; case 'headers': foreach ($value as $pair) { list($header_key, $header_value) = $pair; // NOTE: If we have \n in a header, SES rejects the email. $header_value = str_replace("\n", ' ', $header_value); $mailer->addHeader($header_key, $header_value); } break; case 'attachments': $value = $this->getAttachments(); foreach ($value as $attachment) { $mailer->addAttachment( $attachment->getData(), $attachment->getFilename(), $attachment->getMimeType()); } break; case 'body': $max = PhabricatorEnv::getEnvConfig('metamta.email-body-limit'); if (strlen($value) > $max) { $value = phutil_utf8_shorten($value, $max); $value .= "\n"; $value .= pht('(This email was truncated at %d bytes.)', $max); } $mailer->setBody($value); break; case 'subject': // Only try to use preferences if everything is multiplexed, so we // get consistent behavior. $use_prefs = self::shouldMultiplexAllMail(); $prefs = null; if ($use_prefs) { // If multiplexing is enabled, some recipients will be in "Cc" // rather than "To". We'll move them to "To" later (or supply a // dummy "To") but need to look for the recipient in either the // "To" or "Cc" fields here. $target_phid = head(idx($params, 'to', array())); if (!$target_phid) { $target_phid = head(idx($params, 'cc', array())); } if ($target_phid) { $user = id(new PhabricatorUser())->loadOneWhere( 'phid = %s', $target_phid); if ($user) { $prefs = $user->loadPreferences(); } } } $subject = array(); if ($is_threaded) { $add_re = PhabricatorEnv::getEnvConfig('metamta.re-prefix'); if ($prefs) { $add_re = $prefs->getPreference( PhabricatorUserPreferences::PREFERENCE_RE_PREFIX, $add_re); } if ($add_re) { $subject[] = 'Re:'; } } $subject[] = trim(idx($params, 'subject-prefix')); $vary_prefix = idx($params, 'vary-subject-prefix'); if ($vary_prefix != '') { $use_subject = PhabricatorEnv::getEnvConfig( 'metamta.vary-subjects'); if ($prefs) { $use_subject = $prefs->getPreference( PhabricatorUserPreferences::PREFERENCE_VARY_SUBJECT, $use_subject); } if ($use_subject) { $subject[] = $vary_prefix; } } $subject[] = $value; $mailer->setSubject(implode(' ', array_filter($subject))); break; case 'is-html': if ($value) { $mailer->setIsHTML(true); } break; case 'is-bulk': if ($value) { if (PhabricatorEnv::getEnvConfig('metamta.precedence-bulk')) { $mailer->addHeader('Precedence', 'bulk'); } } break; case 'thread-id': // NOTE: Gmail freaks out about In-Reply-To and References which // aren't in the form ""; this is also required // by RFC 2822, although some clients are more liberal in what they // accept. $domain = PhabricatorEnv::getEnvConfig('metamta.domain'); $value = '<'.$value.'@'.$domain.'>'; if ($is_first && $mailer->supportsMessageIDHeader()) { $mailer->addHeader('Message-ID', $value); } else { $in_reply_to = $value; $references = array($value); $parent_id = $this->getParentMessageID(); if ($parent_id) { $in_reply_to = $parent_id; // By RFC 2822, the most immediate parent should appear last // in the "References" header, so this order is intentional. $references[] = $parent_id; } $references = implode(' ', $references); $mailer->addHeader('In-Reply-To', $in_reply_to); $mailer->addHeader('References', $references); } $thread_index = $this->generateThreadIndex($value, $is_first); $mailer->addHeader('Thread-Index', $thread_index); break; case 'mailtags': // Handled below. break; case 'subject-prefix': case 'vary-subject-prefix': // Handled above. break; default: // Just discard. } } if (!$add_to && !$add_cc) { $this->setStatus(self::STATUS_VOID); $this->setMessage( 'Message has no valid recipients: all To/Cc are disabled, invalid, '. 'or configured not to receive this mail.'); return $this->save(); } if ($this->getIsErrorEmail()) { $all_recipients = array_merge($add_to, $add_cc); if ($this->shouldRateLimitMail($all_recipients)) { $this->setStatus(self::STATUS_VOID); $this->setMessage( pht( 'This is an error email, but one or more recipients have '. 'exceeded the error email rate limit. Declining to deliver '. 'message.')); return $this->save(); } } $mailer->addHeader('X-Phabricator-Sent-This-Message', 'Yes'); $mailer->addHeader('X-Mail-Transport-Agent', 'MetaMTA'); // Some clients respect this to suppress OOF and other auto-responses. $mailer->addHeader('X-Auto-Response-Suppress', 'All'); // If the message has mailtags, filter out any recipients who don't want // to receive this type of mail. $mailtags = $this->getParam('mailtags'); if ($mailtags) { $tag_header = array(); foreach ($mailtags as $mailtag) { $tag_header[] = '<'.$mailtag.'>'; } $tag_header = implode(', ', $tag_header); $mailer->addHeader('X-Phabricator-Mail-Tags', $tag_header); } // Some mailers require a valid "To:" in order to deliver mail. If we // don't have any "To:", try to fill it in with a placeholder "To:". // If that also fails, move the "Cc:" line to "To:". if (!$add_to) { $placeholder_key = 'metamta.placeholder-to-recipient'; $placeholder = PhabricatorEnv::getEnvConfig($placeholder_key); if ($placeholder !== null) { $add_to = array($placeholder); } else { $add_to = $add_cc; $add_cc = array(); } } $add_to = array_unique($add_to); $add_cc = array_unique($add_cc); $mailer->addTos($add_to); if ($add_cc) { $mailer->addCCs($add_cc); } } catch (Exception $ex) { $this ->setStatus(self::STATUS_FAIL) ->setMessage($ex->getMessage()) ->save(); throw $ex; } try { $ok = $mailer->send(); if (!$ok) { // TODO: At some point, we should clean this up and make all mailers // throw. throw new Exception( pht('Mail adapter encountered an unexpected, unspecified failure.')); } $this->setStatus(self::STATUS_SENT); $this->save(); return $this; } catch (PhabricatorMetaMTAPermanentFailureException $ex) { $this ->setStatus(self::STATUS_FAIL) ->setMessage($ex->getMessage()) ->save(); throw $ex; } catch (Exception $ex) { $this ->setMessage($ex->getMessage()."\n".$ex->getTraceAsString()) ->save(); throw $ex; } } public static function getReadableStatus($status_code) { static $readable = array( self::STATUS_QUEUE => 'Queued for Delivery', self::STATUS_FAIL => 'Delivery Failed', self::STATUS_SENT => 'Sent', self::STATUS_VOID => 'Void', ); $status_code = coalesce($status_code, '?'); return idx($readable, $status_code, $status_code); } private function generateThreadIndex($seed, $is_first_mail) { // When threading, Outlook ignores the 'References' and 'In-Reply-To' // headers that most clients use. Instead, it uses a custom 'Thread-Index' // header. The format of this header is something like this (from // camel-exchange-folder.c in Evolution Exchange): /* A new post to a folder gets a 27-byte-long thread index. (The value * is apparently unique but meaningless.) Each reply to a post gets a * 32-byte-long thread index whose first 27 bytes are the same as the * parent's thread index. Each reply to any of those gets a * 37-byte-long thread index, etc. The Thread-Index header contains a * base64 representation of this value. */ // The specific implementation uses a 27-byte header for the first email // a recipient receives, and a random 5-byte suffix (32 bytes total) // thereafter. This means that all the replies are (incorrectly) siblings, // but it would be very difficult to keep track of the entire tree and this // gets us reasonable client behavior. $base = substr(md5($seed), 0, 27); if (!$is_first_mail) { // Not totally sure, but it seems like outlook orders replies by // thread-index rather than timestamp, so to get these to show up in the // right order we use the time as the last 4 bytes. $base .= ' '.pack('N', time()); } return base64_encode($base); } public static function shouldMultiplexAllMail() { return PhabricatorEnv::getEnvConfig('metamta.one-mail-per-recipient'); } /* -( Managing Recipients )------------------------------------------------ */ /** * Get all of the recipients for this mail, after preference filters are * applied. This list has all objects to whom delivery will be attempted. * * @return list A list of all recipients to whom delivery will be * attempted. * @task recipients */ public function buildRecipientList() { $actors = $this->loadActors( array_merge( $this->getToPHIDs(), $this->getCcPHIDs())); $actors = $this->filterDeliverableActors($actors); return mpull($actors, 'getPHID'); } public function loadAllActors() { $actor_phids = array_merge( array($this->getParam('from')), $this->getToPHIDs(), $this->getCcPHIDs()); $this->loadRecipientExpansions($actor_phids); $actor_phids = $this->expandRecipients($actor_phids); return $this->loadActors($actor_phids); } private function loadRecipientExpansions(array $phids) { $expansions = id(new PhabricatorMetaMTAMemberQuery()) ->setViewer(PhabricatorUser::getOmnipotentUser()) ->withPHIDs($phids) ->execute(); $this->recipientExpansionMap = $expansions; return $this; } /** * Expand a list of recipient PHIDs (possibly including aggregate recipients * like projects) into a deaggregated list of individual recipient PHIDs. * For example, this will expand project PHIDs into a list of the project's * members. * * @param list List of recipient PHIDs, possibly including aggregate * recipients. * @return list Deaggregated list of mailable recipients. */ private function expandRecipients(array $phids) { if ($this->recipientExpansionMap === null) { throw new Exception( pht( 'Call loadRecipientExpansions() before expandRecipients()!')); } $results = array(); foreach ($phids as $phid) { if (!isset($this->recipientExpansionMap[$phid])) { $results[$phid] = $phid; } else { foreach ($this->recipientExpansionMap[$phid] as $recipient_phid) { $results[$recipient_phid] = $recipient_phid; } } } return array_keys($results); } private function filterDeliverableActors(array $actors) { assert_instances_of($actors, 'PhabricatorMetaMTAActor'); $deliverable_actors = array(); foreach ($actors as $phid => $actor) { if ($actor->isDeliverable()) { $deliverable_actors[$phid] = $actor; } } return $deliverable_actors; } private function loadActors(array $actor_phids) { $actor_phids = array_filter($actor_phids); $viewer = PhabricatorUser::getOmnipotentUser(); $actors = id(new PhabricatorMetaMTAActorQuery()) ->setViewer($viewer) ->withPHIDs($actor_phids) ->execute(); if (!$actors) { return array(); } // Exclude explicit recipients. foreach ($this->getExcludeMailRecipientPHIDs() as $phid) { $actor = idx($actors, $phid); if (!$actor) { continue; } $actor->setUndeliverable( pht( 'This message is a response to another email message, and this '. 'recipient received the original email message, so we are not '. 'sending them this substantially similar message (for example, '. 'the sender used "Reply All" instead of "Reply" in response to '. 'mail from Phabricator).')); } // Exclude the actor if their preferences are set. $from_phid = $this->getParam('from'); $from_actor = idx($actors, $from_phid); if ($from_actor) { $from_user = id(new PhabricatorPeopleQuery()) ->setViewer($viewer) ->withPHIDs(array($from_phid)) ->execute(); $from_user = head($from_user); if ($from_user && !$this->getOverrideNoSelfMailPreference()) { $pref_key = PhabricatorUserPreferences::PREFERENCE_NO_SELF_MAIL; $exclude_self = $from_user ->loadPreferences() ->getPreference($pref_key); if ($exclude_self) { $from_actor->setUndeliverable( pht( 'This recipient is the user whose actions caused delivery of '. 'this message, but they have set preferences so they do not '. 'receive mail about their own actions (Settings > Email '. 'Preferences > Self Actions).')); } } } // Exclude all recipients who have set preferences to not receive this type // of email (for example, a user who says they don't want emails about task // CC changes). $tags = $this->getParam('mailtags'); if ($tags) { $all_prefs = id(new PhabricatorUserPreferences())->loadAllWhere( 'userPHID in (%Ls)', $actor_phids); $all_prefs = mpull($all_prefs, null, 'getUserPHID'); foreach ($all_prefs as $phid => $prefs) { $user_mailtags = $prefs->getPreference( PhabricatorUserPreferences::PREFERENCE_MAILTAGS, array()); // The user must have elected to receive mail for at least one // of the mailtags. $send = false; foreach ($tags as $tag) { if (idx($user_mailtags, $tag, true)) { $send = true; break; } } if (!$send) { $actors[$phid]->setUndeliverable( pht( 'This mail has tags which control which users receive it, and '. 'this recipient has not elected to receive mail with any of '. 'the tags on this message (Settings > Email Preferences).')); } } } return $actors; } private function shouldRateLimitMail(array $all_recipients) { try { PhabricatorSystemActionEngine::willTakeAction( $all_recipients, new PhabricatorMetaMTAErrorMailAction(), 1); return false; } catch (PhabricatorSystemActionRateLimitException $ex) { return true; } } } diff --git a/src/applications/search/index/PhabricatorSearchIndexer.php b/src/applications/search/index/PhabricatorSearchIndexer.php index 8d59a07841..c9143f5cce 100644 --- a/src/applications/search/index/PhabricatorSearchIndexer.php +++ b/src/applications/search/index/PhabricatorSearchIndexer.php @@ -1,28 +1,29 @@ $phid, - )); + ), + PhabricatorWorker::PRIORITY_IMPORT); } public function indexDocumentByPHID($phid) { $indexers = id(new PhutilSymbolLoader()) ->setAncestorClass('PhabricatorSearchDocumentIndexer') ->loadObjects(); foreach ($indexers as $indexer) { if ($indexer->shouldIndexDocumentByPHID($phid)) { $indexer->indexDocumentByPHID($phid); break; } } return $this; } } diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php index 074d580a52..699afdfc50 100644 --- a/src/infrastructure/daemon/workers/PhabricatorWorker.php +++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php @@ -1,239 +1,251 @@ data = $data; } final protected function getTaskData() { return $this->data; } final public function executeTask() { $this->doWork(); } - final public static function scheduleTask($task_class, $data) { + final public static function scheduleTask( + $task_class, + $data, + $priority = null) { + + if ($priority === null) { + $priority = self::PRIORITY_DEFAULT; + } + $task = id(new PhabricatorWorkerActiveTask()) ->setTaskClass($task_class) - ->setData($data); + ->setData($data) + ->setPriority($priority); if (self::$runAllTasksInProcess) { // Do the work in-process. $worker = newv($task_class, array($data)); while (true) { try { $worker->doWork(); foreach ($worker->getQueuedTasks() as $queued_task) { - list($queued_class, $queued_data) = $queued_task; - self::scheduleTask($queued_class, $queued_data); + list($queued_class, $queued_data, $queued_priority) = $queued_task; + self::scheduleTask($queued_class, $queued_data, $queued_priority); } break; } catch (PhabricatorWorkerYieldException $ex) { phlog( pht( 'In-process task "%s" yielded for %s seconds, sleeping...', $task_class, $ex->getDuration())); - sleep($ex->getDuration()); } } // Now, save a task row and immediately archive it so we can return an // object with a valid ID. $task->openTransaction(); $task->save(); $archived = $task->archiveTask( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, 0); $task->saveTransaction(); return $archived; } else { $task->save(); return $task; } } /** * Wait for tasks to complete. If tasks are not leased by other workers, they * will be executed in this process while waiting. * * @param list List of queued task IDs to wait for. * @return void */ final public static function waitForTasks(array $task_ids) { if (!$task_ids) { return; } $task_table = new PhabricatorWorkerActiveTask(); $waiting = array_fuse($task_ids); while ($waiting) { $conn_w = $task_table->establishConnection('w'); // Check if any of the tasks we're waiting on are still queued. If they // are not, we're done waiting. $row = queryfx_one( $conn_w, 'SELECT COUNT(*) N FROM %T WHERE id IN (%Ld)', $task_table->getTableName(), $waiting); if (!$row['N']) { // Nothing is queued anymore. Stop waiting. break; } $tasks = id(new PhabricatorWorkerLeaseQuery()) ->withIDs($waiting) ->setLimit(1) ->execute(); if (!$tasks) { // We were not successful in leasing anything. Sleep for a bit and // see if we have better luck later. sleep(1); continue; } $task = head($tasks)->executeTask(); $ex = $task->getExecutionException(); if ($ex) { throw $ex; } } $tasks = id(new PhabricatorWorkerArchiveTask())->loadAllWhere( 'id IN (%Ld)', $task_ids); foreach ($tasks as $task) { if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) { - throw new Exception( - pht('Task %d failed!', $task->getID())); + throw new Exception(pht('Task %d failed!', $task->getID())); } } } public function renderForDisplay(PhabricatorUser $viewer) { $data = PhutilReadableSerializer::printableValue($this->data); return phutil_tag('pre', array(), $data); } /** * Set this flag to execute scheduled tasks synchronously, in the same * process. This is useful for debugging, and otherwise dramatically worse * in every way imaginable. */ public static function setRunAllTasksInProcess($all) { self::$runAllTasksInProcess = $all; } - protected function log($pattern /* $args */) { + final protected function log($pattern /* , ... */) { $console = PhutilConsole::getConsole(); $argv = func_get_args(); call_user_func_array(array($console, 'writeLog'), $argv); return $this; } /** * Queue a task to be executed after this one succeeds. * * The followup task will be queued only if this task completes cleanly. * - * @param string Task class to queue. - * @param array Data for the followup task. + * @param string Task class to queue. + * @param array Data for the followup task. + * @param int|null Priority for the followup task. * @return this */ - protected function queueTask($class, array $data) { - $this->queuedTasks[] = array($class, $data); + final protected function queueTask($class, array $data, $priority = null) { + $this->queuedTasks[] = array($class, $data, $priority); return $this; } /** * Get tasks queued as followups by @{method:queueTask}. * - * @return list> Queued task specifications. + * @return list> Queued task specifications. */ - public function getQueuedTasks() { + final public function getQueuedTasks() { return $this->queuedTasks; } } diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php index f871a00fb0..1b4788a513 100644 --- a/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php +++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorTestWorker.php @@ -1,39 +1,38 @@ getTaskData(), 'getRequiredLeaseTime', parent::getRequiredLeaseTime()); } public function getMaximumRetryCount() { return idx( $this->getTaskData(), 'getMaximumRetryCount', parent::getMaximumRetryCount()); } public function getWaitBeforeRetry(PhabricatorWorkerTask $task) { return idx( $this->getTaskData(), 'getWaitBeforeRetry', parent::getWaitBeforeRetry($task)); } protected function doWork() { switch (idx($this->getTaskData(), 'doWork')) { case 'fail-temporary': - throw new Exception( - 'Temporary failure!'); + throw new Exception('Temporary failure!'); case 'fail-permanent': throw new PhabricatorWorkerPermanentFailureException( 'Permanent failure!'); default: return; } } } diff --git a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php index 90950c613a..47e229ec62 100644 --- a/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php +++ b/src/infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php @@ -1,193 +1,212 @@ true, ); } public function testLeaseTask() { - // Leasing should work. - $task = $this->scheduleTask(); - - $this->expectNextLease($task); + $this->expectNextLease($task, 'Leasing should work.'); } public function testMultipleLease() { - // We should not be able to lease a task multiple times. - $task = $this->scheduleTask(); $this->expectNextLease($task); - $this->expectNextLease(null); + $this->expectNextLease( + null, + 'We should not be able to lease a task multiple times.'); } public function testOldestFirst() { - // Older tasks should lease first, all else being equal. - $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); - $this->expectNextLease($task1); + $this->expectNextLease( + $task1, + 'Older tasks should lease first, all else being equal.'); $this->expectNextLease($task2); } public function testNewBeforeLeased() { - // Tasks not previously leased should lease before previously leased tasks. - $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); $task1->setLeaseOwner('test'); $task1->setLeaseExpires(time() - 100000); $task1->forceSaveWithoutLease(); - $this->expectNextLease($task2); + $this->expectNextLease( + $task2, + 'Tasks not previously leased should lease before previously '. + 'leased tasks.'); $this->expectNextLease($task1); } public function testExecuteTask() { $task = $this->scheduleAndExecuteTask(); $this->assertEqual(true, $task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, $task->getResult()); } public function testPermanentTaskFailure() { $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-permanent', )); $this->assertEqual(true, $task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_FAILURE, $task->getResult()); } public function testTemporaryTaskFailure() { $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-temporary', )); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); } public function testTooManyTaskFailures() { // Expect temporary failures, then a permanent failure. $task = $this->scheduleAndExecuteTask( array( 'doWork' => 'fail-temporary', 'getMaximumRetryCount' => 3, 'getWaitBeforeRetry' => -60, )); // Temporary... $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(1, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(2, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(3, $task->getFailureCount()); // Temporary... $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertFalse($task->isArchived()); $this->assertTrue($task->getExecutionException() instanceof Exception); $this->assertEqual(4, $task->getFailureCount()); // Permanent. $task = $this->expectNextLease($task); $task = $task->executeTask(); $this->assertTrue($task->isArchived()); $this->assertEqual( PhabricatorWorkerArchiveTask::RESULT_FAILURE, $task->getResult()); } public function testWaitBeforeRetry() { $task = $this->scheduleTask( array( 'doWork' => 'fail-temporary', 'getWaitBeforeRetry' => 1000000, )); $this->expectNextLease($task)->executeTask(); $this->expectNextLease(null); } public function testRequiredLeaseTime() { $task = $this->scheduleAndExecuteTask( array( - 'getRequiredLeaseTime' => 1000000, + 'getRequiredLeaseTime' => 1000000, )); $this->assertTrue(($task->getLeaseExpires() - time()) > 1000); } public function testLeasedIsOldestFirst() { - // Tasks which expired earlier should lease first, all else being equal. - $task1 = $this->scheduleTask(); $task2 = $this->scheduleTask(); $task1->setLeaseOwner('test'); $task1->setLeaseExpires(time() - 100000); $task1->forceSaveWithoutLease(); $task2->setLeaseOwner('test'); $task2->setLeaseExpires(time() - 200000); $task2->forceSaveWithoutLease(); - $this->expectNextLease($task2); + $this->expectNextLease( + $task2, + 'Tasks which expired earlier should lease first, all else being equal.'); $this->expectNextLease($task1); } - private function expectNextLease($task) { + public function testLeasedIsHighestPriority() { + $task1 = $this->scheduleTask(array(), 2); + $task2 = $this->scheduleTask(array(), 1); + $task3 = $this->scheduleTask(array(), 1); + + $this->expectNextLease( + $task1, + 'Tasks with a higher priority should be scheduled first.'); + $this->expectNextLease( + $task2, + 'Tasks with the same priority should be FIFO.'); + $this->expectNextLease($task3); + } + + private function expectNextLease($task, $message = null) { $leased = id(new PhabricatorWorkerLeaseQuery()) ->setLimit(1) ->execute(); if ($task === null) { - $this->assertEqual(0, count($leased)); + $this->assertEqual(0, count($leased), $message); return null; } else { - $this->assertEqual(1, count($leased)); + $this->assertEqual(1, count($leased), $message); $this->assertEqual( (int)head($leased)->getID(), - (int)$task->getID()); + (int)$task->getID(), + $message); return head($leased); } } - private function scheduleAndExecuteTask(array $data = array()) { - $task = $this->scheduleTask($data); + private function scheduleAndExecuteTask( + array $data = array(), + $priority = null) { + + $task = $this->scheduleTask($data, $priority); $task = $this->expectNextLease($task); $task = $task->executeTask(); return $task; } - private function scheduleTask(array $data = array()) { - return PhabricatorWorker::scheduleTask('PhabricatorTestWorker', $data); + private function scheduleTask(array $data = array(), $priority = null) { + return PhabricatorWorker::scheduleTask( + 'PhabricatorTestWorker', + $data, + $priority); } } diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php index ca0d5db5c6..bc2dd22cc8 100644 --- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php +++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php @@ -1,234 +1,220 @@ ids = $ids; return $this; } public function setLimit($limit) { $this->limit = $limit; return $this; } public function execute() { if (!$this->limit) { throw new Exception('You must setLimit() when leasing tasks.'); } $task_table = new PhabricatorWorkerActiveTask(); $taskdata_table = new PhabricatorWorkerTaskData(); $lease_ownership_name = $this->getLeaseOwnershipName(); $conn_w = $task_table->establishConnection('w'); // Try to satisfy the request from new, unleased tasks first. If we don't // find enough tasks, try tasks with expired leases (i.e., tasks which have // previously failed). $phases = array( self::PHASE_UNLEASED, self::PHASE_EXPIRED, ); $limit = $this->limit; $leased = 0; foreach ($phases as $phase) { - // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query // goes very, very slowly. The `ORDER BY` triggers this, although we get // the same apparent results without it. Without the ORDER BY, binary // read slaves complain that the query isn't repeatable. To avoid both // problems, do a SELECT and then an UPDATE. $rows = queryfx_all( $conn_w, 'SELECT id, leaseOwner FROM %T %Q %Q %Q', $task_table->getTableName(), $this->buildWhereClause($conn_w, $phase), $this->buildOrderClause($conn_w, $phase), $this->buildLimitClause($conn_w, $limit - $leased)); // NOTE: Sometimes, we'll race with another worker and they'll grab // this task before we do. We could reduce how often this happens by // selecting more tasks than we need, then shuffling them and trying // to lock only the number we're actually after. However, the amount // of time workers spend here should be very small relative to their // total runtime, so keep it simple for the moment. if ($rows) { queryfx( $conn_w, 'UPDATE %T task SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d %Q', $task_table->getTableName(), $lease_ownership_name, self::getDefaultLeaseDuration(), $this->buildUpdateWhereClause($conn_w, $phase, $rows)); $leased += $conn_w->getAffectedRows(); if ($leased == $limit) { break; } } } if (!$leased) { return array(); } $data = queryfx_all( $conn_w, 'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime FROM %T task LEFT JOIN %T taskdata ON taskdata.id = task.dataID WHERE leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP() %Q %Q', $task_table->getTableName(), $taskdata_table->getTableName(), $lease_ownership_name, $this->buildOrderClause($conn_w, $phase), $this->buildLimitClause($conn_w, $limit)); $tasks = $task_table->loadAllFromArray($data); $tasks = mpull($tasks, null, 'getID'); foreach ($data as $row) { $tasks[$row['id']]->setServerTime($row['_serverTime']); if ($row['_taskData']) { $task_data = json_decode($row['_taskData'], true); } else { $task_data = null; } $tasks[$row['id']]->setData($task_data); } return $tasks; } private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) { - $where = array(); switch ($phase) { case self::PHASE_UNLEASED: $where[] = 'leaseOwner IS NULL'; break; case self::PHASE_EXPIRED: $where[] = 'leaseExpires < UNIX_TIMESTAMP()'; break; default: throw new Exception("Unknown phase '{$phase}'!"); } if ($this->ids) { - $where[] = qsprintf( - $conn_w, - 'id IN (%Ld)', - $this->ids); + $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids); } return $this->formatWhereClause($where); } private function buildUpdateWhereClause( AphrontDatabaseConnection $conn_w, $phase, array $rows) { $where = array(); // NOTE: This is basically working around the MySQL behavior that // `IN (NULL)` doesn't match NULL. switch ($phase) { case self::PHASE_UNLEASED: - $where[] = qsprintf( - $conn_w, - 'leaseOwner IS NULL'); - $where[] = qsprintf( - $conn_w, - 'id IN (%Ld)', - ipull($rows, 'id')); + $where[] = qsprintf($conn_w, 'leaseOwner IS NULL'); + $where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id')); break; case self::PHASE_EXPIRED: $in = array(); foreach ($rows as $row) { $in[] = qsprintf( $conn_w, '(id = %d AND leaseOwner = %s)', $row['id'], $row['leaseOwner']); } - $where[] = qsprintf( - $conn_w, - '(%Q)', - implode(' OR ', $in)); + $where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in)); break; default: throw new Exception("Unknown phase '{$phase}'!"); } return $this->formatWhereClause($where); - } private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) { switch ($phase) { case self::PHASE_UNLEASED: - // When selecting new tasks, we want to consume them in roughly - // FIFO order, so we order by the task ID. + // When selecting new tasks, we want to consume them in order of + // decreasing priority (and then FIFO). return qsprintf($conn_w, 'ORDER BY id ASC'); case self::PHASE_EXPIRED: // When selecting failed tasks, we want to consume them in roughly // FIFO order of their failures, which is not necessarily their original // queue order. // Particularly, this is important for tasks which use soft failures to // indicate that they are waiting on other tasks to complete: we need to // push them to the end of the queue after they fail, at least on // average, so we don't deadlock retrying the same blocked task over // and over again. return qsprintf($conn_w, 'ORDER BY leaseExpires ASC'); default: throw new Exception(pht('Unknown phase "%s"!', $phase)); } } private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) { return qsprintf($conn_w, 'LIMIT %d', $limit); } private function getLeaseOwnershipName() { static $sequence = 0; $parts = array( getmypid(), time(), php_uname('n'), ++$sequence, ); return implode(':', $parts); } } diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php index c40ebbb824..be5f51cbad 100644 --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php @@ -1,175 +1,175 @@ self::IDS_COUNTER, self::CONFIG_TIMESTAMPS => false, ) + parent::getConfiguration(); } public function setServerTime($server_time) { $this->serverTime = $server_time; $this->localTime = time(); return $this; } public function setLeaseDuration($lease_duration) { $this->checkLease(); $server_lease_expires = $this->serverTime + $lease_duration; $this->setLeaseExpires($server_lease_expires); // NOTE: This is primarily to allow unit tests to set negative lease // durations so they don't have to wait around for leases to expire. We // check that the lease is valid above. return $this->forceSaveWithoutLease(); } public function save() { $this->checkLease(); return $this->forceSaveWithoutLease(); } public function forceSaveWithoutLease() { $is_new = !$this->getID(); if ($is_new) { $this->failureCount = 0; } if ($is_new && ($this->getData() !== null)) { $data = new PhabricatorWorkerTaskData(); $data->setData($this->getData()); $data->save(); $this->setDataID($data->getID()); } return parent::save(); } protected function checkLease() { if ($this->leaseOwner) { $current_server_time = $this->serverTime + (time() - $this->localTime); if ($current_server_time >= $this->leaseExpires) { $id = $this->getID(); $class = $this->getTaskClass(); throw new Exception( "Trying to update Task {$id} ({$class}) after lease expiration!"); } } } public function delete() { throw new Exception( 'Active tasks can not be deleted directly. '. 'Use archiveTask() to move tasks to the archive.'); } public function archiveTask($result, $duration) { if ($this->getID() === null) { throw new Exception( "Attempting to archive a task which hasn't been save()d!"); } $this->checkLease(); $archive = id(new PhabricatorWorkerArchiveTask()) ->setID($this->getID()) ->setTaskClass($this->getTaskClass()) ->setLeaseOwner($this->getLeaseOwner()) ->setLeaseExpires($this->getLeaseExpires()) ->setFailureCount($this->getFailureCount()) ->setDataID($this->getDataID()) + ->setPriority($this->getPriority()) ->setResult($result) ->setDuration($duration); // NOTE: This deletes the active task (this object)! $archive->save(); return $archive; } public function executeTask() { // We do this outside of the try .. catch because we don't have permission // to release the lease otherwise. $this->checkLease(); $did_succeed = false; try { $worker = $this->getWorkerInstance(); $maximum_failures = $worker->getMaximumRetryCount(); if ($maximum_failures !== null) { if ($this->getFailureCount() > $maximum_failures) { $id = $this->getID(); throw new PhabricatorWorkerPermanentFailureException( "Task {$id} has exceeded the maximum number of failures ". "({$maximum_failures})."); } } $lease = $worker->getRequiredLeaseTime(); if ($lease !== null) { $this->setLeaseDuration($lease); } $t_start = microtime(true); $worker->executeTask(); $t_end = microtime(true); $duration = (int)(1000000 * ($t_end - $t_start)); $result = $this->archiveTask( PhabricatorWorkerArchiveTask::RESULT_SUCCESS, $duration); $did_succeed = true; } catch (PhabricatorWorkerPermanentFailureException $ex) { $result = $this->archiveTask( PhabricatorWorkerArchiveTask::RESULT_FAILURE, 0); $result->setExecutionException($ex); } catch (PhabricatorWorkerYieldException $ex) { $this->setExecutionException($ex); $retry = $ex->getDuration(); $retry = max($retry, 5); // NOTE: As a side effect, this saves the object. $this->setLeaseDuration($retry); $result = $this; } catch (Exception $ex) { $this->setExecutionException($ex); $this->setFailureCount($this->getFailureCount() + 1); $this->setFailureTime(time()); $retry = $worker->getWaitBeforeRetry($this); $retry = coalesce( $retry, PhabricatorWorkerLeaseQuery::getDefaultWaitBeforeRetry()); // NOTE: As a side effect, this saves the object. $this->setLeaseDuration($retry); $result = $this; } // NOTE: If this throws, we don't want it to cause the task to fail again, // so execute it out here and just let the exception escape. if ($did_succeed) { foreach ($worker->getQueuedTasks() as $task) { list($class, $data) = $task; - PhabricatorWorker::scheduleTask($class, $data); + PhabricatorWorker::scheduleTask($class, $data, $this->getPriority()); } } return $result; } - } diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php index cf985ca880..23acdf226b 100644 --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php @@ -1,69 +1,69 @@ getID() === null) { - throw new Exception( - 'Trying to archive a task with no ID.'); + throw new Exception('Trying to archive a task with no ID.'); } $other = new PhabricatorWorkerActiveTask(); $conn_w = $this->establishConnection('w'); $this->openTransaction(); queryfx( $conn_w, 'DELETE FROM %T WHERE id = %d', $other->getTableName(), $this->getID()); $result = parent::insert(); $this->saveTransaction(); return $result; } public function delete() { $this->openTransaction(); if ($this->getDataID()) { $conn_w = $this->establishConnection('w'); $data_table = new PhabricatorWorkerTaskData(); queryfx( $conn_w, 'DELETE FROM %T WHERE id = %d', $data_table->getTableName(), $this->getDataID()); } $result = parent::delete(); $this->saveTransaction(); return $result; } public function unarchiveTask() { $this->openTransaction(); $active = id(new PhabricatorWorkerActiveTask()) ->setID($this->getID()) ->setTaskClass($this->getTaskClass()) ->setLeaseOwner(null) ->setLeaseExpires(0) ->setFailureCount(0) ->setDataID($this->getDataID()) + ->setPriority($this->getPriority()) ->insert(); $this->setDataID(null); $this->delete(); $this->saveTransaction(); return $active; } } diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php index 6cff2d2c80..baa367a53d 100644 --- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php @@ -1,55 +1,56 @@ executionException = $execution_exception; return $this; } - public function getExecutionException() { + final public function getExecutionException() { return $this->executionException; } - public function setData($data) { + final public function setData($data) { $this->data = $data; return $this; } - public function getData() { + final public function getData() { return $this->data; } - public function isArchived() { + final public function isArchived() { return ($this instanceof PhabricatorWorkerArchiveTask); } - public function getWorkerInstance() { + final public function getWorkerInstance() { $id = $this->getID(); $class = $this->getTaskClass(); if (!class_exists($class)) { throw new PhabricatorWorkerPermanentFailureException( "Task class '{$class}' does not exist!"); } if (!is_subclass_of($class, 'PhabricatorWorker')) { throw new PhabricatorWorkerPermanentFailureException( "Task class '{$class}' does not extend PhabricatorWorker."); } return newv($class, array($this->getData())); } } diff --git a/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php b/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php index 581a2cd3af..da13192eff 100644 --- a/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php +++ b/src/infrastructure/sms/adapter/PhabricatorSMSImplementationAdapter.php @@ -1,84 +1,86 @@ fromNumber = $number; return $this; } public function getFrom() { return $this->fromNumber; } public function setTo($number) { $this->toNumber = $number; return $this; } public function getTo() { return $this->toNumber; } public function setBody($body) { $this->body = $body; return $this; } public function getBody() { return $this->body; } /** * 16 characters or less, to be used in database columns and exposed * to administrators during configuration directly. */ abstract public function getProviderShortName(); /** * Send the message. Generally, this means connecting to some service and * handing data to it. SMS APIs are generally asynchronous, so truly * determining success or failure is probably impossible synchronously. * * That said, if the adapter determines that the SMS will never be * deliverable, or there is some other known failure, it should throw * an exception. * * @return null */ abstract public function send(); /** * Most (all?) SMS APIs are asynchronous, but some do send back some * initial information. Use this hook to determine what the updated * sentStatus should be and what the provider is using for an SMS ID, * as well as throw exceptions if there are any failures. * * @return array Tuple of ($sms_id and $sent_status) */ abstract public function getSMSDataFromResult($result); /** * Due to the asynchronous nature of sending SMS messages, it can be * necessary to poll the provider regarding the sent status of a given * sms. * * For now, this *MUST* be implemented and *MUST* work. */ abstract public function pollSMSSentStatus(PhabricatorSMS $sms); /** * Convenience function to handle sending an SMS. */ public static function sendSMS(array $to_numbers, $body) { PhabricatorWorker::scheduleTask( 'PhabricatorSMSDemultiplexWorker', array( 'toNumbers' => $to_numbers, - 'body' => $body)); + 'body' => $body, + ), + PhabricatorWorker::PRIORITY_ALERTS); } }