
网友投稿 962 2022-11-13




src/backend/storage/lmgr/lock.c * For the most part, this code should be invoked via lmgr.c * or another lock-management module, not directly. * Interface: * InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(), * LockAcquire(), LockRelease(), LockReleaseAll(), * LockCheckConflicts(), GrantLock()


主锁表 static HTAB *LockMethodLockHash 主锁表用来保存当前数据库中所有事务的锁对象(也就是LOCK结构体),其存储在共享内存中。对于该哈希表来说,其键是LOCKTAG,值是LOCK结构体。

/*
 * LOCKTAG -- identifies one lockable object.
 *
 * A LOCKTAG is embedded as the first member ("hash key") of LOCK below and
 * as part of LOCALLOCKTAG, so it serves as (part of) the lookup key for the
 * lock hash tables.  The meaning of the four ID fields depends on
 * locktag_type (see enum LockTagType).
 */
typedef struct LOCKTAG
{
    uint32 locktag_field1;      /* a 32-bit ID field */
    uint32 locktag_field2;      /* a 32-bit ID field */
    uint32 locktag_field3;      /* a 32-bit ID field */
    uint16 locktag_field4;      /* a 16-bit ID field */
    uint8 locktag_type;         /* see enum LockTagType */
    uint8 locktag_lockmethodid; /* lockmethod indicator */
} LOCKTAG;

/*
 * LOCK -- per-locked-object information.
 *
 * One LOCK exists for each lockable object that currently has requests
 * outstanding.  It tracks which lock modes have been granted and which are
 * being waited for, links to the PROCLOCK entries and waiting PGPROCs, and
 * keeps per-mode request/grant counters together with their totals.
 */
typedef struct LOCK
{
    /* hash key */
    LOCKTAG tag;                /* unique identifier of lockable object */

    /* data */
    LOCKMASK grantMask;         /* bitmask for lock types already granted */
    LOCKMASK waitMask;          /* bitmask for lock types awaited */
    SHM_QUEUE procLocks;        /* list of PROCLOCK objects assoc. with lock */
    PROC_QUEUE waitProcs;       /* list of PGPROC objects waiting on lock */
    int requested[MAX_LOCKMODES];   /* counts of requested locks */
    int nRequested;             /* total of requested[] array */
    int granted[MAX_LOCKMODES]; /* counts of granted locks */
    int nGranted;               /* total of granted[] array */
} LOCK;

进程锁表 static HTAB *LockMethodProcLockHash 进程锁表用来保存各个进程(会话)在每个锁对象上的事务锁状态(每个"锁对象—持有者"组合对应一个PROCLOCK结构体),其存储在共享内存中。对于该哈希表来说,其键是PROCLOCKTAG,值是PROCLOCK。

本地锁表 static HTAB *LockMethodLocalHash 本地锁表对于每个后端进程来说是本地的(非共享),每个后端进程都有一份,用于存放锁的持有计数和resource owner信息。对于该哈希表来说,其键是LOCALLOCKTAG,其值是LOCALLOCK。本地锁表的查询标签是LOCKTAG加上请求的锁模式。如果在本地锁表中没有查到对应的条目,说明这是本事务第一次申请该锁,此时会新建一个LOCALLOCK条目,并将其nLocks初始化为0;如果条目存在但当前并未持有该锁,其nLocks同样为0。如果能够在本地锁表中找到对应的条目且nLocks大于零,就说明这个锁对象的这种锁模式已经授予给本事务,只需要给本地锁表中的这个锁增加引用计数即可:先增加nLocks计数,再增加ResourceOwner中该锁的计数。

/*
 * LOCALLOCKTAG -- lookup key of the backend-local lock table.
 *
 * The key is the lockable object's LOCKTAG plus the requested lock mode, so
 * each (object, mode) pair gets its own LOCALLOCK entry.
 */
typedef struct LOCALLOCKTAG
{
    LOCKTAG lock;               /* identifies the lockable object */
    LOCKMODE mode;              /* lock mode for this table entry */
} LOCALLOCKTAG;

/*
 * LOCALLOCK -- backend-local bookkeeping for one (object, mode) pair.
 *
 * Entries live in the non-shared "LOCALLOCK hash" and store lock counts and
 * resource owner information.  The lock/proclock pointers refer to the
 * associated shared-table entries, if any; they are reset to NULL when the
 * lock is taken via the fast path.
 */
typedef struct LOCALLOCK
{
    /* tag */
    LOCALLOCKTAG tag;           /* unique identifier of locallock entry */

    /* data */
    uint32 hashcode;            /* copy of LOCKTAG's hash value */
    LOCK *lock;                 /* associated LOCK object, if any */
    PROCLOCK *proclock;         /* associated PROCLOCK object, if any */
    int64 nLocks;               /* total number of times lock is held */
    int numLockOwners;          /* # of relevant ResourceOwners */
    int maxLockOwners;          /* allocated size of array */
    LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
    bool holdsStrongLockCount;  /* bumped FastPathStrongRelationLocks */
    bool lockCleared;           /* we read all sinval msgs for lock */
} LOCALLOCK;

快路径强关系表锁 static volatile FastPathStrongRelationLockData *FastPathStrongRelationLocks 快路径强关系表锁,用来记录是否有事务已经获得这个对象的强锁。从锁的相容性矩阵可以看出,AccessShareLock、RowShareLock、RowExclusiveLock这3个锁是不互相冲突的,是相容的,而这几个锁主要用于事务的DML,因此可以把这几个锁定义为弱锁,而其他5个锁则定义为强锁。快路径强关系表锁是一个FastPathStrongRelationLockData结构体,其包含了一个用作SpinLock的mutex,以及一个按哈希分区记录强锁计数的count数组;只要某个分区的计数不为0,落在该分区的关系就不能再通过快路径加锁。

/*
 * FastPathStrongRelationLockData -- shared state that gates the fast path.
 *
 * count[] holds one counter per strong-lock hash partition; the partition
 * index is derived from the lock tag's hash code.  LockAcquireExtended()
 * refuses to take a relation lock via the fast path while the counter of
 * the matching partition is nonzero.  mutex is a spinlock initialized in
 * InitLocks(); presumably it guards updates of count[] (the updating code
 * is not part of this excerpt).
 */
typedef struct
{
    slock_t mutex;
    uint32 count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
} FastPathStrongRelationLockData;


/* This configuration variable is used to set the lock table size */
int max_locks_per_xact; /* set by guc.c */

/*
 * NLOCKENTS -- maximum number of entries requested for the shared LOCK hash
 * table: max_locks_per_xact * (MaxBackends + max_prepared_xacts).
 * The arithmetic goes through add_size()/mul_size() rather than plain
 * operators.
 */
#define NLOCKENTS() mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))

void InitLocks(void) { HASHCTL info; long init_table_size, max_table_size; bool found; /* Compute init/max size to request for lock hashtables. Note these calculations must agree with LockShmemSize! */ max_table_size = NLOCKENTS(); init_table_size = max_table_size / 2; /* Allocate hash table for LOCK structs. This stores per-locked-object information. */ MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(LOCKTAG); info.entrysize = sizeof(LOCK); info.num_partitions = NUM_LOCK_PARTITIONS; LockMethodLockHash = ShmemInitHash("LOCK hash", init_table_size, max_table_size, &info, HASH_ELEM | HASH_BLOBS | HASH_PARTITION); /* Assume an average of 2 holders per lock */ max_table_size *= 2; init_table_size *= 2; /* Allocate hash table for PROCLOCK structs. This stores per-lock-per-holder information. */ info.keysize = sizeof(PROCLOCKTAG); info.entrysize = sizeof(PROCLOCK); info.hash = proclock_hash; info.num_partitions = NUM_LOCK_PARTITIONS; LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash", init_table_size, max_table_size, &info, HASH_ELEM | HASH_FUNCTION | HASH_PARTITION); /* * Allocate fast-path structures. */ FastPathStrongRelationLocks = ShmemInitStruct("Fast Path Strong Relation Lock Data", sizeof(FastPathStrongRelationLockData), &found); if (!found) SpinLockInit(&FastPathStrongRelationLocks->mutex); /* Allocate non-shared hash table for LOCALLOCK structs. This stores lock * counts and resource owner information. * The non-shared table could already exist in this process (this occurs * when the postmaster is recreating shared memory after a backend crash). * If so, delete and recreate it. (We could simply leave it, since it * ought to be empty in the postmaster, but for safety let's zap it.) */ if (LockMethodLocalHash) hash_destroy(LockMethodLocalHash); info.keysize = sizeof(LOCALLOCKTAG); info.entrysize = sizeof(LOCALLOCK); LockMethodLocalHash = hash_create("LOCALLOCK hash", 16,&info, HASH_ELEM | HASH_BLOBS);}



/*
 * LockAcquire -- acquire a lock with the default options.
 *
 * Checks for lock conflicts, sleeps if a conflict is found, and sets the
 * lock if/when there are no conflicts.
 *
 * Inputs:
 *  locktag:     unique identifier for the lockable object
 *  lockmode:    lock mode to acquire
 *  sessionLock: if true, acquire lock for session not current transaction
 *  dontWait:    if true, don't wait to acquire lock
 *
 * Returns one of:
 *  LOCKACQUIRE_NOT_AVAIL      lock not available, and dontWait=true
 *  LOCKACQUIRE_OK             lock successfully acquired
 *  LOCKACQUIRE_ALREADY_HELD   incremented count for lock already held
 *  LOCKACQUIRE_ALREADY_CLEAR  incremented count for lock already clear
 *
 * With dontWait=false, a caller that does not need to tell a freshly
 * acquired lock from one taken earlier in the same transaction can ignore
 * the return value.
 *
 * Side effects: the lock is acquired and recorded in the lock tables.
 *
 * NOTE: once we start waiting for the lock, the only way to give up the
 * wait is to abort the transaction.
 */
LockAcquireResult
LockAcquire(const LOCKTAG *locktag,
            LOCKMODE lockmode,
            bool sessionLock,
            bool dontWait)
{
    /* Defaults: raise ERROR when the lock table is full ... */
    const bool  report_table_full = true;

    /* ... and do not hand back the LOCALLOCK entry. */
    LOCALLOCK **const no_locallock_out = NULL;

    return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
                               report_table_full, no_locallock_out);
}

/*
 * LockAcquireExtended - allows us to specify additional options
 *
 * Acquisition proceeds in stages:
 *   1. validate lock method / lock mode and refuse lock modes above
 *      RowExclusiveLock on database objects during recovery;
 *   2. find or create the backend-local LOCALLOCK entry; if the lock is
 *      already held (nLocks > 0) just bump the local counts and return;
 *   3. try the relation fast path when eligible;
 *   4. otherwise go through the shared lock table under the partition
 *      LWLock, granting immediately, failing (dontWait), or sleeping.
 *
 * Inputs:
 *  locktag, lockmode, sessionLock, dontWait: as for LockAcquire().
 *
 * reportMemoryError specifies whether a lock request that fills the lock
 * table should generate an ERROR or not.  Passing "false" allows the caller
 * to attempt to recover from lock-table-full situations, perhaps by forcibly
 * cancelling other lock holders and then retrying.  Note, however, that the
 * return code for that is LOCKACQUIRE_NOT_AVAIL, so that it's unsafe to use
 * in combination with dontWait = true, as the cause of failure couldn't be
 * distinguished.
 *
 * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
 * table entry if a lock is successfully acquired, or NULL if not.
 *
 * Returns LOCKACQUIRE_OK, LOCKACQUIRE_ALREADY_HELD,
 * LOCKACQUIRE_ALREADY_CLEAR or LOCKACQUIRE_NOT_AVAIL.
 */
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
                    LOCKMODE lockmode,
                    bool sessionLock,
                    bool dontWait,
                    bool reportMemoryError,
                    LOCALLOCK **locallockp)
{
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LockMethod lockMethodTable;
    LOCALLOCKTAG localtag;
    LOCALLOCK *locallock;
    LOCK *lock;
    PROCLOCK *proclock;
    bool found;
    ResourceOwner owner;
    uint32 hashcode;
    LWLock *partitionLock;
    int status;
    bool log_lock = false;  /* true if a WAL record must follow the grant */

    /* Validate the lock method id and the lock mode against its table. */
    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
    lockMethodTable = LockMethods[lockmethodid];
    if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
        elog(ERROR, "unrecognized lock mode: %d", lockmode);

    /*
     * While recovery is in progress (and we are not the process performing
     * it), only RowExclusiveLock or weaker may be taken on database objects
     * and relations.
     */
    if (RecoveryInProgress() && !InRecovery &&
        (locktag->locktag_type == LOCKTAG_OBJECT ||
         locktag->locktag_type == LOCKTAG_RELATION) &&
        lockmode > RowExclusiveLock)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
                        lockMethodTable->lockModeNames[lockmode]),
                 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));

#ifdef LOCK_DEBUG
    if (LOCK_DEBUG_ENABLED(locktag))
        elog(LOG, "LockAcquire: lock [%u,%u] %s",
             locktag->locktag_field1, locktag->locktag_field2,
             lockMethodTable->lockModeNames[lockmode]);
#endif

    /* Identify owner for lock (session locks have no resource owner) */
    if (sessionLock)
        owner = NULL;
    else
        owner = CurrentResourceOwner;

    /*
     * Find or create a LOCALLOCK entry for this lock and lockmode
     */
    MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
    localtag.lock = *locktag;
    localtag.mode = lockmode;

    locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
                                          (void *) &localtag,
                                          HASH_ENTER, &found);

    /*
     * if it's a new locallock object, initialize it
     */
    if (!found)
    {
        locallock->lock = NULL;
        locallock->proclock = NULL;
        locallock->hashcode = LockTagHashCode(&(localtag.lock));
        locallock->nLocks = 0;
        locallock->holdsStrongLockCount = false;
        locallock->lockCleared = false;
        locallock->numLockOwners = 0;
        locallock->maxLockOwners = 8;
        locallock->lockOwners = NULL;   /* in case next line fails */
        locallock->lockOwners = (LOCALLOCKOWNER *)
            MemoryContextAlloc(TopMemoryContext,
                               locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
    }
    else
    {
        /* Make sure there will be room to remember the lock */
        if (locallock->numLockOwners >= locallock->maxLockOwners)
        {
            /* owner array is full: double its capacity */
            int newsize = locallock->maxLockOwners * 2;

            locallock->lockOwners = (LOCALLOCKOWNER *)
                repalloc(locallock->lockOwners,
                         newsize * sizeof(LOCALLOCKOWNER));
            locallock->maxLockOwners = newsize;
        }
    }
    hashcode = locallock->hashcode;

    /* Report the entry to the caller; reset to NULL on the failure paths. */
    if (locallockp)
        *locallockp = locallock;

    /*
     * If we already hold the lock, we can just increase the count locally.
     *
     * If lockCleared is already set, caller need not worry about absorbing
     * sinval messages related to the lock's object.
     */
    if (locallock->nLocks > 0)
    {
        GrantLockLocal(locallock, owner);
        if (locallock->lockCleared)
            return LOCKACQUIRE_ALREADY_CLEAR;
        else
            return LOCKACQUIRE_ALREADY_HELD;
    }

    /*
     * Prepare to emit a WAL record if acquisition of this lock needs to be
     * replayed in a standby server.
     *
     * Here we prepare to log; after lock is acquired we'll issue log record.
     * This arrangement simplifies error recovery in case the preparation step
     * fails.
     *
     * Only AccessExclusiveLocks can conflict with lock types that read-only
     * transactions can acquire in a standby server. Make sure this definition
     * matches the one in GetRunningTransactionLocks().
     */
    if (lockmode >= AccessExclusiveLock &&
        locktag->locktag_type == LOCKTAG_RELATION &&
        !RecoveryInProgress() &&
        XLogStandbyInfoActive())
    {
        LogAccessExclusiveLockPrepare();
        log_lock = true;
    }

    /*
     * Attempt to take lock via fast path, if eligible.  But if we remember
     * having filled up the fast path array, we don't attempt to make any
     * further use of it until we release some locks.  It's possible that some
     * other backend has transferred some of those locks to the shared hash
     * table, leaving space free, but it's not worth acquiring the LWLock just
     * to check.  It's also possible that we're acquiring a second or third
     * lock type on a relation we have already locked using the fast-path, but
     * for now we don't worry about that case either.
     */
    if (EligibleForRelationFastPath(locktag, lockmode) &&
        FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
    {
        uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
        bool acquired;

        /*
         * LWLockAcquire acts as a memory sequencing point, so it's safe to
         * assume that any strong locker whose increment to
         * FastPathStrongRelationLocks->count becomes visible after we test
         * it has yet to begin to transfer fast-path locks.
         */
        LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
        if (FastPathStrongRelationLocks->count[fasthashcode] != 0)
            acquired = false;   /* a strong lock is registered for this partition */
        else
            acquired = FastPathGrantRelationLock(locktag->locktag_field2,
                                                 lockmode);
        LWLockRelease(&MyProc->backendLock);
        if (acquired)
        {
            /*
             * The locallock might contain stale pointers to some old shared
             * objects; we MUST reset these to null before considering the
             * lock to be acquired via fast-path.
             */
            locallock->lock = NULL;
            locallock->proclock = NULL;
            GrantLockLocal(locallock, owner);
            return LOCKACQUIRE_OK;
        }
    }

    /*
     * If this lock could potentially have been taken via the fast-path by
     * some other backend, we must (temporarily) disable further use of the
     * fast-path for this lock tag, and migrate any locks already taken via
     * this method to the main lock table.
     */
    if (ConflictsWithRelationFastPath(locktag, lockmode))
    {
        uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);

        BeginStrongLockAcquire(locallock, fasthashcode);
        if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
                                           hashcode))
        {
            /*
             * Transfer failed; the error below reports this as running out
             * of shared memory.  Presumably AbortStrongLockAcquire() undoes
             * BeginStrongLockAcquire() -- confirm in its definition.
             */
            AbortStrongLockAcquire();
            if (locallock->nLocks == 0)
                RemoveLocalLock(locallock);
            if (locallockp)
                *locallockp = NULL;
            if (reportMemoryError)
                ereport(ERROR,
                        (errcode(ERRCODE_OUT_OF_MEMORY),
                         errmsg("out of shared memory"),
                         errhint("You might need to increase max_locks_per_transaction.")));
            else
                return LOCKACQUIRE_NOT_AVAIL;
        }
    }

    /*
     * We didn't find the lock in our LOCALLOCK table, and we didn't manage to
     * take it via the fast-path, either, so we've got to mess with the shared
     * lock table.
     */
    partitionLock = LockHashPartitionLock(hashcode);

    LWLockAcquire(partitionLock, LW_EXCLUSIVE);

    /*
     * Find or create lock and proclock entries with this tag
     *
     * Note: if the locallock object already existed, it might have a pointer
     * to the lock already ... but we should not assume that that pointer is
     * valid, since a lock object with zero hold and request counts can go
     * away anytime.  So we have to use SetupLockInTable() to recompute the
     * lock and proclock pointers, even if they're already set.
     */
    proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
                                hashcode, lockmode);
    if (!proclock)
    {
        /* No shared-table entry could be set up: clean up and report. */
        AbortStrongLockAcquire();
        LWLockRelease(partitionLock);
        if (locallock->nLocks == 0)
            RemoveLocalLock(locallock);
        if (locallockp)
            *locallockp = NULL;
        if (reportMemoryError)
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("out of shared memory"),
                     errhint("You might need to increase max_locks_per_transaction.")));
        else
            return LOCKACQUIRE_NOT_AVAIL;
    }
    locallock->proclock = proclock;
    lock = proclock->tag.myLock;
    locallock->lock = lock;

    /*
     * If lock requested conflicts with locks requested by waiters, must join
     * wait queue.  Otherwise, check for conflict with already-held locks.
     * (That's last because most complex check.)
     */
    if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
        status = STATUS_FOUND;
    else
        status = LockCheckConflicts(lockMethodTable, lockmode,
                                    lock, proclock);

    if (status == STATUS_OK)
    {
        /* No conflict with held or previously requested locks */
        GrantLock(lock, proclock, lockmode);
        GrantLockLocal(locallock, owner);
    }
    else
    {
        Assert(status == STATUS_FOUND);

        /*
         * We can't acquire the lock immediately.  If caller specified no
         * blocking, remove useless table entries and return NOT_AVAIL without
         * waiting.
         */
        if (dontWait)
        {
            AbortStrongLockAcquire();
            if (proclock->holdMask == 0)
            {
                /*
                 * This proclock holds no lock modes at all: unlink it from
                 * both queues and drop it from the PROCLOCK hash table.
                 */
                uint32 proclock_hashcode;

                proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
                SHMQueueDelete(&proclock->lockLink);
                SHMQueueDelete(&proclock->procLink);
                if (!hash_search_with_hash_value(LockMethodProcLockHash,
                                                 (void *) &(proclock->tag),
                                                 proclock_hashcode,
                                                 HASH_REMOVE,
                                                 NULL))
                    elog(PANIC, "proclock table corrupted");
            }
            else
                PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);

            /* Back out this request from the LOCK's request counters. */
            lock->nRequested--;
            lock->requested[lockmode]--;
            LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
            Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
            Assert(lock->nGranted <= lock->nRequested);
            LWLockRelease(partitionLock);
            if (locallock->nLocks == 0)
                RemoveLocalLock(locallock);
            if (locallockp)
                *locallockp = NULL;
            return LOCKACQUIRE_NOT_AVAIL;
        }

        /*
         * Set bitmask of locks this process already holds on this object.
         */
        MyProc->heldLocks = proclock->holdMask;

        /*
         * Sleep till someone wakes me up.
         */

        TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
                                         locktag->locktag_field2,
                                         locktag->locktag_field3,
                                         locktag->locktag_field4,
                                         locktag->locktag_type,
                                         lockmode);

        WaitOnLock(locallock, owner);

        TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
                                        locktag->locktag_field2,
                                        locktag->locktag_field3,
                                        locktag->locktag_field4,
                                        locktag->locktag_type,
                                        lockmode);

        /*
         * NOTE: do not do any material change of state between here and
         * return.  All required changes in locktable state must have been
         * done when the lock was granted to us --- see notes in WaitOnLock.
         */

        /*
         * Check the proclock entry status, in case something in the ipc
         * communication doesn't work correctly.
         */
        if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
        {
            /* Woken up, but the requested mode is not in our hold mask. */
            AbortStrongLockAcquire();
            PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
            LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
            /* Should we retry ? */
            LWLockRelease(partitionLock);
            elog(ERROR, "LockAcquire failed");
        }
        PROCLOCK_PRINT("LockAcquire: granted", proclock);
        LOCK_PRINT("LockAcquire: granted", lock, lockmode);
    }

    /*
     * Lock state is fully up-to-date now; if we error out after this, no
     * special error cleanup is required.
     */
    FinishStrongLockAcquire();

    LWLockRelease(partitionLock);

    /*
     * Emit a WAL record if acquisition of this lock needs to be replayed in a
     * standby server.
     */
    if (log_lock)
    {
        /*
         * Decode the locktag back to the original values, to avoid sending
         * lots of empty bytes with every message.  See lock.h to check how a
         * locktag is defined for LOCKTAG_RELATION
         */
        LogAccessExclusiveLock(locktag->locktag_field1,
                               locktag->locktag_field2);
    }

    return LOCKACQUIRE_OK;
}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们 处理,核实后本网站将在24小时内删除侵权内容。

下一篇:Greenplum数据库分布式事务系统——Distributed Log

