diff --git a/src/aphront/storage/connection/AphrontDatabaseConnection.php b/src/aphront/storage/connection/AphrontDatabaseConnection.php index e1a8bfd..49dfb66 100644 --- a/src/aphront/storage/connection/AphrontDatabaseConnection.php +++ b/src/aphront/storage/connection/AphrontDatabaseConnection.php @@ -1,268 +1,284 @@ close(); + } + final public function setLastActiveEpoch($epoch) { $this->lastActiveEpoch = $epoch; return $this; } final public function getLastActiveEpoch() { return $this->lastActiveEpoch; } + final public function setPersistent($persistent) { + $this->persistent = $persistent; + return $this; + } + + final public function getPersistent() { + return $this->persistent; + } + public function queryData($pattern/* , $arg, $arg, ... */) { $args = func_get_args(); array_unshift($args, $this); return call_user_func_array('queryfx_all', $args); } public function query($pattern/* , $arg, $arg, ... */) { $args = func_get_args(); array_unshift($args, $this); return call_user_func_array('queryfx', $args); } public function supportsAsyncQueries() { return false; } public function supportsParallelQueries() { return false; } public function setReadOnly($read_only) { $this->readOnly = $read_only; return $this; } public function getReadOnly() { return $this->readOnly; } public function setQueryTimeout($query_timeout) { $this->queryTimeout = $query_timeout; return $this; } public function getQueryTimeout() { return $this->queryTimeout; } public function asyncQuery($raw_query) { throw new Exception(pht('Async queries are not supported.')); } public static function resolveAsyncQueries(array $conns, array $asyncs) { throw new Exception(pht('Async queries are not supported.')); } /* -( Global Locks )------------------------------------------------------- */ public function rememberLock($lock) { if (isset($this->locks[$lock])) { throw new Exception( pht( 'Trying to remember lock "%s", but this lock has already been '. 'remembered.', $lock)); } $this->locks[$lock] = true; return $this; } public function forgetLock($lock) { if (empty($this->locks[$lock])) { throw new Exception( pht( 'Trying to forget lock "%s", but this connection does not remember '. 'that lock.', $lock)); } unset($this->locks[$lock]); return $this; } public function forgetAllLocks() { $this->locks = array(); return $this; } public function isHoldingAnyLock() { return (bool)$this->locks; } /* -( Transaction Management )--------------------------------------------- */ /** * Begin a transaction, or set a savepoint if the connection is already * transactional. * * @return this * @task xaction */ public function openTransaction() { $state = $this->getTransactionState(); $point = $state->getSavepointName(); $depth = $state->getDepth(); $new_transaction = ($depth == 0); if ($new_transaction) { $this->query('START TRANSACTION'); } else { $this->query('SAVEPOINT '.$point); } $state->increaseDepth(); return $this; } /** * Commit a transaction, or stage a savepoint for commit once the entire * transaction completes if inside a transaction stack. * * @return this * @task xaction */ public function saveTransaction() { $state = $this->getTransactionState(); $depth = $state->decreaseDepth(); if ($depth == 0) { $this->query('COMMIT'); } return $this; } /** * Rollback a transaction, or unstage the last savepoint if inside a * transaction stack. * * @return this */ public function killTransaction() { $state = $this->getTransactionState(); $depth = $state->decreaseDepth(); if ($depth == 0) { $this->query('ROLLBACK'); } else { $this->query('ROLLBACK TO SAVEPOINT '.$state->getSavepointName()); } return $this; } /** * Returns true if the connection is transactional. * * @return bool True if the connection is currently transactional. * @task xaction */ public function isInsideTransaction() { $state = $this->getTransactionState(); return ($state->getDepth() > 0); } /** * Get the current @{class:AphrontDatabaseTransactionState} object, or create * one if none exists. * * @return AphrontDatabaseTransactionState Current transaction state. * @task xaction */ protected function getTransactionState() { if (!$this->transactionState) { $this->transactionState = new AphrontDatabaseTransactionState(); } return $this->transactionState; } /** * @task xaction */ public function beginReadLocking() { $this->getTransactionState()->beginReadLocking(); return $this; } /** * @task xaction */ public function endReadLocking() { $this->getTransactionState()->endReadLocking(); return $this; } /** * @task xaction */ public function isReadLocking() { return $this->getTransactionState()->isReadLocking(); } /** * @task xaction */ public function beginWriteLocking() { $this->getTransactionState()->beginWriteLocking(); return $this; } /** * @task xaction */ public function endWriteLocking() { $this->getTransactionState()->endWriteLocking(); return $this; } /** * @task xaction */ public function isWriteLocking() { return $this->getTransactionState()->isWriteLocking(); } } diff --git a/src/aphront/storage/connection/mysql/AphrontMySQLiDatabaseConnection.php b/src/aphront/storage/connection/mysql/AphrontMySQLiDatabaseConnection.php index 240dff4..9aa0262 100644 --- a/src/aphront/storage/connection/mysql/AphrontMySQLiDatabaseConnection.php +++ b/src/aphront/storage/connection/mysql/AphrontMySQLiDatabaseConnection.php @@ -1,199 +1,203 @@ validateUTF8String($string); return $this->escapeBinaryString($string); } public function escapeBinaryString($string) { return $this->requireConnection()->escape_string($string); } public function getInsertID() { return $this->requireConnection()->insert_id; } public function getAffectedRows() { return $this->requireConnection()->affected_rows; } protected function closeConnection() { $this->requireConnection()->close(); } protected function connect() { if (!class_exists('mysqli', false)) { throw new Exception(pht( 'About to call new %s, but the PHP MySQLi extension is not available!', 'mysqli()')); } $user = $this->getConfiguration('user'); $host = $this->getConfiguration('host'); $port = $this->getConfiguration('port'); $database = $this->getConfiguration('database'); $pass = $this->getConfiguration('pass'); if ($pass instanceof PhutilOpaqueEnvelope) { $pass = $pass->openEnvelope(); } // If the host is "localhost", the port is ignored and mysqli attempts to // connect over a socket. if ($port) { if ($host === 'localhost' || $host === null) { $host = '127.0.0.1'; } } $conn = mysqli_init(); $timeout = $this->getConfiguration('timeout'); if ($timeout) { $conn->options(MYSQLI_OPT_CONNECT_TIMEOUT, $timeout); } + if ($this->getPersistent()) { + $host = 'p:'.$host; + } + @$conn->real_connect( $host, $user, $pass, $database, $port); $errno = $conn->connect_errno; if ($errno) { $error = $conn->connect_error; $this->throwConnectionException($errno, $error, $user, $host); } $ok = @$conn->set_charset('utf8mb4'); if (!$ok) { $ok = $conn->set_charset('binary'); } return $conn; } protected function rawQuery($raw_query) { $conn = $this->requireConnection(); $time_limit = $this->getQueryTimeout(); // If we have a query time limit, run this query synchronously but use // the async API. This allows us to kill queries which take too long // without requiring any configuration on the server side. if ($time_limit && $this->supportsAsyncQueries()) { $conn->query($raw_query, MYSQLI_ASYNC); $read = array($conn); $error = array($conn); $reject = array($conn); $result = mysqli::poll($read, $error, $reject, $time_limit); if ($result === false) { $this->closeConnection(); throw new Exception( pht('Failed to poll mysqli connection!')); } else if ($result === 0) { $this->closeConnection(); throw new AphrontQueryTimeoutQueryException( pht( 'Query timed out after %s second(s)!', new PhutilNumber($time_limit))); } return @$conn->reap_async_query(); } return @$conn->query($raw_query); } protected function rawQueries(array $raw_queries) { $conn = $this->requireConnection(); $have_result = false; $results = array(); foreach ($raw_queries as $key => $raw_query) { if (!$have_result) { // End line in front of semicolon to allow single line comments at the // end of queries. $have_result = $conn->multi_query(implode("\n;\n\n", $raw_queries)); } else { $have_result = $conn->next_result(); } array_shift($raw_queries); $result = $conn->store_result(); if (!$result && !$this->getErrorCode($conn)) { $result = true; } $results[$key] = $this->processResult($result); } if ($conn->more_results()) { throw new Exception( pht('There are some results left in the result set.')); } return $results; } protected function freeResult($result) { $result->free_result(); } protected function fetchAssoc($result) { return $result->fetch_assoc(); } protected function getErrorCode($connection) { return $connection->errno; } protected function getErrorDescription($connection) { return $connection->error; } public function supportsAsyncQueries() { return defined('MYSQLI_ASYNC'); } public function asyncQuery($raw_query) { $this->checkWrite($raw_query); $async = $this->beginAsyncConnection(); $async->query($raw_query, MYSQLI_ASYNC); return $async; } public static function resolveAsyncQueries(array $conns, array $asyncs) { assert_instances_of($conns, __CLASS__); assert_instances_of($asyncs, 'mysqli'); $read = $error = $reject = array(); foreach ($asyncs as $async) { $read[] = $error[] = $reject[] = $async; } if (!mysqli::poll($read, $error, $reject, 0)) { return array(); } $results = array(); foreach ($read as $async) { $key = array_search($async, $asyncs, $strict = true); $conn = $conns[$key]; $conn->endAsyncConnection($async); $results[$key] = $conn->processResult($async->reap_async_query()); } return $results; } }