Page MenuHomePhabricator

D11419.id27435.diff
No OneTemporary

D11419.id27435.diff

diff --git a/resources/sql/autopatches/20150115.trigger.1.sql b/resources/sql/autopatches/20150115.trigger.1.sql
new file mode 100644
--- /dev/null
+++ b/resources/sql/autopatches/20150115.trigger.1.sql
@@ -0,0 +1,11 @@
+CREATE TABLE {$NAMESPACE}_worker.worker_trigger (
+ id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ phid VARBINARY(64) NOT NULL,
+ triggerVersion INT UNSIGNED NOT NULL,
+ clockClass VARCHAR(64) NOT NULL COLLATE {$COLLATE_TEXT},
+ clockProperties LONGTEXT NOT NULL COLLATE {$COLLATE_TEXT},
+ actionClass VARCHAR(64) NOT NULL COLLATE {$COLLATE_TEXT},
+ actionProperties LONGTEXT NOT NULL COLLATE {$COLLATE_TEXT},
+ UNIQUE KEY `key_phid` (phid),
+ UNIQUE KEY `key_trigger` (triggerVersion)
+) ENGINE=InnoDB, COLLATE {$COLLATE_TEXT};
diff --git a/resources/sql/autopatches/20150115.trigger.2.sql b/resources/sql/autopatches/20150115.trigger.2.sql
new file mode 100644
--- /dev/null
+++ b/resources/sql/autopatches/20150115.trigger.2.sql
@@ -0,0 +1,8 @@
+CREATE TABLE {$NAMESPACE}_worker.worker_triggerevent (
+ id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ triggerID INT UNSIGNED NOT NULL,
+ lastEventEpoch INT UNSIGNED,
+ nextEventEpoch INT UNSIGNED,
+ UNIQUE KEY `key_trigger` (triggerID),
+ KEY `key_next` (nextEventEpoch)
+) ENGINE=InnoDB, COLLATE {$COLLATE_TEXT};
diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php
--- a/src/__phutil_library_map__.php
+++ b/src/__phutil_library_map__.php
@@ -1873,6 +1873,7 @@
'PhabricatorListFilterUIExample' => 'applications/uiexample/examples/PhabricatorListFilterUIExample.php',
'PhabricatorLocalDiskFileStorageEngine' => 'applications/files/engine/PhabricatorLocalDiskFileStorageEngine.php',
'PhabricatorLocalTimeTestCase' => 'view/__tests__/PhabricatorLocalTimeTestCase.php',
+ 'PhabricatorLogTriggerAction' => 'infrastructure/daemon/workers/action/PhabricatorLogTriggerAction.php',
'PhabricatorLogoutController' => 'applications/auth/controller/PhabricatorLogoutController.php',
'PhabricatorLunarPhasePolicyRule' => 'applications/policy/rule/PhabricatorLunarPhasePolicyRule.php',
'PhabricatorMacroApplication' => 'applications/macro/application/PhabricatorMacroApplication.php',
@@ -1968,6 +1969,7 @@
'PhabricatorMetaMTASchemaSpec' => 'applications/metamta/storage/PhabricatorMetaMTASchemaSpec.php',
'PhabricatorMetaMTASendGridReceiveController' => 'applications/metamta/controller/PhabricatorMetaMTASendGridReceiveController.php',
'PhabricatorMetaMTAWorker' => 'applications/metamta/PhabricatorMetaMTAWorker.php',
+ 'PhabricatorMetronomicTriggerClock' => 'infrastructure/daemon/workers/clock/PhabricatorMetronomicTriggerClock.php',
'PhabricatorMultiColumnUIExample' => 'applications/uiexample/examples/PhabricatorMultiColumnUIExample.php',
'PhabricatorMultiFactorSettingsPanel' => 'applications/settings/panel/PhabricatorMultiFactorSettingsPanel.php',
'PhabricatorMustVerifyEmailController' => 'applications/auth/controller/PhabricatorMustVerifyEmailController.php',
@@ -1977,6 +1979,7 @@
'PhabricatorNamedQuery' => 'applications/search/storage/PhabricatorNamedQuery.php',
'PhabricatorNamedQueryQuery' => 'applications/search/query/PhabricatorNamedQueryQuery.php',
'PhabricatorNavigationRemarkupRule' => 'infrastructure/markup/rule/PhabricatorNavigationRemarkupRule.php',
+ 'PhabricatorNeverTriggerClock' => 'infrastructure/daemon/workers/clock/PhabricatorNeverTriggerClock.php',
'PhabricatorNotificationAdHocFeedStory' => 'applications/notification/feed/PhabricatorNotificationAdHocFeedStory.php',
'PhabricatorNotificationBuilder' => 'applications/notification/builder/PhabricatorNotificationBuilder.php',
'PhabricatorNotificationClearController' => 'applications/notification/controller/PhabricatorNotificationClearController.php',
@@ -2522,8 +2525,10 @@
'PhabricatorTransformedFile' => 'applications/files/storage/PhabricatorTransformedFile.php',
'PhabricatorTranslation' => 'infrastructure/internationalization/translation/PhabricatorTranslation.php',
'PhabricatorTranslationsConfigOptions' => 'applications/config/option/PhabricatorTranslationsConfigOptions.php',
+ 'PhabricatorTriggerAction' => 'infrastructure/daemon/workers/action/PhabricatorTriggerAction.php',
'PhabricatorTriggerClock' => 'infrastructure/daemon/workers/clock/PhabricatorTriggerClock.php',
'PhabricatorTriggerClockTestCase' => 'infrastructure/daemon/workers/clock/__tests__/PhabricatorTriggerClockTestCase.php',
+ 'PhabricatorTriggerDaemon' => 'infrastructure/daemon/workers/PhabricatorTriggerDaemon.php',
'PhabricatorTrivialTestCase' => 'infrastructure/testing/__tests__/PhabricatorTrivialTestCase.php',
'PhabricatorTwitchAuthProvider' => 'applications/auth/provider/PhabricatorTwitchAuthProvider.php',
'PhabricatorTwitterAuthProvider' => 'applications/auth/provider/PhabricatorTwitterAuthProvider.php',
@@ -2591,6 +2596,10 @@
'PhabricatorWorkerTaskData' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTaskData.php',
'PhabricatorWorkerTaskDetailController' => 'applications/daemon/controller/PhabricatorWorkerTaskDetailController.php',
'PhabricatorWorkerTestCase' => 'infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php',
+ 'PhabricatorWorkerTrigger' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTrigger.php',
+ 'PhabricatorWorkerTriggerEvent' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTriggerEvent.php',
+ 'PhabricatorWorkerTriggerPHIDType' => 'infrastructure/daemon/workers/phid/PhabricatorWorkerTriggerPHIDType.php',
+ 'PhabricatorWorkerTriggerQuery' => 'infrastructure/daemon/workers/query/PhabricatorWorkerTriggerQuery.php',
'PhabricatorWorkerYieldException' => 'infrastructure/daemon/workers/exception/PhabricatorWorkerYieldException.php',
'PhabricatorWorkingCopyDiscoveryTestCase' => 'applications/repository/engine/__tests__/PhabricatorWorkingCopyDiscoveryTestCase.php',
'PhabricatorWorkingCopyPullTestCase' => 'applications/repository/engine/__tests__/PhabricatorWorkingCopyPullTestCase.php',
@@ -5083,6 +5092,7 @@
'PhabricatorListFilterUIExample' => 'PhabricatorUIExample',
'PhabricatorLocalDiskFileStorageEngine' => 'PhabricatorFileStorageEngine',
'PhabricatorLocalTimeTestCase' => 'PhabricatorTestCase',
+ 'PhabricatorLogTriggerAction' => 'PhabricatorTriggerAction',
'PhabricatorLogoutController' => 'PhabricatorAuthController',
'PhabricatorLunarPhasePolicyRule' => 'PhabricatorPolicyRule',
'PhabricatorMacroApplication' => 'PhabricatorApplication',
@@ -5169,6 +5179,7 @@
'PhabricatorMetaMTASchemaSpec' => 'PhabricatorConfigSchemaSpec',
'PhabricatorMetaMTASendGridReceiveController' => 'PhabricatorMetaMTAController',
'PhabricatorMetaMTAWorker' => 'PhabricatorWorker',
+ 'PhabricatorMetronomicTriggerClock' => 'PhabricatorTriggerClock',
'PhabricatorMultiColumnUIExample' => 'PhabricatorUIExample',
'PhabricatorMultiFactorSettingsPanel' => 'PhabricatorSettingsPanel',
'PhabricatorMustVerifyEmailController' => 'PhabricatorAuthController',
@@ -5181,6 +5192,7 @@
),
'PhabricatorNamedQueryQuery' => 'PhabricatorCursorPagedPolicyAwareQuery',
'PhabricatorNavigationRemarkupRule' => 'PhutilRemarkupRule',
+ 'PhabricatorNeverTriggerClock' => 'PhabricatorTriggerClock',
'PhabricatorNotificationAdHocFeedStory' => 'PhabricatorFeedStory',
'PhabricatorNotificationClearController' => 'PhabricatorNotificationController',
'PhabricatorNotificationConfigOptions' => 'PhabricatorApplicationConfigOptions',
@@ -5778,8 +5790,10 @@
'PhabricatorTransactionsApplication' => 'PhabricatorApplication',
'PhabricatorTransformedFile' => 'PhabricatorFileDAO',
'PhabricatorTranslationsConfigOptions' => 'PhabricatorApplicationConfigOptions',
+ 'PhabricatorTriggerAction' => 'Phobject',
'PhabricatorTriggerClock' => 'Phobject',
'PhabricatorTriggerClockTestCase' => 'PhabricatorTestCase',
+ 'PhabricatorTriggerDaemon' => 'PhabricatorDaemon',
'PhabricatorTrivialTestCase' => 'PhabricatorTestCase',
'PhabricatorTwitchAuthProvider' => 'PhabricatorOAuth2AuthProvider',
'PhabricatorTwitterAuthProvider' => 'PhabricatorOAuth1AuthProvider',
@@ -5857,6 +5871,10 @@
'PhabricatorWorkerTaskData' => 'PhabricatorWorkerDAO',
'PhabricatorWorkerTaskDetailController' => 'PhabricatorDaemonController',
'PhabricatorWorkerTestCase' => 'PhabricatorTestCase',
+ 'PhabricatorWorkerTrigger' => 'PhabricatorWorkerDAO',
+ 'PhabricatorWorkerTriggerEvent' => 'PhabricatorWorkerDAO',
+ 'PhabricatorWorkerTriggerPHIDType' => 'PhabricatorPHIDType',
+ 'PhabricatorWorkerTriggerQuery' => 'PhabricatorOffsetPagedQuery',
'PhabricatorWorkerYieldException' => 'Exception',
'PhabricatorWorkingCopyDiscoveryTestCase' => 'PhabricatorWorkingCopyTestCase',
'PhabricatorWorkingCopyPullTestCase' => 'PhabricatorWorkingCopyTestCase',
diff --git a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
--- a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
+++ b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
@@ -189,6 +189,18 @@
->setTasks($upcoming)
->setNoDataString(pht('Task queue is empty.')));
+ $triggers = id(new PhabricatorWorkerTriggerQuery())
+ ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)
+ ->needEvents(true)
+ ->setLimit(10)
+ ->execute();
+
+ $triggers_table = $this->buildTriggersTable($triggers);
+
+ $triggers_panel = id(new PHUIObjectBoxView())
+ ->setHeaderText(pht('Upcoming Triggers'))
+ ->appendChild($triggers_table);
+
$crumbs = $this->buildApplicationCrumbs();
$crumbs->addTextCrumb(pht('Console'));
@@ -202,6 +214,7 @@
$queued_panel,
$leased_panel,
$upcoming_panel,
+ $triggers_panel,
));
return $this->buildApplicationPage(
@@ -212,4 +225,47 @@
));
}
+ private function buildTriggersTable(array $triggers) {
+ $viewer = $this->getViewer();
+
+ $rows = array();
+ foreach ($triggers as $trigger) {
+ $event = $trigger->getEvent();
+ if ($event) {
+ $last_epoch = $event->getLastEventEpoch();
+ $next_epoch = $event->getNextEventEpoch();
+ } else {
+ $last_epoch = null;
+ $next_epoch = null;
+ }
+
+ $rows[] = array(
+ $trigger->getID(),
+ $trigger->getClockClass(),
+ $trigger->getActionClass(),
+ $last_epoch ? phabricator_datetime($last_epoch, $viewer) : null,
+ $next_epoch ? phabricator_datetime($next_epoch, $viewer) : null,
+ );
+ }
+
+ return id(new AphrontTableView($rows))
+ ->setNoDataString(pht('There are no upcoming event triggers.'))
+ ->setHeaders(
+ array(
+ 'ID',
+ 'Clock',
+ 'Action',
+ 'Last',
+ 'Next',
+ ))
+ ->setColumnClasses(
+ array(
+ '',
+ '',
+ 'wide',
+ 'date',
+ 'date',
+ ));
+ }
+
}
diff --git a/src/infrastructure/daemon/workers/PhabricatorTriggerDaemon.php b/src/infrastructure/daemon/workers/PhabricatorTriggerDaemon.php
new file mode 100644
--- /dev/null
+++ b/src/infrastructure/daemon/workers/PhabricatorTriggerDaemon.php
@@ -0,0 +1,290 @@
+<?php
+
+/**
+ * Schedule and execute event triggers, which run code at specific times.
+ */
+final class PhabricatorTriggerDaemon
+ extends PhabricatorDaemon {
+
+ const COUNTER_VERSION = 'trigger.version';
+ const COUNTER_CURSOR = 'trigger.cursor';
+
+ protected function run() {
+
+ // The trigger daemon is a low-level infrastructure daemon which schedules
+ // and executes chronological events. Examples include a subscription which
+ // generates a bill on the 12th of every month, or a reminder email 15
+ // minutes before a meeting.
+
+ // Only one trigger daemon can run at a time, and very little work should
+ // happen in the daemon process. In general, triggered events should
+ // just schedule a task into the normal daemon worker queue and then
+ // return. This allows the real work to take longer to execute without
+ // disrupting other triggers.
+
+ // The trigger mechanism guarantees that events will execute exactly once,
+ // but does not guarantee that they will execute at precisely the specified
+ // time. Under normal circumstances, they should execute within a minute or
+ // so of the desired time, so this mechanism can be used for things like
+ // meeting reminders.
+
+ // If the trigger queue backs up (for example, because it is overwhelmed by
+ // trigger updates, doesn't run for a while, or a trigger action is written
+ // inefficiently) or the daemon queue backs up (usually for similar
+ // reasons), events may execute an arbitrarily long time after they were
+ // scheduled to execute. In some cases (like billing a subscription) this
+ // may be desirable; in other cases (like sending a meeting reminder) the
+ // action may want to check the current time and see if the event is still
+ // relevant.
+
+ // The trigger daemon works in two phases:
+ //
+ // 1. A scheduling phase processes recently updated triggers and
+ // schedules them for future execution. For example, this phase would
+ // see that a meeting trigger had been changed recently, determine
+ // when the reminder for it should execute, and then schedule the
+ // action to execute at that future date.
+ // 2. An execution phase runs the actions for any scheduled events which
+ // are due to execute.
+ //
+ // The major goal of this design is to deliver on the guarantee that events
+ // will execute exactly once. It prevents race conditions in scheduling
+ // and execution by ensuring there is only one writer for either of these
+ // phases. Without this separation of responsibilities, web processes
+ // trying to reschedule events after an update could race with other web
+ // processes or the daemon.
+
+ do {
+ $lock = PhabricatorGlobalLock::newLock('trigger');
+
+ try {
+ $lock->lock(5);
+ } catch (PhutilLockException $ex) {
+ throw new PhutilProxyException(
+ pht(
+ 'Another process is holding the trigger lock. Usually, this '.
+ 'means another copy of the trigger daemon is running elsewhere. '.
+ 'Multiple processes are not permitted to update triggers '.
+ 'simultaneously.'),
+ $ex);
+ }
+
+ // Run the scheduling phase. This finds updated triggers which we have
+ // not scheduled yet and schedules them.
+ $last_version = $this->loadCurrentCursor();
+ $head_version = $this->loadCurrentVersion();
+
+ // The cursor points at the next record to process, so we can only skip
+ // this step if we're ahead of the version number.
+ if ($last_version <= $head_version) {
+ $this->scheduleTriggers($last_version);
+ }
+
+ // Run the execution phase. This finds events which are due to execute
+ // and runs them.
+ $this->executeTriggers();
+
+ $lock->unlock();
+
+ $this->sleep($this->getSleepDuration());
+ } while (!$this->shouldExit());
+ }
+
+
+ /**
+ * Process all of the triggers which have been updated since the last time
+ * the daemon ran, scheduling them into the event table.
+ *
+ * @param int Cursor for the next version update to process.
+ * @return void
+ */
+ private function scheduleTriggers($cursor) {
+ $limit = 100;
+
+ $query = id(new PhabricatorWorkerTriggerQuery())
+ ->withVersionBetween($cursor, null)
+ ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_VERSION)
+ ->needEvents(true)
+ ->setLimit($limit);
+ while (true) {
+ $triggers = $query->execute();
+
+ foreach ($triggers as $trigger) {
+ $event = $trigger->getEvent();
+ if ($event) {
+ $last_epoch = $event->getLastEventEpoch();
+ } else {
+ $last_epoch = null;
+ }
+
+ $next_epoch = $trigger->getNextEventEpoch(
+ $last_epoch,
+ $is_reschedule = false);
+
+ $new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger)
+ ->setLastEventEpoch($last_epoch)
+ ->setNextEventEpoch($next_epoch);
+
+ $new_event->openTransaction();
+ if ($event) {
+ $event->delete();
+ }
+
+ // Always save the new event. Note that we save it even if the next
+ // epoch is `null`, indicating that it will never fire, because we
+ // would lose the last epoch information if we delete it.
+ //
+ // In particular, some events may want to execute exactly once.
+ // Retaining the last epoch allows them to do this, even if the
+ // trigger is updated.
+ $new_event->save();
+
+ // Move the cursor forward to make sure we don't reprocess this
+ // trigger until it is updated again.
+ $this->updateCursor($trigger->getTriggerVersion() + 1);
+ $new_event->saveTransaction();
+ }
+
+ // If we saw fewer than a full page of updated triggers, we're caught
+ // up, so we can move on to the execution phase.
+ if (count($triggers) < $limit) {
+ break;
+ }
+
+ // Otherwise, skip past the stuff we just processed and grab another
+ // page of updated triggers.
+ $min = last($triggers)->getTriggerVersion() + 1;
+ $query->withVersionBetween($min, null);
+
+ $this->stillWorking();
+ }
+ }
+
+
+ /**
+ * Run scheduled event triggers which are due for execution.
+ *
+ * @return void
+ */
+ private function executeTriggers() {
+
+ // We run only a limited number of triggers before ending the execution
+ // phase. If we ran until exhaustion, we could end up executing very
+ // out-of-date triggers if there was a long backlog: trigger changes
+ // during this phase are not reflected in the event table until we run
+ // another scheduling phase.
+
+ // If we exit this phase with triggers still ready to execute we'll
+ // jump back into the scheduling phase immediately, so this just makes
+ // sure we don't spend an unreasonably long amount of time without
+ // processing trigger updates and doing rescheduling.
+
+ $limit = 100;
+ $now = PhabricatorTime::getNow();
+
+ $triggers = id(new PhabricatorWorkerTriggerQuery())
+ ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)
+ ->withNextEventBetween(null, $now)
+ ->needEvents(true)
+ ->setLimit($limit)
+ ->execute();
+ foreach ($triggers as $trigger) {
+ $event = $trigger->getEvent();
+
+ // Execute the trigger action.
+ $trigger->executeTrigger(
+ $event->getLastEventEpoch(),
+ $event->getNextEventEpoch());
+
+ // Now that we've executed the trigger, the current trigger epoch is
+ // going to become the last epoch.
+ $last_epoch = $event->getNextEventEpoch();
+
+ // If this is a recurring trigger, give it an opportunity to reschedule.
+ $reschedule_epoch = $trigger->getNextEventEpoch(
+ $last_epoch,
+ $is_reschedule = true);
+
+ // Don't reschedule events unless the next occurrence is in the future.
+ if (($reschedule_epoch !== null) &&
+ ($last_epoch !== null) &&
+ ($reschedule_epoch <= $last_epoch)) {
+ throw new Exception(
+ pht(
+ 'Trigger is attempting to perform a routine reschedule where '.
+ 'the next event (at %s) does not occur after the previous event '.
+ '(at %s). Routine reschedules must strictly move event triggers '.
+ 'forward through time to avoid executing a trigger an infinite '.
+ 'number of times instantaneously.',
+ $reschedule_epoch,
+ $last_epoch));
+ }
+
+ $new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger)
+ ->setLastEventEpoch($last_epoch)
+ ->setNextEventEpoch($reschedule_epoch);
+
+ $event->openTransaction();
+ // Remove the event we just processed.
+ $event->delete();
+
+ // See note in the scheduling phase about this; we save the new event
+ // even if the next epoch is `null`.
+ $new_event->save();
+ $event->saveTransaction();
+ }
+ }
+
+
+ /**
+ * Get the number of seconds to sleep for before starting the next scheduling
+ * phase.
+ *
+ * If no events are scheduled soon, we'll sleep for 60 seconds. Otherwise,
+ * we'll sleep until the next scheduled event.
+ *
+ * @return int Number of seconds to sleep for.
+ */
+ private function getSleepDuration() {
+ $sleep = 60;
+
+ $next_triggers = id(new PhabricatorWorkerTriggerQuery())
+ ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)
+ ->setLimit(1)
+ ->needEvents(true)
+ ->execute();
+ if ($next_triggers) {
+ $next_trigger = head($next_triggers);
+ $next_epoch = $next_trigger->getEvent()->getNextEventEpoch();
+ $until = max(0, $next_epoch - PhabricatorTime::getNow());
+ $sleep = min($sleep, $until);
+ }
+
+ return $sleep;
+ }
+
+
+/* -( Counters )----------------------------------------------------------- */
+
+
+ private function loadCurrentCursor() {
+ return $this->loadCurrentCounter(self::COUNTER_CURSOR);
+ }
+
+ private function loadCurrentVersion() {
+ return $this->loadCurrentCounter(self::COUNTER_VERSION);
+ }
+
+ private function updateCursor($value) {
+ LiskDAO::overwriteCounterValue(
+ id(new PhabricatorWorkerTrigger())->establishConnection('w'),
+ self::COUNTER_CURSOR,
+ $value);
+ }
+
+ private function loadCurrentCounter($counter_name) {
+ return (int)LiskDAO::loadCurrentCounterValue(
+ id(new PhabricatorWorkerTrigger())->establishConnection('w'),
+ $counter_name);
+ }
+}
diff --git a/src/infrastructure/daemon/workers/action/PhabricatorLogTriggerAction.php b/src/infrastructure/daemon/workers/action/PhabricatorLogTriggerAction.php
new file mode 100644
--- /dev/null
+++ b/src/infrastructure/daemon/workers/action/PhabricatorLogTriggerAction.php
@@ -0,0 +1,30 @@
+<?php
+
+/**
+ * Trivial action which logs a message.
+ *
+ * This action is primarily useful for testing triggers.
+ */
+final class PhabricatorLogTriggerAction
+ extends PhabricatorTriggerAction {
+
+ public function validateProperties(array $properties) {
+ PhutilTypeSpec::checkMap(
+ $properties,
+ array(
+ 'message' => 'string',
+ ));
+ }
+
+ public function execute($last_epoch, $this_epoch) {
+ $message = pht(
+ '(%s -> %s @ %s) %s',
+ $last_epoch ? date('Y-m-d g:i:s A', $last_epoch) : 'null',
+ date('Y-m-d g:i:s A', $this_epoch),
+ date('Y-m-d g:i:s A', PhabricatorTime::getNow()),
+ $this->getProperty('message'));
+
+ phlog($message);
+ }
+
+}
diff --git a/src/infrastructure/daemon/workers/action/PhabricatorTriggerAction.php b/src/infrastructure/daemon/workers/action/PhabricatorTriggerAction.php
new file mode 100644
--- /dev/null
+++ b/src/infrastructure/daemon/workers/action/PhabricatorTriggerAction.php
@@ -0,0 +1,66 @@
+<?php
+
+/**
+ * A trigger action reacts to a scheduled event.
+ */
+abstract class PhabricatorTriggerAction extends Phobject {
+
+ private $properties;
+
+ public function __construct(array $properties) {
+ $this->validateProperties($properties);
+ $this->properties = $properties;
+ }
+
+ public function getProperties() {
+ return $this->properties;
+ }
+
+ public function getProperty($key, $default = null) {
+ return idx($this->properties, $key, $default);
+ }
+
+
+ /**
+ * Validate action configuration.
+ *
+ * @param map<string, wild> Map of action properties.
+ * @return void
+ */
+ abstract public function validateProperties(array $properties);
+
+
+ /**
+ * Execute this action.
+ *
+ * IMPORTANT: Trigger actions must execute quickly!
+ *
+ * In most cases, trigger actions should queue a worker task and then exit.
+ * The actual trigger execution occurs in a locked section in the trigger
+ * daemon and blocks all other triggers. By queueing a task instead of
+ * performing processing directly, triggers can execute more involved actions
+ * without blocking other triggers.
+ *
+ * An action may trigger a long time after it is scheduled. For example,
+ * a meeting reminder may be scheduled at 9:45 AM, but the action may not
+ * execute until later (for example, because the server was down for
+ * maintenance). You can detect cases like this by comparing `$this_epoch`
+ * (which holds the time the event was scheduled to execute at) to
+ * `PhabricatorTime::getNow()` (which returns the current time). In the
+ * case of a meeting reminder, you may want to ignore the action if it
+ * executes too late to be useful (for example, after a meeting is over).
+ *
+ * Because actions should normally queue a task and there may be a second,
+ * arbitrarily long delay between trigger execution and task execution, it
+ * may be simplest to pass the trigger time to the task and then make the
+ * decision to discard the action there.
+ *
+ * @param int|null Last time the event occurred, or null if it has never
+ * triggered before.
+ * @param int The scheduled time for the current action. This may be
+ * significantly different from the current time.
+ * @return void
+ */
+ abstract public function execute($last_epoch, $this_epoch);
+
+}
diff --git a/src/infrastructure/daemon/workers/clock/PhabricatorMetronomicTriggerClock.php b/src/infrastructure/daemon/workers/clock/PhabricatorMetronomicTriggerClock.php
new file mode 100644
--- /dev/null
+++ b/src/infrastructure/daemon/workers/clock/PhabricatorMetronomicTriggerClock.php
@@ -0,0 +1,33 @@
+<?php
+
+/**
+ * Triggers an event repeatedly, delaying a fixed number of seconds between
+ * triggers.
+ *
+ * For example, this clock can trigger an event every 30 seconds.
+ */
+final class PhabricatorMetronomicTriggerClock
+ extends PhabricatorTriggerClock {
+
+ public function validateProperties(array $properties) {
+ PhutilTypeSpec::checkMap(
+ $properties,
+ array(
+ 'period' => 'int',
+ ));
+ }
+
+ public function getNextEventEpoch($last_epoch, $is_reschedule) {
+ $period = $this->getProperty('period');
+
+ if ($last_epoch) {
+ $next = $last_epoch + $period;
+ $next = max($next, $last_epoch + 1);
+ } else {
+ $next = PhabricatorTime::getNow() + $period;
+ }
+
+ return $next;
+ }
+
+}
diff --git a/src/infrastructure/daemon/workers/clock/PhabricatorNeverTriggerClock.php b/src/infrastructure/daemon/workers/clock/PhabricatorNeverTriggerClock.php
new file mode 100644
--- /dev/null
+++ b/src/infrastructure/daemon/workers/clock/PhabricatorNeverTriggerClock.php
@@ -0,0 +1,21 @@
+<?php
+
+/**
+ * Never triggers an event.
+ *
+ * This clock can be used for testing, or to cancel events.
+ */
+final class PhabricatorNeverTriggerClock
+ extends PhabricatorTriggerClock {
+
+ public function validateProperties(array $properties) {
+ PhutilTypeSpec::checkMap(
+ $properties,
+ array());
+ }
+
+ public function getNextEventEpoch($last_epoch, $is_reschedule) {
+ return null;
+ }
+
+}
diff --git a/src/infrastructure/daemon/workers/clock/__tests__/PhabricatorTriggerClockTestCase.php b/src/infrastructure/daemon/workers/clock/__tests__/PhabricatorTriggerClockTestCase.php
--- a/src/infrastructure/daemon/workers/clock/__tests__/PhabricatorTriggerClockTestCase.php
+++ b/src/infrastructure/daemon/workers/clock/__tests__/PhabricatorTriggerClockTestCase.php
@@ -21,6 +21,15 @@
pht('Should trigger only once.'));
}
+ public function testNeverTriggerClock() {
+ $clock = new PhabricatorNeverTriggerClock(array());
+
+ $this->assertEqual(
+ null,
+ $clock->getNextEventEpoch(null, false),
+ pht('Should never trigger.'));
+ }
+
public function testSubscriptionTriggerClock() {
$start = strtotime('2014-01-31 2:34:56 UTC');
diff --git a/src/infrastructure/daemon/workers/phid/PhabricatorWorkerTriggerPHIDType.php b/src/infrastructure/daemon/workers/phid/PhabricatorWorkerTriggerPHIDType.php
new file mode 100644
--- /dev/null
+++ b/src/infrastructure/daemon/workers/phid/PhabricatorWorkerTriggerPHIDType.php
@@ -0,0 +1,41 @@
+<?php
+
+final class PhabricatorWorkerTriggerPHIDType extends PhabricatorPHIDType {
+
+ const TYPECONST = 'TRIG';
+
+ public function getTypeName() {
+ return pht('Trigger');
+ }
+
+ public function newObject() {
+ return new PhabricatorWorkerTriggerPHIDType();
+ }
+
+ protected function buildQueryForObjects(
+ PhabricatorObjectQuery $query,
+ array $phids) {
+
+ // TODO: Maybe straighten this out eventually, but these aren't policy
+ // objects and don't have an applicable query which we can return here.
+ // Since we should never call this normally, just leave it stubbed for
+ // now.
+
+ throw new PhutilMethodNotImplementedException();
+ }
+
+ public function loadHandles(
+ PhabricatorHandleQuery $query,
+ array $handles,
+ array $objects) {
+
+ foreach ($handles as $phid => $handle) {
+ $trigger = $objects[$phid];
+
+ $id = $trigger->getID();
+
+ $handle->setName(pht('Trigger %d', $id));
+ }
+ }
+
+}
diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerTriggerQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerTriggerQuery.php
new file mode 100644
--- /dev/null
+++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerTriggerQuery.php
@@ -0,0 +1,178 @@
+<?php
+
+final class PhabricatorWorkerTriggerQuery
+ extends PhabricatorOffsetPagedQuery {
+
+ const ORDER_EXECUTION = 'execution';
+ const ORDER_VERSION = 'version';
+
+ private $versionMin;
+ private $versionMax;
+ private $nextEpochMin;
+ private $nextEpochMax;
+
+ private $needEvents;
+ private $order = self::ORDER_EXECUTION;
+
+ public function withVersionBetween($min, $max) {
+ $this->versionMin = $min;
+ $this->versionMax = $max;
+ return $this;
+ }
+
+ public function withNextEventBetween($min, $max) {
+ $this->nextEpochMin = $min;
+ $this->nextEpochMax = $max;
+ return $this;
+ }
+
+ public function needEvents($need_events) {
+ $this->needEvents = $need_events;
+ return $this;
+ }
+
+ public function setOrder($order) {
+ $this->order = $order;
+ return $this;
+ }
+
+ public function execute() {
+ $task_table = new PhabricatorWorkerTrigger();
+
+ $conn_r = $task_table->establishConnection('r');
+
+ $rows = queryfx_all(
+ $conn_r,
+ 'SELECT t.* FROM %T t %Q %Q %Q %Q',
+ $task_table->getTableName(),
+ $this->buildJoinClause($conn_r),
+ $this->buildWhereClause($conn_r),
+ $this->buildOrderClause($conn_r),
+ $this->buildLimitClause($conn_r));
+
+ $triggers = $task_table->loadAllFromArray($rows);
+
+ if ($triggers) {
+ if ($this->needEvents) {
+ $ids = mpull($triggers, 'getID');
+
+ $events = id(new PhabricatorWorkerTriggerEvent())->loadAllWhere(
+ 'triggerID IN (%Ld)',
+ $ids);
+ $events = mpull($events, null, 'getTriggerID');
+
+ foreach ($triggers as $key => $trigger) {
+ $event = idx($events, $trigger->getID());
+ $trigger->attachEvent($event);
+ }
+ }
+
+ foreach ($triggers as $key => $trigger) {
+ $clock_class = $trigger->getClockClass();
+ if (!is_subclass_of($clock_class, 'PhabricatorTriggerClock')) {
+ unset($triggers[$key]);
+ continue;
+ }
+
+ try {
+ $argv = array($trigger->getClockProperties());
+ $clock = newv($clock_class, $argv);
+ } catch (Exception $ex) {
+ unset($triggers[$key]);
+ continue;
+ }
+
+ $trigger->attachClock($clock);
+ }
+
+
+ foreach ($triggers as $key => $trigger) {
+ $action_class = $trigger->getActionClass();
+ if (!is_subclass_of($action_class, 'PhabricatorTriggerAction')) {
+ unset($triggers[$key]);
+ continue;
+ }
+
+ try {
+ $argv = array($trigger->getActionProperties());
+ $action = newv($action_class, $argv);
+ } catch (Exception $ex) {
+ unset($triggers[$key]);
+ continue;
+ }
+
+ $trigger->attachAction($action);
+ }
+ }
+
+ return $triggers;
+ }
+
+ private function buildJoinClause(AphrontDatabaseConnection $conn_r) {
+ $joins = array();
+
+ if (($this->nextEpochMin !== null) ||
+ ($this->nextEpochMax !== null) ||
+ ($this->order == PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)) {
+ $joins[] = qsprintf(
+ $conn_r,
+ 'JOIN %T e ON e.triggerID = t.id',
+ id(new PhabricatorWorkerTriggerEvent())->getTableName());
+ }
+
+ return implode(' ', $joins);
+ }
+
+ private function buildWhereClause(AphrontDatabaseConnection $conn_r) {
+ $where = array();
+
+ if ($this->versionMin !== null) {
+ $where[] = qsprintf(
+ $conn_r,
+ 't.triggerVersion >= %d',
+ $this->versionMin);
+ }
+
+ if ($this->versionMax !== null) {
+ $where[] = qsprintf(
+ $conn_r,
+ 't.triggerVersion <= %d',
+ $this->versionMax);
+ }
+
+ if ($this->nextEpochMin !== null) {
+ $where[] = qsprintf(
+ $conn_r,
+ 'e.nextEventEpoch >= %d',
+ $this->nextEpochMin);
+ }
+
+ if ($this->nextEpochMax !== null) {
+ $where[] = qsprintf(
+ $conn_r,
+ 'e.nextEventEpoch <= %d',
+ $this->nextEpochMax);
+ }
+
+ return $this->formatWhereClause($where);
+ }
+
+ private function buildOrderClause(AphrontDatabaseConnection $conn_r) {
+ switch ($this->order) {
+ case self::ORDER_EXECUTION:
+ return qsprintf(
+ $conn_r,
+ 'ORDER BY e.nextEventEpoch ASC, e.id ASC');
+ case self::ORDER_VERSION:
+ return qsprintf(
+ $conn_r,
+ 'ORDER BY t.triggerVersion ASC');
+ default:
+ throw new Exception(
+ pht(
+ 'Unsupported order "%s".',
+ $this->order));
+ }
+ }
+
+}
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTrigger.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTrigger.php
new file mode 100644
--- /dev/null
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTrigger.php
@@ -0,0 +1,130 @@
+<?php
+
+final class PhabricatorWorkerTrigger
+ extends PhabricatorWorkerDAO {
+
+ protected $triggerVersion;
+ protected $clockClass;
+ protected $clockProperties;
+ protected $actionClass;
+ protected $actionProperties;
+
+ private $action = self::ATTACHABLE;
+ private $clock = self::ATTACHABLE;
+ private $event = self::ATTACHABLE;
+
+ protected function getConfiguration() {
+ return array(
+ self::CONFIG_TIMESTAMPS => false,
+ self::CONFIG_AUX_PHID => true,
+ self::CONFIG_SERIALIZATION => array(
+ 'clockProperties' => self::SERIALIZATION_JSON,
+ 'actionProperties' => self::SERIALIZATION_JSON,
+ ),
+ self::CONFIG_COLUMN_SCHEMA => array(
+ 'triggerVersion' => 'uint32',
+ 'clockClass' => 'text64',
+ 'actionClass' => 'text64',
+ ),
+ self::CONFIG_KEY_SCHEMA => array(
+ 'key_trigger' => array(
+ 'columns' => array('triggerVersion'),
+ 'unique' => true,
+ ),
+ ),
+ ) + parent::getConfiguration();
+ }
+
+ public function save() {
+ $conn_w = $this->establishConnection('w');
+
+ $this->openTransaction();
+ $next_version = LiskDAO::loadNextCounterValue(
+ $conn_w,
+ PhabricatorTriggerDaemon::COUNTER_VERSION);
+ $this->setTriggerVersion($next_version);
+
+ $result = parent::save();
+ $this->saveTransaction();
+
+ return $this;
+ }
+
+ public function generatePHID() {
+ return PhabricatorPHID::generateNewPHID(
+ PhabricatorWorkerTriggerPHIDType::TYPECONST);
+ }
+
+ /**
+ * Return the next time this trigger should execute.
+ *
+ * This method can be called either after the daemon executed the trigger
+ * successfully (giving the trigger an opportunity to reschedule itself
+ * into the future, if it is a recurring event) or after the trigger itself
+ * is changed (usually because of an application edit). The `$is_reschedule`
+ * parameter distinguishes between these cases.
+ *
+ * @param int|null Epoch of the most recent successful event execution.
+ * @param bool `true` if we're trying to reschedule the event after
+ * execution; `false` if this is in response to a trigger update.
+ * @return int|null Return an epoch to schedule the next event execution,
+ * or `null` to stop the event from executing again.
+ */
+ public function getNextEventEpoch($last_epoch, $is_reschedule) {
+ return $this->getClock()->getNextEventEpoch($last_epoch, $is_reschedule);
+ }
+
+
+ /**
+ * Execute the event.
+ *
+ * @param int|null Epoch of previous execution, or null if this is the first
+ * execution.
+ * @param int Scheduled epoch of this execution. This may not be the same
+ * as the current time.
+ * @return void
+ */
+ public function executeTrigger($last_event, $this_event) {
+ return $this->getAction()->execute($last_event, $this_event);
+ }
+
+ public function getEvent() {
+ return $this->assertAttached($this->event);
+ }
+
+ public function attachEvent(PhabricatorWorkerTriggerEvent $event = null) {
+ $this->event = $event;
+ return $this;
+ }
+
+ public function setAction(PhabricatorTriggerAction $action) {
+ $this->actionClass = get_class($action);
+ $this->actionProperties = $action->getProperties();
+ return $this->attachAction($action);
+ }
+
+ public function getAction() {
+ return $this->assertAttached($this->action);
+ }
+
+ public function attachAction(PhabricatorTriggerAction $action) {
+ $this->action = $action;
+ return $this;
+ }
+
+ public function setClock(PhabricatorTriggerClock $clock) {
+ $this->clockClass = get_class($clock);
+ $this->clockProperties = $clock->getProperties();
+ return $this->attachClock($clock);
+ }
+
+ public function getClock() {
+ return $this->assertAttached($this->clock);
+ }
+
+ public function attachClock(PhabricatorTriggerClock $clock) {
+ $this->clock = $clock;
+ return $this;
+ }
+
+}
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTriggerEvent.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTriggerEvent.php
new file mode 100644
--- /dev/null
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTriggerEvent.php
@@ -0,0 +1,35 @@
+<?php
+
+final class PhabricatorWorkerTriggerEvent
+ extends PhabricatorWorkerDAO {
+
+ protected $triggerID;
+ protected $lastEventEpoch;
+ protected $nextEventEpoch;
+
+ protected function getConfiguration() {
+ return array(
+ self::CONFIG_TIMESTAMPS => false,
+ self::CONFIG_COLUMN_SCHEMA => array(
+ 'lastEventEpoch' => 'epoch?',
+ 'nextEventEpoch' => 'epoch?',
+ ),
+ self::CONFIG_KEY_SCHEMA => array(
+ 'key_trigger' => array(
+ 'columns' => array('triggerID'),
+ 'unique' => true,
+ ),
+ 'key_next' => array(
+ 'columns' => array('nextEventEpoch'),
+ ),
+ ),
+ ) + parent::getConfiguration();
+ }
+
+ public static function initializeNewEvent(PhabricatorWorkerTrigger $trigger) {
+ $event = new PhabricatorWorkerTriggerEvent();
+ $event->setTriggerID($trigger->getID());
+ return $event;
+ }
+
+}

File Metadata

Mime Type
text/plain
Expires
Wed, Mar 19, 8:00 AM (3 w, 10 h ago)
Storage Engine
amazon-s3
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
phabricator/secure/yo/mc/4nb3rdvaiugihevg
Default Alt Text
D11419.id27435.diff (39 KB)

Event Timeline