LCOV - code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 264 353 74.8 %
Date: 2019-09-19 02:07:14 Functions: 16 19 84.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * syncrep.c
       4             :  *
       5             :  * Synchronous replication is new as of PostgreSQL 9.1.
       6             :  *
       7             :  * If requested, transaction commits wait until their commit LSN are
       8             :  * acknowledged by the synchronous standbys.
       9             :  *
      10             :  * This module contains the code for waiting and release of backends.
      11             :  * All code in this module executes on the primary. The core streaming
      12             :  * replication transport remains within WALreceiver/WALsender modules.
      13             :  *
      14             :  * The essence of this design is that it isolates all logic about
      15             :  * waiting/releasing onto the primary. The primary defines which standbys
      16             :  * it wishes to wait for. The standbys are completely unaware of the
      17             :  * durability requirements of transactions on the primary, reducing the
      18             :  * complexity of the code and streamlining both standby operations and
      19             :  * network bandwidth because there is no requirement to ship
      20             :  * per-transaction state information.
      21             :  *
      22             :  * Replication is either synchronous or not synchronous (async). If it is
      23             :  * async, we just fastpath out of here. If it is sync, then we wait for
      24             :  * the write, flush or apply location on the standby before releasing
      25             :  * the waiting backend. Further complexity in that interaction is
      26             :  * expected in later releases.
      27             :  *
      28             :  * The best performing way to manage the waiting backends is to have a
      29             :  * single ordered queue of waiting backends, so that we can avoid
      30             :  * searching the through all waiters each time we receive a reply.
      31             :  *
      32             :  * In 9.5 or before only a single standby could be considered as
      33             :  * synchronous. In 9.6 we support a priority-based multiple synchronous
      34             :  * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
      35             :  * supported. The number of synchronous standbys that transactions
      36             :  * must wait for replies from is specified in synchronous_standby_names.
      37             :  * This parameter also specifies a list of standby names and the method
      38             :  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
      39             :  *
      40             :  * The method FIRST specifies a priority-based synchronous replication
      41             :  * and makes transaction commits wait until their WAL records are
      42             :  * replicated to the requested number of synchronous standbys chosen based
      43             :  * on their priorities. The standbys whose names appear earlier in the list
      44             :  * are given higher priority and will be considered as synchronous.
      45             :  * Other standby servers appearing later in this list represent potential
      46             :  * synchronous standbys. If any of the current synchronous standbys
      47             :  * disconnects for whatever reason, it will be replaced immediately with
      48             :  * the next-highest-priority standby.
      49             :  *
      50             :  * The method ANY specifies a quorum-based synchronous replication
      51             :  * and makes transaction commits wait until their WAL records are
      52             :  * replicated to at least the requested number of synchronous standbys
      53             :  * in the list. All the standbys appearing in the list are considered as
      54             :  * candidates for quorum synchronous standbys.
      55             :  *
      56             :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57             :  * This is for backward compatibility with 9.6 or before where only a
      58             :  * priority-based sync replication was supported.
      59             :  *
      60             :  * Before the standbys chosen from synchronous_standby_names can
      61             :  * become the synchronous standbys they must have caught up with
      62             :  * the primary; that may take some time. Once caught up,
      63             :  * the standbys which are considered as synchronous at that moment
      64             :  * will release waiters from the queue.
      65             :  *
      66             :  * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
      67             :  *
      68             :  * IDENTIFICATION
      69             :  *    src/backend/replication/syncrep.c
      70             :  *
      71             :  *-------------------------------------------------------------------------
      72             :  */
      73             : #include "postgres.h"
      74             : 
      75             : #include <unistd.h>
      76             : 
      77             : #include "access/xact.h"
      78             : #include "miscadmin.h"
      79             : #include "pgstat.h"
      80             : #include "replication/syncrep.h"
      81             : #include "replication/walsender.h"
      82             : #include "replication/walsender_private.h"
      83             : #include "storage/pmsignal.h"
      84             : #include "storage/proc.h"
      85             : #include "tcop/tcopprot.h"
      86             : #include "utils/builtins.h"
      87             : #include "utils/ps_status.h"
      88             : 
      89             : /* User-settable parameters for sync rep */
      90             : char       *SyncRepStandbyNames;
      91             : 
      92             : #define SyncStandbysDefined() \
      93             :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      94             : 
      95             : static bool announce_next_takeover = true;
      96             : 
      97             : SyncRepConfigData *SyncRepConfig = NULL;
      98             : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
      99             : 
     100             : static void SyncRepQueueInsert(int mode);
     101             : static void SyncRepCancelWait(void);
     102             : static int  SyncRepWakeQueue(bool all, int mode);
     103             : 
     104             : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     105             :                                  XLogRecPtr *flushPtr,
     106             :                                  XLogRecPtr *applyPtr,
     107             :                                  bool *am_sync);
     108             : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     109             :                                        XLogRecPtr *flushPtr,
     110             :                                        XLogRecPtr *applyPtr,
     111             :                                        List *sync_standbys);
     112             : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     113             :                                           XLogRecPtr *flushPtr,
     114             :                                           XLogRecPtr *applyPtr,
     115             :                                           List *sync_standbys, uint8 nth);
     116             : static int  SyncRepGetStandbyPriority(void);
     117             : static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
     118             : static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
     119             : static int  cmp_lsn(const void *a, const void *b);
     120             : 
     121             : #ifdef USE_ASSERT_CHECKING
     122             : static bool SyncRepQueueIsOrderedByLSN(int mode);
     123             : #endif
     124             : 
     125             : /*
     126             :  * ===========================================================
     127             :  * Synchronous Replication functions for normal user backends
     128             :  * ===========================================================
     129             :  */
     130             : 
     131             : /*
     132             :  * Wait for synchronous replication, if requested by user.
     133             :  *
     134             :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     135             :  * change that state to SYNC_REP_WAITING before adding ourselves
     136             :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     137             :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     138             :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     139             :  *
     140             :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     141             :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     142             :  * to be flushed if synchronous_commit is set to the higher level of
     143             :  * remote_apply, because only commit records provide apply feedback.
     144             :  */
     145             : void
     146      239704 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     147             : {
     148      239704 :     char       *new_status = NULL;
     149             :     const char *old_status;
     150             :     int         mode;
     151             : 
     152             :     /* Cap the level for anything other than commit to remote flush only. */
     153      239704 :     if (commit)
     154      239618 :         mode = SyncRepWaitMode;
     155             :     else
     156          86 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     157             : 
     158             :     /*
     159             :      * Fast exit if user has not requested sync replication.
     160             :      */
     161      239704 :     if (!SyncRepRequested())
     162       10414 :         return;
     163             : 
     164             :     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
     165             :     Assert(WalSndCtl != NULL);
     166             : 
     167      229290 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     168             :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     169             : 
     170             :     /*
     171             :      * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     172             :      * set.  See SyncRepUpdateSyncStandbysDefined.
     173             :      *
     174             :      * Also check that the standby hasn't already replied. Unlikely race
     175             :      * condition but we'll be fetching that cache line anyway so it's likely
     176             :      * to be a low cost check.
     177             :      */
     178      229378 :     if (!WalSndCtl->sync_standbys_defined ||
     179          88 :         lsn <= WalSndCtl->lsn[mode])
     180             :     {
     181      229206 :         LWLockRelease(SyncRepLock);
     182      229206 :         return;
     183             :     }
     184             : 
     185             :     /*
     186             :      * Set our waitLSN so WALSender will know when to wake us, and add
     187             :      * ourselves to the queue.
     188             :      */
     189          84 :     MyProc->waitLSN = lsn;
     190          84 :     MyProc->syncRepState = SYNC_REP_WAITING;
     191          84 :     SyncRepQueueInsert(mode);
     192             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     193          84 :     LWLockRelease(SyncRepLock);
     194             : 
     195             :     /* Alter ps display to show waiting for sync rep. */
     196          84 :     if (update_process_title)
     197             :     {
     198             :         int         len;
     199             : 
     200          84 :         old_status = get_ps_display(&len);
     201          84 :         new_status = (char *) palloc(len + 32 + 1);
     202          84 :         memcpy(new_status, old_status, len);
     203         168 :         sprintf(new_status + len, " waiting for %X/%X",
     204          84 :                 (uint32) (lsn >> 32), (uint32) lsn);
     205          84 :         set_ps_display(new_status, false);
     206          84 :         new_status[len] = '\0'; /* truncate off " waiting ..." */
     207             :     }
     208             : 
     209             :     /*
     210             :      * Wait for specified LSN to be confirmed.
     211             :      *
     212             :      * Each proc has its own wait latch, so we perform a normal latch
     213             :      * check/wait loop here.
     214             :      */
     215             :     for (;;)
     216          84 :     {
     217             :         int         rc;
     218             : 
     219             :         /* Must reset the latch before testing state. */
     220         168 :         ResetLatch(MyLatch);
     221             : 
     222             :         /*
     223             :          * Acquiring the lock is not needed, the latch ensures proper
     224             :          * barriers. If it looks like we're done, we must really be done,
     225             :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     226             :          * it will never update it again, so we can't be seeing a stale value
     227             :          * in that case.
     228             :          */
     229         168 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     230          84 :             break;
     231             : 
     232             :         /*
     233             :          * If a wait for synchronous replication is pending, we can neither
     234             :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     235             :          * lead the client to believe that the transaction aborted, which is
     236             :          * not true: it's already committed locally. The former is no good
     237             :          * either: the client has requested synchronous replication, and is
     238             :          * entitled to assume that an acknowledged commit is also replicated,
     239             :          * which might not be true. So in this case we issue a WARNING (which
     240             :          * some clients may be able to interpret) and shut off further output.
     241             :          * We do NOT reset ProcDiePending, so that the process will die after
     242             :          * the commit is cleaned up.
     243             :          */
     244          84 :         if (ProcDiePending)
     245             :         {
     246           0 :             ereport(WARNING,
     247             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     248             :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     249             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     250           0 :             whereToSendOutput = DestNone;
     251           0 :             SyncRepCancelWait();
     252           0 :             break;
     253             :         }
     254             : 
     255             :         /*
     256             :          * It's unclear what to do if a query cancel interrupt arrives.  We
     257             :          * can't actually abort at this point, but ignoring the interrupt
     258             :          * altogether is not helpful, so we just terminate the wait with a
     259             :          * suitable warning.
     260             :          */
     261          84 :         if (QueryCancelPending)
     262             :         {
     263           0 :             QueryCancelPending = false;
     264           0 :             ereport(WARNING,
     265             :                     (errmsg("canceling wait for synchronous replication due to user request"),
     266             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     267           0 :             SyncRepCancelWait();
     268           0 :             break;
     269             :         }
     270             : 
     271             :         /*
     272             :          * Wait on latch.  Any condition that should wake us up will set the
     273             :          * latch, so no need for timeout.
     274             :          */
     275          84 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     276             :                        WAIT_EVENT_SYNC_REP);
     277             : 
     278             :         /*
     279             :          * If the postmaster dies, we'll probably never get an acknowledgment,
     280             :          * because all the wal sender processes will exit. So just bail out.
     281             :          */
     282          84 :         if (rc & WL_POSTMASTER_DEATH)
     283             :         {
     284           0 :             ProcDiePending = true;
     285           0 :             whereToSendOutput = DestNone;
     286           0 :             SyncRepCancelWait();
     287           0 :             break;
     288             :         }
     289             :     }
     290             : 
     291             :     /*
     292             :      * WalSender has checked our LSN and has removed us from queue. Clean up
     293             :      * state and leave.  It's OK to reset these shared memory fields without
     294             :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     295             :      * we're not on the queue.  We need a read barrier to make sure we see the
     296             :      * changes to the queue link (this might be unnecessary without
     297             :      * assertions, but better safe than sorry).
     298             :      */
     299          84 :     pg_read_barrier();
     300             :     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
     301          84 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     302          84 :     MyProc->waitLSN = 0;
     303             : 
     304          84 :     if (new_status)
     305             :     {
     306             :         /* Reset ps display */
     307          84 :         set_ps_display(new_status, false);
     308          84 :         pfree(new_status);
     309             :     }
     310             : }
     311             : 
     312             : /*
     313             :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     314             :  *
     315             :  * Usually we will go at tail of queue, though it's possible that we arrive
     316             :  * here out of order, so start at tail and work back to insertion point.
     317             :  */
     318             : static void
     319          84 : SyncRepQueueInsert(int mode)
     320             : {
     321             :     PGPROC     *proc;
     322             : 
     323             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     324          84 :     proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
     325          84 :                                    &(WalSndCtl->SyncRepQueue[mode]),
     326             :                                    offsetof(PGPROC, syncRepLinks));
     327             : 
     328         168 :     while (proc)
     329             :     {
     330             :         /*
     331             :          * Stop at the queue element that we should after to ensure the queue
     332             :          * is ordered by LSN.
     333             :          */
     334           0 :         if (proc->waitLSN < MyProc->waitLSN)
     335           0 :             break;
     336             : 
     337           0 :         proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
     338           0 :                                        &(proc->syncRepLinks),
     339             :                                        offsetof(PGPROC, syncRepLinks));
     340             :     }
     341             : 
     342          84 :     if (proc)
     343           0 :         SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
     344             :     else
     345          84 :         SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
     346          84 : }
     347             : 
     348             : /*
     349             :  * Acquire SyncRepLock and cancel any wait currently in progress.
     350             :  */
     351             : static void
     352           0 : SyncRepCancelWait(void)
     353             : {
     354           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     355           0 :     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     356           0 :         SHMQueueDelete(&(MyProc->syncRepLinks));
     357           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     358           0 :     LWLockRelease(SyncRepLock);
     359           0 : }
     360             : 
     361             : void
     362        9880 : SyncRepCleanupAtProcExit(void)
     363             : {
     364        9880 :     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     365             :     {
     366           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     367           0 :         SHMQueueDelete(&(MyProc->syncRepLinks));
     368           0 :         LWLockRelease(SyncRepLock);
     369             :     }
     370        9880 : }
     371             : 
     372             : /*
     373             :  * ===========================================================
     374             :  * Synchronous Replication functions for wal sender processes
     375             :  * ===========================================================
     376             :  */
     377             : 
     378             : /*
     379             :  * Take any action required to initialise sync rep state from config
     380             :  * data. Called at WALSender startup and after each SIGHUP.
     381             :  */
     382             : void
     383         244 : SyncRepInitConfig(void)
     384             : {
     385             :     int         priority;
     386             : 
     387             :     /*
     388             :      * Determine if we are a potential sync standby and remember the result
     389             :      * for handling replies from standby.
     390             :      */
     391         244 :     priority = SyncRepGetStandbyPriority();
     392         244 :     if (MyWalSnd->sync_standby_priority != priority)
     393             :     {
     394          30 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     395          30 :         MyWalSnd->sync_standby_priority = priority;
     396          30 :         LWLockRelease(SyncRepLock);
     397          30 :         ereport(DEBUG1,
     398             :                 (errmsg("standby \"%s\" now has synchronous standby priority %u",
     399             :                         application_name, priority)));
     400             :     }
     401         244 : }
     402             : 
     403             : /*
     404             :  * Update the LSNs on each queue based upon our latest state. This
     405             :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     406             :  *
     407             :  * Other policies are possible, which would change what we do here and
     408             :  * perhaps also which information we store as well.
     409             :  */
     410             : void
     411        1256 : SyncRepReleaseWaiters(void)
     412             : {
     413        1256 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     414             :     XLogRecPtr  writePtr;
     415             :     XLogRecPtr  flushPtr;
     416             :     XLogRecPtr  applyPtr;
     417             :     bool        got_recptr;
     418             :     bool        am_sync;
     419        1256 :     int         numwrite = 0;
     420        1256 :     int         numflush = 0;
     421        1256 :     int         numapply = 0;
     422             : 
     423             :     /*
     424             :      * If this WALSender is serving a standby that is not on the list of
     425             :      * potential sync standbys then we have nothing to do. If we are still
     426             :      * starting up, still running base backup or the current flush position is
     427             :      * still invalid, then leave quickly also.  Streaming or stopping WAL
     428             :      * senders are allowed to release waiters.
     429             :      */
     430        1454 :     if (MyWalSnd->sync_standby_priority == 0 ||
     431         242 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
     432         242 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
     433         198 :         XLogRecPtrIsInvalid(MyWalSnd->flush))
     434             :     {
     435        1058 :         announce_next_takeover = true;
     436        2118 :         return;
     437             :     }
     438             : 
     439             :     /*
     440             :      * We're a potential sync standby. Release waiters if there are enough
     441             :      * sync standbys and we are considered as sync.
     442             :      */
     443         198 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     444             : 
     445             :     /*
     446             :      * Check whether we are a sync standby or not, and calculate the synced
     447             :      * positions among all sync standbys.
     448             :      */
     449         198 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     450             : 
     451             :     /*
     452             :      * If we are managing a sync standby, though we weren't prior to this,
     453             :      * then announce we are now a sync standby.
     454             :      */
     455         198 :     if (announce_next_takeover && am_sync)
     456             :     {
     457          16 :         announce_next_takeover = false;
     458             : 
     459          16 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     460          16 :             ereport(LOG,
     461             :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
     462             :                             application_name, MyWalSnd->sync_standby_priority)));
     463             :         else
     464           0 :             ereport(LOG,
     465             :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     466             :                             application_name)));
     467             :     }
     468             : 
     469             :     /*
     470             :      * If the number of sync standbys is less than requested or we aren't
     471             :      * managing a sync standby then just leave.
     472             :      */
     473         198 :     if (!got_recptr || !am_sync)
     474             :     {
     475           2 :         LWLockRelease(SyncRepLock);
     476           2 :         announce_next_takeover = !am_sync;
     477           2 :         return;
     478             :     }
     479             : 
     480             :     /*
     481             :      * Set the lsn first so that when we wake backends they will release up to
     482             :      * this location.
     483             :      */
     484         196 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     485             :     {
     486          72 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     487          72 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     488             :     }
     489         196 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     490             :     {
     491          78 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     492          78 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     493             :     }
     494         196 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     495             :     {
     496          78 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     497          78 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     498             :     }
     499             : 
     500         196 :     LWLockRelease(SyncRepLock);
     501             : 
     502         196 :     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
     503             :          numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
     504             :          numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
     505             :          numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
     506             : }
     507             : 
     508             : /*
     509             :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     510             :  *
     511             :  * Return false if the number of sync standbys is less than
     512             :  * synchronous_standby_names specifies. Otherwise return true and
     513             :  * store the positions into *writePtr, *flushPtr and *applyPtr.
     514             :  *
     515             :  * On return, *am_sync is set to true if this walsender is connecting to
     516             :  * sync standby. Otherwise it's set to false.
     517             :  */
     518             : static bool
     519         198 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     520             :                      XLogRecPtr *applyPtr, bool *am_sync)
     521             : {
     522             :     List       *sync_standbys;
     523             : 
     524         198 :     *writePtr = InvalidXLogRecPtr;
     525         198 :     *flushPtr = InvalidXLogRecPtr;
     526         198 :     *applyPtr = InvalidXLogRecPtr;
     527         198 :     *am_sync = false;
     528             : 
     529             :     /* Get standbys that are considered as synchronous at this moment */
     530         198 :     sync_standbys = SyncRepGetSyncStandbys(am_sync);
     531             : 
     532             :     /*
     533             :      * Quick exit if we are not managing a sync standby or there are not
     534             :      * enough synchronous standbys.
     535             :      */
     536         394 :     if (!(*am_sync) ||
     537         392 :         SyncRepConfig == NULL ||
     538         196 :         list_length(sync_standbys) < SyncRepConfig->num_sync)
     539             :     {
     540           2 :         list_free(sync_standbys);
     541           2 :         return false;
     542             :     }
     543             : 
     544             :     /*
     545             :      * In a priority-based sync replication, the synced positions are the
     546             :      * oldest ones among sync standbys. In a quorum-based, they are the Nth
     547             :      * latest ones.
     548             :      *
     549             :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     550             :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     551             :      * because it's a bit more efficient.
     552             :      *
     553             :      * XXX If the numbers of current and requested sync standbys are the same,
     554             :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     555             :      * positions even in a quorum-based sync replication.
     556             :      */
     557         196 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     558             :     {
     559         196 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     560             :                                    sync_standbys);
     561             :     }
     562             :     else
     563             :     {
     564           0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     565           0 :                                       sync_standbys, SyncRepConfig->num_sync);
     566             :     }
     567             : 
     568         196 :     list_free(sync_standbys);
     569         196 :     return true;
     570             : }
     571             : 
     572             : /*
     573             :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
     574             :  */
     575             : static void
     576         196 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     577             :                            XLogRecPtr *applyPtr, List *sync_standbys)
     578             : {
     579             :     ListCell   *cell;
     580             : 
     581             :     /*
     582             :      * Scan through all sync standbys and calculate the oldest Write, Flush
     583             :      * and Apply positions.
     584             :      */
     585         392 :     foreach(cell, sync_standbys)
     586             :     {
     587         196 :         WalSnd     *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
     588             :         XLogRecPtr  write;
     589             :         XLogRecPtr  flush;
     590             :         XLogRecPtr  apply;
     591             : 
     592         196 :         SpinLockAcquire(&walsnd->mutex);
     593         196 :         write = walsnd->write;
     594         196 :         flush = walsnd->flush;
     595         196 :         apply = walsnd->apply;
     596         196 :         SpinLockRelease(&walsnd->mutex);
     597             : 
     598         196 :         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
     599         196 :             *writePtr = write;
     600         196 :         if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
     601         196 :             *flushPtr = flush;
     602         196 :         if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
     603         196 :             *applyPtr = apply;
     604             :     }
     605         196 : }
     606             : 
     607             : /*
     608             :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     609             :  * standbys.
     610             :  */
     611             : static void
     612           0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     613             :                               XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
     614             : {
     615             :     ListCell   *cell;
     616             :     XLogRecPtr *write_array;
     617             :     XLogRecPtr *flush_array;
     618             :     XLogRecPtr *apply_array;
     619             :     int         len;
     620           0 :     int         i = 0;
     621             : 
     622           0 :     len = list_length(sync_standbys);
     623           0 :     write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
     624           0 :     flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
     625           0 :     apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
     626             : 
     627           0 :     foreach(cell, sync_standbys)
     628             :     {
     629           0 :         WalSnd     *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
     630             : 
     631           0 :         SpinLockAcquire(&walsnd->mutex);
     632           0 :         write_array[i] = walsnd->write;
     633           0 :         flush_array[i] = walsnd->flush;
     634           0 :         apply_array[i] = walsnd->apply;
     635           0 :         SpinLockRelease(&walsnd->mutex);
     636             : 
     637           0 :         i++;
     638             :     }
     639             : 
     640             :     /* Sort each array in descending order */
     641           0 :     qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
     642           0 :     qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
     643           0 :     qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
     644             : 
     645             :     /* Get Nth latest Write, Flush, Apply positions */
     646           0 :     *writePtr = write_array[nth - 1];
     647           0 :     *flushPtr = flush_array[nth - 1];
     648           0 :     *applyPtr = apply_array[nth - 1];
     649             : 
     650           0 :     pfree(write_array);
     651           0 :     pfree(flush_array);
     652           0 :     pfree(apply_array);
     653           0 : }
     654             : 
     655             : /*
     656             :  * Compare lsn in order to sort array in descending order.
     657             :  */
     658             : static int
     659           0 : cmp_lsn(const void *a, const void *b)
     660             : {
     661           0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     662           0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     663             : 
     664           0 :     if (lsn1 > lsn2)
     665           0 :         return -1;
     666           0 :     else if (lsn1 == lsn2)
     667           0 :         return 0;
     668             :     else
     669           0 :         return 1;
     670             : }
     671             : 
     672             : /*
     673             :  * Return the list of sync standbys, or NIL if no sync standby is connected.
     674             :  *
     675             :  * The caller must hold SyncRepLock.
     676             :  *
     677             :  * On return, *am_sync is set to true if this walsender is connecting to
     678             :  * sync standby. Otherwise it's set to false.
     679             :  */
     680             : List *
     681         430 : SyncRepGetSyncStandbys(bool *am_sync)
     682             : {
     683             :     /* Set default result */
     684         430 :     if (am_sync != NULL)
     685         198 :         *am_sync = false;
     686             : 
     687             :     /* Quick exit if sync replication is not requested */
     688         430 :     if (SyncRepConfig == NULL)
     689         198 :         return NIL;
     690             : 
     691         232 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
     692         232 :         SyncRepGetSyncStandbysPriority(am_sync) :
     693             :         SyncRepGetSyncStandbysQuorum(am_sync);
     694             : }
     695             : 
     696             : /*
     697             :  * Return the list of all the candidates for quorum sync standbys,
     698             :  * or NIL if no such standby is connected.
     699             :  *
     700             :  * The caller must hold SyncRepLock. This function must be called only in
     701             :  * a quorum-based sync replication.
     702             :  *
     703             :  * On return, *am_sync is set to true if this walsender is connecting to
     704             :  * sync standby. Otherwise it's set to false.
     705             :  */
     706             : static List *
     707           4 : SyncRepGetSyncStandbysQuorum(bool *am_sync)
     708             : {
     709           4 :     List       *result = NIL;
     710             :     int         i;
     711             :     volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     712             :                                  * rearrangement */
     713             : 
     714             :     Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
     715             : 
     716          24 :     for (i = 0; i < max_wal_senders; i++)
     717             :     {
     718             :         XLogRecPtr  flush;
     719             :         WalSndState state;
     720             :         int         pid;
     721             : 
     722          20 :         walsnd = &WalSndCtl->walsnds[i];
     723             : 
     724          20 :         SpinLockAcquire(&walsnd->mutex);
     725          20 :         pid = walsnd->pid;
     726          20 :         flush = walsnd->flush;
     727          20 :         state = walsnd->state;
     728          20 :         SpinLockRelease(&walsnd->mutex);
     729             : 
     730             :         /* Must be active */
     731          20 :         if (pid == 0)
     732           6 :             continue;
     733             : 
     734             :         /* Must be streaming or stopping */
     735          14 :         if (state != WALSNDSTATE_STREAMING &&
     736             :             state != WALSNDSTATE_STOPPING)
     737           0 :             continue;
     738             : 
     739             :         /* Must be synchronous */
     740          14 :         if (walsnd->sync_standby_priority == 0)
     741           2 :             continue;
     742             : 
     743             :         /* Must have a valid flush position */
     744          12 :         if (XLogRecPtrIsInvalid(flush))
     745           2 :             continue;
     746             : 
     747             :         /*
     748             :          * Consider this standby as a candidate for quorum sync standbys and
     749             :          * append it to the result.
     750             :          */
     751          10 :         result = lappend_int(result, i);
     752          10 :         if (am_sync != NULL && walsnd == MyWalSnd)
     753           0 :             *am_sync = true;
     754             :     }
     755             : 
     756           4 :     return result;
     757             : }
     758             : 
     759             : /*
     760             :  * Return the list of sync standbys chosen based on their priorities,
     761             :  * or NIL if no sync standby is connected.
     762             :  *
     763             :  * If there are multiple standbys with the same priority,
     764             :  * the first one found is selected preferentially.
     765             :  *
     766             :  * The caller must hold SyncRepLock. This function must be called only in
     767             :  * a priority-based sync replication.
     768             :  *
     769             :  * On return, *am_sync is set to true if this walsender is connecting to
     770             :  * sync standby. Otherwise it's set to false.
     771             :  */
     772             : static List *
     773         228 : SyncRepGetSyncStandbysPriority(bool *am_sync)
     774             : {
     775         228 :     List       *result = NIL;
     776         228 :     List       *pending = NIL;
     777             :     int         lowest_priority;
     778             :     int         next_highest_priority;
     779             :     int         this_priority;
     780             :     int         priority;
     781             :     int         i;
     782         228 :     bool        am_in_pending = false;
     783             :     volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     784             :                                  * rearrangement */
     785             : 
     786             :     Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
     787             : 
     788         228 :     lowest_priority = SyncRepConfig->nmembers;
     789         228 :     next_highest_priority = lowest_priority + 1;
     790             : 
     791             :     /*
     792             :      * Find the sync standbys which have the highest priority (i.e, 1). Also
     793             :      * store all the other potential sync standbys into the pending list, in
     794             :      * order to scan it later and find other sync standbys from it quickly.
     795             :      */
     796         314 :     for (i = 0; i < max_wal_senders; i++)
     797             :     {
     798             :         XLogRecPtr  flush;
     799             :         WalSndState state;
     800             :         int         pid;
     801             : 
     802         298 :         walsnd = &WalSndCtl->walsnds[i];
     803             : 
     804         298 :         SpinLockAcquire(&walsnd->mutex);
     805         298 :         pid = walsnd->pid;
     806         298 :         flush = walsnd->flush;
     807         298 :         state = walsnd->state;
     808         298 :         SpinLockRelease(&walsnd->mutex);
     809             : 
     810             :         /* Must be active */
     811         298 :         if (pid == 0)
     812          32 :             continue;
     813             : 
     814             :         /* Must be streaming or stopping */
     815         266 :         if (state != WALSNDSTATE_STREAMING &&
     816             :             state != WALSNDSTATE_STOPPING)
     817           0 :             continue;
     818             : 
     819             :         /* Must be synchronous */
     820         266 :         this_priority = walsnd->sync_standby_priority;
     821         266 :         if (this_priority == 0)
     822           8 :             continue;
     823             : 
     824             :         /* Must have a valid flush position */
     825         258 :         if (XLogRecPtrIsInvalid(flush))
     826           6 :             continue;
     827             : 
     828             :         /*
     829             :          * If the priority is equal to 1, consider this standby as sync and
     830             :          * append it to the result. Otherwise append this standby to the
     831             :          * pending list to check if it's actually sync or not later.
     832             :          */
     833         252 :         if (this_priority == 1)
     834             :         {
     835         224 :             result = lappend_int(result, i);
     836         224 :             if (am_sync != NULL && walsnd == MyWalSnd)
     837         196 :                 *am_sync = true;
     838         224 :             if (list_length(result) == SyncRepConfig->num_sync)
     839             :             {
     840         212 :                 list_free(pending);
     841         212 :                 return result;  /* Exit if got enough sync standbys */
     842             :             }
     843             :         }
     844             :         else
     845             :         {
     846          28 :             pending = lappend_int(pending, i);
     847          28 :             if (am_sync != NULL && walsnd == MyWalSnd)
     848           0 :                 am_in_pending = true;
     849             : 
     850             :             /*
     851             :              * Track the highest priority among the standbys in the pending
     852             :              * list, in order to use it as the starting priority for later
     853             :              * scan of the list. This is useful to find quickly the sync
     854             :              * standbys from the pending list later because we can skip
     855             :              * unnecessary scans for the unused priorities.
     856             :              */
     857          28 :             if (this_priority < next_highest_priority)
     858          18 :                 next_highest_priority = this_priority;
     859             :         }
     860             :     }
     861             : 
     862             :     /*
     863             :      * Consider all pending standbys as sync if the number of them plus
     864             :      * already-found sync ones is lower than the configuration requests.
     865             :      */
     866          16 :     if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
     867             :     {
     868             :         /*
     869             :          * Set *am_sync to true if this walsender is in the pending list
     870             :          * because all pending standbys are considered as sync.
     871             :          */
     872          12 :         if (am_sync != NULL && !(*am_sync))
     873           0 :             *am_sync = am_in_pending;
     874             : 
     875          12 :         result = list_concat(result, pending);
     876          12 :         list_free(pending);
     877          12 :         return result;
     878             :     }
     879             : 
     880             :     /*
     881             :      * Find the sync standbys from the pending list.
     882             :      */
     883           4 :     priority = next_highest_priority;
     884           8 :     while (priority <= lowest_priority)
     885             :     {
     886             :         ListCell   *cell;
     887             : 
     888           4 :         next_highest_priority = lowest_priority + 1;
     889             : 
     890           4 :         foreach(cell, pending)
     891             :         {
     892           4 :             i = lfirst_int(cell);
     893           4 :             walsnd = &WalSndCtl->walsnds[i];
     894             : 
     895           4 :             this_priority = walsnd->sync_standby_priority;
     896           4 :             if (this_priority == priority)
     897             :             {
     898           4 :                 result = lappend_int(result, i);
     899           4 :                 if (am_sync != NULL && walsnd == MyWalSnd)
     900           0 :                     *am_sync = true;
     901             : 
     902             :                 /*
     903             :                  * We should always exit here after the scan of pending list
     904             :                  * starts because we know that the list has enough elements to
     905             :                  * reach SyncRepConfig->num_sync.
     906             :                  */
     907           4 :                 if (list_length(result) == SyncRepConfig->num_sync)
     908             :                 {
     909           4 :                     list_free(pending);
     910           4 :                     return result;  /* Exit if got enough sync standbys */
     911             :                 }
     912             : 
     913             :                 /*
     914             :                  * Remove the entry for this sync standby from the list to
     915             :                  * prevent us from looking at the same entry again.
     916             :                  */
     917           0 :                 pending = foreach_delete_current(pending, cell);
     918             : 
     919           0 :                 continue;       /* don't adjust next_highest_priority */
     920             :             }
     921             : 
     922           0 :             if (this_priority < next_highest_priority)
     923           0 :                 next_highest_priority = this_priority;
     924             :         }
     925             : 
     926           0 :         priority = next_highest_priority;
     927             :     }
     928             : 
     929             :     /* never reached, but keep compiler quiet */
     930             :     Assert(false);
     931           0 :     return result;
     932             : }
     933             : 
     934             : /*
     935             :  * Check if we are in the list of sync standbys, and if so, determine
     936             :  * priority sequence. Return priority if set, or zero to indicate that
     937             :  * we are not a potential sync standby.
     938             :  *
     939             :  * Compare the parameter SyncRepStandbyNames against the application_name
     940             :  * for this WALSender, or allow any name if we find a wildcard "*".
     941             :  */
     942             : static int
     943         244 : SyncRepGetStandbyPriority(void)
     944             : {
     945             :     const char *standby_name;
     946             :     int         priority;
     947         244 :     bool        found = false;
     948             : 
     949             :     /*
     950             :      * Since synchronous cascade replication is not allowed, we always set the
     951             :      * priority of cascading walsender to zero.
     952             :      */
     953         244 :     if (am_cascading_walsender)
     954           6 :         return 0;
     955             : 
     956         238 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     957         194 :         return 0;
     958             : 
     959          44 :     standby_name = SyncRepConfig->member_names;
     960          60 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     961             :     {
     962          94 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     963          36 :             strcmp(standby_name, "*") == 0)
     964             :         {
     965          42 :             found = true;
     966          42 :             break;
     967             :         }
     968          16 :         standby_name += strlen(standby_name) + 1;
     969             :     }
     970             : 
     971          44 :     if (!found)
     972           2 :         return 0;
     973             : 
     974             :     /*
     975             :      * In quorum-based sync replication, all the standbys in the list have the
     976             :      * same priority, one.
     977             :      */
     978          42 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     979             : }
     980             : 
     981             : /*
     982             :  * Walk the specified queue from head.  Set the state of any backends that
     983             :  * need to be woken, remove them from the queue, and then wake them.
     984             :  * Pass all = true to wake whole queue; otherwise, just wake up to
     985             :  * the walsender's LSN.
     986             :  *
     987             :  * Must hold SyncRepLock.
     988             :  */
     989             : static int
     990         228 : SyncRepWakeQueue(bool all, int mode)
     991             : {
     992         228 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     993         228 :     PGPROC     *proc = NULL;
     994         228 :     PGPROC     *thisproc = NULL;
     995         228 :     int         numprocs = 0;
     996             : 
     997             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     998             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     999             : 
    1000         228 :     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1001         228 :                                    &(WalSndCtl->SyncRepQueue[mode]),
    1002             :                                    offsetof(PGPROC, syncRepLinks));
    1003             : 
    1004         496 :     while (proc)
    1005             :     {
    1006             :         /*
    1007             :          * Assume the queue is ordered by LSN
    1008             :          */
    1009          50 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
    1010          10 :             return numprocs;
    1011             : 
    1012             :         /*
    1013             :          * Move to next proc, so we can delete thisproc from the queue.
    1014             :          * thisproc is valid, proc may be NULL after this.
    1015             :          */
    1016          40 :         thisproc = proc;
    1017          40 :         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1018          40 :                                        &(proc->syncRepLinks),
    1019             :                                        offsetof(PGPROC, syncRepLinks));
    1020             : 
    1021             :         /*
    1022             :          * Remove thisproc from queue.
    1023             :          */
    1024          40 :         SHMQueueDelete(&(thisproc->syncRepLinks));
    1025             : 
    1026             :         /*
    1027             :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
    1028             :          * make sure that it sees the queue link being removed before the
    1029             :          * syncRepState change.
    1030             :          */
    1031          40 :         pg_write_barrier();
    1032             : 
    1033             :         /*
    1034             :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
    1035             :          * the various states.
    1036             :          */
    1037          40 :         thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
    1038             : 
    1039             :         /*
    1040             :          * Wake only when we have set state and removed from queue.
    1041             :          */
    1042          40 :         SetLatch(&(thisproc->procLatch));
    1043             : 
    1044          40 :         numprocs++;
    1045             :     }
    1046             : 
    1047         218 :     return numprocs;
    1048             : }
    1049             : 
    1050             : /*
    1051             :  * The checkpointer calls this as needed to update the shared
    1052             :  * sync_standbys_defined flag, so that backends don't remain permanently wedged
    1053             :  * if synchronous_standby_names is unset.  It's safe to check the current value
    1054             :  * without the lock, because it's only ever updated by one process.  But we
    1055             :  * must take the lock to change it.
    1056             :  */
    1057             : void
    1058         390 : SyncRepUpdateSyncStandbysDefined(void)
    1059             : {
    1060         390 :     bool        sync_standbys_defined = SyncStandbysDefined();
    1061             : 
    1062         390 :     if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
    1063             :     {
    1064          16 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    1065             : 
    1066             :         /*
    1067             :          * If synchronous_standby_names has been reset to empty, it's futile
    1068             :          * for backends to continue to waiting.  Since the user no longer
    1069             :          * wants synchronous replication, we'd better wake them up.
    1070             :          */
    1071          16 :         if (!sync_standbys_defined)
    1072             :         {
    1073             :             int         i;
    1074             : 
    1075           0 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
    1076           0 :                 SyncRepWakeQueue(true, i);
    1077             :         }
    1078             : 
    1079             :         /*
    1080             :          * Only allow people to join the queue when there are synchronous
    1081             :          * standbys defined.  Without this interlock, there's a race
    1082             :          * condition: we might wake up all the current waiters; then, some
    1083             :          * backend that hasn't yet reloaded its config might go to sleep on
    1084             :          * the queue (and never wake up).  This prevents that.
    1085             :          */
    1086          16 :         WalSndCtl->sync_standbys_defined = sync_standbys_defined;
    1087             : 
    1088          16 :         LWLockRelease(SyncRepLock);
    1089             :     }
    1090         390 : }
    1091             : 
    1092             : #ifdef USE_ASSERT_CHECKING
    1093             : static bool
    1094             : SyncRepQueueIsOrderedByLSN(int mode)
    1095             : {
    1096             :     PGPROC     *proc = NULL;
    1097             :     XLogRecPtr  lastLSN;
    1098             : 
    1099             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    1100             : 
    1101             :     lastLSN = 0;
    1102             : 
    1103             :     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1104             :                                    &(WalSndCtl->SyncRepQueue[mode]),
    1105             :                                    offsetof(PGPROC, syncRepLinks));
    1106             : 
    1107             :     while (proc)
    1108             :     {
    1109             :         /*
    1110             :          * Check the queue is ordered by LSN and that multiple procs don't
    1111             :          * have matching LSNs
    1112             :          */
    1113             :         if (proc->waitLSN <= lastLSN)
    1114             :             return false;
    1115             : 
    1116             :         lastLSN = proc->waitLSN;
    1117             : 
    1118             :         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1119             :                                        &(proc->syncRepLinks),
    1120             :                                        offsetof(PGPROC, syncRepLinks));
    1121             :     }
    1122             : 
    1123             :     return true;
    1124             : }
    1125             : #endif
    1126             : 
    1127             : /*
    1128             :  * ===========================================================
    1129             :  * Synchronous Replication functions executed by any process
    1130             :  * ===========================================================
    1131             :  */
    1132             : 
    1133             : bool
    1134        2012 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
    1135             : {
    1136        2012 :     if (*newval != NULL && (*newval)[0] != '\0')
    1137         150 :     {
    1138             :         int         parse_rc;
    1139             :         SyncRepConfigData *pconf;
    1140             : 
    1141             :         /* Reset communication variables to ensure a fresh start */
    1142         150 :         syncrep_parse_result = NULL;
    1143         150 :         syncrep_parse_error_msg = NULL;
    1144             : 
    1145             :         /* Parse the synchronous_standby_names string */
    1146         150 :         syncrep_scanner_init(*newval);
    1147         150 :         parse_rc = syncrep_yyparse();
    1148         150 :         syncrep_scanner_finish();
    1149             : 
    1150         150 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1151             :         {
    1152           0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1153           0 :             if (syncrep_parse_error_msg)
    1154           0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1155             :             else
    1156           0 :                 GUC_check_errdetail("synchronous_standby_names parser failed");
    1157           0 :             return false;
    1158             :         }
    1159             : 
    1160         150 :         if (syncrep_parse_result->num_sync <= 0)
    1161             :         {
    1162           0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1163           0 :                              syncrep_parse_result->num_sync);
    1164           0 :             return false;
    1165             :         }
    1166             : 
    1167             :         /* GUC extra value must be malloc'd, not palloc'd */
    1168         150 :         pconf = (SyncRepConfigData *)
    1169         150 :             malloc(syncrep_parse_result->config_size);
    1170         150 :         if (pconf == NULL)
    1171           0 :             return false;
    1172         150 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1173             : 
    1174         150 :         *extra = (void *) pconf;
    1175             : 
    1176             :         /*
    1177             :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1178             :          * other cruft generated during parsing, will be freed when the
    1179             :          * current memory context is deleted.  (This code is generally run in
    1180             :          * a short-lived context used for config file processing, so that will
    1181             :          * not be very long.)
    1182             :          */
    1183             :     }
    1184             :     else
    1185        1862 :         *extra = NULL;
    1186             : 
    1187        2012 :     return true;
    1188             : }
    1189             : 
    1190             : void
    1191        1994 : assign_synchronous_standby_names(const char *newval, void *extra)
    1192             : {
    1193        1994 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1194        1994 : }
    1195             : 
    1196             : void
    1197        2240 : assign_synchronous_commit(int newval, void *extra)
    1198             : {
    1199        2240 :     switch (newval)
    1200             :     {
    1201             :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1202           0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1203           0 :             break;
    1204             :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1205        2032 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1206        2032 :             break;
    1207             :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1208           0 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1209           0 :             break;
    1210             :         default:
    1211         208 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1212         208 :             break;
    1213             :     }
    1214        2240 : }

Generated by: LCOV version 1.13