LCOV - code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 222 296 75.0 %
Date: 2025-04-24 12:15:10 Functions: 15 18 83.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * syncrep.c
       4             :  *
       5             :  * Synchronous replication is new as of PostgreSQL 9.1.
       6             :  *
       7             :  * If requested, transaction commits wait until their commit LSN are
       8             :  * acknowledged by the synchronous standbys.
       9             :  *
      10             :  * This module contains the code for waiting and release of backends.
      11             :  * All code in this module executes on the primary. The core streaming
      12             :  * replication transport remains within WALreceiver/WALsender modules.
      13             :  *
      14             :  * The essence of this design is that it isolates all logic about
      15             :  * waiting/releasing onto the primary. The primary defines which standbys
      16             :  * it wishes to wait for. The standbys are completely unaware of the
      17             :  * durability requirements of transactions on the primary, reducing the
      18             :  * complexity of the code and streamlining both standby operations and
      19             :  * network bandwidth because there is no requirement to ship
      20             :  * per-transaction state information.
      21             :  *
      22             :  * Replication is either synchronous or not synchronous (async). If it is
      23             :  * async, we just fastpath out of here. If it is sync, then we wait for
      24             :  * the write, flush or apply location on the standby before releasing
      25             :  * the waiting backend. Further complexity in that interaction is
      26             :  * expected in later releases.
      27             :  *
      28             :  * The best performing way to manage the waiting backends is to have a
      29             :  * single ordered queue of waiting backends, so that we can avoid
      30             :  * searching the through all waiters each time we receive a reply.
      31             :  *
      32             :  * In 9.5 or before only a single standby could be considered as
      33             :  * synchronous. In 9.6 we support a priority-based multiple synchronous
      34             :  * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
      35             :  * supported. The number of synchronous standbys that transactions
      36             :  * must wait for replies from is specified in synchronous_standby_names.
      37             :  * This parameter also specifies a list of standby names and the method
      38             :  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
      39             :  *
      40             :  * The method FIRST specifies a priority-based synchronous replication
      41             :  * and makes transaction commits wait until their WAL records are
      42             :  * replicated to the requested number of synchronous standbys chosen based
      43             :  * on their priorities. The standbys whose names appear earlier in the list
      44             :  * are given higher priority and will be considered as synchronous.
      45             :  * Other standby servers appearing later in this list represent potential
      46             :  * synchronous standbys. If any of the current synchronous standbys
      47             :  * disconnects for whatever reason, it will be replaced immediately with
      48             :  * the next-highest-priority standby.
      49             :  *
      50             :  * The method ANY specifies a quorum-based synchronous replication
      51             :  * and makes transaction commits wait until their WAL records are
      52             :  * replicated to at least the requested number of synchronous standbys
      53             :  * in the list. All the standbys appearing in the list are considered as
      54             :  * candidates for quorum synchronous standbys.
      55             :  *
      56             :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57             :  * This is for backward compatibility with 9.6 or before where only a
      58             :  * priority-based sync replication was supported.
      59             :  *
      60             :  * Before the standbys chosen from synchronous_standby_names can
      61             :  * become the synchronous standbys they must have caught up with
      62             :  * the primary; that may take some time. Once caught up,
      63             :  * the standbys which are considered as synchronous at that moment
      64             :  * will release waiters from the queue.
      65             :  *
      66             :  * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
      67             :  *
      68             :  * IDENTIFICATION
      69             :  *    src/backend/replication/syncrep.c
      70             :  *
      71             :  *-------------------------------------------------------------------------
      72             :  */
      73             : #include "postgres.h"
      74             : 
      75             : #include <unistd.h>
      76             : 
      77             : #include "access/xact.h"
      78             : #include "common/int.h"
      79             : #include "miscadmin.h"
      80             : #include "pgstat.h"
      81             : #include "replication/syncrep.h"
      82             : #include "replication/walsender.h"
      83             : #include "replication/walsender_private.h"
      84             : #include "storage/proc.h"
      85             : #include "tcop/tcopprot.h"
      86             : #include "utils/guc_hooks.h"
      87             : #include "utils/ps_status.h"
      88             : 
      89             : /* User-settable parameters for sync rep */
      90             : char       *SyncRepStandbyNames;
      91             : 
      92             : #define SyncStandbysDefined() \
      93             :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      94             : 
      95             : static bool announce_next_takeover = true;
      96             : 
      97             : SyncRepConfigData *SyncRepConfig = NULL;
      98             : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
      99             : 
     100             : static void SyncRepQueueInsert(int mode);
     101             : static void SyncRepCancelWait(void);
     102             : static int  SyncRepWakeQueue(bool all, int mode);
     103             : 
     104             : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     105             :                                  XLogRecPtr *flushPtr,
     106             :                                  XLogRecPtr *applyPtr,
     107             :                                  bool *am_sync);
     108             : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     109             :                                        XLogRecPtr *flushPtr,
     110             :                                        XLogRecPtr *applyPtr,
     111             :                                        SyncRepStandbyData *sync_standbys,
     112             :                                        int num_standbys);
     113             : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     114             :                                           XLogRecPtr *flushPtr,
     115             :                                           XLogRecPtr *applyPtr,
     116             :                                           SyncRepStandbyData *sync_standbys,
     117             :                                           int num_standbys,
     118             :                                           uint8 nth);
     119             : static int  SyncRepGetStandbyPriority(void);
     120             : static int  standby_priority_comparator(const void *a, const void *b);
     121             : static int  cmp_lsn(const void *a, const void *b);
     122             : 
     123             : #ifdef USE_ASSERT_CHECKING
     124             : static bool SyncRepQueueIsOrderedByLSN(int mode);
     125             : #endif
     126             : 
     127             : /*
     128             :  * ===========================================================
     129             :  * Synchronous Replication functions for normal user backends
     130             :  * ===========================================================
     131             :  */
     132             : 
     133             : /*
     134             :  * Wait for synchronous replication, if requested by user.
     135             :  *
     136             :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     137             :  * change that state to SYNC_REP_WAITING before adding ourselves
     138             :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     139             :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     140             :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     141             :  *
     142             :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     143             :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     144             :  * to be flushed if synchronous_commit is set to the higher level of
     145             :  * remote_apply, because only commit records provide apply feedback.
     146             :  */
     147             : void
     148      245734 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     149             : {
     150             :     int         mode;
     151             : 
     152             :     /*
     153             :      * This should be called while holding interrupts during a transaction
     154             :      * commit to prevent the follow-up shared memory queue cleanups to be
     155             :      * influenced by external interruptions.
     156             :      */
     157             :     Assert(InterruptHoldoffCount > 0);
     158             : 
     159             :     /*
     160             :      * Fast exit if user has not requested sync replication, or there are no
     161             :      * sync replication standby names defined.
     162             :      *
     163             :      * Since this routine gets called every commit time, it's important to
     164             :      * exit quickly if sync replication is not requested.
     165             :      *
     166             :      * We check WalSndCtl->sync_standbys_status flag without the lock and exit
     167             :      * immediately if SYNC_STANDBY_INIT is set (the checkpointer has
     168             :      * initialized this data) but SYNC_STANDBY_DEFINED is missing (no sync
     169             :      * replication requested).
     170             :      *
     171             :      * If SYNC_STANDBY_DEFINED is set, we need to check the status again later
     172             :      * while holding the lock, to check the flag and operate the sync rep
     173             :      * queue atomically.  This is necessary to avoid the race condition
     174             :      * described in SyncRepUpdateSyncStandbysDefined().  On the other hand, if
     175             :      * SYNC_STANDBY_DEFINED is not set, the lock is not necessary because we
     176             :      * don't touch the queue.
     177             :      */
     178      245734 :     if (!SyncRepRequested() ||
     179      184702 :         ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) &
     180             :          (SYNC_STANDBY_INIT | SYNC_STANDBY_DEFINED)) == SYNC_STANDBY_INIT)
     181      175046 :         return;
     182             : 
     183             :     /* Cap the level for anything other than commit to remote flush only. */
     184       70688 :     if (commit)
     185       70646 :         mode = SyncRepWaitMode;
     186             :     else
     187          42 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     188             : 
     189             :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     190             :     Assert(WalSndCtl != NULL);
     191             : 
     192       70688 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     193             :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     194             : 
     195             :     /*
     196             :      * We don't wait for sync rep if SYNC_STANDBY_DEFINED is not set.  See
     197             :      * SyncRepUpdateSyncStandbysDefined().
     198             :      *
     199             :      * Also check that the standby hasn't already replied. Unlikely race
     200             :      * condition but we'll be fetching that cache line anyway so it's likely
     201             :      * to be a low cost check.
     202             :      *
     203             :      * If the sync standby data has not been initialized yet
     204             :      * (SYNC_STANDBY_INIT is not set), fall back to a check based on the LSN,
     205             :      * then do a direct GUC check.
     206             :      */
     207       70688 :     if (WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT)
     208             :     {
     209          90 :         if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) == 0 ||
     210          90 :             lsn <= WalSndCtl->lsn[mode])
     211             :         {
     212          10 :             LWLockRelease(SyncRepLock);
     213          10 :             return;
     214             :         }
     215             :     }
     216       70598 :     else if (lsn <= WalSndCtl->lsn[mode])
     217             :     {
     218             :         /*
     219             :          * The LSN is older than what we need to wait for.  The sync standby
     220             :          * data has not been initialized yet, but we are OK to not wait
     221             :          * because we know that there is no point in doing so based on the
     222             :          * LSN.
     223             :          */
     224           0 :         LWLockRelease(SyncRepLock);
     225           0 :         return;
     226             :     }
     227       70598 :     else if (!SyncStandbysDefined())
     228             :     {
     229             :         /*
     230             :          * If we are here, the sync standby data has not been initialized yet,
     231             :          * and the LSN is newer than what need to wait for, so we have fallen
     232             :          * back to the best thing we could do in this case: a check on
     233             :          * SyncStandbysDefined() to see if the GUC is set or not.
     234             :          *
     235             :          * When the GUC has a value, we wait until the checkpointer updates
     236             :          * the status data because we cannot be sure yet if we should wait or
     237             :          * not. Here, the GUC has *no* value, we are sure that there is no
     238             :          * point to wait; this matters for example when initializing a
     239             :          * cluster, where we should never wait, and no sync standbys is the
     240             :          * default behavior.
     241             :          */
     242       70598 :         LWLockRelease(SyncRepLock);
     243       70598 :         return;
     244             :     }
     245             : 
     246             :     /*
     247             :      * Set our waitLSN so WALSender will know when to wake us, and add
     248             :      * ourselves to the queue.
     249             :      */
     250          80 :     MyProc->waitLSN = lsn;
     251          80 :     MyProc->syncRepState = SYNC_REP_WAITING;
     252          80 :     SyncRepQueueInsert(mode);
     253             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     254          80 :     LWLockRelease(SyncRepLock);
     255             : 
     256             :     /* Alter ps display to show waiting for sync rep. */
     257          80 :     if (update_process_title)
     258             :     {
     259             :         char        buffer[32];
     260             : 
     261          80 :         sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
     262          80 :         set_ps_display_suffix(buffer);
     263             :     }
     264             : 
     265             :     /*
     266             :      * Wait for specified LSN to be confirmed.
     267             :      *
     268             :      * Each proc has its own wait latch, so we perform a normal latch
     269             :      * check/wait loop here.
     270             :      */
     271             :     for (;;)
     272          80 :     {
     273             :         int         rc;
     274             : 
     275             :         /* Must reset the latch before testing state. */
     276         160 :         ResetLatch(MyLatch);
     277             : 
     278             :         /*
     279             :          * Acquiring the lock is not needed, the latch ensures proper
     280             :          * barriers. If it looks like we're done, we must really be done,
     281             :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     282             :          * it will never update it again, so we can't be seeing a stale value
     283             :          * in that case.
     284             :          */
     285         160 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     286          80 :             break;
     287             : 
     288             :         /*
     289             :          * If a wait for synchronous replication is pending, we can neither
     290             :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     291             :          * lead the client to believe that the transaction aborted, which is
     292             :          * not true: it's already committed locally. The former is no good
     293             :          * either: the client has requested synchronous replication, and is
     294             :          * entitled to assume that an acknowledged commit is also replicated,
     295             :          * which might not be true. So in this case we issue a WARNING (which
     296             :          * some clients may be able to interpret) and shut off further output.
     297             :          * We do NOT reset ProcDiePending, so that the process will die after
     298             :          * the commit is cleaned up.
     299             :          */
     300          80 :         if (ProcDiePending)
     301             :         {
     302           0 :             ereport(WARNING,
     303             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     304             :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     305             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     306           0 :             whereToSendOutput = DestNone;
     307           0 :             SyncRepCancelWait();
     308           0 :             break;
     309             :         }
     310             : 
     311             :         /*
     312             :          * It's unclear what to do if a query cancel interrupt arrives.  We
     313             :          * can't actually abort at this point, but ignoring the interrupt
     314             :          * altogether is not helpful, so we just terminate the wait with a
     315             :          * suitable warning.
     316             :          */
     317          80 :         if (QueryCancelPending)
     318             :         {
     319           0 :             QueryCancelPending = false;
     320           0 :             ereport(WARNING,
     321             :                     (errmsg("canceling wait for synchronous replication due to user request"),
     322             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     323           0 :             SyncRepCancelWait();
     324           0 :             break;
     325             :         }
     326             : 
     327             :         /*
     328             :          * Wait on latch.  Any condition that should wake us up will set the
     329             :          * latch, so no need for timeout.
     330             :          */
     331          80 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     332             :                        WAIT_EVENT_SYNC_REP);
     333             : 
     334             :         /*
     335             :          * If the postmaster dies, we'll probably never get an acknowledgment,
     336             :          * because all the wal sender processes will exit. So just bail out.
     337             :          */
     338          80 :         if (rc & WL_POSTMASTER_DEATH)
     339             :         {
     340           0 :             ProcDiePending = true;
     341           0 :             whereToSendOutput = DestNone;
     342           0 :             SyncRepCancelWait();
     343           0 :             break;
     344             :         }
     345             :     }
     346             : 
     347             :     /*
     348             :      * WalSender has checked our LSN and has removed us from queue. Clean up
     349             :      * state and leave.  It's OK to reset these shared memory fields without
     350             :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     351             :      * we're not on the queue.  We need a read barrier to make sure we see the
     352             :      * changes to the queue link (this might be unnecessary without
     353             :      * assertions, but better safe than sorry).
     354             :      */
     355          80 :     pg_read_barrier();
     356             :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     357          80 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     358          80 :     MyProc->waitLSN = 0;
     359             : 
     360             :     /* reset ps display to remove the suffix */
     361          80 :     if (update_process_title)
     362          80 :         set_ps_display_remove_suffix();
     363             : }
     364             : 
     365             : /*
     366             :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     367             :  *
     368             :  * Usually we will go at tail of queue, though it's possible that we arrive
     369             :  * here out of order, so start at tail and work back to insertion point.
     370             :  */
     371             : static void
     372          80 : SyncRepQueueInsert(int mode)
     373             : {
     374             :     dlist_head *queue;
     375             :     dlist_iter  iter;
     376             : 
     377             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     378          80 :     queue = &WalSndCtl->SyncRepQueue[mode];
     379             : 
     380          80 :     dlist_reverse_foreach(iter, queue)
     381             :     {
     382           0 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     383             : 
     384             :         /*
     385             :          * Stop at the queue element that we should insert after to ensure the
     386             :          * queue is ordered by LSN.
     387             :          */
     388           0 :         if (proc->waitLSN < MyProc->waitLSN)
     389             :         {
     390           0 :             dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
     391           0 :             return;
     392             :         }
     393             :     }
     394             : 
     395             :     /*
     396             :      * If we get here, the list was either empty, or this process needs to be
     397             :      * at the head.
     398             :      */
     399          80 :     dlist_push_head(queue, &MyProc->syncRepLinks);
     400             : }
     401             : 
     402             : /*
     403             :  * Acquire SyncRepLock and cancel any wait currently in progress.
     404             :  */
     405             : static void
     406           0 : SyncRepCancelWait(void)
     407             : {
     408           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     409           0 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     410           0 :         dlist_delete_thoroughly(&MyProc->syncRepLinks);
     411           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     412           0 :     LWLockRelease(SyncRepLock);
     413           0 : }
     414             : 
     415             : void
     416       32736 : SyncRepCleanupAtProcExit(void)
     417             : {
     418             :     /*
     419             :      * First check if we are removed from the queue without the lock to not
     420             :      * slow down backend exit.
     421             :      */
     422       32736 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     423             :     {
     424           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     425             : 
     426             :         /* maybe we have just been removed, so recheck */
     427           0 :         if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     428           0 :             dlist_delete_thoroughly(&MyProc->syncRepLinks);
     429             : 
     430           0 :         LWLockRelease(SyncRepLock);
     431             :     }
     432       32736 : }
     433             : 
     434             : /*
     435             :  * ===========================================================
     436             :  * Synchronous Replication functions for wal sender processes
     437             :  * ===========================================================
     438             :  */
     439             : 
     440             : /*
     441             :  * Take any action required to initialise sync rep state from config
     442             :  * data. Called at WALSender startup and after each SIGHUP.
     443             :  */
     444             : void
     445        1302 : SyncRepInitConfig(void)
     446             : {
     447             :     int         priority;
     448             : 
     449             :     /*
     450             :      * Determine if we are a potential sync standby and remember the result
     451             :      * for handling replies from standby.
     452             :      */
     453        1302 :     priority = SyncRepGetStandbyPriority();
     454        1302 :     if (MyWalSnd->sync_standby_priority != priority)
     455             :     {
     456          42 :         SpinLockAcquire(&MyWalSnd->mutex);
     457          42 :         MyWalSnd->sync_standby_priority = priority;
     458          42 :         SpinLockRelease(&MyWalSnd->mutex);
     459             : 
     460          42 :         ereport(DEBUG1,
     461             :                 (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
     462             :                                  application_name, priority)));
     463             :     }
     464        1302 : }
     465             : 
     466             : /*
     467             :  * Update the LSNs on each queue based upon our latest state. This
     468             :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     469             :  *
     470             :  * Other policies are possible, which would change what we do here and
     471             :  * perhaps also which information we store as well.
     472             :  */
     473             : void
     474       99832 : SyncRepReleaseWaiters(void)
     475             : {
     476       99832 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     477             :     XLogRecPtr  writePtr;
     478             :     XLogRecPtr  flushPtr;
     479             :     XLogRecPtr  applyPtr;
     480             :     bool        got_recptr;
     481             :     bool        am_sync;
     482       99832 :     int         numwrite = 0;
     483       99832 :     int         numflush = 0;
     484       99832 :     int         numapply = 0;
     485             : 
     486             :     /*
     487             :      * If this WALSender is serving a standby that is not on the list of
     488             :      * potential sync standbys then we have nothing to do. If we are still
     489             :      * starting up, still running base backup or the current flush position is
     490             :      * still invalid, then leave quickly also.  Streaming or stopping WAL
     491             :      * senders are allowed to release waiters.
     492             :      */
     493       99832 :     if (MyWalSnd->sync_standby_priority == 0 ||
     494         240 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
     495          70 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
     496         202 :         XLogRecPtrIsInvalid(MyWalSnd->flush))
     497             :     {
     498       99630 :         announce_next_takeover = true;
     499       99630 :         return;
     500             :     }
     501             : 
     502             :     /*
     503             :      * We're a potential sync standby. Release waiters if there are enough
     504             :      * sync standbys and we are considered as sync.
     505             :      */
     506         202 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     507             : 
     508             :     /*
     509             :      * Check whether we are a sync standby or not, and calculate the synced
     510             :      * positions among all sync standbys.  (Note: although this step does not
     511             :      * of itself require holding SyncRepLock, it seems like a good idea to do
     512             :      * it after acquiring the lock.  This ensures that the WAL pointers we use
     513             :      * to release waiters are newer than any previous execution of this
     514             :      * routine used.)
     515             :      */
     516         202 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     517             : 
     518             :     /*
     519             :      * If we are managing a sync standby, though we weren't prior to this,
     520             :      * then announce we are now a sync standby.
     521             :      */
     522         202 :     if (announce_next_takeover && am_sync)
     523             :     {
     524          18 :         announce_next_takeover = false;
     525             : 
     526          18 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     527          18 :             ereport(LOG,
     528             :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
     529             :                             application_name, MyWalSnd->sync_standby_priority)));
     530             :         else
     531           0 :             ereport(LOG,
     532             :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     533             :                             application_name)));
     534             :     }
     535             : 
     536             :     /*
     537             :      * If the number of sync standbys is less than requested or we aren't
     538             :      * managing a sync standby then just leave.
     539             :      */
     540         202 :     if (!got_recptr || !am_sync)
     541             :     {
     542           0 :         LWLockRelease(SyncRepLock);
     543           0 :         announce_next_takeover = !am_sync;
     544           0 :         return;
     545             :     }
     546             : 
     547             :     /*
     548             :      * Set the lsn first so that when we wake backends they will release up to
     549             :      * this location.
     550             :      */
     551         202 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     552             :     {
     553          74 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     554          74 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     555             :     }
     556         202 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     557             :     {
     558          90 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     559          90 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     560             :     }
     561         202 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     562             :     {
     563          70 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     564          70 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     565             :     }
     566             : 
     567         202 :     LWLockRelease(SyncRepLock);
     568             : 
     569         202 :     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
     570             :          numwrite, LSN_FORMAT_ARGS(writePtr),
     571             :          numflush, LSN_FORMAT_ARGS(flushPtr),
     572             :          numapply, LSN_FORMAT_ARGS(applyPtr));
     573             : }
     574             : 
     575             : /*
     576             :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     577             :  *
     578             :  * Return false if the number of sync standbys is less than
     579             :  * synchronous_standby_names specifies. Otherwise return true and
     580             :  * store the positions into *writePtr, *flushPtr and *applyPtr.
     581             :  *
     582             :  * On return, *am_sync is set to true if this walsender is connecting to
     583             :  * sync standby. Otherwise it's set to false.
     584             :  */
     585             : static bool
     586         202 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     587             :                      XLogRecPtr *applyPtr, bool *am_sync)
     588             : {
     589             :     SyncRepStandbyData *sync_standbys;
     590             :     int         num_standbys;
     591             :     int         i;
     592             : 
     593             :     /* Initialize default results */
     594         202 :     *writePtr = InvalidXLogRecPtr;
     595         202 :     *flushPtr = InvalidXLogRecPtr;
     596         202 :     *applyPtr = InvalidXLogRecPtr;
     597         202 :     *am_sync = false;
     598             : 
     599             :     /* Quick out if not even configured to be synchronous */
     600         202 :     if (SyncRepConfig == NULL)
     601           0 :         return false;
     602             : 
     603             :     /* Get standbys that are considered as synchronous at this moment */
     604         202 :     num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
     605             : 
     606             :     /* Am I among the candidate sync standbys? */
     607         202 :     for (i = 0; i < num_standbys; i++)
     608             :     {
     609         202 :         if (sync_standbys[i].is_me)
     610             :         {
     611         202 :             *am_sync = true;
     612         202 :             break;
     613             :         }
     614             :     }
     615             : 
     616             :     /*
     617             :      * Nothing more to do if we are not managing a sync standby or there are
     618             :      * not enough synchronous standbys.
     619             :      */
     620         202 :     if (!(*am_sync) ||
     621         202 :         num_standbys < SyncRepConfig->num_sync)
     622             :     {
     623           0 :         pfree(sync_standbys);
     624           0 :         return false;
     625             :     }
     626             : 
     627             :     /*
     628             :      * In a priority-based sync replication, the synced positions are the
     629             :      * oldest ones among sync standbys. In a quorum-based, they are the Nth
     630             :      * latest ones.
     631             :      *
     632             :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     633             :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     634             :      * because it's a bit more efficient.
     635             :      *
     636             :      * XXX If the numbers of current and requested sync standbys are the same,
     637             :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     638             :      * positions even in a quorum-based sync replication.
     639             :      */
     640         202 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     641             :     {
     642         202 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     643             :                                    sync_standbys, num_standbys);
     644             :     }
     645             :     else
     646             :     {
     647           0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     648             :                                       sync_standbys, num_standbys,
     649           0 :                                       SyncRepConfig->num_sync);
     650             :     }
     651             : 
     652         202 :     pfree(sync_standbys);
     653         202 :     return true;
     654             : }
     655             : 
     656             : /*
     657             :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
     658             :  */
     659             : static void
     660         202 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     661             :                            XLogRecPtr *flushPtr,
     662             :                            XLogRecPtr *applyPtr,
     663             :                            SyncRepStandbyData *sync_standbys,
     664             :                            int num_standbys)
     665             : {
     666             :     int         i;
     667             : 
     668             :     /*
     669             :      * Scan through all sync standbys and calculate the oldest Write, Flush
     670             :      * and Apply positions.  We assume *writePtr et al were initialized to
     671             :      * InvalidXLogRecPtr.
     672             :      */
     673         404 :     for (i = 0; i < num_standbys; i++)
     674             :     {
     675         202 :         XLogRecPtr  write = sync_standbys[i].write;
     676         202 :         XLogRecPtr  flush = sync_standbys[i].flush;
     677         202 :         XLogRecPtr  apply = sync_standbys[i].apply;
     678             : 
     679         202 :         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
     680         202 :             *writePtr = write;
     681         202 :         if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
     682         202 :             *flushPtr = flush;
     683         202 :         if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
     684         202 :             *applyPtr = apply;
     685             :     }
     686         202 : }
     687             : 
     688             : /*
     689             :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     690             :  * standbys.
     691             :  */
     692             : static void
     693           0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     694             :                               XLogRecPtr *flushPtr,
     695             :                               XLogRecPtr *applyPtr,
     696             :                               SyncRepStandbyData *sync_standbys,
     697             :                               int num_standbys,
     698             :                               uint8 nth)
     699             : {
     700             :     XLogRecPtr *write_array;
     701             :     XLogRecPtr *flush_array;
     702             :     XLogRecPtr *apply_array;
     703             :     int         i;
     704             : 
     705             :     /* Should have enough candidates, or somebody messed up */
     706             :     Assert(nth > 0 && nth <= num_standbys);
     707             : 
     708           0 :     write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     709           0 :     flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     710           0 :     apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     711             : 
     712           0 :     for (i = 0; i < num_standbys; i++)
     713             :     {
     714           0 :         write_array[i] = sync_standbys[i].write;
     715           0 :         flush_array[i] = sync_standbys[i].flush;
     716           0 :         apply_array[i] = sync_standbys[i].apply;
     717             :     }
     718             : 
     719             :     /* Sort each array in descending order */
     720           0 :     qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     721           0 :     qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     722           0 :     qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     723             : 
     724             :     /* Get Nth latest Write, Flush, Apply positions */
     725           0 :     *writePtr = write_array[nth - 1];
     726           0 :     *flushPtr = flush_array[nth - 1];
     727           0 :     *applyPtr = apply_array[nth - 1];
     728             : 
     729           0 :     pfree(write_array);
     730           0 :     pfree(flush_array);
     731           0 :     pfree(apply_array);
     732           0 : }
     733             : 
     734             : /*
     735             :  * Compare lsn in order to sort array in descending order.
     736             :  */
     737             : static int
     738           0 : cmp_lsn(const void *a, const void *b)
     739             : {
     740           0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     741           0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     742             : 
     743           0 :     return pg_cmp_u64(lsn2, lsn1);
     744             : }
     745             : 
     746             : /*
     747             :  * Return data about walsenders that are candidates to be sync standbys.
     748             :  *
     749             :  * *standbys is set to a palloc'd array of structs of per-walsender data,
     750             :  * and the number of valid entries (candidate sync senders) is returned.
     751             :  * (This might be more or fewer than num_sync; caller must check.)
     752             :  */
     753             : int
     754        1814 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
     755             : {
     756             :     int         i;
     757             :     int         n;
     758             : 
     759             :     /* Create result array */
     760        1814 :     *standbys = (SyncRepStandbyData *)
     761        1814 :         palloc(max_wal_senders * sizeof(SyncRepStandbyData));
     762             : 
     763             :     /* Quick exit if sync replication is not requested */
     764        1814 :     if (SyncRepConfig == NULL)
     765        1578 :         return 0;
     766             : 
     767             :     /* Collect raw data from shared memory */
     768         236 :     n = 0;
     769        2596 :     for (i = 0; i < max_wal_senders; i++)
     770             :     {
     771             :         volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     772             :                                      * rearrangement */
     773             :         SyncRepStandbyData *stby;
     774             :         WalSndState state;      /* not included in SyncRepStandbyData */
     775             : 
     776        2360 :         walsnd = &WalSndCtl->walsnds[i];
     777        2360 :         stby = *standbys + n;
     778             : 
     779        2360 :         SpinLockAcquire(&walsnd->mutex);
     780        2360 :         stby->pid = walsnd->pid;
     781        2360 :         state = walsnd->state;
     782        2360 :         stby->write = walsnd->write;
     783        2360 :         stby->flush = walsnd->flush;
     784        2360 :         stby->apply = walsnd->apply;
     785        2360 :         stby->sync_standby_priority = walsnd->sync_standby_priority;
     786        2360 :         SpinLockRelease(&walsnd->mutex);
     787             : 
     788             :         /* Must be active */
     789        2360 :         if (stby->pid == 0)
     790        2068 :             continue;
     791             : 
     792             :         /* Must be streaming or stopping */
     793         292 :         if (state != WALSNDSTATE_STREAMING &&
     794             :             state != WALSNDSTATE_STOPPING)
     795           0 :             continue;
     796             : 
     797             :         /* Must be synchronous */
     798         292 :         if (stby->sync_standby_priority == 0)
     799          14 :             continue;
     800             : 
     801             :         /* Must have a valid flush position */
     802         278 :         if (XLogRecPtrIsInvalid(stby->flush))
     803           0 :             continue;
     804             : 
     805             :         /* OK, it's a candidate */
     806         278 :         stby->walsnd_index = i;
     807         278 :         stby->is_me = (walsnd == MyWalSnd);
     808         278 :         n++;
     809             :     }
     810             : 
     811             :     /*
     812             :      * In quorum mode, we return all the candidates.  In priority mode, if we
     813             :      * have too many candidates then return only the num_sync ones of highest
     814             :      * priority.
     815             :      */
     816         236 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
     817         234 :         n > SyncRepConfig->num_sync)
     818             :     {
     819             :         /* Sort by priority ... */
     820          16 :         qsort(*standbys, n, sizeof(SyncRepStandbyData),
     821             :               standby_priority_comparator);
     822             :         /* ... then report just the first num_sync ones */
     823          16 :         n = SyncRepConfig->num_sync;
     824             :     }
     825             : 
     826         236 :     return n;
     827             : }
     828             : 
     829             : /*
     830             :  * qsort comparator to sort SyncRepStandbyData entries by priority
     831             :  */
     832             : static int
     833          38 : standby_priority_comparator(const void *a, const void *b)
     834             : {
     835          38 :     const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
     836          38 :     const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
     837             : 
     838             :     /* First, sort by increasing priority value */
     839          38 :     if (sa->sync_standby_priority != sb->sync_standby_priority)
     840          18 :         return sa->sync_standby_priority - sb->sync_standby_priority;
     841             : 
     842             :     /*
     843             :      * We might have equal priority values; arbitrarily break ties by position
     844             :      * in the WalSnd array.  (This is utterly bogus, since that is arrival
     845             :      * order dependent, but there are regression tests that rely on it.)
     846             :      */
     847          20 :     return sa->walsnd_index - sb->walsnd_index;
     848             : }
     849             : 
     850             : 
     851             : /*
     852             :  * Check if we are in the list of sync standbys, and if so, determine
     853             :  * priority sequence. Return priority if set, or zero to indicate that
     854             :  * we are not a potential sync standby.
     855             :  *
     856             :  * Compare the parameter SyncRepStandbyNames against the application_name
     857             :  * for this WALSender, or allow any name if we find a wildcard "*".
     858             :  */
     859             : static int
     860        1302 : SyncRepGetStandbyPriority(void)
     861             : {
     862             :     const char *standby_name;
     863             :     int         priority;
     864        1302 :     bool        found = false;
     865             : 
     866             :     /*
     867             :      * Since synchronous cascade replication is not allowed, we always set the
     868             :      * priority of cascading walsender to zero.
     869             :      */
     870        1302 :     if (am_cascading_walsender)
     871          50 :         return 0;
     872             : 
     873        1252 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     874        1204 :         return 0;
     875             : 
     876          48 :     standby_name = SyncRepConfig->member_names;
     877          64 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     878             :     {
     879          62 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     880          36 :             strcmp(standby_name, "*") == 0)
     881             :         {
     882          46 :             found = true;
     883          46 :             break;
     884             :         }
     885          16 :         standby_name += strlen(standby_name) + 1;
     886             :     }
     887             : 
     888          48 :     if (!found)
     889           2 :         return 0;
     890             : 
     891             :     /*
     892             :      * In quorum-based sync replication, all the standbys in the list have the
     893             :      * same priority, one.
     894             :      */
     895          46 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     896             : }
     897             : 
     898             : /*
     899             :  * Walk the specified queue from head.  Set the state of any backends that
     900             :  * need to be woken, remove them from the queue, and then wake them.
     901             :  * Pass all = true to wake whole queue; otherwise, just wake up to
     902             :  * the walsender's LSN.
     903             :  *
     904             :  * The caller must hold SyncRepLock in exclusive mode.
     905             :  */
     906             : static int
     907         240 : SyncRepWakeQueue(bool all, int mode)
     908             : {
     909         240 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     910         240 :     int         numprocs = 0;
     911             :     dlist_mutable_iter iter;
     912             : 
     913             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     914             :     Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
     915             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     916             : 
     917         282 :     dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
     918             :     {
     919          58 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     920             : 
     921             :         /*
     922             :          * Assume the queue is ordered by LSN
     923             :          */
     924          58 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
     925          16 :             return numprocs;
     926             : 
     927             :         /*
     928             :          * Remove from queue.
     929             :          */
     930          42 :         dlist_delete_thoroughly(&proc->syncRepLinks);
     931             : 
     932             :         /*
     933             :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
     934             :          * make sure that it sees the queue link being removed before the
     935             :          * syncRepState change.
     936             :          */
     937          42 :         pg_write_barrier();
     938             : 
     939             :         /*
     940             :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
     941             :          * the various states.
     942             :          */
     943          42 :         proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
     944             : 
     945             :         /*
     946             :          * Wake only when we have set state and removed from queue.
     947             :          */
     948          42 :         SetLatch(&(proc->procLatch));
     949             : 
     950          42 :         numprocs++;
     951             :     }
     952             : 
     953         224 :     return numprocs;
     954             : }
     955             : 
     956             : /*
     957             :  * The checkpointer calls this as needed to update the shared
     958             :  * sync_standbys_status flag, so that backends don't remain permanently wedged
     959             :  * if synchronous_standby_names is unset.  It's safe to check the current value
     960             :  * without the lock, because it's only ever updated by one process.  But we
     961             :  * must take the lock to change it.
     962             :  */
     963             : void
     964        1120 : SyncRepUpdateSyncStandbysDefined(void)
     965             : {
     966        1120 :     bool        sync_standbys_defined = SyncStandbysDefined();
     967             : 
     968        1120 :     if (sync_standbys_defined !=
     969        1120 :         ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0))
     970             :     {
     971          26 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     972             : 
     973             :         /*
     974             :          * If synchronous_standby_names has been reset to empty, it's futile
     975             :          * for backends to continue waiting.  Since the user no longer wants
     976             :          * synchronous replication, we'd better wake them up.
     977             :          */
     978          26 :         if (!sync_standbys_defined)
     979             :         {
     980             :             int         i;
     981             : 
     982           8 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
     983           6 :                 SyncRepWakeQueue(true, i);
     984             :         }
     985             : 
     986             :         /*
     987             :          * Only allow people to join the queue when there are synchronous
     988             :          * standbys defined.  Without this interlock, there's a race
     989             :          * condition: we might wake up all the current waiters; then, some
     990             :          * backend that hasn't yet reloaded its config might go to sleep on
     991             :          * the queue (and never wake up).  This prevents that.
     992             :          */
     993          26 :         WalSndCtl->sync_standbys_status = SYNC_STANDBY_INIT |
     994             :             (sync_standbys_defined ? SYNC_STANDBY_DEFINED : 0);
     995             : 
     996          26 :         LWLockRelease(SyncRepLock);
     997             :     }
     998        1094 :     else if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT) == 0)
     999             :     {
    1000         988 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    1001             : 
    1002             :         /*
    1003             :          * Note that there is no need to wake up the queues here.  We would
    1004             :          * reach this path only if SyncStandbysDefined() returns false, or it
    1005             :          * would mean that some backends are waiting with the GUC set.  See
    1006             :          * SyncRepWaitForLSN().
    1007             :          */
    1008             :         Assert(!SyncStandbysDefined());
    1009             : 
    1010             :         /*
    1011             :          * Even if there is no sync standby defined, let the readers of this
    1012             :          * information know that the sync standby data has been initialized.
    1013             :          * This can just be done once, hence the previous check on
    1014             :          * SYNC_STANDBY_INIT to avoid useless work.
    1015             :          */
    1016         988 :         WalSndCtl->sync_standbys_status |= SYNC_STANDBY_INIT;
    1017             : 
    1018         988 :         LWLockRelease(SyncRepLock);
    1019             :     }
    1020        1120 : }
    1021             : 
    1022             : #ifdef USE_ASSERT_CHECKING
    1023             : static bool
    1024             : SyncRepQueueIsOrderedByLSN(int mode)
    1025             : {
    1026             :     XLogRecPtr  lastLSN;
    1027             :     dlist_iter  iter;
    1028             : 
    1029             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    1030             : 
    1031             :     lastLSN = 0;
    1032             : 
    1033             :     dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
    1034             :     {
    1035             :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
    1036             : 
    1037             :         /*
    1038             :          * Check the queue is ordered by LSN and that multiple procs don't
    1039             :          * have matching LSNs
    1040             :          */
    1041             :         if (proc->waitLSN <= lastLSN)
    1042             :             return false;
    1043             : 
    1044             :         lastLSN = proc->waitLSN;
    1045             :     }
    1046             : 
    1047             :     return true;
    1048             : }
    1049             : #endif
    1050             : 
    1051             : /*
    1052             :  * ===========================================================
    1053             :  * Synchronous Replication functions executed by any process
    1054             :  * ===========================================================
    1055             :  */
    1056             : 
    1057             : bool
    1058        2318 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
    1059             : {
    1060        2318 :     if (*newval != NULL && (*newval)[0] != '\0')
    1061         134 :     {
    1062             :         yyscan_t    scanner;
    1063             :         int         parse_rc;
    1064             :         SyncRepConfigData *pconf;
    1065             : 
    1066             :         /* Result of parsing is returned in one of these two variables */
    1067         134 :         SyncRepConfigData *syncrep_parse_result = NULL;
    1068         134 :         char       *syncrep_parse_error_msg = NULL;
    1069             : 
    1070             :         /* Parse the synchronous_standby_names string */
    1071         134 :         syncrep_scanner_init(*newval, &scanner);
    1072         134 :         parse_rc = syncrep_yyparse(&syncrep_parse_result, &syncrep_parse_error_msg, scanner);
    1073         134 :         syncrep_scanner_finish(scanner);
    1074             : 
    1075         134 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1076             :         {
    1077           0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1078           0 :             if (syncrep_parse_error_msg)
    1079           0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1080             :             else
    1081           0 :                 GUC_check_errdetail("\"%s\" parser failed.",
    1082             :                                     "synchronous_standby_names");
    1083           0 :             return false;
    1084             :         }
    1085             : 
    1086         134 :         if (syncrep_parse_result->num_sync <= 0)
    1087             :         {
    1088           0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1089           0 :                              syncrep_parse_result->num_sync);
    1090           0 :             return false;
    1091             :         }
    1092             : 
    1093             :         /* GUC extra value must be guc_malloc'd, not palloc'd */
    1094             :         pconf = (SyncRepConfigData *)
    1095         134 :             guc_malloc(LOG, syncrep_parse_result->config_size);
    1096         134 :         if (pconf == NULL)
    1097           0 :             return false;
    1098         134 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1099             : 
    1100         134 :         *extra = pconf;
    1101             : 
    1102             :         /*
    1103             :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1104             :          * other cruft generated during parsing, will be freed when the
    1105             :          * current memory context is deleted.  (This code is generally run in
    1106             :          * a short-lived context used for config file processing, so that will
    1107             :          * not be very long.)
    1108             :          */
    1109             :     }
    1110             :     else
    1111        2184 :         *extra = NULL;
    1112             : 
    1113        2318 :     return true;
    1114             : }
    1115             : 
    1116             : void
    1117        2298 : assign_synchronous_standby_names(const char *newval, void *extra)
    1118             : {
    1119        2298 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1120        2298 : }
    1121             : 
    1122             : void
    1123        5784 : assign_synchronous_commit(int newval, void *extra)
    1124             : {
    1125        5784 :     switch (newval)
    1126             :     {
    1127           0 :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1128           0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1129           0 :             break;
    1130        2422 :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1131        2422 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1132        2422 :             break;
    1133           4 :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1134           4 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1135           4 :             break;
    1136        3358 :         default:
    1137        3358 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1138        3358 :             break;
    1139             :     }
    1140        5784 : }

Generated by: LCOV version 1.14