Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * lwlock.c
4 : * Lightweight lock manager
5 : *
6 : * Lightweight locks are intended primarily to provide mutual exclusion of
7 : * access to shared-memory data structures. Therefore, they offer both
8 : * exclusive and shared lock modes (to support read/write and read-only
9 : * access to a shared object). There are few other frammishes. User-level
10 : * locking should be done with the full lock manager --- which depends on
11 : * LWLocks to protect its shared state.
12 : *
13 : * In addition to exclusive and shared modes, lightweight locks can be used to
14 : * wait until a variable changes value. The variable is initially not set
15 : * when the lock is acquired with LWLockAcquire, i.e. it remains set to the
16 : * value it was set to when the lock was released last, and can be updated
17 : * without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar
18 : * waits for the variable to be updated, or until the lock is free. When
19 : * releasing the lock with LWLockReleaseClearVar() the value can be set to an
20 : * appropriate value for a free lock. The meaning of the variable is up to
21 : * the caller, the lightweight lock code just assigns and compares it.
22 : *
23 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
24 : * Portions Copyright (c) 1994, Regents of the University of California
25 : *
26 : * IDENTIFICATION
27 : * src/backend/storage/lmgr/lwlock.c
28 : *
29 : * NOTES:
30 : *
31 : * This used to be a pretty straight forward reader-writer lock
32 : * implementation, in which the internal state was protected by a
33 : * spinlock. Unfortunately the overhead of taking the spinlock proved to be
34 : * too high for workloads/locks that were taken in shared mode very
35 : * frequently. Often we were spinning in the (obviously exclusive) spinlock,
36 : * while trying to acquire a shared lock that was actually free.
37 : *
38 : * Thus a new implementation was devised that provides wait-free shared lock
39 : * acquisition for locks that aren't exclusively locked.
40 : *
41 : * The basic idea is to have a single atomic variable 'lockcount' instead of
42 : * the formerly separate shared and exclusive counters and to use atomic
43 : * operations to acquire the lock. That's fairly easy to do for plain
44 : * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
45 : * in the OS.
46 : *
47 : * For lock acquisition we use an atomic compare-and-exchange on the lockcount
48 : * variable. For exclusive lock we swap in a sentinel value
49 : * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
50 : *
51 : * To release the lock we use an atomic decrement to release the lock. If the
52 : * new value is zero (we get that atomically), we know we can/have to release
53 : * waiters.
54 : *
55 : * Obviously it is important that the sentinel value for exclusive locks
56 : * doesn't conflict with the maximum number of possible share lockers -
57 : * luckily MAX_BACKENDS makes that easily possible.
58 : *
59 : *
60 : * The attentive reader might have noticed that naively doing the above has a
61 : * glaring race condition: We try to lock using the atomic operations and
62 : * notice that we have to wait. Unfortunately by the time we have finished
63 : * queuing, the former locker very well might have already finished its
64 : * work. That's problematic because we're now stuck waiting inside the OS.
65 : *
66 : * To mitigate those races we use a two-phased attempt at locking:
67 : * Phase 1: Try to do it atomically, if we succeed, nice
68 : * Phase 2: Add ourselves to the waitqueue of the lock
69 : * Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
70 : * the queue
71 : * Phase 4: Sleep till wake-up, goto Phase 1
72 : *
73 : * This protects us against the problem from above as nobody can release too
74 : * quick, before we're queued, since after Phase 2 we're already queued.
75 : * -------------------------------------------------------------------------
76 : */
77 : #include "postgres.h"
78 :
79 : #include "miscadmin.h"
80 : #include "pg_trace.h"
81 : #include "pgstat.h"
82 : #include "port/pg_bitutils.h"
83 : #include "storage/proc.h"
84 : #include "storage/proclist.h"
85 : #include "storage/procnumber.h"
86 : #include "storage/spin.h"
87 : #include "storage/subsystems.h"
88 : #include "utils/memutils.h"
89 : #include "utils/wait_event.h"
90 :
91 : #ifdef LWLOCK_STATS
92 : #include "utils/hsearch.h"
93 : #endif
94 :
95 :
96 : #define LW_FLAG_HAS_WAITERS ((uint32) 1 << 31)
97 : #define LW_FLAG_WAKE_IN_PROGRESS ((uint32) 1 << 30)
98 : #define LW_FLAG_LOCKED ((uint32) 1 << 29)
99 : #define LW_FLAG_BITS 3
100 : #define LW_FLAG_MASK (((1<<LW_FLAG_BITS)-1)<<(32-LW_FLAG_BITS))
101 :
102 : /* assumes MAX_BACKENDS is a (power of 2) - 1, checked below */
103 : #define LW_VAL_EXCLUSIVE (MAX_BACKENDS + 1)
104 : #define LW_VAL_SHARED 1
105 :
106 : /* already (power of 2)-1, i.e. suitable for a mask */
107 : #define LW_SHARED_MASK MAX_BACKENDS
108 : #define LW_LOCK_MASK (MAX_BACKENDS | LW_VAL_EXCLUSIVE)
109 :
110 :
111 : StaticAssertDecl(((MAX_BACKENDS + 1) & MAX_BACKENDS) == 0,
112 : "MAX_BACKENDS + 1 needs to be a power of 2");
113 :
114 : StaticAssertDecl((MAX_BACKENDS & LW_FLAG_MASK) == 0,
115 : "MAX_BACKENDS and LW_FLAG_MASK overlap");
116 :
117 : StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0,
118 : "LW_VAL_EXCLUSIVE and LW_FLAG_MASK overlap");
119 :
120 : /*
121 : * There are three sorts of LWLock "tranches":
122 : *
123 : * 1. The individually-named locks defined in lwlocklist.h each have their
124 : * own tranche. We absorb the names of these tranches from there into
125 : * BuiltinTrancheNames here.
126 : *
127 : * 2. There are some predefined tranches for built-in groups of locks defined
128 : * in lwlocklist.h. We absorb the names of these tranches, too.
129 : *
130 : * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
131 : * or LWLockNewTrancheId. These are stored in shared memory and can be
132 : * accessed via LWLockTranches.
133 : *
134 : * All these names are user-visible as wait event names, so choose with care
135 : * ... and do not forget to update the documentation's list of wait events.
136 : */
137 : static const char *const BuiltinTrancheNames[] = {
138 : #define PG_LWLOCK(id, lockname) [id] = CppAsString(lockname),
139 : #define PG_LWLOCKTRANCHE(id, lockname) [LWTRANCHE_##id] = CppAsString(lockname),
140 : #include "storage/lwlocklist.h"
141 : #undef PG_LWLOCK
142 : #undef PG_LWLOCKTRANCHE
143 : };
144 :
145 : StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
146 : LWTRANCHE_FIRST_USER_DEFINED,
147 : "missing entries in BuiltinTrancheNames[]");
148 :
149 : /* Main array of LWLocks in shared memory */
150 : LWLockPadded *MainLWLockArray = NULL;
151 :
152 : /*
153 : * We use this structure to keep track of locked LWLocks for release
154 : * during error recovery. Normally, only a few will be held at once, but
155 : * occasionally the number can be much higher.
156 : */
157 : #define MAX_SIMUL_LWLOCKS 200
158 :
159 : /* struct representing the LWLocks we're holding */
160 : typedef struct LWLockHandle
161 : {
162 : LWLock *lock; /* the lock we hold */
163 : LWLockMode mode; /* mode in which we hold it */
164 : } LWLockHandle;
165 :
      : /* Backend-local stack of held LWLocks, released en masse on error. */
166 : static int num_held_lwlocks = 0;
167 : static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
168 :
169 : /* Maximum number of LWLock tranches that can be assigned by extensions */
170 : #define MAX_USER_DEFINED_TRANCHES 256
171 :
172 : /*
173 : * Shared memory structure holding user-defined tranches.
174 : */
175 : typedef struct LWLockTrancheShmemData
176 : {
177 : /* This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED */
178 : struct
179 : {
180 : char name[NAMEDATALEN];
181 :
182 : /*
183 : * Index of the tranche's locks in MainLWLockArray if this tranche was
184 : * allocated with RequestNamedLWLockTranche(), or -1 if the tranche
185 : * was allocated with LWLockNewTrancheId()
186 : */
187 : int main_array_idx;
188 : } user_defined[MAX_USER_DEFINED_TRANCHES];
189 :
190 : int num_user_defined; /* 'user_defined' entries in use */
191 :
192 : slock_t lock; /* protects the above */
193 : } LWLockTrancheShmemData;
194 :
      : /* Pointer into shared memory; set up by LWLockShmemRequest/Init. */
195 : static LWLockTrancheShmemData *LWLockTranches;
196 :
197 : /* backend-local copy of LWLockTranches->num_user_defined */
      : /* (entries are append-only, so a stale copy only misses new tranches) */
198 : static int LocalNumUserDefinedTranches;
199 :
200 : /*
201 : * NamedLWLockTrancheRequests is a list of tranches requested with
202 : * RequestNamedLWLockTranche(). It is only valid in the postmaster; after
203 : * startup the tranches are tracked in LWLockTranches in shared memory.
204 : */
205 : typedef struct NamedLWLockTrancheRequest
206 : {
207 : char tranche_name[NAMEDATALEN]; /* NUL-terminated tranche name */
208 : int num_lwlocks; /* number of locks requested */
209 : } NamedLWLockTrancheRequest;
210 :
211 : static List *NamedLWLockTrancheRequests = NIL;
212 :
213 : /* Size of MainLWLockArray. Only valid in postmaster. */
214 : static int num_main_array_locks;
215 :
216 : static void LWLockShmemRequest(void *arg);
217 : static void LWLockShmemInit(void *arg);
218 :
      : /* Hooks through which the shmem subsystem drives our request/init phases. */
219 : const ShmemCallbacks LWLockCallbacks = {
220 : .request_fn = LWLockShmemRequest,
221 : .init_fn = LWLockShmemInit,
222 : };
223 :
224 :
225 : static inline void LWLockReportWaitStart(LWLock *lock);
226 : static inline void LWLockReportWaitEnd(void);
227 : static const char *GetLWTrancheName(uint16 trancheId);
228 :
229 : #define T_NAME(lock) \
230 : GetLWTrancheName((lock)->tranche)
231 :
232 : #ifdef LWLOCK_STATS
233 : typedef struct lwlock_stats_key
234 : {
235 : int tranche;
236 : void *instance;
237 : } lwlock_stats_key;
238 :
239 : typedef struct lwlock_stats
240 : {
241 : lwlock_stats_key key;
242 : int sh_acquire_count;
243 : int ex_acquire_count;
244 : int block_count;
245 : int dequeue_self_count;
246 : int spin_delay_count;
247 : } lwlock_stats;
248 :
249 : static HTAB *lwlock_stats_htab;
250 : static lwlock_stats lwlock_stats_dummy;
251 : #endif
252 :
253 : #ifdef LOCK_DEBUG
      : /* GUC-style toggle; when false the debug helpers below are no-ops. */
254 : bool Trace_lwlocks = false;
255 :
      : /* Log the lock's current atomic state (holders, waiters, flags). */
256 : inline static void
257 : PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
258 : {
259 : /* hide statement & context here, otherwise the log is just too verbose */
260 : if (Trace_lwlocks)
261 : {
262 : uint32 state = pg_atomic_read_u32(&lock->state);
263 :
264 : ereport(LOG,
265 : (errhidestmt(true),
266 : errhidecontext(true),
267 : errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u waking %d",
268 : MyProcPid,
269 : where, T_NAME(lock), lock,
270 : (state & LW_VAL_EXCLUSIVE) != 0,
271 : state & LW_SHARED_MASK,
272 : (state & LW_FLAG_HAS_WAITERS) != 0,
273 : pg_atomic_read_u32(&lock->nwaiters),
274 : (state & LW_FLAG_WAKE_IN_PROGRESS) != 0)));
275 : }
276 : }
277 :
      : /* Log a free-form message tagged with the lock's tranche name and address. */
278 : inline static void
279 : LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
280 : {
281 : /* hide statement & context here, otherwise the log is just too verbose */
282 : if (Trace_lwlocks)
283 : {
284 : ereport(LOG,
285 : (errhidestmt(true),
286 : errhidecontext(true),
287 : errmsg_internal("%s(%s %p): %s", where,
288 : T_NAME(lock), lock, msg)));
289 : }
290 : }
291 :
292 : #else /* not LOCK_DEBUG */
293 : #define PRINT_LWDEBUG(a,b,c) ((void)0)
294 : #define LOG_LWDEBUG(a,b,c) ((void)0)
295 : #endif /* LOCK_DEBUG */
296 :
297 : #ifdef LWLOCK_STATS
298 :
299 : static void init_lwlock_stats(void);
300 : static void print_lwlock_stats(int code, Datum arg);
301 : static lwlock_stats * get_lwlock_stats_entry(LWLock *lock);
302 :
      : /*
      : * (Re)create the backend-local LWLock statistics hash table. Any stats
      : * accumulated in a previous incarnation are discarded along with the old
      : * memory context.
      : */
303 : static void
304 : init_lwlock_stats(void)
305 : {
306 : HASHCTL ctl;
307 : static MemoryContext lwlock_stats_cxt = NULL;
308 : static bool exit_registered = false;
309 :
310 : if (lwlock_stats_cxt != NULL)
311 : MemoryContextDelete(lwlock_stats_cxt);
312 :
313 : /*
314 : * The LWLock stats will be updated within a critical section, which
315 : * requires allocating new hash entries. Allocations within a critical
316 : * section are normally not allowed because running out of memory would
317 : * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
318 : * turned on in production, so that's an acceptable risk. The hash entries
319 : * are small, so the risk of running out of memory is minimal in practice.
320 : */
321 : lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
322 : "LWLock stats",
323 : ALLOCSET_DEFAULT_SIZES);
324 : MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
325 :
326 : ctl.keysize = sizeof(lwlock_stats_key);
327 : ctl.entrysize = sizeof(lwlock_stats);
328 : ctl.hcxt = lwlock_stats_cxt;
329 : lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
330 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
      : /* Register the report-printing callback only once per backend. */
331 : if (!exit_registered)
332 : {
333 : on_shmem_exit(print_lwlock_stats, 0);
334 : exit_registered = true;
335 : }
336 : }
337 :
      : /*
      : * shmem-exit callback: dump this backend's accumulated LWLock stats to
      : * stderr. MainLWLockArray[0] is taken exclusively merely to keep the
      : * output of concurrently exiting backends from interleaving.
      : */
338 : static void
339 : print_lwlock_stats(int code, Datum arg)
340 : {
341 : HASH_SEQ_STATUS scan;
342 : lwlock_stats *lwstats;
343 :
344 : hash_seq_init(&scan, lwlock_stats_htab);
345 :
346 : /* Grab an LWLock to keep different backends from mixing reports */
347 : LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
348 :
349 : while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
350 : {
351 : fprintf(stderr,
352 : "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
353 : MyProcPid, GetLWTrancheName(lwstats->key.tranche),
354 : lwstats->key.instance, lwstats->sh_acquire_count,
355 : lwstats->ex_acquire_count, lwstats->block_count,
356 : lwstats->spin_delay_count, lwstats->dequeue_self_count);
357 : }
358 :
359 : LWLockRelease(&MainLWLockArray[0].lock);
360 : }
361 :
      : /*
      : * Look up (creating if needed) the stats entry for a particular lock
      : * instance, keyed by (tranche, lock address).
      : */
362 : static lwlock_stats *
363 : get_lwlock_stats_entry(LWLock *lock)
364 : {
365 : lwlock_stats_key key;
366 : lwlock_stats *lwstats;
367 : bool found;
368 :
369 : /*
370 : * During shared memory initialization, the hash table doesn't exist yet.
371 : * Stats of that phase aren't very interesting, so just collect operations
372 : * on all locks in a single dummy entry.
373 : */
374 : if (lwlock_stats_htab == NULL)
375 : return &lwlock_stats_dummy;
376 :
377 : /* Fetch or create the entry. */
      : /* MemSet zeroes padding too, so the key hashes consistently. */
378 : MemSet(&key, 0, sizeof(key));
379 : key.tranche = lock->tranche;
380 : key.instance = lock;
381 : lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
382 : if (!found)
383 : {
384 : lwstats->sh_acquire_count = 0;
385 : lwstats->ex_acquire_count = 0;
386 : lwstats->block_count = 0;
387 : lwstats->dequeue_self_count = 0;
388 : lwstats->spin_delay_count = 0;
389 : }
390 : return lwstats;
391 : }
392 : #endif /* LWLOCK_STATS */
393 :
394 :
395 : /*
396 : * Compute number of LWLocks required by user-defined tranches requested with
397 : * RequestNamedLWLockTranche(). These will be allocated in the main array.
      : *
      : * NB: NamedLWLockTrancheRequests is only populated in the postmaster (see
      : * its declaration above), so elsewhere this simply returns 0.
398 : */
399 : static int
400 1234 : NumLWLocksForNamedTranches(void)
401 : {
402 1234 : int numLocks = 0;
403 :
404 2470 : foreach_ptr(NamedLWLockTrancheRequest, request, NamedLWLockTrancheRequests)
405 : {
406 2 : numLocks += request->num_lwlocks;
407 : }
408 :
409 1234 : return numLocks;
410 : }
411 :
412 : /*
413 : * Request shmem space for user-defined tranches and the main LWLock array.
414 : */
415 : static void
416 1234 : LWLockShmemRequest(void *arg)
417 : {
418 : size_t size;
419 :
420 : /* Space for user-defined tranches */
421 1234 : ShmemRequestStruct(.name = "LWLock tranches",
422 : .size = sizeof(LWLockTrancheShmemData),
423 : .ptr = (void **) &LWLockTranches,
424 : );
425 :
426 : /* Space for the LWLock array */
427 1234 : if (!IsUnderPostmaster)
428 : {
429 1234 : num_main_array_locks = NUM_FIXED_LWLOCKS + NumLWLocksForNamedTranches();
430 1234 : size = num_main_array_locks * sizeof(LWLockPadded);
431 : }
432 : else
      : /*
      : * Children of the postmaster only attach to the existing segment;
      : * presumably the size was fixed when the postmaster requested it.
      : */
433 0 : size = SHMEM_ATTACH_UNKNOWN_SIZE;
434 :
435 1234 : ShmemRequestStruct(.name = "Main LWLock array",
436 : .size = size,
437 : .ptr = (void **) &MainLWLockArray,
438 : );
439 1234 : }
440 :
441 : /*
442 : * Initialize shmem space for user-defined tranches and the main LWLock array.
      : *
      : * NOTE(review): no locking is taken here even though LWLockTranches lives in
      : * shared memory -- presumably this runs before any other process can access
      : * the structures (it initializes the very spinlock that protects them).
443 : */
444 : static void
445 1231 : LWLockShmemInit(void *arg)
446 : {
447 : int pos;
448 :
449 : /* Initialize the dynamic-allocation counter for tranches */
450 1231 : LWLockTranches->num_user_defined = 0;
451 :
452 1231 : SpinLockInit(&LWLockTranches->lock);
453 :
454 : /*
455 : * Allocate and initialize all LWLocks in the main array. It includes all
456 : * LWLocks for built-in tranches and those requested with
457 : * RequestNamedLWLockTranche().
458 : */
459 1231 : pos = 0;
460 :
461 : /* Initialize all individual LWLocks in main array */
462 71398 : for (int id = 0; id < NUM_INDIVIDUAL_LWLOCKS; id++)
463 70167 : LWLockInitialize(&MainLWLockArray[pos++].lock, id);
464 :
465 : /* Initialize buffer mapping LWLocks in main array */
466 : Assert(pos == BUFFER_MAPPING_LWLOCK_OFFSET);
467 158799 : for (int i = 0; i < NUM_BUFFER_PARTITIONS; i++)
468 157568 : LWLockInitialize(&MainLWLockArray[pos++].lock, LWTRANCHE_BUFFER_MAPPING);
469 :
470 : /* Initialize lmgrs' LWLocks in main array */
471 : Assert(pos == LOCK_MANAGER_LWLOCK_OFFSET);
472 20927 : for (int i = 0; i < NUM_LOCK_PARTITIONS; i++)
473 19696 : LWLockInitialize(&MainLWLockArray[pos++].lock, LWTRANCHE_LOCK_MANAGER);
474 :
475 : /* Initialize predicate lmgrs' LWLocks in main array */
476 : Assert(pos == PREDICATELOCK_MANAGER_LWLOCK_OFFSET);
477 20927 : for (int i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
478 19696 : LWLockInitialize(&MainLWLockArray[pos++].lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
479 :
480 : /*
481 : * Copy the info about any user-defined tranches into shared memory (so
482 : * that other processes can see it), and initialize the requested LWLocks.
483 : */
484 : Assert(pos == NUM_FIXED_LWLOCKS);
485 2464 : foreach_ptr(NamedLWLockTrancheRequest, request, NamedLWLockTrancheRequests)
486 : {
487 2 : int idx = (LWLockTranches->num_user_defined++);
488 :
489 2 : strlcpy(LWLockTranches->user_defined[idx].name,
490 2 : request->tranche_name,
491 : NAMEDATALEN);
      : /* Remember where this tranche's locks start in the main array. */
492 2 : LWLockTranches->user_defined[idx].main_array_idx = pos;
493 :
494 13 : for (int i = 0; i < request->num_lwlocks; i++)
495 11 : LWLockInitialize(&MainLWLockArray[pos++].lock, LWTRANCHE_FIRST_USER_DEFINED + idx);
496 : }
497 :
498 : /* Cross-check that we agree on the total size with LWLockShmemRequest() */
499 : Assert(pos == num_main_array_locks);
500 1231 : }
501 :
502 : /*
503 : * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
      : *
      : * Currently a no-op unless compiled with LWLOCK_STATS.
504 : */
505 : void
506 24775 : InitLWLockAccess(void)
507 : {
508 : #ifdef LWLOCK_STATS
509 : init_lwlock_stats();
510 : #endif
511 24775 : }
512 :
513 : /*
514 : * GetNamedLWLockTranche - returns the base address of LWLock from the
515 : * specified tranche.
516 : *
517 : * Caller needs to retrieve the requested number of LWLocks starting from
518 : * the base lock address returned by this API. This can be used for
519 : * tranches that are requested by using RequestNamedLWLockTranche() API.
      : *
      : * Raises ERROR if the tranche name is unknown or was created with
      : * LWLockNewTrancheId() rather than RequestNamedLWLockTranche().
520 : */
521 : LWLockPadded *
522 4 : GetNamedLWLockTranche(const char *tranche_name)
523 : {
      : /* Refresh our local count so we scan all entries registered so far. */
524 4 : SpinLockAcquire(&LWLockTranches->lock);
525 4 : LocalNumUserDefinedTranches = LWLockTranches->num_user_defined;
526 4 : SpinLockRelease(&LWLockTranches->lock);
527 :
528 : /*
529 : * Obtain the position of base address of LWLock belonging to requested
530 : * tranche_name in MainLWLockArray. LWLocks for user-defined tranches
531 : * requested with RequestNamedLWLockTranche() are placed in
532 : * MainLWLockArray after fixed locks.
533 : */
534 13 : for (int i = 0; i < LocalNumUserDefinedTranches; i++)
535 : {
536 12 : if (strcmp(LWLockTranches->user_defined[i].name,
537 : tranche_name) == 0)
538 : {
539 3 : int lock_pos = LWLockTranches->user_defined[i].main_array_idx;
540 :
541 : /*
542 : * GetNamedLWLockTranche() should only be used for locks requested
543 : * with RequestNamedLWLockTranche(), not those allocated with
544 : * LWLockNewTrancheId().
545 : */
546 3 : if (lock_pos == -1)
547 1 : elog(ERROR, "requested tranche was not registered with RequestNamedLWLockTranche()");
548 2 : return &MainLWLockArray[lock_pos];
549 : }
550 : }
551 :
552 1 : elog(ERROR, "requested tranche is not registered");
553 :
554 : /* just to keep compiler quiet */
555 : return NULL;
556 : }
557 :
558 : /*
559 : * Allocate a new tranche ID with the provided name.
      : *
      : * The returned ID is usable in any backend, since the name is stored in
      : * shared memory. Raises ERROR on NULL/overlong names or if the shared
      : * tranche table is full.
560 : */
561 : int
562 289 : LWLockNewTrancheId(const char *name)
563 : {
564 : int idx;
565 :
566 289 : if (!name)
567 1 : ereport(ERROR,
568 : (errcode(ERRCODE_INVALID_NAME),
569 : errmsg("tranche name cannot be NULL")));
570 :
571 288 : if (strlen(name) >= NAMEDATALEN)
572 1 : ereport(ERROR,
573 : (errcode(ERRCODE_NAME_TOO_LONG),
574 : errmsg("tranche name too long"),
575 : errdetail("LWLock tranche names must be no longer than %d bytes.",
576 : NAMEDATALEN - 1)));
577 :
578 : /* The counter and the tranche names are protected by the spinlock */
579 287 : SpinLockAcquire(&LWLockTranches->lock);
580 :
581 287 : if (LWLockTranches->num_user_defined >= MAX_USER_DEFINED_TRANCHES)
582 : {
      : /* Must not ereport() while holding a spinlock, so release it first. */
583 1 : SpinLockRelease(&LWLockTranches->lock);
584 1 : ereport(ERROR,
585 : (errmsg("maximum number of tranches already registered"),
586 : errdetail("No more than %d tranches may be registered.",
587 : MAX_USER_DEFINED_TRANCHES)));
588 : }
589 :
590 : /* Allocate an entry in the user_defined array */
591 286 : idx = (LWLockTranches->num_user_defined)++;
592 :
593 : /* update our local copy while we're at it */
594 286 : LocalNumUserDefinedTranches = LWLockTranches->num_user_defined;
595 :
596 : /* Initialize it */
597 286 : strlcpy(LWLockTranches->user_defined[idx].name, name, NAMEDATALEN);
598 :
599 : /* the locks are not in the main array */
600 286 : LWLockTranches->user_defined[idx].main_array_idx = -1;
601 :
602 286 : SpinLockRelease(&LWLockTranches->lock);
603 :
604 286 : return LWTRANCHE_FIRST_USER_DEFINED + idx;
605 : }
606 :
607 : /*
608 : * RequestNamedLWLockTranche
609 : * Request that extra LWLocks be allocated during postmaster
610 : * startup.
611 : *
612 : * This may only be called via the shmem_request_hook of a library that is
613 : * loaded into the postmaster via shared_preload_libraries. Calls from
614 : * elsewhere will fail.
615 : *
616 : * The tranche name will be user-visible as a wait event name, so try to
617 : * use a name that fits the style for those.
      : *
      : * The request is merely recorded here; the locks themselves are created
      : * later by LWLockShmemInit().
618 : */
619 : void
620 2 : RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
621 : {
622 : NamedLWLockTrancheRequest *request;
623 : MemoryContext oldcontext;
624 :
625 2 : if (!process_shmem_requests_in_progress)
626 0 : elog(FATAL, "cannot request additional LWLocks outside shmem_request_hook");
627 :
628 2 : if (!tranche_name)
629 0 : ereport(ERROR,
630 : (errcode(ERRCODE_INVALID_NAME),
631 : errmsg("tranche name cannot be NULL")));
632 :
633 2 : if (strlen(tranche_name) >= NAMEDATALEN)
634 0 : ereport(ERROR,
635 : (errcode(ERRCODE_NAME_TOO_LONG),
636 : errmsg("tranche name too long"),
637 : errdetail("LWLock tranche names must be no longer than %d bytes.",
638 : NAMEDATALEN - 1)));
639 :
640 2 : if (list_length(NamedLWLockTrancheRequests) >= MAX_USER_DEFINED_TRANCHES)
641 0 : ereport(ERROR,
642 : (errmsg("maximum number of tranches already registered"),
643 : errdetail("No more than %d tranches may be registered.",
644 : MAX_USER_DEFINED_TRANCHES)));
645 :
646 : /* Check that the name isn't already in use */
647 5 : foreach_ptr(NamedLWLockTrancheRequest, existing, NamedLWLockTrancheRequests)
648 : {
649 1 : if (strcmp(existing->tranche_name, tranche_name) == 0)
650 0 : elog(ERROR, "requested tranche \"%s\" is already registered", tranche_name);
651 : }
652 :
      : /* The request list must outlive the current (transient) context. */
653 2 : if (IsPostmasterEnvironment)
654 2 : oldcontext = MemoryContextSwitchTo(PostmasterContext);
655 : else
656 0 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
657 :
658 2 : request = palloc0(sizeof(NamedLWLockTrancheRequest));
659 2 : strlcpy(request->tranche_name, tranche_name, NAMEDATALEN);
660 2 : request->num_lwlocks = num_lwlocks;
661 2 : NamedLWLockTrancheRequests = lappend(NamedLWLockTrancheRequests, request);
662 :
663 2 : MemoryContextSwitchTo(oldcontext);
664 2 : }
665 :
666 : /*
667 : * LWLockInitialize - initialize a new lwlock; it's initially unlocked
668 : */
669 : void
670 2557374 : LWLockInitialize(LWLock *lock, int tranche_id)
671 : {
672 : /* verify the tranche_id is valid */
      : /* (GetLWTrancheName errors out for unregistered IDs; result discarded) */
673 2557374 : (void) GetLWTrancheName(tranche_id);
674 :
675 2557373 : pg_atomic_init_u32(&lock->state, 0);
676 : #ifdef LOCK_DEBUG
677 : pg_atomic_init_u32(&lock->nwaiters, 0);
678 : #endif
679 2557373 : lock->tranche = tranche_id;
680 2557373 : proclist_init(&lock->waiters);
681 2557373 : }
682 :
683 : /*
684 : * Report start of wait event for light-weight locks.
685 : *
686 : * This function will be used by all the light-weight lock calls which
687 : * needs to wait to acquire the lock. This function distinguishes wait
688 : * event based on tranche and lock id.
689 : */
690 : static inline void
691 5603543 : LWLockReportWaitStart(LWLock *lock)
692 : {
      : /* The tranche number doubles as the wait-event ID within PG_WAIT_LWLOCK. */
693 5603543 : pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
694 5603543 : }
695 :
696 : /*
697 : * Report end of wait event for light-weight locks.
      : * Pairs with LWLockReportWaitStart().
698 : */
699 : static inline void
700 5603543 : LWLockReportWaitEnd(void)
701 : {
702 5603543 : pgstat_report_wait_end();
703 5603543 : }
704 :
705 : /*
706 : * Return the name of an LWLock tranche.
      : *
      : * Errors out if the tranche ID has not been registered. The common case
      : * (built-in tranche, or an extension tranche we've already seen) requires
      : * no locking.
707 : */
708 : static const char *
709 2557399 : GetLWTrancheName(uint16 trancheId)
710 : {
711 : int idx;
712 :
713 : /* Built-in tranche or individual LWLock? */
714 2557399 : if (trancheId < LWTRANCHE_FIRST_USER_DEFINED)
715 2556172 : return BuiltinTrancheNames[trancheId];
716 :
717 : /*
718 : * It's an extension tranche, so look in LWLockTranches->user_defined.
719 : */
720 1227 : idx = trancheId - LWTRANCHE_FIRST_USER_DEFINED;
721 :
722 : /*
723 : * We only ever add new entries to LWLockTranches->user_defined, so most
724 : * lookups can avoid taking the spinlock as long as the backend-local
725 : * counter (LocalNumUserDefinedTranches) is greater than the requested
726 : * tranche ID. Else, we need to first update the backend-local counter
727 : * with the spinlock held before attempting the lookup again. In
728 : * practice, the latter case is probably rare.
729 : */
730 1227 : if (idx >= LocalNumUserDefinedTranches)
731 : {
732 3 : SpinLockAcquire(&LWLockTranches->lock);
733 3 : LocalNumUserDefinedTranches = LWLockTranches->num_user_defined;
734 3 : SpinLockRelease(&LWLockTranches->lock);
735 :
736 3 : if (idx >= LocalNumUserDefinedTranches)
737 1 : elog(ERROR, "tranche %d is not registered", trancheId);
738 : }
739 :
740 1226 : return LWLockTranches->user_defined[idx].name;
741 : }
742 :
743 : /*
744 : * Return an identifier for an LWLock based on the wait class and event.
      : * Used by the wait-event reporting machinery; classId must be PG_WAIT_LWLOCK.
745 : */
746 : const char *
747 25 : GetLWLockIdentifier(uint32 classId, uint16 eventId)
748 : {
749 : Assert(classId == PG_WAIT_LWLOCK);
750 : /* The event IDs are just tranche numbers. */
751 25 : return GetLWTrancheName(eventId);
752 : }
753 :
754 : /*
755 : * Internal function that tries to atomically acquire the lwlock in the passed
756 : * in mode.
757 : *
758 : * This function will not block waiting for a lock to become free - that's the
759 : * caller's job.
760 : *
761 : * Returns true if the lock isn't free and we need to wait.
      : * (I.e. false means we now hold the lock in the requested mode.)
762 : */
763 : static bool
764 360140799 : LWLockAttemptLock(LWLock *lock, LWLockMode mode)
765 : {
766 : uint32 old_state;
767 :
768 : Assert(mode == LW_EXCLUSIVE || mode == LW_SHARED);
769 :
770 : /*
771 : * Read once outside the loop, later iterations will get the newer value
772 : * via compare & exchange.
773 : */
774 360140799 : old_state = pg_atomic_read_u32(&lock->state);
775 :
776 : /* loop until we've determined whether we could acquire the lock or not */
777 : while (true)
778 305169 : {
779 : uint32 desired_state;
780 : bool lock_free;
781 :
782 360445968 : desired_state = old_state;
783 :
784 360445968 : if (mode == LW_EXCLUSIVE)
785 : {
      : /* Exclusive conflicts with any holder, shared or exclusive. */
786 242654668 : lock_free = (old_state & LW_LOCK_MASK) == 0;
787 242654668 : if (lock_free)
788 240813617 : desired_state += LW_VAL_EXCLUSIVE;
789 : }
790 : else
791 : {
      : /* Shared only conflicts with an exclusive holder. */
792 117791300 : lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
793 117791300 : if (lock_free)
794 108431358 : desired_state += LW_VAL_SHARED;
795 : }
796 :
797 : /*
798 : * Attempt to swap in the state we are expecting. If we didn't see
799 : * lock to be free, that's just the old value. If we saw it as free,
800 : * we'll attempt to mark it acquired. The reason that we always swap
801 : * in the value is that this doubles as a memory barrier. We could try
802 : * to be smarter and only swap in values if we saw the lock as free,
803 : * but benchmark haven't shown it as beneficial so far.
804 : *
805 : * Retry if the value changed since we last looked at it.
806 : */
807 360445968 : if (pg_atomic_compare_exchange_u32(&lock->state,
808 : &old_state, desired_state))
809 : {
810 360140799 : if (lock_free)
811 : {
812 : /* Great! Got the lock. */
813 : #ifdef LOCK_DEBUG
814 : if (mode == LW_EXCLUSIVE)
815 : lock->owner = MyProc;
816 : #endif
817 349059477 : return false;
818 : }
819 : else
820 11081322 : return true; /* somebody else has the lock */
821 : }
      : /* CAS failed: old_state was refreshed by the CAS; loop and retry. */
822 : }
823 : pg_unreachable();
824 : }
825 :
826 : /*
827 : * Lock the LWLock's wait list against concurrent activity.
828 : *
829 : * NB: even though the wait list is locked, non-conflicting lock operations
830 : * may still happen concurrently.
831 : *
832 : * Time spent holding mutex should be short!
      : *
      : * The wait-list lock is the LW_FLAG_LOCKED bit of lock->state; we set it
      : * with an atomic fetch-or and, on contention, spin on plain reads until
      : * the bit clears before retrying the atomic.
833 : */
834 : static void
835 13671965 : LWLockWaitListLock(LWLock *lock)
836 : {
837 : uint32 old_state;
838 : #ifdef LWLOCK_STATS
839 : lwlock_stats *lwstats;
840 : uint32 delays = 0;
841 :
842 : lwstats = get_lwlock_stats_entry(lock);
843 : #endif
844 :
845 : while (true)
846 : {
847 : /*
848 : * Always try once to acquire the lock directly, without setting up
849 : * the spin-delay infrastructure. The work necessary for that shows up
850 : * in profiles and is rarely necessary.
851 : */
852 13723768 : old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
853 13723768 : if (likely(!(old_state & LW_FLAG_LOCKED)))
854 13671965 : break; /* got lock */
855 :
856 : /* and then spin without atomic operations until lock is released */
857 : {
858 : SpinDelayStatus delayStatus;
859 :
860 51803 : init_local_spin_delay(&delayStatus);
861 :
862 235156 : while (old_state & LW_FLAG_LOCKED)
863 : {
864 183353 : perform_spin_delay(&delayStatus);
865 183353 : old_state = pg_atomic_read_u32(&lock->state);
866 : }
867 : #ifdef LWLOCK_STATS
868 : delays += delayStatus.delays;
869 : #endif
870 51803 : finish_spin_delay(&delayStatus);
871 : }
872 :
873 : /*
874 : * Retry. The lock might obviously already be re-acquired by the time
875 : * we're attempting to get it again.
876 : */
877 : }
878 :
879 : #ifdef LWLOCK_STATS
880 : lwstats->spin_delay_count += delays;
881 : #endif
882 13671965 : }
883 :
884 : /*
885 : * Unlock the LWLock's wait list.
886 : *
887 : * Note that it can be more efficient to manipulate flags and release the
888 : * locks in a single atomic operation.
      : *
      : * Simply clears LW_FLAG_LOCKED; the Assert verifies it was actually set.
889 : */
890 : static void
891 8523169 : LWLockWaitListUnlock(LWLock *lock)
892 : {
893 : uint32 old_state PG_USED_FOR_ASSERTS_ONLY;
894 :
895 8523169 : old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
896 :
897 : Assert(old_state & LW_FLAG_LOCKED);
898 8523169 : }
899 :
900 : /*
901 : * Wakeup all the lockers that currently have a chance to acquire the lock.
902 : */
903 : static void
904 5148796 : LWLockWakeup(LWLock *lock)
905 : {
906 5148796 : bool new_wake_in_progress = false; /* true once a woken waiter will retry acquisition */
907 5148796 : bool wokeup_somebody = false; /* set once a lock-retrying waiter has been chosen */
908 : proclist_head wakeup;
909 : proclist_mutable_iter iter;
910 :
911 5148796 : proclist_init(&wakeup);
912 :
913 : /* lock wait list while collecting backends to wake up */
914 5148796 : LWLockWaitListLock(lock);
915 :
916 9931777 : proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
917 : {
918 5650079 : PGPROC *waiter = GetPGProcByNumber(iter.cur);
919 :
920 5650079 : if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
921 26962 : continue; /* exclusive waiter behind a woken one stays queued */
922 :
923 5623117 : proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
924 5623117 : proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
925 :
926 5623117 : if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
927 : {
928 : /*
929 : * Prevent additional wakeups until retryer gets to run. Backends
930 : * that are just waiting for the lock to become free don't retry
931 : * automatically.
932 : */
933 5511886 : new_wake_in_progress = true;
934 :
935 : /*
936 : * Don't wakeup (further) exclusive locks.
937 : */
938 5511886 : wokeup_somebody = true;
939 : }
940 :
941 : /*
942 : * Signal that the process isn't on the wait list anymore. This allows
943 : * LWLockDequeueSelf() to remove itself of the waitlist with a
944 : * proclist_delete(), rather than having to check if it has been
945 : * removed from the list.
946 : */
947 : Assert(waiter->lwWaiting == LW_WS_WAITING);
948 5623117 : waiter->lwWaiting = LW_WS_PENDING_WAKEUP;
949 :
950 : /*
951 : * Once we've woken up an exclusive lock, there's no point in waking
952 : * up anybody else.
953 : */
954 5623117 : if (waiter->lwWaitMode == LW_EXCLUSIVE)
955 867098 : break;
956 : }
957 :
958 : Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
959 :
960 : /* unset required flags, and release lock, in one fell swoop */
961 : {
962 : uint32 old_state;
963 : uint32 desired_state;
964 :
965 5148796 : old_state = pg_atomic_read_u32(&lock->state); /* initial snapshot for the CAS loop */
966 : while (true)
967 : {
968 5156667 : desired_state = old_state;
969 :
970 : /* compute desired flags */
971 :
972 5156667 : if (new_wake_in_progress)
973 5089402 : desired_state |= LW_FLAG_WAKE_IN_PROGRESS;
974 : else
975 67265 : desired_state &= ~LW_FLAG_WAKE_IN_PROGRESS;
976 :
977 5156667 : if (proclist_is_empty(&lock->waiters)) /* stable: we still hold the wait list lock */
978 5075191 : desired_state &= ~LW_FLAG_HAS_WAITERS;
979 :
980 5156667 : desired_state &= ~LW_FLAG_LOCKED; /* release lock */
981 :
982 5156667 : if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
983 : desired_state))
984 5148796 : break; /* flags updated and wait list lock released atomically */
985 : }
986 : }
987 :
988 : /* Awaken any waiters I removed from the queue. */
989 10771913 : proclist_foreach_modify(iter, &wakeup, lwWaitLink)
990 : {
991 5623117 : PGPROC *waiter = GetPGProcByNumber(iter.cur);
992 :
993 : LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
994 5623117 : proclist_delete(&wakeup, iter.cur, lwWaitLink);
995 :
996 : /*
997 : * Guarantee that lwWaiting being unset only becomes visible once the
998 : * unlink from the link has completed. Otherwise the target backend
999 : * could be woken up for other reason and enqueue for a new lock - if
1000 : * that happens before the list unlink happens, the list would end up
1001 : * being corrupted.
1002 : *
1003 : * The barrier pairs with the LWLockWaitListLock() when enqueuing for
1004 : * another lock.
1005 : */
1006 5623117 : pg_write_barrier();
1007 5623117 : waiter->lwWaiting = LW_WS_NOT_WAITING;
1008 5623117 : PGSemaphoreUnlock(waiter->sem);
1009 : }
1010 5148796 : }
1011 :
1012 : /*
1013 : * Add ourselves to the end of the queue.
1014 : *
1015 : * NB: Mode can be LW_WAIT_UNTIL_FREE here!
1016 : */
1017 : static void
1018 5691903 : LWLockQueueSelf(LWLock *lock, LWLockMode mode)
1019 : {
1020 : /*
1021 : * If we don't have a PGPROC structure, there's no way to wait. This
1022 : * should never occur, since MyProc should only be null during shared
1023 : * memory initialization.
1024 : */
1025 5691903 : if (MyProc == NULL)
1026 0 : elog(PANIC, "cannot wait without a PGPROC structure");
1027 :
1028 5691903 : if (MyProc->lwWaiting != LW_WS_NOT_WAITING)
1029 0 : elog(PANIC, "queueing for lock while waiting on another one");
1030 :
1031 5691903 : LWLockWaitListLock(lock);
1032 :
1033 : /* setting the flag is protected by the spinlock */
1034 5691903 : pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
1035 :
1036 5691903 : MyProc->lwWaiting = LW_WS_WAITING; /* protected by the wait list lock held above */
1037 5691903 : MyProc->lwWaitMode = mode;
1038 :
1039 : /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
1040 5691903 : if (mode == LW_WAIT_UNTIL_FREE)
1041 113556 : proclist_push_head(&lock->waiters, MyProcNumber, lwWaitLink);
1042 : else
1043 5578347 : proclist_push_tail(&lock->waiters, MyProcNumber, lwWaitLink);
1044 :
1045 : /* Can release the mutex now */
1046 5691903 : LWLockWaitListUnlock(lock);
1047 :
1048 : #ifdef LOCK_DEBUG
1049 : pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
1050 : #endif
1051 5691903 : }
1052 :
1053 : /*
1054 : * Remove ourselves from the waitlist.
1055 : *
1056 : * This is used if we queued ourselves because we thought we needed to sleep
1057 : * but, after further checking, we discovered that we don't actually need to
1058 : * do so.
1059 : */
1060 : static void
1061 88360 : LWLockDequeueSelf(LWLock *lock)
1062 : {
1063 : bool on_waitlist;
1064 :
1065 : #ifdef LWLOCK_STATS
1066 : lwlock_stats *lwstats;
1067 :
1068 : lwstats = get_lwlock_stats_entry(lock);
1069 :
1070 : lwstats->dequeue_self_count++;
1071 : #endif
1072 :
1073 88360 : LWLockWaitListLock(lock);
1074 :
1075 : /*
1076 : * Remove ourselves from the waitlist, unless we've already been removed.
1077 : * The removal happens with the wait list lock held, so there's no race in
1078 : * this check.
1079 : */
1080 88360 : on_waitlist = MyProc->lwWaiting == LW_WS_WAITING;
1081 88360 : if (on_waitlist)
1082 68000 : proclist_delete(&lock->waiters, MyProcNumber, lwWaitLink);
1083 :
1084 88360 : if (proclist_is_empty(&lock->waiters) &&
1085 82108 : (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
1086 : {
1087 64893 : pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS); /* we emptied the queue; clear the flag */
1088 : }
1089 :
1090 : /* XXX: combine with fetch_and above? */
1091 88360 : LWLockWaitListUnlock(lock);
1092 :
1093 : /* clear waiting state again, nice for debugging */
1094 88360 : if (on_waitlist)
1095 68000 : MyProc->lwWaiting = LW_WS_NOT_WAITING;
1096 : else
1097 : {
1098 20360 : int extraWaits = 0;
1099 :
1100 : /*
1101 : * Somebody else dequeued us and has or will wake us up. Deal with the
1102 : * superfluous absorption of a wakeup.
1103 : */
1104 :
1105 : /*
1106 : * Clear LW_FLAG_WAKE_IN_PROGRESS if somebody woke us before we
1107 : * removed ourselves - they'll have set it.
1108 : */
1109 20360 : pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_WAKE_IN_PROGRESS);
1110 :
1111 : /*
1112 : * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1113 : * get reset at some inconvenient point later. Most of the time this
1114 : * will immediately return.
1115 : */
1116 : for (;;)
1117 : {
1118 20360 : PGSemaphoreLock(MyProc->sem);
1119 20360 : if (MyProc->lwWaiting == LW_WS_NOT_WAITING)
1120 20360 : break;
1121 0 : extraWaits++; /* absorbed an unrelated wakeup; repaid below */
1122 : }
1123 :
1124 : /*
1125 : * Fix the process wait semaphore's count for any absorbed wakeups.
1126 : */
1127 20360 : while (extraWaits-- > 0)
1128 0 : PGSemaphoreUnlock(MyProc->sem);
1129 : }
1130 :
1131 : #ifdef LOCK_DEBUG
1132 : {
1133 : /* not waiting anymore */
1134 : uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1135 :
1136 : Assert(nwaiters < MAX_BACKENDS);
1137 : }
1138 : #endif
1139 88360 : }
1140 :
1141 : /*
1142 : * LWLockAcquire - acquire a lightweight lock in the specified mode
1143 : *
1144 : * If the lock is not available, sleep until it is. Returns true if the lock
1145 : * was available immediately, false if we had to sleep.
1146 : *
1147 : * Side effect: cancel/die interrupts are held off until lock release.
1148 : */
1149 : bool
1150 347427082 : LWLockAcquire(LWLock *lock, LWLockMode mode)
1151 : {
1152 347427082 : PGPROC *proc = MyProc;
1153 347427082 : bool result = true; /* stays true only if no sleep was needed */
1154 347427082 : int extraWaits = 0; /* spurious semaphore wakeups absorbed while waiting */
1155 : #ifdef LWLOCK_STATS
1156 : lwlock_stats *lwstats;
1157 :
1158 : lwstats = get_lwlock_stats_entry(lock);
1159 : #endif
1160 :
1161 : Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1162 :
1163 : PRINT_LWDEBUG("LWLockAcquire", lock, mode);
1164 :
1165 : #ifdef LWLOCK_STATS
1166 : /* Count lock acquisition attempts */
1167 : if (mode == LW_EXCLUSIVE)
1168 : lwstats->ex_acquire_count++;
1169 : else
1170 : lwstats->sh_acquire_count++;
1171 : #endif /* LWLOCK_STATS */
1172 :
1173 : /*
1174 : * We can't wait if we haven't got a PGPROC. This should only occur
1175 : * during bootstrap or shared memory initialization. Put an Assert here
1176 : * to catch unsafe coding practices.
1177 : */
1178 : Assert(!(proc == NULL && IsUnderPostmaster));
1179 :
1180 : /* Ensure we will have room to remember the lock */
1181 347427082 : if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1182 0 : elog(ERROR, "too many LWLocks taken");
1183 :
1184 : /*
1185 : * Lock out cancel/die interrupts until we exit the code section protected
1186 : * by the LWLock. This ensures that interrupts will not interfere with
1187 : * manipulations of data structures in shared memory.
1188 : */
1189 347427082 : HOLD_INTERRUPTS();
1190 :
1191 : /*
1192 : * Loop here to try to acquire lock after each time we are signaled by
1193 : * LWLockRelease.
1194 : *
1195 : * NOTE: it might seem better to have LWLockRelease actually grant us the
1196 : * lock, rather than retrying and possibly having to go back to sleep. But
1197 : * in practice that is no good because it means a process swap for every
1198 : * lock acquisition when two or more processes are contending for the same
1199 : * lock. Since LWLocks are normally used to protect not-very-long
1200 : * sections of computation, a process needs to be able to acquire and
1201 : * release the same lock many times during a single CPU time slice, even
1202 : * in the presence of contention. The efficiency of being able to do that
1203 : * outweighs the inefficiency of sometimes wasting a process dispatch
1204 : * cycle because the lock is not free when a released waiter finally gets
1205 : * to run. See pgsql-hackers archives for 29-Dec-01.
1206 : */
1207 : for (;;)
1208 5491776 : {
1209 : bool mustwait;
1210 :
1211 : /*
1212 : * Try to grab the lock the first time, we're not in the waitqueue
1213 : * yet/anymore.
1214 : */
1215 352918858 : mustwait = LWLockAttemptLock(lock, mode);
1216 :
1217 352918858 : if (!mustwait)
1218 : {
1219 : LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
1220 347340511 : break; /* got the lock */
1221 : }
1222 :
1223 : /*
1224 : * Ok, at this point we couldn't grab the lock on the first try. We
1225 : * cannot simply queue ourselves to the end of the list and wait to be
1226 : * woken up because by now the lock could long have been released.
1227 : * Instead add us to the queue and try to grab the lock again. If we
1228 : * succeed we need to revert the queuing and be happy, otherwise we
1229 : * recheck the lock. If we still couldn't grab it, we know that the
1230 : * other locker will see our queue entries when releasing since they
1231 : * existed before we checked for the lock.
1232 : */
1233 :
1234 : /* add to the queue */
1235 5578347 : LWLockQueueSelf(lock, mode);
1236 :
1237 : /* we're now guaranteed to be woken up if necessary */
1238 5578347 : mustwait = LWLockAttemptLock(lock, mode);
1239 :
1240 : /* ok, grabbed the lock the second time round, need to undo queueing */
1241 5578347 : if (!mustwait)
1242 : {
1243 : LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1244 :
1245 86571 : LWLockDequeueSelf(lock);
1246 86571 : break;
1247 : }
1248 :
1249 : /*
1250 : * Wait until awakened.
1251 : *
1252 : * It is possible that we get awakened for a reason other than being
1253 : * signaled by LWLockRelease. If so, loop back and wait again. Once
1254 : * we've gotten the LWLock, re-increment the sema by the number of
1255 : * additional signals received.
1256 : */
1257 : LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1258 :
1259 : #ifdef LWLOCK_STATS
1260 : lwstats->block_count++;
1261 : #endif
1262 :
1263 5491776 : LWLockReportWaitStart(lock);
1264 : if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1265 : TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1266 :
1267 : for (;;)
1268 : {
1269 5491776 : PGSemaphoreLock(proc->sem);
1270 5491776 : if (proc->lwWaiting == LW_WS_NOT_WAITING)
1271 5491776 : break;
1272 0 : extraWaits++; /* absorbed a spurious wakeup; repaid below */
1273 : }
1274 :
1275 : /* Retrying, allow LWLockRelease to release waiters again. */
1276 5491776 : pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_WAKE_IN_PROGRESS);
1277 :
1278 : #ifdef LOCK_DEBUG
1279 : {
1280 : /* not waiting anymore */
1281 : uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1282 :
1283 : Assert(nwaiters < MAX_BACKENDS);
1284 : }
1285 : #endif
1286 :
1287 : if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1288 : TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1289 5491776 : LWLockReportWaitEnd();
1290 :
1291 : LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1292 :
1293 : /* Now loop back and try to acquire lock again. */
1294 5491776 : result = false; /* we had to sleep at least once */
1295 : }
1296 :
1297 : if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
1298 : TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
1299 :
1300 : /* Add lock to list of locks held by this backend */
1301 347427082 : held_lwlocks[num_held_lwlocks].lock = lock;
1302 347427082 : held_lwlocks[num_held_lwlocks++].mode = mode;
1303 :
1304 : /*
1305 : * Fix the process wait semaphore's count for any absorbed wakeups.
1306 : */
1307 347427082 : while (extraWaits-- > 0)
1308 0 : PGSemaphoreUnlock(proc->sem);
1309 :
1310 347427082 : return result;
1311 : }
1312 :
1313 : /*
1314 : * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1315 : *
1316 : * If the lock is not available, return false with no side-effects.
1317 : *
1318 : * If successful, cancel/die interrupts are held off until lock release.
1319 : */
1320 : bool
1321 1471528 : LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1322 : {
1323 : bool mustwait;
1324 :
1325 : Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1326 :
1327 : PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1328 :
1329 : /* Ensure we will have room to remember the lock */
1330 1471528 : if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1331 0 : elog(ERROR, "too many LWLocks taken");
1332 :
1333 : /*
1334 : * Lock out cancel/die interrupts until we exit the code section protected
1335 : * by the LWLock. This ensures that interrupts will not interfere with
1336 : * manipulations of data structures in shared memory.
1337 : */
1338 1471528 : HOLD_INTERRUPTS();
1339 :
1340 : /* Check for the lock */
1341 1471528 : mustwait = LWLockAttemptLock(lock, mode); /* single attempt; never queues or sleeps */
1342 :
1343 1471528 : if (mustwait)
1344 : {
1345 : /* Failed to get lock, so release interrupt holdoff */
1346 3704 : RESUME_INTERRUPTS();
1347 :
1348 : LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1349 : if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL_ENABLED())
1350 : TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
1351 : }
1352 : else
1353 : {
1354 : /* Add lock to list of locks held by this backend */
1355 1467824 : held_lwlocks[num_held_lwlocks].lock = lock;
1356 1467824 : held_lwlocks[num_held_lwlocks++].mode = mode;
1357 : if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_ENABLED())
1358 : TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
1359 : }
1360 1471528 : return !mustwait; /* true on successful acquisition */
1361 : }
1362 :
1363 : /*
1364 : * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1365 : *
1366 : * The semantics of this function are a bit funky. If the lock is currently
1367 : * free, it is acquired in the given mode, and the function returns true. If
1368 : * the lock isn't immediately free, the function waits until it is released
1369 : * and returns false, but does not acquire the lock.
1370 : *
1371 : * This is currently used for WALWriteLock: when a backend flushes the WAL,
1372 : * holding WALWriteLock, it can flush the commit records of many other
1373 : * backends as a side-effect. Those other backends need to wait until the
1374 : * flush finishes, but don't need to acquire the lock anymore. They can just
1375 : * wake up, observe that their records have already been flushed, and return.
1376 : */
1377 : bool
1378 168256 : LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1379 : {
1380 168256 : PGPROC *proc = MyProc;
1381 : bool mustwait;
1382 168256 : int extraWaits = 0; /* spurious semaphore wakeups absorbed while waiting */
1383 : #ifdef LWLOCK_STATS
1384 : lwlock_stats *lwstats;
1385 :
1386 : lwstats = get_lwlock_stats_entry(lock);
1387 : #endif
1388 :
1389 : Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1390 :
1391 : PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1392 :
1393 : /* Ensure we will have room to remember the lock */
1394 168256 : if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1395 0 : elog(ERROR, "too many LWLocks taken");
1396 :
1397 : /*
1398 : * Lock out cancel/die interrupts until we exit the code section protected
1399 : * by the LWLock. This ensures that interrupts will not interfere with
1400 : * manipulations of data structures in shared memory.
1401 : */
1402 168256 : HOLD_INTERRUPTS();
1403 :
1404 : /*
1405 : * NB: We're using nearly the same twice-in-a-row lock acquisition
1406 : * protocol as LWLockAcquire(). Check its comments for details.
1407 : */
1408 168256 : mustwait = LWLockAttemptLock(lock, mode);
1409 :
1410 168256 : if (mustwait)
1411 : {
1412 3810 : LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE); /* queue to wait for release, not to acquire */
1413 :
1414 3810 : mustwait = LWLockAttemptLock(lock, mode);
1415 :
1416 3810 : if (mustwait)
1417 : {
1418 : /*
1419 : * Wait until awakened. Like in LWLockAcquire, be prepared for
1420 : * bogus wakeups.
1421 : */
1422 : LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1423 :
1424 : #ifdef LWLOCK_STATS
1425 : lwstats->block_count++;
1426 : #endif
1427 :
1428 3685 : LWLockReportWaitStart(lock);
1429 : if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1430 : TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1431 :
1432 : for (;;)
1433 : {
1434 3685 : PGSemaphoreLock(proc->sem);
1435 3685 : if (proc->lwWaiting == LW_WS_NOT_WAITING)
1436 3685 : break;
1437 0 : extraWaits++; /* absorbed a spurious wakeup; repaid below */
1438 : }
1439 :
1440 : #ifdef LOCK_DEBUG
1441 : {
1442 : /* not waiting anymore */
1443 : uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1444 :
1445 : Assert(nwaiters < MAX_BACKENDS);
1446 : }
1447 : #endif
1448 : if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1449 : TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1450 3685 : LWLockReportWaitEnd();
1451 :
1452 : LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1453 : }
1454 : else
1455 : {
1456 : LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1457 :
1458 : /*
1459 : * Got lock in the second attempt, undo queueing. We need to treat
1460 : * this as having successfully acquired the lock, otherwise we'd
1461 : * not necessarily wake up people we've prevented from acquiring
1462 : * the lock.
1463 : */
1464 125 : LWLockDequeueSelf(lock);
1465 : }
1466 : }
1467 :
1468 : /*
1469 : * Fix the process wait semaphore's count for any absorbed wakeups.
1470 : */
1471 168256 : while (extraWaits-- > 0)
1472 0 : PGSemaphoreUnlock(proc->sem);
1473 :
1474 168256 : if (mustwait)
1475 : {
1476 : /* Failed to get lock, so release interrupt holdoff */
1477 3685 : RESUME_INTERRUPTS();
1478 : LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1479 : if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL_ENABLED())
1480 : TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
1481 : }
1482 : else
1483 : {
1484 : LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1485 : /* Add lock to list of locks held by this backend */
1486 164571 : held_lwlocks[num_held_lwlocks].lock = lock;
1487 164571 : held_lwlocks[num_held_lwlocks++].mode = mode;
1488 : if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_ENABLED())
1489 : TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
1490 : }
1491 :
1492 168256 : return !mustwait; /* true iff the lock was actually acquired */
1493 : }
1494 :
1495 : /*
1496 : * Does the lwlock in its current state need to wait for the variable value to
1497 : * change?
1498 : *
1499 : * If we don't need to wait, and it's because the value of the variable has
1500 : * changed, store the current value in newval.
1501 : *
1502 : * *result is set to true if the lock was free, and false otherwise.
1503 : */
1504 : static bool
1505 4429291 : LWLockConflictsWithVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval,
1506 : uint64 *newval, bool *result)
1507 : {
1508 : bool mustwait;
1509 : uint64 value;
1510 :
1511 : /*
1512 : * Test first to see if it the slot is free right now.
1513 : *
1514 : * XXX: the unique caller of this routine, WaitXLogInsertionsToFinish()
1515 : * via LWLockWaitForVar(), uses an implied barrier with a spinlock before
1516 : * this, so we don't need a memory barrier here as far as the current
1517 : * usage is concerned. But that might not be safe in general.
1518 : */
1519 4429291 : mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1520 :
1521 4429291 : if (!mustwait)
1522 : {
1523 2963842 : *result = true; /* lock is free; no need to look at the variable */
1524 2963842 : return false;
1525 2963842 : }
1526 :
1527 1465449 : *result = false;
1528 :
1529 : /*
1530 : * Reading this value atomically is safe even on platforms where uint64
1531 : * cannot be read without observing a torn value.
1532 : */
1533 1465449 : value = pg_atomic_read_u64(valptr);
1534 :
1535 1465449 : if (value != oldval)
1536 : {
1537 1247621 : mustwait = false; /* variable changed; report the new value instead */
1538 1247621 : *newval = value;
1539 : }
1540 : else
1541 : {
1542 217828 : mustwait = true;
1543 : }
1544 :
1545 1465449 : return mustwait; /* true: caller must block */
1546 : }
1547 :
1548 : /*
1549 : * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1550 : *
1551 : * If the lock is held and *valptr equals oldval, waits until the lock is
1552 : * either freed, or the lock holder updates *valptr by calling
1553 : * LWLockUpdateVar. If the lock is free on exit (immediately or after
1554 : * waiting), returns true. If the lock is still held, but *valptr no longer
1555 : * matches oldval, returns false and sets *newval to the current value in
1556 : * *valptr.
1557 : *
1558 : * Note: this function ignores shared lock holders; if the lock is held
1559 : * in shared mode, returns 'true'.
1560 : *
1561 : * Be aware that LWLockConflictsWithVar() does not include a memory barrier,
1562 : * hence the caller of this function may want to rely on an explicit barrier or
1563 : * an implied barrier via spinlock or LWLock to avoid memory ordering issues.
1564 : */
1565 : bool
1566 4211463 : LWLockWaitForVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval,
1567 : uint64 *newval)
1568 : {
1569 4211463 : PGPROC *proc = MyProc;
1570 4211463 : int extraWaits = 0; /* spurious semaphore wakeups absorbed while waiting */
1571 4211463 : bool result = false;
1572 : #ifdef LWLOCK_STATS
1573 : lwlock_stats *lwstats;
1574 :
1575 : lwstats = get_lwlock_stats_entry(lock);
1576 : #endif
1577 :
1578 : PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1579 :
1580 : /*
1581 : * Lock out cancel/die interrupts while we sleep on the lock. There is no
1582 : * cleanup mechanism to remove us from the wait queue if we got
1583 : * interrupted.
1584 : */
1585 4211463 : HOLD_INTERRUPTS();
1586 :
1587 : /*
1588 : * Loop here to check the lock's status after each time we are signaled.
1589 : */
1590 : for (;;)
1591 108082 : {
1592 : bool mustwait;
1593 :
1594 4319545 : mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1595 : &result);
1596 :
1597 4319545 : if (!mustwait)
1598 4209799 : break; /* the lock was free or value didn't match */
1599 :
1600 : /*
1601 : * Add myself to wait queue. Note that this is racy, somebody else
1602 : * could wakeup before we're finished queuing. NB: We're using nearly
1603 : * the same twice-in-a-row lock acquisition protocol as
1604 : * LWLockAcquire(). Check its comments for details. The only
1605 : * difference is that we also have to check the variable's values when
1606 : * checking the state of the lock.
1607 : */
1608 109746 : LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1609 :
1610 : /*
1611 : * Clear LW_FLAG_WAKE_IN_PROGRESS flag, to make sure we get woken up
1612 : * as soon as the lock is released.
1613 : */
1614 109746 : pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_WAKE_IN_PROGRESS);
1615 :
1616 : /*
1617 : * We're now guaranteed to be woken up if necessary. Recheck the lock
1618 : * and variables state.
1619 : */
1620 109746 : mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1621 : &result);
1622 :
1623 : /* Ok, no conflict after we queued ourselves. Undo queueing. */
1624 109746 : if (!mustwait)
1625 : {
1626 : LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1627 :
1628 1664 : LWLockDequeueSelf(lock);
1629 1664 : break;
1630 : }
1631 :
1632 : /*
1633 : * Wait until awakened.
1634 : *
1635 : * It is possible that we get awakened for a reason other than being
1636 : * signaled by LWLockRelease. If so, loop back and wait again. Once
1637 : * we've gotten the LWLock, re-increment the sema by the number of
1638 : * additional signals received.
1639 : */
1640 : LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1641 :
1642 : #ifdef LWLOCK_STATS
1643 : lwstats->block_count++;
1644 : #endif
1645 :
1646 108082 : LWLockReportWaitStart(lock);
1647 : if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1648 : TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);
1649 :
1650 : for (;;)
1651 : {
1652 108082 : PGSemaphoreLock(proc->sem);
1653 108082 : if (proc->lwWaiting == LW_WS_NOT_WAITING)
1654 108082 : break;
1655 0 : extraWaits++; /* absorbed a spurious wakeup; repaid below */
1656 : }
1657 :
1658 : #ifdef LOCK_DEBUG
1659 : {
1660 : /* not waiting anymore */
1661 : uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1662 :
1663 : Assert(nwaiters < MAX_BACKENDS);
1664 : }
1665 : #endif
1666 :
1667 : if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1668 : TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
1669 108082 : LWLockReportWaitEnd();
1670 :
1671 : LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1672 :
1673 : /* Now loop back and check the status of the lock again. */
1674 : }
1675 :
1676 : /*
1677 : * Fix the process wait semaphore's count for any absorbed wakeups.
1678 : */
1679 4211463 : while (extraWaits-- > 0)
1680 0 : PGSemaphoreUnlock(proc->sem);
1681 :
1682 : /*
1683 : * Now okay to allow cancel/die interrupts.
1684 : */
1685 4211463 : RESUME_INTERRUPTS();
1686 :
1687 4211463 : return result; /* true iff the lock was observed free */
1688 : }
1689 :
1690 :
1691 : /*
1692 : * LWLockUpdateVar - Update a variable and wake up waiters atomically
1693 : *
1694 : * Sets *valptr to 'val', and wakes up all processes waiting for us with
1695 : * LWLockWaitForVar(). It first sets the value atomically and then wakes up
1696 : * waiting processes so that any process calling LWLockWaitForVar() on the same
1697 : * lock is guaranteed to see the new value, and act accordingly.
1698 : *
1699 : * The caller must be holding the lock in exclusive mode.
1700 : */
1701 : void
1702 2742906 : LWLockUpdateVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
1703 : {
1704 : proclist_head wakeup;
1705 : proclist_mutable_iter iter;
1706 :
1707 : PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1708 :
1709 : /*
1710 : * Note that pg_atomic_exchange_u64 is a full barrier, so we're guaranteed
1711 : * that the variable is updated before waking up waiters.
1712 : */
1713 2742906 : pg_atomic_exchange_u64(valptr, val);
1714 :
1715 2742906 : proclist_init(&wakeup);
1716 :
1717 2742906 : LWLockWaitListLock(lock);
1718 :
1719 : Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1720 :
1721 : /*
1722 : * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1723 : * up. They are always in the front of the queue.
1724 : */
1725 2743826 : proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1726 : {
1727 42909 : PGPROC *waiter = GetPGProcByNumber(iter.cur);
1728 :
1729 42909 : if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1730 41989 : break; /* only non-UNTIL_FREE waiters follow; stop scanning */
1731 :
1732 920 : proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1733 920 : proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
1734 :
1735 : /* see LWLockWakeup() */
1736 : Assert(waiter->lwWaiting == LW_WS_WAITING);
1737 920 : waiter->lwWaiting = LW_WS_PENDING_WAKEUP;
1738 : }
1739 :
1740 : /* We are done updating shared state of the lock itself. */
1741 2742906 : LWLockWaitListUnlock(lock);
1742 :
1743 : /*
1744 : * Awaken any waiters I removed from the queue.
1745 : */
1746 2743826 : proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1747 : {
1748 920 : PGPROC *waiter = GetPGProcByNumber(iter.cur);
1749 :
1750 920 : proclist_delete(&wakeup, iter.cur, lwWaitLink);
1751 : /* check comment in LWLockWakeup() about this barrier */
1752 920 : pg_write_barrier();
1753 920 : waiter->lwWaiting = LW_WS_NOT_WAITING;
1754 920 : PGSemaphoreUnlock(waiter->sem);
1755 : }
1756 2742906 : }
1757 :
1758 :
1759 : /*
1760 : * LWLockRelease - release a previously acquired lock
1761 : *
1762 : * NB: This will leave lock->owner pointing to the current backend (if
1763 : * LOCK_DEBUG is set). This is somewhat intentional, as it makes it easier to
1764 : * debug cases of missing wakeups during lock release.
1765 : */
1766 : void
1767 349059477 : LWLockRelease(LWLock *lock)
1768 : {
1769 : LWLockMode mode;
1770 : uint32 oldstate;
1771 : bool check_waiters;
1772 : int i;
1773 :
1774 : /*
1775 : * Remove lock from list of locks held. Usually, but not always, it will
1776 : * be the latest-acquired lock; so search array backwards.
1777 : */
1778 393432347 : for (i = num_held_lwlocks; --i >= 0;)
1779 393432347 : if (lock == held_lwlocks[i].lock)
1780 349059477 : break;
1781 :
1782 349059477 : if (i < 0)
1783 0 : elog(ERROR, "lock %s is not held", T_NAME(lock));
1784 :
1785 349059477 : mode = held_lwlocks[i].mode;
1786 :
1787 349059477 : num_held_lwlocks--;
1788 393432347 : for (; i < num_held_lwlocks; i++)
1789 44372870 : held_lwlocks[i] = held_lwlocks[i + 1]; /* close the gap in the held-locks array */
1790 :
1791 : PRINT_LWDEBUG("LWLockRelease", lock, mode);
1792 :
1793 : /*
1794 : * Release my hold on lock, after that it can immediately be acquired by
1795 : * others, even if we still have to wakeup other waiters.
1796 : */
1797 349059477 : if (mode == LW_EXCLUSIVE)
1798 240714445 : oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1799 : else
1800 108345032 : oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1801 :
1802 : /* nobody else can have that kind of lock */
1803 : Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1804 :
1805 : if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
1806 : TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
1807 :
1808 : /*
1809 : * Check if we're still waiting for backends to get scheduled, if so,
1810 : * don't wake them up again.
1811 : */
1812 349059477 : if ((oldstate & LW_FLAG_HAS_WAITERS) &&
1813 6085072 : !(oldstate & LW_FLAG_WAKE_IN_PROGRESS) &&
1814 5151967 : (oldstate & LW_LOCK_MASK) == 0) /* no lock holders remain */
1815 5148796 : check_waiters = true;
1816 : else
1817 343910681 : check_waiters = false;
1818 :
1819 : /*
1820 : * As waking up waiters requires the spinlock to be acquired, only do so
1821 : * if necessary.
1822 : */
1823 349059477 : if (check_waiters)
1824 : {
1825 : /* XXX: remove before commit? */
1826 : LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1827 5148796 : LWLockWakeup(lock);
1828 : }
1829 :
1830 : /*
1831 : * Now okay to allow cancel/die interrupts.
1832 : */
1833 349059477 : RESUME_INTERRUPTS();
1834 349059477 : }
1835 :
1836 : /*
1837 : * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1838 : */
1839 : void
1840 24162571 : LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
1841 : {
1842 : /*
1843 : * Note that pg_atomic_exchange_u64 is a full barrier, so we're guaranteed
1844 : * that the variable is updated before releasing the lock.
1845 : */
1846 24162571 : pg_atomic_exchange_u64(valptr, val);
1847 :
1848 24162571 : LWLockRelease(lock); /* wakes up waiters as needed */
1849 24162571 : }
1850 :
1851 :
1852 : /*
1853 : * LWLockReleaseAll - release all currently-held locks
1854 : *
1855 : * Used to clean up after ereport(ERROR). An important difference between this
1856 : * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1857 : * unchanged by this operation. This is necessary since InterruptHoldoffCount
1858 : * has been set to an appropriate level earlier in error recovery. We could
1859 : * decrement it below zero if we allow it to drop for each released lock!
1860 : *
1861 : * Note that this function must be safe to call even before the LWLock
1862 : * subsystem has been initialized (e.g., during early startup failures).
1863 : * In that case, num_held_lwlocks will be 0 and we do nothing.
1864 : */
1865 : void
1866 123543 : LWLockReleaseAll(void)
1867 : {
1868 123646 : while (num_held_lwlocks > 0)
1869 : {
1870 103 : HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */
1871 :
1872 103 : LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1873 : }
1874 :
1875 : Assert(num_held_lwlocks == 0);
1876 123543 : }
1877 :
1878 :
1879 : /*
1880 : * LWLockHeldByMe - test whether my process holds a lock in any mode
1881 : *
1882 : * This is meant as debug support only.
1883 : */
1884 : bool
1885 0 : LWLockHeldByMe(LWLock *lock)
1886 : {
1887 : int i;
1888 :
1889 0 : for (i = 0; i < num_held_lwlocks; i++)
1890 : {
1891 0 : if (held_lwlocks[i].lock == lock)
1892 0 : return true;
1893 : }
1894 0 : return false;
1895 : }
1896 :
1897 : /*
1898 : * LWLockAnyHeldByMe - test whether my process holds any of an array of locks
1899 : *
1900 : * This is meant as debug support only.
1901 : */
1902 : bool
1903 0 : LWLockAnyHeldByMe(LWLock *lock, int nlocks, size_t stride)
1904 : {
1905 : char *held_lock_addr;
1906 : char *begin;
1907 : char *end;
1908 : int i;
1909 :
1910 0 : begin = (char *) lock;
1911 0 : end = begin + nlocks * stride;
1912 0 : for (i = 0; i < num_held_lwlocks; i++)
1913 : {
1914 0 : held_lock_addr = (char *) held_lwlocks[i].lock;
1915 0 : if (held_lock_addr >= begin &&
1916 0 : held_lock_addr < end &&
1917 0 : (held_lock_addr - begin) % stride == 0)
1918 0 : return true;
1919 : }
1920 0 : return false;
1921 : }
1922 :
1923 : /*
1924 : * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
1925 : *
1926 : * This is meant as debug support only.
1927 : */
1928 : bool
1929 0 : LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
1930 : {
1931 : int i;
1932 :
1933 0 : for (i = 0; i < num_held_lwlocks; i++)
1934 : {
1935 0 : if (held_lwlocks[i].lock == lock && held_lwlocks[i].mode == mode)
1936 0 : return true;
1937 : }
1938 0 : return false;
1939 : }
|