Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * syncrep.c
4 : *
5 : * Synchronous replication is new as of PostgreSQL 9.1.
6 : *
7 : * If requested, transaction commits wait until their commit LSN are
8 : * acknowledged by the synchronous standbys.
9 : *
10 : * This module contains the code for waiting and release of backends.
11 : * All code in this module executes on the primary. The core streaming
12 : * replication transport remains within WALreceiver/WALsender modules.
13 : *
14 : * The essence of this design is that it isolates all logic about
15 : * waiting/releasing onto the primary. The primary defines which standbys
16 : * it wishes to wait for. The standbys are completely unaware of the
17 : * durability requirements of transactions on the primary, reducing the
18 : * complexity of the code and streamlining both standby operations and
19 : * network bandwidth because there is no requirement to ship
20 : * per-transaction state information.
21 : *
22 : * Replication is either synchronous or not synchronous (async). If it is
23 : * async, we just fastpath out of here. If it is sync, then we wait for
24 : * the write, flush or apply location on the standby before releasing
25 : * the waiting backend. Further complexity in that interaction is
26 : * expected in later releases.
27 : *
28 : * The best performing way to manage the waiting backends is to have a
29 : * single ordered queue of waiting backends, so that we can avoid
30 : * searching the through all waiters each time we receive a reply.
31 : *
32 : * In 9.5 or before only a single standby could be considered as
33 : * synchronous. In 9.6 we support a priority-based multiple synchronous
34 : * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
35 : * supported. The number of synchronous standbys that transactions
36 : * must wait for replies from is specified in synchronous_standby_names.
37 : * This parameter also specifies a list of standby names and the method
38 : * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39 : *
40 : * The method FIRST specifies a priority-based synchronous replication
41 : * and makes transaction commits wait until their WAL records are
42 : * replicated to the requested number of synchronous standbys chosen based
43 : * on their priorities. The standbys whose names appear earlier in the list
44 : * are given higher priority and will be considered as synchronous.
45 : * Other standby servers appearing later in this list represent potential
46 : * synchronous standbys. If any of the current synchronous standbys
47 : * disconnects for whatever reason, it will be replaced immediately with
48 : * the next-highest-priority standby.
49 : *
50 : * The method ANY specifies a quorum-based synchronous replication
51 : * and makes transaction commits wait until their WAL records are
52 : * replicated to at least the requested number of synchronous standbys
53 : * in the list. All the standbys appearing in the list are considered as
54 : * candidates for quorum synchronous standbys.
55 : *
56 : * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 : * This is for backward compatibility with 9.6 or before where only a
58 : * priority-based sync replication was supported.
59 : *
60 : * Before the standbys chosen from synchronous_standby_names can
61 : * become the synchronous standbys they must have caught up with
62 : * the primary; that may take some time. Once caught up,
63 : * the standbys which are considered as synchronous at that moment
64 : * will release waiters from the queue.
65 : *
66 : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
67 : *
68 : * IDENTIFICATION
69 : * src/backend/replication/syncrep.c
70 : *
71 : *-------------------------------------------------------------------------
72 : */
73 : #include "postgres.h"
74 :
75 : #include <unistd.h>
76 :
77 : #include "access/xact.h"
78 : #include "common/int.h"
79 : #include "miscadmin.h"
80 : #include "pgstat.h"
81 : #include "replication/syncrep.h"
82 : #include "replication/walsender.h"
83 : #include "replication/walsender_private.h"
84 : #include "storage/proc.h"
85 : #include "tcop/tcopprot.h"
86 : #include "utils/guc_hooks.h"
87 : #include "utils/ps_status.h"
88 : #include "utils/wait_event.h"
89 :
90 : /* User-settable parameters for sync rep */
91 : char *SyncRepStandbyNames;
92 :
93 : #define SyncStandbysDefined() \
94 : (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
95 :
96 : static bool announce_next_takeover = true;
97 :
98 : SyncRepConfigData *SyncRepConfig = NULL;
99 : static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
100 :
101 : static void SyncRepQueueInsert(int mode);
102 : static void SyncRepCancelWait(void);
103 : static int SyncRepWakeQueue(bool all, int mode);
104 :
105 : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
106 : XLogRecPtr *flushPtr,
107 : XLogRecPtr *applyPtr,
108 : bool *am_sync);
109 : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
110 : XLogRecPtr *flushPtr,
111 : XLogRecPtr *applyPtr,
112 : SyncRepStandbyData *sync_standbys,
113 : int num_standbys);
114 : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
115 : XLogRecPtr *flushPtr,
116 : XLogRecPtr *applyPtr,
117 : SyncRepStandbyData *sync_standbys,
118 : int num_standbys,
119 : uint8 nth);
120 : static int SyncRepGetStandbyPriority(void);
121 : static int standby_priority_comparator(const void *a, const void *b);
122 : static int cmp_lsn(const void *a, const void *b);
123 :
124 : #ifdef USE_ASSERT_CHECKING
125 : static bool SyncRepQueueIsOrderedByLSN(int mode);
126 : #endif
127 :
128 : /*
129 : * ===========================================================
130 : * Synchronous Replication functions for normal user backends
131 : * ===========================================================
132 : */
133 :
134 : /*
135 : * Wait for synchronous replication, if requested by user.
136 : *
137 : * Initially backends start in state SYNC_REP_NOT_WAITING and then
138 : * change that state to SYNC_REP_WAITING before adding ourselves
139 : * to the wait queue. During SyncRepWakeQueue() a WALSender changes
140 : * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
141 : * This backend then resets its state to SYNC_REP_NOT_WAITING.
142 : *
143 : * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
144 : * represents a commit record. If it doesn't, then we wait only for the WAL
145 : * to be flushed if synchronous_commit is set to the higher level of
146 : * remote_apply, because only commit records provide apply feedback.
147 : */
148 : void
149 147465 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
150 : {
151 : int mode;
152 :
153 : /*
154 : * This should be called while holding interrupts during a transaction
155 : * commit to prevent the follow-up shared memory queue cleanups to be
156 : * influenced by external interruptions.
157 : */
158 : Assert(InterruptHoldoffCount > 0);
159 :
160 : /*
161 : * Fast exit if user has not requested sync replication, or there are no
162 : * sync replication standby names defined.
163 : *
164 : * Since this routine gets called every commit time, it's important to
165 : * exit quickly if sync replication is not requested.
166 : *
167 : * We check WalSndCtl->sync_standbys_status flag without the lock and exit
168 : * immediately if SYNC_STANDBY_INIT is set (the checkpointer has
169 : * initialized this data) but SYNC_STANDBY_DEFINED is missing (no sync
170 : * replication requested).
171 : *
172 : * If SYNC_STANDBY_DEFINED is set, we need to check the status again later
173 : * while holding the lock, to check the flag and operate the sync rep
174 : * queue atomically. This is necessary to avoid the race condition
175 : * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
176 : * SYNC_STANDBY_DEFINED is not set, the lock is not necessary because we
177 : * don't touch the queue.
178 : */
179 147465 : if (!SyncRepRequested() ||
180 94942 : ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) &
181 : (SYNC_STANDBY_INIT | SYNC_STANDBY_DEFINED)) == SYNC_STANDBY_INIT)
182 113659 : return;
183 :
184 : /* Cap the level for anything other than commit to remote flush only. */
185 33806 : if (commit)
186 33786 : mode = SyncRepWaitMode;
187 : else
188 20 : mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
189 :
190 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
191 : Assert(WalSndCtl != NULL);
192 :
193 33806 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
194 : Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
195 :
196 : /*
197 : * We don't wait for sync rep if SYNC_STANDBY_DEFINED is not set. See
198 : * SyncRepUpdateSyncStandbysDefined().
199 : *
200 : * Also check that the standby hasn't already replied. Unlikely race
201 : * condition but we'll be fetching that cache line anyway so it's likely
202 : * to be a low cost check.
203 : *
204 : * If the sync standby data has not been initialized yet
205 : * (SYNC_STANDBY_INIT is not set), fall back to a check based on the LSN,
206 : * then do a direct GUC check.
207 : */
208 33806 : if (WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT)
209 : {
210 42 : if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) == 0 ||
211 42 : lsn <= WalSndCtl->lsn[mode])
212 : {
213 7 : LWLockRelease(SyncRepLock);
214 7 : return;
215 : }
216 : }
217 33764 : else if (lsn <= WalSndCtl->lsn[mode])
218 : {
219 : /*
220 : * The LSN is older than what we need to wait for. The sync standby
221 : * data has not been initialized yet, but we are OK to not wait
222 : * because we know that there is no point in doing so based on the
223 : * LSN.
224 : */
225 0 : LWLockRelease(SyncRepLock);
226 0 : return;
227 : }
228 33764 : else if (!SyncStandbysDefined())
229 : {
230 : /*
231 : * If we are here, the sync standby data has not been initialized yet,
232 : * and the LSN is newer than what need to wait for, so we have fallen
233 : * back to the best thing we could do in this case: a check on
234 : * SyncStandbysDefined() to see if the GUC is set or not.
235 : *
236 : * When the GUC has a value, we wait until the checkpointer updates
237 : * the status data because we cannot be sure yet if we should wait or
238 : * not. Here, the GUC has *no* value, we are sure that there is no
239 : * point to wait; this matters for example when initializing a
240 : * cluster, where we should never wait, and no sync standbys is the
241 : * default behavior.
242 : */
243 33764 : LWLockRelease(SyncRepLock);
244 33764 : return;
245 : }
246 :
247 : /*
248 : * Set our waitLSN so WALSender will know when to wake us, and add
249 : * ourselves to the queue.
250 : */
251 35 : MyProc->waitLSN = lsn;
252 35 : MyProc->syncRepState = SYNC_REP_WAITING;
253 35 : SyncRepQueueInsert(mode);
254 : Assert(SyncRepQueueIsOrderedByLSN(mode));
255 35 : LWLockRelease(SyncRepLock);
256 :
257 : /* Alter ps display to show waiting for sync rep. */
258 35 : if (update_process_title)
259 : {
260 : char buffer[32];
261 :
262 35 : sprintf(buffer, "waiting for %X/%08X", LSN_FORMAT_ARGS(lsn));
263 35 : set_ps_display_suffix(buffer);
264 : }
265 :
266 : /*
267 : * Wait for specified LSN to be confirmed.
268 : *
269 : * Each proc has its own wait latch, so we perform a normal latch
270 : * check/wait loop here.
271 : */
272 : for (;;)
273 35 : {
274 : int rc;
275 :
276 : /* Must reset the latch before testing state. */
277 70 : ResetLatch(MyLatch);
278 :
279 : /*
280 : * Acquiring the lock is not needed, the latch ensures proper
281 : * barriers. If it looks like we're done, we must really be done,
282 : * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
283 : * it will never update it again, so we can't be seeing a stale value
284 : * in that case.
285 : */
286 70 : if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
287 35 : break;
288 :
289 : /*
290 : * If a wait for synchronous replication is pending, we can neither
291 : * acknowledge the commit nor raise ERROR or FATAL. The latter would
292 : * lead the client to believe that the transaction aborted, which is
293 : * not true: it's already committed locally. The former is no good
294 : * either: the client has requested synchronous replication, and is
295 : * entitled to assume that an acknowledged commit is also replicated,
296 : * which might not be true. So in this case we issue a WARNING (which
297 : * some clients may be able to interpret) and shut off further output.
298 : * We do NOT reset ProcDiePending, so that the process will die after
299 : * the commit is cleaned up.
300 : */
301 35 : if (ProcDiePending)
302 : {
303 0 : ereport(WARNING,
304 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
305 : errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
306 : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
307 0 : whereToSendOutput = DestNone;
308 0 : SyncRepCancelWait();
309 0 : break;
310 : }
311 :
312 : /*
313 : * It's unclear what to do if a query cancel interrupt arrives. We
314 : * can't actually abort at this point, but ignoring the interrupt
315 : * altogether is not helpful, so we just terminate the wait with a
316 : * suitable warning.
317 : */
318 35 : if (QueryCancelPending)
319 : {
320 0 : QueryCancelPending = false;
321 0 : ereport(WARNING,
322 : (errmsg("canceling wait for synchronous replication due to user request"),
323 : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
324 0 : SyncRepCancelWait();
325 0 : break;
326 : }
327 :
328 : /*
329 : * Wait on latch. Any condition that should wake us up will set the
330 : * latch, so no need for timeout.
331 : */
332 35 : rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
333 : WAIT_EVENT_SYNC_REP);
334 :
335 : /*
336 : * If the postmaster dies, we'll probably never get an acknowledgment,
337 : * because all the wal sender processes will exit. So just bail out.
338 : */
339 35 : if (rc & WL_POSTMASTER_DEATH)
340 : {
341 0 : ProcDiePending = true;
342 0 : whereToSendOutput = DestNone;
343 0 : SyncRepCancelWait();
344 0 : break;
345 : }
346 : }
347 :
348 : /*
349 : * WalSender has checked our LSN and has removed us from queue. Clean up
350 : * state and leave. It's OK to reset these shared memory fields without
351 : * holding SyncRepLock, because any walsenders will ignore us anyway when
352 : * we're not on the queue. We need a read barrier to make sure we see the
353 : * changes to the queue link (this might be unnecessary without
354 : * assertions, but better safe than sorry).
355 : */
356 35 : pg_read_barrier();
357 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
358 35 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
359 35 : MyProc->waitLSN = InvalidXLogRecPtr;
360 :
361 : /* reset ps display to remove the suffix */
362 35 : if (update_process_title)
363 35 : set_ps_display_remove_suffix();
364 : }
365 :
366 : /*
367 : * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
368 : *
369 : * Usually we will go at tail of queue, though it's possible that we arrive
370 : * here out of order, so start at tail and work back to insertion point.
371 : */
372 : static void
373 35 : SyncRepQueueInsert(int mode)
374 : {
375 : dlist_head *queue;
376 : dlist_iter iter;
377 :
378 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
379 35 : queue = &WalSndCtl->SyncRepQueue[mode];
380 :
381 35 : dlist_reverse_foreach(iter, queue)
382 : {
383 0 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
384 :
385 : /*
386 : * Stop at the queue element that we should insert after to ensure the
387 : * queue is ordered by LSN.
388 : */
389 0 : if (proc->waitLSN < MyProc->waitLSN)
390 : {
391 0 : dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
392 0 : return;
393 : }
394 : }
395 :
396 : /*
397 : * If we get here, the list was either empty, or this process needs to be
398 : * at the head.
399 : */
400 35 : dlist_push_head(queue, &MyProc->syncRepLinks);
401 : }
402 :
403 : /*
404 : * Acquire SyncRepLock and cancel any wait currently in progress.
405 : */
406 : static void
407 0 : SyncRepCancelWait(void)
408 : {
409 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
410 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
411 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
412 0 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
413 0 : LWLockRelease(SyncRepLock);
414 0 : }
415 :
416 : void
417 19358 : SyncRepCleanupAtProcExit(void)
418 : {
419 : /*
420 : * First check if we are removed from the queue without the lock to not
421 : * slow down backend exit.
422 : */
423 19358 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
424 : {
425 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
426 :
427 : /* maybe we have just been removed, so recheck */
428 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
429 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
430 :
431 0 : LWLockRelease(SyncRepLock);
432 : }
433 19358 : }
434 :
435 : /*
436 : * ===========================================================
437 : * Synchronous Replication functions for wal sender processes
438 : * ===========================================================
439 : */
440 :
441 : /*
442 : * Take any action required to initialise sync rep state from config
443 : * data. Called at WALSender startup and after each SIGHUP.
444 : */
445 : void
446 789 : SyncRepInitConfig(void)
447 : {
448 : int priority;
449 :
450 : /*
451 : * Determine if we are a potential sync standby and remember the result
452 : * for handling replies from standby.
453 : */
454 789 : priority = SyncRepGetStandbyPriority();
455 789 : if (MyWalSnd->sync_standby_priority != priority)
456 : {
457 18 : SpinLockAcquire(&MyWalSnd->mutex);
458 18 : MyWalSnd->sync_standby_priority = priority;
459 18 : SpinLockRelease(&MyWalSnd->mutex);
460 :
461 18 : ereport(DEBUG1,
462 : (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
463 : application_name, priority)));
464 : }
465 789 : }
466 :
467 : /*
468 : * Update the LSNs on each queue based upon our latest state. This
469 : * implements a simple policy of first-valid-sync-standby-releases-waiter.
470 : *
471 : * Other policies are possible, which would change what we do here and
472 : * perhaps also which information we store as well.
473 : */
474 : void
475 112838 : SyncRepReleaseWaiters(void)
476 : {
477 112838 : volatile WalSndCtlData *walsndctl = WalSndCtl;
478 : XLogRecPtr writePtr;
479 : XLogRecPtr flushPtr;
480 : XLogRecPtr applyPtr;
481 : bool got_recptr;
482 : bool am_sync;
483 112838 : int numwrite = 0;
484 112838 : int numflush = 0;
485 112838 : int numapply = 0;
486 :
487 : /*
488 : * If this WALSender is serving a standby that is not on the list of
489 : * potential sync standbys then we have nothing to do. If we are still
490 : * starting up, still running base backup or the current flush position is
491 : * still invalid, then leave quickly also. Streaming or stopping WAL
492 : * senders are allowed to release waiters.
493 : */
494 112838 : if (MyWalSnd->sync_standby_priority == 0 ||
495 366 : (MyWalSnd->state != WALSNDSTATE_STREAMING &&
496 233 : MyWalSnd->state != WALSNDSTATE_STOPPING) ||
497 357 : !XLogRecPtrIsValid(MyWalSnd->flush))
498 : {
499 112481 : announce_next_takeover = true;
500 112486 : return;
501 : }
502 :
503 : /*
504 : * We're a potential sync standby. Release waiters if there are enough
505 : * sync standbys and we are considered as sync.
506 : */
507 357 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
508 :
509 : /*
510 : * Check whether we are a sync standby or not, and calculate the synced
511 : * positions among all sync standbys. (Note: although this step does not
512 : * of itself require holding SyncRepLock, it seems like a good idea to do
513 : * it after acquiring the lock. This ensures that the WAL pointers we use
514 : * to release waiters are newer than any previous execution of this
515 : * routine used.)
516 : */
517 357 : got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
518 :
519 : /*
520 : * If we are managing a sync standby, though we weren't prior to this,
521 : * then announce we are now a sync standby.
522 : */
523 357 : if (announce_next_takeover && am_sync)
524 : {
525 13 : announce_next_takeover = false;
526 :
527 13 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
528 13 : ereport(LOG,
529 : (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
530 : application_name, MyWalSnd->sync_standby_priority)));
531 : else
532 0 : ereport(LOG,
533 : (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
534 : application_name)));
535 : }
536 :
537 : /*
538 : * If the number of sync standbys is less than requested or we aren't
539 : * managing a sync standby then just leave.
540 : */
541 357 : if (!got_recptr || !am_sync)
542 : {
543 5 : LWLockRelease(SyncRepLock);
544 5 : announce_next_takeover = !am_sync;
545 5 : return;
546 : }
547 :
548 : /*
549 : * Set the lsn first so that when we wake backends they will release up to
550 : * this location.
551 : */
552 352 : if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
553 : {
554 54 : walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
555 54 : numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
556 : }
557 352 : if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
558 : {
559 59 : walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
560 59 : numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
561 : }
562 352 : if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
563 : {
564 54 : walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
565 54 : numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
566 : }
567 :
568 352 : LWLockRelease(SyncRepLock);
569 :
570 352 : elog(DEBUG3, "released %d procs up to write %X/%08X, %d procs up to flush %X/%08X, %d procs up to apply %X/%08X",
571 : numwrite, LSN_FORMAT_ARGS(writePtr),
572 : numflush, LSN_FORMAT_ARGS(flushPtr),
573 : numapply, LSN_FORMAT_ARGS(applyPtr));
574 : }
575 :
576 : /*
577 : * Calculate the synced Write, Flush and Apply positions among sync standbys.
578 : *
579 : * Return false if the number of sync standbys is less than
580 : * synchronous_standby_names specifies. Otherwise return true and
581 : * store the positions into *writePtr, *flushPtr and *applyPtr.
582 : *
583 : * On return, *am_sync is set to true if this walsender is connecting to
584 : * sync standby. Otherwise it's set to false.
585 : */
586 : static bool
587 357 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
588 : XLogRecPtr *applyPtr, bool *am_sync)
589 : {
590 : SyncRepStandbyData *sync_standbys;
591 : int num_standbys;
592 : int i;
593 :
594 : /* Initialize default results */
595 357 : *writePtr = InvalidXLogRecPtr;
596 357 : *flushPtr = InvalidXLogRecPtr;
597 357 : *applyPtr = InvalidXLogRecPtr;
598 357 : *am_sync = false;
599 :
600 : /* Quick out if not even configured to be synchronous */
601 357 : if (SyncRepConfig == NULL)
602 0 : return false;
603 :
604 : /* Get standbys that are considered as synchronous at this moment */
605 357 : num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
606 :
607 : /* Am I among the candidate sync standbys? */
608 364 : for (i = 0; i < num_standbys; i++)
609 : {
610 360 : if (sync_standbys[i].is_me)
611 : {
612 353 : *am_sync = true;
613 353 : break;
614 : }
615 : }
616 :
617 : /*
618 : * Nothing more to do if we are not managing a sync standby or there are
619 : * not enough synchronous standbys.
620 : */
621 357 : if (!(*am_sync) ||
622 353 : num_standbys < SyncRepConfig->num_sync)
623 : {
624 5 : pfree(sync_standbys);
625 5 : return false;
626 : }
627 :
628 : /*
629 : * In a priority-based sync replication, the synced positions are the
630 : * oldest ones among sync standbys. In a quorum-based, they are the Nth
631 : * latest ones.
632 : *
633 : * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
634 : * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
635 : * because it's a bit more efficient.
636 : *
637 : * XXX If the numbers of current and requested sync standbys are the same,
638 : * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
639 : * positions even in a quorum-based sync replication.
640 : */
641 352 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
642 : {
643 352 : SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
644 : sync_standbys, num_standbys);
645 : }
646 : else
647 : {
648 0 : SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
649 : sync_standbys, num_standbys,
650 0 : SyncRepConfig->num_sync);
651 : }
652 :
653 352 : pfree(sync_standbys);
654 352 : return true;
655 : }
656 :
657 : /*
658 : * Calculate the oldest Write, Flush and Apply positions among sync standbys.
659 : */
660 : static void
661 352 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
662 : XLogRecPtr *flushPtr,
663 : XLogRecPtr *applyPtr,
664 : SyncRepStandbyData *sync_standbys,
665 : int num_standbys)
666 : {
667 : int i;
668 :
669 : /*
670 : * Scan through all sync standbys and calculate the oldest Write, Flush
671 : * and Apply positions. We assume *writePtr et al were initialized to
672 : * InvalidXLogRecPtr.
673 : */
674 707 : for (i = 0; i < num_standbys; i++)
675 : {
676 355 : XLogRecPtr write = sync_standbys[i].write;
677 355 : XLogRecPtr flush = sync_standbys[i].flush;
678 355 : XLogRecPtr apply = sync_standbys[i].apply;
679 :
680 355 : if (!XLogRecPtrIsValid(*writePtr) || *writePtr > write)
681 352 : *writePtr = write;
682 355 : if (!XLogRecPtrIsValid(*flushPtr) || *flushPtr > flush)
683 352 : *flushPtr = flush;
684 355 : if (!XLogRecPtrIsValid(*applyPtr) || *applyPtr > apply)
685 352 : *applyPtr = apply;
686 : }
687 352 : }
688 :
689 : /*
690 : * Calculate the Nth latest Write, Flush and Apply positions among sync
691 : * standbys.
692 : */
693 : static void
694 0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
695 : XLogRecPtr *flushPtr,
696 : XLogRecPtr *applyPtr,
697 : SyncRepStandbyData *sync_standbys,
698 : int num_standbys,
699 : uint8 nth)
700 : {
701 : XLogRecPtr *write_array;
702 : XLogRecPtr *flush_array;
703 : XLogRecPtr *apply_array;
704 : int i;
705 :
706 : /* Should have enough candidates, or somebody messed up */
707 : Assert(nth > 0 && nth <= num_standbys);
708 :
709 0 : write_array = palloc_array(XLogRecPtr, num_standbys);
710 0 : flush_array = palloc_array(XLogRecPtr, num_standbys);
711 0 : apply_array = palloc_array(XLogRecPtr, num_standbys);
712 :
713 0 : for (i = 0; i < num_standbys; i++)
714 : {
715 0 : write_array[i] = sync_standbys[i].write;
716 0 : flush_array[i] = sync_standbys[i].flush;
717 0 : apply_array[i] = sync_standbys[i].apply;
718 : }
719 :
720 : /* Sort each array in descending order */
721 0 : qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
722 0 : qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
723 0 : qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
724 :
725 : /* Get Nth latest Write, Flush, Apply positions */
726 0 : *writePtr = write_array[nth - 1];
727 0 : *flushPtr = flush_array[nth - 1];
728 0 : *applyPtr = apply_array[nth - 1];
729 :
730 0 : pfree(write_array);
731 0 : pfree(flush_array);
732 0 : pfree(apply_array);
733 0 : }
734 :
735 : /*
736 : * Compare lsn in order to sort array in descending order.
737 : */
738 : static int
739 0 : cmp_lsn(const void *a, const void *b)
740 : {
741 0 : XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
742 0 : XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
743 :
744 0 : return pg_cmp_u64(lsn2, lsn1);
745 : }
746 :
747 : /*
748 : * Return data about walsenders that are candidates to be sync standbys.
749 : *
750 : * *standbys is set to a palloc'd array of structs of per-walsender data,
751 : * and the number of valid entries (candidate sync senders) is returned.
752 : * (This might be more or fewer than num_sync; caller must check.)
753 : */
754 : int
755 991 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
756 : {
757 : int i;
758 : int n;
759 :
760 : /* Create result array */
761 991 : *standbys = palloc_array(SyncRepStandbyData, max_wal_senders);
762 :
763 : /* Quick exit if sync replication is not requested */
764 991 : if (SyncRepConfig == NULL)
765 617 : return 0;
766 :
767 : /* Collect raw data from shared memory */
768 374 : n = 0;
769 4114 : for (i = 0; i < max_wal_senders; i++)
770 : {
771 : volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
772 : * rearrangement */
773 : SyncRepStandbyData *stby;
774 : WalSndState state; /* not included in SyncRepStandbyData */
775 :
776 3740 : walsnd = &WalSndCtl->walsnds[i];
777 3740 : stby = *standbys + n;
778 :
779 3740 : SpinLockAcquire(&walsnd->mutex);
780 3740 : stby->pid = walsnd->pid;
781 3740 : state = walsnd->state;
782 3740 : stby->write = walsnd->write;
783 3740 : stby->flush = walsnd->flush;
784 3740 : stby->apply = walsnd->apply;
785 3740 : stby->sync_standby_priority = walsnd->sync_standby_priority;
786 3740 : SpinLockRelease(&walsnd->mutex);
787 :
788 : /* Must be active */
789 3740 : if (stby->pid == 0)
790 3317 : continue;
791 :
792 : /* Must be streaming or stopping */
793 423 : if (state != WALSNDSTATE_STREAMING &&
794 : state != WALSNDSTATE_STOPPING)
795 0 : continue;
796 :
797 : /* Must be synchronous */
798 423 : if (stby->sync_standby_priority == 0)
799 12 : continue;
800 :
801 : /* Must have a valid flush position */
802 411 : if (!XLogRecPtrIsValid(stby->flush))
803 0 : continue;
804 :
805 : /* OK, it's a candidate */
806 411 : stby->walsnd_index = i;
807 411 : stby->is_me = (walsnd == MyWalSnd);
808 411 : n++;
809 : }
810 :
811 : /*
812 : * In quorum mode, we return all the candidates. In priority mode, if we
813 : * have too many candidates then return only the num_sync ones of highest
814 : * priority.
815 : */
816 374 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
817 373 : n > SyncRepConfig->num_sync)
818 : {
819 : /* Sort by priority ... */
820 15 : qsort(*standbys, n, sizeof(SyncRepStandbyData),
821 : standby_priority_comparator);
822 : /* ... then report just the first num_sync ones */
823 15 : n = SyncRepConfig->num_sync;
824 : }
825 :
826 374 : return n;
827 : }
828 :
829 : /*
830 : * qsort comparator to sort SyncRepStandbyData entries by priority
831 : */
832 : static int
833 36 : standby_priority_comparator(const void *a, const void *b)
834 : {
835 36 : const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
836 36 : const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
837 :
838 : /* First, sort by increasing priority value */
839 36 : if (sa->sync_standby_priority != sb->sync_standby_priority)
840 19 : return sa->sync_standby_priority - sb->sync_standby_priority;
841 :
842 : /*
843 : * We might have equal priority values; arbitrarily break ties by position
844 : * in the WalSnd array. (This is utterly bogus, since that is arrival
845 : * order dependent, but there are regression tests that rely on it.)
846 : */
847 17 : return sa->walsnd_index - sb->walsnd_index;
848 : }
849 :
850 :
851 : /*
852 : * Check if we are in the list of sync standbys, and if so, determine
853 : * priority sequence. Return priority if set, or zero to indicate that
854 : * we are not a potential sync standby.
855 : *
856 : * Compare the parameter SyncRepStandbyNames against the application_name
857 : * for this WALSender, or allow any name if we find a wildcard "*".
858 : */
859 : static int
860 789 : SyncRepGetStandbyPriority(void)
861 : {
862 : const char *standby_name;
863 : int priority;
864 789 : bool found = false;
865 :
866 : /*
867 : * Since synchronous cascade replication is not allowed, we always set the
868 : * priority of cascading walsender to zero.
869 : */
870 789 : if (am_cascading_walsender)
871 27 : return 0;
872 :
873 762 : if (!SyncStandbysDefined() || SyncRepConfig == NULL)
874 737 : return 0;
875 :
876 25 : standby_name = SyncRepConfig->member_names;
877 33 : for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
878 : {
879 32 : if (pg_strcasecmp(standby_name, application_name) == 0 ||
880 18 : strcmp(standby_name, "*") == 0)
881 : {
882 24 : found = true;
883 24 : break;
884 : }
885 8 : standby_name += strlen(standby_name) + 1;
886 : }
887 :
888 25 : if (!found)
889 1 : return 0;
890 :
891 : /*
892 : * In quorum-based sync replication, all the standbys in the list have the
893 : * same priority, one.
894 : */
895 24 : return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
896 : }
897 :
898 : /*
899 : * Walk the specified queue from head. Set the state of any backends that
900 : * need to be woken, remove them from the queue, and then wake them.
901 : * Pass all = true to wake whole queue; otherwise, just wake up to
902 : * the walsender's LSN.
903 : *
904 : * The caller must hold SyncRepLock in exclusive mode.
905 : */
906 : static int
907 170 : SyncRepWakeQueue(bool all, int mode)
908 : {
909 170 : volatile WalSndCtlData *walsndctl = WalSndCtl;
910 170 : int numprocs = 0;
911 : dlist_mutable_iter iter;
912 :
913 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
914 : Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
915 : Assert(SyncRepQueueIsOrderedByLSN(mode));
916 :
917 195 : dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
918 : {
919 28 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
920 :
921 : /*
922 : * Assume the queue is ordered by LSN
923 : */
924 28 : if (!all && walsndctl->lsn[mode] < proc->waitLSN)
925 3 : return numprocs;
926 :
927 : /*
928 : * Remove from queue.
929 : */
930 25 : dlist_delete_thoroughly(&proc->syncRepLinks);
931 :
932 : /*
933 : * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
934 : * make sure that it sees the queue link being removed before the
935 : * syncRepState change.
936 : */
937 25 : pg_write_barrier();
938 :
939 : /*
940 : * Set state to complete; see SyncRepWaitForLSN() for discussion of
941 : * the various states.
942 : */
943 25 : proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
944 :
945 : /*
946 : * Wake only when we have set state and removed from queue.
947 : */
948 25 : SetLatch(&(proc->procLatch));
949 :
950 25 : numprocs++;
951 : }
952 :
953 167 : return numprocs;
954 : }
955 :
956 : /*
957 : * The checkpointer calls this as needed to update the shared
958 : * sync_standbys_status flag, so that backends don't remain permanently wedged
959 : * if synchronous_standby_names is unset. It's safe to check the current value
960 : * without the lock, because it's only ever updated by one process. But we
961 : * must take the lock to change it.
962 : */
963 : void
964 685 : SyncRepUpdateSyncStandbysDefined(void)
965 : {
966 685 : bool sync_standbys_defined = SyncStandbysDefined();
967 :
968 685 : if (sync_standbys_defined !=
969 685 : ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0))
970 : {
971 14 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
972 :
973 : /*
974 : * If synchronous_standby_names has been reset to empty, it's futile
975 : * for backends to continue waiting. Since the user no longer wants
976 : * synchronous replication, we'd better wake them up.
977 : */
978 14 : if (!sync_standbys_defined)
979 : {
980 : int i;
981 :
982 4 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
983 3 : SyncRepWakeQueue(true, i);
984 : }
985 :
986 : /*
987 : * Only allow people to join the queue when there are synchronous
988 : * standbys defined. Without this interlock, there's a race
989 : * condition: we might wake up all the current waiters; then, some
990 : * backend that hasn't yet reloaded its config might go to sleep on
991 : * the queue (and never wake up). This prevents that.
992 : */
993 14 : WalSndCtl->sync_standbys_status = SYNC_STANDBY_INIT |
994 : (sync_standbys_defined ? SYNC_STANDBY_DEFINED : 0);
995 :
996 14 : LWLockRelease(SyncRepLock);
997 : }
998 671 : else if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT) == 0)
999 : {
1000 597 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
1001 :
1002 : /*
1003 : * Note that there is no need to wake up the queues here. We would
1004 : * reach this path only if SyncStandbysDefined() returns false, or it
1005 : * would mean that some backends are waiting with the GUC set. See
1006 : * SyncRepWaitForLSN().
1007 : */
1008 : Assert(!SyncStandbysDefined());
1009 :
1010 : /*
1011 : * Even if there is no sync standby defined, let the readers of this
1012 : * information know that the sync standby data has been initialized.
1013 : * This can just be done once, hence the previous check on
1014 : * SYNC_STANDBY_INIT to avoid useless work.
1015 : */
1016 597 : WalSndCtl->sync_standbys_status |= SYNC_STANDBY_INIT;
1017 :
1018 597 : LWLockRelease(SyncRepLock);
1019 : }
1020 685 : }
1021 :
1022 : #ifdef USE_ASSERT_CHECKING
1023 : static bool
1024 : SyncRepQueueIsOrderedByLSN(int mode)
1025 : {
1026 : XLogRecPtr lastLSN;
1027 : dlist_iter iter;
1028 :
1029 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
1030 :
1031 : lastLSN = InvalidXLogRecPtr;
1032 :
1033 : dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
1034 : {
1035 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
1036 :
1037 : /*
1038 : * Check the queue is ordered by LSN and that multiple procs don't
1039 : * have matching LSNs
1040 : */
1041 : if (proc->waitLSN <= lastLSN)
1042 : return false;
1043 :
1044 : lastLSN = proc->waitLSN;
1045 : }
1046 :
1047 : return true;
1048 : }
1049 : #endif
1050 :
1051 : /*
1052 : * ===========================================================
1053 : * Synchronous Replication functions executed by any process
1054 : * ===========================================================
1055 : */
1056 :
1057 : bool
1058 1312 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
1059 : {
1060 1312 : if (*newval != NULL && (*newval)[0] != '\0')
1061 77 : {
1062 : yyscan_t scanner;
1063 : int parse_rc;
1064 : SyncRepConfigData *pconf;
1065 :
1066 : /* Result of parsing is returned in one of these two variables */
1067 77 : SyncRepConfigData *syncrep_parse_result = NULL;
1068 77 : char *syncrep_parse_error_msg = NULL;
1069 :
1070 : /* Parse the synchronous_standby_names string */
1071 77 : syncrep_scanner_init(*newval, &scanner);
1072 77 : parse_rc = syncrep_yyparse(&syncrep_parse_result, &syncrep_parse_error_msg, scanner);
1073 77 : syncrep_scanner_finish(scanner);
1074 :
1075 77 : if (parse_rc != 0 || syncrep_parse_result == NULL)
1076 : {
1077 0 : GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
1078 0 : if (syncrep_parse_error_msg)
1079 0 : GUC_check_errdetail("%s", syncrep_parse_error_msg);
1080 : else
1081 : /* translator: %s is a GUC name */
1082 0 : GUC_check_errdetail("\"%s\" parser failed.",
1083 : "synchronous_standby_names");
1084 0 : return false;
1085 : }
1086 :
1087 77 : if (syncrep_parse_result->num_sync <= 0)
1088 : {
1089 0 : GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1090 0 : syncrep_parse_result->num_sync);
1091 0 : return false;
1092 : }
1093 :
1094 : /* GUC extra value must be guc_malloc'd, not palloc'd */
1095 : pconf = (SyncRepConfigData *)
1096 77 : guc_malloc(LOG, syncrep_parse_result->config_size);
1097 77 : if (pconf == NULL)
1098 0 : return false;
1099 77 : memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1100 :
1101 77 : *extra = pconf;
1102 :
1103 : /*
1104 : * We need not explicitly clean up syncrep_parse_result. It, and any
1105 : * other cruft generated during parsing, will be freed when the
1106 : * current memory context is deleted. (This code is generally run in
1107 : * a short-lived context used for config file processing, so that will
1108 : * not be very long.)
1109 : */
1110 : }
1111 : else
1112 1235 : *extra = NULL;
1113 :
1114 1312 : return true;
1115 : }
1116 :
1117 : void
1118 1302 : assign_synchronous_standby_names(const char *newval, void *extra)
1119 : {
1120 1302 : SyncRepConfig = (SyncRepConfigData *) extra;
1121 1302 : }
1122 :
1123 : void
1124 3397 : assign_synchronous_commit(int newval, void *extra)
1125 : {
1126 3397 : switch (newval)
1127 : {
1128 0 : case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1129 0 : SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1130 0 : break;
1131 1362 : case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1132 1362 : SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1133 1362 : break;
1134 2 : case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1135 2 : SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1136 2 : break;
1137 2033 : default:
1138 2033 : SyncRepWaitMode = SYNC_REP_NO_WAIT;
1139 2033 : break;
1140 : }
1141 3397 : }
|