Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F15381116
D15688.id.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
15 KB
Referenced Files
None
Subscribers
None
D15688.id.diff
View Options
diff --git a/resources/sql/autopatches/20160411.repo.1.version.sql b/resources/sql/autopatches/20160411.repo.1.version.sql
new file mode 100644
--- /dev/null
+++ b/resources/sql/autopatches/20160411.repo.1.version.sql
@@ -0,0 +1,8 @@
+CREATE TABLE {$NAMESPACE}_repository.repository_workingcopyversion (
+ id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ repositoryPHID VARBINARY(64) NOT NULL,
+ devicePHID VARBINARY(64) NOT NULL,
+ repositoryVersion INT UNSIGNED NOT NULL,
+ isWriting BOOL NOT NULL,
+ UNIQUE KEY `key_workingcopy` (repositoryPHID, devicePHID)
+) ENGINE=InnoDB, COLLATE {$COLLATE_TEXT};
diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php
--- a/src/__phutil_library_map__.php
+++ b/src/__phutil_library_map__.php
@@ -3210,6 +3210,7 @@
'PhabricatorRepositoryURITestCase' => 'applications/repository/storage/__tests__/PhabricatorRepositoryURITestCase.php',
'PhabricatorRepositoryVCSPassword' => 'applications/repository/storage/PhabricatorRepositoryVCSPassword.php',
'PhabricatorRepositoryVersion' => 'applications/repository/constants/PhabricatorRepositoryVersion.php',
+ 'PhabricatorRepositoryWorkingCopyVersion' => 'applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php',
'PhabricatorRequestExceptionHandler' => 'aphront/handler/PhabricatorRequestExceptionHandler.php',
'PhabricatorResourceSite' => 'aphront/site/PhabricatorResourceSite.php',
'PhabricatorRobotsController' => 'applications/system/controller/PhabricatorRobotsController.php',
@@ -7854,6 +7855,7 @@
'PhabricatorRepositoryURITestCase' => 'PhabricatorTestCase',
'PhabricatorRepositoryVCSPassword' => 'PhabricatorRepositoryDAO',
'PhabricatorRepositoryVersion' => 'Phobject',
+ 'PhabricatorRepositoryWorkingCopyVersion' => 'PhabricatorRepositoryDAO',
'PhabricatorRequestExceptionHandler' => 'AphrontRequestExceptionHandler',
'PhabricatorResourceSite' => 'PhabricatorSite',
'PhabricatorRobotsController' => 'PhabricatorController',
diff --git a/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php b/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php
--- a/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php
+++ b/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php
@@ -44,6 +44,16 @@
$bindings = $service->getBindings();
$bindings = mgroup($bindings, 'getDevicePHID');
+ // This is an unusual read which always comes from the master.
+ if (PhabricatorEnv::isReadOnly()) {
+ $versions = array();
+ } else {
+ $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
+ $repository->getPHID());
+ }
+
+ $versions = mpull($versions, null, 'getDevicePHID');
+
foreach ($bindings as $binding_group) {
$all_disabled = true;
foreach ($binding_group as $binding) {
@@ -73,6 +83,27 @@
$device = $any_binding->getDevice();
+ $version = idx($versions, $device->getPHID());
+ if ($version) {
+ $version_number = $version->getRepositoryVersion();
+ $version_number = phutil_tag(
+ 'a',
+ array(
+ 'href' => "/diffusion/pushlog/view/{$version_number}/",
+ ),
+ $version_number);
+ } else {
+ $version_number = '-';
+ }
+
+ if ($version && $version->getIsWriting()) {
+ $is_writing = id(new PHUIIconView())
+ ->setIcon('fa-pencil green');
+ } else {
+ $is_writing = id(new PHUIIconView())
+ ->setIcon('fa-pencil grey');
+ }
+
$rows[] = array(
$binding_icon,
phutil_tag(
@@ -81,6 +112,8 @@
'href' => $device->getURI(),
),
$device->getName()),
+ $version_number,
+ $is_writing,
);
}
}
@@ -91,11 +124,15 @@
array(
null,
pht('Device'),
+ pht('Version'),
+ pht('Writing'),
))
->setColumnClasses(
array(
null,
- 'wide',
+ null,
+ null,
+ 'right wide',
));
$doc_href = PhabricatorEnv::getDoclink('Cluster: Repositories');
diff --git a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php
--- a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php
+++ b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php
@@ -21,8 +21,12 @@
if ($this->shouldProxy()) {
$command = $this->getProxyCommand();
+ $is_proxy = true;
} else {
$command = csprintf('git-receive-pack %s', $repository->getLocalPath());
+ $is_proxy = false;
+
+ $repository->synchronizeWorkingCopyBeforeWrite();
}
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
@@ -41,6 +45,10 @@
$this->waitForGitClient();
}
+ if (!$is_proxy) {
+ $repository->synchronizeWorkingCopyAfterWrite();
+ }
+
return $err;
}
diff --git a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php
--- a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php
+++ b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php
@@ -20,6 +20,7 @@
$command = $this->getProxyCommand();
} else {
$command = csprintf('git-upload-pack -- %s', $repository->getLocalPath());
+ $repository->synchronizeWorkingCopyBeforeRead();
}
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
diff --git a/src/applications/repository/storage/PhabricatorRepository.php b/src/applications/repository/storage/PhabricatorRepository.php
--- a/src/applications/repository/storage/PhabricatorRepository.php
+++ b/src/applications/repository/storage/PhabricatorRepository.php
@@ -3,6 +3,7 @@
/**
* @task uri Repository URI Management
* @task autoclose Autoclose
+ * @task sync Cluster Synchronization
*/
final class PhabricatorRepository extends PhabricatorRepositoryDAO
implements
@@ -62,6 +63,9 @@
private $mostRecentCommit = self::ATTACHABLE;
private $projectPHIDs = self::ATTACHABLE;
+ private $clusterWriteLock;
+ private $clusterWriteVersion;
+
public static function initializeNewRepository(PhabricatorUser $actor) {
$app = id(new PhabricatorApplicationQuery())
->setViewer($actor)
@@ -2262,6 +2266,161 @@
}
+/* -( Cluster Synchronization )-------------------------------------------- */
+
+
+ /**
+  * Bring this device's recorded working copy version up to date before a
+  * read. Does nothing when this host is not an Almanac device; otherwise
+  * returns the highest version recorded for any device.
+  *
+  * @task sync
+  */
+ public function synchronizeWorkingCopyBeforeRead() {
+   $device = AlmanacKeys::getLiveDevice();
+   if (!$device) {
+     return;
+   }
+
+   $repository_phid = $this->getPHID();
+   $device_phid = $device->getPHID();
+
+   // TODO: Raise a more useful exception if we fail to grab this lock.
+   $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
+     $repository_phid,
+     $device_phid);
+   $read_lock->lock(phutil_units('2 minutes in seconds'));
+
+   $versions = mpull(
+     PhabricatorRepositoryWorkingCopyVersion::loadVersions($repository_phid),
+     null,
+     'getDevicePHID');
+
+   $local = idx($versions, $device_phid);
+   $this_version = $local ? (int)$local->getRepositoryVersion() : 0;
+
+   $max_version = 0;
+   if ($versions) {
+     $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
+   }
+
+   if ($max_version > $this_version) {
+     // Devices which already hold the newest version.
+     $fetchable = array();
+     foreach ($versions as $version) {
+       if ($version->getRepositoryVersion() == $max_version) {
+         $fetchable[] = $version->getDevicePHID();
+       }
+     }
+
+     // TODO: Actually fetch the newer version from one of the nodes which
+     // has it.
+
+     PhabricatorRepositoryWorkingCopyVersion::updateVersion(
+       $repository_phid,
+       $device_phid,
+       $max_version);
+   }
+
+   $read_lock->unlock();
+
+   return $max_version;
+ }
+
+
+ /**
+  * Take the repository write lock and mark this device as writing. Throws if
+  * an earlier write was left incomplete; does nothing off Almanac devices.
+  *
+  * @task sync
+  */
+ public function synchronizeWorkingCopyBeforeWrite() {
+   $device = AlmanacKeys::getLiveDevice();
+   if (!$device) {
+     return;
+   }
+
+   $repository_phid = $this->getPHID();
+   $device_phid = $device->getPHID();
+
+   // TODO: Raise a more useful exception if we fail to grab this lock.
+   $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
+     $repository_phid);
+   $write_lock->lock(phutil_units('2 minutes in seconds'));
+
+   $device_versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
+     $repository_phid);
+   foreach ($device_versions as $device_version) {
+     if ($device_version->getIsWriting()) {
+       // TODO: This should provide more help so users can resolve the issue.
+       throw new Exception(
+         pht(
+           'An incomplete write was previously performed to this repository; '.
+           'refusing new writes.'));
+     }
+   }
+
+   $max_version = $this->synchronizeWorkingCopyBeforeRead();
+
+   PhabricatorRepositoryWorkingCopyVersion::willWrite(
+     $repository_phid,
+     $device_phid);
+
+   $this->clusterWriteLock = $write_lock;
+   $this->clusterWriteVersion = $max_version;
+ }
+
+
+ /**
+  * Record the new repository version after a write and release the write
+  * lock taken by synchronizeWorkingCopyBeforeWrite().
+  *
+  * @task sync
+  */
+ public function synchronizeWorkingCopyAfterWrite() {
+   if (!$this->clusterWriteLock) {
+     throw new Exception(
+       pht(
+         'Trying to synchronize after write, but not holding a write '.
+         'lock!'));
+   }
+
+   $device = AlmanacKeys::getLiveDevice();
+   if (!$device) {
+     throw new Exception(
+       pht(
+         'Trying to synchronize after write, but this host is not an '.
+         'Almanac device.'));
+   }
+
+   $repository_phid = $this->getPHID();
+   $device_phid = $device->getPHID();
+
+   // NOTE: This means we're still bumping the version when pushes fail. We
+   // could select only un-rejected events instead to bump a little less
+   // often.
+
+   $new_log = id(new PhabricatorRepositoryPushEventQuery())
+     ->setViewer(PhabricatorUser::getOmnipotentUser())
+     ->withRepositoryPHIDs(array($repository_phid))
+     ->setLimit(1)
+     ->executeOne();
+
+   // No push event: keep the old version (avoids a call on null).
+   $old_version = $this->clusterWriteVersion;
+   $new_version = $new_log ? $new_log->getID() : $old_version;
+
+   PhabricatorRepositoryWorkingCopyVersion::didWrite(
+     $repository_phid,
+     $device_phid,
+     $old_version,
+     $new_version);
+
+   $this->clusterWriteLock->unlock();
+   $this->clusterWriteLock = null;
+ }
+
+
/* -( Symbols )-------------------------------------------------------------*/
public function getSymbolSources() {
diff --git a/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php b/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php
new file mode 100644
--- /dev/null
+++ b/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php
@@ -0,0 +1,145 @@
+<?php
+
+ final class PhabricatorRepositoryWorkingCopyVersion
+   extends PhabricatorRepositoryDAO {
+
+   protected $repositoryPHID;
+   protected $devicePHID;
+   protected $repositoryVersion;
+   protected $isWriting;
+
+   protected function getConfiguration() {
+     return array(
+       self::CONFIG_TIMESTAMPS => false,
+       self::CONFIG_COLUMN_SCHEMA => array(
+         'repositoryVersion' => 'uint32',
+         'isWriting' => 'bool',
+       ),
+       self::CONFIG_KEY_SCHEMA => array(
+         'key_workingcopy' => array(
+           'columns' => array('repositoryPHID', 'devicePHID'),
+           'unique' => true,
+         ),
+       ),
+     ) + parent::getConfiguration();
+   }
+
+   /**
+    * Load a repository's version rows, forcing the read to hit the master.
+    */
+   public static function loadVersions($repository_phid) {
+     $version = new self();
+     $conn_w = $version->establishConnection('w');
+     $table = $version->getTableName();
+
+     $rows = queryfx_all(
+       $conn_w,
+       'SELECT * FROM %T WHERE repositoryPHID = %s',
+       $table,
+       $repository_phid);
+     return $version->loadAllFromArray($rows);
+   }
+
+   /**
+    * Build the global lock for reads of one repository on one device.
+    */
+   public static function getReadLock($repository_phid, $device_phid) {
+     $repository_hash = PhabricatorHash::digestForIndex($repository_phid);
+     $device_hash = PhabricatorHash::digestForIndex($device_phid);
+     $lock_key = "repo.read({$repository_hash}, {$device_hash})";
+     return PhabricatorGlobalLock::newLock($lock_key);
+   }
+
+   /**
+    * Build the global lock for writes to a repository, keyed by repository.
+    */
+   public static function getWriteLock($repository_phid) {
+     $repository_hash = PhabricatorHash::digestForIndex($repository_phid);
+     $lock_key = "repo.write({$repository_hash})";
+     return PhabricatorGlobalLock::newLock($lock_key);
+   }
+
+   /**
+    * Before a write, set the durable "isWriting" flag.
+    *
+    * If a node loses its connection to the database, the global lock is
+    * released by default, but this flag stays set. That lets us detect a
+    * write which may have been committed and acknowledged by a node that
+    * lost the lock partway through and is no longer reachable.
+    *
+    * A new row starts at version 0, the version callers assume for a device
+    * with no row, so the version check in didWrite() can match it.
+    */
+   public static function willWrite($repository_phid, $device_phid) {
+     $version = new self();
+     $conn_w = $version->establishConnection('w');
+     $table = $version->getTableName();
+
+     queryfx(
+       $conn_w,
+       'INSERT INTO %T
+         (repositoryPHID, devicePHID, repositoryVersion, isWriting)
+         VALUES
+         (%s, %s, %d, %d)
+         ON DUPLICATE KEY UPDATE
+         isWriting = VALUES(isWriting)',
+       $table,
+       $repository_phid,
+       $device_phid,
+       0,
+       1);
+   }
+
+   /**
+    * After a write, update the version and release the "isWriting" lock.
+    */
+   public static function didWrite(
+     $repository_phid,
+     $device_phid,
+     $old_version,
+     $new_version) {
+     $version = new self();
+     $conn_w = $version->establishConnection('w');
+     $table = $version->getTableName();
+
+     queryfx(
+       $conn_w,
+       'UPDATE %T SET repositoryVersion = %d, isWriting = 0
+         WHERE
+         repositoryPHID = %s AND
+         devicePHID = %s AND
+         repositoryVersion = %d AND
+         isWriting = 1',
+       $table,
+       $new_version,
+       $repository_phid,
+       $device_phid,
+       $old_version);
+   }
+
+   /**
+    * After a fetch, set the local version to the fetched version.
+    */
+   public static function updateVersion(
+     $repository_phid,
+     $device_phid,
+     $new_version) {
+     $version = new self();
+     $conn_w = $version->establishConnection('w');
+     $table = $version->getTableName();
+
+     queryfx(
+       $conn_w,
+       'INSERT INTO %T
+         (repositoryPHID, devicePHID, repositoryVersion, isWriting)
+         VALUES
+         (%s, %s, %d, %d)
+         ON DUPLICATE KEY UPDATE
+         repositoryVersion = VALUES(repositoryVersion)',
+       $table,
+       $repository_phid,
+       $device_phid,
+       $new_version,
+       0);
+   }
+ }
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Mar 15, 5:58 AM (3 w, 1 d ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7684913
Default Alt Text
D15688.id.diff (15 KB)
Attached To
Mode
D15688: Move toward multi-master replicated repositories
Attached
Detach File
Event Timeline
Log In to Comment