LCOV - code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 267 358 74.6 %
Date: 2019-06-19 14:06:47 Functions: 16 19 84.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * syncrep.c
       4             :  *
       5             :  * Synchronous replication is new as of PostgreSQL 9.1.
       6             :  *
       7             :  * If requested, transaction commits wait until their commit LSNs are
       8             :  * acknowledged by the synchronous standbys.
       9             :  *
      10             :  * This module contains the code for waiting and release of backends.
      11             :  * All code in this module executes on the primary. The core streaming
      12             :  * replication transport remains within WALreceiver/WALsender modules.
      13             :  *
      14             :  * The essence of this design is that it isolates all logic about
      15             :  * waiting/releasing onto the primary. The primary defines which standbys
      16             :  * it wishes to wait for. The standbys are completely unaware of the
      17             :  * durability requirements of transactions on the primary, reducing the
      18             :  * complexity of the code and streamlining both standby operations and
      19             :  * network bandwidth because there is no requirement to ship
      20             :  * per-transaction state information.
      21             :  *
      22             :  * Replication is either synchronous or not synchronous (async). If it is
      23             :  * async, we just fastpath out of here. If it is sync, then we wait for
      24             :  * the write, flush or apply location on the standby before releasing
      25             :  * the waiting backend. Further complexity in that interaction is
      26             :  * expected in later releases.
      27             :  *
      28             :  * The best performing way to manage the waiting backends is to have a
      29             :  * single ordered queue of waiting backends, so that we can avoid
      30             :  * searching through all waiters each time we receive a reply.
      31             :  *
      32             :  * In 9.5 or before only a single standby could be considered as
      33             :  * synchronous. In 9.6 we support priority-based multiple synchronous
      34             :  * standbys. In 10.0 quorum-based multiple synchronous standbys are also
      35             :  * supported. The number of synchronous standbys that transactions
      36             :  * must wait for replies from is specified in synchronous_standby_names.
      37             :  * This parameter also specifies a list of standby names and the method
      38             :  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
      39             :  *
      40             :  * The method FIRST specifies a priority-based synchronous replication
      41             :  * and makes transaction commits wait until their WAL records are
      42             :  * replicated to the requested number of synchronous standbys chosen based
      43             :  * on their priorities. The standbys whose names appear earlier in the list
      44             :  * are given higher priority and will be considered as synchronous.
      45             :  * Other standby servers appearing later in this list represent potential
      46             :  * synchronous standbys. If any of the current synchronous standbys
      47             :  * disconnects for whatever reason, it will be replaced immediately with
      48             :  * the next-highest-priority standby.
      49             :  *
      50             :  * The method ANY specifies a quorum-based synchronous replication
      51             :  * and makes transaction commits wait until their WAL records are
      52             :  * replicated to at least the requested number of synchronous standbys
      53             :  * in the list. All the standbys appearing in the list are considered as
      54             :  * candidates for quorum synchronous standbys.
      55             :  *
      56             :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57             :  * This is for backward compatibility with 9.6 or before where only a
      58             :  * priority-based sync replication was supported.
      59             :  *
      60             :  * Before the standbys chosen from synchronous_standby_names can
      61             :  * become the synchronous standbys they must have caught up with
      62             :  * the primary; that may take some time. Once caught up,
      63             :  * the standbys which are considered as synchronous at that moment
      64             :  * will release waiters from the queue.
      65             :  *
      66             :  * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
      67             :  *
      68             :  * IDENTIFICATION
      69             :  *    src/backend/replication/syncrep.c
      70             :  *
      71             :  *-------------------------------------------------------------------------
      72             :  */
      73             : #include "postgres.h"
      74             : 
      75             : #include <unistd.h>
      76             : 
      77             : #include "access/xact.h"
      78             : #include "miscadmin.h"
      79             : #include "pgstat.h"
      80             : #include "replication/syncrep.h"
      81             : #include "replication/walsender.h"
      82             : #include "replication/walsender_private.h"
      83             : #include "storage/pmsignal.h"
      84             : #include "storage/proc.h"
      85             : #include "tcop/tcopprot.h"
      86             : #include "utils/builtins.h"
      87             : #include "utils/ps_status.h"
      88             : 
      89             : /* User-settable parameters for sync rep */
      90             : char       *SyncRepStandbyNames;
      91             : 
      92             : #define SyncStandbysDefined() \
      93             :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      94             : 
      95             : static bool announce_next_takeover = true;
      96             : 
      97             : SyncRepConfigData *SyncRepConfig = NULL;
      98             : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
      99             : 
     100             : static void SyncRepQueueInsert(int mode);
     101             : static void SyncRepCancelWait(void);
     102             : static int  SyncRepWakeQueue(bool all, int mode);
     103             : 
     104             : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     105             :                                  XLogRecPtr *flushPtr,
     106             :                                  XLogRecPtr *applyPtr,
     107             :                                  bool *am_sync);
     108             : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     109             :                                        XLogRecPtr *flushPtr,
     110             :                                        XLogRecPtr *applyPtr,
     111             :                                        List *sync_standbys);
     112             : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     113             :                                           XLogRecPtr *flushPtr,
     114             :                                           XLogRecPtr *applyPtr,
     115             :                                           List *sync_standbys, uint8 nth);
     116             : static int  SyncRepGetStandbyPriority(void);
     117             : static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
     118             : static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
     119             : static int  cmp_lsn(const void *a, const void *b);
     120             : 
     121             : #ifdef USE_ASSERT_CHECKING
     122             : static bool SyncRepQueueIsOrderedByLSN(int mode);
     123             : #endif
     124             : 
     125             : /*
     126             :  * ===========================================================
     127             :  * Synchronous Replication functions for normal user backends
     128             :  * ===========================================================
     129             :  */
     130             : 
     131             : /*
     132             :  * Wait for synchronous replication, if requested by user.
     133             :  *
     134             :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     135             :  * change that state to SYNC_REP_WAITING before adding ourselves
     136             :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     137             :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     138             :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     139             :  *
     140             :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     141             :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     142             :  * to be flushed if synchronous_commit is set to the higher level of
     143             :  * remote_apply, because only commit records provide apply feedback.
     144             :  */
     145             : void
     146      236150 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     147             : {
     148      236150 :     char       *new_status = NULL;
     149             :     const char *old_status;
     150             :     int         mode;
     151             : 
     152             :     /* Cap the level for anything other than commit to remote flush only. */
     153      236150 :     if (commit)
     154      236066 :         mode = SyncRepWaitMode;
     155             :     else
     156          84 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     157             : 
     158             :     /*
     159             :      * Fast exit if user has not requested sync replication.
     160             :      */
     161      236150 :     if (!SyncRepRequested())
     162        9936 :         return;
     163             : 
     164             :     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
     165             :     Assert(WalSndCtl != NULL);
     166             : 
     167      226214 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     168             :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     169             : 
     170             :     /*
     171             :      * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     172             :      * set.  See SyncRepUpdateSyncStandbysDefined.
     173             :      *
     174             :      * Also check that the standby hasn't already replied. Unlikely race
     175             :      * condition but we'll be fetching that cache line anyway so it's likely
     176             :      * to be a low cost check.
     177             :      */
     178      226300 :     if (!WalSndCtl->sync_standbys_defined ||
     179          86 :         lsn <= WalSndCtl->lsn[mode])
     180             :     {
     181      226128 :         LWLockRelease(SyncRepLock);
     182      226128 :         return;
     183             :     }
     184             : 
     185             :     /*
     186             :      * Set our waitLSN so WALSender will know when to wake us, and add
     187             :      * ourselves to the queue.
     188             :      */
     189          86 :     MyProc->waitLSN = lsn;
     190          86 :     MyProc->syncRepState = SYNC_REP_WAITING;
     191          86 :     SyncRepQueueInsert(mode);
     192             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     193          86 :     LWLockRelease(SyncRepLock);
     194             : 
     195             :     /* Alter ps display to show waiting for sync rep. */
     196          86 :     if (update_process_title)
     197             :     {
     198             :         int         len;
     199             : 
     200          86 :         old_status = get_ps_display(&len);
     201          86 :         new_status = (char *) palloc(len + 32 + 1);
     202          86 :         memcpy(new_status, old_status, len);
     203         172 :         sprintf(new_status + len, " waiting for %X/%X",
     204          86 :                 (uint32) (lsn >> 32), (uint32) lsn);
     205          86 :         set_ps_display(new_status, false);
     206          86 :         new_status[len] = '\0'; /* truncate off " waiting ..." */
     207             :     }
     208             : 
     209             :     /*
     210             :      * Wait for specified LSN to be confirmed.
     211             :      *
     212             :      * Each proc has its own wait latch, so we perform a normal latch
     213             :      * check/wait loop here.
     214             :      */
     215             :     for (;;)
     216          86 :     {
     217             :         int         rc;
     218             : 
     219             :         /* Must reset the latch before testing state. */
     220         172 :         ResetLatch(MyLatch);
     221             : 
     222             :         /*
     223             :          * Acquiring the lock is not needed, the latch ensures proper
     224             :          * barriers. If it looks like we're done, we must really be done,
     225             :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     226             :          * it will never update it again, so we can't be seeing a stale value
     227             :          * in that case.
     228             :          */
     229         172 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     230          86 :             break;
     231             : 
     232             :         /*
     233             :          * If a wait for synchronous replication is pending, we can neither
     234             :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     235             :          * lead the client to believe that the transaction aborted, which is
     236             :          * not true: it's already committed locally. The former is no good
     237             :          * either: the client has requested synchronous replication, and is
     238             :          * entitled to assume that an acknowledged commit is also replicated,
     239             :          * which might not be true. So in this case we issue a WARNING (which
     240             :          * some clients may be able to interpret) and shut off further output.
     241             :          * We do NOT reset ProcDiePending, so that the process will die after
     242             :          * the commit is cleaned up.
     243             :          */
     244          86 :         if (ProcDiePending)
     245             :         {
     246           0 :             ereport(WARNING,
     247             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     248             :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     249             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     250           0 :             whereToSendOutput = DestNone;
     251           0 :             SyncRepCancelWait();
     252           0 :             break;
     253             :         }
     254             : 
     255             :         /*
     256             :          * It's unclear what to do if a query cancel interrupt arrives.  We
     257             :          * can't actually abort at this point, but ignoring the interrupt
     258             :          * altogether is not helpful, so we just terminate the wait with a
     259             :          * suitable warning.
     260             :          */
     261          86 :         if (QueryCancelPending)
     262             :         {
     263           0 :             QueryCancelPending = false;
     264           0 :             ereport(WARNING,
     265             :                     (errmsg("canceling wait for synchronous replication due to user request"),
     266             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     267           0 :             SyncRepCancelWait();
     268           0 :             break;
     269             :         }
     270             : 
     271             :         /*
     272             :          * Wait on latch.  Any condition that should wake us up will set the
     273             :          * latch, so no need for timeout.
     274             :          */
     275          86 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     276             :                        WAIT_EVENT_SYNC_REP);
     277             : 
     278             :         /*
     279             :          * If the postmaster dies, we'll probably never get an acknowledgment,
     280             :          * because all the wal sender processes will exit. So just bail out.
     281             :          */
     282          86 :         if (rc & WL_POSTMASTER_DEATH)
     283             :         {
     284           0 :             ProcDiePending = true;
     285           0 :             whereToSendOutput = DestNone;
     286           0 :             SyncRepCancelWait();
     287           0 :             break;
     288             :         }
     289             :     }
     290             : 
     291             :     /*
     292             :      * WalSender has checked our LSN and has removed us from queue. Clean up
     293             :      * state and leave.  It's OK to reset these shared memory fields without
     294             :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     295             :      * we're not on the queue.  We need a read barrier to make sure we see the
     296             :      * changes to the queue link (this might be unnecessary without
     297             :      * assertions, but better safe than sorry).
     298             :      */
     299          86 :     pg_read_barrier();
     300             :     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
     301          86 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     302          86 :     MyProc->waitLSN = 0;
     303             : 
     304          86 :     if (new_status)
     305             :     {
     306             :         /* Reset ps display */
     307          86 :         set_ps_display(new_status, false);
     308          86 :         pfree(new_status);
     309             :     }
     310             : }
     311             : 
     312             : /*
     313             :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     314             :  *
     315             :  * Usually we will go at tail of queue, though it's possible that we arrive
     316             :  * here out of order, so start at tail and work back to insertion point.
     317             :  */
     318             : static void
     319          86 : SyncRepQueueInsert(int mode)
     320             : {
     321             :     PGPROC     *proc;
     322             : 
     323             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     324          86 :     proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
     325          86 :                                    &(WalSndCtl->SyncRepQueue[mode]),
     326             :                                    offsetof(PGPROC, syncRepLinks));
     327             : 
     328         172 :     while (proc)
     329             :     {
     330             :         /*
     331             :          * Stop at the queue element that we should insert after to ensure the
     332             :          * queue is ordered by LSN.
     333             :          */
     334           0 :         if (proc->waitLSN < MyProc->waitLSN)
     335           0 :             break;
     336             : 
     337           0 :         proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
     338           0 :                                        &(proc->syncRepLinks),
     339             :                                        offsetof(PGPROC, syncRepLinks));
     340             :     }
     341             : 
     342          86 :     if (proc)
     343           0 :         SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
     344             :     else
     345          86 :         SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
     346          86 : }
     347             : 
     348             : /*
     349             :  * Acquire SyncRepLock and cancel any wait currently in progress.
     350             :  */
     351             : static void
     352           0 : SyncRepCancelWait(void)
     353             : {
     354           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     355           0 :     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     356           0 :         SHMQueueDelete(&(MyProc->syncRepLinks));
     357           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     358           0 :     LWLockRelease(SyncRepLock);
     359           0 : }
     360             : 
     361             : void
     362        9794 : SyncRepCleanupAtProcExit(void)
     363             : {
     364        9794 :     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     365             :     {
     366           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     367           0 :         SHMQueueDelete(&(MyProc->syncRepLinks));
     368           0 :         LWLockRelease(SyncRepLock);
     369             :     }
     370        9794 : }
     371             : 
     372             : /*
     373             :  * ===========================================================
     374             :  * Synchronous Replication functions for wal sender processes
     375             :  * ===========================================================
     376             :  */
     377             : 
     378             : /*
     379             :  * Take any action required to initialise sync rep state from config
     380             :  * data. Called at WALSender startup and after each SIGHUP.
     381             :  */
     382             : void
     383         250 : SyncRepInitConfig(void)
     384             : {
     385             :     int         priority;
     386             : 
     387             :     /*
     388             :      * Determine if we are a potential sync standby and remember the result
     389             :      * for handling replies from standby.
     390             :      */
     391         250 :     priority = SyncRepGetStandbyPriority();
     392         250 :     if (MyWalSnd->sync_standby_priority != priority)
     393             :     {
     394          30 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     395          30 :         MyWalSnd->sync_standby_priority = priority;
     396          30 :         LWLockRelease(SyncRepLock);
     397          30 :         ereport(DEBUG1,
     398             :                 (errmsg("standby \"%s\" now has synchronous standby priority %u",
     399             :                         application_name, priority)));
     400             :     }
     401         250 : }
     402             : 
     403             : /*
     404             :  * Update the LSNs on each queue based upon our latest state. This
     405             :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     406             :  *
     407             :  * Other policies are possible, which would change what we do here and
     408             :  * perhaps also which information we store as well.
     409             :  */
     410             : void
     411        6552 : SyncRepReleaseWaiters(void)
     412             : {
     413        6552 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     414             :     XLogRecPtr  writePtr;
     415             :     XLogRecPtr  flushPtr;
     416             :     XLogRecPtr  applyPtr;
     417             :     bool        got_recptr;
     418             :     bool        am_sync;
     419        6552 :     int         numwrite = 0;
     420        6552 :     int         numflush = 0;
     421        6552 :     int         numapply = 0;
     422             : 
     423             :     /*
     424             :      * If this WALSender is serving a standby that is not on the list of
     425             :      * potential sync standbys then we have nothing to do. If we are still
     426             :      * starting up, still running base backup or the current flush position is
     427             :      * still invalid, then leave quickly also.  Streaming or stopping WAL
     428             :      * senders are allowed to release waiters.
     429             :      */
     430        6756 :     if (MyWalSnd->sync_standby_priority == 0 ||
     431         248 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
     432         248 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
     433         204 :         XLogRecPtrIsInvalid(MyWalSnd->flush))
     434             :     {
     435        6348 :         announce_next_takeover = true;
     436       12698 :         return;
     437             :     }
     438             : 
     439             :     /*
     440             :      * We're a potential sync standby. Release waiters if there are enough
     441             :      * sync standbys and we are considered as sync.
     442             :      */
     443         204 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     444             : 
     445             :     /*
     446             :      * Check whether we are a sync standby or not, and calculate the synced
     447             :      * positions among all sync standbys.
     448             :      */
     449         204 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     450             : 
     451             :     /*
     452             :      * If we are managing a sync standby, though we weren't prior to this,
     453             :      * then announce we are now a sync standby.
     454             :      */
     455         204 :     if (announce_next_takeover && am_sync)
     456             :     {
     457          16 :         announce_next_takeover = false;
     458             : 
     459          16 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     460          16 :             ereport(LOG,
     461             :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
     462             :                             application_name, MyWalSnd->sync_standby_priority)));
     463             :         else
     464           0 :             ereport(LOG,
     465             :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     466             :                             application_name)));
     467             :     }
     468             : 
     469             :     /*
     470             :      * If the number of sync standbys is less than requested or we aren't
     471             :      * managing a sync standby then just leave.
     472             :      */
     473         204 :     if (!got_recptr || !am_sync)
     474             :     {
     475           2 :         LWLockRelease(SyncRepLock);
     476           2 :         announce_next_takeover = !am_sync;
     477           2 :         return;
     478             :     }
     479             : 
     480             :     /*
     481             :      * Set the lsn first so that when we wake backends they will release up to
     482             :      * this location.
     483             :      */
     484         202 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     485             :     {
     486          74 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     487          74 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     488             :     }
     489         202 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     490             :     {
     491          80 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     492          80 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     493             :     }
     494         202 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     495             :     {
     496          78 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     497          78 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     498             :     }
     499             : 
     500         202 :     LWLockRelease(SyncRepLock);
     501             : 
     502         202 :     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
     503             :          numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
     504             :          numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
     505             :          numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
     506             : }
     507             : 
     508             : /*
     509             :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     510             :  *
     511             :  * Return false if the number of sync standbys is less than
     512             :  * synchronous_standby_names specifies. Otherwise return true and
     513             :  * store the positions into *writePtr, *flushPtr and *applyPtr.
     514             :  *
     515             :  * On return, *am_sync is set to true if this walsender is connected to a
     516             :  * sync standby. Otherwise it's set to false.
     517             :  */
     518             : static bool
     519         204 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     520             :                      XLogRecPtr *applyPtr, bool *am_sync)
     521             : {
     522             :     List       *sync_standbys;
     523             : 
     524         204 :     *writePtr = InvalidXLogRecPtr;
     525         204 :     *flushPtr = InvalidXLogRecPtr;
     526         204 :     *applyPtr = InvalidXLogRecPtr;
     527         204 :     *am_sync = false;
     528             : 
     529             :     /* Get standbys that are considered as synchronous at this moment */
     530         204 :     sync_standbys = SyncRepGetSyncStandbys(am_sync);
     531             : 
     532             :     /*
     533             :      * Quick exit if we are not managing a sync standby or there are not
     534             :      * enough synchronous standbys.
     535             :      */
     536         406 :     if (!(*am_sync) ||
     537         404 :         SyncRepConfig == NULL ||
     538         202 :         list_length(sync_standbys) < SyncRepConfig->num_sync)
     539             :     {
     540           2 :         list_free(sync_standbys);
     541           2 :         return false;
     542             :     }
     543             : 
     544             :     /*
     545             :      * In a priority-based sync replication, the synced positions are the
     546             :      * oldest ones among sync standbys. In a quorum-based, they are the Nth
     547             :      * latest ones.
     548             :      *
     549             :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     550             :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     551             :      * because it's a bit more efficient.
     552             :      *
     553             :      * XXX If the numbers of current and requested sync standbys are the same,
     554             :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     555             :      * positions even in a quorum-based sync replication.
     556             :      */
     557         202 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     558             :     {
     559         202 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     560             :                                    sync_standbys);
     561             :     }
     562             :     else
     563             :     {
     564           0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     565           0 :                                       sync_standbys, SyncRepConfig->num_sync);
     566             :     }
     567             : 
     568         202 :     list_free(sync_standbys);
     569         202 :     return true;
     570             : }
     571             : 
     572             : /*
     573             :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
                     :  *
                     :  * Each element of sync_standbys is an int index into WalSndCtl->walsnds.
                     :  *
                     :  * *writePtr, *flushPtr and *applyPtr are updated in place: a position is
                     :  * replaced when it is still InvalidXLogRecPtr or when the standby's value
                     :  * is smaller. The caller is therefore expected to pass them in already set
                     :  * to InvalidXLogRecPtr, as SyncRepGetSyncRecPtr() does. If the list is
                     :  * empty the three outputs are left untouched.
     574             :  */
     575             : static void
     576         202 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     577             :                            XLogRecPtr *applyPtr, List *sync_standbys)
     578             : {
     579             :     ListCell   *cell;
     580             : 
     581             :     /*
     582             :      * Scan through all sync standbys and calculate the oldest Write, Flush
     583             :      * and Apply positions.
                     :      *
                     :      * A standby's three positions are copied out while holding its
                     :      * spinlock; the comparisons are done after the spinlock is released.
     584             :      */
     585         404 :     foreach(cell, sync_standbys)
     586             :     {
     587         202 :         WalSnd     *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
     588             :         XLogRecPtr  write;
     589             :         XLogRecPtr  flush;
     590             :         XLogRecPtr  apply;
     591             : 
     592         202 :         SpinLockAcquire(&walsnd->mutex);
     593         202 :         write = walsnd->write;
     594         202 :         flush = walsnd->flush;
     595         202 :         apply = walsnd->apply;
     596         202 :         SpinLockRelease(&walsnd->mutex);
     597             : 
     598         202 :         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
     599         202 :             *writePtr = write;
     600         202 :         if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
     601         202 :             *flushPtr = flush;
     602         202 :         if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
     603         202 :             *applyPtr = apply;
     604             :     }
     605         202 : }
     606             : 
     607             : /*
     608             :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     609             :  * standbys.
                     :  *
                     :  * Each element of sync_standbys is an int index into WalSndCtl->walsnds.
                     :  *
                     :  * "nth" is 1-based and must satisfy 1 <= nth <= list_length(sync_standbys).
                     :  * That is not checked here; SyncRepGetSyncRecPtr() only calls this after
                     :  * verifying that the list holds at least num_sync entries.
                     :  *
                     :  * The write, flush and apply values are sorted independently of each
                     :  * other, so the three results may come from three different standbys.
                     :  *
                     :  * NOTE(review): nth is a uint8 while the caller passes
                     :  * SyncRepConfig->num_sync; if that field can exceed 255 the value would be
                     :  * narrowed on the way in -- confirm the field's range.
     610             :  */
     611             : static void
     612           0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     613             :                               XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
     614             : {
     615             :     ListCell   *cell;
     616             :     XLogRecPtr *write_array;
     617             :     XLogRecPtr *flush_array;
     618             :     XLogRecPtr *apply_array;
     619             :     int         len;
     620           0 :     int         i = 0;
     621             : 
     622           0 :     len = list_length(sync_standbys);
     623           0 :     write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
     624           0 :     flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
     625           0 :     apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
     626             : 
     627           0 :     foreach(cell, sync_standbys)
     628             :     {
     629           0 :         WalSnd     *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
     630             : 
     631           0 :         SpinLockAcquire(&walsnd->mutex);
     632           0 :         write_array[i] = walsnd->write;
     633           0 :         flush_array[i] = walsnd->flush;
     634           0 :         apply_array[i] = walsnd->apply;
     635           0 :         SpinLockRelease(&walsnd->mutex);
     636             : 
     637           0 :         i++;
     638             :     }
     639             : 
     640             :     /* Sort each array in descending order */
     641           0 :     qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
     642           0 :     qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
     643           0 :     qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
     644             : 
     645             :     /* Get Nth latest Write, Flush, Apply positions (nth is 1-based) */
     646           0 :     *writePtr = write_array[nth - 1];
     647           0 :     *flushPtr = flush_array[nth - 1];
     648           0 :     *applyPtr = apply_array[nth - 1];
     649             : 
     650           0 :     pfree(write_array);
     651           0 :     pfree(flush_array);
     652           0 :     pfree(apply_array);
     653           0 : }
     654             : 
     655             : /*
     656             :  * Compare lsn in order to sort array in descending order.
                     :  *
                     :  * qsort() comparator over XLogRecPtr elements: returns -1 when the first
                     :  * value is larger, 0 when both are equal and 1 when the first is smaller,
                     :  * so larger LSNs end up at the front of the array. The result is built
                     :  * from explicit comparisons rather than from a subtraction of the two
                     :  * values.
     657             :  */
     658             : static int
     659           0 : cmp_lsn(const void *a, const void *b)
     660             : {
     661           0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     662           0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     663             : 
     664           0 :     if (lsn1 > lsn2)
     665           0 :         return -1;
     666           0 :     else if (lsn1 == lsn2)
     667           0 :         return 0;
     668             :     else
     669           0 :         return 1;
     670             : }
     671             : 
     672             : /*
     673             :  * Return the list of sync standbys, or NIL if no sync standby is connected.
                     :  * NIL is also returned when SyncRepConfig is NULL, i.e. when sync
                     :  * replication is not requested.
                     :  *
                     :  * The list is an integer list; each element is the index of a walsender
                     :  * slot in WalSndCtl->walsnds. Which standbys are chosen depends on
                     :  * SyncRepConfig->syncrep_method: the priority-based or the quorum-based
                     :  * helper is called accordingly.
     674             :  *
     675             :  * The caller must hold SyncRepLock.
     676             :  *
     677             :  * On return, *am_sync is set to true if this walsender is connected to a
     678             :  * sync standby. Otherwise it's set to false. am_sync may be NULL, in which
                     :  * case nothing is reported.
     679             :  */
     680             : List *
     681         426 : SyncRepGetSyncStandbys(bool *am_sync)
     682             : {
     683             :     /* Set default result */
     684         426 :     if (am_sync != NULL)
     685         204 :         *am_sync = false;
     686             : 
     687             :     /* Quick exit if sync replication is not requested */
     688         426 :     if (SyncRepConfig == NULL)
     689         194 :         return NIL;
     690             : 
     691         232 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
     692         232 :         SyncRepGetSyncStandbysPriority(am_sync) :
     693             :         SyncRepGetSyncStandbysQuorum(am_sync);
     694             : }
     695             : 
     696             : /*
     697             :  * Return the list of all the candidates for quorum sync standbys,
     698             :  * or NIL if no such standby is connected.
                     :  *
                     :  * Every qualifying walsender slot is returned; the list is not cut down to
                     :  * SyncRepConfig->num_sync entries here. Each element is the slot's index
                     :  * in WalSndCtl->walsnds.
     699             :  *
     700             :  * The caller must hold SyncRepLock. This function must be called only in
     701             :  * a quorum-based sync replication.
     702             :  *
     703             :  * On return, *am_sync is set to true if this walsender is connected to a
     704             :  * sync standby. This function never stores false itself; the false default
                     :  * is stored by SyncRepGetSyncStandbys() before it dispatches here.
                     :  * am_sync may be NULL.
     705             :  */
     706             : static List *
     707           4 : SyncRepGetSyncStandbysQuorum(bool *am_sync)
     708             : {
     709           4 :     List       *result = NIL;
     710             :     int         i;
     711             :     volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     712             :                                  * rearrangement */
     713             : 
     714             :     Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
     715             : 
     716          24 :     for (i = 0; i < max_wal_senders; i++)
     717             :     {
     718             :         XLogRecPtr  flush;
     719             :         WalSndState state;
     720             :         int         pid;
     721             : 
     722          20 :         walsnd = &WalSndCtl->walsnds[i];
     723             : 
     724          20 :         SpinLockAcquire(&walsnd->mutex);
     725          20 :         pid = walsnd->pid;
     726          20 :         flush = walsnd->flush;
     727          20 :         state = walsnd->state;
     728          20 :         SpinLockRelease(&walsnd->mutex);
     729             : 
     730             :         /* Must be active */
     731          20 :         if (pid == 0)
     732           6 :             continue;
     733             : 
     734             :         /* Must be streaming or stopping */
     735          14 :         if (state != WALSNDSTATE_STREAMING &&
     736             :             state != WALSNDSTATE_STOPPING)
     737           0 :             continue;
     738             : 
     739             :         /*
                     :          * Must be synchronous. Note that sync_standby_priority is read here
                     :          * after the spinlock has been released; presumably it is covered by
                     :          * SyncRepLock, which the caller must hold -- confirm.
                     :          */
     740          14 :         if (walsnd->sync_standby_priority == 0)
     741           2 :             continue;
     742             : 
     743             :         /* Must have a valid flush position */
     744          12 :         if (XLogRecPtrIsInvalid(flush))
     745           2 :             continue;
     746             : 
     747             :         /*
     748             :          * Consider this standby as a candidate for quorum sync standbys and
     749             :          * append it to the result.
     750             :          */
     751          10 :         result = lappend_int(result, i);
     752          10 :         if (am_sync != NULL && walsnd == MyWalSnd)
     753           0 :             *am_sync = true;
     754             :     }
     755             : 
     756           4 :     return result;
     757             : }
     758             : 
     759             : /*
     760             :  * Return the list of sync standbys chosen based on their priorities,
     761             :  * or NIL if no sync standby is connected.
     762             :  *
     763             :  * If there are multiple standbys with the same priority,
     764             :  * the first one found is selected preferentially.
                     :  *
                     :  * The work is done in three steps:
                     :  *
                     :  * 1. Scan every walsender slot. Priority-1 standbys go straight into the
                     :  *    result; all other eligible standbys go into a "pending" list. Return
                     :  *    as soon as the result holds SyncRepConfig->num_sync entries.
                     :  * 2. If result plus pending together do not exceed num_sync, every pending
                     :  *    standby is sync: concatenate the two lists and return.
                     :  * 3. Otherwise walk the priorities in ascending numeric order, starting at
                     :  *    the smallest priority seen in pending, moving matching entries from
                     :  *    pending to the result until num_sync entries have been collected.
                     :  *
                     :  * Each element of the returned list is a slot index in WalSndCtl->walsnds.
     765             :  *
     766             :  * The caller must hold SyncRepLock. This function must be called only in
     767             :  * a priority-based sync replication.
     768             :  *
     769             :  * On return, *am_sync is set to true if this walsender is connected to a
     770             :  * sync standby. This function never stores false through am_sync except
                     :  * via am_in_pending in step 2; the false default is stored by
                     :  * SyncRepGetSyncStandbys() before it dispatches here. am_sync may be NULL.
     771             :  */
     772             : static List *
     773         228 : SyncRepGetSyncStandbysPriority(bool *am_sync)
     774             : {
     775         228 :     List       *result = NIL;
     776         228 :     List       *pending = NIL;
     777             :     int         lowest_priority;
     778             :     int         next_highest_priority;
     779             :     int         this_priority;
     780             :     int         priority;
     781             :     int         i;
     782         228 :     bool        am_in_pending = false;
     783             :     volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     784             :                                  * rearrangement */
     785             : 
     786             :     Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
     787             : 
     788         228 :     lowest_priority = SyncRepConfig->nmembers;
     789         228 :     next_highest_priority = lowest_priority + 1;
     790             : 
     791             :     /*
     792             :      * Find the sync standbys which have the highest priority (i.e, 1). Also
     793             :      * store all the other potential sync standbys into the pending list, in
     794             :      * order to scan it later and find other sync standbys from it quickly.
                     :      *
                     :      * "Highest" priority means the numerically smallest value; the
                     :      * numerically largest one in use is lowest_priority (nmembers), and
                     :      * lowest_priority + 1 serves as a "none found yet" sentinel for
                     :      * next_highest_priority.
     795             :      */
     796         294 :     for (i = 0; i < max_wal_senders; i++)
     797             :     {
     798             :         XLogRecPtr  flush;
     799             :         WalSndState state;
     800             :         int         pid;
     801             : 
     802         282 :         walsnd = &WalSndCtl->walsnds[i];
     803             : 
     804         282 :         SpinLockAcquire(&walsnd->mutex);
     805         282 :         pid = walsnd->pid;
     806         282 :         flush = walsnd->flush;
     807         282 :         state = walsnd->state;
     808         282 :         SpinLockRelease(&walsnd->mutex);
     809             : 
     810             :         /* Must be active */
     811         282 :         if (pid == 0)
     812          22 :             continue;
     813             : 
     814             :         /* Must be streaming or stopping */
     815         260 :         if (state != WALSNDSTATE_STREAMING &&
     816             :             state != WALSNDSTATE_STOPPING)
     817           0 :             continue;
     818             : 
     819             :         /*
                     :          * Must be synchronous. sync_standby_priority is read after the
                     :          * spinlock has been released; presumably it is covered by
                     :          * SyncRepLock, which the caller must hold -- confirm.
                     :          */
     820         260 :         this_priority = walsnd->sync_standby_priority;
     821         260 :         if (this_priority == 0)
     822           6 :             continue;
     823             : 
     824             :         /* Must have a valid flush position */
     825         254 :         if (XLogRecPtrIsInvalid(flush))
     826           0 :             continue;
     827             : 
     828             :         /*
     829             :          * If the priority is equal to 1, consider this standby as sync and
     830             :          * append it to the result. Otherwise append this standby to the
     831             :          * pending list to check if it's actually sync or not later.
     832             :          */
     833         254 :         if (this_priority == 1)
     834             :         {
     835         230 :             result = lappend_int(result, i);
     836         230 :             if (am_sync != NULL && walsnd == MyWalSnd)
     837         202 :                 *am_sync = true;
     838         230 :             if (list_length(result) == SyncRepConfig->num_sync)
     839             :             {
     840         216 :                 list_free(pending);
     841         216 :                 return result;  /* Exit if got enough sync standbys */
     842             :             }
     843             :         }
     844             :         else
     845             :         {
     846          24 :             pending = lappend_int(pending, i);
     847          24 :             if (am_sync != NULL && walsnd == MyWalSnd)
     848           0 :                 am_in_pending = true;
     849             : 
     850             :             /*
     851             :              * Track the highest priority among the standbys in the pending
     852             :              * list, in order to use it as the starting priority for later
     853             :              * scan of the list. This is useful to find quickly the sync
     854             :              * standbys from the pending list later because we can skip
     855             :              * unnecessary scans for the unused priorities.
     856             :              */
     857          24 :             if (this_priority < next_highest_priority)
     858          14 :                 next_highest_priority = this_priority;
     859             :         }
     860             :     }
     861             : 
     862             :     /*
     863             :      * Consider all pending standbys as sync if the number of them plus
     864             :      * already-found sync ones is lower than or equal to what the
                     :      * configuration requests.
     865             :      */
     866          12 :     if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
     867             :     {
     868           6 :         bool        needfree = (result != NIL && pending != NIL);
     869             : 
     870             :         /*
     871             :          * Set *am_sync to true if this walsender is in the pending list
     872             :          * because all pending standbys are considered as sync.
     873             :          */
     874           6 :         if (am_sync != NULL && !(*am_sync))
     875           0 :             *am_sync = am_in_pending;
     876             : 
                     :         /*
                     :          * NOTE(review): needfree is computed before the concatenation and
                     :          * only pfree()s "pending" when both lists were non-empty;
                     :          * presumably list_concat() then links pending's cells onto result
                     :          * and leaves just pending's header to release -- confirm against
                     :          * the list implementation in use.
     877           6 :          */ result = list_concat(result, pending);
     878           6 :         if (needfree)
     879           4 :             pfree(pending);
     880           6 :         return result;
     881             :     }
     882             : 
     883             :     /*
     884             :      * Find the sync standbys from the pending list.
                     :      *
                     :      * Each pass of the outer loop handles one priority value. While
                     :      * scanning, next_highest_priority collects the smallest priority among
                     :      * the entries that did not match, which becomes the value handled by
                     :      * the next pass.
     885             :      */
     886           6 :     priority = next_highest_priority;
     887          12 :     while (priority <= lowest_priority)
     888             :     {
     889             :         ListCell   *cell;
     890           6 :         ListCell   *prev = NULL;
     891             :         ListCell   *next;
     892             : 
     893           6 :         next_highest_priority = lowest_priority + 1;
     894             : 
     895           6 :         for (cell = list_head(pending); cell != NULL; cell = next)
     896             :         {
     897           6 :             i = lfirst_int(cell);
     898           6 :             walsnd = &WalSndCtl->walsnds[i];
     899             : 
     900           6 :             next = lnext(cell);     /* fetched up front: cell may be deleted below */
     901             : 
     902           6 :             this_priority = walsnd->sync_standby_priority;
     903           6 :             if (this_priority == priority)
     904             :             {
     905           6 :                 result = lappend_int(result, i);
     906           6 :                 if (am_sync != NULL && walsnd == MyWalSnd)
     907           0 :                     *am_sync = true;
     908             : 
     909             :                 /*
     910             :                  * We should always exit here after the scan of pending list
     911             :                  * starts because we know that the list has enough elements to
     912             :                  * reach SyncRepConfig->num_sync.
     913             :                  */
     914           6 :                 if (list_length(result) == SyncRepConfig->num_sync)
     915             :                 {
     916           6 :                     list_free(pending);
     917           6 :                     return result;  /* Exit if got enough sync standbys */
     918             :                 }
     919             : 
     920             :                 /*
     921             :                  * Remove the entry for this sync standby from the list to
     922             :                  * prevent us from looking at the same entry again.
     923             :                  */
     924           0 :                 pending = list_delete_cell(pending, cell, prev);
     925             : 
     926           0 :                 continue;
     927             :             }
     928             : 
     929           0 :             if (this_priority < next_highest_priority)
     930           0 :                 next_highest_priority = this_priority;
     931             : 
     932           0 :             prev = cell;
     933             :         }
     934             : 
     935           0 :         priority = next_highest_priority;
     936             :     }
     937             : 
     938             :     /* never reached, but keep compiler quiet */
     939             :     Assert(false);
     940           0 :     return result;
     941             : }
     942             : 
     943             : /*
     944             :  * Check if we are in the list of sync standbys, and if so, determine
     945             :  * priority sequence. Return priority if set, or zero to indicate that
     946             :  * we are not a potential sync standby.
     947             :  *
     948             :  * Compare the parameter SyncRepStandbyNames against the application_name
     949             :  * for this WALSender, or allow any name if we find a wildcard "*".
                     :  *
                     :  * The names actually walked are those in SyncRepConfig->member_names,
                     :  * which is traversed as nmembers NUL-terminated strings stored back to
                     :  * back. Names are matched case-insensitively; "*" is matched exactly. The
                     :  * priority is the 1-based position of the first matching entry.
                     :  *
                     :  * Zero is also returned for a cascading walsender, when no sync standbys
                     :  * are defined, or when SyncRepConfig is NULL.
     950             :  */
     951             : static int
     952         250 : SyncRepGetStandbyPriority(void)
     953             : {
     954             :     const char *standby_name;
     955             :     int         priority;
     956         250 :     bool        found = false;
     957             : 
     958             :     /*
     959             :      * Since synchronous cascade replication is not allowed, we always set the
     960             :      * priority of cascading walsender to zero.
     961             :      */
     962         250 :     if (am_cascading_walsender)
     963           8 :         return 0;
     964             : 
     965         242 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     966         198 :         return 0;
     967             : 
     968          44 :     standby_name = SyncRepConfig->member_names;
     969          60 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     970             :     {
     971          94 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     972          36 :             strcmp(standby_name, "*") == 0)
     973             :         {
     974          42 :             found = true;
     975          42 :             break;
     976             :         }
     977          16 :         standby_name += strlen(standby_name) + 1;   /* step over the name and its NUL */
     978             :     }
     979             : 
     980          44 :     if (!found)
     981           2 :         return 0;
     982             : 
     983             :     /*
     984             :      * In quorum-based sync replication, all the standbys in the list have the
     985             :      * same priority, one.
     986             :      */
     987          42 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     988             : }
     989             : 
     990             : /*
     991             :  * Walk the specified queue from head.  Set the state of any backends that
     992             :  * need to be woken, remove them from the queue, and then wake them.
     993             :  * Pass all = true to wake whole queue; otherwise, just wake up to
     994             :  * the walsender's LSN.
                     :  *
                     :  * "mode" selects the queue (WalSndCtl->SyncRepQueue[mode]) and the LSN it
                     :  * is compared against (WalSndCtl->lsn[mode]); it must be in the range
                     :  * 0 .. NUM_SYNC_REP_WAIT_MODE - 1.
                     :  *
                     :  * Returns the number of procs that were removed from the queue and woken.
     995             :  *
     996             :  * Must hold SyncRepLock.
     997             :  */
     998             : static int
     999         232 : SyncRepWakeQueue(bool all, int mode)
    1000             : {
    1001         232 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
    1002         232 :     PGPROC     *proc = NULL;
    1003         232 :     PGPROC     *thisproc = NULL;
    1004         232 :     int         numprocs = 0;
    1005             : 
    1006             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    1007             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
    1008             : 
    1009         232 :     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1010         232 :                                    &(WalSndCtl->SyncRepQueue[mode]),
    1011             :                                    offsetof(PGPROC, syncRepLinks));
    1012             : 
    1013         504 :     while (proc)
    1014             :     {
    1015             :         /*
    1016             :          * Assume the queue is ordered by LSN, so the first proc waiting for
                     :          * an LSN beyond lsn[mode] ends the walk: nothing after it can be
                     :          * released either.
    1017             :          */
    1018          48 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
    1019           8 :             return numprocs;
    1020             : 
    1021             :         /*
    1022             :          * Move to next proc, so we can delete thisproc from the queue.
    1023             :          * thisproc is valid, proc may be NULL after this.
    1024             :          */
    1025          40 :         thisproc = proc;
    1026          40 :         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1027          40 :                                        &(proc->syncRepLinks),
    1028             :                                        offsetof(PGPROC, syncRepLinks));
    1029             : 
    1030             :         /*
    1031             :          * Remove thisproc from queue.
    1032             :          */
    1033          40 :         SHMQueueDelete(&(thisproc->syncRepLinks));
    1034             : 
    1035             :         /*
    1036             :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
    1037             :          * make sure that it sees the queue link being removed before the
    1038             :          * syncRepState change.
    1039             :          */
    1040          40 :         pg_write_barrier();
    1041             : 
    1042             :         /*
    1043             :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
    1044             :          * the various states.
    1045             :          */
    1046          40 :         thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
    1047             : 
    1048             :         /*
    1049             :          * Wake only when we have set state and removed from queue.
    1050             :          */
    1051          40 :         SetLatch(&(thisproc->procLatch));
    1052             : 
    1053          40 :         numprocs++;
    1054             :     }
    1055             : 
    1056         224 :     return numprocs;
    1057             : }
    1058             : 
    1059             : /*
    1060             :  * The checkpointer calls this as needed to update the shared
    1061             :  * sync_standbys_defined flag, so that backends don't remain permanently wedged
    1062             :  * if synchronous_standby_names is unset.  It's safe to check the current value
    1063             :  * without the lock, because it's only ever updated by one process.  But we
    1064             :  * must take the lock to change it.
                     :  *
                     :  * Nothing is done, and SyncRepLock is not taken, when the local
                     :  * SyncStandbysDefined() result already matches the shared flag.
    1065             :  */
    1066             : void
    1067         390 : SyncRepUpdateSyncStandbysDefined(void)
    1068             : {
    1069         390 :     bool        sync_standbys_defined = SyncStandbysDefined();
    1070             : 
    1071         390 :     if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
    1072             :     {
    1073          16 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    1074             : 
    1075             :         /*
    1076             :          * If synchronous_standby_names has been reset to empty, it's futile
    1077             :          * for backends to continue waiting.  Since the user no longer
    1078             :          * wants synchronous replication, we'd better wake them up.  Every
                     :          * wait-mode queue is emptied in full (all = true).
    1079             :          */
    1080          16 :         if (!sync_standbys_defined)
    1081             :         {
    1082             :             int         i;
    1083             : 
    1084           0 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
    1085           0 :                 SyncRepWakeQueue(true, i);
    1086             :         }
    1087             : 
    1088             :         /*
    1089             :          * Only allow people to join the queue when there are synchronous
    1090             :          * standbys defined.  Without this interlock, there's a race
    1091             :          * condition: we might wake up all the current waiters; then, some
    1092             :          * backend that hasn't yet reloaded its config might go to sleep on
    1093             :          * the queue (and never wake up).  This prevents that.
    1094             :          */
    1095          16 :         WalSndCtl->sync_standbys_defined = sync_standbys_defined;
    1096             : 
    1097          16 :         LWLockRelease(SyncRepLock);
    1098             :     }
    1099         390 : }
    1100             : 
    1101             : #ifdef USE_ASSERT_CHECKING
    1102             : static bool                     /* assertion-only check, used by SyncRepWakeQueue() */
    1103             : SyncRepQueueIsOrderedByLSN(int mode)
    1104             : {
    1105             :     PGPROC     *proc = NULL;
    1106             :     XLogRecPtr  lastLSN;
    1107             : 
    1108             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    1109             : 
    1110             :     lastLSN = 0;                /* start value for the strict-increase check below */
    1111             : 
    1112             :     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1113             :                                    &(WalSndCtl->SyncRepQueue[mode]),
    1114             :                                    offsetof(PGPROC, syncRepLinks));
    1115             : 
    1116             :     while (proc)
    1117             :     {
    1118             :         /*
    1119             :          * Check the queue is ordered by LSN and that multiple procs don't
    1120             :          * have matching LSNs: walking SyncRepQueue[mode] from its head, each
                     :          * proc's waitLSN must be strictly greater than the previous one.
                     :          * Return false at the first entry that violates this.
    1121             :          */
    1122             :         if (proc->waitLSN <= lastLSN)
    1123             :             return false;
    1124             : 
    1125             :         lastLSN = proc->waitLSN;
    1126             : 
    1127             :         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1128             :                                        &(proc->syncRepLinks),
    1129             :                                        offsetof(PGPROC, syncRepLinks));
    1130             :     }
    1131             : 
    1132             :     return true;                /* no violation found while walking the queue */
    1133             : }
    1134             : #endif
    1135             : 
    1136             : /*
    1137             :  * ===========================================================
    1138             :  * Synchronous Replication functions executed by any process
    1139             :  * ===========================================================
    1140             :  */
    1141             : 
                       /*
                        * check_synchronous_standby_names
                        *
                        * Validates a proposed synchronous_standby_names string and, on success,
                        * hands back its parsed form through *extra.
                        *
                        * newval: address of the proposed string; *newval may be NULL or empty.
                        * extra:  output; set to a malloc'd copy of the parsed SyncRepConfigData,
                        *         or to NULL when *newval is NULL or empty.
                        * source: not used here.
                        *
                        * Returns true if the value is acceptable.  Returns false if parsing
                        * fails (a syntax-error code and a detail message are reported through
                        * GUC_check_*), if the parsed num_sync is not positive, or if malloc
                        * fails.  *extra is left untouched on every false return.
                        *
                        * NOTE(review): presumably installed as the GUC check hook for
                        * synchronous_standby_names -- confirm against the GUC table.
                        */
    1142             : bool
    1143        2002 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
    1144             : {
    1145        2002 :     if (*newval != NULL && (*newval)[0] != '\0')
    1146         148 :     {
    1147             :         int         parse_rc;
    1148             :         SyncRepConfigData *pconf;
    1149             : 
    1150             :         /* Reset communication variables to ensure a fresh start */
    1151         148 :         syncrep_parse_result = NULL;
    1152         148 :         syncrep_parse_error_msg = NULL;
    1153             : 
    1154             :         /* Parse the synchronous_standby_names string */
    1155         148 :         syncrep_scanner_init(*newval);
    1156         148 :         parse_rc = syncrep_yyparse();
    1157         148 :         syncrep_scanner_finish();
    1158             : 
                               /* A NULL result counts as failure even if the parser returned 0. */
    1159         148 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1160             :         {
    1161           0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1162           0 :             if (syncrep_parse_error_msg)
    1163           0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1164             :             else
    1165           0 :                 GUC_check_errdetail("synchronous_standby_names parser failed");
    1166           0 :             return false;
    1167             :         }
    1168             : 
                               /* Reject a syntactically valid value whose count is zero or negative. */
    1169         148 :         if (syncrep_parse_result->num_sync <= 0)
    1170             :         {
    1171           0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1172           0 :                              syncrep_parse_result->num_sync);
    1173           0 :             return false;
    1174             :         }
    1175             : 
    1176             :         /* GUC extra value must be malloc'd, not palloc'd */
    1177         148 :         pconf = (SyncRepConfigData *)
    1178         148 :             malloc(syncrep_parse_result->config_size);
                               /* malloc failed: reject the value; no GUC_check_* message is set here. */
    1179         148 :         if (pconf == NULL)
    1180           0 :             return false;
                               /* The copy length is the config_size recorded in the parse result. */
    1181         148 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1182             : 
    1183         148 :         *extra = (void *) pconf;
    1184             : 
    1185             :         /*
    1186             :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1187             :          * other cruft generated during parsing, will be freed when the
    1188             :          * current memory context is deleted.  (This code is generally run in
    1189             :          * a short-lived context used for config file processing, so that will
    1190             :          * not be very long.)
    1191             :          */
    1192             :     }
    1193             :     else
    1194        1854 :         *extra = NULL;
    1195             : 
    1196        2002 :     return true;
    1197             : }
    1198             : 
                       /*
                        * assign_synchronous_standby_names
                        *
                        * Stores extra, cast to SyncRepConfigData *, in SyncRepConfig.
                        *
                        * newval: not used here.
                        * extra:  pointer stored as-is; no copy is made and a NULL pointer is
                        *         stored unchanged.
                        *
                        * NOTE(review): presumably extra is the value produced by
                        * check_synchronous_standby_names (NULL for an empty setting) -- confirm.
                        */
    1199             : void
    1200        1984 : assign_synchronous_standby_names(const char *newval, void *extra)
    1201             : {
    1202        1984 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1203        1984 : }
    1204             : 
                       /*
                        * assign_synchronous_commit
                        *
                        * Sets SyncRepWaitMode according to newval:
                        *
                        *   SYNCHRONOUS_COMMIT_REMOTE_WRITE  -> SYNC_REP_WAIT_WRITE
                        *   SYNCHRONOUS_COMMIT_REMOTE_FLUSH  -> SYNC_REP_WAIT_FLUSH
                        *   SYNCHRONOUS_COMMIT_REMOTE_APPLY  -> SYNC_REP_WAIT_APPLY
                        *   any other value                  -> SYNC_REP_NO_WAIT
                        *
                        * newval: the value switched on.
                        * extra:  not used here.
                        */
    1205             : void
    1206        2232 : assign_synchronous_commit(int newval, void *extra)
    1207             : {
    1208        2232 :     switch (newval)
    1209             :     {
    1210             :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1211           0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1212           0 :             break;
    1213             :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1214        2024 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1215        2024 :             break;
    1216             :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1217           0 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1218           0 :             break;
                               /* Every value not listed above selects no waiting. */
    1219             :         default:
    1220         208 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1221         208 :             break;
    1222             :     }
    1223        2232 : }

Generated by: LCOV version 1.13