blob: e01e2e74f2a3056d24f8bd960463f724f6d12abe [file] [log] [blame] [edit]
/*
** 2017 April 24
**
** The author disclaims copyright to this source code. In place of
** a legal notice, here is a blessing:
**
** May you do good and not evil.
** May you find forgiveness for yourself and forgive others.
** May you share freely, never taking more than you give.
**
*************************************************************************
*/
#include "sqliteInt.h"
#ifdef SQLITE_SERVER_EDITION
/*
** Page-locking slot format:
**
** Each page-locking slot is a single 32-bit value made up of three fields
** (bit positions shown for the default field widths below):
**
**   bits  0..15  Read-lock bitmask (HMA_SLOT_RL_BITS bits). When a read
**                lock is taken, the client sets the bit associated with
**                its transaction-id. There must therefore be at least
**                HMA_MAX_TRANSACTIONID bits in this field.
**
**   bits 16..20  Write-lock field (HMA_SLOT_WL_BITS bits). Set to 0 if no
**                client currently holds a write lock, or to
**                (transaction-id + 1) if a write lock is held.
**
**   bits 21..28  Number of transient-read ("slow reader") locks currently
**                held on the page (HMA_SLOT_TR_BITS bits).
**
** The preprocessor checks following the definitions below enforce that
** the fields are wide enough and fit in 32 bits.
*/
#define HMA_SLOT_RL_BITS 16 /* bits for Read Locks */
#define HMA_SLOT_WL_BITS 5 /* bits for Write Locks */
#define HMA_SLOT_TR_BITS 8 /* bits for Transient Reader locks */
#define HMA_SLOT_RLWL_BITS (HMA_SLOT_RL_BITS + HMA_SLOT_WL_BITS)
#define HMA_SLOT_RL_MASK ((1 << HMA_SLOT_RL_BITS)-1)
#define HMA_SLOT_WL_MASK (((1 << HMA_SLOT_WL_BITS)-1) << HMA_SLOT_RL_BITS)
#define HMA_SLOT_TR_MASK (((1 << HMA_SLOT_TR_BITS)-1) << HMA_SLOT_RLWL_BITS)
/* Number of page-locking slots */
#define HMA_PAGELOCK_SLOTS (256*1024)
/* Maximum concurrent read/write transactions */
#define HMA_MAX_TRANSACTIONID 16
/* Number of buckets in hash table used for MVCC in single-process mode */
#define HMA_HASH_SIZE 512

/* Compile-time checks of the slot layout described above. */
#if HMA_MAX_TRANSACTIONID>HMA_SLOT_RL_BITS
# error "HMA_SLOT_RL_BITS must be >= HMA_MAX_TRANSACTIONID"
#endif
#if HMA_MAX_TRANSACTIONID>((1<<HMA_SLOT_WL_BITS)-1)
# error "HMA_SLOT_WL_BITS too small to hold (transaction-id + 1)"
#endif
#if (HMA_SLOT_RLWL_BITS+HMA_SLOT_TR_BITS)>32
# error "page-locking slot fields do not fit in 32 bits"
#endif

/*
** The argument to this macro is the value of a locking slot. It returns
** -1 if no client currently holds the write lock, or the transaction-id
** of the locker otherwise. The write-lock field is masked out before the
** conversion to int, so bits above the field never affect the result.
*/
#define slotGetWriter(v) \
  ((int)(((v) & HMA_SLOT_WL_MASK) >> HMA_SLOT_RL_BITS) - 1)
/*
** The argument to this macro is the value of a locking slot. This macro
** returns the current number of slow (transient) reader clients reading
** the page.
*/
#define slotGetSlowReaders(v) (((v) & HMA_SLOT_TR_MASK) >> HMA_SLOT_RLWL_BITS)
/* Return the read-lock bitmask field of locking-slot value v. */
#define slotReaderMask(v) ((v) & HMA_SLOT_RL_MASK)
/* True if the sqlite3_file object pointed to by pFd is currently open. */
#define fdOpen(pFd) ((pFd)->pMethods!=0)
/*
** Atomic CAS primitive used to update page-locking slots. In multi-process
** mode the slots live in shared memory and are updated without a mutex; in
** single-process mode the same primitive is used while ServerDb.mutex is
** held. Equivalent to:
**
** int serverCompareAndSwap(u32 *ptr, u32 oldval, u32 newval){
**   if( *ptr==oldval ){
**     *ptr = newval;
**     return 1;
**   }
**   return 0;
** }
*/
#define serverCompareAndSwap(ptr,oldval,newval) \
  __sync_bool_compare_and_swap(ptr,oldval,newval)
typedef struct ServerDb ServerDb;
typedef struct ServerJournal ServerJournal;
/*
** One journal file slot. ServerDb.aJrnl[] holds HMA_MAX_TRANSACTIONID of
** these, one per transaction id.
*/
struct ServerJournal {
char *zJournal; /* Path "<db>-journal/<id>-journal" (sqlite3_malloc'd) */
sqlite3_file *jfd; /* File handle; storage is inside ServerDb.aJrnlFdSpace */
};
/*
** There is one instance of the following structure for each distinct
** database file opened in server mode by this process.
*/
struct ServerDb {
i64 aFileId[2]; /* Opaque VFS file-id */
ServerDb *pNext; /* Next db in this process */
int nClient; /* Current number of clients */
sqlite3_mutex *mutex; /* Non-recursive mutex */
/* Variables above this point are protected by the global mutex -
** serverEnterMutex()/LeaveMutex(). Those below this point are
** protected by the ServerDb.mutex mutex.
** NOTE(review): in multi-process mode aSlot[] and aClient[] live in
** shared memory and are read/updated by this file without holding
** ServerDb.mutex (slots are updated with serverCompareAndSwap()). */
int bInit; /* True once initialized */
u32 transmask; /* Bitmask of taken transaction ids */
u32 *aSlot; /* Array of page locking slots (heap or SHMOPEN mapping) */
sqlite3_vfs *pVfs; /* VFS of the pager that initialized this object */
ServerJournal aJrnl[HMA_MAX_TRANSACTIONID]; /* One journal per txn id */
u8 *aJrnlFdSpace; /* Backing storage for the aJrnl[].jfd objects */
void *pServerShm; /* SHMOPEN handle (multi-process only) */
u32 *aClient; /* Client "transaction active" flags (follows aSlot[] in shm) */
int iNextCommit; /* Commit id for next pre-commit call */
Server *pCommit; /* List of connections currently committing */
Server *pReader; /* Connections in slower-reader transaction */
ServerPage *pPgFirst; /* First (oldest) in list of pages */
ServerPage *pPgLast; /* Last (newest) in list of pages */
ServerPage *apPg[HMA_HASH_SIZE];/* Hash table of "old" page data */
ServerPage *pFree; /* List of free page buffers */
};
/*
** One instance for each client connection open on a server mode database
** in this process.
*/
struct Server {
ServerDb *pDb; /* Database object */
Pager *pPager; /* Associated pager object */
int eTrans; /* One of the SERVER_TRANS_xxx values */
int iTransId; /* Current transaction id (or -1). Multi-process: claimed at
               ** connect time. Single-process: claimed by
               ** sqlite3ServerBegin() for READWRITE transactions. */
int iCommitId; /* Current commit id (or 0). READONLY: snapshot id taken at
               ** begin. READWRITE: set by sqlite3ServerPreCommit(). */
int nAlloc; /* Allocated size of aLock[] array */
int nLock; /* Number of entries in aLock[] */
u32 *aLock; /* Page numbers locked by the current transaction */
Server *pNext; /* Next in pCommit or pReader list */
};
/*
** Global variables used by this module. g_server.pDb is accessed with the
** global mutex held (serverEnterMutex()/serverLeaveMutex()).
*/
struct ServerGlobal {
ServerDb *pDb; /* Linked list of all ServerDb objects */
};
static struct ServerGlobal g_server;
/*
** Argument passed to the SQLITE_FCNTL_SERVER_SHMOPEN, SHMOPEN2, SHMLOCK and
** SHMCLOSE file-controls. The meaning of i1/i2 depends on the opcode:
**
**   SHMOPEN:  i1 = size of the region in bytes; on return p is the mapping,
**             h the handle and i2 is non-zero if hot journals should be
**             rolled back (as used by serverInitDatabase()).
**   SHMLOCK:  i1 = slot number, i2 = 1 to lock or 0 to unlock.
*/
struct ServerFcntlArg {
void *h; /* Handle from SHMOPEN */
void *p; /* Mapping */
int i1; /* Integer value 1 */
int i2; /* Integer value 2 */
};
typedef struct ServerFcntlArg ServerFcntlArg;
/*
** Possible values for Server.eTrans. READONLY transactions exist only in
** single-process mode.
*/
#define SERVER_TRANS_NONE 0
#define SERVER_TRANS_READONLY 1
#define SERVER_TRANS_READWRITE 2
/*
** Global mutex functions used by code in this file.
*/
static void serverEnterMutex(void){
sqlite3_mutex_enter(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1));
}
static void serverLeaveMutex(void){
sqlite3_mutex_leave(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1));
}
#if 0
static void serverAssertMutexHeld(void){
assert( sqlite3_mutex_held(sqlite3MutexAlloc(SQLITE_MUTEX_STATIC_APP1)) );
}
#endif
/*
** Find the ServerDb object for the database identified by aFileId[2]
** (creating and linking a new one into g_server.pDb if none exists),
** bump its client count and store it in pNew->pDb.
**
** Returns SQLITE_OK, or SQLITE_NOMEM if a new object (or its mutex) could
** not be allocated. In that case pNew->pDb is set to NULL.
*/
static int serverFindDatabase(Server *pNew, i64 *aFileId){
  ServerDb *pDb;
  int rc = SQLITE_OK;

  serverEnterMutex();

  /* Search the list of databases already open in this process. */
  pDb = g_server.pDb;
  while( pDb && (pDb->aFileId[0]!=aFileId[0] || pDb->aFileId[1]!=aFileId[1]) ){
    pDb = pDb->pNext;
  }

  if( pDb ){
    pDb->nClient++;
  }else{
    pDb = (ServerDb*)sqlite3MallocZero(sizeof(ServerDb));
    if( pDb==0 ){
      rc = SQLITE_NOMEM_BKPT;
    }else{
      pDb->mutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST);
#if SQLITE_THREADSAFE!=0
      if( pDb->mutex==0 ) rc = SQLITE_NOMEM_BKPT;
#endif
      if( rc==SQLITE_NOMEM ){
        sqlite3_free(pDb);
        pDb = 0;
      }else{
        /* Initialize and link the new object at the head of the list. */
        pDb->nClient = 1;
        pDb->iNextCommit = 1;
        pDb->aFileId[0] = aFileId[0];
        pDb->aFileId[1] = aFileId[1];
        pDb->pNext = g_server.pDb;
        g_server.pDb = pDb;
      }
    }
  }

  pNew->pDb = pDb;
  serverLeaveMutex();
  return rc;
}
/*
** Roll back journal iClient if it is hot. This is a hot-journal rollback:
** the connection passed as the first argument does not currently have an
** open transaction that uses the journal (although it may have an open
** transaction that uses some other journal).
**
** If the journal is not already open, it is opened only if it exists in
** the file-system. The journal is left open afterwards. Returns SQLITE_OK
** or an error code from the VFS or pager.
*/
static int serverClientRollback(Server *p, int iClient){
  ServerDb *pDb = p->pDb;
  ServerJournal *pJrnl = &pDb->aJrnl[iClient];
  int bHot = 1;
  int rc = SQLITE_OK;

  if( !fdOpen(pJrnl->jfd) ){
    /* Not open yet: open it only if the file exists on disk. */
    bHot = 0;
    rc = sqlite3OsAccess(pDb->pVfs, pJrnl->zJournal, SQLITE_ACCESS_EXISTS, &bHot);
    if( rc==SQLITE_OK && bHot ){
      int flags = SQLITE_OPEN_READWRITE|SQLITE_OPEN_MAIN_JOURNAL;
      rc = sqlite3OsOpen(pDb->pVfs, pJrnl->zJournal, pJrnl->jfd, flags, &flags);
    }
  }

  if( rc==SQLITE_OK && bHot ){
    rc = sqlite3PagerRollbackJournal(p->pPager, pJrnl->jfd);
  }
  return rc;
}
/*
** Free all resources allocated by serverInitDatabase() associated with the
** ServerDb object used by connection p.
**
** If bDelete is true, each journal file is first rolled back (if hot) and
** then deleted. If any rollback fails, bDelete is cleared so that no
** further journal files are deleted.
**
** This may be called on a partially initialized object (when
** serverInitDatabase() fails), and more than once for the same object
** (for example by serverInitDatabase() on error and later by
** sqlite3ServerDisconnect()). Journal entries whose file handle was never
** assigned are skipped. pServerShm/aClient are cleared once the shared
** memory has been closed, so it is never closed twice.
**
** bDelete may be zero with no shared memory open only if the locking-slot
** array was never allocated (e.g. multi-process SHMOPEN failed).
*/
static void serverShutdownDatabase(
  Server *p,
  sqlite3_file *dbfd,
  int bDelete
){
  ServerDb *pDb = p->pDb;
  int i;
  assert( pDb->pServerShm || bDelete || pDb->aSlot==0 );
  for(i=0; i<HMA_MAX_TRANSACTIONID; i++){
    ServerJournal *pJ = &pDb->aJrnl[i];
    if( pJ->jfd==0 ){
      /* Never initialized. jfd is assigned before zJournal, so zJournal is
      ** NULL too and there is nothing to release. */
      continue;
    }
    if( bDelete && pJ->zJournal && (pDb->pServerShm || fdOpen(pJ->jfd)) ){
      int rc = serverClientRollback(p, i);
      if( rc!=SQLITE_OK ) bDelete = 0;
    }
    if( fdOpen(pJ->jfd) ){
      sqlite3OsClose(pJ->jfd);
      if( bDelete ) sqlite3OsDelete(pDb->pVfs, pJ->zJournal, 0);
    }
    sqlite3_free(pJ->zJournal);
  }
  memset(pDb->aJrnl, 0, sizeof(ServerJournal)*HMA_MAX_TRANSACTIONID);
  sqlite3_free(pDb->aJrnlFdSpace);
  pDb->aJrnlFdSpace = 0;
  if( pDb->pServerShm ){
    /* aSlot[] and aClient[] point into the shared mapping: close it and
    ** forget the handle so that a later call does not close it again. */
    ServerFcntlArg arg;
    memset(&arg, 0, sizeof(ServerFcntlArg));
    arg.h = pDb->pServerShm;
    sqlite3OsFileControl(dbfd, SQLITE_FCNTL_SERVER_SHMCLOSE, (void*)&arg);
    pDb->pServerShm = 0;
    pDb->aClient = 0;
  }else{
    sqlite3_free(pDb->aSlot);
  }
  pDb->aSlot = 0;
  pDb->bInit = 0;
}
/*
** Clear all page locks held by client iClient. The handle passed as the
** first argument may or may not correspond to client iClient.
**
** For every locking slot, the client's read-lock bit is cleared and, if
** the client holds the slot's write lock, the write-lock field is reset to
** zero. Each slot is updated atomically with serverCompareAndSwap().
**
** This function is called in multi-process mode as part of restoring the
** system state after it has been detected that client iClient may have
** failed mid transaction. It is never called for a single process system.
*/
static void serverClientUnlock(Server *p, int iClient){
  ServerDb *pDb = p->pDb;
  u32 mRead;
  u32 mWrite;
  int i;
  assert( pDb->pServerShm );
  assert( iClient>=0 && iClient<HMA_MAX_TRANSACTIONID );
  mRead = (u32)1 << iClient;
  /* The write-lock field starts at bit HMA_SLOT_RL_BITS and stores
  ** (transaction-id + 1). */
  mWrite = (u32)(iClient + 1) << HMA_SLOT_RL_BITS;
  for(i=0; i<HMA_PAGELOCK_SLOTS; i++){
    u32 *pSlot = &pDb->aSlot[i];
    while( 1 ){
      u32 o = *pSlot;
      u32 n = o & ~mRead;
      if( slotGetWriter(n)==iClient ){
        n -= mWrite;
      }
      if( o==n || serverCompareAndSwap(pSlot, o, n) ) break;
    }
  }
}
/*
** This function is called when the very first connection to a database
** is established (sqlite3ServerConnect() holds ServerDb.mutex across the
** call). It:
**
**   1. allocates storage for HMA_MAX_TRANSACTIONID journal file handles;
**   2. allocates the page-locking slot array: in multi-process mode
**      (eServer==2) via SQLITE_FCNTL_SERVER_SHMOPEN, otherwise
**      (eServer==1) on the heap;
**   3. builds each journal file name and, if required, rolls back any hot
**      journal files found in the file-system. Single-process mode always
**      attempts rollback; multi-process mode only when SHMOPEN reports it
**      via arg.i2;
**   4. in multi-process mode, after rollback, issues
**      SQLITE_FCNTL_SERVER_SHMOPEN2.
**
** On success ServerDb.bInit is set and SQLITE_OK returned. On failure all
** resources are released via serverShutdownDatabase() and an error code
** is returned.
*/
static int serverInitDatabase(Server *pNew, int eServer){
  int nByte;
  int rc = SQLITE_OK;
  ServerDb *pDb = pNew->pDb;
  sqlite3_vfs *pVfs;
  sqlite3_file *dbfd = sqlite3PagerFile(pNew->pPager);
  const char *zFilename = sqlite3PagerFilename(pNew->pPager, 0);
  int bRollback = 0;
  assert( zFilename );
  assert( eServer==1 || eServer==2 );
  pVfs = pDb->pVfs = sqlite3PagerVfs(pNew->pPager);
  nByte = ROUND8(pVfs->szOsFile) * HMA_MAX_TRANSACTIONID;
  pDb->aJrnlFdSpace = (u8*)sqlite3MallocZero(nByte);
  if( pDb->aJrnlFdSpace==0 ){
    rc = SQLITE_NOMEM_BKPT;
  }else{
    if( eServer==2 ){
      /* The shared region holds the locking slots followed by one
      ** "transaction active" flag per client. */
      ServerFcntlArg arg;
      arg.h = 0;
      arg.p = 0;
      arg.i1 = sizeof(u32)*(HMA_PAGELOCK_SLOTS + HMA_MAX_TRANSACTIONID);
      arg.i2 = 0;
      rc = sqlite3OsFileControl(dbfd, SQLITE_FCNTL_SERVER_SHMOPEN, (void*)&arg);
      if( rc==SQLITE_OK ){
        pDb->aSlot = (u32*)arg.p;
        pDb->aClient = &pDb->aSlot[HMA_PAGELOCK_SLOTS];
        pDb->pServerShm = arg.h;
        bRollback = arg.i2;
      }
    }else{
      pDb->aSlot = (u32*)sqlite3MallocZero(sizeof(u32)*HMA_PAGELOCK_SLOTS);
      if( pDb->aSlot==0 ) rc = SQLITE_NOMEM_BKPT;
      bRollback = 1;
    }
  }
  if( rc==SQLITE_OK ){
    u8 *a = pDb->aJrnlFdSpace;
    int i;
    for(i=0; rc==SQLITE_OK && i<HMA_MAX_TRANSACTIONID; i++){
      ServerJournal *pJ = &pDb->aJrnl[i];
      pJ->jfd = (sqlite3_file*)&a[ROUND8(pVfs->szOsFile)*i];
      pJ->zJournal = sqlite3_mprintf("%s-journal/%d-journal", zFilename, i);
      if( pJ->zJournal==0 ){
        rc = SQLITE_NOMEM_BKPT;
        break;
      }
      if( bRollback ){
        rc = serverClientRollback(pNew, i);
      }
    }
  }
  if( rc==SQLITE_OK && pDb->pServerShm && bRollback ){
    /* Initialize every field so that the VFS never sees an indeterminate
    ** value. */
    ServerFcntlArg arg;
    arg.h = pDb->pServerShm;
    arg.p = 0;
    arg.i1 = 0;
    arg.i2 = 0;
    rc = sqlite3OsFileControl(dbfd, SQLITE_FCNTL_SERVER_SHMOPEN2, (void*)&arg);
  }
  if( rc==SQLITE_OK ){
    pDb->bInit = 1;
  }else{
    serverShutdownDatabase(pNew, dbfd, eServer==1);
  }
  return rc;
}
/*
** Issue SQLITE_FCNTL_SERVER_SHMLOCK against the database file to take
** (bLock==1) or release (bLock==0) the server shmlock on slot iSlot.
** Return SQLITE_OK if successful, or SQLITE_BUSY if the lock cannot be
** obtained.
*/
static int serverFcntlLock(Server *p, int iSlot, int bLock){
  ServerFcntlArg arg;
  memset(&arg, 0, sizeof(arg));
  arg.h = p->pDb->pServerShm;
  arg.i1 = iSlot;
  arg.i2 = bLock;
  return sqlite3OsFileControl(
      sqlite3PagerFile(p->pPager), SQLITE_FCNTL_SERVER_SHMLOCK, (void*)&arg
  );
}
/*
** Close connection p and free it.
**
** In multi-process mode the connection's client locking-slot is released
** and its bit cleared from ServerDb.transmask. If p was the last client of
** its ServerDb, the ServerDb is shut down, unlinked from g_server.pDb and
** freed along with its free-page list. Journal files are deleted on
** shutdown if an EXCLUSIVE lock is already held (single process mode) or
** can be obtained (multi process mode) on the database file.
**
** TODO: Need to account for disk-full errors and the like here. It is not
** necessarily safe to delete journal files here.
*/
void sqlite3ServerDisconnect(Server *p, sqlite3_file *dbfd){
  ServerDb *pDb = p->pDb;

  if( pDb->pServerShm && p->iTransId>=0 ){
    serverFcntlLock(p, p->iTransId, 0);
    sqlite3_mutex_enter(pDb->mutex);
    pDb->transmask &= ~((u32)1 << p->iTransId);
    sqlite3_mutex_leave(pDb->mutex);
  }

  serverEnterMutex();
  if( --pDb->nClient==0 ){
    sqlite3_file *pDbFd = sqlite3PagerFile(p->pPager);
    ServerDb **ppLink = &g_server.pDb;
    ServerPage *pPg;
    int bDelete;

    bDelete = (pDb->pServerShm==0
            || sqlite3OsLock(pDbFd, EXCLUSIVE_LOCK)==SQLITE_OK);
    serverShutdownDatabase(p, pDbFd, bDelete);

    /* Unlink pDb from the global list. */
    while( *ppLink!=pDb ) ppLink = &(*ppLink)->pNext;
    *ppLink = pDb->pNext;

    sqlite3_mutex_free(pDb->mutex);
    while( (pPg = pDb->pFree)!=0 ){
      pDb->pFree = pPg->pNext;
      sqlite3_free(pPg);
    }
    sqlite3_free(pDb);
  }
  serverLeaveMutex();

  sqlite3_free(p->aLock);
  sqlite3_free(p);
}
/*
** Connect pager pPager to the server system. eServer is 1 for a single
** process system or 2 for a multi-process system.
**
** On success, *ppOut is set to a new Server handle and SQLITE_OK returned.
** Otherwise *ppOut is set to NULL and an error code is returned; in
** multi-process mode SQLITE_BUSY means that no client locking-slot was
** free.
**
** The first connection to a database initializes the shared ServerDb.
** In multi-process mode, a client locking-slot is then claimed. If the
** previous owner of that slot appears to have crashed mid-transaction
** (its aClient[] flag is still set), its page locks are cleared and its
** journal rolled back. TODO: serverClientUnlock() is expensive.
*/
int sqlite3ServerConnect(
  Pager *pPager,     /* Pager object */
  int eServer,       /* 1 -> single process, 2 -> multi process */
  Server **ppOut     /* OUT: Server handle */
){
  sqlite3_file *dbfd = sqlite3PagerFile(pPager);
  Server *pNew = 0;
  ServerDb *pDb;
  i64 aFileId[2];
  int rc;

  rc = sqlite3OsFileControl(dbfd, SQLITE_FCNTL_FILEID, (void*)aFileId);
  if( rc!=SQLITE_OK ) goto connect_out;

  pNew = (Server*)sqlite3MallocZero(sizeof(Server));
  if( pNew==0 ){
    rc = SQLITE_NOMEM_BKPT;
    goto connect_out;
  }
  pNew->pPager = pPager;
  pNew->iTransId = -1;

  rc = serverFindDatabase(pNew, aFileId);
  if( rc!=SQLITE_OK ){
    sqlite3_free(pNew);
    pNew = 0;
    goto connect_out;
  }

  pDb = pNew->pDb;
  sqlite3_mutex_enter(pDb->mutex);
  if( pDb->bInit==0 ){
    rc = serverInitDatabase(pNew, eServer);
  }
  if( rc==SQLITE_OK && pDb->pServerShm ){
    /* Claim the first free client locking-slot. */
    int iSlot;
    rc = SQLITE_BUSY;
    for(iSlot=0; rc==SQLITE_BUSY && iSlot<HMA_MAX_TRANSACTIONID; iSlot++){
      u32 mask = (u32)1 << iSlot;
      if( (pDb->transmask & mask)==0 ){
        rc = serverFcntlLock(pNew, iSlot, 1);
        if( rc==SQLITE_OK ){
          pNew->iTransId = iSlot;
          pDb->transmask |= mask;
        }
      }
    }
  }
  sqlite3_mutex_leave(pDb->mutex);

  if( rc==SQLITE_OK && pDb->pServerShm && pDb->aClient[pNew->iTransId] ){
    serverClientUnlock(pNew, pNew->iTransId);
    rc = serverClientRollback(pNew, pNew->iTransId);
  }

connect_out:
  if( rc!=SQLITE_OK && pNew ){
    sqlite3ServerDisconnect(pNew, dbfd);
    pNew = 0;
  }
  *ppOut = pNew;
  return rc;
}
/*
** Begin a transaction on connection p. This is a no-op returning SQLITE_OK
** if a transaction is already open.
**
** Multi-process mode: always opens a READWRITE transaction (bReadonly is
** ignored) using the transaction id claimed at connect time, and sets the
** client's aClient[] "transaction active" flag.
**
** Single-process mode: if bReadonly is true, opens a READONLY transaction
** whose snapshot commit id is the smallest of ServerDb.iNextCommit and the
** commit ids of all current committers. Otherwise claims a free
** transaction id and opens a READWRITE transaction. If all
** HMA_MAX_TRANSACTIONID ids are in use, SQLITE_BUSY is returned and the
** connection is left with no open transaction (eTrans unchanged,
** iTransId still -1).
**
** For READWRITE transactions the pager is pointed at the journal file that
** belongs to the transaction id.
*/
int sqlite3ServerBegin(Server *p, int bReadonly){
  int rc = SQLITE_OK;
  if( p->eTrans==SERVER_TRANS_NONE ){
    ServerDb *pDb = p->pDb;
    assert( p->pNext==0 );
    if( pDb->pServerShm ){
      p->eTrans = SERVER_TRANS_READWRITE;
      pDb->aClient[p->iTransId] = 1;
    }else{
      assert( p->iTransId<0 );
      sqlite3_mutex_enter(pDb->mutex);
      if( bReadonly ){
        Server *pIter;
        p->iCommitId = pDb->iNextCommit;
        for(pIter=pDb->pCommit; pIter; pIter=pIter->pNext){
          if( pIter->iCommitId<p->iCommitId ){
            p->iCommitId = pIter->iCommitId;
          }
        }
        p->pNext = pDb->pReader;
        pDb->pReader = p;
        p->eTrans = SERVER_TRANS_READONLY;
      }else{
        int id;
        /* Find a free transaction id. Only mark the transaction as open
        ** once an id has been claimed. Otherwise a later
        ** sqlite3ServerEnd() would operate on iTransId==-1. */
        rc = SQLITE_BUSY;
        for(id=0; id<HMA_MAX_TRANSACTIONID; id++){
          u32 bit = (u32)1 << id;
          if( (pDb->transmask & bit)==0 ){
            pDb->transmask |= bit;
            p->iTransId = id;
            p->eTrans = SERVER_TRANS_READWRITE;
            rc = SQLITE_OK;
            break;
          }
        }
      }
      sqlite3_mutex_leave(pDb->mutex);
    }
    if( rc==SQLITE_OK && p->eTrans==SERVER_TRANS_READWRITE ){
      ServerJournal *pJrnl = &pDb->aJrnl[p->iTransId];
      sqlite3PagerServerJournal(p->pPager, pJrnl->jfd, pJrnl->zJournal);
    }
  }
  return rc;
}
/*
** Return a pointer to the page-locking slot for page pgno. Pages whose
** numbers are equal modulo HMA_PAGELOCK_SLOTS share one slot.
*/
static u32 *serverLockingSlot(ServerDb *pDb, u32 pgno){
  u32 iSlot = pgno % HMA_PAGELOCK_SLOTS;
  return pDb->aSlot + iSlot;
}
/*
** Release every page lock held by connection p and set p->nLock to zero.
** For each page in p->aLock[], p's read-lock bit is cleared and, if p holds
** the write lock, the slot's write-lock field is reset to zero.
**
** In single-process mode the caller must hold ServerDb.mutex. In
** multi-process mode slots are updated atomically with
** serverCompareAndSwap().
*/
static void serverReleaseLocks(Server *p){
  ServerDb *pDb = p->pDb;
  int i;
  assert( pDb->pServerShm || sqlite3_mutex_held(pDb->mutex) );
  for(i=0; i<p->nLock; i++){
    u32 *pSlot = serverLockingSlot(pDb, p->aLock[i]);
    u32 mRead = (u32)1 << p->iTransId;
    /* The write-lock field starts at bit HMA_SLOT_RL_BITS and stores
    ** (transaction-id + 1). */
    u32 mWrite = (u32)(p->iTransId + 1) << HMA_SLOT_RL_BITS;
    while( 1 ){
      u32 o = *pSlot;
      u32 n = o & ~mRead;
      if( slotGetWriter(n)==p->iTransId ){
        n -= mWrite;
      }
      if( serverCompareAndSwap(pSlot, o, n) ) break;
    }
  }
  p->nLock = 0;
}
/*
** Single-process mode only; the caller must hold ServerDb.mutex.
**
** Move every ServerPage record that no current reader or committer can
** still need onto the ServerDb.pFree list. A page is no longer needed if
** its iCommitId is less than the smallest iCommitId of any connection on
** the pReader or pCommit lists. Because pages are kept oldest-first on the
** pPgFirst list, such pages form a prefix of that list. They are unlinked
** from the hash table, and that prefix is spliced onto pFree.
**
** The hash-chain pointers of each recycled page are cleared so that a
** buffer handed out again by sqlite3ServerBuffer() does not carry stale
** links into the hash table.
*/
static void serverRecycleBuffers(ServerDb *pDb){
  assert( pDb->pServerShm==0 );
  assert( sqlite3_mutex_held(pDb->mutex) );
  if( pDb->pPgFirst ){
    ServerPage *pPg;
    Server *pIter;
    ServerPage *pLast = 0;
    int iOldest = 0x7FFFFFFF;
    for(pIter=pDb->pReader; pIter; pIter=pIter->pNext){
      iOldest = MIN(iOldest, pIter->iCommitId);
    }
    for(pIter=pDb->pCommit; pIter; pIter=pIter->pNext){
      iOldest = MIN(iOldest, pIter->iCommitId);
    }
    for(pPg=pDb->pPgFirst; pPg && pPg->iCommitId<iOldest; pPg=pPg->pNext){
      /* Unlink pPg from its hash chain. */
      if( pPg->pHashPrev ){
        pPg->pHashPrev->pHashNext = pPg->pHashNext;
      }else{
        int iHash = pPg->pgno % HMA_HASH_SIZE;
        assert( pDb->apPg[iHash]==pPg );
        pDb->apPg[iHash] = pPg->pHashNext;
      }
      if( pPg->pHashNext ){
        pPg->pHashNext->pHashPrev = pPg->pHashPrev;
      }
      pPg->pHashPrev = 0;
      pPg->pHashNext = 0;
      pLast = pPg;
    }
    if( pLast ){
      /* Splice the prefix [pPgFirst..pLast] onto the free list. */
      assert( pLast->pNext==pPg );
      pLast->pNext = pDb->pFree;
      pDb->pFree = pDb->pPgFirst;
    }
    if( pPg==0 ){
      pDb->pPgFirst = pDb->pPgLast = 0;
    }else{
      pDb->pPgFirst = pPg;
    }
  }
}
/*
** End a transaction (and release all locks). This version runs in
** single process mode only.
**
** READONLY: the connection is removed from the readers list and its
** snapshot commit id is reset to 0.
** READWRITE: all page locks are released and the transaction id is
** returned to ServerDb.transmask.
**
** In both cases, buffers no longer needed by any reader are then recycled,
** and p->pNext and p->iTransId are reset.
*/
static void serverEndSingle(Server *p){
  Server **pp;
  ServerDb *pDb = p->pDb;
  assert( p->eTrans!=SERVER_TRANS_NONE );
  assert( pDb->pServerShm==0 );
  sqlite3_mutex_enter(pDb->mutex);
  if( p->eTrans==SERVER_TRANS_READONLY ){
    /* Remove the connection from the readers list */
    for(pp=&pDb->pReader; *pp!=p; pp = &((*pp)->pNext));
    *pp = p->pNext;
    /* The snapshot id is no longer needed. A later READWRITE transaction
    ** expects iCommitId==0 when sqlite3ServerPreCommit() is called. */
    p->iCommitId = 0;
  }else{
    serverReleaseLocks(p);
    /* Clear the bit in the transaction mask. Guard against a negative id,
    ** which would make the shift undefined. */
    if( p->iTransId>=0 ){
      pDb->transmask &= ~((u32)1 << p->iTransId);
    }
  }
  serverRecycleBuffers(pDb);
  sqlite3_mutex_leave(pDb->mutex);
  p->pNext = 0;
  p->iTransId = -1;
}
/*
** End the transaction open on connection p (if any) and release all of its
** locks. In multi-process mode the client's aClient[] "transaction active"
** flag is cleared. In single-process mode the work is delegated to
** serverEndSingle(). Always returns SQLITE_OK.
*/
int sqlite3ServerEnd(Server *p){
  if( p->eTrans==SERVER_TRANS_NONE ) return SQLITE_OK;
  if( p->pDb->pServerShm==0 ){
    serverEndSingle(p);
  }else{
    serverReleaseLocks(p);
    p->pDb->aClient[p->iTransId] = 0;
  }
  p->eTrans = SERVER_TRANS_NONE;
  return SQLITE_OK;
}
/*
** Debugging aid (compiled out): print bRemove followed by the address of
** each connection currently on the pDb->pCommit list, to stdout.
*/
#if 0
static void dump_commit_list(ServerDb *pDb, int bRemove){
Server *pIter;
printf("commitlist(%d):", bRemove);
for(pIter=pDb->pCommit; pIter; pIter=pIter->pNext ){
printf(" %p", (void*)pIter);
}
printf("\n");
}
#endif
/*
** Single-process mode only. Called before the READWRITE transaction open
** on connection p commits. pPg is a list (linked through pNext) of
** ServerPage buffers. NOTE(review): these presumably hold the pre-commit
** content of pages written by this transaction, which READONLY readers with
** older snapshots read via sqlite3ServerReadPage(). Confirm with the pager.
**
** The transaction is assigned the next commit id. For each page, this
** function:
**
**   1. sets its iCommitId field;
**   2. adds it to the head of its hash-table bucket;
**   3. waits until all slow-reader locks on its locking slot have cleared.
**
** The whole list is then appended to the pPgFirst/pPgLast list and p is
** added to the list of current committers. Returns SQLITE_OK. A NULL pPg
** is a no-op.
*/
int sqlite3ServerPreCommit(Server *p, ServerPage *pPg){
  ServerDb *pDb = p->pDb;
  ServerPage *pIter;
  /* This should never be called in multi-process mode */
  assert( pDb->pServerShm==0 );
  if( pPg==0 ) return SQLITE_OK;
  sqlite3_mutex_enter(pDb->mutex);
  /* Assign a commit id to this transaction */
  assert( p->iCommitId==0 );
  assert( p->eTrans==SERVER_TRANS_READWRITE );
  assert( p->iTransId>=0 );
  p->iCommitId = pDb->iNextCommit++;
  for(pIter=pPg; pIter; pIter=pIter->pNext){
    u32 *pSlot = &pDb->aSlot[pIter->pgno % HMA_PAGELOCK_SLOTS];
    int iHash = pIter->pgno % HMA_HASH_SIZE;
    pIter->iCommitId = p->iCommitId;
    /* Insert at the head of the bucket. pHashPrev must be cleared
    ** explicitly: a buffer recycled through ServerDb.pFree may still hold
    ** a stale value, and serverRecycleBuffers() relies on pHashPrev==0 to
    ** identify the bucket head. */
    pIter->pHashPrev = 0;
    pIter->pHashNext = pDb->apPg[iHash];
    if( pIter->pHashNext ){
      pIter->pHashNext->pHashPrev = pIter;
    }
    pDb->apPg[iHash] = pIter;
    /* TODO: Something better than this! Busy-wait, briefly dropping the
    ** mutex so that slow readers can finish. */
    while( slotGetSlowReaders(*pSlot)>0 ){
      sqlite3_mutex_leave(pDb->mutex);
      sqlite3_mutex_enter(pDb->mutex);
    }
    /* If pIter is the last element in the list, append the new list to
    ** the ServerDb.pPgFirst/pPgLast list at this point. */
    if( pIter->pNext==0 ){
      if( pDb->pPgLast ){
        assert( pDb->pPgFirst );
        pDb->pPgLast->pNext = pPg;
      }else{
        assert( pDb->pPgFirst==0 );
        pDb->pPgFirst = pPg;
      }
      pDb->pPgLast = pIter;
    }
  }
  /* Add this connection to the list of current committers */
  assert( p->pNext==0 );
  p->pNext = pDb->pCommit;
  pDb->pCommit = p;
  sqlite3_mutex_leave(pDb->mutex);
  return SQLITE_OK;
}
/*
** Release all write-locks held by connection p. For each page in
** p->aLock[], p's read-lock bit is cleared. If p held the slot's write
** lock, the write-lock field is reset and p's read-lock bit set again, so
** the write lock is downgraded to a read lock.
**
** NOTE(review): for pages on which p held only a read lock, the read lock
** is released as well. Confirm that this is intended. p->nLock is not
** modified.
**
** In single-process mode the work is done under ServerDb.mutex, p is also
** removed from the committers list (if present) and p->iCommitId is reset
** to 0. Always returns SQLITE_OK.
*/
int sqlite3ServerEndWrite(Server *p){
  ServerDb *pDb = p->pDb;
  int i;
  if( pDb->pServerShm==0 ) sqlite3_mutex_enter(pDb->mutex);
  for(i=0; i<p->nLock; i++){
    u32 *pSlot = serverLockingSlot(pDb, p->aLock[i]);
    u32 mRead = (u32)1 << p->iTransId;
    /* The write-lock field starts at bit HMA_SLOT_RL_BITS and stores
    ** (transaction-id + 1). */
    u32 mWrite = (u32)(p->iTransId + 1) << HMA_SLOT_RL_BITS;
    while( 1 ){
      u32 o = *pSlot;
      u32 n = o & ~mRead;
      if( slotGetWriter(n)==p->iTransId ){
        n -= mWrite;
        n |= mRead;
      }
      if( o==n || serverCompareAndSwap(pSlot, o, n) ) break;
    }
  }
  if( pDb->pServerShm==0 ){
    Server **pp;
    /* If this connection is in the committers list, remove it. */
    for(pp=&pDb->pCommit; *pp; pp = &((*pp)->pNext)){
      if( *pp==p ){
        *pp = p->pNext;
        break;
      }
    }
    p->iCommitId = 0;
    sqlite3_mutex_leave(pDb->mutex);
  }
  return SQLITE_OK;
}
/*
** Client iClient holds a lock that connection p is waiting on. Decide
** whether the lock is genuinely held or left behind by a crashed process.
**
** Returns SQLITE_BUSY_DEADLOCK if the lock is held by a live client: any
** client of this process (its bit is set in ServerDb.transmask), or in
** single-process mode always. In multi-process mode, if iClient's locking
** slot can be locked, its owner is considered dead. Its page locks are
** then cleared, its journal rolled back, the slot released, and the result
** of the rollback returned (SQLITE_OK on success). Other errors from the
** slot lock are returned unchanged.
**
** ServerDb.mutex is held while transmask is examined and the slot probed.
** Connections in this process claim slots under the same mutex (see
** sqlite3ServerConnect()), so a slot cannot be claimed locally between the
** check and the probe. The caller must not hold ServerDb.mutex in
** multi-process mode; sqlite3ServerLock() only takes it in single-process
** mode.
*/
static int serverCheckClient(Server *p, int iClient){
  ServerDb *pDb = p->pDb;
  int rc = SQLITE_BUSY_DEADLOCK;
  assert( iClient>=0 && iClient<HMA_MAX_TRANSACTIONID );
  if( pDb->pServerShm ){
    sqlite3_mutex_enter(pDb->mutex);
    if( 0==(pDb->transmask & ((u32)1 << iClient)) ){
      /* Client iClient, if it exists, is in some other process. Check that
      ** it is still alive by attempting to lock its client slot. */
      rc = serverFcntlLock(p, iClient, 1);
      if( rc==SQLITE_OK ){
        serverClientUnlock(p, iClient);
        rc = serverClientRollback(p, iClient);
        serverFcntlLock(p, iClient, 0);
      }else if( rc==SQLITE_BUSY ){
        rc = SQLITE_BUSY_DEADLOCK;
      }
    }
    sqlite3_mutex_leave(pDb->mutex);
  }
  return rc;
}
/*
** Lock page pgno for reading (bWrite==0) or writing (bWrite==1) on behalf
** of the READWRITE transaction open on connection p. For a READONLY
** transaction (single-process only) nothing is done and SQLITE_OK returned.
**
** Returns SQLITE_OK if the lock is obtained or already held, SQLITE_NOMEM
** if the aLock[] array cannot be grown, or SQLITE_BUSY_DEADLOCK if the
** lock is held by another live client. Errors from crash recovery in
** serverCheckClient() are returned as-is. Newly obtained locks are
** appended to p->aLock[].
**
** If a conflicting lock belongs to a client that serverCheckClient()
** determines has crashed (multi-process only), that client's locks are
** cleared and the slot is examined again.
**
** Parameter bBlock is currently unused.
*/
int sqlite3ServerLock(Server *p, Pgno pgno, int bWrite, int bBlock){
  int rc = SQLITE_OK;
  assert( p->eTrans==SERVER_TRANS_READWRITE
       || (p->eTrans==SERVER_TRANS_READONLY && p->pDb->pServerShm==0)
  );
  if( p->eTrans==SERVER_TRANS_READWRITE ){
    ServerDb *pDb = p->pDb;
    int bSkip = 0;
    u32 mSelf;
    u32 *pSlot;
    assert( p->iTransId>=0 );
    assert( p->nLock<=p->nAlloc );
    mSelf = (u32)1 << p->iTransId;
    /* Grow the aLock[] array if required */
    if( p->nLock==p->nAlloc ){
      int nNew = p->nLock ? p->nLock*2 : 256;
      u32 *aNew = sqlite3_realloc(p->aLock, nNew*sizeof(u32));
      if( aNew==0 ) return SQLITE_NOMEM_BKPT;
      memset(&aNew[p->nLock], 0, sizeof(u32) * (nNew - p->nLock));
      p->nAlloc = nNew;
      p->aLock = aNew;
    }
    /* Find the locking slot for the page in question */
    pSlot = serverLockingSlot(pDb, pgno);
    if( pDb->pServerShm==0 ) sqlite3_mutex_enter(pDb->mutex);
    while( 1 ){
      u32 o = *pSlot;
      u32 n = o;
      int iWriter = slotGetWriter(o);
      assert( iWriter<0
           || slotReaderMask(o)==0
           || slotReaderMask(o)==((u32)1 << iWriter)
      );
      if( iWriter==p->iTransId || (bWrite==0 && (o & mSelf)) ){
        /* Already hold a sufficient lock on this page. */
        bSkip = 1;
        break;
      }else if( iWriter>=0 ){
        rc = serverCheckClient(p, iWriter);
        /* A crashed writer's locks were cleared: re-read the slot. */
        if( rc==SQLITE_OK ) continue;
      }else if( bWrite ){
        u32 mOther = slotReaderMask(o) & ~mSelf;
        if( mOther==0 ){
          /* The write-lock field starts at bit HMA_SLOT_RL_BITS and stores
          ** (transaction-id + 1). */
          n += (u32)(p->iTransId + 1) << HMA_SLOT_RL_BITS;
        }else{
          /* Check the first reader other than this connection. */
          int i = 0;
          while( (mOther & ((u32)1 << i))==0 ) i++;
          rc = serverCheckClient(p, i);
          /* A crashed reader's locks were cleared: re-read the slot. */
          if( rc==SQLITE_OK ) continue;
        }
      }else{
        n |= mSelf;
      }
      assert( slotGetWriter(n)<0
           || slotReaderMask(n)==0
           || slotReaderMask(n)==((u32)1 << slotGetWriter(n))
      );
      if( rc!=SQLITE_OK || serverCompareAndSwap(pSlot, o, n) ) break;
    }
    if( pDb->pServerShm==0 ){
      sqlite3_mutex_leave(pDb->mutex);
    }
    if( bSkip==0 && rc==SQLITE_OK ){
      p->aLock[p->nLock++] = pgno;
    }
  }
  return rc;
}
/*
** Add one to (n==1) or subtract one from (n==-1) the slow-reader count
** stored in the locking slot at *pSlot.
*/
static void serverIncrSlowReader(u32 *pSlot, int n){
  u32 one = (u32)1 << HMA_SLOT_RLWL_BITS;
  assert( n==1 || n==-1 );
  if( n>0 ){
    *pSlot += one;
  }else{
    *pSlot -= one;
  }
}
/*
** Called when a READONLY transaction (single-process only) reads page
** pgno. If the MVCC hash table holds a version of the page with a commit
** id >= the reader's snapshot id, *ppData is set to the oldest such
** version. Otherwise *ppData is left untouched and the slow-reader count
** on the page's locking slot is incremented. Does nothing for other
** transaction types.
*/
void sqlite3ServerReadPage(Server *p, Pgno pgno, u8 **ppData){
  ServerDb *pDb;
  ServerPage *pPg;
  ServerPage *pBest = 0;

  if( p->eTrans!=SERVER_TRANS_READONLY ) return;

  pDb = p->pDb;
  /* There are no READONLY transactions in a multi process system */
  assert( pDb->pServerShm==0 );
  sqlite3_mutex_enter(pDb->mutex);

  for(pPg=pDb->apPg[pgno % HMA_HASH_SIZE]; pPg; pPg=pPg->pHashNext){
    if( pPg->pgno!=pgno ) continue;
    if( pPg->iCommitId<p->iCommitId ) continue;
    if( pBest && pBest->iCommitId<=pPg->iCommitId ) continue;
    pBest = pPg;
  }

  if( pBest ){
    *ppData = pBest->aData;
  }else{
    serverIncrSlowReader(serverLockingSlot(pDb, pgno), 1);
  }
  sqlite3_mutex_leave(pDb->mutex);
}
/*
** Called when a READONLY transaction (single-process only) has finished
** with page pgno. Decrements the slow-reader count on the page's locking
** slot. Does nothing for other transaction types.
**
** NOTE(review): sqlite3ServerReadPage() increments the count only when no
** MVCC copy was found. Confirm that callers call this only in that case.
*/
void sqlite3ServerEndReadPage(Server *p, Pgno pgno){
  if( p->eTrans==SERVER_TRANS_READONLY ){
    ServerDb *pDb = p->pDb;
    u32 *pSlot = &pDb->aSlot[pgno % HMA_PAGELOCK_SLOTS];
    assert( pDb->pServerShm==0 );
    sqlite3_mutex_enter(pDb->mutex);
    /* The count is an unsigned bit-field, so it can never test as negative.
    ** Check that it is non-zero before decrementing: decrementing a zero
    ** count would borrow from the bits above the field instead. */
    assert( slotGetSlowReaders(*pSlot)>0 );
    serverIncrSlowReader(pSlot, -1);
    sqlite3_mutex_leave(pDb->mutex);
  }
}
/*
** Single-process mode only. Pop a ServerPage buffer from ServerDb.pFree and
** return it with its pNext field cleared, or return NULL if the free list
** is empty.
*/
ServerPage *sqlite3ServerBuffer(Server *p){
  ServerDb *pDb = p->pDb;
  ServerPage *pRet;
  assert( pDb->pServerShm==0 );
  sqlite3_mutex_enter(pDb->mutex);
  pRet = pDb->pFree;
  if( pRet ){
    pDb->pFree = pRet->pNext;
    pRet->pNext = 0;
  }
  sqlite3_mutex_leave(pDb->mutex);
  return pRet;
}
/*
** Return non-zero if p is not NULL and has an open READONLY transaction
** (one started with BEGIN READONLY). Return zero if p is NULL, has no open
** transaction, or has a read/write transaction open.
*/
int sqlite3ServerIsReadonly(Server *p){
  if( p==0 ) return 0;
  return p->eTrans==SERVER_TRANS_READONLY;
}
/*
** Return non-zero if p is not NULL and belongs to a single-process server
** system (no shared-memory handle). Return zero if p is NULL or the system
** supports multiple processes.
*/
int sqlite3ServerIsSingleProcess(Server *p){
  if( p==0 ) return 0;
  return p->pDb->pServerShm==0;
}
#endif /* ifdef SQLITE_SERVER_EDITION */