LCOV - code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 215 284 75.7 %
Date: 2025-01-18 04:15:08 Functions: 15 18 83.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * syncrep.c
       4             :  *
       5             :  * Synchronous replication is new as of PostgreSQL 9.1.
       6             :  *
       7             :  * If requested, transaction commits wait until their commit LSN are
       8             :  * acknowledged by the synchronous standbys.
       9             :  *
      10             :  * This module contains the code for waiting and release of backends.
      11             :  * All code in this module executes on the primary. The core streaming
      12             :  * replication transport remains within WALreceiver/WALsender modules.
      13             :  *
      14             :  * The essence of this design is that it isolates all logic about
      15             :  * waiting/releasing onto the primary. The primary defines which standbys
      16             :  * it wishes to wait for. The standbys are completely unaware of the
      17             :  * durability requirements of transactions on the primary, reducing the
      18             :  * complexity of the code and streamlining both standby operations and
      19             :  * network bandwidth because there is no requirement to ship
      20             :  * per-transaction state information.
      21             :  *
      22             :  * Replication is either synchronous or not synchronous (async). If it is
      23             :  * async, we just fastpath out of here. If it is sync, then we wait for
      24             :  * the write, flush or apply location on the standby before releasing
      25             :  * the waiting backend. Further complexity in that interaction is
      26             :  * expected in later releases.
      27             :  *
      28             :  * The best performing way to manage the waiting backends is to have a
      29             :  * single ordered queue of waiting backends, so that we can avoid
      30             :  * searching the through all waiters each time we receive a reply.
      31             :  *
      32             :  * In 9.5 or before only a single standby could be considered as
      33             :  * synchronous. In 9.6 we support a priority-based multiple synchronous
      34             :  * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
      35             :  * supported. The number of synchronous standbys that transactions
      36             :  * must wait for replies from is specified in synchronous_standby_names.
      37             :  * This parameter also specifies a list of standby names and the method
      38             :  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
      39             :  *
      40             :  * The method FIRST specifies a priority-based synchronous replication
      41             :  * and makes transaction commits wait until their WAL records are
      42             :  * replicated to the requested number of synchronous standbys chosen based
      43             :  * on their priorities. The standbys whose names appear earlier in the list
      44             :  * are given higher priority and will be considered as synchronous.
      45             :  * Other standby servers appearing later in this list represent potential
      46             :  * synchronous standbys. If any of the current synchronous standbys
      47             :  * disconnects for whatever reason, it will be replaced immediately with
      48             :  * the next-highest-priority standby.
      49             :  *
      50             :  * The method ANY specifies a quorum-based synchronous replication
      51             :  * and makes transaction commits wait until their WAL records are
      52             :  * replicated to at least the requested number of synchronous standbys
      53             :  * in the list. All the standbys appearing in the list are considered as
      54             :  * candidates for quorum synchronous standbys.
      55             :  *
      56             :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57             :  * This is for backward compatibility with 9.6 or before where only a
      58             :  * priority-based sync replication was supported.
      59             :  *
      60             :  * Before the standbys chosen from synchronous_standby_names can
      61             :  * become the synchronous standbys they must have caught up with
      62             :  * the primary; that may take some time. Once caught up,
      63             :  * the standbys which are considered as synchronous at that moment
      64             :  * will release waiters from the queue.
      65             :  *
      66             :  * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
      67             :  *
      68             :  * IDENTIFICATION
      69             :  *    src/backend/replication/syncrep.c
      70             :  *
      71             :  *-------------------------------------------------------------------------
      72             :  */
      73             : #include "postgres.h"
      74             : 
      75             : #include <unistd.h>
      76             : 
      77             : #include "access/xact.h"
      78             : #include "common/int.h"
      79             : #include "miscadmin.h"
      80             : #include "pgstat.h"
      81             : #include "replication/syncrep.h"
      82             : #include "replication/walsender.h"
      83             : #include "replication/walsender_private.h"
      84             : #include "storage/proc.h"
      85             : #include "tcop/tcopprot.h"
      86             : #include "utils/guc_hooks.h"
      87             : #include "utils/ps_status.h"
      88             : 
      89             : /* User-settable parameters for sync rep */
      90             : char       *SyncRepStandbyNames;
      91             : 
      92             : #define SyncStandbysDefined() \
      93             :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      94             : 
      95             : static bool announce_next_takeover = true;
      96             : 
      97             : SyncRepConfigData *SyncRepConfig = NULL;
      98             : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
      99             : 
     100             : static void SyncRepQueueInsert(int mode);
     101             : static void SyncRepCancelWait(void);
     102             : static int  SyncRepWakeQueue(bool all, int mode);
     103             : 
     104             : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     105             :                                  XLogRecPtr *flushPtr,
     106             :                                  XLogRecPtr *applyPtr,
     107             :                                  bool *am_sync);
     108             : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     109             :                                        XLogRecPtr *flushPtr,
     110             :                                        XLogRecPtr *applyPtr,
     111             :                                        SyncRepStandbyData *sync_standbys,
     112             :                                        int num_standbys);
     113             : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     114             :                                           XLogRecPtr *flushPtr,
     115             :                                           XLogRecPtr *applyPtr,
     116             :                                           SyncRepStandbyData *sync_standbys,
     117             :                                           int num_standbys,
     118             :                                           uint8 nth);
     119             : static int  SyncRepGetStandbyPriority(void);
     120             : static int  standby_priority_comparator(const void *a, const void *b);
     121             : static int  cmp_lsn(const void *a, const void *b);
     122             : 
     123             : #ifdef USE_ASSERT_CHECKING
     124             : static bool SyncRepQueueIsOrderedByLSN(int mode);
     125             : #endif
     126             : 
     127             : /*
     128             :  * ===========================================================
     129             :  * Synchronous Replication functions for normal user backends
     130             :  * ===========================================================
     131             :  */
     132             : 
     133             : /*
     134             :  * Wait for synchronous replication, if requested by user.
     135             :  *
     136             :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     137             :  * change that state to SYNC_REP_WAITING before adding ourselves
     138             :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     139             :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     140             :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     141             :  *
     142             :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     143             :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     144             :  * to be flushed if synchronous_commit is set to the higher level of
     145             :  * remote_apply, because only commit records provide apply feedback.
     146             :  */
     147             : void
     148      227050 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     149             : {
     150             :     int         mode;
     151             : 
     152             :     /*
     153             :      * This should be called while holding interrupts during a transaction
     154             :      * commit to prevent the follow-up shared memory queue cleanups to be
     155             :      * influenced by external interruptions.
     156             :      */
     157             :     Assert(InterruptHoldoffCount > 0);
     158             : 
     159             :     /*
     160             :      * Fast exit if user has not requested sync replication, or there are no
     161             :      * sync replication standby names defined.
     162             :      *
     163             :      * Since this routine gets called every commit time, it's important to
     164             :      * exit quickly if sync replication is not requested. So we check
     165             :      * WalSndCtl->sync_standbys_defined flag without the lock and exit
     166             :      * immediately if it's false. If it's true, we need to check it again
     167             :      * later while holding the lock, to check the flag and operate the sync
     168             :      * rep queue atomically. This is necessary to avoid the race condition
     169             :      * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
     170             :      * it's false, the lock is not necessary because we don't touch the queue.
     171             :      */
     172      227050 :     if (!SyncRepRequested() ||
     173      174892 :         !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
     174      226972 :         return;
     175             : 
     176             :     /* Cap the level for anything other than commit to remote flush only. */
     177          78 :     if (commit)
     178          44 :         mode = SyncRepWaitMode;
     179             :     else
     180          34 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     181             : 
     182             :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     183             :     Assert(WalSndCtl != NULL);
     184             : 
     185          78 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     186             :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     187             : 
     188             :     /*
     189             :      * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     190             :      * set.  See SyncRepUpdateSyncStandbysDefined.
     191             :      *
     192             :      * Also check that the standby hasn't already replied. Unlikely race
     193             :      * condition but we'll be fetching that cache line anyway so it's likely
     194             :      * to be a low cost check.
     195             :      */
     196          78 :     if (!WalSndCtl->sync_standbys_defined ||
     197          78 :         lsn <= WalSndCtl->lsn[mode])
     198             :     {
     199           0 :         LWLockRelease(SyncRepLock);
     200           0 :         return;
     201             :     }
     202             : 
     203             :     /*
     204             :      * Set our waitLSN so WALSender will know when to wake us, and add
     205             :      * ourselves to the queue.
     206             :      */
     207          78 :     MyProc->waitLSN = lsn;
     208          78 :     MyProc->syncRepState = SYNC_REP_WAITING;
     209          78 :     SyncRepQueueInsert(mode);
     210             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     211          78 :     LWLockRelease(SyncRepLock);
     212             : 
     213             :     /* Alter ps display to show waiting for sync rep. */
     214          78 :     if (update_process_title)
     215             :     {
     216             :         char        buffer[32];
     217             : 
     218          78 :         sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
     219          78 :         set_ps_display_suffix(buffer);
     220             :     }
     221             : 
     222             :     /*
     223             :      * Wait for specified LSN to be confirmed.
     224             :      *
     225             :      * Each proc has its own wait latch, so we perform a normal latch
     226             :      * check/wait loop here.
     227             :      */
     228             :     for (;;)
     229          78 :     {
     230             :         int         rc;
     231             : 
     232             :         /* Must reset the latch before testing state. */
     233         156 :         ResetLatch(MyLatch);
     234             : 
     235             :         /*
     236             :          * Acquiring the lock is not needed, the latch ensures proper
     237             :          * barriers. If it looks like we're done, we must really be done,
     238             :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     239             :          * it will never update it again, so we can't be seeing a stale value
     240             :          * in that case.
     241             :          */
     242         156 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     243          78 :             break;
     244             : 
     245             :         /*
     246             :          * If a wait for synchronous replication is pending, we can neither
     247             :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     248             :          * lead the client to believe that the transaction aborted, which is
     249             :          * not true: it's already committed locally. The former is no good
     250             :          * either: the client has requested synchronous replication, and is
     251             :          * entitled to assume that an acknowledged commit is also replicated,
     252             :          * which might not be true. So in this case we issue a WARNING (which
     253             :          * some clients may be able to interpret) and shut off further output.
     254             :          * We do NOT reset ProcDiePending, so that the process will die after
     255             :          * the commit is cleaned up.
     256             :          */
     257          78 :         if (ProcDiePending)
     258             :         {
     259           0 :             ereport(WARNING,
     260             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     261             :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     262             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     263           0 :             whereToSendOutput = DestNone;
     264           0 :             SyncRepCancelWait();
     265           0 :             break;
     266             :         }
     267             : 
     268             :         /*
     269             :          * It's unclear what to do if a query cancel interrupt arrives.  We
     270             :          * can't actually abort at this point, but ignoring the interrupt
     271             :          * altogether is not helpful, so we just terminate the wait with a
     272             :          * suitable warning.
     273             :          */
     274          78 :         if (QueryCancelPending)
     275             :         {
     276           0 :             QueryCancelPending = false;
     277           0 :             ereport(WARNING,
     278             :                     (errmsg("canceling wait for synchronous replication due to user request"),
     279             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     280           0 :             SyncRepCancelWait();
     281           0 :             break;
     282             :         }
     283             : 
     284             :         /*
     285             :          * Wait on latch.  Any condition that should wake us up will set the
     286             :          * latch, so no need for timeout.
     287             :          */
     288          78 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     289             :                        WAIT_EVENT_SYNC_REP);
     290             : 
     291             :         /*
     292             :          * If the postmaster dies, we'll probably never get an acknowledgment,
     293             :          * because all the wal sender processes will exit. So just bail out.
     294             :          */
     295          78 :         if (rc & WL_POSTMASTER_DEATH)
     296             :         {
     297           0 :             ProcDiePending = true;
     298           0 :             whereToSendOutput = DestNone;
     299           0 :             SyncRepCancelWait();
     300           0 :             break;
     301             :         }
     302             :     }
     303             : 
     304             :     /*
     305             :      * WalSender has checked our LSN and has removed us from queue. Clean up
     306             :      * state and leave.  It's OK to reset these shared memory fields without
     307             :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     308             :      * we're not on the queue.  We need a read barrier to make sure we see the
     309             :      * changes to the queue link (this might be unnecessary without
     310             :      * assertions, but better safe than sorry).
     311             :      */
     312          78 :     pg_read_barrier();
     313             :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     314          78 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     315          78 :     MyProc->waitLSN = 0;
     316             : 
     317             :     /* reset ps display to remove the suffix */
     318          78 :     if (update_process_title)
     319          78 :         set_ps_display_remove_suffix();
     320             : }
     321             : 
     322             : /*
     323             :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     324             :  *
     325             :  * Usually we will go at tail of queue, though it's possible that we arrive
     326             :  * here out of order, so start at tail and work back to insertion point.
     327             :  */
     328             : static void
     329          78 : SyncRepQueueInsert(int mode)
     330             : {
     331             :     dlist_head *queue;
     332             :     dlist_iter  iter;
     333             : 
     334             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     335          78 :     queue = &WalSndCtl->SyncRepQueue[mode];
     336             : 
     337          78 :     dlist_reverse_foreach(iter, queue)
     338             :     {
     339           0 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     340             : 
     341             :         /*
     342             :          * Stop at the queue element that we should insert after to ensure the
     343             :          * queue is ordered by LSN.
     344             :          */
     345           0 :         if (proc->waitLSN < MyProc->waitLSN)
     346             :         {
     347           0 :             dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
     348           0 :             return;
     349             :         }
     350             :     }
     351             : 
     352             :     /*
     353             :      * If we get here, the list was either empty, or this process needs to be
     354             :      * at the head.
     355             :      */
     356          78 :     dlist_push_head(queue, &MyProc->syncRepLinks);
     357             : }
     358             : 
     359             : /*
     360             :  * Acquire SyncRepLock and cancel any wait currently in progress.
     361             :  */
     362             : static void
     363           0 : SyncRepCancelWait(void)
     364             : {
     365           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     366           0 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     367           0 :         dlist_delete_thoroughly(&MyProc->syncRepLinks);
     368           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     369           0 :     LWLockRelease(SyncRepLock);
     370           0 : }
     371             : 
     372             : void
     373       30192 : SyncRepCleanupAtProcExit(void)
     374             : {
     375             :     /*
     376             :      * First check if we are removed from the queue without the lock to not
     377             :      * slow down backend exit.
     378             :      */
     379       30192 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     380             :     {
     381           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     382             : 
     383             :         /* maybe we have just been removed, so recheck */
     384           0 :         if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     385           0 :             dlist_delete_thoroughly(&MyProc->syncRepLinks);
     386             : 
     387           0 :         LWLockRelease(SyncRepLock);
     388             :     }
     389       30192 : }
     390             : 
     391             : /*
     392             :  * ===========================================================
     393             :  * Synchronous Replication functions for wal sender processes
     394             :  * ===========================================================
     395             :  */
     396             : 
     397             : /*
     398             :  * Take any action required to initialise sync rep state from config
     399             :  * data. Called at WALSender startup and after each SIGHUP.
     400             :  */
     401             : void
     402        1270 : SyncRepInitConfig(void)
     403             : {
     404             :     int         priority;
     405             : 
     406             :     /*
     407             :      * Determine if we are a potential sync standby and remember the result
     408             :      * for handling replies from standby.
     409             :      */
     410        1270 :     priority = SyncRepGetStandbyPriority();
     411        1270 :     if (MyWalSnd->sync_standby_priority != priority)
     412             :     {
     413          42 :         SpinLockAcquire(&MyWalSnd->mutex);
     414          42 :         MyWalSnd->sync_standby_priority = priority;
     415          42 :         SpinLockRelease(&MyWalSnd->mutex);
     416             : 
     417          42 :         ereport(DEBUG1,
     418             :                 (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
     419             :                                  application_name, priority)));
     420             :     }
     421        1270 : }
     422             : 
     423             : /*
     424             :  * Update the LSNs on each queue based upon our latest state. This
     425             :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     426             :  *
     427             :  * Other policies are possible, which would change what we do here and
     428             :  * perhaps also which information we store as well.
     429             :  */
     430             : void
     431       66492 : SyncRepReleaseWaiters(void)
     432             : {
     433       66492 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     434             :     XLogRecPtr  writePtr;
     435             :     XLogRecPtr  flushPtr;
     436             :     XLogRecPtr  applyPtr;
     437             :     bool        got_recptr;
     438             :     bool        am_sync;
     439       66492 :     int         numwrite = 0;
     440       66492 :     int         numflush = 0;
     441       66492 :     int         numapply = 0;
     442             : 
     443             :     /*
     444             :      * If this WALSender is serving a standby that is not on the list of
     445             :      * potential sync standbys then we have nothing to do. If we are still
     446             :      * starting up, still running base backup or the current flush position is
     447             :      * still invalid, then leave quickly also.  Streaming or stopping WAL
     448             :      * senders are allowed to release waiters.
     449             :      */
     450       66492 :     if (MyWalSnd->sync_standby_priority == 0 ||
     451         230 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
     452          70 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
     453         210 :         XLogRecPtrIsInvalid(MyWalSnd->flush))
     454             :     {
     455       66282 :         announce_next_takeover = true;
     456       66284 :         return;
     457             :     }
     458             : 
     459             :     /*
     460             :      * We're a potential sync standby. Release waiters if there are enough
     461             :      * sync standbys and we are considered as sync.
     462             :      */
     463         210 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     464             : 
     465             :     /*
     466             :      * Check whether we are a sync standby or not, and calculate the synced
     467             :      * positions among all sync standbys.  (Note: although this step does not
     468             :      * of itself require holding SyncRepLock, it seems like a good idea to do
     469             :      * it after acquiring the lock.  This ensures that the WAL pointers we use
     470             :      * to release waiters are newer than any previous execution of this
     471             :      * routine used.)
     472             :      */
     473         210 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     474             : 
     475             :     /*
     476             :      * If we are managing a sync standby, though we weren't prior to this,
     477             :      * then announce we are now a sync standby.
     478             :      */
     479         210 :     if (announce_next_takeover && am_sync)
     480             :     {
     481          18 :         announce_next_takeover = false;
     482             : 
     483          18 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     484          18 :             ereport(LOG,
     485             :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
     486             :                             application_name, MyWalSnd->sync_standby_priority)));
     487             :         else
     488           0 :             ereport(LOG,
     489             :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     490             :                             application_name)));
     491             :     }
     492             : 
     493             :     /*
     494             :      * If the number of sync standbys is less than requested or we aren't
     495             :      * managing a sync standby then just leave.
     496             :      */
     497         210 :     if (!got_recptr || !am_sync)
     498             :     {
     499           2 :         LWLockRelease(SyncRepLock);
     500           2 :         announce_next_takeover = !am_sync;
     501           2 :         return;
     502             :     }
     503             : 
     504             :     /*
     505             :      * Set the lsn first so that when we wake backends they will release up to
     506             :      * this location.
     507             :      */
     508         208 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     509             :     {
     510          72 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     511          72 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     512             :     }
     513         208 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     514             :     {
     515          86 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     516          86 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     517             :     }
     518         208 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     519             :     {
     520          86 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     521          86 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     522             :     }
     523             : 
     524         208 :     LWLockRelease(SyncRepLock);
     525             : 
     526         208 :     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
     527             :          numwrite, LSN_FORMAT_ARGS(writePtr),
     528             :          numflush, LSN_FORMAT_ARGS(flushPtr),
     529             :          numapply, LSN_FORMAT_ARGS(applyPtr));
     530             : }
     531             : 
     532             : /*
     533             :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     534             :  *
     535             :  * Return false if the number of sync standbys is less than
     536             :  * synchronous_standby_names specifies. Otherwise return true and
     537             :  * store the positions into *writePtr, *flushPtr and *applyPtr.
     538             :  *
     539             :  * On return, *am_sync is set to true if this walsender is connecting to
     540             :  * sync standby. Otherwise it's set to false.
     541             :  */
     542             : static bool
     543         210 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     544             :                      XLogRecPtr *applyPtr, bool *am_sync)
     545             : {
     546             :     SyncRepStandbyData *sync_standbys;
     547             :     int         num_standbys;
     548             :     int         i;
     549             : 
     550             :     /* Initialize default results */
     551         210 :     *writePtr = InvalidXLogRecPtr;
     552         210 :     *flushPtr = InvalidXLogRecPtr;
     553         210 :     *applyPtr = InvalidXLogRecPtr;
     554         210 :     *am_sync = false;
     555             : 
     556             :     /* Quick out if not even configured to be synchronous */
     557         210 :     if (SyncRepConfig == NULL)
     558           0 :         return false;
     559             : 
     560             :     /* Get standbys that are considered as synchronous at this moment */
     561         210 :     num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
     562             : 
     563             :     /* Am I among the candidate sync standbys? */
     564         212 :     for (i = 0; i < num_standbys; i++)
     565             :     {
     566         210 :         if (sync_standbys[i].is_me)
     567             :         {
     568         208 :             *am_sync = true;
     569         208 :             break;
     570             :         }
     571             :     }
     572             : 
     573             :     /*
     574             :      * Nothing more to do if we are not managing a sync standby or there are
     575             :      * not enough synchronous standbys.
     576             :      */
     577         210 :     if (!(*am_sync) ||
     578         208 :         num_standbys < SyncRepConfig->num_sync)
     579             :     {
     580           2 :         pfree(sync_standbys);
     581           2 :         return false;
     582             :     }
     583             : 
     584             :     /*
     585             :      * In a priority-based sync replication, the synced positions are the
     586             :      * oldest ones among sync standbys. In a quorum-based, they are the Nth
     587             :      * latest ones.
     588             :      *
     589             :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     590             :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     591             :      * because it's a bit more efficient.
     592             :      *
     593             :      * XXX If the numbers of current and requested sync standbys are the same,
     594             :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     595             :      * positions even in a quorum-based sync replication.
     596             :      */
     597         208 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     598             :     {
     599         208 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     600             :                                    sync_standbys, num_standbys);
     601             :     }
     602             :     else
     603             :     {
     604           0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     605             :                                       sync_standbys, num_standbys,
     606           0 :                                       SyncRepConfig->num_sync);
     607             :     }
     608             : 
     609         208 :     pfree(sync_standbys);
     610         208 :     return true;
     611             : }
     612             : 
     613             : /*
     614             :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
     615             :  */
     616             : static void
     617         208 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     618             :                            XLogRecPtr *flushPtr,
     619             :                            XLogRecPtr *applyPtr,
     620             :                            SyncRepStandbyData *sync_standbys,
     621             :                            int num_standbys)
     622             : {
     623             :     int         i;
     624             : 
     625             :     /*
     626             :      * Scan through all sync standbys and calculate the oldest Write, Flush
     627             :      * and Apply positions.  We assume *writePtr et al were initialized to
     628             :      * InvalidXLogRecPtr.
     629             :      */
     630         416 :     for (i = 0; i < num_standbys; i++)
     631             :     {
     632         208 :         XLogRecPtr  write = sync_standbys[i].write;
     633         208 :         XLogRecPtr  flush = sync_standbys[i].flush;
     634         208 :         XLogRecPtr  apply = sync_standbys[i].apply;
     635             : 
     636         208 :         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
     637         208 :             *writePtr = write;
     638         208 :         if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
     639         208 :             *flushPtr = flush;
     640         208 :         if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
     641         208 :             *applyPtr = apply;
     642             :     }
     643         208 : }
     644             : 
     645             : /*
     646             :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     647             :  * standbys.
     648             :  */
     649             : static void
     650           0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     651             :                               XLogRecPtr *flushPtr,
     652             :                               XLogRecPtr *applyPtr,
     653             :                               SyncRepStandbyData *sync_standbys,
     654             :                               int num_standbys,
     655             :                               uint8 nth)
     656             : {
     657             :     XLogRecPtr *write_array;
     658             :     XLogRecPtr *flush_array;
     659             :     XLogRecPtr *apply_array;
     660             :     int         i;
     661             : 
     662             :     /* Should have enough candidates, or somebody messed up */
     663             :     Assert(nth > 0 && nth <= num_standbys);
     664             : 
     665           0 :     write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     666           0 :     flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     667           0 :     apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     668             : 
     669           0 :     for (i = 0; i < num_standbys; i++)
     670             :     {
     671           0 :         write_array[i] = sync_standbys[i].write;
     672           0 :         flush_array[i] = sync_standbys[i].flush;
     673           0 :         apply_array[i] = sync_standbys[i].apply;
     674             :     }
     675             : 
     676             :     /* Sort each array in descending order */
     677           0 :     qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     678           0 :     qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     679           0 :     qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     680             : 
     681             :     /* Get Nth latest Write, Flush, Apply positions */
     682           0 :     *writePtr = write_array[nth - 1];
     683           0 :     *flushPtr = flush_array[nth - 1];
     684           0 :     *applyPtr = apply_array[nth - 1];
     685             : 
     686           0 :     pfree(write_array);
     687           0 :     pfree(flush_array);
     688           0 :     pfree(apply_array);
     689           0 : }
     690             : 
     691             : /*
     692             :  * Compare lsn in order to sort array in descending order.
     693             :  */
     694             : static int
     695           0 : cmp_lsn(const void *a, const void *b)
     696             : {
     697           0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     698           0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     699             : 
     700           0 :     return pg_cmp_u64(lsn2, lsn1);
     701             : }
     702             : 
     703             : /*
     704             :  * Return data about walsenders that are candidates to be sync standbys.
     705             :  *
     706             :  * *standbys is set to a palloc'd array of structs of per-walsender data,
     707             :  * and the number of valid entries (candidate sync senders) is returned.
     708             :  * (This might be more or fewer than num_sync; caller must check.)
     709             :  */
     710             : int
     711        1538 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
     712             : {
     713             :     int         i;
     714             :     int         n;
     715             : 
     716             :     /* Create result array */
     717        1538 :     *standbys = (SyncRepStandbyData *)
     718        1538 :         palloc(max_wal_senders * sizeof(SyncRepStandbyData));
     719             : 
     720             :     /* Quick exit if sync replication is not requested */
     721        1538 :     if (SyncRepConfig == NULL)
     722        1292 :         return 0;
     723             : 
     724             :     /* Collect raw data from shared memory */
     725         246 :     n = 0;
     726        2706 :     for (i = 0; i < max_wal_senders; i++)
     727             :     {
     728             :         volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     729             :                                      * rearrangement */
     730             :         SyncRepStandbyData *stby;
     731             :         WalSndState state;      /* not included in SyncRepStandbyData */
     732             : 
     733        2460 :         walsnd = &WalSndCtl->walsnds[i];
     734        2460 :         stby = *standbys + n;
     735             : 
     736        2460 :         SpinLockAcquire(&walsnd->mutex);
     737        2460 :         stby->pid = walsnd->pid;
     738        2460 :         state = walsnd->state;
     739        2460 :         stby->write = walsnd->write;
     740        2460 :         stby->flush = walsnd->flush;
     741        2460 :         stby->apply = walsnd->apply;
     742        2460 :         stby->sync_standby_priority = walsnd->sync_standby_priority;
     743        2460 :         SpinLockRelease(&walsnd->mutex);
     744             : 
     745             :         /* Must be active */
     746        2460 :         if (stby->pid == 0)
     747        2154 :             continue;
     748             : 
     749             :         /* Must be streaming or stopping */
     750         306 :         if (state != WALSNDSTATE_STREAMING &&
     751             :             state != WALSNDSTATE_STOPPING)
     752           0 :             continue;
     753             : 
     754             :         /* Must be synchronous */
     755         306 :         if (stby->sync_standby_priority == 0)
     756          14 :             continue;
     757             : 
     758             :         /* Must have a valid flush position */
     759         292 :         if (XLogRecPtrIsInvalid(stby->flush))
     760           0 :             continue;
     761             : 
     762             :         /* OK, it's a candidate */
     763         292 :         stby->walsnd_index = i;
     764         292 :         stby->is_me = (walsnd == MyWalSnd);
     765         292 :         n++;
     766             :     }
     767             : 
     768             :     /*
     769             :      * In quorum mode, we return all the candidates.  In priority mode, if we
     770             :      * have too many candidates then return only the num_sync ones of highest
     771             :      * priority.
     772             :      */
     773         246 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
     774         244 :         n > SyncRepConfig->num_sync)
     775             :     {
     776             :         /* Sort by priority ... */
     777          18 :         qsort(*standbys, n, sizeof(SyncRepStandbyData),
     778             :               standby_priority_comparator);
     779             :         /* ... then report just the first num_sync ones */
     780          18 :         n = SyncRepConfig->num_sync;
     781             :     }
     782             : 
     783         246 :     return n;
     784             : }
     785             : 
     786             : /*
     787             :  * qsort comparator to sort SyncRepStandbyData entries by priority
     788             :  */
     789             : static int
     790          40 : standby_priority_comparator(const void *a, const void *b)
     791             : {
     792          40 :     const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
     793          40 :     const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
     794             : 
     795             :     /* First, sort by increasing priority value */
     796          40 :     if (sa->sync_standby_priority != sb->sync_standby_priority)
     797          18 :         return sa->sync_standby_priority - sb->sync_standby_priority;
     798             : 
     799             :     /*
     800             :      * We might have equal priority values; arbitrarily break ties by position
     801             :      * in the WalSnd array.  (This is utterly bogus, since that is arrival
     802             :      * order dependent, but there are regression tests that rely on it.)
     803             :      */
     804          22 :     return sa->walsnd_index - sb->walsnd_index;
     805             : }
     806             : 
     807             : 
     808             : /*
     809             :  * Check if we are in the list of sync standbys, and if so, determine
     810             :  * priority sequence. Return priority if set, or zero to indicate that
     811             :  * we are not a potential sync standby.
     812             :  *
     813             :  * Compare the parameter SyncRepStandbyNames against the application_name
     814             :  * for this WALSender, or allow any name if we find a wildcard "*".
     815             :  */
     816             : static int
     817        1270 : SyncRepGetStandbyPriority(void)
     818             : {
     819             :     const char *standby_name;
     820             :     int         priority;
     821        1270 :     bool        found = false;
     822             : 
     823             :     /*
     824             :      * Since synchronous cascade replication is not allowed, we always set the
     825             :      * priority of cascading walsender to zero.
     826             :      */
     827        1270 :     if (am_cascading_walsender)
     828          52 :         return 0;
     829             : 
     830        1218 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     831        1170 :         return 0;
     832             : 
     833          48 :     standby_name = SyncRepConfig->member_names;
     834          64 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     835             :     {
     836          62 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     837          36 :             strcmp(standby_name, "*") == 0)
     838             :         {
     839          46 :             found = true;
     840          46 :             break;
     841             :         }
     842          16 :         standby_name += strlen(standby_name) + 1;
     843             :     }
     844             : 
     845          48 :     if (!found)
     846           2 :         return 0;
     847             : 
     848             :     /*
     849             :      * In quorum-based sync replication, all the standbys in the list have the
     850             :      * same priority, one.
     851             :      */
     852          46 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     853             : }
     854             : 
     855             : /*
     856             :  * Walk the specified queue from head.  Set the state of any backends that
     857             :  * need to be woken, remove them from the queue, and then wake them.
     858             :  * Pass all = true to wake whole queue; otherwise, just wake up to
     859             :  * the walsender's LSN.
     860             :  *
     861             :  * The caller must hold SyncRepLock in exclusive mode.
     862             :  */
     863             : static int
     864         250 : SyncRepWakeQueue(bool all, int mode)
     865             : {
     866         250 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     867         250 :     int         numprocs = 0;
     868             :     dlist_mutable_iter iter;
     869             : 
     870             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     871             :     Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
     872             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     873             : 
     874         296 :     dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
     875             :     {
     876          62 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     877             : 
     878             :         /*
     879             :          * Assume the queue is ordered by LSN
     880             :          */
     881          62 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
     882          16 :             return numprocs;
     883             : 
     884             :         /*
     885             :          * Remove from queue.
     886             :          */
     887          46 :         dlist_delete_thoroughly(&proc->syncRepLinks);
     888             : 
     889             :         /*
     890             :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
     891             :          * make sure that it sees the queue link being removed before the
     892             :          * syncRepState change.
     893             :          */
     894          46 :         pg_write_barrier();
     895             : 
     896             :         /*
     897             :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
     898             :          * the various states.
     899             :          */
     900          46 :         proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
     901             : 
     902             :         /*
     903             :          * Wake only when we have set state and removed from queue.
     904             :          */
     905          46 :         SetLatch(&(proc->procLatch));
     906             : 
     907          46 :         numprocs++;
     908             :     }
     909             : 
     910         234 :     return numprocs;
     911             : }
     912             : 
     913             : /*
     914             :  * The checkpointer calls this as needed to update the shared
     915             :  * sync_standbys_defined flag, so that backends don't remain permanently wedged
     916             :  * if synchronous_standby_names is unset.  It's safe to check the current value
     917             :  * without the lock, because it's only ever updated by one process.  But we
     918             :  * must take the lock to change it.
     919             :  */
     920             : void
     921        1002 : SyncRepUpdateSyncStandbysDefined(void)
     922             : {
     923        1002 :     bool        sync_standbys_defined = SyncStandbysDefined();
     924             : 
     925        1002 :     if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
     926             :     {
     927          26 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     928             : 
     929             :         /*
     930             :          * If synchronous_standby_names has been reset to empty, it's futile
     931             :          * for backends to continue waiting.  Since the user no longer wants
     932             :          * synchronous replication, we'd better wake them up.
     933             :          */
     934          26 :         if (!sync_standbys_defined)
     935             :         {
     936             :             int         i;
     937             : 
     938           8 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
     939           6 :                 SyncRepWakeQueue(true, i);
     940             :         }
     941             : 
     942             :         /*
     943             :          * Only allow people to join the queue when there are synchronous
     944             :          * standbys defined.  Without this interlock, there's a race
     945             :          * condition: we might wake up all the current waiters; then, some
     946             :          * backend that hasn't yet reloaded its config might go to sleep on
     947             :          * the queue (and never wake up).  This prevents that.
     948             :          */
     949          26 :         WalSndCtl->sync_standbys_defined = sync_standbys_defined;
     950             : 
     951          26 :         LWLockRelease(SyncRepLock);
     952             :     }
     953        1002 : }
     954             : 
     955             : #ifdef USE_ASSERT_CHECKING
     956             : static bool
     957             : SyncRepQueueIsOrderedByLSN(int mode)
     958             : {
     959             :     XLogRecPtr  lastLSN;
     960             :     dlist_iter  iter;
     961             : 
     962             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     963             : 
     964             :     lastLSN = 0;
     965             : 
     966             :     dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
     967             :     {
     968             :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     969             : 
     970             :         /*
     971             :          * Check the queue is ordered by LSN and that multiple procs don't
     972             :          * have matching LSNs
     973             :          */
     974             :         if (proc->waitLSN <= lastLSN)
     975             :             return false;
     976             : 
     977             :         lastLSN = proc->waitLSN;
     978             :     }
     979             : 
     980             :     return true;
     981             : }
     982             : #endif
     983             : 
     984             : /*
     985             :  * ===========================================================
     986             :  * Synchronous Replication functions executed by any process
     987             :  * ===========================================================
     988             :  */
     989             : 
     990             : bool
     991        2128 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
     992             : {
     993        2128 :     if (*newval != NULL && (*newval)[0] != '\0')
     994         130 :     {
     995             :         yyscan_t    scanner;
     996             :         int         parse_rc;
     997             :         SyncRepConfigData *pconf;
     998             : 
     999             :         /* Reset communication variables to ensure a fresh start */
    1000         130 :         syncrep_parse_result = NULL;
    1001         130 :         syncrep_parse_error_msg = NULL;
    1002             : 
    1003             :         /* Parse the synchronous_standby_names string */
    1004         130 :         syncrep_scanner_init(*newval, &scanner);
    1005         130 :         parse_rc = syncrep_yyparse(scanner);
    1006         130 :         syncrep_scanner_finish(scanner);
    1007             : 
    1008         130 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1009             :         {
    1010           0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1011           0 :             if (syncrep_parse_error_msg)
    1012           0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1013             :             else
    1014           0 :                 GUC_check_errdetail("\"%s\" parser failed.",
    1015             :                                     "synchronous_standby_names");
    1016           0 :             return false;
    1017             :         }
    1018             : 
    1019         130 :         if (syncrep_parse_result->num_sync <= 0)
    1020             :         {
    1021           0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1022           0 :                              syncrep_parse_result->num_sync);
    1023           0 :             return false;
    1024             :         }
    1025             : 
    1026             :         /* GUC extra value must be guc_malloc'd, not palloc'd */
    1027             :         pconf = (SyncRepConfigData *)
    1028         130 :             guc_malloc(LOG, syncrep_parse_result->config_size);
    1029         130 :         if (pconf == NULL)
    1030           0 :             return false;
    1031         130 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1032             : 
    1033         130 :         *extra = pconf;
    1034             : 
    1035             :         /*
    1036             :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1037             :          * other cruft generated during parsing, will be freed when the
    1038             :          * current memory context is deleted.  (This code is generally run in
    1039             :          * a short-lived context used for config file processing, so that will
    1040             :          * not be very long.)
    1041             :          */
    1042             :     }
    1043             :     else
    1044        1998 :         *extra = NULL;
    1045             : 
    1046        2128 :     return true;
    1047             : }
    1048             : 
    1049             : void
    1050        2108 : assign_synchronous_standby_names(const char *newval, void *extra)
    1051             : {
    1052        2108 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1053        2108 : }
    1054             : 
    1055             : void
    1056        5404 : assign_synchronous_commit(int newval, void *extra)
    1057             : {
    1058        5404 :     switch (newval)
    1059             :     {
    1060           0 :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1061           0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1062           0 :             break;
    1063        2232 :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1064        2232 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1065        2232 :             break;
    1066           4 :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1067           4 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1068           4 :             break;
    1069        3168 :         default:
    1070        3168 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1071        3168 :             break;
    1072             :     }
    1073        5404 : }

Generated by: LCOV version 1.14