diff --git a/resources/sql/autopatches/20150115.trigger.1.sql b/resources/sql/autopatches/20150115.trigger.1.sql new file mode 100644 --- /dev/null +++ b/resources/sql/autopatches/20150115.trigger.1.sql @@ -0,0 +1,11 @@ +CREATE TABLE {$NAMESPACE}_worker.worker_trigger ( + id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + phid VARBINARY(64) NOT NULL, + triggerVersion INT UNSIGNED NOT NULL, + clockClass VARCHAR(64) NOT NULL COLLATE {$COLLATE_TEXT}, + clockProperties LONGTEXT NOT NULL COLLATE {$COLLATE_TEXT}, + actionClass VARCHAR(64) NOT NULL COLLATE {$COLLATE_TEXT}, + actionProperties LONGTEXT NOT NULL COLLATE {$COLLATE_TEXT}, + UNIQUE KEY `key_phid` (phid), + UNIQUE KEY `key_trigger` (triggerVersion) +) ENGINE=InnoDB, COLLATE {$COLLATE_TEXT}; diff --git a/resources/sql/autopatches/20150115.trigger.2.sql b/resources/sql/autopatches/20150115.trigger.2.sql new file mode 100644 --- /dev/null +++ b/resources/sql/autopatches/20150115.trigger.2.sql @@ -0,0 +1,8 @@ +CREATE TABLE {$NAMESPACE}_worker.worker_triggerevent ( + id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + triggerID INT UNSIGNED NOT NULL, + lastEventEpoch INT UNSIGNED, + nextEventEpoch INT UNSIGNED, + UNIQUE KEY `key_trigger` (triggerID), + KEY `key_next` (nextEventEpoch) +) ENGINE=InnoDB, COLLATE {$COLLATE_TEXT}; diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php --- a/src/__phutil_library_map__.php +++ b/src/__phutil_library_map__.php @@ -1873,6 +1873,7 @@ 'PhabricatorListFilterUIExample' => 'applications/uiexample/examples/PhabricatorListFilterUIExample.php', 'PhabricatorLocalDiskFileStorageEngine' => 'applications/files/engine/PhabricatorLocalDiskFileStorageEngine.php', 'PhabricatorLocalTimeTestCase' => 'view/__tests__/PhabricatorLocalTimeTestCase.php', + 'PhabricatorLogTriggerAction' => 'infrastructure/daemon/workers/action/PhabricatorLogTriggerAction.php', 'PhabricatorLogoutController' => 'applications/auth/controller/PhabricatorLogoutController.php', 
'PhabricatorLunarPhasePolicyRule' => 'applications/policy/rule/PhabricatorLunarPhasePolicyRule.php', 'PhabricatorMacroApplication' => 'applications/macro/application/PhabricatorMacroApplication.php', @@ -1968,6 +1969,7 @@ 'PhabricatorMetaMTASchemaSpec' => 'applications/metamta/storage/PhabricatorMetaMTASchemaSpec.php', 'PhabricatorMetaMTASendGridReceiveController' => 'applications/metamta/controller/PhabricatorMetaMTASendGridReceiveController.php', 'PhabricatorMetaMTAWorker' => 'applications/metamta/PhabricatorMetaMTAWorker.php', + 'PhabricatorMetronomicTriggerClock' => 'infrastructure/daemon/workers/clock/PhabricatorMetronomicTriggerClock.php', 'PhabricatorMultiColumnUIExample' => 'applications/uiexample/examples/PhabricatorMultiColumnUIExample.php', 'PhabricatorMultiFactorSettingsPanel' => 'applications/settings/panel/PhabricatorMultiFactorSettingsPanel.php', 'PhabricatorMustVerifyEmailController' => 'applications/auth/controller/PhabricatorMustVerifyEmailController.php', @@ -1977,6 +1979,7 @@ 'PhabricatorNamedQuery' => 'applications/search/storage/PhabricatorNamedQuery.php', 'PhabricatorNamedQueryQuery' => 'applications/search/query/PhabricatorNamedQueryQuery.php', 'PhabricatorNavigationRemarkupRule' => 'infrastructure/markup/rule/PhabricatorNavigationRemarkupRule.php', + 'PhabricatorNeverTriggerClock' => 'infrastructure/daemon/workers/clock/PhabricatorNeverTriggerClock.php', 'PhabricatorNotificationAdHocFeedStory' => 'applications/notification/feed/PhabricatorNotificationAdHocFeedStory.php', 'PhabricatorNotificationBuilder' => 'applications/notification/builder/PhabricatorNotificationBuilder.php', 'PhabricatorNotificationClearController' => 'applications/notification/controller/PhabricatorNotificationClearController.php', @@ -2522,8 +2525,10 @@ 'PhabricatorTransformedFile' => 'applications/files/storage/PhabricatorTransformedFile.php', 'PhabricatorTranslation' => 'infrastructure/internationalization/translation/PhabricatorTranslation.php', 
'PhabricatorTranslationsConfigOptions' => 'applications/config/option/PhabricatorTranslationsConfigOptions.php', + 'PhabricatorTriggerAction' => 'infrastructure/daemon/workers/action/PhabricatorTriggerAction.php', 'PhabricatorTriggerClock' => 'infrastructure/daemon/workers/clock/PhabricatorTriggerClock.php', 'PhabricatorTriggerClockTestCase' => 'infrastructure/daemon/workers/clock/__tests__/PhabricatorTriggerClockTestCase.php', + 'PhabricatorTriggerDaemon' => 'infrastructure/daemon/workers/PhabricatorTriggerDaemon.php', 'PhabricatorTrivialTestCase' => 'infrastructure/testing/__tests__/PhabricatorTrivialTestCase.php', 'PhabricatorTwitchAuthProvider' => 'applications/auth/provider/PhabricatorTwitchAuthProvider.php', 'PhabricatorTwitterAuthProvider' => 'applications/auth/provider/PhabricatorTwitterAuthProvider.php', @@ -2591,6 +2596,10 @@ 'PhabricatorWorkerTaskData' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTaskData.php', 'PhabricatorWorkerTaskDetailController' => 'applications/daemon/controller/PhabricatorWorkerTaskDetailController.php', 'PhabricatorWorkerTestCase' => 'infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php', + 'PhabricatorWorkerTrigger' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTrigger.php', + 'PhabricatorWorkerTriggerEvent' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTriggerEvent.php', + 'PhabricatorWorkerTriggerPHIDType' => 'infrastructure/daemon/workers/phid/PhabricatorWorkerTriggerPHIDType.php', + 'PhabricatorWorkerTriggerQuery' => 'infrastructure/daemon/workers/query/PhabricatorWorkerTriggerQuery.php', 'PhabricatorWorkerYieldException' => 'infrastructure/daemon/workers/exception/PhabricatorWorkerYieldException.php', 'PhabricatorWorkingCopyDiscoveryTestCase' => 'applications/repository/engine/__tests__/PhabricatorWorkingCopyDiscoveryTestCase.php', 'PhabricatorWorkingCopyPullTestCase' => 'applications/repository/engine/__tests__/PhabricatorWorkingCopyPullTestCase.php', @@ -5083,6 
+5092,7 @@ 'PhabricatorListFilterUIExample' => 'PhabricatorUIExample', 'PhabricatorLocalDiskFileStorageEngine' => 'PhabricatorFileStorageEngine', 'PhabricatorLocalTimeTestCase' => 'PhabricatorTestCase', + 'PhabricatorLogTriggerAction' => 'PhabricatorTriggerAction', 'PhabricatorLogoutController' => 'PhabricatorAuthController', 'PhabricatorLunarPhasePolicyRule' => 'PhabricatorPolicyRule', 'PhabricatorMacroApplication' => 'PhabricatorApplication', @@ -5169,6 +5179,7 @@ 'PhabricatorMetaMTASchemaSpec' => 'PhabricatorConfigSchemaSpec', 'PhabricatorMetaMTASendGridReceiveController' => 'PhabricatorMetaMTAController', 'PhabricatorMetaMTAWorker' => 'PhabricatorWorker', + 'PhabricatorMetronomicTriggerClock' => 'PhabricatorTriggerClock', 'PhabricatorMultiColumnUIExample' => 'PhabricatorUIExample', 'PhabricatorMultiFactorSettingsPanel' => 'PhabricatorSettingsPanel', 'PhabricatorMustVerifyEmailController' => 'PhabricatorAuthController', @@ -5181,6 +5192,7 @@ ), 'PhabricatorNamedQueryQuery' => 'PhabricatorCursorPagedPolicyAwareQuery', 'PhabricatorNavigationRemarkupRule' => 'PhutilRemarkupRule', + 'PhabricatorNeverTriggerClock' => 'PhabricatorTriggerClock', 'PhabricatorNotificationAdHocFeedStory' => 'PhabricatorFeedStory', 'PhabricatorNotificationClearController' => 'PhabricatorNotificationController', 'PhabricatorNotificationConfigOptions' => 'PhabricatorApplicationConfigOptions', @@ -5778,8 +5790,10 @@ 'PhabricatorTransactionsApplication' => 'PhabricatorApplication', 'PhabricatorTransformedFile' => 'PhabricatorFileDAO', 'PhabricatorTranslationsConfigOptions' => 'PhabricatorApplicationConfigOptions', + 'PhabricatorTriggerAction' => 'Phobject', 'PhabricatorTriggerClock' => 'Phobject', 'PhabricatorTriggerClockTestCase' => 'PhabricatorTestCase', + 'PhabricatorTriggerDaemon' => 'PhabricatorDaemon', 'PhabricatorTrivialTestCase' => 'PhabricatorTestCase', 'PhabricatorTwitchAuthProvider' => 'PhabricatorOAuth2AuthProvider', 'PhabricatorTwitterAuthProvider' => 
'PhabricatorOAuth1AuthProvider', @@ -5857,6 +5871,10 @@ 'PhabricatorWorkerTaskData' => 'PhabricatorWorkerDAO', 'PhabricatorWorkerTaskDetailController' => 'PhabricatorDaemonController', 'PhabricatorWorkerTestCase' => 'PhabricatorTestCase', + 'PhabricatorWorkerTrigger' => 'PhabricatorWorkerDAO', + 'PhabricatorWorkerTriggerEvent' => 'PhabricatorWorkerDAO', + 'PhabricatorWorkerTriggerPHIDType' => 'PhabricatorPHIDType', + 'PhabricatorWorkerTriggerQuery' => 'PhabricatorOffsetPagedQuery', 'PhabricatorWorkerYieldException' => 'Exception', 'PhabricatorWorkingCopyDiscoveryTestCase' => 'PhabricatorWorkingCopyTestCase', 'PhabricatorWorkingCopyPullTestCase' => 'PhabricatorWorkingCopyTestCase', diff --git a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php --- a/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php +++ b/src/applications/daemon/controller/PhabricatorDaemonConsoleController.php @@ -189,6 +189,18 @@ ->setTasks($upcoming) ->setNoDataString(pht('Task queue is empty.'))); + $triggers = id(new PhabricatorWorkerTriggerQuery()) + ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION) + ->needEvents(true) + ->setLimit(10) + ->execute(); + + $triggers_table = $this->buildTriggersTable($triggers); + + $triggers_panel = id(new PHUIObjectBoxView()) + ->setHeaderText(pht('Upcoming Triggers')) + ->appendChild($triggers_table); + $crumbs = $this->buildApplicationCrumbs(); $crumbs->addTextCrumb(pht('Console')); @@ -202,6 +214,7 @@ $queued_panel, $leased_panel, $upcoming_panel, + $triggers_panel, )); return $this->buildApplicationPage( @@ -212,4 +225,47 @@ )); } + private function buildTriggersTable(array $triggers) { + $viewer = $this->getViewer(); + + $rows = array(); + foreach ($triggers as $trigger) { + $event = $trigger->getEvent(); + if ($event) { + $last_epoch = $event->getLastEventEpoch(); + $next_epoch = $event->getNextEventEpoch(); + } else { + 
$last_epoch = null; + $next_epoch = null; + } + + $rows[] = array( + $trigger->getID(), + $trigger->getClockClass(), + $trigger->getActionClass(), + $last_epoch ? phabricator_datetime($last_epoch, $viewer) : null, + $next_epoch ? phabricator_datetime($next_epoch, $viewer) : null, + ); + } + + return id(new AphrontTableView($rows)) + ->setNoDataString(pht('There are no upcoming event triggers.')) + ->setHeaders( + array( + 'ID', + 'Clock', + 'Action', + 'Last', + 'Next', + )) + ->setColumnClasses( + array( + '', + '', + 'wide', + 'date', + 'date', + )); + } + } diff --git a/src/infrastructure/daemon/workers/PhabricatorTriggerDaemon.php b/src/infrastructure/daemon/workers/PhabricatorTriggerDaemon.php new file mode 100644 --- /dev/null +++ b/src/infrastructure/daemon/workers/PhabricatorTriggerDaemon.php @@ -0,0 +1,290 @@ +lock(5); + } catch (PhutilLockException $ex) { + throw new PhutilProxyException( + pht( + 'Another process is holding the trigger lock. Usually, this '. + 'means another copy of the trigger daemon is running elsewhere. '. + 'Multiple processes are not permitted to update triggers '. + 'simultaneously.'), + $ex); + } + + // Run the scheduling phase. This finds updated triggers which we have + // not scheduled yet and schedules them. + $last_version = $this->loadCurrentCursor(); + $head_version = $this->loadCurrentVersion(); + + // The cursor points at the next record to process, so we can only skip + // this step if we're ahead of the version number. + if ($last_version <= $head_version) { + $this->scheduleTriggers($last_version); + } + + // Run the execution phase. This finds events which are due to execute + // and runs them. + $this->executeTriggers(); + + $lock->unlock(); + + $this->sleep($this->getSleepDuration()); + } while (!$this->shouldExit()); + } + + + /** + * Process all of the triggers which have been updated since the last time + * the daemon ran, scheduling them into the event table. 
+ * + * @param int Cursor for the next version update to process. + * @return void + */ + private function scheduleTriggers($cursor) { + $limit = 100; + + $query = id(new PhabricatorWorkerTriggerQuery()) + ->withVersionBetween($cursor, null) + ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_VERSION) + ->needEvents(true) + ->setLimit($limit); + while (true) { + $triggers = $query->execute(); + + foreach ($triggers as $trigger) { + $event = $trigger->getEvent(); + if ($event) { + $last_epoch = $event->getLastEventEpoch(); + } else { + $last_epoch = null; + } + + $next_epoch = $trigger->getNextEventEpoch( + $last_epoch, + $is_reschedule = false); + + $new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger) + ->setLastEventEpoch($last_epoch) + ->setNextEventEpoch($next_epoch); + + $new_event->openTransaction(); + if ($event) { + $event->delete(); + } + + // Always save the new event. Note that we save it even if the next + // epoch is `null`, indicating that it will never fire, because we + // would lose the last epoch information if we delete it. + // + // In particular, some events may want to execute exactly once. + // Retaining the last epoch allows them to do this, even if the + // trigger is updated. + $new_event->save(); + + // Move the cursor forward to make sure we don't reprocess this + // trigger until it is updated again. + $this->updateCursor($trigger->getTriggerVersion() + 1); + $new_event->saveTransaction(); + } + + // If we saw fewer than a full page of updated triggers, we're caught + // up, so we can move on to the execution phase. + if (count($triggers) < $limit) { + break; + } + + // Otherwise, skip past the stuff we just processed and grab another + // page of updated triggers. + $min = last($triggers)->getTriggerVersion() + 1; + $query->withVersionBetween($min, null); + + $this->stillWorking(); + } + } + + + /** + * Run scheduled event triggers which are due for execution. 
+ * + * @return void + */ + private function executeTriggers() { + + // We run only a limited number of triggers before ending the execution + // phase. If we ran until exhaustion, we could end up executing very + // out-of-date triggers if there was a long backlog: trigger changes + // during this phase are not reflected in the event table until we run + // another scheduling phase. + + // If we exit this phase with triggers still ready to execute we'll + // jump back into the scheduling phase immediately, so this just makes + // sure we don't spend an unreasonably long amount of time without + // processing trigger updates and doing rescheduling. + + $limit = 100; + $now = PhabricatorTime::getNow(); + + $triggers = id(new PhabricatorWorkerTriggerQuery()) + ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION) + ->withNextEventBetween(null, $now) + ->needEvents(true) + ->setLimit($limit) + ->execute(); + foreach ($triggers as $trigger) { + $event = $trigger->getEvent(); + + // Execute the trigger action. + $trigger->executeTrigger( + $event->getLastEventEpoch(), + $event->getNextEventEpoch()); + + // Now that we've executed the trigger, the current trigger epoch is + // going to become the last epoch. + $last_epoch = $event->getNextEventEpoch(); + + // If this is a recurring trigger, give it an opportunity to reschedule. + $reschedule_epoch = $trigger->getNextEventEpoch( + $last_epoch, + $is_reschedule = true); + + // Don't reschedule events unless the next occurrence is in the future. + if (($reschedule_epoch !== null) && + ($last_epoch !== null) && + ($reschedule_epoch <= $last_epoch)) { + throw new Exception( + pht( + 'Trigger is attempting to perform a routine reschedule where '. + 'the next event (at %s) does not occur after the previous event '. + '(at %s). Routine reschedules must strictly move event triggers '. + 'forward through time to avoid executing a trigger an infinite '. 
+          'number of times instantaneously.',
+          $reschedule_epoch,
+          $last_epoch));
+      }
+
+      $new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger)
+        ->setLastEventEpoch($last_epoch)
+        ->setNextEventEpoch($reschedule_epoch);
+
+      $event->openTransaction();
+        // Remove the event we just processed.
+        $event->delete();
+
+        // See note in the scheduling phase about this; we save the new event
+        // even if the next epoch is `null`.
+        $new_event->save();
+      $event->saveTransaction();
+    }
+  }
+
+
+  /**
+   * Get the number of seconds to sleep for before starting the next scheduling
+   * phase.
+   *
+   * If no events are scheduled soon, we'll sleep for 60 seconds. Otherwise,
+   * we'll sleep until the next scheduled event. If the next scheduled event
+   * is already due, this returns 0.
+   *
+   * Events which will never fire (those saved with a `null` next epoch) are
+   * ignored when looking for the next scheduled event.
+   *
+   * @return int Number of seconds to sleep for.
+   */
+  private function getSleepDuration() {
+    $sleep = 60;
+
+    // Only look at events which have a next epoch. Events are saved with a
+    // `null` next epoch when they will never fire again, and an ascending
+    // sort puts those rows first. Without this lower bound, the first row
+    // could have no epoch at all, which would compute a sleep of 0 seconds
+    // on every cycle and make the daemon spin.
+    $next_triggers = id(new PhabricatorWorkerTriggerQuery())
+      ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)
+      ->withNextEventBetween(0, null)
+      ->setLimit(1)
+      ->needEvents(true)
+      ->execute();
+    if ($next_triggers) {
+      $next_trigger = head($next_triggers);
+      $next_epoch = $next_trigger->getEvent()->getNextEventEpoch();
+
+      // Overdue events clamp to 0 so we start the next cycle immediately.
+      $until = max(0, $next_epoch - PhabricatorTime::getNow());
+      $sleep = min($sleep, $until);
+    }
+
+    return $sleep;
+  }
+
+
+/* -(  Counters  )----------------------------------------------------------- */
+
+
+  private function loadCurrentCursor() {
+    return $this->loadCurrentCounter(self::COUNTER_CURSOR);
+  }
+
+  private function loadCurrentVersion() {
+    return $this->loadCurrentCounter(self::COUNTER_VERSION);
+  }
+
+  private function updateCursor($value) {
+    LiskDAO::overwriteCounterValue(
+      id(new PhabricatorWorkerTrigger())->establishConnection('w'),
+      self::COUNTER_CURSOR,
+      $value);
+  }
+
+  private function loadCurrentCounter($counter_name) {
+    return (int)LiskDAO::loadCurrentCounterValue(
+      id(new PhabricatorWorkerTrigger())->establishConnection('w'),
+      $counter_name);
+  }
+}
diff --git
a/src/infrastructure/daemon/workers/action/PhabricatorLogTriggerAction.php b/src/infrastructure/daemon/workers/action/PhabricatorLogTriggerAction.php new file mode 100644 --- /dev/null +++ b/src/infrastructure/daemon/workers/action/PhabricatorLogTriggerAction.php @@ -0,0 +1,30 @@ + 'string', + )); + } + + public function execute($last_epoch, $this_epoch) { + $message = pht( + '(%s -> %s @ %s) %s', + $last_epoch ? date('Y-m-d g:i:s A', $last_epoch) : 'null', + date('Y-m-d g:i:s A', $this_epoch), + date('Y-m-d g:i:s A', PhabricatorTime::getNow()), + $this->getProperty('message')); + + phlog($message); + } + +} diff --git a/src/infrastructure/daemon/workers/action/PhabricatorTriggerAction.php b/src/infrastructure/daemon/workers/action/PhabricatorTriggerAction.php new file mode 100644 --- /dev/null +++ b/src/infrastructure/daemon/workers/action/PhabricatorTriggerAction.php @@ -0,0 +1,66 @@ +validateProperties($properties); + $this->properties = $properties; + } + + public function getProperties() { + return $this->properties; + } + + public function getProperty($key, $default = null) { + return idx($this->properties, $key, $default); + } + + + /** + * Validate action configuration. + * + * @param map Map of action properties. + * @return void + */ + abstract public function validateProperties(array $properties); + + + /** + * Execute this action. + * + * IMPORTANT: Trigger actions must execute quickly! + * + * In most cases, trigger actions should queue a worker task and then exit. + * The actual trigger execution occurs in a locked section in the trigger + * daemon and blocks all other triggers. By queueing a task instead of + * performing processing directly, triggers can execute more involved actions + * without blocking other triggers. + * + * An action may trigger a long time after it is scheduled. 
For example, + * a meeting reminder may be scheduled at 9:45 AM, but the action may not + * execute until later (for example, because the server was down for + * maintenance). You can detect cases like this by comparing `$this_epoch` + * (which holds the time the event was scheduled to execute at) to + * `PhabricatorTime::getNow()` (which returns the current time). In the + * case of a meeting reminder, you may want to ignore the action if it + * executes too late to be useful (for example, after a meeting is over). + * + * Because actions should normally queue a task and there may be a second, + * arbitrarily long delay between trigger execution and task execution, it + * may be simplest to pass the trigger time to the task and then make the + * decision to discard the action there. + * + * @param int|null Last time the event occurred, or null if it has never + * triggered before. + * @param int The scheduled time for the current action. This may be + * significantly different from the current time. 
+ * @return void + */ + abstract public function execute($last_epoch, $this_epoch); + +} diff --git a/src/infrastructure/daemon/workers/clock/PhabricatorMetronomicTriggerClock.php b/src/infrastructure/daemon/workers/clock/PhabricatorMetronomicTriggerClock.php new file mode 100644 --- /dev/null +++ b/src/infrastructure/daemon/workers/clock/PhabricatorMetronomicTriggerClock.php @@ -0,0 +1,33 @@ + 'int', + )); + } + + public function getNextEventEpoch($last_epoch, $is_reschedule) { + $period = $this->getProperty('period'); + + if ($last_epoch) { + $next = $last_epoch + $period; + $next = max($next, $last_epoch + 1); + } else { + $next = PhabricatorTime::getNow() + $period; + } + + return $next; + } + +} diff --git a/src/infrastructure/daemon/workers/clock/PhabricatorNeverTriggerClock.php b/src/infrastructure/daemon/workers/clock/PhabricatorNeverTriggerClock.php new file mode 100644 --- /dev/null +++ b/src/infrastructure/daemon/workers/clock/PhabricatorNeverTriggerClock.php @@ -0,0 +1,21 @@ +assertEqual( + null, + $clock->getNextEventEpoch(null, false), + pht('Should never trigger.')); + } + public function testSubscriptionTriggerClock() { $start = strtotime('2014-01-31 2:34:56 UTC'); diff --git a/src/infrastructure/daemon/workers/phid/PhabricatorWorkerTriggerPHIDType.php b/src/infrastructure/daemon/workers/phid/PhabricatorWorkerTriggerPHIDType.php new file mode 100644 --- /dev/null +++ b/src/infrastructure/daemon/workers/phid/PhabricatorWorkerTriggerPHIDType.php @@ -0,0 +1,41 @@ + $handle) { + $trigger = $objects[$phid]; + + $id = $trigger->getID(); + + $handle->setName(pht('Trigger %d', $id)); + } + } + +} diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerTriggerQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerTriggerQuery.php new file mode 100644 --- /dev/null +++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerTriggerQuery.php @@ -0,0 +1,178 @@ +versionMin = $min; + $this->versionMax = $max; + return $this; + } + + 
public function withNextEventBetween($min, $max) { + $this->nextEpochMin = $min; + $this->nextEpochMax = $max; + return $this; + } + + public function needEvents($need_events) { + $this->needEvents = $need_events; + return $this; + } + + public function setOrder($order) { + $this->order = $order; + return $this; + } + + public function execute() { + $task_table = new PhabricatorWorkerTrigger(); + + $conn_r = $task_table->establishConnection('r'); + + $rows = queryfx_all( + $conn_r, + 'SELECT t.* FROM %T t %Q %Q %Q %Q', + $task_table->getTableName(), + $this->buildJoinClause($conn_r), + $this->buildWhereClause($conn_r), + $this->buildOrderClause($conn_r), + $this->buildLimitClause($conn_r)); + + $triggers = $task_table->loadAllFromArray($rows); + + if ($triggers) { + if ($this->needEvents) { + $ids = mpull($triggers, 'getID'); + + $events = id(new PhabricatorWorkerTriggerEvent())->loadAllWhere( + 'triggerID IN (%Ld)', + $ids); + $events = mpull($events, null, 'getTriggerID'); + + foreach ($triggers as $key => $trigger) { + $event = idx($events, $trigger->getID()); + $trigger->attachEvent($event); + } + } + + foreach ($triggers as $key => $trigger) { + $clock_class = $trigger->getClockClass(); + if (!is_subclass_of($clock_class, 'PhabricatorTriggerClock')) { + unset($triggers[$key]); + continue; + } + + try { + $argv = array($trigger->getClockProperties()); + $clock = newv($clock_class, $argv); + } catch (Exception $ex) { + unset($triggers[$key]); + continue; + } + + $trigger->attachClock($clock); + } + + + foreach ($triggers as $key => $trigger) { + $action_class = $trigger->getActionClass(); + if (!is_subclass_of($action_class, 'PhabricatorTriggerAction')) { + unset($triggers[$key]); + continue; + } + + try { + $argv = array($trigger->getActionProperties()); + $action = newv($action_class, $argv); + } catch (Exception $ex) { + unset($triggers[$key]); + continue; + } + + $trigger->attachAction($action); + } + } + + return $triggers; + } + + private function 
buildJoinClause(AphrontDatabaseConnection $conn_r) { + $joins = array(); + + if (($this->nextEpochMin !== null) || + ($this->nextEpochMax !== null) || + ($this->order == PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)) { + $joins[] = qsprintf( + $conn_r, + 'JOIN %T e ON e.triggerID = t.id', + id(new PhabricatorWorkerTriggerEvent())->getTableName()); + } + + return implode(' ', $joins); + } + + private function buildWhereClause(AphrontDatabaseConnection $conn_r) { + $where = array(); + + if ($this->versionMin !== null) { + $where[] = qsprintf( + $conn_r, + 't.triggerVersion >= %d', + $this->versionMin); + } + + if ($this->versionMax !== null) { + $where[] = qsprintf( + $conn_r, + 't.triggerVersion <= %d', + $this->versionMax); + } + + if ($this->nextEpochMin !== null) { + $where[] = qsprintf( + $conn_r, + 'e.nextEventEpoch >= %d', + $this->nextEpochMin); + } + + if ($this->nextEpochMax !== null) { + $where[] = qsprintf( + $conn_r, + 'e.nextEventEpoch <= %d', + $this->nextEpochMax); + } + + return $this->formatWhereClause($where); + } + + private function buildOrderClause(AphrontDatabaseConnection $conn_r) { + switch ($this->order) { + case self::ORDER_EXECUTION: + return qsprintf( + $conn_r, + 'ORDER BY e.nextEventEpoch ASC, e.id ASC'); + case self::ORDER_VERSION: + return qsprintf( + $conn_r, + 'ORDER BY t.triggerVersion ASC'); + default: + throw new Exception( + pht( + 'Unsupported order "%s".', + $this->order)); + } + } + +} diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTrigger.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTrigger.php new file mode 100644 --- /dev/null +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTrigger.php @@ -0,0 +1,130 @@ + false, + self::CONFIG_AUX_PHID => true, + self::CONFIG_SERIALIZATION => array( + 'clockProperties' => self::SERIALIZATION_JSON, + 'actionProperties' => self::SERIALIZATION_JSON, + ), + self::CONFIG_COLUMN_SCHEMA => array( + 'triggerVersion' => 'uint32', + 
'clockClass' => 'text64',
+        'actionClass' => 'text64',
+      ),
+      self::CONFIG_KEY_SCHEMA => array(
+        'key_trigger' => array(
+          'columns' => array('triggerVersion'),
+          'unique' => true,
+        ),
+      ),
+    ) + parent::getConfiguration();
+  }
+
+  /**
+   * Save this trigger, assigning it a fresh trigger version.
+   *
+   * The next value of the `COUNTER_VERSION` counter is loaded and written to
+   * `triggerVersion` inside the same transaction as the row itself, so every
+   * save gives the trigger a new version number.
+   *
+   * @return this
+   */
+  public function save() {
+    $conn_w = $this->establishConnection('w');
+
+    $this->openTransaction();
+      $next_version = LiskDAO::loadNextCounterValue(
+        $conn_w,
+        PhabricatorTriggerDaemon::COUNTER_VERSION);
+      $this->setTriggerVersion($next_version);
+
+      parent::save();
+    $this->saveTransaction();
+
+    return $this;
+  }
+
+  public function generatePHID() {
+    return PhabricatorPHID::generateNewPHID(
+      PhabricatorWorkerTriggerPHIDType::TYPECONST);
+  }
+
+  /**
+   * Return the next time this trigger should execute.
+   *
+   * This method can be called either after the daemon executed the trigger
+   * successfully (giving the trigger an opportunity to reschedule itself
+   * into the future, if it is a recurring event) or after the trigger itself
+   * is changed (usually because of an application edit). The `$is_reschedule`
+   * parameter distinguishes between these cases.
+   *
+   * @param int|null Epoch of the most recent successful event execution.
+   * @param bool `true` if we're trying to reschedule the event after
+   *   execution; `false` if this is in response to a trigger update.
+   * @return int|null Return an epoch to schedule the next event execution,
+   *   or `null` to stop the event from executing again.
+   */
+  public function getNextEventEpoch($last_epoch, $is_reschedule) {
+    return $this->getClock()->getNextEventEpoch($last_epoch, $is_reschedule);
+  }
+
+
+  /**
+   * Execute the event.
+   *
+   * @param int|null Epoch of previous execution, or null if this is the first
+   *   execution.
+   * @param int Scheduled epoch of this execution. This may not be the same
+   *   as the current time.
+ * @return void + */ + public function executeTrigger($last_event, $this_event) { + return $this->getAction()->execute($last_event, $this_event); + } + + public function getEvent() { + return $this->assertAttached($this->event); + } + + public function attachEvent(PhabricatorWorkerTriggerEvent $event = null) { + $this->event = $event; + return $this; + } + + public function setAction(PhabricatorTriggerAction $action) { + $this->actionClass = get_class($action); + $this->actionProperties = $action->getProperties(); + return $this->attachAction($action); + } + + public function getAction() { + return $this->assertAttached($this->action); + } + + public function attachAction(PhabricatorTriggerAction $action) { + $this->action = $action; + return $this; + } + + public function setClock(PhabricatorTriggerClock $clock) { + $this->clockClass = get_class($clock); + $this->clockProperties = $clock->getProperties(); + return $this->attachClock($clock); + } + + public function getClock() { + return $this->assertAttached($this->clock); + } + + public function attachClock(PhabricatorTriggerClock $clock) { + $this->clock = $clock; + return $this; + } + +} diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTriggerEvent.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTriggerEvent.php new file mode 100644 --- /dev/null +++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerTriggerEvent.php @@ -0,0 +1,35 @@ + false, + self::CONFIG_COLUMN_SCHEMA => array( + 'lastEventEpoch' => 'epoch?', + 'nextEventEpoch' => 'epoch?', + ), + self::CONFIG_KEY_SCHEMA => array( + 'key_trigger' => array( + 'columns' => array('triggerID'), + 'unique' => true, + ), + 'key_next' => array( + 'columns' => array('nextEventEpoch'), + ), + ), + ) + parent::getConfiguration(); + } + + public static function initializeNewEvent(PhabricatorWorkerTrigger $trigger) { + $event = new PhabricatorWorkerTriggerEvent(); + $event->setTriggerID($trigger->getID()); + return 
$event; + } + +}