LCOV - code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 210 284 73.9 %
Date: 2024-02-22 01:11:03 Functions: 15 18 83.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * syncrep.c
       4             :  *
       5             :  * Synchronous replication is new as of PostgreSQL 9.1.
       6             :  *
       7             :  * If requested, transaction commits wait until their commit LSN are
       8             :  * acknowledged by the synchronous standbys.
       9             :  *
      10             :  * This module contains the code for waiting and release of backends.
      11             :  * All code in this module executes on the primary. The core streaming
      12             :  * replication transport remains within WALreceiver/WALsender modules.
      13             :  *
      14             :  * The essence of this design is that it isolates all logic about
      15             :  * waiting/releasing onto the primary. The primary defines which standbys
      16             :  * it wishes to wait for. The standbys are completely unaware of the
      17             :  * durability requirements of transactions on the primary, reducing the
      18             :  * complexity of the code and streamlining both standby operations and
      19             :  * network bandwidth because there is no requirement to ship
      20             :  * per-transaction state information.
      21             :  *
      22             :  * Replication is either synchronous or not synchronous (async). If it is
      23             :  * async, we just fastpath out of here. If it is sync, then we wait for
      24             :  * the write, flush or apply location on the standby before releasing
      25             :  * the waiting backend. Further complexity in that interaction is
      26             :  * expected in later releases.
      27             :  *
      28             :  * The best performing way to manage the waiting backends is to have a
      29             :  * single ordered queue of waiting backends, so that we can avoid
      30             :  * searching the through all waiters each time we receive a reply.
      31             :  *
      32             :  * In 9.5 or before only a single standby could be considered as
      33             :  * synchronous. In 9.6 we support a priority-based multiple synchronous
      34             :  * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
      35             :  * supported. The number of synchronous standbys that transactions
      36             :  * must wait for replies from is specified in synchronous_standby_names.
      37             :  * This parameter also specifies a list of standby names and the method
      38             :  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
      39             :  *
      40             :  * The method FIRST specifies a priority-based synchronous replication
      41             :  * and makes transaction commits wait until their WAL records are
      42             :  * replicated to the requested number of synchronous standbys chosen based
      43             :  * on their priorities. The standbys whose names appear earlier in the list
      44             :  * are given higher priority and will be considered as synchronous.
      45             :  * Other standby servers appearing later in this list represent potential
      46             :  * synchronous standbys. If any of the current synchronous standbys
      47             :  * disconnects for whatever reason, it will be replaced immediately with
      48             :  * the next-highest-priority standby.
      49             :  *
      50             :  * The method ANY specifies a quorum-based synchronous replication
      51             :  * and makes transaction commits wait until their WAL records are
      52             :  * replicated to at least the requested number of synchronous standbys
      53             :  * in the list. All the standbys appearing in the list are considered as
      54             :  * candidates for quorum synchronous standbys.
      55             :  *
      56             :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57             :  * This is for backward compatibility with 9.6 or before where only a
      58             :  * priority-based sync replication was supported.
      59             :  *
      60             :  * Before the standbys chosen from synchronous_standby_names can
      61             :  * become the synchronous standbys they must have caught up with
      62             :  * the primary; that may take some time. Once caught up,
      63             :  * the standbys which are considered as synchronous at that moment
      64             :  * will release waiters from the queue.
      65             :  *
      66             :  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
      67             :  *
      68             :  * IDENTIFICATION
      69             :  *    src/backend/replication/syncrep.c
      70             :  *
      71             :  *-------------------------------------------------------------------------
      72             :  */
      73             : #include "postgres.h"
      74             : 
      75             : #include <unistd.h>
      76             : 
      77             : #include "access/xact.h"
      78             : #include "common/int.h"
      79             : #include "miscadmin.h"
      80             : #include "pgstat.h"
      81             : #include "replication/syncrep.h"
      82             : #include "replication/walsender.h"
      83             : #include "replication/walsender_private.h"
      84             : #include "storage/pmsignal.h"
      85             : #include "storage/proc.h"
      86             : #include "tcop/tcopprot.h"
      87             : #include "utils/builtins.h"
      88             : #include "utils/guc_hooks.h"
      89             : #include "utils/ps_status.h"
      90             : 
      91             : /* User-settable parameters for sync rep */
      92             : char       *SyncRepStandbyNames;
      93             : 
      94             : #define SyncStandbysDefined() \
      95             :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      96             : 
      97             : static bool announce_next_takeover = true;
      98             : 
      99             : SyncRepConfigData *SyncRepConfig = NULL;
     100             : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
     101             : 
     102             : static void SyncRepQueueInsert(int mode);
     103             : static void SyncRepCancelWait(void);
     104             : static int  SyncRepWakeQueue(bool all, int mode);
     105             : 
     106             : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     107             :                                  XLogRecPtr *flushPtr,
     108             :                                  XLogRecPtr *applyPtr,
     109             :                                  bool *am_sync);
     110             : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     111             :                                        XLogRecPtr *flushPtr,
     112             :                                        XLogRecPtr *applyPtr,
     113             :                                        SyncRepStandbyData *sync_standbys,
     114             :                                        int num_standbys);
     115             : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     116             :                                           XLogRecPtr *flushPtr,
     117             :                                           XLogRecPtr *applyPtr,
     118             :                                           SyncRepStandbyData *sync_standbys,
     119             :                                           int num_standbys,
     120             :                                           uint8 nth);
     121             : static int  SyncRepGetStandbyPriority(void);
     122             : static int  standby_priority_comparator(const void *a, const void *b);
     123             : static int  cmp_lsn(const void *a, const void *b);
     124             : 
     125             : #ifdef USE_ASSERT_CHECKING
     126             : static bool SyncRepQueueIsOrderedByLSN(int mode);
     127             : #endif
     128             : 
     129             : /*
     130             :  * ===========================================================
     131             :  * Synchronous Replication functions for normal user backends
     132             :  * ===========================================================
     133             :  */
     134             : 
     135             : /*
     136             :  * Wait for synchronous replication, if requested by user.
     137             :  *
     138             :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     139             :  * change that state to SYNC_REP_WAITING before adding ourselves
     140             :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     141             :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     142             :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     143             :  *
     144             :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     145             :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     146             :  * to be flushed if synchronous_commit is set to the higher level of
     147             :  * remote_apply, because only commit records provide apply feedback.
     148             :  */
     149             : void
     150      192028 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     151             : {
     152             :     int         mode;
     153             : 
     154             :     /*
     155             :      * This should be called while holding interrupts during a transaction
     156             :      * commit to prevent the follow-up shared memory queue cleanups to be
     157             :      * influenced by external interruptions.
     158             :      */
     159             :     Assert(InterruptHoldoffCount > 0);
     160             : 
     161             :     /*
     162             :      * Fast exit if user has not requested sync replication, or there are no
     163             :      * sync replication standby names defined.
     164             :      *
     165             :      * Since this routine gets called every commit time, it's important to
     166             :      * exit quickly if sync replication is not requested. So we check
     167             :      * WalSndCtl->sync_standbys_defined flag without the lock and exit
     168             :      * immediately if it's false. If it's true, we need to check it again
     169             :      * later while holding the lock, to check the flag and operate the sync
     170             :      * rep queue atomically. This is necessary to avoid the race condition
     171             :      * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
     172             :      * it's false, the lock is not necessary because we don't touch the queue.
     173             :      */
     174      192028 :     if (!SyncRepRequested() ||
     175      141062 :         !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
     176      191936 :         return;
     177             : 
     178             :     /* Cap the level for anything other than commit to remote flush only. */
     179          92 :     if (commit)
     180          44 :         mode = SyncRepWaitMode;
     181             :     else
     182          48 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     183             : 
     184             :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     185             :     Assert(WalSndCtl != NULL);
     186             : 
     187          92 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     188             :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     189             : 
     190             :     /*
     191             :      * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     192             :      * set.  See SyncRepUpdateSyncStandbysDefined.
     193             :      *
     194             :      * Also check that the standby hasn't already replied. Unlikely race
     195             :      * condition but we'll be fetching that cache line anyway so it's likely
     196             :      * to be a low cost check.
     197             :      */
     198          92 :     if (!WalSndCtl->sync_standbys_defined ||
     199          92 :         lsn <= WalSndCtl->lsn[mode])
     200             :     {
     201           0 :         LWLockRelease(SyncRepLock);
     202           0 :         return;
     203             :     }
     204             : 
     205             :     /*
     206             :      * Set our waitLSN so WALSender will know when to wake us, and add
     207             :      * ourselves to the queue.
     208             :      */
     209          92 :     MyProc->waitLSN = lsn;
     210          92 :     MyProc->syncRepState = SYNC_REP_WAITING;
     211          92 :     SyncRepQueueInsert(mode);
     212             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     213          92 :     LWLockRelease(SyncRepLock);
     214             : 
     215             :     /* Alter ps display to show waiting for sync rep. */
     216          92 :     if (update_process_title)
     217             :     {
     218             :         char        buffer[32];
     219             : 
     220          92 :         sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
     221          92 :         set_ps_display_suffix(buffer);
     222             :     }
     223             : 
     224             :     /*
     225             :      * Wait for specified LSN to be confirmed.
     226             :      *
     227             :      * Each proc has its own wait latch, so we perform a normal latch
     228             :      * check/wait loop here.
     229             :      */
     230             :     for (;;)
     231          92 :     {
     232             :         int         rc;
     233             : 
     234             :         /* Must reset the latch before testing state. */
     235         184 :         ResetLatch(MyLatch);
     236             : 
     237             :         /*
     238             :          * Acquiring the lock is not needed, the latch ensures proper
     239             :          * barriers. If it looks like we're done, we must really be done,
     240             :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     241             :          * it will never update it again, so we can't be seeing a stale value
     242             :          * in that case.
     243             :          */
     244         184 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     245          92 :             break;
     246             : 
     247             :         /*
     248             :          * If a wait for synchronous replication is pending, we can neither
     249             :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     250             :          * lead the client to believe that the transaction aborted, which is
     251             :          * not true: it's already committed locally. The former is no good
     252             :          * either: the client has requested synchronous replication, and is
     253             :          * entitled to assume that an acknowledged commit is also replicated,
     254             :          * which might not be true. So in this case we issue a WARNING (which
     255             :          * some clients may be able to interpret) and shut off further output.
     256             :          * We do NOT reset ProcDiePending, so that the process will die after
     257             :          * the commit is cleaned up.
     258             :          */
     259          92 :         if (ProcDiePending)
     260             :         {
     261           0 :             ereport(WARNING,
     262             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     263             :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     264             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     265           0 :             whereToSendOutput = DestNone;
     266           0 :             SyncRepCancelWait();
     267           0 :             break;
     268             :         }
     269             : 
     270             :         /*
     271             :          * It's unclear what to do if a query cancel interrupt arrives.  We
     272             :          * can't actually abort at this point, but ignoring the interrupt
     273             :          * altogether is not helpful, so we just terminate the wait with a
     274             :          * suitable warning.
     275             :          */
     276          92 :         if (QueryCancelPending)
     277             :         {
     278           0 :             QueryCancelPending = false;
     279           0 :             ereport(WARNING,
     280             :                     (errmsg("canceling wait for synchronous replication due to user request"),
     281             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     282           0 :             SyncRepCancelWait();
     283           0 :             break;
     284             :         }
     285             : 
     286             :         /*
     287             :          * Wait on latch.  Any condition that should wake us up will set the
     288             :          * latch, so no need for timeout.
     289             :          */
     290          92 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     291             :                        WAIT_EVENT_SYNC_REP);
     292             : 
     293             :         /*
     294             :          * If the postmaster dies, we'll probably never get an acknowledgment,
     295             :          * because all the wal sender processes will exit. So just bail out.
     296             :          */
     297          92 :         if (rc & WL_POSTMASTER_DEATH)
     298             :         {
     299           0 :             ProcDiePending = true;
     300           0 :             whereToSendOutput = DestNone;
     301           0 :             SyncRepCancelWait();
     302           0 :             break;
     303             :         }
     304             :     }
     305             : 
     306             :     /*
     307             :      * WalSender has checked our LSN and has removed us from queue. Clean up
     308             :      * state and leave.  It's OK to reset these shared memory fields without
     309             :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     310             :      * we're not on the queue.  We need a read barrier to make sure we see the
     311             :      * changes to the queue link (this might be unnecessary without
     312             :      * assertions, but better safe than sorry).
     313             :      */
     314          92 :     pg_read_barrier();
     315             :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     316          92 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     317          92 :     MyProc->waitLSN = 0;
     318             : 
     319             :     /* reset ps display to remove the suffix */
     320          92 :     if (update_process_title)
     321          92 :         set_ps_display_remove_suffix();
     322             : }
     323             : 
     324             : /*
     325             :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     326             :  *
     327             :  * Usually we will go at tail of queue, though it's possible that we arrive
     328             :  * here out of order, so start at tail and work back to insertion point.
     329             :  */
     330             : static void
     331          92 : SyncRepQueueInsert(int mode)
     332             : {
     333             :     dlist_head *queue;
     334             :     dlist_iter  iter;
     335             : 
     336             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     337          92 :     queue = &WalSndCtl->SyncRepQueue[mode];
     338             : 
     339          92 :     dlist_reverse_foreach(iter, queue)
     340             :     {
     341           0 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     342             : 
     343             :         /*
     344             :          * Stop at the queue element that we should insert after to ensure the
     345             :          * queue is ordered by LSN.
     346             :          */
     347           0 :         if (proc->waitLSN < MyProc->waitLSN)
     348             :         {
     349           0 :             dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
     350           0 :             return;
     351             :         }
     352             :     }
     353             : 
     354             :     /*
     355             :      * If we get here, the list was either empty, or this process needs to be
     356             :      * at the head.
     357             :      */
     358          92 :     dlist_push_head(queue, &MyProc->syncRepLinks);
     359             : }
     360             : 
     361             : /*
     362             :  * Acquire SyncRepLock and cancel any wait currently in progress.
     363             :  */
     364             : static void
     365           0 : SyncRepCancelWait(void)
     366             : {
     367           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     368           0 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     369           0 :         dlist_delete_thoroughly(&MyProc->syncRepLinks);
     370           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     371           0 :     LWLockRelease(SyncRepLock);
     372           0 : }
     373             : 
     374             : void
     375       24964 : SyncRepCleanupAtProcExit(void)
     376             : {
     377             :     /*
     378             :      * First check if we are removed from the queue without the lock to not
     379             :      * slow down backend exit.
     380             :      */
     381       24964 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     382             :     {
     383           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     384             : 
     385             :         /* maybe we have just been removed, so recheck */
     386           0 :         if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     387           0 :             dlist_delete_thoroughly(&MyProc->syncRepLinks);
     388             : 
     389           0 :         LWLockRelease(SyncRepLock);
     390             :     }
     391       24964 : }
     392             : 
     393             : /*
     394             :  * ===========================================================
     395             :  * Synchronous Replication functions for wal sender processes
     396             :  * ===========================================================
     397             :  */
     398             : 
     399             : /*
     400             :  * Take any action required to initialise sync rep state from config
     401             :  * data. Called at WALSender startup and after each SIGHUP.
     402             :  */
     403             : void
     404        1088 : SyncRepInitConfig(void)
     405             : {
     406             :     int         priority;
     407             : 
     408             :     /*
     409             :      * Determine if we are a potential sync standby and remember the result
     410             :      * for handling replies from standby.
     411             :      */
     412        1088 :     priority = SyncRepGetStandbyPriority();
     413        1088 :     if (MyWalSnd->sync_standby_priority != priority)
     414             :     {
     415          40 :         SpinLockAcquire(&MyWalSnd->mutex);
     416          40 :         MyWalSnd->sync_standby_priority = priority;
     417          40 :         SpinLockRelease(&MyWalSnd->mutex);
     418             : 
     419          40 :         ereport(DEBUG1,
     420             :                 (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
     421             :                                  application_name, priority)));
     422             :     }
     423        1088 : }
     424             : 
     425             : /*
     426             :  * Update the LSNs on each queue based upon our latest state. This
     427             :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     428             :  *
     429             :  * Other policies are possible, which would change what we do here and
     430             :  * perhaps also which information we store as well.
     431             :  */
     432             : void
     433      145226 : SyncRepReleaseWaiters(void)
     434             : {
     435      145226 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     436             :     XLogRecPtr  writePtr;
     437             :     XLogRecPtr  flushPtr;
     438             :     XLogRecPtr  applyPtr;
     439             :     bool        got_recptr;
     440             :     bool        am_sync;
     441      145226 :     int         numwrite = 0;
     442      145226 :     int         numflush = 0;
     443      145226 :     int         numapply = 0;
     444             : 
     445             :     /*
     446             :      * If this WALSender is serving a standby that is not on the list of
     447             :      * potential sync standbys then we have nothing to do. If we are still
     448             :      * starting up, still running base backup or the current flush position is
     449             :      * still invalid, then leave quickly also.  Streaming or stopping WAL
     450             :      * senders are allowed to release waiters.
     451             :      */
     452      145226 :     if (MyWalSnd->sync_standby_priority == 0 ||
     453         198 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
     454          62 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
     455         180 :         XLogRecPtrIsInvalid(MyWalSnd->flush))
     456             :     {
     457      145046 :         announce_next_takeover = true;
     458      145048 :         return;
     459             :     }
     460             : 
     461             :     /*
     462             :      * We're a potential sync standby. Release waiters if there are enough
     463             :      * sync standbys and we are considered as sync.
     464             :      */
     465         180 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     466             : 
     467             :     /*
     468             :      * Check whether we are a sync standby or not, and calculate the synced
     469             :      * positions among all sync standbys.  (Note: although this step does not
     470             :      * of itself require holding SyncRepLock, it seems like a good idea to do
     471             :      * it after acquiring the lock.  This ensures that the WAL pointers we use
     472             :      * to release waiters are newer than any previous execution of this
     473             :      * routine used.)
     474             :      */
     475         180 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     476             : 
     477             :     /*
     478             :      * If we are managing a sync standby, though we weren't prior to this,
     479             :      * then announce we are now a sync standby.
     480             :      */
     481         180 :     if (announce_next_takeover && am_sync)
     482             :     {
     483          16 :         announce_next_takeover = false;
     484             : 
     485          16 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     486          16 :             ereport(LOG,
     487             :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
     488             :                             application_name, MyWalSnd->sync_standby_priority)));
     489             :         else
     490           0 :             ereport(LOG,
     491             :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     492             :                             application_name)));
     493             :     }
     494             : 
     495             :     /*
     496             :      * If the number of sync standbys is less than requested or we aren't
     497             :      * managing a sync standby then just leave.
     498             :      */
     499         180 :     if (!got_recptr || !am_sync)
     500             :     {
     501           2 :         LWLockRelease(SyncRepLock);
     502           2 :         announce_next_takeover = !am_sync;
     503           2 :         return;
     504             :     }
     505             : 
     506             :     /*
     507             :      * Set the lsn first so that when we wake backends they will release up to
     508             :      * this location.
     509             :      */
     510         178 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     511             :     {
     512          62 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     513          62 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     514             :     }
     515         178 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     516             :     {
     517          74 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     518          74 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     519             :     }
     520         178 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     521             :     {
     522          74 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     523          74 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     524             :     }
     525             : 
     526         178 :     LWLockRelease(SyncRepLock);
     527             : 
     528         178 :     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
     529             :          numwrite, LSN_FORMAT_ARGS(writePtr),
     530             :          numflush, LSN_FORMAT_ARGS(flushPtr),
     531             :          numapply, LSN_FORMAT_ARGS(applyPtr));
     532             : }
     533             : 
     534             : /*
     535             :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     536             :  *
     537             :  * Return false if the number of sync standbys is less than
     538             :  * synchronous_standby_names specifies. Otherwise return true and
     539             :  * store the positions into *writePtr, *flushPtr and *applyPtr.
     540             :  *
     541             :  * On return, *am_sync is set to true if this walsender is connecting to
     542             :  * sync standby. Otherwise it's set to false.
     543             :  */
     544             : static bool
     545         180 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     546             :                      XLogRecPtr *applyPtr, bool *am_sync)
     547             : {
     548             :     SyncRepStandbyData *sync_standbys;
     549             :     int         num_standbys;
     550             :     int         i;
     551             : 
     552             :     /* Initialize default results */
     553         180 :     *writePtr = InvalidXLogRecPtr;
     554         180 :     *flushPtr = InvalidXLogRecPtr;
     555         180 :     *applyPtr = InvalidXLogRecPtr;
     556         180 :     *am_sync = false;
     557             : 
     558             :     /* Quick out if not even configured to be synchronous */
     559         180 :     if (SyncRepConfig == NULL)
     560           0 :         return false;
     561             : 
     562             :     /* Get standbys that are considered as synchronous at this moment */
     563         180 :     num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
     564             : 
     565             :     /* Am I among the candidate sync standbys? */
     566         182 :     for (i = 0; i < num_standbys; i++)
     567             :     {
     568         180 :         if (sync_standbys[i].is_me)
     569             :         {
     570         178 :             *am_sync = true;
     571         178 :             break;
     572             :         }
     573             :     }
     574             : 
     575             :     /*
     576             :      * Nothing more to do if we are not managing a sync standby or there are
     577             :      * not enough synchronous standbys.
     578             :      */
     579         180 :     if (!(*am_sync) ||
     580         178 :         num_standbys < SyncRepConfig->num_sync)
     581             :     {
     582           2 :         pfree(sync_standbys);
     583           2 :         return false;
     584             :     }
     585             : 
     586             :     /*
     587             :      * In a priority-based sync replication, the synced positions are the
     588             :      * oldest ones among sync standbys. In a quorum-based, they are the Nth
     589             :      * latest ones.
     590             :      *
     591             :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     592             :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     593             :      * because it's a bit more efficient.
     594             :      *
     595             :      * XXX If the numbers of current and requested sync standbys are the same,
     596             :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     597             :      * positions even in a quorum-based sync replication.
     598             :      */
     599         178 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     600             :     {
     601         178 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     602             :                                    sync_standbys, num_standbys);
     603             :     }
     604             :     else
     605             :     {
     606           0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     607             :                                       sync_standbys, num_standbys,
     608           0 :                                       SyncRepConfig->num_sync);
     609             :     }
     610             : 
     611         178 :     pfree(sync_standbys);
     612         178 :     return true;
     613             : }
     614             : 
     615             : /*
     616             :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
     617             :  */
     618             : static void
     619         178 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     620             :                            XLogRecPtr *flushPtr,
     621             :                            XLogRecPtr *applyPtr,
     622             :                            SyncRepStandbyData *sync_standbys,
     623             :                            int num_standbys)
     624             : {
     625             :     int         i;
     626             : 
     627             :     /*
     628             :      * Scan through all sync standbys and calculate the oldest Write, Flush
     629             :      * and Apply positions.  We assume *writePtr et al were initialized to
     630             :      * InvalidXLogRecPtr.
     631             :      */
     632         356 :     for (i = 0; i < num_standbys; i++)
     633             :     {
     634         178 :         XLogRecPtr  write = sync_standbys[i].write;
     635         178 :         XLogRecPtr  flush = sync_standbys[i].flush;
     636         178 :         XLogRecPtr  apply = sync_standbys[i].apply;
     637             : 
     638         178 :         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
     639         178 :             *writePtr = write;
     640         178 :         if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
     641         178 :             *flushPtr = flush;
     642         178 :         if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
     643         178 :             *applyPtr = apply;
     644             :     }
     645         178 : }
     646             : 
     647             : /*
     648             :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     649             :  * standbys.
     650             :  */
     651             : static void
     652           0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     653             :                               XLogRecPtr *flushPtr,
     654             :                               XLogRecPtr *applyPtr,
     655             :                               SyncRepStandbyData *sync_standbys,
     656             :                               int num_standbys,
     657             :                               uint8 nth)
     658             : {
     659             :     XLogRecPtr *write_array;
     660             :     XLogRecPtr *flush_array;
     661             :     XLogRecPtr *apply_array;
     662             :     int         i;
     663             : 
     664             :     /* Should have enough candidates, or somebody messed up */
     665             :     Assert(nth > 0 && nth <= num_standbys);
     666             : 
     667           0 :     write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     668           0 :     flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     669           0 :     apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     670             : 
     671           0 :     for (i = 0; i < num_standbys; i++)
     672             :     {
     673           0 :         write_array[i] = sync_standbys[i].write;
     674           0 :         flush_array[i] = sync_standbys[i].flush;
     675           0 :         apply_array[i] = sync_standbys[i].apply;
     676             :     }
     677             : 
     678             :     /* Sort each array in descending order */
     679           0 :     qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     680           0 :     qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     681           0 :     qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     682             : 
     683             :     /* Get Nth latest Write, Flush, Apply positions */
     684           0 :     *writePtr = write_array[nth - 1];
     685           0 :     *flushPtr = flush_array[nth - 1];
     686           0 :     *applyPtr = apply_array[nth - 1];
     687             : 
     688           0 :     pfree(write_array);
     689           0 :     pfree(flush_array);
     690           0 :     pfree(apply_array);
     691           0 : }
     692             : 
     693             : /*
     694             :  * Compare lsn in order to sort array in descending order.
     695             :  */
     696             : static int
     697           0 : cmp_lsn(const void *a, const void *b)
     698             : {
     699           0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     700           0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     701             : 
     702           0 :     return pg_cmp_u64(lsn2, lsn1);
     703             : }
     704             : 
     705             : /*
     706             :  * Return data about walsenders that are candidates to be sync standbys.
     707             :  *
     708             :  * *standbys is set to a palloc'd array of structs of per-walsender data,
     709             :  * and the number of valid entries (candidate sync senders) is returned.
     710             :  * (This might be more or fewer than num_sync; caller must check.)
     711             :  */
     712             : int
     713        1318 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
     714             : {
     715             :     int         i;
     716             :     int         n;
     717             : 
     718             :     /* Create result array */
     719        1318 :     *standbys = (SyncRepStandbyData *)
     720        1318 :         palloc(max_wal_senders * sizeof(SyncRepStandbyData));
     721             : 
     722             :     /* Quick exit if sync replication is not requested */
     723        1318 :     if (SyncRepConfig == NULL)
     724        1102 :         return 0;
     725             : 
     726             :     /* Collect raw data from shared memory */
     727         216 :     n = 0;
     728        2376 :     for (i = 0; i < max_wal_senders; i++)
     729             :     {
     730             :         volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     731             :                                      * rearrangement */
     732             :         SyncRepStandbyData *stby;
     733             :         WalSndState state;      /* not included in SyncRepStandbyData */
     734             : 
     735        2160 :         walsnd = &WalSndCtl->walsnds[i];
     736        2160 :         stby = *standbys + n;
     737             : 
     738        2160 :         SpinLockAcquire(&walsnd->mutex);
     739        2160 :         stby->pid = walsnd->pid;
     740        2160 :         state = walsnd->state;
     741        2160 :         stby->write = walsnd->write;
     742        2160 :         stby->flush = walsnd->flush;
     743        2160 :         stby->apply = walsnd->apply;
     744        2160 :         stby->sync_standby_priority = walsnd->sync_standby_priority;
     745        2160 :         SpinLockRelease(&walsnd->mutex);
     746             : 
     747             :         /* Must be active */
     748        2160 :         if (stby->pid == 0)
     749        1888 :             continue;
     750             : 
     751             :         /* Must be streaming or stopping */
     752         272 :         if (state != WALSNDSTATE_STREAMING &&
     753             :             state != WALSNDSTATE_STOPPING)
     754           0 :             continue;
     755             : 
     756             :         /* Must be synchronous */
     757         272 :         if (stby->sync_standby_priority == 0)
     758          14 :             continue;
     759             : 
     760             :         /* Must have a valid flush position */
     761         258 :         if (XLogRecPtrIsInvalid(stby->flush))
     762           0 :             continue;
     763             : 
     764             :         /* OK, it's a candidate */
     765         258 :         stby->walsnd_index = i;
     766         258 :         stby->is_me = (walsnd == MyWalSnd);
     767         258 :         n++;
     768             :     }
     769             : 
     770             :     /*
     771             :      * In quorum mode, we return all the candidates.  In priority mode, if we
     772             :      * have too many candidates then return only the num_sync ones of highest
     773             :      * priority.
     774             :      */
     775         216 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
     776         214 :         n > SyncRepConfig->num_sync)
     777             :     {
     778             :         /* Sort by priority ... */
     779          18 :         qsort(*standbys, n, sizeof(SyncRepStandbyData),
     780             :               standby_priority_comparator);
     781             :         /* ... then report just the first num_sync ones */
     782          18 :         n = SyncRepConfig->num_sync;
     783             :     }
     784             : 
     785         216 :     return n;
     786             : }
     787             : 
     788             : /*
     789             :  * qsort comparator to sort SyncRepStandbyData entries by priority
     790             :  */
     791             : static int
     792          40 : standby_priority_comparator(const void *a, const void *b)
     793             : {
     794          40 :     const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
     795          40 :     const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
     796             : 
     797             :     /* First, sort by increasing priority value */
     798          40 :     if (sa->sync_standby_priority != sb->sync_standby_priority)
     799          18 :         return sa->sync_standby_priority - sb->sync_standby_priority;
     800             : 
     801             :     /*
     802             :      * We might have equal priority values; arbitrarily break ties by position
     803             :      * in the WalSnd array.  (This is utterly bogus, since that is arrival
     804             :      * order dependent, but there are regression tests that rely on it.)
     805             :      */
     806          22 :     return sa->walsnd_index - sb->walsnd_index;
     807             : }
     808             : 
     809             : 
     810             : /*
     811             :  * Check if we are in the list of sync standbys, and if so, determine
     812             :  * priority sequence. Return priority if set, or zero to indicate that
     813             :  * we are not a potential sync standby.
     814             :  *
     815             :  * Compare the parameter SyncRepStandbyNames against the application_name
     816             :  * for this WALSender, or allow any name if we find a wildcard "*".
     817             :  */
     818             : static int
     819        1088 : SyncRepGetStandbyPriority(void)
     820             : {
     821             :     const char *standby_name;
     822             :     int         priority;
     823        1088 :     bool        found = false;
     824             : 
     825             :     /*
     826             :      * Since synchronous cascade replication is not allowed, we always set the
     827             :      * priority of cascading walsender to zero.
     828             :      */
     829        1088 :     if (am_cascading_walsender)
     830          44 :         return 0;
     831             : 
     832        1044 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     833         998 :         return 0;
     834             : 
     835          46 :     standby_name = SyncRepConfig->member_names;
     836          62 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     837             :     {
     838          60 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     839          36 :             strcmp(standby_name, "*") == 0)
     840             :         {
     841          44 :             found = true;
     842          44 :             break;
     843             :         }
     844          16 :         standby_name += strlen(standby_name) + 1;
     845             :     }
     846             : 
     847          46 :     if (!found)
     848           2 :         return 0;
     849             : 
     850             :     /*
     851             :      * In quorum-based sync replication, all the standbys in the list have the
     852             :      * same priority, one.
     853             :      */
     854          44 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     855             : }
     856             : 
     857             : /*
     858             :  * Walk the specified queue from head.  Set the state of any backends that
     859             :  * need to be woken, remove them from the queue, and then wake them.
     860             :  * Pass all = true to wake whole queue; otherwise, just wake up to
     861             :  * the walsender's LSN.
     862             :  *
     863             :  * The caller must hold SyncRepLock in exclusive mode.
     864             :  */
     865             : static int
     866         210 : SyncRepWakeQueue(bool all, int mode)
     867             : {
     868         210 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     869         210 :     int         numprocs = 0;
     870             :     dlist_mutable_iter iter;
     871             : 
     872             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     873             :     Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
     874             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     875             : 
     876         250 :     dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
     877             :     {
     878          54 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     879             : 
     880             :         /*
     881             :          * Assume the queue is ordered by LSN
     882             :          */
     883          54 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
     884          14 :             return numprocs;
     885             : 
     886             :         /*
     887             :          * Remove from queue.
     888             :          */
     889          40 :         dlist_delete_thoroughly(&proc->syncRepLinks);
     890             : 
     891             :         /*
     892             :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
     893             :          * make sure that it sees the queue link being removed before the
     894             :          * syncRepState change.
     895             :          */
     896          40 :         pg_write_barrier();
     897             : 
     898             :         /*
     899             :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
     900             :          * the various states.
     901             :          */
     902          40 :         proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
     903             : 
     904             :         /*
     905             :          * Wake only when we have set state and removed from queue.
     906             :          */
     907          40 :         SetLatch(&(proc->procLatch));
     908             : 
     909          40 :         numprocs++;
     910             :     }
     911             : 
     912         196 :     return numprocs;
     913             : }
     914             : 
     915             : /*
     916             :  * The checkpointer calls this as needed to update the shared
     917             :  * sync_standbys_defined flag, so that backends don't remain permanently wedged
     918             :  * if synchronous_standby_names is unset.  It's safe to check the current value
     919             :  * without the lock, because it's only ever updated by one process.  But we
     920             :  * must take the lock to change it.
     921             :  */
     922             : void
     923         848 : SyncRepUpdateSyncStandbysDefined(void)
     924             : {
     925         848 :     bool        sync_standbys_defined = SyncStandbysDefined();
     926             : 
     927         848 :     if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
     928             :     {
     929          16 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     930             : 
     931             :         /*
     932             :          * If synchronous_standby_names has been reset to empty, it's futile
     933             :          * for backends to continue waiting.  Since the user no longer wants
     934             :          * synchronous replication, we'd better wake them up.
     935             :          */
     936          16 :         if (!sync_standbys_defined)
     937             :         {
     938             :             int         i;
     939             : 
     940           0 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
     941           0 :                 SyncRepWakeQueue(true, i);
     942             :         }
     943             : 
     944             :         /*
     945             :          * Only allow people to join the queue when there are synchronous
     946             :          * standbys defined.  Without this interlock, there's a race
     947             :          * condition: we might wake up all the current waiters; then, some
     948             :          * backend that hasn't yet reloaded its config might go to sleep on
     949             :          * the queue (and never wake up).  This prevents that.
     950             :          */
     951          16 :         WalSndCtl->sync_standbys_defined = sync_standbys_defined;
     952             : 
     953          16 :         LWLockRelease(SyncRepLock);
     954             :     }
     955         848 : }
     956             : 
     #ifdef USE_ASSERT_CHECKING
     /*
      * Assertion helper: true if the wait queue for "mode" is strictly
      * increasing in waitLSN.  Strictly increasing means sorted, with no two
      * procs sharing the same LSN.
      */
     static bool
     SyncRepQueueIsOrderedByLSN(int mode)
     {
         XLogRecPtr  prev = 0;
         dlist_iter  it;

         Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

         dlist_foreach(it, &WalSndCtl->SyncRepQueue[mode])
         {
             PGPROC     *p = dlist_container(PGPROC, syncRepLinks, it.cur);

             if (!(p->waitLSN > prev))
                 return false;

             prev = p->waitLSN;
         }

         return true;
     }
     #endif
     985             : 
     986             : /*
     987             :  * ===========================================================
     988             :  * Synchronous Replication functions executed by any process
     989             :  * ===========================================================
     990             :  */
     991             : 
     992             : bool
     993        1828 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
     994             : {
     995        1828 :     if (*newval != NULL && (*newval)[0] != '\0')
     996         126 :     {
     997             :         int         parse_rc;
     998             :         SyncRepConfigData *pconf;
     999             : 
    1000             :         /* Reset communication variables to ensure a fresh start */
    1001         126 :         syncrep_parse_result = NULL;
    1002         126 :         syncrep_parse_error_msg = NULL;
    1003             : 
    1004             :         /* Parse the synchronous_standby_names string */
    1005         126 :         syncrep_scanner_init(*newval);
    1006         126 :         parse_rc = syncrep_yyparse();
    1007         126 :         syncrep_scanner_finish();
    1008             : 
    1009         126 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1010             :         {
    1011           0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1012           0 :             if (syncrep_parse_error_msg)
    1013           0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1014             :             else
    1015           0 :                 GUC_check_errdetail("synchronous_standby_names parser failed");
    1016           0 :             return false;
    1017             :         }
    1018             : 
    1019         126 :         if (syncrep_parse_result->num_sync <= 0)
    1020             :         {
    1021           0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1022           0 :                              syncrep_parse_result->num_sync);
    1023           0 :             return false;
    1024             :         }
    1025             : 
    1026             :         /* GUC extra value must be guc_malloc'd, not palloc'd */
    1027             :         pconf = (SyncRepConfigData *)
    1028         126 :             guc_malloc(LOG, syncrep_parse_result->config_size);
    1029         126 :         if (pconf == NULL)
    1030           0 :             return false;
    1031         126 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1032             : 
    1033         126 :         *extra = (void *) pconf;
    1034             : 
    1035             :         /*
    1036             :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1037             :          * other cruft generated during parsing, will be freed when the
    1038             :          * current memory context is deleted.  (This code is generally run in
    1039             :          * a short-lived context used for config file processing, so that will
    1040             :          * not be very long.)
    1041             :          */
    1042             :     }
    1043             :     else
    1044        1702 :         *extra = NULL;
    1045             : 
    1046        1828 :     return true;
    1047             : }
    1048             : 
    1049             : void
    1050        1808 : assign_synchronous_standby_names(const char *newval, void *extra)
    1051             : {
    1052        1808 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1053        1808 : }
    1054             : 
    1055             : void
    1056        2760 : assign_synchronous_commit(int newval, void *extra)
    1057             : {
    1058        2760 :     switch (newval)
    1059             :     {
    1060           0 :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1061           0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1062           0 :             break;
    1063        1938 :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1064        1938 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1065        1938 :             break;
    1066           0 :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1067           0 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1068           0 :             break;
    1069         822 :         default:
    1070         822 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1071         822 :             break;
    1072             :     }
    1073        2760 : }

Generated by: LCOV version 1.14