Page MenuHomePhabricator

D15688.id.diff
No OneTemporary

D15688.id.diff

diff --git a/resources/sql/autopatches/20160411.repo.1.version.sql b/resources/sql/autopatches/20160411.repo.1.version.sql
new file mode 100644
--- /dev/null
+++ b/resources/sql/autopatches/20160411.repo.1.version.sql
@@ -0,0 +1,8 @@
+-- One row per (repositoryPHID, devicePHID) pair, enforced by the unique key.
+-- Each row stores the repository version recorded for that device and an
+-- isWriting flag.
+CREATE TABLE {$NAMESPACE}_repository.repository_workingcopyversion (
+ id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ repositoryPHID VARBINARY(64) NOT NULL,
+ devicePHID VARBINARY(64) NOT NULL,
+ repositoryVersion INT UNSIGNED NOT NULL,
+ isWriting BOOL NOT NULL,
+ UNIQUE KEY `key_workingcopy` (repositoryPHID, devicePHID)
+) ENGINE=InnoDB, COLLATE {$COLLATE_TEXT};
diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php
--- a/src/__phutil_library_map__.php
+++ b/src/__phutil_library_map__.php
@@ -3210,6 +3210,7 @@
'PhabricatorRepositoryURITestCase' => 'applications/repository/storage/__tests__/PhabricatorRepositoryURITestCase.php',
'PhabricatorRepositoryVCSPassword' => 'applications/repository/storage/PhabricatorRepositoryVCSPassword.php',
'PhabricatorRepositoryVersion' => 'applications/repository/constants/PhabricatorRepositoryVersion.php',
+ 'PhabricatorRepositoryWorkingCopyVersion' => 'applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php',
'PhabricatorRequestExceptionHandler' => 'aphront/handler/PhabricatorRequestExceptionHandler.php',
'PhabricatorResourceSite' => 'aphront/site/PhabricatorResourceSite.php',
'PhabricatorRobotsController' => 'applications/system/controller/PhabricatorRobotsController.php',
@@ -7854,6 +7855,7 @@
'PhabricatorRepositoryURITestCase' => 'PhabricatorTestCase',
'PhabricatorRepositoryVCSPassword' => 'PhabricatorRepositoryDAO',
'PhabricatorRepositoryVersion' => 'Phobject',
+ 'PhabricatorRepositoryWorkingCopyVersion' => 'PhabricatorRepositoryDAO',
'PhabricatorRequestExceptionHandler' => 'AphrontRequestExceptionHandler',
'PhabricatorResourceSite' => 'PhabricatorSite',
'PhabricatorRobotsController' => 'PhabricatorController',
diff --git a/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php b/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php
--- a/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php
+++ b/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php
@@ -44,6 +44,16 @@
$bindings = $service->getBindings();
$bindings = mgroup($bindings, 'getDevicePHID');
+ // This is an unusual read which always comes from the master.
+ if (PhabricatorEnv::isReadOnly()) {
+ $versions = array();
+ } else {
+ $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
+ $repository->getPHID());
+ }
+
+ $versions = mpull($versions, null, 'getDevicePHID');
+
foreach ($bindings as $binding_group) {
$all_disabled = true;
foreach ($binding_group as $binding) {
@@ -73,6 +83,27 @@
$device = $any_binding->getDevice();
+ $version = idx($versions, $device->getPHID());
+ if ($version) {
+ $version_number = $version->getRepositoryVersion();
+ $version_number = phutil_tag(
+ 'a',
+ array(
+ 'href' => "/diffusion/pushlog/view/{$version_number}/",
+ ),
+ $version_number);
+ } else {
+ $version_number = '-';
+ }
+
+ if ($version && $version->getIsWriting()) {
+ $is_writing = id(new PHUIIconView())
+ ->setIcon('fa-pencil green');
+ } else {
+ $is_writing = id(new PHUIIconView())
+ ->setIcon('fa-pencil grey');
+ }
+
$rows[] = array(
$binding_icon,
phutil_tag(
@@ -81,6 +112,8 @@
'href' => $device->getURI(),
),
$device->getName()),
+ $version_number,
+ $is_writing,
);
}
}
@@ -91,11 +124,15 @@
array(
null,
pht('Device'),
+ pht('Version'),
+ pht('Writing'),
))
->setColumnClasses(
array(
null,
- 'wide',
+ null,
+ null,
+ 'right wide',
));
$doc_href = PhabricatorEnv::getDoclink('Cluster: Repositories');
diff --git a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php
--- a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php
+++ b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php
@@ -21,8 +21,12 @@
if ($this->shouldProxy()) {
$command = $this->getProxyCommand();
+ $is_proxy = true;
} else {
$command = csprintf('git-receive-pack %s', $repository->getLocalPath());
+ $is_proxy = false;
+
+ $repository->synchronizeWorkingCopyBeforeWrite();
}
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
@@ -41,6 +45,10 @@
$this->waitForGitClient();
}
+ if (!$is_proxy) {
+ $repository->synchronizeWorkingCopyAfterWrite();
+ }
+
return $err;
}
diff --git a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php
--- a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php
+++ b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php
@@ -20,6 +20,7 @@
$command = $this->getProxyCommand();
} else {
$command = csprintf('git-upload-pack -- %s', $repository->getLocalPath());
+ $repository->synchronizeWorkingCopyBeforeRead();
}
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
diff --git a/src/applications/repository/storage/PhabricatorRepository.php b/src/applications/repository/storage/PhabricatorRepository.php
--- a/src/applications/repository/storage/PhabricatorRepository.php
+++ b/src/applications/repository/storage/PhabricatorRepository.php
@@ -3,6 +3,7 @@
/**
* @task uri Repository URI Management
* @task autoclose Autoclose
+ * @task sync Cluster Synchronization
*/
final class PhabricatorRepository extends PhabricatorRepositoryDAO
implements
@@ -62,6 +63,9 @@
private $mostRecentCommit = self::ATTACHABLE;
private $projectPHIDs = self::ATTACHABLE;
+ private $clusterWriteLock;
+ private $clusterWriteVersion;
+
public static function initializeNewRepository(PhabricatorUser $actor) {
$app = id(new PhabricatorApplicationQuery())
->setViewer($actor)
@@ -2262,6 +2266,161 @@
}
+/* -( Cluster Synchronization )-------------------------------------------- */
+
+
+  /**
+   * Bring this device's recorded working copy version up to date before a
+   * read is served.
+   *
+   * Takes the per-repository, per-device read lock, loads every recorded
+   * working copy version for this repository and, if some device holds a
+   * newer version than this one, records this device as holding that newest
+   * version. The actual fetch from a newer node is not implemented yet.
+   *
+   * The read lock is released before returning, including when an exception
+   * is raised while the versions are being loaded or updated.
+   *
+   * @return int|null Highest recorded repository version (0 if none are
+   *   recorded), or `null` if no live Almanac device was found for this host
+   *   and nothing was synchronized.
+   * @task sync
+   */
+  public function synchronizeWorkingCopyBeforeRead() {
+    $device = AlmanacKeys::getLiveDevice();
+    if (!$device) {
+      return;
+    }
+
+    $repository_phid = $this->getPHID();
+    $device_phid = $device->getPHID();
+
+    $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
+      $repository_phid,
+      $device_phid);
+
+    // TODO: Raise a more useful exception if we fail to grab this lock.
+    $read_lock->lock(phutil_units('2 minutes in seconds'));
+
+    // Everything between lock() and unlock() may throw (these are database
+    // reads and writes). Release the lock on failure instead of leaking it.
+    // This uses catch-and-rethrow rather than "finally" so it still parses
+    // on PHP versions older than 5.5.
+    try {
+      $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
+        $repository_phid);
+      $versions = mpull($versions, null, 'getDevicePHID');
+
+      $this_version = idx($versions, $device_phid);
+      if ($this_version) {
+        $this_version = (int)$this_version->getRepositoryVersion();
+      } else {
+        $this_version = 0;
+      }
+
+      if ($versions) {
+        $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
+      } else {
+        $max_version = 0;
+      }
+
+      if ($max_version > $this_version) {
+        // Devices which hold the newest version. Collected for the fetch
+        // described in the TODO below, and not otherwise used yet.
+        $fetchable = array();
+        foreach ($versions as $version) {
+          if ($version->getRepositoryVersion() == $max_version) {
+            $fetchable[] = $version->getDevicePHID();
+          }
+        }
+
+        // TODO: Actually fetch the newer version from one of the nodes which
+        // has it.
+
+        PhabricatorRepositoryWorkingCopyVersion::updateVersion(
+          $repository_phid,
+          $device_phid,
+          $max_version);
+      }
+    } catch (Exception $ex) {
+      $read_lock->unlock();
+      throw $ex;
+    }
+
+    $read_lock->unlock();
+
+    return $max_version;
+  }
+
+
+  /**
+   * Prepare this device's working copy to accept a write.
+   *
+   * Takes the repository-wide write lock, refuses to continue if any device
+   * still has its "isWriting" flag set, synchronizes the working copy the
+   * same way a read does, and then marks this device as writing.
+   *
+   * On success the write lock stays held. The lock and the pre-write version
+   * are stored on this object so that synchronizeWorkingCopyAfterWrite() can
+   * finish the write and release the lock. If any step fails, the write lock
+   * is released before the exception propagates.
+   *
+   * Does nothing if no live Almanac device was found for this host.
+   *
+   * @return void
+   * @task sync
+   */
+  public function synchronizeWorkingCopyBeforeWrite() {
+    $device = AlmanacKeys::getLiveDevice();
+    if (!$device) {
+      return;
+    }
+
+    $repository_phid = $this->getPHID();
+    $device_phid = $device->getPHID();
+
+    $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
+      $repository_phid);
+
+    // TODO: Raise a more useful exception if we fail to grab this lock.
+    $write_lock->lock(phutil_units('2 minutes in seconds'));
+
+    // From here on we hold the write lock. If we bail out (including on the
+    // "incomplete write" error below) release it, so a failed attempt does
+    // not leave the lock held. Catch-and-rethrow is used instead of "finally"
+    // so this still parses on PHP versions older than 5.5.
+    try {
+      $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
+        $repository_phid);
+      foreach ($versions as $version) {
+        if (!$version->getIsWriting()) {
+          continue;
+        }
+
+        // TODO: This should provide more help so users can resolve the issue.
+        throw new Exception(
+          pht(
+            'An incomplete write was previously performed to this repository; '.
+            'refusing new writes.'));
+      }
+
+      $max_version = $this->synchronizeWorkingCopyBeforeRead();
+
+      PhabricatorRepositoryWorkingCopyVersion::willWrite(
+        $repository_phid,
+        $device_phid);
+    } catch (Exception $ex) {
+      $write_lock->unlock();
+      throw $ex;
+    }
+
+    $this->clusterWriteVersion = $max_version;
+    $this->clusterWriteLock = $write_lock;
+  }
+
+
+  /**
+   * Record the outcome of a write and release the cluster write lock.
+   *
+   * Expects synchronizeWorkingCopyBeforeWrite() to have run first on this
+   * object, so that the write lock and the pre-write version are available.
+   * The new version is the ID of the push event returned by the query below,
+   * presumably the most recent one (verify the query's default ordering). If
+   * the repository has no push events at all, the version is left unchanged.
+   *
+   * @return void
+   * @throws Exception If no write lock is held, or if no live Almanac device
+   *   was found for this host.
+   * @task sync
+   */
+  public function synchronizeWorkingCopyAfterWrite() {
+    if (!$this->clusterWriteLock) {
+      throw new Exception(
+        pht(
+          'Trying to synchronize after write, but not holding a write '.
+          'lock!'));
+    }
+
+    $device = AlmanacKeys::getLiveDevice();
+    if (!$device) {
+      throw new Exception(
+        pht(
+          'Trying to synchronize after write, but this host is not an '.
+          'Almanac device.'));
+    }
+
+    $repository_phid = $this->getPHID();
+    $device_phid = $device->getPHID();
+
+    // NOTE: This means we're still bumping the version when pushes fail. We
+    // could select only un-rejected events instead to bump a little less
+    // often.
+
+    $new_log = id(new PhabricatorRepositoryPushEventQuery())
+      ->setViewer(PhabricatorUser::getOmnipotentUser())
+      ->withRepositoryPHIDs(array($repository_phid))
+      ->setLimit(1)
+      ->executeOne();
+
+    // If no push event exists, $new_log is null. Fall back to the pre-write
+    // version rather than dereferencing it.
+    $old_version = $this->clusterWriteVersion;
+    if ($new_log) {
+      $new_version = $new_log->getID();
+    } else {
+      $new_version = $old_version;
+    }
+
+    PhabricatorRepositoryWorkingCopyVersion::didWrite(
+      $repository_phid,
+      $device_phid,
+      $old_version,
+      $new_version);
+
+    $this->clusterWriteLock->unlock();
+    $this->clusterWriteLock = null;
+  }
+
+
/* -( Symbols )-------------------------------------------------------------*/
public function getSymbolSources() {
diff --git a/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php b/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php
new file mode 100644
--- /dev/null
+++ b/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php
@@ -0,0 +1,145 @@
+<?php
+
+/**
+ * Records, for each repository and device, which repository version the
+ * device's working copy holds and whether a write is in progress there.
+ *
+ * There is at most one row per (repositoryPHID, devicePHID) pair.
+ */
+final class PhabricatorRepositoryWorkingCopyVersion
+  extends PhabricatorRepositoryDAO {
+
+  protected $repositoryPHID;
+  protected $devicePHID;
+  protected $repositoryVersion;
+  protected $isWriting;
+
+  protected function getConfiguration() {
+    return array(
+      self::CONFIG_TIMESTAMPS => false,
+      self::CONFIG_COLUMN_SCHEMA => array(
+        'repositoryVersion' => 'uint32',
+        'isWriting' => 'bool',
+      ),
+      self::CONFIG_KEY_SCHEMA => array(
+        'key_workingcopy' => array(
+          'columns' => array('repositoryPHID', 'devicePHID'),
+          'unique' => true,
+        ),
+      ),
+    ) + parent::getConfiguration();
+  }
+
+
+  /**
+   * Load the version rows of every device for one repository.
+   *
+   * @param string $repository_phid Repository PHID to load rows for.
+   * @return list<PhabricatorRepositoryWorkingCopyVersion> One object per
+   *   device which has a row for this repository.
+   */
+  public static function loadVersions($repository_phid) {
+    $version = new self();
+    $conn_w = $version->establishConnection('w');
+    $table = $version->getTableName();
+
+    // This is a normal read, but force it to come from the master by issuing
+    // it on the write connection.
+    $rows = queryfx_all(
+      $conn_w,
+      'SELECT * FROM %T WHERE repositoryPHID = %s',
+      $table,
+      $repository_phid);
+
+    return $version->loadAllFromArray($rows);
+  }
+
+
+  /**
+   * Build the global lock guarding reads of one repository on one device.
+   *
+   * The lock is returned unlocked. The caller is responsible for locking
+   * and unlocking it.
+   *
+   * @param string $repository_phid Repository PHID.
+   * @param string $device_phid Device PHID.
+   * @return PhabricatorGlobalLock Lock keyed on both PHIDs.
+   */
+  public static function getReadLock($repository_phid, $device_phid) {
+    $repository_hash = PhabricatorHash::digestForIndex($repository_phid);
+    $device_hash = PhabricatorHash::digestForIndex($device_phid);
+    $lock_key = "repo.read({$repository_hash}, {$device_hash})";
+
+    return PhabricatorGlobalLock::newLock($lock_key);
+  }
+
+
+  /**
+   * Build the global lock guarding writes to one repository on any device.
+   *
+   * The lock is returned unlocked. The caller is responsible for locking
+   * and unlocking it.
+   *
+   * @param string $repository_phid Repository PHID.
+   * @return PhabricatorGlobalLock Lock keyed on the repository PHID only.
+   */
+  public static function getWriteLock($repository_phid) {
+    $repository_hash = PhabricatorHash::digestForIndex($repository_phid);
+    $lock_key = "repo.write({$repository_hash})";
+
+    return PhabricatorGlobalLock::newLock($lock_key);
+  }
+
+
+  /**
+   * Before a write, set the "isWriting" flag.
+   *
+   * This allows us to detect when we lose a node partway through a write and
+   * may have committed and acknowledged a write on a node that lost the lock
+   * partway through the write and is no longer reachable.
+   *
+   * In particular, if a node loses its connection to the database the global
+   * lock is released by default. This is a durable lock which stays locked
+   * by default.
+   *
+   * If the device has no row for this repository yet, one is created at
+   * version 0. An existing row keeps its version and only has "isWriting"
+   * set.
+   *
+   * @param string $repository_phid Repository PHID being written to.
+   * @param string $device_phid PHID of the device performing the write.
+   * @return void
+   */
+  public static function willWrite($repository_phid, $device_phid) {
+    $version = new self();
+    $conn_w = $version->establishConnection('w');
+    $table = $version->getTableName();
+
+    // A device with no row is treated as being at version 0 elsewhere, so
+    // seed new rows at 0. didWrite() only updates the row when its version
+    // still equals the pre-write version. Seeding any other value would make
+    // that guard fail on a first write and leave "isWriting" set forever.
+    queryfx(
+      $conn_w,
+      'INSERT INTO %T
+        (repositoryPHID, devicePHID, repositoryVersion, isWriting)
+        VALUES
+        (%s, %s, %d, %d)
+        ON DUPLICATE KEY UPDATE
+          isWriting = VALUES(isWriting)',
+      $table,
+      $repository_phid,
+      $device_phid,
+      0,
+      1);
+  }
+
+
+  /**
+   * After a write, update the version and release the "isWriting" lock.
+   *
+   * The row is only updated if it is still at `$old_version` and still has
+   * "isWriting" set. Otherwise the query changes nothing.
+   *
+   * @param string $repository_phid Repository PHID that was written to.
+   * @param string $device_phid PHID of the device that performed the write.
+   * @param int $old_version Version the device held before the write.
+   * @param int $new_version Version to record now that the write is done.
+   * @return void
+   */
+  public static function didWrite(
+    $repository_phid,
+    $device_phid,
+    $old_version,
+    $new_version) {
+    $version = new self();
+    $conn_w = $version->establishConnection('w');
+    $table = $version->getTableName();
+
+    queryfx(
+      $conn_w,
+      'UPDATE %T SET repositoryVersion = %d, isWriting = 0
+        WHERE
+          repositoryPHID = %s AND
+          devicePHID = %s AND
+          repositoryVersion = %d AND
+          isWriting = 1',
+      $table,
+      $new_version,
+      $repository_phid,
+      $device_phid,
+      $old_version);
+  }
+
+
+  /**
+   * After a fetch, set the local version to the fetched version.
+   *
+   * Creates the row, with "isWriting" cleared, if the device has none for
+   * this repository. An existing row only has its version changed.
+   *
+   * @param string $repository_phid Repository PHID that was fetched.
+   * @param string $device_phid PHID of the device that fetched.
+   * @param int $new_version Version the device now holds.
+   * @return void
+   */
+  public static function updateVersion(
+    $repository_phid,
+    $device_phid,
+    $new_version) {
+    $version = new self();
+    $conn_w = $version->establishConnection('w');
+    $table = $version->getTableName();
+
+    queryfx(
+      $conn_w,
+      'INSERT INTO %T
+        (repositoryPHID, devicePHID, repositoryVersion, isWriting)
+        VALUES
+        (%s, %s, %d, %d)
+        ON DUPLICATE KEY UPDATE
+          repositoryVersion = VALUES(repositoryVersion)',
+      $table,
+      $repository_phid,
+      $device_phid,
+      $new_version,
+      0);
+  }
+
+
+}

File Metadata

Mime Type
text/plain
Expires
Sat, Mar 15, 5:58 AM (3 w, 1 d ago)
Storage Engine
blob
Storage Format
Encrypted (AES-256-CBC)
Storage Handle
7684913
Default Alt Text
D15688.id.diff (15 KB)

Event Timeline