Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * syncrep.c
4 : *
5 : * Synchronous replication is new as of PostgreSQL 9.1.
6 : *
7 : * If requested, transaction commits wait until their commit LSNs are
8 : * acknowledged by the synchronous standbys.
9 : *
10 : * This module contains the code for waiting and release of backends.
11 : * All code in this module executes on the primary. The core streaming
12 : * replication transport remains within WALreceiver/WALsender modules.
13 : *
14 : * The essence of this design is that it isolates all logic about
15 : * waiting/releasing onto the primary. The primary defines which standbys
16 : * it wishes to wait for. The standbys are completely unaware of the
17 : * durability requirements of transactions on the primary, reducing the
18 : * complexity of the code and streamlining both standby operations and
19 : * network bandwidth because there is no requirement to ship
20 : * per-transaction state information.
21 : *
22 : * Replication is either synchronous or not synchronous (async). If it is
23 : * async, we just fastpath out of here. If it is sync, then we wait for
24 : * the write, flush or apply location on the standby before releasing
25 : * the waiting backend. Further complexity in that interaction is
26 : * expected in later releases.
27 : *
28 : * The best performing way to manage the waiting backends is to have a
29 : * single ordered queue of waiting backends, so that we can avoid
30 : * searching through all waiters each time we receive a reply.
31 : *
32 : * In 9.5 or before only a single standby could be considered as
33 : * synchronous. In 9.6 we support a priority-based multiple synchronous
34 : * standbys. In 10.0 quorum-based multiple synchronous standbys are also
35 : * supported. The number of synchronous standbys that transactions
36 : * must wait for replies from is specified in synchronous_standby_names.
37 : * This parameter also specifies a list of standby names and the method
38 : * (FIRST or ANY) to choose synchronous standbys from the listed ones.
39 : *
40 : * The method FIRST specifies a priority-based synchronous replication
41 : * and makes transaction commits wait until their WAL records are
42 : * replicated to the requested number of synchronous standbys chosen based
43 : * on their priorities. The standbys whose names appear earlier in the list
44 : * are given higher priority and will be considered as synchronous.
45 : * Other standby servers appearing later in this list represent potential
46 : * synchronous standbys. If any of the current synchronous standbys
47 : * disconnects for whatever reason, it will be replaced immediately with
48 : * the next-highest-priority standby.
49 : *
50 : * The method ANY specifies a quorum-based synchronous replication
51 : * and makes transaction commits wait until their WAL records are
52 : * replicated to at least the requested number of synchronous standbys
53 : * in the list. All the standbys appearing in the list are considered as
54 : * candidates for quorum synchronous standbys.
55 : *
56 : * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 : * This is for backward compatibility with 9.6 or before where only a
58 : * priority-based sync replication was supported.
59 : *
60 : * Before the standbys chosen from synchronous_standby_names can
61 : * become the synchronous standbys they must have caught up with
62 : * the primary; that may take some time. Once caught up,
63 : * the standbys which are considered as synchronous at that moment
64 : * will release waiters from the queue.
65 : *
66 : * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
67 : *
68 : * IDENTIFICATION
69 : * src/backend/replication/syncrep.c
70 : *
71 : *-------------------------------------------------------------------------
72 : */
73 : #include "postgres.h"
74 :
75 : #include <unistd.h>
76 :
77 : #include "access/xact.h"
78 : #include "common/int.h"
79 : #include "miscadmin.h"
80 : #include "pgstat.h"
81 : #include "replication/syncrep.h"
82 : #include "replication/walsender.h"
83 : #include "replication/walsender_private.h"
84 : #include "storage/proc.h"
85 : #include "tcop/tcopprot.h"
86 : #include "utils/guc_hooks.h"
87 : #include "utils/ps_status.h"
88 :
89 : /* User-settable parameters for sync rep */
90 : char *SyncRepStandbyNames;
91 :
92 : #define SyncStandbysDefined() \
93 : (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
94 :
95 : static bool announce_next_takeover = true;
96 :
97 : SyncRepConfigData *SyncRepConfig = NULL;
98 : static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
99 :
100 : static void SyncRepQueueInsert(int mode);
101 : static void SyncRepCancelWait(void);
102 : static int SyncRepWakeQueue(bool all, int mode);
103 :
104 : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
105 : XLogRecPtr *flushPtr,
106 : XLogRecPtr *applyPtr,
107 : bool *am_sync);
108 : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
109 : XLogRecPtr *flushPtr,
110 : XLogRecPtr *applyPtr,
111 : SyncRepStandbyData *sync_standbys,
112 : int num_standbys);
113 : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
114 : XLogRecPtr *flushPtr,
115 : XLogRecPtr *applyPtr,
116 : SyncRepStandbyData *sync_standbys,
117 : int num_standbys,
118 : uint8 nth);
119 : static int SyncRepGetStandbyPriority(void);
120 : static int standby_priority_comparator(const void *a, const void *b);
121 : static int cmp_lsn(const void *a, const void *b);
122 :
123 : #ifdef USE_ASSERT_CHECKING
124 : static bool SyncRepQueueIsOrderedByLSN(int mode);
125 : #endif
126 :
127 : /*
128 : * ===========================================================
129 : * Synchronous Replication functions for normal user backends
130 : * ===========================================================
131 : */
132 :
133 : /*
134 : * Wait for synchronous replication, if requested by user.
135 : *
136 : * Initially backends start in state SYNC_REP_NOT_WAITING and then
137 : * change that state to SYNC_REP_WAITING before adding ourselves
138 : * to the wait queue. During SyncRepWakeQueue() a WALSender changes
139 : * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
140 : * This backend then resets its state to SYNC_REP_NOT_WAITING.
141 : *
142 : * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
143 : * represents a commit record. If it doesn't, then we wait only for the WAL
144 : * to be flushed if synchronous_commit is set to the higher level of
145 : * remote_apply, because only commit records provide apply feedback.
146 : */
147 : void
148 227050 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
149 : {
150 : int mode;
151 :
152 : /*
153 : * This should be called while holding interrupts during a transaction
154 : * commit to prevent the follow-up shared memory queue cleanups to be
155 : * influenced by external interruptions.
156 : */
157 : Assert(InterruptHoldoffCount > 0);
158 :
159 : /*
160 : * Fast exit if user has not requested sync replication, or there are no
161 : * sync replication standby names defined.
162 : *
163 : * Since this routine gets called every commit time, it's important to
164 : * exit quickly if sync replication is not requested. So we check
165 : * WalSndCtl->sync_standbys_defined flag without the lock and exit
166 : * immediately if it's false. If it's true, we need to check it again
167 : * later while holding the lock, to check the flag and operate the sync
168 : * rep queue atomically. This is necessary to avoid the race condition
169 : * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
170 : * it's false, the lock is not necessary because we don't touch the queue.
171 : */
172 227050 : if (!SyncRepRequested() ||
173 174892 : !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
174 226972 : return;
175 :
176 : /* Cap the level for anything other than commit to remote flush only. */
177 78 : if (commit)
178 44 : mode = SyncRepWaitMode;
179 : else
180 34 : mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
181 :
182 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
183 : Assert(WalSndCtl != NULL);
184 :
185 78 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
186 : Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
187 :
188 : /*
189 : * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
190 : * set. See SyncRepUpdateSyncStandbysDefined.
191 : *
192 : * Also check that the standby hasn't already replied. Unlikely race
193 : * condition but we'll be fetching that cache line anyway so it's likely
194 : * to be a low cost check.
195 : */
196 78 : if (!WalSndCtl->sync_standbys_defined ||
197 78 : lsn <= WalSndCtl->lsn[mode])
198 : {
199 0 : LWLockRelease(SyncRepLock);
200 0 : return;
201 : }
202 :
203 : /*
204 : * Set our waitLSN so WALSender will know when to wake us, and add
205 : * ourselves to the queue.
206 : */
207 78 : MyProc->waitLSN = lsn;
208 78 : MyProc->syncRepState = SYNC_REP_WAITING;
209 78 : SyncRepQueueInsert(mode);
210 : Assert(SyncRepQueueIsOrderedByLSN(mode));
211 78 : LWLockRelease(SyncRepLock);
212 :
213 : /* Alter ps display to show waiting for sync rep. */
214 78 : if (update_process_title)
215 : {
216 : char buffer[32];
217 :
218 78 : sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
219 78 : set_ps_display_suffix(buffer);
220 : }
221 :
222 : /*
223 : * Wait for specified LSN to be confirmed.
224 : *
225 : * Each proc has its own wait latch, so we perform a normal latch
226 : * check/wait loop here.
227 : */
228 : for (;;)
229 78 : {
230 : int rc;
231 :
232 : /* Must reset the latch before testing state. */
233 156 : ResetLatch(MyLatch);
234 :
235 : /*
236 : * Acquiring the lock is not needed, the latch ensures proper
237 : * barriers. If it looks like we're done, we must really be done,
238 : * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
239 : * it will never update it again, so we can't be seeing a stale value
240 : * in that case.
241 : */
242 156 : if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
243 78 : break;
244 :
245 : /*
246 : * If a wait for synchronous replication is pending, we can neither
247 : * acknowledge the commit nor raise ERROR or FATAL. The latter would
248 : * lead the client to believe that the transaction aborted, which is
249 : * not true: it's already committed locally. The former is no good
250 : * either: the client has requested synchronous replication, and is
251 : * entitled to assume that an acknowledged commit is also replicated,
252 : * which might not be true. So in this case we issue a WARNING (which
253 : * some clients may be able to interpret) and shut off further output.
254 : * We do NOT reset ProcDiePending, so that the process will die after
255 : * the commit is cleaned up.
256 : */
257 78 : if (ProcDiePending)
258 : {
259 0 : ereport(WARNING,
260 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
261 : errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
262 : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
263 0 : whereToSendOutput = DestNone;
264 0 : SyncRepCancelWait();
265 0 : break;
266 : }
267 :
268 : /*
269 : * It's unclear what to do if a query cancel interrupt arrives. We
270 : * can't actually abort at this point, but ignoring the interrupt
271 : * altogether is not helpful, so we just terminate the wait with a
272 : * suitable warning.
273 : */
274 78 : if (QueryCancelPending)
275 : {
276 0 : QueryCancelPending = false;
277 0 : ereport(WARNING,
278 : (errmsg("canceling wait for synchronous replication due to user request"),
279 : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
280 0 : SyncRepCancelWait();
281 0 : break;
282 : }
283 :
284 : /*
285 : * Wait on latch. Any condition that should wake us up will set the
286 : * latch, so no need for timeout.
287 : */
288 78 : rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
289 : WAIT_EVENT_SYNC_REP);
290 :
291 : /*
292 : * If the postmaster dies, we'll probably never get an acknowledgment,
293 : * because all the wal sender processes will exit. So just bail out.
294 : */
295 78 : if (rc & WL_POSTMASTER_DEATH)
296 : {
297 0 : ProcDiePending = true;
298 0 : whereToSendOutput = DestNone;
299 0 : SyncRepCancelWait();
300 0 : break;
301 : }
302 : }
303 :
304 : /*
305 : * WalSender has checked our LSN and has removed us from queue. Clean up
306 : * state and leave. It's OK to reset these shared memory fields without
307 : * holding SyncRepLock, because any walsenders will ignore us anyway when
308 : * we're not on the queue. We need a read barrier to make sure we see the
309 : * changes to the queue link (this might be unnecessary without
310 : * assertions, but better safe than sorry).
311 : */
312 78 : pg_read_barrier();
313 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
314 78 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
315 78 : MyProc->waitLSN = 0;
316 :
317 : /* reset ps display to remove the suffix */
318 78 : if (update_process_title)
319 78 : set_ps_display_remove_suffix();
320 : }
321 :
322 : /*
323 : * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
324 : *
325 : * Usually we will go at tail of queue, though it's possible that we arrive
326 : * here out of order, so start at tail and work back to insertion point.
327 : */
328 : static void
329 78 : SyncRepQueueInsert(int mode)
330 : {
331 : dlist_head *queue;
332 : dlist_iter iter;
333 :
334 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
335 78 : queue = &WalSndCtl->SyncRepQueue[mode];
336 :
337 78 : dlist_reverse_foreach(iter, queue)
338 : {
339 0 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
340 :
341 : /*
342 : * Stop at the queue element that we should insert after to ensure the
343 : * queue is ordered by LSN.
344 : */
345 0 : if (proc->waitLSN < MyProc->waitLSN)
346 : {
347 0 : dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
348 0 : return;
349 : }
350 : }
351 :
352 : /*
353 : * If we get here, the list was either empty, or this process needs to be
354 : * at the head.
355 : */
356 78 : dlist_push_head(queue, &MyProc->syncRepLinks);
357 : }
358 :
359 : /*
360 : * Acquire SyncRepLock and cancel any wait currently in progress.
361 : */
362 : static void
363 0 : SyncRepCancelWait(void)
364 : {
365 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
366 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
367 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
368 0 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
369 0 : LWLockRelease(SyncRepLock);
370 0 : }
371 :
372 : void
373 30192 : SyncRepCleanupAtProcExit(void)
374 : {
375 : /*
376 : * First check if we are removed from the queue without the lock to not
377 : * slow down backend exit.
378 : */
379 30192 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
380 : {
381 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
382 :
383 : /* maybe we have just been removed, so recheck */
384 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
385 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
386 :
387 0 : LWLockRelease(SyncRepLock);
388 : }
389 30192 : }
390 :
391 : /*
392 : * ===========================================================
393 : * Synchronous Replication functions for wal sender processes
394 : * ===========================================================
395 : */
396 :
397 : /*
398 : * Take any action required to initialise sync rep state from config
399 : * data. Called at WALSender startup and after each SIGHUP.
400 : */
401 : void
402 1270 : SyncRepInitConfig(void)
403 : {
404 : int priority;
405 :
406 : /*
407 : * Determine if we are a potential sync standby and remember the result
408 : * for handling replies from standby.
409 : */
410 1270 : priority = SyncRepGetStandbyPriority();
411 1270 : if (MyWalSnd->sync_standby_priority != priority)
412 : {
413 42 : SpinLockAcquire(&MyWalSnd->mutex);
414 42 : MyWalSnd->sync_standby_priority = priority;
415 42 : SpinLockRelease(&MyWalSnd->mutex);
416 :
417 42 : ereport(DEBUG1,
418 : (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
419 : application_name, priority)));
420 : }
421 1270 : }
422 :
423 : /*
424 : * Update the LSNs on each queue based upon our latest state. This
425 : * implements a simple policy of first-valid-sync-standby-releases-waiter.
426 : *
427 : * Other policies are possible, which would change what we do here and
428 : * perhaps also which information we store as well.
429 : */
430 : void
431 66492 : SyncRepReleaseWaiters(void)
432 : {
433 66492 : volatile WalSndCtlData *walsndctl = WalSndCtl;
434 : XLogRecPtr writePtr;
435 : XLogRecPtr flushPtr;
436 : XLogRecPtr applyPtr;
437 : bool got_recptr;
438 : bool am_sync;
439 66492 : int numwrite = 0;
440 66492 : int numflush = 0;
441 66492 : int numapply = 0;
442 :
443 : /*
444 : * If this WALSender is serving a standby that is not on the list of
445 : * potential sync standbys then we have nothing to do. If we are still
446 : * starting up, still running base backup or the current flush position is
447 : * still invalid, then leave quickly also. Streaming or stopping WAL
448 : * senders are allowed to release waiters.
449 : */
450 66492 : if (MyWalSnd->sync_standby_priority == 0 ||
451 230 : (MyWalSnd->state != WALSNDSTATE_STREAMING &&
452 70 : MyWalSnd->state != WALSNDSTATE_STOPPING) ||
453 210 : XLogRecPtrIsInvalid(MyWalSnd->flush))
454 : {
455 66282 : announce_next_takeover = true;
456 66284 : return;
457 : }
458 :
459 : /*
460 : * We're a potential sync standby. Release waiters if there are enough
461 : * sync standbys and we are considered as sync.
462 : */
463 210 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
464 :
465 : /*
466 : * Check whether we are a sync standby or not, and calculate the synced
467 : * positions among all sync standbys. (Note: although this step does not
468 : * of itself require holding SyncRepLock, it seems like a good idea to do
469 : * it after acquiring the lock. This ensures that the WAL pointers we use
470 : * to release waiters are newer than any previous execution of this
471 : * routine used.)
472 : */
473 210 : got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
474 :
475 : /*
476 : * If we are managing a sync standby, though we weren't prior to this,
477 : * then announce we are now a sync standby.
478 : */
479 210 : if (announce_next_takeover && am_sync)
480 : {
481 18 : announce_next_takeover = false;
482 :
483 18 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
484 18 : ereport(LOG,
485 : (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
486 : application_name, MyWalSnd->sync_standby_priority)));
487 : else
488 0 : ereport(LOG,
489 : (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
490 : application_name)));
491 : }
492 :
493 : /*
494 : * If the number of sync standbys is less than requested or we aren't
495 : * managing a sync standby then just leave.
496 : */
497 210 : if (!got_recptr || !am_sync)
498 : {
499 2 : LWLockRelease(SyncRepLock);
500 2 : announce_next_takeover = !am_sync;
501 2 : return;
502 : }
503 :
504 : /*
505 : * Set the lsn first so that when we wake backends they will release up to
506 : * this location.
507 : */
508 208 : if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
509 : {
510 72 : walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
511 72 : numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
512 : }
513 208 : if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
514 : {
515 86 : walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
516 86 : numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
517 : }
518 208 : if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
519 : {
520 86 : walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
521 86 : numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
522 : }
523 :
524 208 : LWLockRelease(SyncRepLock);
525 :
526 208 : elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
527 : numwrite, LSN_FORMAT_ARGS(writePtr),
528 : numflush, LSN_FORMAT_ARGS(flushPtr),
529 : numapply, LSN_FORMAT_ARGS(applyPtr));
530 : }
531 :
532 : /*
533 : * Calculate the synced Write, Flush and Apply positions among sync standbys.
534 : *
535 : * Return false if the number of sync standbys is less than
536 : * synchronous_standby_names specifies. Otherwise return true and
537 : * store the positions into *writePtr, *flushPtr and *applyPtr.
538 : *
539 : * On return, *am_sync is set to true if this walsender is connecting to
540 : * sync standby. Otherwise it's set to false.
541 : */
542 : static bool
543 210 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
544 : XLogRecPtr *applyPtr, bool *am_sync)
545 : {
546 : SyncRepStandbyData *sync_standbys;
547 : int num_standbys;
548 : int i;
549 :
550 : /* Initialize default results */
551 210 : *writePtr = InvalidXLogRecPtr;
552 210 : *flushPtr = InvalidXLogRecPtr;
553 210 : *applyPtr = InvalidXLogRecPtr;
554 210 : *am_sync = false;
555 :
556 : /* Quick out if not even configured to be synchronous */
557 210 : if (SyncRepConfig == NULL)
558 0 : return false;
559 :
560 : /* Get standbys that are considered as synchronous at this moment */
561 210 : num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
562 :
563 : /* Am I among the candidate sync standbys? */
564 212 : for (i = 0; i < num_standbys; i++)
565 : {
566 210 : if (sync_standbys[i].is_me)
567 : {
568 208 : *am_sync = true;
569 208 : break;
570 : }
571 : }
572 :
573 : /*
574 : * Nothing more to do if we are not managing a sync standby or there are
575 : * not enough synchronous standbys.
576 : */
577 210 : if (!(*am_sync) ||
578 208 : num_standbys < SyncRepConfig->num_sync)
579 : {
580 2 : pfree(sync_standbys);
581 2 : return false;
582 : }
583 :
584 : /*
585 : * In a priority-based sync replication, the synced positions are the
586 : * oldest ones among sync standbys. In a quorum-based, they are the Nth
587 : * latest ones.
588 : *
589 : * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
590 : * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
591 : * because it's a bit more efficient.
592 : *
593 : * XXX If the numbers of current and requested sync standbys are the same,
594 : * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
595 : * positions even in a quorum-based sync replication.
596 : */
597 208 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
598 : {
599 208 : SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
600 : sync_standbys, num_standbys);
601 : }
602 : else
603 : {
604 0 : SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
605 : sync_standbys, num_standbys,
606 0 : SyncRepConfig->num_sync);
607 : }
608 :
609 208 : pfree(sync_standbys);
610 208 : return true;
611 : }
612 :
613 : /*
614 : * Calculate the oldest Write, Flush and Apply positions among sync standbys.
615 : */
616 : static void
617 208 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
618 : XLogRecPtr *flushPtr,
619 : XLogRecPtr *applyPtr,
620 : SyncRepStandbyData *sync_standbys,
621 : int num_standbys)
622 : {
623 : int i;
624 :
625 : /*
626 : * Scan through all sync standbys and calculate the oldest Write, Flush
627 : * and Apply positions. We assume *writePtr et al were initialized to
628 : * InvalidXLogRecPtr.
629 : */
630 416 : for (i = 0; i < num_standbys; i++)
631 : {
632 208 : XLogRecPtr write = sync_standbys[i].write;
633 208 : XLogRecPtr flush = sync_standbys[i].flush;
634 208 : XLogRecPtr apply = sync_standbys[i].apply;
635 :
636 208 : if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
637 208 : *writePtr = write;
638 208 : if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
639 208 : *flushPtr = flush;
640 208 : if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
641 208 : *applyPtr = apply;
642 : }
643 208 : }
644 :
645 : /*
646 : * Calculate the Nth latest Write, Flush and Apply positions among sync
647 : * standbys.
648 : */
649 : static void
650 0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
651 : XLogRecPtr *flushPtr,
652 : XLogRecPtr *applyPtr,
653 : SyncRepStandbyData *sync_standbys,
654 : int num_standbys,
655 : uint8 nth)
656 : {
657 : XLogRecPtr *write_array;
658 : XLogRecPtr *flush_array;
659 : XLogRecPtr *apply_array;
660 : int i;
661 :
662 : /* Should have enough candidates, or somebody messed up */
663 : Assert(nth > 0 && nth <= num_standbys);
664 :
665 0 : write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
666 0 : flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
667 0 : apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
668 :
669 0 : for (i = 0; i < num_standbys; i++)
670 : {
671 0 : write_array[i] = sync_standbys[i].write;
672 0 : flush_array[i] = sync_standbys[i].flush;
673 0 : apply_array[i] = sync_standbys[i].apply;
674 : }
675 :
676 : /* Sort each array in descending order */
677 0 : qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
678 0 : qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
679 0 : qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
680 :
681 : /* Get Nth latest Write, Flush, Apply positions */
682 0 : *writePtr = write_array[nth - 1];
683 0 : *flushPtr = flush_array[nth - 1];
684 0 : *applyPtr = apply_array[nth - 1];
685 :
686 0 : pfree(write_array);
687 0 : pfree(flush_array);
688 0 : pfree(apply_array);
689 0 : }
690 :
691 : /*
692 : * Compare lsn in order to sort array in descending order.
693 : */
694 : static int
695 0 : cmp_lsn(const void *a, const void *b)
696 : {
697 0 : XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
698 0 : XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
699 :
700 0 : return pg_cmp_u64(lsn2, lsn1);
701 : }
702 :
703 : /*
704 : * Return data about walsenders that are candidates to be sync standbys.
705 : *
706 : * *standbys is set to a palloc'd array of structs of per-walsender data,
707 : * and the number of valid entries (candidate sync senders) is returned.
708 : * (This might be more or fewer than num_sync; caller must check.)
709 : */
710 : int
711 1538 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
712 : {
713 : int i;
714 : int n;
715 :
716 : /* Create result array */
717 1538 : *standbys = (SyncRepStandbyData *)
718 1538 : palloc(max_wal_senders * sizeof(SyncRepStandbyData));
719 :
720 : /* Quick exit if sync replication is not requested */
721 1538 : if (SyncRepConfig == NULL)
722 1292 : return 0;
723 :
724 : /* Collect raw data from shared memory */
725 246 : n = 0;
726 2706 : for (i = 0; i < max_wal_senders; i++)
727 : {
728 : volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
729 : * rearrangement */
730 : SyncRepStandbyData *stby;
731 : WalSndState state; /* not included in SyncRepStandbyData */
732 :
733 2460 : walsnd = &WalSndCtl->walsnds[i];
734 2460 : stby = *standbys + n;
735 :
736 2460 : SpinLockAcquire(&walsnd->mutex);
737 2460 : stby->pid = walsnd->pid;
738 2460 : state = walsnd->state;
739 2460 : stby->write = walsnd->write;
740 2460 : stby->flush = walsnd->flush;
741 2460 : stby->apply = walsnd->apply;
742 2460 : stby->sync_standby_priority = walsnd->sync_standby_priority;
743 2460 : SpinLockRelease(&walsnd->mutex);
744 :
745 : /* Must be active */
746 2460 : if (stby->pid == 0)
747 2154 : continue;
748 :
749 : /* Must be streaming or stopping */
750 306 : if (state != WALSNDSTATE_STREAMING &&
751 : state != WALSNDSTATE_STOPPING)
752 0 : continue;
753 :
754 : /* Must be synchronous */
755 306 : if (stby->sync_standby_priority == 0)
756 14 : continue;
757 :
758 : /* Must have a valid flush position */
759 292 : if (XLogRecPtrIsInvalid(stby->flush))
760 0 : continue;
761 :
762 : /* OK, it's a candidate */
763 292 : stby->walsnd_index = i;
764 292 : stby->is_me = (walsnd == MyWalSnd);
765 292 : n++;
766 : }
767 :
768 : /*
769 : * In quorum mode, we return all the candidates. In priority mode, if we
770 : * have too many candidates then return only the num_sync ones of highest
771 : * priority.
772 : */
773 246 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
774 244 : n > SyncRepConfig->num_sync)
775 : {
776 : /* Sort by priority ... */
777 18 : qsort(*standbys, n, sizeof(SyncRepStandbyData),
778 : standby_priority_comparator);
779 : /* ... then report just the first num_sync ones */
780 18 : n = SyncRepConfig->num_sync;
781 : }
782 :
783 246 : return n;
784 : }
785 :
786 : /*
787 : * qsort comparator to sort SyncRepStandbyData entries by priority
788 : */
789 : static int
790 40 : standby_priority_comparator(const void *a, const void *b)
791 : {
792 40 : const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
793 40 : const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
794 :
795 : /* First, sort by increasing priority value */
796 40 : if (sa->sync_standby_priority != sb->sync_standby_priority)
797 18 : return sa->sync_standby_priority - sb->sync_standby_priority;
798 :
799 : /*
800 : * We might have equal priority values; arbitrarily break ties by position
801 : * in the WalSnd array. (This is utterly bogus, since that is arrival
802 : * order dependent, but there are regression tests that rely on it.)
803 : */
804 22 : return sa->walsnd_index - sb->walsnd_index;
805 : }
806 :
807 :
808 : /*
809 : * Check if we are in the list of sync standbys, and if so, determine
810 : * priority sequence. Return priority if set, or zero to indicate that
811 : * we are not a potential sync standby.
812 : *
813 : * Compare the parameter SyncRepStandbyNames against the application_name
814 : * for this WALSender, or allow any name if we find a wildcard "*".
815 : */
816 : static int
817 1270 : SyncRepGetStandbyPriority(void)
818 : {
819 : const char *standby_name;
820 : int priority;
821 1270 : bool found = false;
822 :
823 : /*
824 : * Since synchronous cascade replication is not allowed, we always set the
825 : * priority of cascading walsender to zero.
826 : */
827 1270 : if (am_cascading_walsender)
828 52 : return 0;
829 :
830 1218 : if (!SyncStandbysDefined() || SyncRepConfig == NULL)
831 1170 : return 0;
832 :
833 48 : standby_name = SyncRepConfig->member_names;
834 64 : for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
835 : {
836 62 : if (pg_strcasecmp(standby_name, application_name) == 0 ||
837 36 : strcmp(standby_name, "*") == 0)
838 : {
839 46 : found = true;
840 46 : break;
841 : }
842 16 : standby_name += strlen(standby_name) + 1;
843 : }
844 :
845 48 : if (!found)
846 2 : return 0;
847 :
848 : /*
849 : * In quorum-based sync replication, all the standbys in the list have the
850 : * same priority, one.
851 : */
852 46 : return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
853 : }
854 :
855 : /*
856 : * Walk the specified queue from head. Set the state of any backends that
857 : * need to be woken, remove them from the queue, and then wake them.
858 : * Pass all = true to wake whole queue; otherwise, just wake up to
859 : * the walsender's LSN.
860 : *
861 : * The caller must hold SyncRepLock in exclusive mode.
862 : */
863 : static int
864 250 : SyncRepWakeQueue(bool all, int mode)
865 : {
866 250 : volatile WalSndCtlData *walsndctl = WalSndCtl;
867 250 : int numprocs = 0;
868 : dlist_mutable_iter iter;
869 :
870 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
871 : Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
872 : Assert(SyncRepQueueIsOrderedByLSN(mode));
873 :
874 296 : dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
875 : {
876 62 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
877 :
878 : /*
879 : * Assume the queue is ordered by LSN
880 : */
881 62 : if (!all && walsndctl->lsn[mode] < proc->waitLSN)
882 16 : return numprocs;
883 :
884 : /*
885 : * Remove from queue.
886 : */
887 46 : dlist_delete_thoroughly(&proc->syncRepLinks);
888 :
889 : /*
890 : * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
891 : * make sure that it sees the queue link being removed before the
892 : * syncRepState change.
893 : */
894 46 : pg_write_barrier();
895 :
896 : /*
897 : * Set state to complete; see SyncRepWaitForLSN() for discussion of
898 : * the various states.
899 : */
900 46 : proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
901 :
902 : /*
903 : * Wake only when we have set state and removed from queue.
904 : */
905 46 : SetLatch(&(proc->procLatch));
906 :
907 46 : numprocs++;
908 : }
909 :
910 234 : return numprocs;
911 : }
912 :
913 : /*
914 : * The checkpointer calls this as needed to update the shared
915 : * sync_standbys_defined flag, so that backends don't remain permanently wedged
916 : * if synchronous_standby_names is unset. It's safe to check the current value
917 : * without the lock, because it's only ever updated by one process. But we
918 : * must take the lock to change it.
919 : */
920 : void
921 1002 : SyncRepUpdateSyncStandbysDefined(void)
922 : {
923 1002 : bool sync_standbys_defined = SyncStandbysDefined();
924 :
925 1002 : if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
926 : {
927 26 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
928 :
929 : /*
930 : * If synchronous_standby_names has been reset to empty, it's futile
931 : * for backends to continue waiting. Since the user no longer wants
932 : * synchronous replication, we'd better wake them up.
933 : */
934 26 : if (!sync_standbys_defined)
935 : {
936 : int i;
937 :
938 8 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
939 6 : SyncRepWakeQueue(true, i);
940 : }
941 :
942 : /*
943 : * Only allow people to join the queue when there are synchronous
944 : * standbys defined. Without this interlock, there's a race
945 : * condition: we might wake up all the current waiters; then, some
946 : * backend that hasn't yet reloaded its config might go to sleep on
947 : * the queue (and never wake up). This prevents that.
948 : */
949 26 : WalSndCtl->sync_standbys_defined = sync_standbys_defined;
950 :
951 26 : LWLockRelease(SyncRepLock);
952 : }
953 1002 : }
954 :
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper: returns true when the wait queue for 'mode' has strictly
 * increasing waitLSN values from head to tail.  Because the running maximum
 * starts at 0, no entry may have waitLSN 0 and no two entries may share an
 * LSN.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
	XLogRecPtr	prevLSN = 0;
	dlist_iter	it;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	dlist_foreach(it, &WalSndCtl->SyncRepQueue[mode])
	{
		PGPROC	   *waiter = dlist_container(PGPROC, syncRepLinks, it.cur);

		/* A duplicate or a step backwards breaks the ordering invariant. */
		if (waiter->waitLSN <= prevLSN)
			return false;

		prevLSN = waiter->waitLSN;
	}

	return true;
}
#endif
983 :
984 : /*
985 : * ===========================================================
986 : * Synchronous Replication functions executed by any process
987 : * ===========================================================
988 : */
989 :
990 : bool
991 2128 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
992 : {
993 2128 : if (*newval != NULL && (*newval)[0] != '\0')
994 130 : {
995 : yyscan_t scanner;
996 : int parse_rc;
997 : SyncRepConfigData *pconf;
998 :
999 : /* Reset communication variables to ensure a fresh start */
1000 130 : syncrep_parse_result = NULL;
1001 130 : syncrep_parse_error_msg = NULL;
1002 :
1003 : /* Parse the synchronous_standby_names string */
1004 130 : syncrep_scanner_init(*newval, &scanner);
1005 130 : parse_rc = syncrep_yyparse(scanner);
1006 130 : syncrep_scanner_finish(scanner);
1007 :
1008 130 : if (parse_rc != 0 || syncrep_parse_result == NULL)
1009 : {
1010 0 : GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
1011 0 : if (syncrep_parse_error_msg)
1012 0 : GUC_check_errdetail("%s", syncrep_parse_error_msg);
1013 : else
1014 0 : GUC_check_errdetail("\"%s\" parser failed.",
1015 : "synchronous_standby_names");
1016 0 : return false;
1017 : }
1018 :
1019 130 : if (syncrep_parse_result->num_sync <= 0)
1020 : {
1021 0 : GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1022 0 : syncrep_parse_result->num_sync);
1023 0 : return false;
1024 : }
1025 :
1026 : /* GUC extra value must be guc_malloc'd, not palloc'd */
1027 : pconf = (SyncRepConfigData *)
1028 130 : guc_malloc(LOG, syncrep_parse_result->config_size);
1029 130 : if (pconf == NULL)
1030 0 : return false;
1031 130 : memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1032 :
1033 130 : *extra = pconf;
1034 :
1035 : /*
1036 : * We need not explicitly clean up syncrep_parse_result. It, and any
1037 : * other cruft generated during parsing, will be freed when the
1038 : * current memory context is deleted. (This code is generally run in
1039 : * a short-lived context used for config file processing, so that will
1040 : * not be very long.)
1041 : */
1042 : }
1043 : else
1044 1998 : *extra = NULL;
1045 :
1046 2128 : return true;
1047 : }
1048 :
1049 : void
1050 2108 : assign_synchronous_standby_names(const char *newval, void *extra)
1051 : {
1052 2108 : SyncRepConfig = (SyncRepConfigData *) extra;
1053 2108 : }
1054 :
1055 : void
1056 5404 : assign_synchronous_commit(int newval, void *extra)
1057 : {
1058 5404 : switch (newval)
1059 : {
1060 0 : case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1061 0 : SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1062 0 : break;
1063 2232 : case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1064 2232 : SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1065 2232 : break;
1066 4 : case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1067 4 : SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1068 4 : break;
1069 3168 : default:
1070 3168 : SyncRepWaitMode = SYNC_REP_NO_WAIT;
1071 3168 : break;
1072 : }
1073 5404 : }
|