LCOV - code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Hit Total Coverage
Test: PostgreSQL 14devel Lines: 229 305 75.1 %
Date: 2020-11-27 12:05:55 Functions: 15 18 83.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * syncrep.c
       4             :  *
       5             :  * Synchronous replication is new as of PostgreSQL 9.1.
       6             :  *
       7             :  * If requested, transaction commits wait until their commit LSNs are
       8             :  * acknowledged by the synchronous standbys.
       9             :  *
      10             :  * This module contains the code for waiting and release of backends.
      11             :  * All code in this module executes on the primary. The core streaming
      12             :  * replication transport remains within WALreceiver/WALsender modules.
      13             :  *
      14             :  * The essence of this design is that it isolates all logic about
      15             :  * waiting/releasing onto the primary. The primary defines which standbys
      16             :  * it wishes to wait for. The standbys are completely unaware of the
      17             :  * durability requirements of transactions on the primary, reducing the
      18             :  * complexity of the code and streamlining both standby operations and
      19             :  * network bandwidth because there is no requirement to ship
      20             :  * per-transaction state information.
      21             :  *
      22             :  * Replication is either synchronous or not synchronous (async). If it is
      23             :  * async, we just fastpath out of here. If it is sync, then we wait for
      24             :  * the write, flush or apply location on the standby before releasing
      25             :  * the waiting backend. Further complexity in that interaction is
      26             :  * expected in later releases.
      27             :  *
      28             :  * The best performing way to manage the waiting backends is to have a
      29             :  * single ordered queue of waiting backends, so that we can avoid
      30             :  * searching through all the waiters each time we receive a reply.
      31             :  *
      32             :  * In 9.5 and earlier, only a single standby could be considered
      33             :  * synchronous. 9.6 added support for multiple priority-based synchronous
      34             :  * standbys, and 10.0 added support for multiple quorum-based synchronous
      35             :  * standbys. The number of synchronous standbys that transactions
      36             :  * must wait for replies from is specified in synchronous_standby_names.
      37             :  * This parameter also specifies a list of standby names and the method
      38             :  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
      39             :  *
      40             :  * The method FIRST specifies a priority-based synchronous replication
      41             :  * and makes transaction commits wait until their WAL records are
      42             :  * replicated to the requested number of synchronous standbys chosen based
      43             :  * on their priorities. The standbys whose names appear earlier in the list
      44             :  * are given higher priority and will be considered as synchronous.
      45             :  * Other standby servers appearing later in this list represent potential
      46             :  * synchronous standbys. If any of the current synchronous standbys
      47             :  * disconnects for whatever reason, it will be replaced immediately with
      48             :  * the next-highest-priority standby.
      49             :  *
      50             :  * The method ANY specifies a quorum-based synchronous replication
      51             :  * and makes transaction commits wait until their WAL records are
      52             :  * replicated to at least the requested number of synchronous standbys
      53             :  * in the list. All the standbys appearing in the list are considered as
      54             :  * candidates for quorum synchronous standbys.
      55             :  *
      56             :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57             :  * This is for backward compatibility with 9.6 or before where only a
      58             :  * priority-based sync replication was supported.
      59             :  *
      60             :  * Before the standbys chosen from synchronous_standby_names can
      61             :  * become the synchronous standbys they must have caught up with
      62             :  * the primary; that may take some time. Once caught up,
      63             :  * the standbys which are considered as synchronous at that moment
      64             :  * will release waiters from the queue.
      65             :  *
      66             :  * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
      67             :  *
      68             :  * IDENTIFICATION
      69             :  *    src/backend/replication/syncrep.c
      70             :  *
      71             :  *-------------------------------------------------------------------------
      72             :  */
      73             : #include "postgres.h"
      74             : 
      75             : #include <unistd.h>
      76             : 
      77             : #include "access/xact.h"
      78             : #include "miscadmin.h"
      79             : #include "pgstat.h"
      80             : #include "replication/syncrep.h"
      81             : #include "replication/walsender.h"
      82             : #include "replication/walsender_private.h"
      83             : #include "storage/pmsignal.h"
      84             : #include "storage/proc.h"
      85             : #include "tcop/tcopprot.h"
      86             : #include "utils/builtins.h"
      87             : #include "utils/ps_status.h"
      88             : 
      89             : /* User-settable parameters for sync rep */
      90             : char       *SyncRepStandbyNames;
      91             : 
      92             : #define SyncStandbysDefined() \
      93             :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      94             : 
      95             : static bool announce_next_takeover = true;
      96             : 
      97             : SyncRepConfigData *SyncRepConfig = NULL;
      98             : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
      99             : 
     100             : static void SyncRepQueueInsert(int mode);
     101             : static void SyncRepCancelWait(void);
     102             : static int  SyncRepWakeQueue(bool all, int mode);
     103             : 
     104             : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     105             :                                  XLogRecPtr *flushPtr,
     106             :                                  XLogRecPtr *applyPtr,
     107             :                                  bool *am_sync);
     108             : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     109             :                                        XLogRecPtr *flushPtr,
     110             :                                        XLogRecPtr *applyPtr,
     111             :                                        SyncRepStandbyData *sync_standbys,
     112             :                                        int num_standbys);
     113             : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     114             :                                           XLogRecPtr *flushPtr,
     115             :                                           XLogRecPtr *applyPtr,
     116             :                                           SyncRepStandbyData *sync_standbys,
     117             :                                           int num_standbys,
     118             :                                           uint8 nth);
     119             : static int  SyncRepGetStandbyPriority(void);
     120             : static int  standby_priority_comparator(const void *a, const void *b);
     121             : static int  cmp_lsn(const void *a, const void *b);
     122             : 
     123             : #ifdef USE_ASSERT_CHECKING
     124             : static bool SyncRepQueueIsOrderedByLSN(int mode);
     125             : #endif
     126             : 
     127             : /*
     128             :  * ===========================================================
     129             :  * Synchronous Replication functions for normal user backends
     130             :  * ===========================================================
     131             :  */
     132             : 
     133             : /*
     134             :  * Wait for synchronous replication, if requested by user.
     135             :  *
     136             :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     137             :  * change that state to SYNC_REP_WAITING before adding themselves
     138             :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     139             :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     140             :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     141             :  *
     142             :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     143             :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     144             :  * to be flushed if synchronous_commit is set to the higher level of
     145             :  * remote_apply, because only commit records provide apply feedback.
     146             :  */
     147             : void
     148      287910 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     149             : {
     150      287910 :     char       *new_status = NULL;
     151             :     const char *old_status;
     152             :     int         mode;
     153             : 
     154             :     /*
     155             :      * This should be called while holding interrupts during a transaction
     156             :      * commit to prevent the follow-up shared memory queue cleanups from
     157             :      * being influenced by external interruptions.
     158             :      */
     159             :     Assert(InterruptHoldoffCount > 0);
     160             : 
     161             :     /*
     162             :      * Fast exit if user has not requested sync replication, or there are no
     163             :      * sync replication standby names defined.
     164             :      *
     165             :      * Since this routine gets called every commit time, it's important to
     166             :      * exit quickly if sync replication is not requested. So we check
     167             :      * WalSndCtl->sync_standbys_defined flag without the lock and exit
     168             :      * immediately if it's false. If it's true, we need to check it again later
     169             :      * while holding the lock, to check the flag and operate the sync rep
     170             :      * queue atomically. This is necessary to avoid the race condition
     171             :      * described in SyncRepUpdateSyncStandbysDefined(). On the other
     172             :      * hand, if it's false, the lock is not necessary because we don't touch
     173             :      * the queue.
     174             :      */
     175      287910 :     if (!SyncRepRequested() ||
     176      277262 :         !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
     177      287826 :         return;
     178             : 
     179             :     /* Cap the level for anything other than commit to remote flush only. */
     180          84 :     if (commit)
     181          40 :         mode = SyncRepWaitMode;
     182             :     else
     183          44 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     184             : 
     185             :     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
     186             :     Assert(WalSndCtl != NULL);
     187             : 
     188          84 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     189             :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     190             : 
     191             :     /*
     192             :      * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     193             :      * set.  See SyncRepUpdateSyncStandbysDefined.
     194             :      *
     195             :      * Also check that the standby hasn't already replied. Unlikely race
     196             :      * condition but we'll be fetching that cache line anyway so it's likely
     197             :      * to be a low cost check.
     198             :      */
     199          84 :     if (!WalSndCtl->sync_standbys_defined ||
     200          84 :         lsn <= WalSndCtl->lsn[mode])
     201             :     {
     202          10 :         LWLockRelease(SyncRepLock);
     203          10 :         return;
     204             :     }
     205             : 
     206             :     /*
     207             :      * Set our waitLSN so WALSender will know when to wake us, and add
     208             :      * ourselves to the queue.
     209             :      */
     210          74 :     MyProc->waitLSN = lsn;
     211          74 :     MyProc->syncRepState = SYNC_REP_WAITING;
     212          74 :     SyncRepQueueInsert(mode);
     213             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     214          74 :     LWLockRelease(SyncRepLock);
     215             : 
     216             :     /* Alter ps display to show waiting for sync rep. */
     217          74 :     if (update_process_title)
     218             :     {
     219             :         int         len;
     220             : 
     221          74 :         old_status = get_ps_display(&len);
     222          74 :         new_status = (char *) palloc(len + 32 + 1);
     223          74 :         memcpy(new_status, old_status, len);
     224         148 :         sprintf(new_status + len, " waiting for %X/%X",
     225          74 :                 (uint32) (lsn >> 32), (uint32) lsn);
     226          74 :         set_ps_display(new_status);
     227          74 :         new_status[len] = '\0'; /* truncate off " waiting ..." */
     228             :     }
     229             : 
     230             :     /*
     231             :      * Wait for specified LSN to be confirmed.
     232             :      *
     233             :      * Each proc has its own wait latch, so we perform a normal latch
     234             :      * check/wait loop here.
     235             :      */
     236             :     for (;;)
     237          74 :     {
     238             :         int         rc;
     239             : 
     240             :         /* Must reset the latch before testing state. */
     241         148 :         ResetLatch(MyLatch);
     242             : 
     243             :         /*
     244             :          * Acquiring the lock is not needed, the latch ensures proper
     245             :          * barriers. If it looks like we're done, we must really be done,
     246             :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     247             :          * it will never update it again, so we can't be seeing a stale value
     248             :          * in that case.
     249             :          */
     250         148 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     251          74 :             break;
     252             : 
     253             :         /*
     254             :          * If a wait for synchronous replication is pending, we can neither
     255             :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     256             :          * lead the client to believe that the transaction aborted, which is
     257             :          * not true: it's already committed locally. The former is no good
     258             :          * either: the client has requested synchronous replication, and is
     259             :          * entitled to assume that an acknowledged commit is also replicated,
     260             :          * which might not be true. So in this case we issue a WARNING (which
     261             :          * some clients may be able to interpret) and shut off further output.
     262             :          * We do NOT reset ProcDiePending, so that the process will die after
     263             :          * the commit is cleaned up.
     264             :          */
     265          74 :         if (ProcDiePending)
     266             :         {
     267           0 :             ereport(WARNING,
     268             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     269             :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     270             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     271           0 :             whereToSendOutput = DestNone;
     272           0 :             SyncRepCancelWait();
     273           0 :             break;
     274             :         }
     275             : 
     276             :         /*
     277             :          * It's unclear what to do if a query cancel interrupt arrives.  We
     278             :          * can't actually abort at this point, but ignoring the interrupt
     279             :          * altogether is not helpful, so we just terminate the wait with a
     280             :          * suitable warning.
     281             :          */
     282          74 :         if (QueryCancelPending)
     283             :         {
     284           0 :             QueryCancelPending = false;
     285           0 :             ereport(WARNING,
     286             :                     (errmsg("canceling wait for synchronous replication due to user request"),
     287             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     288           0 :             SyncRepCancelWait();
     289           0 :             break;
     290             :         }
     291             : 
     292             :         /*
     293             :          * Wait on latch.  Any condition that should wake us up will set the
     294             :          * latch, so no need for timeout.
     295             :          */
     296          74 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     297             :                        WAIT_EVENT_SYNC_REP);
     298             : 
     299             :         /*
     300             :          * If the postmaster dies, we'll probably never get an acknowledgment,
     301             :          * because all the wal sender processes will exit. So just bail out.
     302             :          */
     303          74 :         if (rc & WL_POSTMASTER_DEATH)
     304             :         {
     305           0 :             ProcDiePending = true;
     306           0 :             whereToSendOutput = DestNone;
     307           0 :             SyncRepCancelWait();
     308           0 :             break;
     309             :         }
     310             :     }
     311             : 
     312             :     /*
     313             :      * WalSender has checked our LSN and has removed us from queue. Clean up
     314             :      * state and leave.  It's OK to reset these shared memory fields without
     315             :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     316             :      * we're not on the queue.  We need a read barrier to make sure we see the
     317             :      * changes to the queue link (this might be unnecessary without
     318             :      * assertions, but better safe than sorry).
     319             :      */
     320          74 :     pg_read_barrier();
     321             :     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
     322          74 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     323          74 :     MyProc->waitLSN = 0;
     324             : 
     325          74 :     if (new_status)
     326             :     {
     327             :         /* Reset ps display */
     328          74 :         set_ps_display(new_status);
     329          74 :         pfree(new_status);
     330             :     }
     331             : }
     332             : 
     333             : /*
     334             :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     335             :  *
     336             :  * Usually we will go at tail of queue, though it's possible that we arrive
     337             :  * here out of order, so start at tail and work back to insertion point.
     338             :  */
     339             : static void
     340          74 : SyncRepQueueInsert(int mode)
     341             : {
     342             :     PGPROC     *proc;
     343             : 
     344             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     345          74 :     proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
     346          74 :                                    &(WalSndCtl->SyncRepQueue[mode]),
     347             :                                    offsetof(PGPROC, syncRepLinks));
     348             : 
     349          74 :     while (proc)
     350             :     {
     351             :         /*
     352             :          * Stop at the queue element that we should insert after, to ensure
     353             :          * the queue is ordered by LSN.
     354             :          */
     355           0 :         if (proc->waitLSN < MyProc->waitLSN)
     356           0 :             break;
     357             : 
     358           0 :         proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
     359           0 :                                        &(proc->syncRepLinks),
     360             :                                        offsetof(PGPROC, syncRepLinks));
     361             :     }
     362             : 
     363          74 :     if (proc)
     364           0 :         SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
     365             :     else
     366          74 :         SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
     367          74 : }
     368             : 
     369             : /*
     370             :  * Acquire SyncRepLock and cancel any wait currently in progress.
     371             :  */
     372             : static void
     373           0 : SyncRepCancelWait(void)
     374             : {
     375           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     376           0 :     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     377           0 :         SHMQueueDelete(&(MyProc->syncRepLinks));
     378           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     379           0 :     LWLockRelease(SyncRepLock);
     380           0 : }
     381             : 
     382             : void
     383       11920 : SyncRepCleanupAtProcExit(void)
     384             : {
     385             :     /*
     386             :      * First check if we are removed from the queue without the lock to not
     387             :      * slow down backend exit.
     388             :      */
     389       11920 :     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     390             :     {
     391           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     392             : 
     393             :         /* maybe we have just been removed, so recheck */
     394           0 :         if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     395           0 :             SHMQueueDelete(&(MyProc->syncRepLinks));
     396             : 
     397           0 :         LWLockRelease(SyncRepLock);
     398             :     }
     399       11920 : }
     400             : 
     401             : /*
     402             :  * ===========================================================
     403             :  * Synchronous Replication functions for wal sender processes
     404             :  * ===========================================================
     405             :  */
     406             : 
     407             : /*
     408             :  * Take any action required to initialise sync rep state from config
     409             :  * data. Called at WALSender startup and after each SIGHUP.
     410             :  */
     411             : void
     412         464 : SyncRepInitConfig(void)
     413             : {
     414             :     int         priority;
     415             : 
     416             :     /*
     417             :      * Determine if we are a potential sync standby and remember the result
     418             :      * for handling replies from standby.
     419             :      */
     420         464 :     priority = SyncRepGetStandbyPriority();
     421         464 :     if (MyWalSnd->sync_standby_priority != priority)
     422             :     {
     423          32 :         SpinLockAcquire(&MyWalSnd->mutex);
     424          32 :         MyWalSnd->sync_standby_priority = priority;
     425          32 :         SpinLockRelease(&MyWalSnd->mutex);
     426             : 
     427          32 :         ereport(DEBUG1,
     428             :                 (errmsg("standby \"%s\" now has synchronous standby priority %u",
     429             :                         application_name, priority)));
     430             :     }
     431         464 : }
     432             : 
     433             : /*
     434             :  * Update the LSNs on each queue based upon our latest state. This
     435             :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     436             :  *
     437             :  * Other policies are possible, which would change what we do here and
     438             :  * perhaps also which information we store as well.
     439             :  */
     440             : void
     441       19768 : SyncRepReleaseWaiters(void)
     442             : {
     443       19768 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     444             :     XLogRecPtr  writePtr;
     445             :     XLogRecPtr  flushPtr;
     446             :     XLogRecPtr  applyPtr;
     447             :     bool        got_recptr;
     448             :     bool        am_sync;
     449       19768 :     int         numwrite = 0;
     450       19768 :     int         numflush = 0;
     451       19768 :     int         numapply = 0;
     452             : 
     453             :     /*
     454             :      * If this WALSender is serving a standby that is not on the list of
     455             :      * potential sync standbys then we have nothing to do. If we are still
     456             :      * starting up, still running base backup or the current flush position is
     457             :      * still invalid, then leave quickly also.  Streaming or stopping WAL
     458             :      * senders are allowed to release waiters.
     459             :      */
     460       19768 :     if (MyWalSnd->sync_standby_priority == 0 ||
     461         240 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
     462          52 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
     463         236 :         XLogRecPtrIsInvalid(MyWalSnd->flush))
     464             :     {
     465       19532 :         announce_next_takeover = true;
     466       19534 :         return;
     467             :     }
     468             : 
     469             :     /*
     470             :      * We're a potential sync standby. Release waiters if there are enough
     471             :      * sync standbys and we are considered as sync.
     472             :      */
     473         236 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     474             : 
     475             :     /*
     476             :      * Check whether we are a sync standby or not, and calculate the synced
     477             :      * positions among all sync standbys.  (Note: although this step does not
     478             :      * of itself require holding SyncRepLock, it seems like a good idea to do
     479             :      * it after acquiring the lock.  This ensures that the WAL pointers we use
     480             :      * to release waiters are newer than any previous execution of this
     481             :      * routine used.)
     482             :      */
     483         236 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     484             : 
     485             :     /*
     486             :      * If we are managing a sync standby, though we weren't prior to this,
     487             :      * then announce we are now a sync standby.
     488             :      */
     489         236 :     if (announce_next_takeover && am_sync)
     490             :     {
     491          16 :         announce_next_takeover = false;
     492             : 
     493          16 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     494          16 :             ereport(LOG,
     495             :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
     496             :                             application_name, MyWalSnd->sync_standby_priority)));
     497             :         else
     498           0 :             ereport(LOG,
     499             :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     500             :                             application_name)));
     501             :     }
     502             : 
     503             :     /*
     504             :      * If the number of sync standbys is less than requested or we aren't
     505             :      * managing a sync standby then just leave.
     506             :      */
     507         236 :     if (!got_recptr || !am_sync)
     508             :     {
     509           2 :         LWLockRelease(SyncRepLock);
     510           2 :         announce_next_takeover = !am_sync;
     511           2 :         return;
     512             :     }
     513             : 
     514             :     /*
     515             :      * Set the lsn first so that when we wake backends they will release up to
     516             :      * this location.
     517             :      */
     518         234 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     519             :     {
     520          84 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     521          84 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     522             :     }
     523         234 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     524             :     {
     525          90 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     526          90 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     527             :     }
     528         234 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     529             :     {
     530          84 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     531          84 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     532             :     }
     533             : 
     534         234 :     LWLockRelease(SyncRepLock);
     535             : 
     536         234 :     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
     537             :          numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
     538             :          numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
     539             :          numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
     540             : }
     541             : 
     542             : /*
     543             :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     544             :  *
     545             :  * Return false if the number of sync standbys is less than
     546             :  * synchronous_standby_names specifies. Otherwise return true and
     547             :  * store the positions into *writePtr, *flushPtr and *applyPtr.
     548             :  *
     549             :  * On return, *am_sync is set to true if this walsender is connecting to
     550             :  * sync standby. Otherwise it's set to false.
     551             :  */
     552             : static bool
     553         236 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     554             :                      XLogRecPtr *applyPtr, bool *am_sync)
     555             : {
     556             :     SyncRepStandbyData *sync_standbys;
     557             :     int         num_standbys;
     558             :     int         i;
     559             : 
     560             :     /* Initialize default results */
     561         236 :     *writePtr = InvalidXLogRecPtr;
     562         236 :     *flushPtr = InvalidXLogRecPtr;
     563         236 :     *applyPtr = InvalidXLogRecPtr;
     564         236 :     *am_sync = false;
     565             : 
     566             :     /* Quick out if not even configured to be synchronous */
     567         236 :     if (SyncRepConfig == NULL)
     568           0 :         return false;
     569             : 
     570             :     /* Get standbys that are considered as synchronous at this moment */
     571         236 :     num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
     572             : 
     573             :     /* Am I among the candidate sync standbys? */
     574         238 :     for (i = 0; i < num_standbys; i++)
     575             :     {
     576         236 :         if (sync_standbys[i].is_me)
     577             :         {
     578         234 :             *am_sync = true;
     579         234 :             break;
     580             :         }
     581             :     }
     582             : 
     583             :     /*
     584             :      * Nothing more to do if we are not managing a sync standby or there are
     585             :      * not enough synchronous standbys.
     586             :      */
     587         236 :     if (!(*am_sync) ||
     588         234 :         num_standbys < SyncRepConfig->num_sync)
     589             :     {
     590           2 :         pfree(sync_standbys);
     591           2 :         return false;
     592             :     }
     593             : 
     594             :     /*
     595             :      * In a priority-based sync replication, the synced positions are the
     596             :      * oldest ones among sync standbys. In a quorum-based, they are the Nth
     597             :      * latest ones.
     598             :      *
     599             :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     600             :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     601             :      * because it's a bit more efficient.
     602             :      *
     603             :      * XXX If the numbers of current and requested sync standbys are the same,
     604             :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     605             :      * positions even in a quorum-based sync replication.
     606             :      */
     607         234 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     608             :     {
     609         234 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     610             :                                    sync_standbys, num_standbys);
     611             :     }
     612             :     else
     613             :     {
     614           0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     615             :                                       sync_standbys, num_standbys,
     616           0 :                                       SyncRepConfig->num_sync);
     617             :     }
     618             : 
     619         234 :     pfree(sync_standbys);
     620         234 :     return true;
     621             : }
     622             : 
     623             : /*
     624             :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
     625             :  */
     626             : static void
     627         234 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     628             :                            XLogRecPtr *flushPtr,
     629             :                            XLogRecPtr *applyPtr,
     630             :                            SyncRepStandbyData *sync_standbys,
     631             :                            int num_standbys)
     632             : {
     633             :     int         i;
     634             : 
     635             :     /*
     636             :      * Scan through all sync standbys and calculate the oldest Write, Flush
     637             :      * and Apply positions.  We assume *writePtr et al were initialized to
     638             :      * InvalidXLogRecPtr.
     639             :      */
     640         468 :     for (i = 0; i < num_standbys; i++)
     641             :     {
     642         234 :         XLogRecPtr  write = sync_standbys[i].write;
     643         234 :         XLogRecPtr  flush = sync_standbys[i].flush;
     644         234 :         XLogRecPtr  apply = sync_standbys[i].apply;
     645             : 
     646         234 :         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
     647         234 :             *writePtr = write;
     648         234 :         if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
     649         234 :             *flushPtr = flush;
     650         234 :         if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
     651         234 :             *applyPtr = apply;
     652             :     }
     653         234 : }
     654             : 
     655             : /*
     656             :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     657             :  * standbys.
     658             :  */
     659             : static void
     660           0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     661             :                               XLogRecPtr *flushPtr,
     662             :                               XLogRecPtr *applyPtr,
     663             :                               SyncRepStandbyData *sync_standbys,
     664             :                               int num_standbys,
     665             :                               uint8 nth)
     666             : {
     667             :     XLogRecPtr *write_array;
     668             :     XLogRecPtr *flush_array;
     669             :     XLogRecPtr *apply_array;
     670             :     int         i;
     671             : 
     672             :     /* Should have enough candidates, or somebody messed up */
     673             :     Assert(nth > 0 && nth <= num_standbys);
     674             : 
     675           0 :     write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     676           0 :     flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     677           0 :     apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     678             : 
     679           0 :     for (i = 0; i < num_standbys; i++)
     680             :     {
     681           0 :         write_array[i] = sync_standbys[i].write;
     682           0 :         flush_array[i] = sync_standbys[i].flush;
     683           0 :         apply_array[i] = sync_standbys[i].apply;
     684             :     }
     685             : 
     686             :     /* Sort each array in descending order */
     687           0 :     qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     688           0 :     qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     689           0 :     qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     690             : 
     691             :     /* Get Nth latest Write, Flush, Apply positions */
     692           0 :     *writePtr = write_array[nth - 1];
     693           0 :     *flushPtr = flush_array[nth - 1];
     694           0 :     *applyPtr = apply_array[nth - 1];
     695             : 
     696           0 :     pfree(write_array);
     697           0 :     pfree(flush_array);
     698           0 :     pfree(apply_array);
     699           0 : }
     700             : 
     701             : /*
     702             :  * Compare lsn in order to sort array in descending order.
     703             :  */
     704             : static int
     705           0 : cmp_lsn(const void *a, const void *b)
     706             : {
     707           0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     708           0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     709             : 
     710           0 :     if (lsn1 > lsn2)
     711           0 :         return -1;
     712           0 :     else if (lsn1 == lsn2)
     713           0 :         return 0;
     714             :     else
     715           0 :         return 1;
     716             : }
     717             : 
     718             : /*
     719             :  * Return data about walsenders that are candidates to be sync standbys.
     720             :  *
     721             :  * *standbys is set to a palloc'd array of structs of per-walsender data,
     722             :  * and the number of valid entries (candidate sync senders) is returned.
     723             :  * (This might be more or fewer than num_sync; caller must check.)
     724             :  */
     725             : int
     726         656 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
     727             : {
     728             :     int         i;
     729             :     int         n;
     730             : 
     731             :     /* Create result array */
     732         656 :     *standbys = (SyncRepStandbyData *)
     733         656 :         palloc(max_wal_senders * sizeof(SyncRepStandbyData));
     734             : 
     735             :     /* Quick exit if sync replication is not requested */
     736         656 :     if (SyncRepConfig == NULL)
     737         388 :         return 0;
     738             : 
     739             :     /* Collect raw data from shared memory */
     740         268 :     n = 0;
     741        2948 :     for (i = 0; i < max_wal_senders; i++)
     742             :     {
     743             :         volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     744             :                                      * rearrangement */
     745             :         SyncRepStandbyData *stby;
     746             :         WalSndState state;      /* not included in SyncRepStandbyData */
     747             : 
     748        2680 :         walsnd = &WalSndCtl->walsnds[i];
     749        2680 :         stby = *standbys + n;
     750             : 
     751        2680 :         SpinLockAcquire(&walsnd->mutex);
     752        2680 :         stby->pid = walsnd->pid;
     753        2680 :         state = walsnd->state;
     754        2680 :         stby->write = walsnd->write;
     755        2680 :         stby->flush = walsnd->flush;
     756        2680 :         stby->apply = walsnd->apply;
     757        2680 :         stby->sync_standby_priority = walsnd->sync_standby_priority;
     758        2680 :         SpinLockRelease(&walsnd->mutex);
     759             : 
     760             :         /* Must be active */
     761        2680 :         if (stby->pid == 0)
     762        2356 :             continue;
     763             : 
     764             :         /* Must be streaming or stopping */
     765         324 :         if (state != WALSNDSTATE_STREAMING &&
     766             :             state != WALSNDSTATE_STOPPING)
     767           0 :             continue;
     768             : 
     769             :         /* Must be synchronous */
     770         324 :         if (stby->sync_standby_priority == 0)
     771          10 :             continue;
     772             : 
     773             :         /* Must have a valid flush position */
     774         314 :         if (XLogRecPtrIsInvalid(stby->flush))
     775           4 :             continue;
     776             : 
     777             :         /* OK, it's a candidate */
     778         310 :         stby->walsnd_index = i;
     779         310 :         stby->is_me = (walsnd == MyWalSnd);
     780         310 :         n++;
     781             :     }
     782             : 
     783             :     /*
     784             :      * In quorum mode, we return all the candidates.  In priority mode, if we
     785             :      * have too many candidates then return only the num_sync ones of highest
     786             :      * priority.
     787             :      */
     788         268 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
     789         266 :         n > SyncRepConfig->num_sync)
     790             :     {
     791             :         /* Sort by priority ... */
     792          16 :         qsort(*standbys, n, sizeof(SyncRepStandbyData),
     793             :               standby_priority_comparator);
     794             :         /* ... then report just the first num_sync ones */
     795          16 :         n = SyncRepConfig->num_sync;
     796             :     }
     797             : 
     798         268 :     return n;
     799             : }
     800             : 
     801             : /*
     802             :  * qsort comparator to sort SyncRepStandbyData entries by priority
     803             :  */
     804             : static int
     805          38 : standby_priority_comparator(const void *a, const void *b)
     806             : {
     807          38 :     const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
     808          38 :     const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
     809             : 
     810             :     /* First, sort by increasing priority value */
     811          38 :     if (sa->sync_standby_priority != sb->sync_standby_priority)
     812          18 :         return sa->sync_standby_priority - sb->sync_standby_priority;
     813             : 
     814             :     /*
     815             :      * We might have equal priority values; arbitrarily break ties by position
     816             :      * in the WALSnd array.  (This is utterly bogus, since that is arrival
     817             :      * order dependent, but there are regression tests that rely on it.)
     818             :      */
     819          20 :     return sa->walsnd_index - sb->walsnd_index;
     820             : }
     821             : 
     822             : 
     823             : /*
     824             :  * Check if we are in the list of sync standbys, and if so, determine
     825             :  * priority sequence. Return priority if set, or zero to indicate that
     826             :  * we are not a potential sync standby.
     827             :  *
     828             :  * Compare the parameter SyncRepStandbyNames against the application_name
     829             :  * for this WALSender, or allow any name if we find a wildcard "*".
     830             :  */
     831             : static int
     832         464 : SyncRepGetStandbyPriority(void)
     833             : {
     834             :     const char *standby_name;
     835             :     int         priority;
     836         464 :     bool        found = false;
     837             : 
     838             :     /*
     839             :      * Since synchronous cascade replication is not allowed, we always set the
     840             :      * priority of cascading walsender to zero.
     841             :      */
     842         464 :     if (am_cascading_walsender)
     843          16 :         return 0;
     844             : 
     845         448 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     846         404 :         return 0;
     847             : 
     848          44 :     standby_name = SyncRepConfig->member_names;
     849          60 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     850             :     {
     851          58 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     852          36 :             strcmp(standby_name, "*") == 0)
     853             :         {
     854          42 :             found = true;
     855          42 :             break;
     856             :         }
     857          16 :         standby_name += strlen(standby_name) + 1;
     858             :     }
     859             : 
     860          44 :     if (!found)
     861           2 :         return 0;
     862             : 
     863             :     /*
     864             :      * In quorum-based sync replication, all the standbys in the list have the
     865             :      * same priority, one.
     866             :      */
     867          42 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     868             : }
     869             : 
     870             : /*
     871             :  * Walk the specified queue from head.  Set the state of any backends that
     872             :  * need to be woken, remove them from the queue, and then wake them.
     873             :  * Pass all = true to wake whole queue; otherwise, just wake up to
     874             :  * the walsender's LSN.
     875             :  *
     876             :  * The caller must hold SyncRepLock in exclusive mode.
     877             :  */
     878             : static int
     879         258 : SyncRepWakeQueue(bool all, int mode)
     880             : {
     881         258 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     882         258 :     PGPROC     *proc = NULL;
     883         258 :     PGPROC     *thisproc = NULL;
     884         258 :     int         numprocs = 0;
     885             : 
     886             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     887             :     Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
     888             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     889             : 
     890         258 :     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
     891         258 :                                    &(WalSndCtl->SyncRepQueue[mode]),
     892             :                                    offsetof(PGPROC, syncRepLinks));
     893             : 
     894         294 :     while (proc)
     895             :     {
     896             :         /*
     897             :          * Assume the queue is ordered by LSN
     898             :          */
     899          38 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
     900           2 :             return numprocs;
     901             : 
     902             :         /*
     903             :          * Move to next proc, so we can delete thisproc from the queue.
     904             :          * thisproc is valid, proc may be NULL after this.
     905             :          */
     906          36 :         thisproc = proc;
     907          36 :         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
     908          36 :                                        &(proc->syncRepLinks),
     909             :                                        offsetof(PGPROC, syncRepLinks));
     910             : 
     911             :         /*
     912             :          * Remove thisproc from queue.
     913             :          */
     914          36 :         SHMQueueDelete(&(thisproc->syncRepLinks));
     915             : 
     916             :         /*
     917             :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
     918             :          * make sure that it sees the queue link being removed before the
     919             :          * syncRepState change.
     920             :          */
     921          36 :         pg_write_barrier();
     922             : 
     923             :         /*
     924             :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
     925             :          * the various states.
     926             :          */
     927          36 :         thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
     928             : 
     929             :         /*
     930             :          * Wake only when we have set state and removed from queue.
     931             :          */
     932          36 :         SetLatch(&(thisproc->procLatch));
     933             : 
     934          36 :         numprocs++;
     935             :     }
     936             : 
     937         256 :     return numprocs;
     938             : }
     939             : 
     940             : /*
     941             :  * The checkpointer calls this as needed to update the shared
     942             :  * sync_standbys_defined flag, so that backends don't remain permanently wedged
     943             :  * if synchronous_standby_names is unset.  It's safe to check the current value
     944             :  * without the lock, because it's only ever updated by one process.  But we
     945             :  * must take the lock to change it.
     946             :  */
     947             : void
     948         462 : SyncRepUpdateSyncStandbysDefined(void)
     949             : {
     950         462 :     bool        sync_standbys_defined = SyncStandbysDefined();
     951             : 
     952         462 :     if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
     953             :     {
     954          16 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     955             : 
     956             :         /*
     957             :          * If synchronous_standby_names has been reset to empty, it's futile
     958             :          * for backends to continue waiting.  Since the user no longer wants
     959             :          * synchronous replication, we'd better wake them up.
     960             :          */
     961          16 :         if (!sync_standbys_defined)
     962             :         {
     963             :             int         i;
     964             : 
     965           0 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
     966           0 :                 SyncRepWakeQueue(true, i);
     967             :         }
     968             : 
     969             :         /*
     970             :          * Only allow people to join the queue when there are synchronous
     971             :          * standbys defined.  Without this interlock, there's a race
     972             :          * condition: we might wake up all the current waiters; then, some
     973             :          * backend that hasn't yet reloaded its config might go to sleep on
     974             :          * the queue (and never wake up).  This prevents that.
     975             :          */
     976          16 :         WalSndCtl->sync_standbys_defined = sync_standbys_defined;
     977             : 
     978          16 :         LWLockRelease(SyncRepLock);
     979             :     }
     980         462 : }
     981             : 
     982             : #ifdef USE_ASSERT_CHECKING
     983             : static bool
     984             : SyncRepQueueIsOrderedByLSN(int mode)
     985             : {
     986             :     PGPROC     *proc = NULL;
     987             :     XLogRecPtr  lastLSN;
     988             : 
     989             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     990             : 
     991             :     lastLSN = 0;
     992             : 
     993             :     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
     994             :                                    &(WalSndCtl->SyncRepQueue[mode]),
     995             :                                    offsetof(PGPROC, syncRepLinks));
     996             : 
     997             :     while (proc)
     998             :     {
     999             :         /*
    1000             :          * Check the queue is ordered by LSN and that multiple procs don't
    1001             :          * have matching LSNs
    1002             :          */
    1003             :         if (proc->waitLSN <= lastLSN)
    1004             :             return false;
    1005             : 
    1006             :         lastLSN = proc->waitLSN;
    1007             : 
    1008             :         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
    1009             :                                        &(proc->syncRepLinks),
    1010             :                                        offsetof(PGPROC, syncRepLinks));
    1011             :     }
    1012             : 
    1013             :     return true;
    1014             : }
    1015             : #endif
    1016             : 
    1017             : /*
    1018             :  * ===========================================================
    1019             :  * Synchronous Replication functions executed by any process
    1020             :  * ===========================================================
    1021             :  */
    1022             : 
    1023             : bool
    1024        2536 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
    1025             : {
    1026        2536 :     if (*newval != NULL && (*newval)[0] != '\0')
    1027         150 :     {
    1028             :         int         parse_rc;
    1029             :         SyncRepConfigData *pconf;
    1030             : 
    1031             :         /* Reset communication variables to ensure a fresh start */
    1032         150 :         syncrep_parse_result = NULL;
    1033         150 :         syncrep_parse_error_msg = NULL;
    1034             : 
    1035             :         /* Parse the synchronous_standby_names string */
    1036         150 :         syncrep_scanner_init(*newval);
    1037         150 :         parse_rc = syncrep_yyparse();
    1038         150 :         syncrep_scanner_finish();
    1039             : 
    1040         150 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1041             :         {
    1042           0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1043           0 :             if (syncrep_parse_error_msg)
    1044           0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1045             :             else
    1046           0 :                 GUC_check_errdetail("synchronous_standby_names parser failed");
    1047           0 :             return false;
    1048             :         }
    1049             : 
    1050         150 :         if (syncrep_parse_result->num_sync <= 0)
    1051             :         {
    1052           0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1053           0 :                              syncrep_parse_result->num_sync);
    1054           0 :             return false;
    1055             :         }
    1056             : 
    1057             :         /* GUC extra value must be malloc'd, not palloc'd */
    1058             :         pconf = (SyncRepConfigData *)
    1059         150 :             malloc(syncrep_parse_result->config_size);
    1060         150 :         if (pconf == NULL)
    1061           0 :             return false;
    1062         150 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1063             : 
    1064         150 :         *extra = (void *) pconf;
    1065             : 
    1066             :         /*
    1067             :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1068             :          * other cruft generated during parsing, will be freed when the
    1069             :          * current memory context is deleted.  (This code is generally run in
    1070             :          * a short-lived context used for config file processing, so that will
    1071             :          * not be very long.)
    1072             :          */
    1073             :     }
    1074             :     else
    1075        2386 :         *extra = NULL;
    1076             : 
    1077        2536 :     return true;
    1078             : }
    1079             : 
    1080             : void
    1081        2518 : assign_synchronous_standby_names(const char *newval, void *extra)
    1082             : {
    1083        2518 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1084        2518 : }
    1085             : 
    1086             : void
    1087        2842 : assign_synchronous_commit(int newval, void *extra)
    1088             : {
    1089        2842 :     switch (newval)
    1090             :     {
    1091           0 :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1092           0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1093           0 :             break;
    1094        2572 :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1095        2572 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1096        2572 :             break;
    1097           0 :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1098           0 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1099           0 :             break;
    1100         270 :         default:
    1101         270 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1102         270 :             break;
    1103             :     }
    1104        2842 : }

Generated by: LCOV version 1.13