LCOV - code coverage report
Current view: top level - src/backend/storage/lmgr - lwlock.c (source / functions)
Test: PostgreSQL 19devel          Date: 2026-01-10 06:17:41
Coverage:   Lines: 362 hit of 411 total (88.1 %)
            Functions: 30 hit of 36 total (83.3 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * lwlock.c
       4             :  *    Lightweight lock manager
       5             :  *
       6             :  * Lightweight locks are intended primarily to provide mutual exclusion of
       7             :  * access to shared-memory data structures.  Therefore, they offer both
       8             :  * exclusive and shared lock modes (to support read/write and read-only
       9             :  * access to a shared object).  There are few other frills.  User-level
      10             :  * locking should be done with the full lock manager --- which depends on
      11             :  * LWLocks to protect its shared state.
      12             :  *
      13             :  * In addition to exclusive and shared modes, lightweight locks can be used to
      14             :  * wait until a variable changes value.  Acquiring the lock with LWLockAcquire
      15             :  * does not reset the variable; it keeps the value it was set to when the
      16             :  * lock was last released, and it can be updated without releasing the lock
      17             :  * by calling LWLockUpdateVar.  LWLockWaitForVar waits for the variable to be
      18             :  * updated, or until the lock is free.  When releasing the lock with
      19             :  * LWLockReleaseClearVar() the value can be set to one appropriate for a
      20             :  * free lock.  The meaning of the variable is up to the caller; the
      21             :  * lightweight lock code just assigns and compares it.
      22             :  *
      23             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      24             :  * Portions Copyright (c) 1994, Regents of the University of California
      25             :  *
      26             :  * IDENTIFICATION
      27             :  *    src/backend/storage/lmgr/lwlock.c
      28             :  *
      29             :  * NOTES:
      30             :  *
      31             :  * This used to be a fairly straightforward reader-writer lock
      32             :  * implementation, in which the internal state was protected by a
      33             :  * spinlock. Unfortunately the overhead of taking the spinlock proved to be
      34             :  * too high for workloads/locks that were taken in shared mode very
      35             :  * frequently. Often we were spinning in the (obviously exclusive) spinlock,
      36             :  * while trying to acquire a shared lock that was actually free.
      37             :  *
      38             :  * Thus a new implementation was devised that provides wait-free shared lock
      39             :  * acquisition for locks that aren't exclusively locked.
      40             :  *
      41             :  * The basic idea is to have a single atomic variable 'lockcount' instead of
      42             :  * the formerly separate shared and exclusive counters and to use atomic
      43             :  * operations to acquire the lock. That's fairly easy to do for plain
      44             :  * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
      45             :  * in the OS.
      46             :  *
      47             :  * For lock acquisition we use an atomic compare-and-exchange on the lockcount
      48             :  * variable. For exclusive lock we swap in a sentinel value
      49             :  * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
      50             :  *
      51             :  * To release the lock we use an atomic decrement. If the new value
      52             :  * is zero (we get that atomically), we know we can/have to release
      53             :  * waiters.
      54             :  *
      55             :  * Obviously it is important that the sentinel value for exclusive locks
      56             :  * doesn't conflict with the maximum number of possible shared lockers -
      57             :  * luckily MAX_BACKENDS makes that easily possible.
      58             :  *
      59             :  *
      60             :  * The attentive reader might have noticed that naively doing the above has a
      61             :  * glaring race condition: We try to lock using the atomic operations and
      62             :  * notice that we have to wait. Unfortunately by the time we have finished
      63             :  * queuing, the former locker very well might have already finished its
      64             :  * work. That's problematic because we're now stuck waiting inside the OS.
      65             :  *
      66             :  * To mitigate those races we use a multi-phase attempt at locking:
      67             :  *   Phase 1: Try to do it atomically, if we succeed, nice
      68             :  *   Phase 2: Add ourselves to the waitqueue of the lock
      69             :  *   Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
      70             :  *            the queue
      71             :  *   Phase 4: Sleep till wake-up, goto Phase 1
      72             :  *
      73             :  * This protects us against the problem above: the lock cannot be released
      74             :  * without us noticing, since after Phase 2 we're already on the queue.
      75             :  * -------------------------------------------------------------------------
      76             :  */
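
[Editor's note: below is a minimal usage sketch of the variable-wait
machinery described above, modeled loosely on how the WAL-insert locks use
it; it is not part of this file.  The demo_* names are hypothetical, and
the sketch assumes the current declarations in storage/lwlock.h, where the
variable is a pg_atomic_uint64 stored next to the lock.]

    typedef struct DemoProgressLock
    {
        LWLock      lock;
        pg_atomic_uint64 progress;  /* how far the holder has gotten */
    } DemoProgressLock;

    /* holder: advertise progress without releasing the lock; wakes waiters */
    static void
    demo_advertise(DemoProgressLock *l, uint64 upto)
    {
        LWLockUpdateVar(&l->lock, &l->progress, upto);
    }

    /* waiter: sleep until progress changes from oldval, or the lock is free */
    static uint64
    demo_wait(DemoProgressLock *l, uint64 oldval)
    {
        uint64      newval = 0;

        /* returns true if the lock was free; else *newval has the new value */
        if (LWLockWaitForVar(&l->lock, &l->progress, oldval, &newval))
            newval = pg_atomic_read_u64(&l->progress);
        return newval;
    }

    /* holder: release, resetting the variable to a "free lock" value */
    static void
    demo_release(DemoProgressLock *l)
    {
        LWLockReleaseClearVar(&l->lock, &l->progress, 0);
    }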
      77             : #include "postgres.h"
      78             : 
      79             : #include "miscadmin.h"
      80             : #include "pg_trace.h"
      81             : #include "pgstat.h"
      82             : #include "port/pg_bitutils.h"
      83             : #include "storage/proc.h"
      84             : #include "storage/proclist.h"
      85             : #include "storage/procnumber.h"
      86             : #include "storage/spin.h"
      87             : #include "utils/memutils.h"
      88             : 
      89             : #ifdef LWLOCK_STATS
      90             : #include "utils/hsearch.h"
      91             : #endif
      92             : 
      93             : 
      94             : #define LW_FLAG_HAS_WAITERS         ((uint32) 1 << 31)
      95             : #define LW_FLAG_RELEASE_OK          ((uint32) 1 << 30)
      96             : #define LW_FLAG_LOCKED              ((uint32) 1 << 29)
      97             : #define LW_FLAG_BITS                3
      98             : #define LW_FLAG_MASK                (((1<<LW_FLAG_BITS)-1)<<(32-LW_FLAG_BITS))
      99             : 
     100             : /* assumes MAX_BACKENDS is a (power of 2) - 1, checked below */
     101             : #define LW_VAL_EXCLUSIVE            (MAX_BACKENDS + 1)
     102             : #define LW_VAL_SHARED               1
     103             : 
     104             : /* already (power of 2)-1, i.e. suitable for a mask */
     105             : #define LW_SHARED_MASK              MAX_BACKENDS
     106             : #define LW_LOCK_MASK                (MAX_BACKENDS | LW_VAL_EXCLUSIVE)
     107             : 
     108             : 
     109             : StaticAssertDecl(((MAX_BACKENDS + 1) & MAX_BACKENDS) == 0,
     110             :                  "MAX_BACKENDS + 1 needs to be a power of 2");
     111             : 
     112             : StaticAssertDecl((MAX_BACKENDS & LW_FLAG_MASK) == 0,
     113             :                  "MAX_BACKENDS and LW_FLAG_MASK overlap");
     114             : 
     115             : StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0,
     116             :                  "LW_VAL_EXCLUSIVE and LW_FLAG_MASK overlap");
     117             : 
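[Editor's note: putting concrete numbers to the macros above, assuming
MAX_BACKENDS is 0x3FFFF (its current definition), the 32-bit state word
breaks down as:

    bit  31       LW_FLAG_HAS_WAITERS
    bit  30       LW_FLAG_RELEASE_OK
    bit  29       LW_FLAG_LOCKED
    bits 19-28    unused
    bit  18       LW_VAL_EXCLUSIVE (0x40000 == MAX_BACKENDS + 1)
    bits 0-17     shared-holder count (LW_SHARED_MASK == 0x3FFFF)

One atomic read thus yields the wait-list flags, the exclusive-holder bit,
and the shared-holder count all at once.]
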
     118             : /*
     119             :  * There are three sorts of LWLock "tranches":
     120             :  *
     121             :  * 1. The individually-named locks defined in lwlocklist.h each have their
     122             :  * own tranche.  We absorb the names of these tranches from there into
     123             :  * BuiltinTrancheNames here.
     124             :  *
     125             :  * 2. There are some predefined tranches for built-in groups of locks defined
     126             :  * in lwlocklist.h.  We absorb the names of these tranches, too.
     127             :  *
     128             :  * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
     129             :  * or LWLockNewTrancheId.  These names are stored in shared memory and can be
     130             :  * accessed via LWLockTrancheNames.
     131             :  *
     132             :  * All these names are user-visible as wait event names, so choose with care
     133             :  * ... and do not forget to update the documentation's list of wait events.
     134             :  */
     135             : static const char *const BuiltinTrancheNames[] = {
     136             : #define PG_LWLOCK(id, lockname) [id] = CppAsString(lockname),
     137             : #define PG_LWLOCKTRANCHE(id, lockname) [LWTRANCHE_##id] = CppAsString(lockname),
     138             : #include "storage/lwlocklist.h"
     139             : #undef PG_LWLOCK
     140             : #undef PG_LWLOCKTRANCHE
     141             : };
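
[Editor's note: to illustrate the X-macro expansion above, a hypothetical
lwlocklist.h entry

    PG_LWLOCK(1, ShmemIndex)

would expand under the first #define to the designated initializer

    [1] = "ShmemIndex",

so each array slot is indexed by the lock's id and holds its stringified
name; PG_LWLOCKTRANCHE entries fill the group-tranche slots the same way.]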
     142             : 
     143             : StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
     144             :                  LWTRANCHE_FIRST_USER_DEFINED,
     145             :                  "missing entries in BuiltinTrancheNames[]");
     146             : 
     147             : /*
     148             :  * This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED, and
     149             :  * points to the shared memory locations of the names of all
     150             :  * dynamically-created tranches.  Backends inherit the pointer by fork from the
     151             :  * postmaster (except in the EXEC_BACKEND case, where we have special measures
     152             :  * to pass it down).
     153             :  */
     154             : char      **LWLockTrancheNames = NULL;
     155             : 
     156             : /*
     157             :  * This points to the main array of LWLocks in shared memory.  Backends inherit
     158             :  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
     159             :  * where we have special measures to pass it down).
     160             :  */
     161             : LWLockPadded *MainLWLockArray = NULL;
     162             : 
     163             : /*
     164             :  * We use this structure to keep track of locked LWLocks for release
     165             :  * during error recovery.  Normally, only a few will be held at once, but
     166             :  * occasionally the number can be much higher.
     167             :  */
     168             : #define MAX_SIMUL_LWLOCKS   200
     169             : 
     170             : /* struct representing the LWLocks we're holding */
     171             : typedef struct LWLockHandle
     172             : {
     173             :     LWLock     *lock;
     174             :     LWLockMode  mode;
     175             : } LWLockHandle;
     176             : 
     177             : static int  num_held_lwlocks = 0;
     178             : static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
     179             : 
      180             : /* struct representing an LWLock request for a named tranche */
     181             : typedef struct NamedLWLockTrancheRequest
     182             : {
     183             :     char        tranche_name[NAMEDATALEN];
     184             :     int         num_lwlocks;
     185             : } NamedLWLockTrancheRequest;
     186             : 
     187             : /*
     188             :  * NamedLWLockTrancheRequests is the valid length of the request array.  These
     189             :  * variables are non-static so that launch_backend.c can copy them to child
     190             :  * processes in EXEC_BACKEND builds.
     191             :  */
     192             : int         NamedLWLockTrancheRequests = 0;
     193             : NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
     194             : 
     195             : /* postmaster's local copy of the request array */
     196             : static NamedLWLockTrancheRequest *LocalNamedLWLockTrancheRequestArray = NULL;
     197             : 
     198             : /* shared memory counter of registered tranches */
     199             : int        *LWLockCounter = NULL;
     200             : 
     201             : /* backend-local counter of registered tranches */
     202             : static int  LocalLWLockCounter;
     203             : 
     204             : #define MAX_NAMED_TRANCHES 256
     205             : 
     206             : static void InitializeLWLocks(void);
     207             : static inline void LWLockReportWaitStart(LWLock *lock);
     208             : static inline void LWLockReportWaitEnd(void);
     209             : static const char *GetLWTrancheName(uint16 trancheId);
     210             : 
     211             : #define T_NAME(lock) \
     212             :     GetLWTrancheName((lock)->tranche)
     213             : 
     214             : #ifdef LWLOCK_STATS
     215             : typedef struct lwlock_stats_key
     216             : {
     217             :     int         tranche;
     218             :     void       *instance;
     219             : }           lwlock_stats_key;
     220             : 
     221             : typedef struct lwlock_stats
     222             : {
     223             :     lwlock_stats_key key;
     224             :     int         sh_acquire_count;
     225             :     int         ex_acquire_count;
     226             :     int         block_count;
     227             :     int         dequeue_self_count;
     228             :     int         spin_delay_count;
     229             : }           lwlock_stats;
     230             : 
     231             : static HTAB *lwlock_stats_htab;
     232             : static lwlock_stats lwlock_stats_dummy;
     233             : #endif
     234             : 
     235             : #ifdef LOCK_DEBUG
     236             : bool        Trace_lwlocks = false;
     237             : 
     238             : inline static void
     239             : PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
     240             : {
     241             :     /* hide statement & context here, otherwise the log is just too verbose */
     242             :     if (Trace_lwlocks)
     243             :     {
     244             :         uint32      state = pg_atomic_read_u32(&lock->state);
     245             : 
     246             :         ereport(LOG,
     247             :                 (errhidestmt(true),
     248             :                  errhidecontext(true),
     249             :                  errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
     250             :                                  MyProcPid,
     251             :                                  where, T_NAME(lock), lock,
     252             :                                  (state & LW_VAL_EXCLUSIVE) != 0,
     253             :                                  state & LW_SHARED_MASK,
     254             :                                  (state & LW_FLAG_HAS_WAITERS) != 0,
     255             :                                  pg_atomic_read_u32(&lock->nwaiters),
     256             :                                  (state & LW_FLAG_RELEASE_OK) != 0)));
     257             :     }
     258             : }
     259             : 
     260             : inline static void
     261             : LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
     262             : {
     263             :     /* hide statement & context here, otherwise the log is just too verbose */
     264             :     if (Trace_lwlocks)
     265             :     {
     266             :         ereport(LOG,
     267             :                 (errhidestmt(true),
     268             :                  errhidecontext(true),
     269             :                  errmsg_internal("%s(%s %p): %s", where,
     270             :                                  T_NAME(lock), lock, msg)));
     271             :     }
     272             : }
     273             : 
     274             : #else                           /* not LOCK_DEBUG */
     275             : #define PRINT_LWDEBUG(a,b,c) ((void)0)
     276             : #define LOG_LWDEBUG(a,b,c) ((void)0)
     277             : #endif                          /* LOCK_DEBUG */
     278             : 
     279             : #ifdef LWLOCK_STATS
     280             : 
     281             : static void init_lwlock_stats(void);
     282             : static void print_lwlock_stats(int code, Datum arg);
     283             : static lwlock_stats * get_lwlock_stats_entry(LWLock *lock);
     284             : 
     285             : static void
     286             : init_lwlock_stats(void)
     287             : {
     288             :     HASHCTL     ctl;
     289             :     static MemoryContext lwlock_stats_cxt = NULL;
     290             :     static bool exit_registered = false;
     291             : 
     292             :     if (lwlock_stats_cxt != NULL)
     293             :         MemoryContextDelete(lwlock_stats_cxt);
     294             : 
     295             :     /*
     296             :      * The LWLock stats will be updated within a critical section, which
     297             :      * requires allocating new hash entries. Allocations within a critical
     298             :      * section are normally not allowed because running out of memory would
     299             :      * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
     300             :      * turned on in production, so that's an acceptable risk. The hash entries
     301             :      * are small, so the risk of running out of memory is minimal in practice.
     302             :      */
     303             :     lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
     304             :                                              "LWLock stats",
     305             :                                              ALLOCSET_DEFAULT_SIZES);
     306             :     MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
     307             : 
     308             :     ctl.keysize = sizeof(lwlock_stats_key);
     309             :     ctl.entrysize = sizeof(lwlock_stats);
     310             :     ctl.hcxt = lwlock_stats_cxt;
     311             :     lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
     312             :                                     HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     313             :     if (!exit_registered)
     314             :     {
     315             :         on_shmem_exit(print_lwlock_stats, 0);
     316             :         exit_registered = true;
     317             :     }
     318             : }
     319             : 
     320             : static void
     321             : print_lwlock_stats(int code, Datum arg)
     322             : {
     323             :     HASH_SEQ_STATUS scan;
     324             :     lwlock_stats *lwstats;
     325             : 
     326             :     hash_seq_init(&scan, lwlock_stats_htab);
     327             : 
     328             :     /* Grab an LWLock to keep different backends from mixing reports */
     329             :     LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
     330             : 
     331             :     while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
     332             :     {
     333             :         fprintf(stderr,
     334             :                 "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
     335             :                 MyProcPid, GetLWTrancheName(lwstats->key.tranche),
     336             :                 lwstats->key.instance, lwstats->sh_acquire_count,
     337             :                 lwstats->ex_acquire_count, lwstats->block_count,
     338             :                 lwstats->spin_delay_count, lwstats->dequeue_self_count);
     339             :     }
     340             : 
     341             :     LWLockRelease(&MainLWLockArray[0].lock);
     342             : }
     343             : 
     344             : static lwlock_stats *
     345             : get_lwlock_stats_entry(LWLock *lock)
     346             : {
     347             :     lwlock_stats_key key;
     348             :     lwlock_stats *lwstats;
     349             :     bool        found;
     350             : 
     351             :     /*
     352             :      * During shared memory initialization, the hash table doesn't exist yet.
     353             :      * Stats of that phase aren't very interesting, so just collect operations
     354             :      * on all locks in a single dummy entry.
     355             :      */
     356             :     if (lwlock_stats_htab == NULL)
     357             :         return &lwlock_stats_dummy;
     358             : 
     359             :     /* Fetch or create the entry. */
     360             :     MemSet(&key, 0, sizeof(key));
     361             :     key.tranche = lock->tranche;
     362             :     key.instance = lock;
     363             :     lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
     364             :     if (!found)
     365             :     {
     366             :         lwstats->sh_acquire_count = 0;
     367             :         lwstats->ex_acquire_count = 0;
     368             :         lwstats->block_count = 0;
     369             :         lwstats->dequeue_self_count = 0;
     370             :         lwstats->spin_delay_count = 0;
     371             :     }
     372             :     return lwstats;
     373             : }
     374             : #endif                          /* LWLOCK_STATS */
     375             : 
     376             : 
     377             : /*
     378             :  * Compute number of LWLocks required by named tranches.  These will be
     379             :  * allocated in the main array.
     380             :  */
     381             : static int
     382        6510 : NumLWLocksForNamedTranches(void)
     383             : {
     384        6510 :     int         numLocks = 0;
     385             :     int         i;
     386             : 
     387        6744 :     for (i = 0; i < NamedLWLockTrancheRequests; i++)
     388         234 :         numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
     389             : 
     390        6510 :     return numLocks;
     391             : }
     392             : 
     393             : /*
     394             :  * Compute shmem space needed for LWLocks and named tranches.
     395             :  */
     396             : Size
     397        6510 : LWLockShmemSize(void)
     398             : {
     399             :     Size        size;
     400        6510 :     int         numLocks = NUM_FIXED_LWLOCKS;
     401             : 
     402             :     /*
     403             :      * If re-initializing shared memory, the request array will no longer be
     404             :      * accessible, so switch to the copy in postmaster's local memory.  We'll
     405             :      * copy it back into shared memory later when CreateLWLocks() is called
     406             :      * again.
     407             :      */
     408        6510 :     if (LocalNamedLWLockTrancheRequestArray)
     409           0 :         NamedLWLockTrancheRequestArray = LocalNamedLWLockTrancheRequestArray;
     410             : 
     411             :     /* Calculate total number of locks needed in the main array. */
     412        6510 :     numLocks += NumLWLocksForNamedTranches();
     413             : 
     414             :     /* Space for dynamic allocation counter. */
     415        6510 :     size = MAXALIGN(sizeof(int));
     416             : 
     417             :     /* Space for named tranches. */
     418        6510 :     size = add_size(size, mul_size(MAX_NAMED_TRANCHES, sizeof(char *)));
     419        6510 :     size = add_size(size, mul_size(MAX_NAMED_TRANCHES, NAMEDATALEN));
     420             : 
     421             :     /*
     422             :      * Make space for named tranche requests.  This is done for the benefit of
     423             :      * EXEC_BACKEND builds, which otherwise wouldn't be able to call
      424             :  * GetNamedLWLockTranche() outside the postmaster.
     425             :      */
     426        6510 :     size = add_size(size, mul_size(NamedLWLockTrancheRequests,
     427             :                                    sizeof(NamedLWLockTrancheRequest)));
     428             : 
     429             :     /* Space for the LWLock array, plus room for cache line alignment. */
     430        6510 :     size = add_size(size, LWLOCK_PADDED_SIZE);
     431        6510 :     size = add_size(size, mul_size(numLocks, sizeof(LWLockPadded)));
     432             : 
     433        6510 :     return size;
     434             : }
     435             : 
     436             : /*
     437             :  * Allocate shmem space for the main LWLock array and all tranches and
     438             :  * initialize it.
     439             :  */
     440             : void
     441        2272 : CreateLWLocks(void)
     442             : {
     443        2272 :     if (!IsUnderPostmaster)
     444             :     {
     445        2272 :         Size        spaceLocks = LWLockShmemSize();
     446             :         char       *ptr;
     447             : 
     448             :         /* Allocate space */
     449        2272 :         ptr = (char *) ShmemAlloc(spaceLocks);
     450             : 
     451             :         /* Initialize the dynamic-allocation counter for tranches */
     452        2272 :         LWLockCounter = (int *) ptr;
     453        2272 :         *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
     454        2272 :         ptr += MAXALIGN(sizeof(int));
     455             : 
     456             :         /* Initialize tranche names */
     457        2272 :         LWLockTrancheNames = (char **) ptr;
     458        2272 :         ptr += MAX_NAMED_TRANCHES * sizeof(char *);
     459      583904 :         for (int i = 0; i < MAX_NAMED_TRANCHES; i++)
     460             :         {
     461      581632 :             LWLockTrancheNames[i] = ptr;
     462      581632 :             ptr += NAMEDATALEN;
     463             :         }
     464             : 
     465             :         /*
     466             :          * Move named tranche requests to shared memory.  This is done for the
     467             :          * benefit of EXEC_BACKEND builds, which otherwise wouldn't be able to
      468             :  * call GetNamedLWLockTranche() outside the postmaster.
     469             :          */
     470        2272 :         if (NamedLWLockTrancheRequests > 0)
     471             :         {
     472             :             /*
     473             :              * Save the pointer to the request array in postmaster's local
     474             :              * memory.  We'll need it if we ever need to re-initialize shared
     475             :              * memory after a crash.
     476             :              */
     477          16 :             LocalNamedLWLockTrancheRequestArray = NamedLWLockTrancheRequestArray;
     478             : 
     479          16 :             memcpy(ptr, NamedLWLockTrancheRequestArray,
     480             :                    NamedLWLockTrancheRequests * sizeof(NamedLWLockTrancheRequest));
     481          16 :             NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *) ptr;
     482          16 :             ptr += NamedLWLockTrancheRequests * sizeof(NamedLWLockTrancheRequest);
     483             :         }
     484             : 
     485             :         /* Ensure desired alignment of LWLock array */
     486        2272 :         ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
     487        2272 :         MainLWLockArray = (LWLockPadded *) ptr;
     488             : 
     489             :         /* Initialize all LWLocks */
     490        2272 :         InitializeLWLocks();
     491             :     }
     492        2272 : }
     493             : 
     494             : /*
     495             :  * Initialize LWLocks that are fixed and those belonging to named tranches.
     496             :  */
     497             : static void
     498        2272 : InitializeLWLocks(void)
     499             : {
     500             :     int         id;
     501             :     int         i;
     502             :     int         j;
     503             :     LWLockPadded *lock;
     504             : 
     505             :     /* Initialize all individual LWLocks in main array */
     506      129504 :     for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
     507      127232 :         LWLockInitialize(&lock->lock, id);
     508             : 
     509             :     /* Initialize buffer mapping LWLocks in main array */
     510        2272 :     lock = MainLWLockArray + BUFFER_MAPPING_LWLOCK_OFFSET;
     511      293088 :     for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
     512      290816 :         LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);
     513             : 
     514             :     /* Initialize lmgrs' LWLocks in main array */
     515        2272 :     lock = MainLWLockArray + LOCK_MANAGER_LWLOCK_OFFSET;
     516       38624 :     for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
     517       36352 :         LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);
     518             : 
     519             :     /* Initialize predicate lmgrs' LWLocks in main array */
     520        2272 :     lock = MainLWLockArray + PREDICATELOCK_MANAGER_LWLOCK_OFFSET;
     521       38624 :     for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
     522       36352 :         LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
     523             : 
     524             :     /*
     525             :      * Copy the info about any named tranches into shared memory (so that
     526             :      * other processes can see it), and initialize the requested LWLocks.
     527             :      */
     528        2272 :     if (NamedLWLockTrancheRequests > 0)
     529             :     {
     530          16 :         lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
     531             : 
     532          94 :         for (i = 0; i < NamedLWLockTrancheRequests; i++)
     533             :         {
     534             :             NamedLWLockTrancheRequest *request;
     535             :             int         tranche;
     536             : 
     537          78 :             request = &NamedLWLockTrancheRequestArray[i];
     538          78 :             tranche = LWLockNewTrancheId(request->tranche_name);
     539             : 
     540         156 :             for (j = 0; j < request->num_lwlocks; j++, lock++)
     541          78 :                 LWLockInitialize(&lock->lock, tranche);
     542             :         }
     543             :     }
     544        2272 : }
     545             : 
     546             : /*
     547             :  * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
     548             :  */
     549             : void
     550       45684 : InitLWLockAccess(void)
     551             : {
     552             : #ifdef LWLOCK_STATS
     553             :     init_lwlock_stats();
     554             : #endif
     555       45684 : }
     556             : 
     557             : /*
      558             :  * GetNamedLWLockTranche - returns the base address of the LWLocks belonging
      559             :  *      to the specified tranche.
      560             :  *
      561             :  * The caller can access its requested number of LWLocks starting at the
      562             :  * base lock address returned by this API.  This can be used only for
      563             :  * tranches that were requested via the RequestNamedLWLockTranche() API.
     564             :  */
     565             : LWLockPadded *
     566          18 : GetNamedLWLockTranche(const char *tranche_name)
     567             : {
     568             :     int         lock_pos;
     569             :     int         i;
     570             : 
     571             :     /*
      572             :      * Obtain the position of the base address of the LWLocks belonging to
      573             :      * the requested tranche_name in MainLWLockArray.  LWLocks for named
      574             :      * tranches are placed in MainLWLockArray after the fixed locks.
     575             :      */
     576          18 :     lock_pos = NUM_FIXED_LWLOCKS;
     577          82 :     for (i = 0; i < NamedLWLockTrancheRequests; i++)
     578             :     {
     579          80 :         if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
     580             :                    tranche_name) == 0)
     581          16 :             return &MainLWLockArray[lock_pos];
     582             : 
     583          64 :         lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
     584             :     }
     585             : 
     586           2 :     elog(ERROR, "requested tranche is not registered");
     587             : 
     588             :     /* just to keep compiler quiet */
     589             :     return NULL;
     590             : }
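
[Editor's note: a hedged usage sketch.  An extension whose
shmem_request_hook earlier called RequestNamedLWLockTranche("demo", 4)
(see the sketch after RequestNamedLWLockTranche below) can pick up its
locks from its shmem_startup_hook; the demo_* names are hypothetical.]

    static LWLock *demo_locks[4];

    static void
    demo_attach_locks(void)
    {
        LWLockPadded *base = GetNamedLWLockTranche("demo");
        int         i;

        /* the four requested locks are consecutive, starting at the base */
        for (i = 0; i < 4; i++)
            demo_locks[i] = &base[i].lock;
    }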
     591             : 
     592             : /*
     593             :  * Allocate a new tranche ID with the provided name.
     594             :  */
     595             : int
     596         566 : LWLockNewTrancheId(const char *name)
     597             : {
     598             :     int         result;
     599             : 
     600         566 :     if (!name)
     601           2 :         ereport(ERROR,
     602             :                 (errcode(ERRCODE_INVALID_NAME),
     603             :                  errmsg("tranche name cannot be NULL")));
     604             : 
     605         564 :     if (strlen(name) >= NAMEDATALEN)
     606           2 :         ereport(ERROR,
     607             :                 (errcode(ERRCODE_NAME_TOO_LONG),
     608             :                  errmsg("tranche name too long"),
     609             :                  errdetail("LWLock tranche names must be no longer than %d bytes.",
     610             :                            NAMEDATALEN - 1)));
     611             : 
     612             :     /*
     613             :      * We use the ShmemLock spinlock to protect LWLockCounter and
     614             :      * LWLockTrancheNames.
     615             :      */
     616         562 :     SpinLockAcquire(ShmemLock);
     617             : 
     618         562 :     if (*LWLockCounter - LWTRANCHE_FIRST_USER_DEFINED >= MAX_NAMED_TRANCHES)
     619             :     {
     620           2 :         SpinLockRelease(ShmemLock);
     621           2 :         ereport(ERROR,
     622             :                 (errmsg("maximum number of tranches already registered"),
     623             :                  errdetail("No more than %d tranches may be registered.",
     624             :                            MAX_NAMED_TRANCHES)));
     625             :     }
     626             : 
     627         560 :     result = (*LWLockCounter)++;
     628         560 :     LocalLWLockCounter = *LWLockCounter;
     629         560 :     strlcpy(LWLockTrancheNames[result - LWTRANCHE_FIRST_USER_DEFINED], name, NAMEDATALEN);
     630             : 
     631         560 :     SpinLockRelease(ShmemLock);
     632             : 
     633         560 :     return result;
     634             : }
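
[Editor's note: a sketch of the dynamic registration path, as an extension
might use it from its shmem_startup_hook.  The demo_* names are
hypothetical; ShmemInitStruct (storage/shmem.h) is the usual way to carve
out extension shared memory, and the first backend to create the struct
registers the tranche and initializes the lock.]

    typedef struct DemoSharedState
    {
        LWLock      lock;
        int64       counter;
    } DemoSharedState;

    static void
    demo_shmem_startup(void)
    {
        bool        found;
        DemoSharedState *state;

        state = ShmemInitStruct("demo state", sizeof(DemoSharedState),
                                &found);
        if (!found)
        {
            /* first creator registers the tranche and inits the lock */
            LWLockInitialize(&state->lock, LWLockNewTrancheId("demo"));
            state->counter = 0;
        }
    }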
     635             : 
     636             : /*
     637             :  * RequestNamedLWLockTranche
     638             :  *      Request that extra LWLocks be allocated during postmaster
     639             :  *      startup.
     640             :  *
     641             :  * This may only be called via the shmem_request_hook of a library that is
     642             :  * loaded into the postmaster via shared_preload_libraries.  Calls from
     643             :  * elsewhere will fail.
     644             :  *
     645             :  * The tranche name will be user-visible as a wait event name, so try to
     646             :  * use a name that fits the style for those.
     647             :  */
     648             : void
     649          78 : RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
     650             : {
     651             :     NamedLWLockTrancheRequest *request;
     652             :     static int  NamedLWLockTrancheRequestsAllocated;
     653             : 
     654          78 :     if (!process_shmem_requests_in_progress)
     655           0 :         elog(FATAL, "cannot request additional LWLocks outside shmem_request_hook");
     656             : 
     657          78 :     if (!tranche_name)
     658           0 :         ereport(ERROR,
     659             :                 (errcode(ERRCODE_INVALID_NAME),
     660             :                  errmsg("tranche name cannot be NULL")));
     661             : 
     662          78 :     if (strlen(tranche_name) >= NAMEDATALEN)
     663           0 :         ereport(ERROR,
     664             :                 (errcode(ERRCODE_NAME_TOO_LONG),
     665             :                  errmsg("tranche name too long"),
     666             :                  errdetail("LWLock tranche names must be no longer than %d bytes.",
     667             :                            NAMEDATALEN - 1)));
     668             : 
     669          78 :     if (NamedLWLockTrancheRequestArray == NULL)
     670             :     {
     671          16 :         NamedLWLockTrancheRequestsAllocated = 16;
     672          16 :         NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
     673          16 :             MemoryContextAlloc(TopMemoryContext,
     674             :                                NamedLWLockTrancheRequestsAllocated
     675             :                                * sizeof(NamedLWLockTrancheRequest));
     676             :     }
     677             : 
     678          78 :     if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
     679             :     {
     680           2 :         int         i = pg_nextpower2_32(NamedLWLockTrancheRequests + 1);
     681             : 
     682           2 :         NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
     683           2 :             repalloc(NamedLWLockTrancheRequestArray,
     684             :                      i * sizeof(NamedLWLockTrancheRequest));
     685           2 :         NamedLWLockTrancheRequestsAllocated = i;
     686             :     }
     687             : 
     688          78 :     request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
     689          78 :     strlcpy(request->tranche_name, tranche_name, NAMEDATALEN);
     690          78 :     request->num_lwlocks = num_lwlocks;
     691          78 :     NamedLWLockTrancheRequests++;
     692          78 : }
     693             : 
     694             : /*
     695             :  * LWLockInitialize - initialize a new lwlock; it's initially unlocked
     696             :  */
     697             : void
     698    25793944 : LWLockInitialize(LWLock *lock, int tranche_id)
     699             : {
     700             :     /* verify the tranche_id is valid */
     701    25793944 :     (void) GetLWTrancheName(tranche_id);
     702             : 
     703    25793942 :     pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
     704             : #ifdef LOCK_DEBUG
     705             :     pg_atomic_init_u32(&lock->nwaiters, 0);
     706             : #endif
     707    25793942 :     lock->tranche = tranche_id;
     708    25793942 :     proclist_init(&lock->waiters);
     709    25793942 : }
     710             : 
     711             : /*
      712             :  * Report start of wait event for lightweight locks.
      713             :  *
      714             :  * This function is used by all the lightweight lock operations that
      715             :  * need to wait to acquire the lock.  It distinguishes the wait
      716             :  * event based on the tranche and lock id.
     717             :  */
     718             : static inline void
     719     7647500 : LWLockReportWaitStart(LWLock *lock)
     720             : {
     721     7647500 :     pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
     722     7647500 : }
     723             : 
     724             : /*
      725             :  * Report end of wait event for lightweight locks.
     726             :  */
     727             : static inline void
     728     7647500 : LWLockReportWaitEnd(void)
     729             : {
     730     7647500 :     pgstat_report_wait_end();
     731     7647500 : }
     732             : 
     733             : /*
     734             :  * Return the name of an LWLock tranche.
     735             :  */
     736             : static const char *
     737    25794530 : GetLWTrancheName(uint16 trancheId)
     738             : {
     739             :     /* Built-in tranche or individual LWLock? */
     740    25794530 :     if (trancheId < LWTRANCHE_FIRST_USER_DEFINED)
     741    25792992 :         return BuiltinTrancheNames[trancheId];
     742             : 
     743             :     /*
     744             :      * We only ever add new entries to LWLockTrancheNames, so most lookups can
     745             :      * avoid taking the spinlock as long as the backend-local counter
     746             :      * (LocalLWLockCounter) is greater than the requested tranche ID.  Else,
     747             :      * we need to first update the backend-local counter with ShmemLock held
     748             :      * before attempting the lookup again.  In practice, the latter case is
     749             :      * probably rare.
     750             :      */
     751        1538 :     if (trancheId >= LocalLWLockCounter)
     752             :     {
     753           2 :         SpinLockAcquire(ShmemLock);
     754           2 :         LocalLWLockCounter = *LWLockCounter;
     755           2 :         SpinLockRelease(ShmemLock);
     756             : 
     757           2 :         if (trancheId >= LocalLWLockCounter)
     758           2 :             elog(ERROR, "tranche %d is not registered", trancheId);
     759             :     }
     760             : 
     761             :     /*
     762             :      * It's an extension tranche, so look in LWLockTrancheNames.
     763             :      */
     764        1536 :     trancheId -= LWTRANCHE_FIRST_USER_DEFINED;
     765             : 
     766        1536 :     return LWLockTrancheNames[trancheId];
     767             : }
     768             : 
     769             : /*
     770             :  * Return an identifier for an LWLock based on the wait class and event.
     771             :  */
     772             : const char *
     773         586 : GetLWLockIdentifier(uint32 classId, uint16 eventId)
     774             : {
     775             :     Assert(classId == PG_WAIT_LWLOCK);
     776             :     /* The event IDs are just tranche numbers. */
     777         586 :     return GetLWTrancheName(eventId);
     778             : }
     779             : 
     780             : /*
      781             :  * Internal function that tries to atomically acquire the lwlock in the
      782             :  * passed-in mode.
     783             :  *
     784             :  * This function will not block waiting for a lock to become free - that's the
     785             :  * caller's job.
     786             :  *
     787             :  * Returns true if the lock isn't free and we need to wait.
     788             :  */
     789             : static bool
     790   763770150 : LWLockAttemptLock(LWLock *lock, LWLockMode mode)
     791             : {
     792             :     uint32      old_state;
     793             : 
     794             :     Assert(mode == LW_EXCLUSIVE || mode == LW_SHARED);
     795             : 
     796             :     /*
     797             :      * Read once outside the loop, later iterations will get the newer value
     798             :      * via compare & exchange.
     799             :      */
     800   763770150 :     old_state = pg_atomic_read_u32(&lock->state);
     801             : 
     802             :     /* loop until we've determined whether we could acquire the lock or not */
     803             :     while (true)
     804      565514 :     {
     805             :         uint32      desired_state;
     806             :         bool        lock_free;
     807             : 
     808   764335664 :         desired_state = old_state;
     809             : 
     810   764335664 :         if (mode == LW_EXCLUSIVE)
     811             :         {
     812   475003894 :             lock_free = (old_state & LW_LOCK_MASK) == 0;
     813   475003894 :             if (lock_free)
     814   469645218 :                 desired_state += LW_VAL_EXCLUSIVE;
     815             :         }
     816             :         else
     817             :         {
     818   289331770 :             lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
     819   289331770 :             if (lock_free)
     820   279282746 :                 desired_state += LW_VAL_SHARED;
     821             :         }
     822             : 
     823             :         /*
      824             :          * Attempt to swap in the state we are expecting. If we didn't see
      825             :          * the lock as free, that's just the old value. If we saw it as free,
      826             :          * we'll attempt to mark it acquired. The reason that we always swap
      827             :          * in the value is that this doubles as a memory barrier. We could try
      828             :          * to be smarter and only swap in values if we saw the lock as free,
      829             :          * but benchmarks haven't shown that to be beneficial so far.
     830             :          *
     831             :          * Retry if the value changed since we last looked at it.
     832             :          */
     833   764335664 :         if (pg_atomic_compare_exchange_u32(&lock->state,
     834             :                                            &old_state, desired_state))
     835             :         {
     836   763770150 :             if (lock_free)
     837             :             {
     838             :                 /* Great! Got the lock. */
     839             : #ifdef LOCK_DEBUG
     840             :                 if (mode == LW_EXCLUSIVE)
     841             :                     lock->owner = MyProc;
     842             : #endif
     843   748524186 :                 return false;
     844             :             }
     845             :             else
     846    15245964 :                 return true;    /* somebody else has the lock */
     847             :         }
     848             :     }
     849             :     pg_unreachable();
     850             : }
     851             : 
     852             : /*
     853             :  * Lock the LWLock's wait list against concurrent activity.
     854             :  *
     855             :  * NB: even though the wait list is locked, non-conflicting lock operations
     856             :  * may still happen concurrently.
     857             :  *
      858             :  * Time spent holding the mutex should be short!
     859             :  */
     860             : static void
     861    21215730 : LWLockWaitListLock(LWLock *lock)
     862             : {
     863             :     uint32      old_state;
     864             : #ifdef LWLOCK_STATS
     865             :     lwlock_stats *lwstats;
     866             :     uint32      delays = 0;
     867             : 
     868             :     lwstats = get_lwlock_stats_entry(lock);
     869             : #endif
     870             : 
     871             :     while (true)
     872             :     {
     873             :         /*
     874             :          * Always try once to acquire the lock directly, without setting up
     875             :          * the spin-delay infrastructure. The work necessary for that shows up
     876             :          * in profiles and is rarely necessary.
     877             :          */
     878    21358254 :         old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
     879    21358254 :         if (likely(!(old_state & LW_FLAG_LOCKED)))
     880    21215730 :             break;              /* got lock */
     881             : 
      882             :         /* and then spin without atomic operations until the lock is released */
     883             :         {
     884             :             SpinDelayStatus delayStatus;
     885             : 
     886      142524 :             init_local_spin_delay(&delayStatus);
     887             : 
     888      489244 :             while (old_state & LW_FLAG_LOCKED)
     889             :             {
     890      346720 :                 perform_spin_delay(&delayStatus);
     891      346720 :                 old_state = pg_atomic_read_u32(&lock->state);
     892             :             }
     893             : #ifdef LWLOCK_STATS
     894             :             delays += delayStatus.delays;
     895             : #endif
     896      142524 :             finish_spin_delay(&delayStatus);
     897             :         }
     898             : 
     899             :         /*
      900             :          * Retry. The lock might well have been re-acquired by the time
      901             :          * we attempt to get it again.
     902             :          */
     903             :     }
     904             : 
     905             : #ifdef LWLOCK_STATS
     906             :     lwstats->spin_delay_count += delays;
     907             : #endif
     908    21215730 : }
     909             : 
     910             : /*
     911             :  * Unlock the LWLock's wait list.
     912             :  *
     913             :  * Note that it can be more efficient to manipulate flags and release the
      914             :  * lock in a single atomic operation.
     915             :  */
     916             : static void
     917    13672962 : LWLockWaitListUnlock(LWLock *lock)
     918             : {
     919             :     uint32      old_state PG_USED_FOR_ASSERTS_ONLY;
     920             : 
     921    13672962 :     old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
     922             : 
     923             :     Assert(old_state & LW_FLAG_LOCKED);
     924    13672962 : }
     925             : 
     926             : /*
      927             :  * Wake up all the lockers that currently have a chance to acquire the lock.
     928             :  */
     929             : static void
     930     7542768 : LWLockWakeup(LWLock *lock)
     931             : {
     932             :     bool        new_release_ok;
     933     7542768 :     bool        wokeup_somebody = false;
     934             :     proclist_head wakeup;
     935             :     proclist_mutable_iter iter;
     936             : 
     937     7542768 :     proclist_init(&wakeup);
     938             : 
     939     7542768 :     new_release_ok = true;
     940             : 
     941             :     /* lock wait list while collecting backends to wake up */
     942     7542768 :     LWLockWaitListLock(lock);
     943             : 
     944    12709328 :     proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
     945             :     {
     946     7724018 :         PGPROC     *waiter = GetPGProcByNumber(iter.cur);
     947             : 
     948     7724018 :         if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
     949       19320 :             continue;
     950             : 
     951     7704698 :         proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
     952     7704698 :         proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
     953             : 
     954     7704698 :         if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
     955             :         {
     956             :             /*
      958             :              * Prevent additional wakeups until the retryer gets to run. Backends
     958             :              * that are just waiting for the lock to become free don't retry
     959             :              * automatically.
     960             :              */
     961     7550570 :             new_release_ok = false;
     962             : 
     963             :             /*
     964             :              * Don't wakeup (further) exclusive locks.
     965             :              */
     966     7550570 :             wokeup_somebody = true;
     967             :         }
     968             : 
     969             :         /*
     970             :          * Signal that the process isn't on the wait list anymore. This allows
      971             :          * LWLockDequeueSelf() to remove itself from the waitlist with a
     972             :          * proclist_delete(), rather than having to check if it has been
     973             :          * removed from the list.
     974             :          */
     975             :         Assert(waiter->lwWaiting == LW_WS_WAITING);
     976     7704698 :         waiter->lwWaiting = LW_WS_PENDING_WAKEUP;
     977             : 
     978             :         /*
      979             :          * Once we've woken up an exclusive waiter, there's no point in waking
     980             :          * up anybody else.
     981             :          */
     982     7704698 :         if (waiter->lwWaitMode == LW_EXCLUSIVE)
     983     2557458 :             break;
     984             :     }
     985             : 
     986             :     Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
     987             : 
     988             :     /* unset required flags, and release lock, in one fell swoop */
     989             :     {
     990             :         uint32      old_state;
     991             :         uint32      desired_state;
     992             : 
     993     7542768 :         old_state = pg_atomic_read_u32(&lock->state);
     994             :         while (true)
     995             :         {
     996     7560244 :             desired_state = old_state;
     997             : 
     998             :             /* compute desired flags */
     999             : 
    1000     7560244 :             if (new_release_ok)
    1001      125230 :                 desired_state |= LW_FLAG_RELEASE_OK;
    1002             :             else
    1003     7435014 :                 desired_state &= ~LW_FLAG_RELEASE_OK;
    1004             : 
    1005     7560244 :             if (proclist_is_empty(&lock->waiters))
    1006     7223738 :                 desired_state &= ~LW_FLAG_HAS_WAITERS;
    1007             : 
    1008     7560244 :             desired_state &= ~LW_FLAG_LOCKED;   /* release lock */
    1009             : 
    1010     7560244 :             if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
    1011             :                                                desired_state))
    1012     7542768 :                 break;
    1013             :         }
    1014             :     }
    1015             : 
    1016             :     /* Awaken any waiters I removed from the queue. */
    1017    15247466 :     proclist_foreach_modify(iter, &wakeup, lwWaitLink)
    1018             :     {
    1019     7704698 :         PGPROC     *waiter = GetPGProcByNumber(iter.cur);
    1020             : 
    1021             :         LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
    1022     7704698 :         proclist_delete(&wakeup, iter.cur, lwWaitLink);
    1023             : 
    1024             :         /*
     1025             :          * Guarantee that lwWaiting being unset only becomes visible once the
     1026             :          * unlink from the list has completed. Otherwise the target backend
     1027             :          * could be woken up for some other reason and enqueue itself for a
     1028             :          * new lock - if that happens before the list unlink happens, the
     1029             :          * list would end up being corrupted.
    1030             :          *
    1031             :          * The barrier pairs with the LWLockWaitListLock() when enqueuing for
    1032             :          * another lock.
    1033             :          */
    1034     7704698 :         pg_write_barrier();
    1035     7704698 :         waiter->lwWaiting = LW_WS_NOT_WAITING;
    1036     7704698 :         PGSemaphoreUnlock(waiter->sem);
    1037             :     }
    1038     7542768 : }
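
/*
 * Illustration (not part of lwlock.c): a minimal, self-contained sketch of
 * the read-once / recompute / retry compare-exchange pattern used by the
 * flag-clearing block in LWLockWakeup() above.  The flag bit and helper
 * name below are invented for the example.
 */
#include "postgres.h"
#include "port/atomics.h"

#define SKETCH_FLAG_LOCKED (1U << 31)	/* assumed flag bit */

static void
sketch_clear_flag(pg_atomic_uint32 *state)
{
	uint32		old_state = pg_atomic_read_u32(state);

	while (true)
	{
		uint32		desired_state = old_state & ~SKETCH_FLAG_LOCKED;

		/* on failure, old_state is refreshed with the currently stored value */
		if (pg_atomic_compare_exchange_u32(state, &old_state, desired_state))
			break;
	}
}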
    1039             : 
    1040             : /*
    1041             :  * Add ourselves to the end of the queue.
    1042             :  *
    1043             :  * NB: Mode can be LW_WAIT_UNTIL_FREE here!
    1044             :  */
    1045             : static void
    1046     7887732 : LWLockQueueSelf(LWLock *lock, LWLockMode mode)
    1047             : {
    1048             :     /*
    1049             :      * If we don't have a PGPROC structure, there's no way to wait. This
    1050             :      * should never occur, since MyProc should only be null during shared
    1051             :      * memory initialization.
    1052             :      */
    1053     7887732 :     if (MyProc == NULL)
    1054           0 :         elog(PANIC, "cannot wait without a PGPROC structure");
    1055             : 
    1056     7887732 :     if (MyProc->lwWaiting != LW_WS_NOT_WAITING)
    1057           0 :         elog(PANIC, "queueing for lock while waiting on another one");
    1058             : 
    1059     7887732 :     LWLockWaitListLock(lock);
    1060             : 
    1061             :     /* setting the flag is protected by the spinlock */
    1062     7887732 :     pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
    1063             : 
    1064     7887732 :     MyProc->lwWaiting = LW_WS_WAITING;
    1065     7887732 :     MyProc->lwWaitMode = mode;
    1066             : 
    1067             :     /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
    1068     7887732 :     if (mode == LW_WAIT_UNTIL_FREE)
    1069      163070 :         proclist_push_head(&lock->waiters, MyProcNumber, lwWaitLink);
    1070             :     else
    1071     7724662 :         proclist_push_tail(&lock->waiters, MyProcNumber, lwWaitLink);
    1072             : 
    1073             :     /* Can release the mutex now */
    1074     7887732 :     LWLockWaitListUnlock(lock);
    1075             : 
    1076             : #ifdef LOCK_DEBUG
    1077             :     pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
    1078             : #endif
    1079     7887732 : }
    1080             : 
    1081             : /*
    1082             :  * Remove ourselves from the waitlist.
    1083             :  *
    1084             :  * This is used if we queued ourselves because we thought we needed to sleep
    1085             :  * but, after further checking, we discovered that we don't actually need to
    1086             :  * do so.
    1087             :  */
    1088             : static void
    1089      240232 : LWLockDequeueSelf(LWLock *lock)
    1090             : {
    1091             :     bool        on_waitlist;
    1092             : 
    1093             : #ifdef LWLOCK_STATS
    1094             :     lwlock_stats *lwstats;
    1095             : 
    1096             :     lwstats = get_lwlock_stats_entry(lock);
    1097             : 
    1098             :     lwstats->dequeue_self_count++;
    1099             : #endif
    1100             : 
    1101      240232 :     LWLockWaitListLock(lock);
    1102             : 
    1103             :     /*
    1104             :      * Remove ourselves from the waitlist, unless we've already been removed.
    1105             :      * The removal happens with the wait list lock held, so there's no race in
    1106             :      * this check.
    1107             :      */
    1108      240232 :     on_waitlist = MyProc->lwWaiting == LW_WS_WAITING;
    1109      240232 :     if (on_waitlist)
    1110      187712 :         proclist_delete(&lock->waiters, MyProcNumber, lwWaitLink);
    1111             : 
    1112      240232 :     if (proclist_is_empty(&lock->waiters) &&
    1113      218064 :         (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
    1114             :     {
    1115      172856 :         pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
    1116             :     }
    1117             : 
    1118             :     /* XXX: combine with fetch_and above? */
    1119      240232 :     LWLockWaitListUnlock(lock);
    1120             : 
    1121             :     /* clear waiting state again, nice for debugging */
    1122      240232 :     if (on_waitlist)
    1123      187712 :         MyProc->lwWaiting = LW_WS_NOT_WAITING;
    1124             :     else
    1125             :     {
    1126       52520 :         int         extraWaits = 0;
    1127             : 
    1128             :         /*
     1129             :          * Somebody else dequeued us and has woken us up, or will do so
     1130             :          * shortly. Absorb the superfluous wakeup.
    1131             :          */
    1132             : 
    1133             :         /*
    1134             :          * Reset RELEASE_OK flag if somebody woke us before we removed
    1135             :          * ourselves - they'll have set it to false.
    1136             :          */
    1137       52520 :         pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
    1138             : 
    1139             :         /*
     1140             :          * Now wait for the scheduled wakeup; otherwise our ->lwWaiting would
    1141             :          * get reset at some inconvenient point later. Most of the time this
    1142             :          * will immediately return.
    1143             :          */
    1144             :         for (;;)
    1145             :         {
    1146       52520 :             PGSemaphoreLock(MyProc->sem);
    1147       52520 :             if (MyProc->lwWaiting == LW_WS_NOT_WAITING)
    1148       52520 :                 break;
    1149           0 :             extraWaits++;
    1150             :         }
    1151             : 
    1152             :         /*
    1153             :          * Fix the process wait semaphore's count for any absorbed wakeups.
    1154             :          */
    1155       52520 :         while (extraWaits-- > 0)
    1156           0 :             PGSemaphoreUnlock(MyProc->sem);
    1157             :     }
    1158             : 
    1159             : #ifdef LOCK_DEBUG
    1160             :     {
    1161             :         /* not waiting anymore */
    1162             :         uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
    1163             : 
    1164             :         Assert(nwaiters < MAX_BACKENDS);
    1165             :     }
    1166             : #endif
    1167      240232 : }
    1168             : 
    1169             : /*
    1170             :  * LWLockAcquire - acquire a lightweight lock in the specified mode
    1171             :  *
    1172             :  * If the lock is not available, sleep until it is.  Returns true if the lock
    1173             :  * was available immediately, false if we had to sleep.
    1174             :  *
    1175             :  * Side effect: cancel/die interrupts are held off until lock release.
    1176             :  */
    1177             : bool
    1178   743843234 : LWLockAcquire(LWLock *lock, LWLockMode mode)
    1179             : {
    1180   743843234 :     PGPROC     *proc = MyProc;
    1181   743843234 :     bool        result = true;
    1182   743843234 :     int         extraWaits = 0;
    1183             : #ifdef LWLOCK_STATS
    1184             :     lwlock_stats *lwstats;
    1185             : 
    1186             :     lwstats = get_lwlock_stats_entry(lock);
    1187             : #endif
    1188             : 
    1189             :     Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
    1190             : 
    1191             :     PRINT_LWDEBUG("LWLockAcquire", lock, mode);
    1192             : 
    1193             : #ifdef LWLOCK_STATS
    1194             :     /* Count lock acquisition attempts */
    1195             :     if (mode == LW_EXCLUSIVE)
    1196             :         lwstats->ex_acquire_count++;
    1197             :     else
    1198             :         lwstats->sh_acquire_count++;
    1199             : #endif                          /* LWLOCK_STATS */
    1200             : 
    1201             :     /*
    1202             :      * We can't wait if we haven't got a PGPROC.  This should only occur
    1203             :      * during bootstrap or shared memory initialization.  Put an Assert here
    1204             :      * to catch unsafe coding practices.
    1205             :      */
    1206             :     Assert(!(proc == NULL && IsUnderPostmaster));
    1207             : 
    1208             :     /* Ensure we will have room to remember the lock */
    1209   743843234 :     if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
    1210           0 :         elog(ERROR, "too many LWLocks taken");
    1211             : 
    1212             :     /*
    1213             :      * Lock out cancel/die interrupts until we exit the code section protected
    1214             :      * by the LWLock.  This ensures that interrupts will not interfere with
    1215             :      * manipulations of data structures in shared memory.
    1216             :      */
    1217   743843234 :     HOLD_INTERRUPTS();
    1218             : 
    1219             :     /*
    1220             :      * Loop here to try to acquire lock after each time we are signaled by
    1221             :      * LWLockRelease.
    1222             :      *
    1223             :      * NOTE: it might seem better to have LWLockRelease actually grant us the
    1224             :      * lock, rather than retrying and possibly having to go back to sleep. But
    1225             :      * in practice that is no good because it means a process swap for every
    1226             :      * lock acquisition when two or more processes are contending for the same
    1227             :      * lock.  Since LWLocks are normally used to protect not-very-long
    1228             :      * sections of computation, a process needs to be able to acquire and
    1229             :      * release the same lock many times during a single CPU time slice, even
    1230             :      * in the presence of contention.  The efficiency of being able to do that
    1231             :      * outweighs the inefficiency of sometimes wasting a process dispatch
    1232             :      * cycle because the lock is not free when a released waiter finally gets
    1233             :      * to run.  See pgsql-hackers archives for 29-Dec-01.
    1234             :      */
    1235             :     for (;;)
    1236     7491510 :     {
    1237             :         bool        mustwait;
    1238             : 
    1239             :         /*
     1240             :          * Try to grab the lock the first time; we're not in the waitqueue
     1241             :          * yet/anymore.
    1242             :          */
    1243   751334744 :         mustwait = LWLockAttemptLock(lock, mode);
    1244             : 
    1245   751334744 :         if (!mustwait)
    1246             :         {
    1247             :             LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
    1248   743610082 :             break;              /* got the lock */
    1249             :         }
    1250             : 
    1251             :         /*
     1252             :          * OK, at this point we couldn't grab the lock on the first try. We
     1253             :          * cannot simply queue ourselves to the end of the list and wait to be
     1254             :          * woken up, because by now the lock could long since have been
     1255             :          * released. Instead, add ourselves to the queue and try to grab the
     1256             :          * lock again. If we succeed, we need to undo the queueing; otherwise
     1257             :          * we recheck the lock. If we still couldn't grab it, we know that the
     1258             :          * other locker will see our queue entry when releasing, since it
     1259             :          * existed before we checked for the lock.
    1260             :          */
    1261             : 
    1262             :         /* add to the queue */
    1263     7724662 :         LWLockQueueSelf(lock, mode);
    1264             : 
    1265             :         /* we're now guaranteed to be woken up if necessary */
    1266     7724662 :         mustwait = LWLockAttemptLock(lock, mode);
    1267             : 
    1268             :         /* ok, grabbed the lock the second time round, need to undo queueing */
    1269     7724662 :         if (!mustwait)
    1270             :         {
    1271             :             LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
    1272             : 
    1273      233152 :             LWLockDequeueSelf(lock);
    1274      233152 :             break;
    1275             :         }
    1276             : 
    1277             :         /*
    1278             :          * Wait until awakened.
    1279             :          *
    1280             :          * It is possible that we get awakened for a reason other than being
    1281             :          * signaled by LWLockRelease.  If so, loop back and wait again.  Once
    1282             :          * we've gotten the LWLock, re-increment the sema by the number of
    1283             :          * additional signals received.
    1284             :          */
    1285             :         LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
    1286             : 
    1287             : #ifdef LWLOCK_STATS
    1288             :         lwstats->block_count++;
    1289             : #endif
    1290             : 
    1291     7491510 :         LWLockReportWaitStart(lock);
    1292             :         if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
    1293             :             TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
    1294             : 
    1295             :         for (;;)
    1296             :         {
    1297     7491510 :             PGSemaphoreLock(proc->sem);
    1298     7491510 :             if (proc->lwWaiting == LW_WS_NOT_WAITING)
    1299     7491510 :                 break;
    1300           0 :             extraWaits++;
    1301             :         }
    1302             : 
    1303             :         /* Retrying, allow LWLockRelease to release waiters again. */
    1304     7491510 :         pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
    1305             : 
    1306             : #ifdef LOCK_DEBUG
    1307             :         {
    1308             :             /* not waiting anymore */
    1309             :             uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
    1310             : 
    1311             :             Assert(nwaiters < MAX_BACKENDS);
    1312             :         }
    1313             : #endif
    1314             : 
    1315             :         if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
    1316             :             TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
    1317     7491510 :         LWLockReportWaitEnd();
    1318             : 
    1319             :         LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
    1320             : 
    1321             :         /* Now loop back and try to acquire lock again. */
    1322     7491510 :         result = false;
    1323             :     }
    1324             : 
    1325             :     if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
    1326             :         TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
    1327             : 
    1328             :     /* Add lock to list of locks held by this backend */
    1329   743843234 :     held_lwlocks[num_held_lwlocks].lock = lock;
    1330   743843234 :     held_lwlocks[num_held_lwlocks++].mode = mode;
    1331             : 
    1332             :     /*
    1333             :      * Fix the process wait semaphore's count for any absorbed wakeups.
    1334             :      */
    1335   743843234 :     while (extraWaits-- > 0)
    1336           0 :         PGSemaphoreUnlock(proc->sem);
    1337             : 
    1338   743843234 :     return result;
    1339             : }
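
/*
 * Typical caller-side sketch (hypothetical extension code; the lock and
 * counter below are assumptions, not part of lwlock.c): take the lock in
 * LW_EXCLUSIVE mode to mutate shared state, keeping the critical section
 * short since cancel/die interrupts are held off until release.
 */
#include "postgres.h"
#include "storage/lwlock.h"

static uint64
sketch_bump_counter(LWLock *counter_lock, uint64 *shared_counter)
{
	uint64		val;

	LWLockAcquire(counter_lock, LW_EXCLUSIVE);
	val = ++(*shared_counter);	/* protected by the lock */
	LWLockRelease(counter_lock);

	return val;
}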
    1340             : 
    1341             : /*
    1342             :  * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
    1343             :  *
    1344             :  * If the lock is not available, return false with no side-effects.
    1345             :  *
    1346             :  * If successful, cancel/die interrupts are held off until lock release.
    1347             :  */
    1348             : bool
    1349     4416258 : LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
    1350             : {
    1351             :     bool        mustwait;
    1352             : 
    1353             :     Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
    1354             : 
    1355             :     PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
    1356             : 
    1357             :     /* Ensure we will have room to remember the lock */
    1358     4416258 :     if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
    1359           0 :         elog(ERROR, "too many LWLocks taken");
    1360             : 
    1361             :     /*
    1362             :      * Lock out cancel/die interrupts until we exit the code section protected
    1363             :      * by the LWLock.  This ensures that interrupts will not interfere with
    1364             :      * manipulations of data structures in shared memory.
    1365             :      */
    1366     4416258 :     HOLD_INTERRUPTS();
    1367             : 
    1368             :     /* Check for the lock */
    1369     4416258 :     mustwait = LWLockAttemptLock(lock, mode);
    1370             : 
    1371     4416258 :     if (mustwait)
    1372             :     {
    1373             :         /* Failed to get lock, so release interrupt holdoff */
    1374        6946 :         RESUME_INTERRUPTS();
    1375             : 
    1376             :         LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
    1377             :         if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL_ENABLED())
    1378             :             TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
    1379             :     }
    1380             :     else
    1381             :     {
    1382             :         /* Add lock to list of locks held by this backend */
    1383     4409312 :         held_lwlocks[num_held_lwlocks].lock = lock;
    1384     4409312 :         held_lwlocks[num_held_lwlocks++].mode = mode;
    1385             :         if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_ENABLED())
    1386             :             TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
    1387             :     }
    1388     4416258 :     return !mustwait;
    1389             : }
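
/*
 * Usage sketch (hypothetical caller; names invented): opportunistic work
 * that is skipped, rather than waited for, when the lock is busy.
 */
#include "postgres.h"
#include "storage/lwlock.h"

static void
sketch_maybe_do_maintenance(LWLock *maint_lock)
{
	if (!LWLockConditionalAcquire(maint_lock, LW_EXCLUSIVE))
		return;					/* somebody else is presumably doing it */

	/* ... perform the maintenance work ... */

	LWLockRelease(maint_lock);
}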
    1390             : 
    1391             : /*
    1392             :  * LWLockAcquireOrWait - Acquire lock, or wait until it's free
    1393             :  *
    1394             :  * The semantics of this function are a bit funky.  If the lock is currently
    1395             :  * free, it is acquired in the given mode, and the function returns true.  If
    1396             :  * the lock isn't immediately free, the function waits until it is released
    1397             :  * and returns false, but does not acquire the lock.
    1398             :  *
    1399             :  * This is currently used for WALWriteLock: when a backend flushes the WAL,
    1400             :  * holding WALWriteLock, it can flush the commit records of many other
    1401             :  * backends as a side-effect.  Those other backends need to wait until the
    1402             :  * flush finishes, but don't need to acquire the lock anymore.  They can just
    1403             :  * wake up, observe that their records have already been flushed, and return.
    1404             :  */
    1405             : bool
    1406      282892 : LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
    1407             : {
    1408      282892 :     PGPROC     *proc = MyProc;
    1409             :     bool        mustwait;
    1410      282892 :     int         extraWaits = 0;
    1411             : #ifdef LWLOCK_STATS
    1412             :     lwlock_stats *lwstats;
    1413             : 
    1414             :     lwstats = get_lwlock_stats_entry(lock);
    1415             : #endif
    1416             : 
    1417             :     Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
    1418             : 
    1419             :     PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
    1420             : 
    1421             :     /* Ensure we will have room to remember the lock */
    1422      282892 :     if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
    1423           0 :         elog(ERROR, "too many LWLocks taken");
    1424             : 
    1425             :     /*
    1426             :      * Lock out cancel/die interrupts until we exit the code section protected
    1427             :      * by the LWLock.  This ensures that interrupts will not interfere with
    1428             :      * manipulations of data structures in shared memory.
    1429             :      */
    1430      282892 :     HOLD_INTERRUPTS();
    1431             : 
    1432             :     /*
    1433             :      * NB: We're using nearly the same twice-in-a-row lock acquisition
    1434             :      * protocol as LWLockAcquire(). Check its comments for details.
    1435             :      */
    1436      282892 :     mustwait = LWLockAttemptLock(lock, mode);
    1437             : 
    1438      282892 :     if (mustwait)
    1439             :     {
    1440       11594 :         LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
    1441             : 
    1442       11594 :         mustwait = LWLockAttemptLock(lock, mode);
    1443             : 
    1444       11594 :         if (mustwait)
    1445             :         {
    1446             :             /*
    1447             :              * Wait until awakened.  Like in LWLockAcquire, be prepared for
    1448             :              * bogus wakeups.
    1449             :              */
    1450             :             LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
    1451             : 
    1452             : #ifdef LWLOCK_STATS
    1453             :             lwstats->block_count++;
    1454             : #endif
    1455             : 
    1456       11252 :             LWLockReportWaitStart(lock);
    1457             :             if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
    1458             :                 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
    1459             : 
    1460             :             for (;;)
    1461             :             {
    1462       11252 :                 PGSemaphoreLock(proc->sem);
    1463       11252 :                 if (proc->lwWaiting == LW_WS_NOT_WAITING)
    1464       11252 :                     break;
    1465           0 :                 extraWaits++;
    1466             :             }
    1467             : 
    1468             : #ifdef LOCK_DEBUG
    1469             :             {
    1470             :                 /* not waiting anymore */
    1471             :                 uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
    1472             : 
    1473             :                 Assert(nwaiters < MAX_BACKENDS);
    1474             :             }
    1475             : #endif
    1476             :             if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
    1477             :                 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
    1478       11252 :             LWLockReportWaitEnd();
    1479             : 
    1480             :             LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
    1481             :         }
    1482             :         else
    1483             :         {
    1484             :             LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
    1485             : 
    1486             :             /*
     1487             :              * Got the lock on the second attempt, so undo the queueing. We
     1488             :              * need to treat this as having successfully acquired the lock;
     1489             :              * otherwise we'd not necessarily wake up people we've prevented
     1490             :              * from acquiring the lock.
    1491             :              */
    1492         342 :             LWLockDequeueSelf(lock);
    1493             :         }
    1494             :     }
    1495             : 
    1496             :     /*
    1497             :      * Fix the process wait semaphore's count for any absorbed wakeups.
    1498             :      */
    1499      282892 :     while (extraWaits-- > 0)
    1500           0 :         PGSemaphoreUnlock(proc->sem);
    1501             : 
    1502      282892 :     if (mustwait)
    1503             :     {
    1504             :         /* Failed to get lock, so release interrupt holdoff */
    1505       11252 :         RESUME_INTERRUPTS();
    1506             :         LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
    1507             :         if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL_ENABLED())
    1508             :             TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
    1509             :     }
    1510             :     else
    1511             :     {
    1512             :         LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
    1513             :         /* Add lock to list of locks held by this backend */
    1514      271640 :         held_lwlocks[num_held_lwlocks].lock = lock;
    1515      271640 :         held_lwlocks[num_held_lwlocks++].mode = mode;
    1516             :         if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_ENABLED())
    1517             :             TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
    1518             :     }
    1519             : 
    1520      282892 :     return !mustwait;
    1521             : }
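
/*
 * Usage sketch modeled on the WALWriteLock pattern described above.  All
 * names are hypothetical: sketch_already_done() stands in for rechecking
 * shared state, sketch_do_flush() for the actual work.
 */
#include "postgres.h"
#include "storage/lwlock.h"

extern bool sketch_already_done(uint64 target); /* assumed helper */
extern void sketch_do_flush(uint64 target); /* assumed helper */

static void
sketch_flush_up_to(LWLock *write_lock, uint64 target)
{
	for (;;)
	{
		if (sketch_already_done(target))
			return;				/* somebody else's flush covered us */

		if (LWLockAcquireOrWait(write_lock, LW_EXCLUSIVE))
		{
			/* got the lock: do the work ourselves */
			sketch_do_flush(target);
			LWLockRelease(write_lock);
			return;
		}
		/* only waited; the lock was released -- loop back and recheck */
	}
}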
    1522             : 
    1523             : /*
     1524             :  * Given the lwlock's current state, does the caller need to wait for the
     1525             :  * variable's value to change?
    1526             :  *
    1527             :  * If we don't need to wait, and it's because the value of the variable has
    1528             :  * changed, store the current value in newval.
    1529             :  *
    1530             :  * *result is set to true if the lock was free, and false otherwise.
    1531             :  */
    1532             : static bool
    1533     7530152 : LWLockConflictsWithVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval,
    1534             :                        uint64 *newval, bool *result)
    1535             : {
    1536             :     bool        mustwait;
    1537             :     uint64      value;
    1538             : 
    1539             :     /*
     1540             :      * Test first to see if the slot is free right now.
    1541             :      *
    1542             :      * XXX: the unique caller of this routine, WaitXLogInsertionsToFinish()
    1543             :      * via LWLockWaitForVar(), uses an implied barrier with a spinlock before
    1544             :      * this, so we don't need a memory barrier here as far as the current
    1545             :      * usage is concerned.  But that might not be safe in general.
    1546             :      */
    1547     7530152 :     mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
    1548             : 
    1549     7530152 :     if (!mustwait)
    1550             :     {
    1551     5137056 :         *result = true;
    1552     5137056 :         return false;
    1553             :     }
    1554             : 
    1555     2393096 :     *result = false;
    1556             : 
    1557             :     /*
    1558             :      * Reading this value atomically is safe even on platforms where uint64
    1559             :      * cannot be read without observing a torn value.
    1560             :      */
    1561     2393096 :     value = pg_atomic_read_u64(valptr);
    1562             : 
    1563     2393096 :     if (value != oldval)
    1564             :     {
    1565     2096882 :         mustwait = false;
    1566     2096882 :         *newval = value;
    1567             :     }
    1568             :     else
    1569             :     {
    1570      296214 :         mustwait = true;
    1571             :     }
    1572             : 
    1573     2393096 :     return mustwait;
    1574             : }
    1575             : 
    1576             : /*
    1577             :  * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
    1578             :  *
    1579             :  * If the lock is held and *valptr equals oldval, waits until the lock is
    1580             :  * either freed, or the lock holder updates *valptr by calling
    1581             :  * LWLockUpdateVar.  If the lock is free on exit (immediately or after
    1582             :  * waiting), returns true.  If the lock is still held, but *valptr no longer
    1583             :  * matches oldval, returns false and sets *newval to the current value in
    1584             :  * *valptr.
    1585             :  *
    1586             :  * Note: this function ignores shared lock holders; if the lock is held
    1587             :  * in shared mode, returns 'true'.
    1588             :  *
    1589             :  * Be aware that LWLockConflictsWithVar() does not include a memory barrier,
    1590             :  * hence the caller of this function may want to rely on an explicit barrier or
    1591             :  * an implied barrier via spinlock or LWLock to avoid memory ordering issues.
    1592             :  */
    1593             : bool
    1594     7233938 : LWLockWaitForVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval,
    1595             :                  uint64 *newval)
    1596             : {
    1597     7233938 :     PGPROC     *proc = MyProc;
    1598     7233938 :     int         extraWaits = 0;
    1599     7233938 :     bool        result = false;
    1600             : #ifdef LWLOCK_STATS
    1601             :     lwlock_stats *lwstats;
    1602             : 
    1603             :     lwstats = get_lwlock_stats_entry(lock);
    1604             : #endif
    1605             : 
    1606             :     PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
    1607             : 
    1608             :     /*
    1609             :      * Lock out cancel/die interrupts while we sleep on the lock.  There is no
    1610             :      * cleanup mechanism to remove us from the wait queue if we got
    1611             :      * interrupted.
    1612             :      */
    1613     7233938 :     HOLD_INTERRUPTS();
    1614             : 
    1615             :     /*
    1616             :      * Loop here to check the lock's status after each time we are signaled.
    1617             :      */
    1618             :     for (;;)
    1619      144738 :     {
    1620             :         bool        mustwait;
    1621             : 
     1622             :          * Add myself to the wait queue. Note that this is racy: somebody else
     1623             :          * could wake us up before we're finished queuing. NB: We're using nearly
    1624             : 
    1625     7378676 :         if (!mustwait)
    1626     7227200 :             break;              /* the lock was free or value didn't match */
    1627             : 
    1628             :         /*
    1629             :          * Add myself to wait queue. Note that this is racy, somebody else
    1630             :          * could wakeup before we're finished queuing. NB: We're using nearly
    1631             :          * the same twice-in-a-row lock acquisition protocol as
    1632             :          * LWLockAcquire(). Check its comments for details. The only
    1633             :          * difference is that we also have to check the variable's values when
    1634             :          * checking the state of the lock.
    1635             :          */
    1636      151476 :         LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
    1637             : 
    1638             :         /*
    1639             :          * Set RELEASE_OK flag, to make sure we get woken up as soon as the
    1640             :          * lock is released.
    1641             :          */
    1642      151476 :         pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
    1643             : 
    1644             :         /*
    1645             :          * We're now guaranteed to be woken up if necessary. Recheck the lock
     1646             :          * and the variable's state.
    1647             :          */
    1648      151476 :         mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
    1649             :                                           &result);
    1650             : 
    1651             :         /* Ok, no conflict after we queued ourselves. Undo queueing. */
    1652      151476 :         if (!mustwait)
    1653             :         {
    1654             :             LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
    1655             : 
    1656        6738 :             LWLockDequeueSelf(lock);
    1657        6738 :             break;
    1658             :         }
    1659             : 
    1660             :         /*
    1661             :          * Wait until awakened.
    1662             :          *
    1663             :          * It is possible that we get awakened for a reason other than being
    1664             :          * signaled by LWLockRelease.  If so, loop back and wait again.  Once
    1665             :          * we've gotten the LWLock, re-increment the sema by the number of
    1666             :          * additional signals received.
    1667             :          */
    1668             :         LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
    1669             : 
    1670             : #ifdef LWLOCK_STATS
    1671             :         lwstats->block_count++;
    1672             : #endif
    1673             : 
    1674      144738 :         LWLockReportWaitStart(lock);
    1675             :         if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
    1676             :             TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);
    1677             : 
    1678             :         for (;;)
    1679             :         {
    1680      144738 :             PGSemaphoreLock(proc->sem);
    1681      144738 :             if (proc->lwWaiting == LW_WS_NOT_WAITING)
    1682      144738 :                 break;
    1683           0 :             extraWaits++;
    1684             :         }
    1685             : 
    1686             : #ifdef LOCK_DEBUG
    1687             :         {
    1688             :             /* not waiting anymore */
    1689             :             uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
    1690             : 
    1691             :             Assert(nwaiters < MAX_BACKENDS);
    1692             :         }
    1693             : #endif
    1694             : 
    1695             :         if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
    1696             :             TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
    1697      144738 :         LWLockReportWaitEnd();
    1698             : 
    1699             :         LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
    1700             : 
    1701             :         /* Now loop back and check the status of the lock again. */
    1702             :     }
    1703             : 
    1704             :     /*
    1705             :      * Fix the process wait semaphore's count for any absorbed wakeups.
    1706             :      */
    1707     7233938 :     while (extraWaits-- > 0)
    1708           0 :         PGSemaphoreUnlock(proc->sem);
    1709             : 
    1710             :     /*
    1711             :      * Now okay to allow cancel/die interrupts.
    1712             :      */
    1713     7233938 :     RESUME_INTERRUPTS();
    1714             : 
    1715     7233938 :     return result;
    1716             : }
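
/*
 * Waiter-side usage sketch (hypothetical names, loosely modeled on how
 * WaitXLogInsertionsToFinish() uses this API): block until the holder
 * either releases the lock or advances the shared variable past the value
 * we last observed.
 */
#include "postgres.h"
#include "port/atomics.h"
#include "storage/lwlock.h"

static uint64
sketch_wait_for_progress(LWLock *lock, pg_atomic_uint64 *progress,
						 uint64 last_seen)
{
	uint64		newval;

	if (LWLockWaitForVar(lock, progress, last_seen, &newval))
		return pg_atomic_read_u64(progress);	/* lock was free */

	/* lock still held, but the variable has moved past last_seen */
	return newval;
}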
    1717             : 
    1718             : 
    1719             : /*
    1720             :  * LWLockUpdateVar - Update a variable and wake up waiters atomically
    1721             :  *
    1722             :  * Sets *valptr to 'val', and wakes up all processes waiting for us with
    1723             :  * LWLockWaitForVar().  It first sets the value atomically and then wakes up
    1724             :  * waiting processes so that any process calling LWLockWaitForVar() on the same
    1725             :  * lock is guaranteed to see the new value, and act accordingly.
    1726             :  *
    1727             :  * The caller must be holding the lock in exclusive mode.
    1728             :  */
    1729             : void
    1730     5544998 : LWLockUpdateVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
    1731             : {
    1732             :     proclist_head wakeup;
    1733             :     proclist_mutable_iter iter;
    1734             : 
    1735             :     PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
    1736             : 
    1737             :     /*
    1738             :      * Note that pg_atomic_exchange_u64 is a full barrier, so we're guaranteed
    1739             :      * that the variable is updated before waking up waiters.
    1740             :      */
    1741     5544998 :     pg_atomic_exchange_u64(valptr, val);
    1742             : 
    1743     5544998 :     proclist_init(&wakeup);
    1744             : 
    1745     5544998 :     LWLockWaitListLock(lock);
    1746             : 
    1747             :     Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
    1748             : 
    1749             :     /*
    1750             :      * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
     1751             :      * up. They are always at the front of the queue.
    1752             :      */
    1753     5548850 :     proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
    1754             :     {
    1755      102968 :         PGPROC     *waiter = GetPGProcByNumber(iter.cur);
    1756             : 
    1757      102968 :         if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
    1758       99116 :             break;
    1759             : 
    1760        3852 :         proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
    1761        3852 :         proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
    1762             : 
    1763             :         /* see LWLockWakeup() */
    1764             :         Assert(waiter->lwWaiting == LW_WS_WAITING);
    1765        3852 :         waiter->lwWaiting = LW_WS_PENDING_WAKEUP;
    1766             :     }
    1767             : 
    1768             :     /* We are done updating shared state of the lock itself. */
    1769     5544998 :     LWLockWaitListUnlock(lock);
    1770             : 
    1771             :     /*
    1772             :      * Awaken any waiters I removed from the queue.
    1773             :      */
    1774     5548850 :     proclist_foreach_modify(iter, &wakeup, lwWaitLink)
    1775             :     {
    1776        3852 :         PGPROC     *waiter = GetPGProcByNumber(iter.cur);
    1777             : 
    1778        3852 :         proclist_delete(&wakeup, iter.cur, lwWaitLink);
    1779             :         /* check comment in LWLockWakeup() about this barrier */
    1780        3852 :         pg_write_barrier();
    1781        3852 :         waiter->lwWaiting = LW_WS_NOT_WAITING;
    1782        3852 :         PGSemaphoreUnlock(waiter->sem);
    1783             :     }
    1784     5544998 : }
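
/*
 * Holder-side usage sketch (hypothetical names): the exclusive holder
 * publishes intermediate progress via LWLockUpdateVar() so that
 * LWLockWaitForVar() callers can proceed before the lock is released, and
 * finally resets the variable and releases in one step with
 * LWLockReleaseClearVar().
 */
#include "postgres.h"
#include "port/atomics.h"
#include "storage/lwlock.h"

static void
sketch_work_with_progress(LWLock *lock, pg_atomic_uint64 *progress)
{
	LWLockAcquire(lock, LW_EXCLUSIVE);

	/* ... first chunk of work ... */
	LWLockUpdateVar(lock, progress, 100);	/* wake variable waiters */

	/* ... second chunk of work ... */
	LWLockUpdateVar(lock, progress, 200);

	/* reset to "idle" and release the lock */
	LWLockReleaseClearVar(lock, progress, 0);
}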
    1785             : 
    1786             : 
    1787             : /*
    1788             :  * Stop treating lock as held by current backend.
    1789             :  *
    1790             :  * This is the code that can be shared between actually releasing a lock
    1791             :  * (LWLockRelease()) and just not tracking ownership of the lock anymore
    1792             :  * without releasing the lock (LWLockDisown()).
    1793             :  *
    1794             :  * Returns the mode in which the lock was held by the current backend.
    1795             :  *
    1796             :  * NB: This does not call RESUME_INTERRUPTS(), but leaves that responsibility
     1797             :  * to the caller.
    1798             :  *
    1799             :  * NB: This will leave lock->owner pointing to the current backend (if
    1800             :  * LOCK_DEBUG is set). This is somewhat intentional, as it makes it easier to
    1801             :  * debug cases of missing wakeups during lock release.
    1802             :  */
    1803             : static inline LWLockMode
    1804   748524186 : LWLockDisownInternal(LWLock *lock)
    1805             : {
    1806             :     LWLockMode  mode;
    1807             :     int         i;
    1808             : 
    1809             :     /*
    1810             :      * Remove lock from list of locks held.  Usually, but not always, it will
    1811             :      * be the latest-acquired lock; so search array backwards.
    1812             :      */
    1813   833916944 :     for (i = num_held_lwlocks; --i >= 0;)
    1814   833916944 :         if (lock == held_lwlocks[i].lock)
    1815   748524186 :             break;
    1816             : 
    1817   748524186 :     if (i < 0)
    1818           0 :         elog(ERROR, "lock %s is not held", T_NAME(lock));
    1819             : 
    1820   748524186 :     mode = held_lwlocks[i].mode;
    1821             : 
    1822   748524186 :     num_held_lwlocks--;
    1823   833916944 :     for (; i < num_held_lwlocks; i++)
    1824    85392758 :         held_lwlocks[i] = held_lwlocks[i + 1];
    1825             : 
    1826   748524186 :     return mode;
    1827             : }
    1828             : 
    1829             : /*
    1830             :  * Helper function to release lock, shared between LWLockRelease() and
    1831             :  * LWLockReleaseDisowned().
    1832             :  */
    1833             : static void
    1834   748524186 : LWLockReleaseInternal(LWLock *lock, LWLockMode mode)
    1835             : {
    1836             :     uint32      oldstate;
    1837             :     bool        check_waiters;
    1838             : 
    1839             :     /*
     1840             :      * Release my hold on the lock; after that it can immediately be acquired
     1841             :      * by others, even if we still have to wake up other waiters.
    1842             :      */
    1843   748524186 :     if (mode == LW_EXCLUSIVE)
    1844   469446246 :         oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
    1845             :     else
    1846   279077940 :         oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
    1847             : 
    1848             :     /* nobody else can have that kind of lock */
    1849             :     Assert(!(oldstate & LW_VAL_EXCLUSIVE));
    1850             : 
    1851             :     if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
    1852             :         TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
    1853             : 
    1854             :     /*
     1855             :      * We're still waiting for backends to get scheduled; don't wake them up
    1856             :      * again.
    1857             :      */
    1858   748524186 :     if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
    1859     7579330 :         (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
    1860     7579330 :         (oldstate & LW_LOCK_MASK) == 0)
    1861     7542768 :         check_waiters = true;
    1862             :     else
    1863   740981418 :         check_waiters = false;
    1864             : 
    1865             :     /*
    1866             :      * As waking up waiters requires the spinlock to be acquired, only do so
    1867             :      * if necessary.
    1868             :      */
    1869   748524186 :     if (check_waiters)
    1870             :     {
    1871             :         /* XXX: remove before commit? */
    1872             :         LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
    1873     7542768 :         LWLockWakeup(lock);
    1874             :     }
    1875   748524186 : }
    1876             : 
    1877             : 
    1878             : /*
    1879             :  * Stop treating lock as held by current backend.
    1880             :  *
     1881             :  * After calling this function it is the caller's responsibility to ensure
     1882             :  * that the lock gets released (via LWLockReleaseDisowned()), even in case of
     1883             :  * an error. This is only desirable if the lock is going to be released in a
    1884             :  * different process than the process that acquired it.
    1885             :  */
    1886             : void
    1887           0 : LWLockDisown(LWLock *lock)
    1888             : {
    1889           0 :     LWLockDisownInternal(lock);
    1890             : 
    1891           0 :     RESUME_INTERRUPTS();
    1892           0 : }
    1893             : 
    1894             : /*
    1895             :  * LWLockRelease - release a previously acquired lock
    1896             :  */
    1897             : void
    1898   748524186 : LWLockRelease(LWLock *lock)
    1899             : {
    1900             :     LWLockMode  mode;
    1901             : 
    1902   748524186 :     mode = LWLockDisownInternal(lock);
    1903             : 
    1904             :     PRINT_LWDEBUG("LWLockRelease", lock, mode);
    1905             : 
    1906   748524186 :     LWLockReleaseInternal(lock, mode);
    1907             : 
    1908             :     /*
    1909             :      * Now okay to allow cancel/die interrupts.
    1910             :      */
    1911   748524186 :     RESUME_INTERRUPTS();
    1912   748524186 : }
    1913             : 
    1914             : /*
    1915             :  * Release lock previously disowned with LWLockDisown().
    1916             :  */
    1917             : void
    1918           0 : LWLockReleaseDisowned(LWLock *lock, LWLockMode mode)
    1919             : {
    1920           0 :     LWLockReleaseInternal(lock, mode);
    1921           0 : }
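
/*
 * Sketch of the disown/release hand-off (hypothetical scenario and names):
 * one process acquires and disowns the lock, and another process releases
 * it later.  The releasing side must supply the original mode, since the
 * lock is no longer tracked in its held_lwlocks array.
 */
#include "postgres.h"
#include "storage/lwlock.h"

static void
sketch_start_handoff(LWLock *lock)
{
	LWLockAcquire(lock, LW_EXCLUSIVE);
	LWLockDisown(lock);			/* still held, but no longer tracked here */
}

static void
sketch_finish_handoff(LWLock *lock)
{
	LWLockReleaseDisowned(lock, LW_EXCLUSIVE); /* mode must match */
}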
    1922             : 
    1923             : /*
    1924             :  * LWLockReleaseClearVar - release a previously acquired lock, reset variable
    1925             :  */
    1926             : void
    1927    30847954 : LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
    1928             : {
    1929             :     /*
    1930             :      * Note that pg_atomic_exchange_u64 is a full barrier, so we're guaranteed
    1931             :      * that the variable is updated before releasing the lock.
    1932             :      */
    1933    30847954 :     pg_atomic_exchange_u64(valptr, val);
    1934             : 
    1935    30847954 :     LWLockRelease(lock);
    1936    30847954 : }
    1937             : 
    1938             : 
    1939             : /*
    1940             :  * LWLockReleaseAll - release all currently-held locks
    1941             :  *
    1942             :  * Used to clean up after ereport(ERROR). An important difference between this
    1943             :  * function and retail LWLockRelease calls is that InterruptHoldoffCount is
    1944             :  * unchanged by this operation.  This is necessary since InterruptHoldoffCount
    1945             :  * has been set to an appropriate level earlier in error recovery. We could
     1946             :  * decrement it below zero if we allowed it to drop for each released lock!
    1947             :  */
    1948             : void
    1949      116656 : LWLockReleaseAll(void)
    1950             : {
    1951      117054 :     while (num_held_lwlocks > 0)
    1952             :     {
    1953         398 :         HOLD_INTERRUPTS();      /* match the upcoming RESUME_INTERRUPTS */
    1954             : 
    1955         398 :         LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
    1956             :     }
    1957      116656 : }
    1958             : 
    1959             : 
    1960             : /*
    1961             :  * ForEachLWLockHeldByMe - run a callback for each held lock
    1962             :  *
    1963             :  * This is meant as debug support only.
    1964             :  */
    1965             : void
    1966           0 : ForEachLWLockHeldByMe(void (*callback) (LWLock *, LWLockMode, void *),
    1967             :                       void *context)
    1968             : {
    1969             :     int         i;
    1970             : 
    1971           0 :     for (i = 0; i < num_held_lwlocks; i++)
    1972           0 :         callback(held_lwlocks[i].lock, held_lwlocks[i].mode, context);
    1973           0 : }
    1974             : 
    1975             : /*
    1976             :  * LWLockHeldByMe - test whether my process holds a lock in any mode
    1977             :  *
    1978             :  * This is meant as debug support only.
    1979             :  */
    1980             : bool
    1981           0 : LWLockHeldByMe(LWLock *lock)
    1982             : {
    1983             :     int         i;
    1984             : 
    1985           0 :     for (i = 0; i < num_held_lwlocks; i++)
    1986             :     {
    1987           0 :         if (held_lwlocks[i].lock == lock)
    1988           0 :             return true;
    1989             :     }
    1990           0 :     return false;
    1991             : }
    1992             : 
    1993             : /*
    1994             :  * LWLockAnyHeldByMe - test whether my process holds any of an array of locks
    1995             :  *
    1996             :  * This is meant as debug support only.
    1997             :  */
    1998             : bool
    1999           0 : LWLockAnyHeldByMe(LWLock *lock, int nlocks, size_t stride)
    2000             : {
    2001             :     char       *held_lock_addr;
    2002             :     char       *begin;
    2003             :     char       *end;
    2004             :     int         i;
    2005             : 
    2006           0 :     begin = (char *) lock;
    2007           0 :     end = begin + nlocks * stride;
    2008           0 :     for (i = 0; i < num_held_lwlocks; i++)
    2009             :     {
    2010           0 :         held_lock_addr = (char *) held_lwlocks[i].lock;
    2011           0 :         if (held_lock_addr >= begin &&
    2012           0 :             held_lock_addr < end &&
    2013           0 :             (held_lock_addr - begin) % stride == 0)
    2014           0 :             return true;
    2015             :     }
    2016           0 :     return false;
    2017             : }
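
/*
 * Usage sketch (hypothetical array-of-structs layout): check whether we
 * hold any of the LWLocks embedded in an array, passing the element size
 * as the stride between successive locks.
 */
#include "postgres.h"
#include "storage/lwlock.h"

typedef struct SketchSlot
{
	LWLock		lock;
	int			payload;		/* assumed per-slot data */
} SketchSlot;

static bool
sketch_holding_any_slot_lock(SketchSlot *slots, int nslots)
{
	return LWLockAnyHeldByMe(&slots[0].lock, nslots, sizeof(SketchSlot));
}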
    2018             : 
    2019             : /*
    2020             :  * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
    2021             :  *
    2022             :  * This is meant as debug support only.
    2023             :  */
    2024             : bool
    2025           0 : LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
    2026             : {
    2027             :     int         i;
    2028             : 
    2029           0 :     for (i = 0; i < num_held_lwlocks; i++)
    2030             :     {
    2031           0 :         if (held_lwlocks[i].lock == lock && held_lwlocks[i].mode == mode)
    2032           0 :             return true;
    2033             :     }
    2034           0 :     return false;
    2035             : }

Generated by: LCOV version 1.16