LCOV - code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 76.6 % 295 226
Test Date: 2026-03-03 14:15:12 Functions: 83.3 % 18 15
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * syncrep.c
       4              :  *
       5              :  * Synchronous replication is new as of PostgreSQL 9.1.
       6              :  *
       7              :  * If requested, transaction commits wait until their commit LSNs are
       8              :  * acknowledged by the synchronous standbys.
       9              :  *
      10              :  * This module contains the code for waiting and release of backends.
      11              :  * All code in this module executes on the primary. The core streaming
      12              :  * replication transport remains within WALreceiver/WALsender modules.
      13              :  *
      14              :  * The essence of this design is that it isolates all logic about
      15              :  * waiting/releasing onto the primary. The primary defines which standbys
      16              :  * it wishes to wait for. The standbys are completely unaware of the
      17              :  * durability requirements of transactions on the primary, reducing the
      18              :  * complexity of the code and streamlining both standby operations and
      19              :  * network bandwidth because there is no requirement to ship
      20              :  * per-transaction state information.
      21              :  *
      22              :  * Replication is either synchronous or not synchronous (async). If it is
      23              :  * async, we just fastpath out of here. If it is sync, then we wait for
      24              :  * the write, flush or apply location on the standby before releasing
      25              :  * the waiting backend. Further complexity in that interaction is
      26              :  * expected in later releases.
      27              :  *
      28              :  * The best performing way to manage the waiting backends is to have a
      29              :  * single ordered queue of waiting backends, so that we can avoid
      30              :  * searching through all waiters each time we receive a reply.
      31              :  *
      32              :  * In 9.5 or before only a single standby could be considered as
      33              :  * synchronous. In 9.6 we support multiple priority-based synchronous
      34              :  * standbys. In 10.0 multiple quorum-based synchronous standbys are also
      35              :  * supported. The number of synchronous standbys that transactions
      36              :  * must wait for replies from is specified in synchronous_standby_names.
      37              :  * This parameter also specifies a list of standby names and the method
      38              :  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
      39              :  *
      40              :  * The method FIRST specifies a priority-based synchronous replication
      41              :  * and makes transaction commits wait until their WAL records are
      42              :  * replicated to the requested number of synchronous standbys chosen based
      43              :  * on their priorities. The standbys whose names appear earlier in the list
      44              :  * are given higher priority and will be considered as synchronous.
      45              :  * Other standby servers appearing later in this list represent potential
      46              :  * synchronous standbys. If any of the current synchronous standbys
      47              :  * disconnects for whatever reason, it will be replaced immediately with
      48              :  * the next-highest-priority standby.
      49              :  *
      50              :  * The method ANY specifies a quorum-based synchronous replication
      51              :  * and makes transaction commits wait until their WAL records are
      52              :  * replicated to at least the requested number of synchronous standbys
      53              :  * in the list. All the standbys appearing in the list are considered as
      54              :  * candidates for quorum synchronous standbys.
      55              :  *
      56              :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57              :  * This is for backward compatibility with 9.6 or before where only a
      58              :  * priority-based sync replication was supported.
      59              :  *
      60              :  * Before the standbys chosen from synchronous_standby_names can
      61              :  * become the synchronous standbys they must have caught up with
      62              :  * the primary; that may take some time. Once caught up,
      63              :  * the standbys which are considered as synchronous at that moment
      64              :  * will release waiters from the queue.
      65              :  *
      66              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
      67              :  *
      68              :  * IDENTIFICATION
      69              :  *    src/backend/replication/syncrep.c
      70              :  *
      71              :  *-------------------------------------------------------------------------
      72              :  */
      73              : #include "postgres.h"
      74              : 
      75              : #include <unistd.h>
      76              : 
      77              : #include "access/xact.h"
      78              : #include "common/int.h"
      79              : #include "miscadmin.h"
      80              : #include "pgstat.h"
      81              : #include "replication/syncrep.h"
      82              : #include "replication/walsender.h"
      83              : #include "replication/walsender_private.h"
      84              : #include "storage/proc.h"
      85              : #include "tcop/tcopprot.h"
      86              : #include "utils/guc_hooks.h"
      87              : #include "utils/ps_status.h"
      88              : 
      89              : /* User-settable parameters for sync rep */
      90              : char       *SyncRepStandbyNames;
      91              : 
      92              : #define SyncStandbysDefined() \
      93              :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      94              : 
      95              : static bool announce_next_takeover = true;
      96              : 
      97              : SyncRepConfigData *SyncRepConfig = NULL;
      98              : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
      99              : 
     100              : static void SyncRepQueueInsert(int mode);
     101              : static void SyncRepCancelWait(void);
     102              : static int  SyncRepWakeQueue(bool all, int mode);
     103              : 
     104              : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     105              :                                  XLogRecPtr *flushPtr,
     106              :                                  XLogRecPtr *applyPtr,
     107              :                                  bool *am_sync);
     108              : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     109              :                                        XLogRecPtr *flushPtr,
     110              :                                        XLogRecPtr *applyPtr,
     111              :                                        SyncRepStandbyData *sync_standbys,
     112              :                                        int num_standbys);
     113              : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     114              :                                           XLogRecPtr *flushPtr,
     115              :                                           XLogRecPtr *applyPtr,
     116              :                                           SyncRepStandbyData *sync_standbys,
     117              :                                           int num_standbys,
     118              :                                           uint8 nth);
     119              : static int  SyncRepGetStandbyPriority(void);
     120              : static int  standby_priority_comparator(const void *a, const void *b);
     121              : static int  cmp_lsn(const void *a, const void *b);
     122              : 
     123              : #ifdef USE_ASSERT_CHECKING
     124              : static bool SyncRepQueueIsOrderedByLSN(int mode);
     125              : #endif
     126              : 
     127              : /*
     128              :  * ===========================================================
     129              :  * Synchronous Replication functions for normal user backends
     130              :  * ===========================================================
     131              :  */
     132              : 
     133              : /*
     134              :  * Wait for synchronous replication, if requested by user.
     135              :  *
     136              :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     137              :  * change that state to SYNC_REP_WAITING before adding ourselves
     138              :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     139              :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     140              :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     141              :  *
     142              :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     143              :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     144              :  * to be flushed if synchronous_commit is set to the higher level of
     145              :  * remote_apply, because only commit records provide apply feedback.
     146              :  */
     147              : void
     148       127775 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     149              : {
     150              :     int         mode;
     151              : 
     152              :     /*
     153              :      * This should be called while holding interrupts during a transaction
     154              :      * commit to prevent the follow-up shared memory queue cleanups from
     155              :      * being influenced by external interruptions.
     156              :      */
     157              :     Assert(InterruptHoldoffCount > 0);
     158              : 
     159              :     /*
     160              :      * Fast exit if user has not requested sync replication, or there are no
     161              :      * sync replication standby names defined.
     162              :      *
     163              :      * Since this routine gets called every commit time, it's important to
     164              :      * exit quickly if sync replication is not requested.
     165              :      *
     166              :      * We check WalSndCtl->sync_standbys_status flag without the lock and exit
     167              :      * immediately if SYNC_STANDBY_INIT is set (the checkpointer has
     168              :      * initialized this data) but SYNC_STANDBY_DEFINED is missing (no sync
     169              :      * replication requested).
     170              :      *
     171              :      * If SYNC_STANDBY_DEFINED is set, we need to check the status again later
     172              :      * while holding the lock, to check the flag and operate the sync rep
     173              :      * queue atomically.  This is necessary to avoid the race condition
     174              :      * described in SyncRepUpdateSyncStandbysDefined().  On the other hand, if
     175              :      * SYNC_STANDBY_DEFINED is not set, the lock is not necessary because we
     176              :      * don't touch the queue.
     177              :      */
     178       127775 :     if (!SyncRepRequested() ||
     179        95822 :         ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) &
     180              :          (SYNC_STANDBY_INIT | SYNC_STANDBY_DEFINED)) == SYNC_STANDBY_INIT)
     181        92011 :         return;
     182              : 
     183              :     /* Cap the level for anything other than commit to remote flush only. */
     184        35764 :     if (commit)
     185        35746 :         mode = SyncRepWaitMode;
     186              :     else
     187           18 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     188              : 
     189              :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     190              :     Assert(WalSndCtl != NULL);
     191              : 
     192        35764 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     193              :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     194              : 
     195              :     /*
     196              :      * We don't wait for sync rep if SYNC_STANDBY_DEFINED is not set.  See
     197              :      * SyncRepUpdateSyncStandbysDefined().
     198              :      *
     199              :      * Also check that the standby hasn't already replied. Unlikely race
     200              :      * condition but we'll be fetching that cache line anyway so it's likely
     201              :      * to be a low cost check.
     202              :      *
     203              :      * If the sync standby data has not been initialized yet
     204              :      * (SYNC_STANDBY_INIT is not set), fall back to a check based on the LSN,
     205              :      * then do a direct GUC check.
     206              :      */
     207        35764 :     if (WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT)
     208              :     {
     209           40 :         if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) == 0 ||
     210           40 :             lsn <= WalSndCtl->lsn[mode])
     211              :         {
     212            6 :             LWLockRelease(SyncRepLock);
     213            6 :             return;
     214              :         }
     215              :     }
     216        35724 :     else if (lsn <= WalSndCtl->lsn[mode])
     217              :     {
     218              :         /*
     219              :          * The LSN is older than what we need to wait for.  The sync standby
     220              :          * data has not been initialized yet, but we are OK to not wait
     221              :          * because we know that there is no point in doing so based on the
     222              :          * LSN.
     223              :          */
     224            0 :         LWLockRelease(SyncRepLock);
     225            0 :         return;
     226              :     }
     227        35724 :     else if (!SyncStandbysDefined())
     228              :     {
     229              :         /*
     230              :          * If we are here, the sync standby data has not been initialized yet,
     231              :          * and the LSN is newer than what we need to wait for, so we have fallen
     232              :          * back to the best thing we could do in this case: a check on
     233              :          * SyncStandbysDefined() to see if the GUC is set or not.
     234              :          *
     235              :          * When the GUC has a value, we wait until the checkpointer updates
     236              :          * the status data because we cannot be sure yet if we should wait or
     237              :          * not. Here, the GUC has *no* value, so we are sure that there is no
     238              :          * point in waiting; this matters for example when initializing a
     239              :          * cluster, where we should never wait, and no sync standbys is the
     240              :          * default behavior.
     241              :          */
     242        35724 :         LWLockRelease(SyncRepLock);
     243        35724 :         return;
     244              :     }
     245              : 
     246              :     /*
     247              :      * Set our waitLSN so WALSender will know when to wake us, and add
     248              :      * ourselves to the queue.
     249              :      */
     250           34 :     MyProc->waitLSN = lsn;
     251           34 :     MyProc->syncRepState = SYNC_REP_WAITING;
     252           34 :     SyncRepQueueInsert(mode);
     253              :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     254           34 :     LWLockRelease(SyncRepLock);
     255              : 
     256              :     /* Alter ps display to show waiting for sync rep. */
     257           34 :     if (update_process_title)
     258              :     {
     259              :         char        buffer[32];
     260              : 
     261           34 :         sprintf(buffer, "waiting for %X/%08X", LSN_FORMAT_ARGS(lsn));
     262           34 :         set_ps_display_suffix(buffer);
     263              :     }
     264              : 
     265              :     /*
     266              :      * Wait for specified LSN to be confirmed.
     267              :      *
     268              :      * Each proc has its own wait latch, so we perform a normal latch
     269              :      * check/wait loop here.
     270              :      */
     271              :     for (;;)
     272           34 :     {
     273              :         int         rc;
     274              : 
     275              :         /* Must reset the latch before testing state. */
     276           68 :         ResetLatch(MyLatch);
     277              : 
     278              :         /*
     279              :          * Acquiring the lock is not needed, the latch ensures proper
     280              :          * barriers. If it looks like we're done, we must really be done,
     281              :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     282              :          * it will never update it again, so we can't be seeing a stale value
     283              :          * in that case.
     284              :          */
     285           68 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     286           34 :             break;
     287              : 
     288              :         /*
     289              :          * If a wait for synchronous replication is pending, we can neither
     290              :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     291              :          * lead the client to believe that the transaction aborted, which is
     292              :          * not true: it's already committed locally. The former is no good
     293              :          * either: the client has requested synchronous replication, and is
     294              :          * entitled to assume that an acknowledged commit is also replicated,
     295              :          * which might not be true. So in this case we issue a WARNING (which
     296              :          * some clients may be able to interpret) and shut off further output.
     297              :          * We do NOT reset ProcDiePending, so that the process will die after
     298              :          * the commit is cleaned up.
     299              :          */
     300           34 :         if (ProcDiePending)
     301              :         {
     302            0 :             ereport(WARNING,
     303              :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     304              :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     305              :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     306            0 :             whereToSendOutput = DestNone;
     307            0 :             SyncRepCancelWait();
     308            0 :             break;
     309              :         }
     310              : 
     311              :         /*
     312              :          * It's unclear what to do if a query cancel interrupt arrives.  We
     313              :          * can't actually abort at this point, but ignoring the interrupt
     314              :          * altogether is not helpful, so we just terminate the wait with a
     315              :          * suitable warning.
     316              :          */
     317           34 :         if (QueryCancelPending)
     318              :         {
     319            0 :             QueryCancelPending = false;
     320            0 :             ereport(WARNING,
     321              :                     (errmsg("canceling wait for synchronous replication due to user request"),
     322              :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     323            0 :             SyncRepCancelWait();
     324            0 :             break;
     325              :         }
     326              : 
     327              :         /*
     328              :          * Wait on latch.  Any condition that should wake us up will set the
     329              :          * latch, so no need for timeout.
     330              :          */
     331           34 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     332              :                        WAIT_EVENT_SYNC_REP);
     333              : 
     334              :         /*
     335              :          * If the postmaster dies, we'll probably never get an acknowledgment,
     336              :          * because all the wal sender processes will exit. So just bail out.
     337              :          */
     338           34 :         if (rc & WL_POSTMASTER_DEATH)
     339              :         {
     340            0 :             ProcDiePending = true;
     341            0 :             whereToSendOutput = DestNone;
     342            0 :             SyncRepCancelWait();
     343            0 :             break;
     344              :         }
     345              :     }
     346              : 
     347              :     /*
     348              :      * WalSender has checked our LSN and has removed us from queue. Clean up
     349              :      * state and leave.  It's OK to reset these shared memory fields without
     350              :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     351              :      * we're not on the queue.  We need a read barrier to make sure we see the
     352              :      * changes to the queue link (this might be unnecessary without
     353              :      * assertions, but better safe than sorry).
     354              :      */
     355           34 :     pg_read_barrier();
     356              :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     357           34 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     358           34 :     MyProc->waitLSN = InvalidXLogRecPtr;
     359              : 
     360              :     /* reset ps display to remove the suffix */
     361           34 :     if (update_process_title)
     362           34 :         set_ps_display_remove_suffix();
     363              : }
     364              : 
     365              : /*
     366              :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     367              :  *
     368              :  * Usually we will go at tail of queue, though it's possible that we arrive
     369              :  * here out of order, so start at tail and work back to insertion point.
     370              :  */
     371              : static void
     372           34 : SyncRepQueueInsert(int mode)
     373              : {
     374              :     dlist_head *queue;
     375              :     dlist_iter  iter;
     376              : 
     377              :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     378           34 :     queue = &WalSndCtl->SyncRepQueue[mode];
     379              : 
     380           34 :     dlist_reverse_foreach(iter, queue)
     381              :     {
     382            0 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     383              : 
     384              :         /*
     385              :          * Stop at the queue element that we should insert after to ensure the
     386              :          * queue is ordered by LSN.
     387              :          */
     388            0 :         if (proc->waitLSN < MyProc->waitLSN)
     389              :         {
     390            0 :             dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
     391            0 :             return;
     392              :         }
     393              :     }
     394              : 
     395              :     /*
     396              :      * If we get here, the list was either empty, or this process needs to be
     397              :      * at the head.
     398              :      */
     399           34 :     dlist_push_head(queue, &MyProc->syncRepLinks);
     400              : }
     401              : 
     402              : /*
     403              :  * Acquire SyncRepLock and cancel any wait currently in progress.
     404              :  */
     405              : static void
     406            0 : SyncRepCancelWait(void)
     407              : {
     408            0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     409            0 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     410            0 :         dlist_delete_thoroughly(&MyProc->syncRepLinks);
     411            0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     412            0 :     LWLockRelease(SyncRepLock);
     413            0 : }
     414              : 
     415              : void
     416        18348 : SyncRepCleanupAtProcExit(void)
     417              : {
     418              :     /*
     419              :      * First check if we are removed from the queue without the lock to not
     420              :      * slow down backend exit.
     421              :      */
     422        18348 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     423              :     {
     424            0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     425              : 
     426              :         /* maybe we have just been removed, so recheck */
     427            0 :         if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     428            0 :             dlist_delete_thoroughly(&MyProc->syncRepLinks);
     429              : 
     430            0 :         LWLockRelease(SyncRepLock);
     431              :     }
     432        18348 : }
     433              : 
     434              : /*
     435              :  * ===========================================================
     436              :  * Synchronous Replication functions for wal sender processes
     437              :  * ===========================================================
     438              :  */
     439              : 
     440              : /*
     441              :  * Take any action required to initialise sync rep state from config
     442              :  * data. Called at WALSender startup and after each SIGHUP.
     443              :  */
     444              : void
     445          746 : SyncRepInitConfig(void)
     446              : {
     447              :     int         priority;
     448              : 
     449              :     /*
     450              :      * Determine if we are a potential sync standby and remember the result
     451              :      * for handling replies from standby.
     452              :      */
     453          746 :     priority = SyncRepGetStandbyPriority();
     454          746 :     if (MyWalSnd->sync_standby_priority != priority)
     455              :     {
     456           17 :         SpinLockAcquire(&MyWalSnd->mutex);
     457           17 :         MyWalSnd->sync_standby_priority = priority;
     458           17 :         SpinLockRelease(&MyWalSnd->mutex);
     459              : 
     460           17 :         ereport(DEBUG1,
     461              :                 (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
     462              :                                  application_name, priority)));
     463              :     }
     464          746 : }
     465              : 
     466              : /*
     467              :  * Update the LSNs on each queue based upon our latest state. This
     468              :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     469              :  *
     470              :  * Other policies are possible, which would change what we do here and
     471              :  * perhaps also which information we store as well.
     472              :  */
     473              : void
     474       114766 : SyncRepReleaseWaiters(void)
     475              : {
     476       114766 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     477              :     XLogRecPtr  writePtr;
     478              :     XLogRecPtr  flushPtr;
     479              :     XLogRecPtr  applyPtr;
     480              :     bool        got_recptr;
     481              :     bool        am_sync;
     482       114766 :     int         numwrite = 0;
     483       114766 :     int         numflush = 0;
     484       114766 :     int         numapply = 0;
     485              : 
     486              :     /*
     487              :      * If this WALSender is serving a standby that is not on the list of
     488              :      * potential sync standbys then we have nothing to do. If we are still
     489              :      * starting up, still running base backup or the current flush position is
     490              :      * still invalid, then leave quickly also.  Streaming or stopping WAL
     491              :      * senders are allowed to release waiters.
     492              :      */
     493       114766 :     if (MyWalSnd->sync_standby_priority == 0 ||
     494          151 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
     495           33 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
     496          145 :         !XLogRecPtrIsValid(MyWalSnd->flush))
     497              :     {
     498       114621 :         announce_next_takeover = true;
     499       114625 :         return;
     500              :     }
     501              : 
     502              :     /*
     503              :      * We're a potential sync standby. Release waiters if there are enough
     504              :      * sync standbys and we are considered as sync.
     505              :      */
     506          145 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     507              : 
     508              :     /*
     509              :      * Check whether we are a sync standby or not, and calculate the synced
     510              :      * positions among all sync standbys.  (Note: although this step does not
     511              :      * of itself require holding SyncRepLock, it seems like a good idea to do
     512              :      * it after acquiring the lock.  This ensures that the WAL pointers we use
     513              :      * to release waiters are newer than any previous execution of this
     514              :      * routine used.)
     515              :      */
     516          145 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     517              : 
     518              :     /*
     519              :      * If we are managing a sync standby, though we weren't prior to this,
     520              :      * then announce we are now a sync standby.
     521              :      */
     522          145 :     if (announce_next_takeover && am_sync)
     523              :     {
     524           12 :         announce_next_takeover = false;
     525              : 
     526           12 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     527           12 :             ereport(LOG,
     528              :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
     529              :                             application_name, MyWalSnd->sync_standby_priority)));
     530              :         else
     531            0 :             ereport(LOG,
     532              :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     533              :                             application_name)));
     534              :     }
     535              : 
     536              :     /*
     537              :      * If the number of sync standbys is less than requested or we aren't
     538              :      * managing a sync standby then just leave.
     539              :      */
     540          145 :     if (!got_recptr || !am_sync)
     541              :     {
     542            4 :         LWLockRelease(SyncRepLock);
     543            4 :         announce_next_takeover = !am_sync;
     544            4 :         return;
     545              :     }
     546              : 
     547              :     /*
     548              :      * Set the lsn first so that when we wake backends they will release up to
     549              :      * this location.
     550              :      */
     551          141 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     552              :     {
     553           48 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     554           48 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     555              :     }
     556          141 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     557              :     {
     558           50 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     559           50 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     560              :     }
     561          141 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     562              :     {
     563           45 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     564           45 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     565              :     }
     566              : 
     567          141 :     LWLockRelease(SyncRepLock);
     568              : 
     569          141 :     elog(DEBUG3, "released %d procs up to write %X/%08X, %d procs up to flush %X/%08X, %d procs up to apply %X/%08X",
     570              :          numwrite, LSN_FORMAT_ARGS(writePtr),
     571              :          numflush, LSN_FORMAT_ARGS(flushPtr),
     572              :          numapply, LSN_FORMAT_ARGS(applyPtr));
     573              : }
     574              : 
     575              : /*
     576              :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     577              :  *
     578              :  * Return false if this walsender is not managing a sync standby, or if
     579              :  * there are fewer sync standbys than synchronous_standby_names specifies.
     580              :  * Otherwise return true and set *writePtr, *flushPtr and *applyPtr.
     581              :  *
     582              :  * On return, *am_sync is set to true if this walsender is connected to a
     583              :  * sync standby. Otherwise it's set to false.
     584              :  */
     585              : static bool
     586          145 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     587              :                      XLogRecPtr *applyPtr, bool *am_sync)
     588              : {
     589              :     SyncRepStandbyData *sync_standbys;
     590              :     int         num_standbys;
     591              :     int         i;
     592              : 
     593              :     /* Initialize default results */
     594          145 :     *writePtr = InvalidXLogRecPtr;
     595          145 :     *flushPtr = InvalidXLogRecPtr;
     596          145 :     *applyPtr = InvalidXLogRecPtr;
     597          145 :     *am_sync = false;
     598              : 
     599              :     /* Quick out if not even configured to be synchronous */
     600          145 :     if (SyncRepConfig == NULL)
     601            0 :         return false;
     602              : 
     603              :     /* Get standbys that are considered as synchronous at this moment */
     604          145 :     num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
     605              : 
     606              :     /* Am I among the candidate sync standbys? */
     607          151 :     for (i = 0; i < num_standbys; i++)
     608              :     {
     609          148 :         if (sync_standbys[i].is_me)
     610              :         {
     611          142 :             *am_sync = true;
     612          142 :             break;
     613              :         }
     614              :     }
     615              : 
     616              :     /*
     617              :      * Nothing more to do if we are not managing a sync standby or there are
     618              :      * not enough synchronous standbys.
     619              :      */
     620          145 :     if (!(*am_sync) ||
     621          142 :         num_standbys < SyncRepConfig->num_sync)
     622              :     {
     623            4 :         pfree(sync_standbys);
     624            4 :         return false;
     625              :     }
     626              : 
     627              :     /*
     628              :      * In priority-based sync replication, the synced positions are the
     629              :      * oldest ones among sync standbys. In quorum-based sync replication,
     630              :      * they are the Nth latest ones.
     631              :      *
     632              :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     633              :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     634              :      * because it's a bit more efficient.
     635              :      *
     636              :      * XXX If the numbers of current and requested sync standbys are the same,
     637              :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     638              :      * positions even in a quorum-based sync replication.
     639              :      */
     640          141 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     641              :     {
     642          141 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     643              :                                    sync_standbys, num_standbys);
     644              :     }
     645              :     else
     646              :     {
     647            0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     648              :                                       sync_standbys, num_standbys,
     649            0 :                                       SyncRepConfig->num_sync);
     650              :     }
     651              : 
     652          141 :     pfree(sync_standbys);
     653          141 :     return true;
     654              : }
     655              : 
     656              : /*
     657              :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
     658              :  */
     659              : static void
     660          141 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     661              :                            XLogRecPtr *flushPtr,
     662              :                            XLogRecPtr *applyPtr,
     663              :                            SyncRepStandbyData *sync_standbys,
     664              :                            int num_standbys)
     665              : {
     666              :     int         i;
     667              : 
     668              :     /*
     669              :      * Scan through all sync standbys and calculate the oldest Write, Flush
     670              :      * and Apply positions.  We assume *writePtr et al were initialized to
     671              :      * InvalidXLogRecPtr.
     672              :      */
     673          285 :     for (i = 0; i < num_standbys; i++)
     674              :     {
     675          144 :         XLogRecPtr  write = sync_standbys[i].write;
     676          144 :         XLogRecPtr  flush = sync_standbys[i].flush;
     677          144 :         XLogRecPtr  apply = sync_standbys[i].apply;
     678              : 
     679          144 :         if (!XLogRecPtrIsValid(*writePtr) || *writePtr > write)
     680          141 :             *writePtr = write;
     681          144 :         if (!XLogRecPtrIsValid(*flushPtr) || *flushPtr > flush)
     682          141 :             *flushPtr = flush;
     683          144 :         if (!XLogRecPtrIsValid(*applyPtr) || *applyPtr > apply)
     684          141 :             *applyPtr = apply;
     685              :     }
     686          141 : }
     687              : 
     688              : /*
     689              :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     690              :  * standbys.
     691              :  */
     692              : static void
     693            0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     694              :                               XLogRecPtr *flushPtr,
     695              :                               XLogRecPtr *applyPtr,
     696              :                               SyncRepStandbyData *sync_standbys,
     697              :                               int num_standbys,
     698              :                               uint8 nth)
     699              : {
     700              :     XLogRecPtr *write_array;
     701              :     XLogRecPtr *flush_array;
     702              :     XLogRecPtr *apply_array;
     703              :     int         i;
     704              : 
     705              :     /* Should have enough candidates, or somebody messed up */
     706              :     Assert(nth > 0 && nth <= num_standbys);
     707              : 
     708            0 :     write_array = palloc_array(XLogRecPtr, num_standbys);
     709            0 :     flush_array = palloc_array(XLogRecPtr, num_standbys);
     710            0 :     apply_array = palloc_array(XLogRecPtr, num_standbys);
     711              : 
     712            0 :     for (i = 0; i < num_standbys; i++)
     713              :     {
     714            0 :         write_array[i] = sync_standbys[i].write;
     715            0 :         flush_array[i] = sync_standbys[i].flush;
     716            0 :         apply_array[i] = sync_standbys[i].apply;
     717              :     }
     718              : 
     719              :     /* Sort each array in descending order */
     720            0 :     qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     721            0 :     qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     722            0 :     qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     723              : 
     724              :     /* Get Nth latest Write, Flush, Apply positions */
     725            0 :     *writePtr = write_array[nth - 1];
     726            0 :     *flushPtr = flush_array[nth - 1];
     727            0 :     *applyPtr = apply_array[nth - 1];
     728              : 
     729            0 :     pfree(write_array);
     730            0 :     pfree(flush_array);
     731            0 :     pfree(apply_array);
     732            0 : }
     733              : 
     734              : /*
     735              :  * Compare lsn in order to sort array in descending order.
     736              :  */
     737              : static int
     738            0 : cmp_lsn(const void *a, const void *b)
     739              : {
     740            0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     741            0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     742              : 
     743            0 :     return pg_cmp_u64(lsn2, lsn1);
     744              : }
     745              : 
     746              : /*
     747              :  * Return data about walsenders that are candidates to be sync standbys.
     748              :  *
     749              :  * *standbys is set to a palloc'd array of structs of per-walsender data,
     750              :  * and the number of valid entries (candidate sync senders) is returned.
     751              :  * (This might be more or fewer than num_sync; caller must check.)
     752              :  */
     753              : int
     754          800 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
     755              : {
     756              :     int         i;
     757              :     int         n;
     758              : 
     759              :     /* Create result array */
     760          800 :     *standbys = palloc_array(SyncRepStandbyData, max_wal_senders);
     761              : 
     762              :     /* Quick exit if sync replication is not requested */
     763          800 :     if (SyncRepConfig == NULL)
     764          637 :         return 0;
     765              : 
     766              :     /* Collect raw data from shared memory */
     767          163 :     n = 0;
     768         1793 :     for (i = 0; i < max_wal_senders; i++)
     769              :     {
     770              :         volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     771              :                                      * rearrangement */
     772              :         SyncRepStandbyData *stby;
     773              :         WalSndState state;      /* not included in SyncRepStandbyData */
     774              : 
     775         1630 :         walsnd = &WalSndCtl->walsnds[i];
     776         1630 :         stby = *standbys + n;
     777              : 
     778         1630 :         SpinLockAcquire(&walsnd->mutex);
     779         1630 :         stby->pid = walsnd->pid;
     780         1630 :         state = walsnd->state;
     781         1630 :         stby->write = walsnd->write;
     782         1630 :         stby->flush = walsnd->flush;
     783         1630 :         stby->apply = walsnd->apply;
     784         1630 :         stby->sync_standby_priority = walsnd->sync_standby_priority;
     785         1630 :         SpinLockRelease(&walsnd->mutex);
     786              : 
     787              :         /* Must be active */
     788         1630 :         if (stby->pid == 0)
     789         1415 :             continue;
     790              : 
     791              :         /* Must be streaming or stopping */
     792          215 :         if (state != WALSNDSTATE_STREAMING &&
     793              :             state != WALSNDSTATE_STOPPING)
     794            0 :             continue;
     795              : 
     796              :         /* Must be synchronous */
     797          215 :         if (stby->sync_standby_priority == 0)
     798           12 :             continue;
     799              : 
     800              :         /* Must have a valid flush position */
     801          203 :         if (!XLogRecPtrIsValid(stby->flush))
     802            0 :             continue;
     803              : 
     804              :         /* OK, it's a candidate */
     805          203 :         stby->walsnd_index = i;
     806          203 :         stby->is_me = (walsnd == MyWalSnd);
     807          203 :         n++;
     808              :     }
     809              : 
     810              :     /*
     811              :      * In quorum mode, we return all the candidates.  In priority mode, if we
     812              :      * have too many candidates then return only the num_sync ones of highest
     813              :      * priority.
     814              :      */
     815          163 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
     816          161 :         n > SyncRepConfig->num_sync)
     817              :     {
     818              :         /* Sort by priority ... */
     819           15 :         qsort(*standbys, n, sizeof(SyncRepStandbyData),
     820              :               standby_priority_comparator);
     821              :         /* ... then report just the first num_sync ones */
     822           15 :         n = SyncRepConfig->num_sync;
     823              :     }
     824              : 
     825          163 :     return n;
     826              : }
     827              : 
     828              : /*
     829              :  * qsort comparator to sort SyncRepStandbyData entries by priority
     830              :  */
     831              : static int
     832           39 : standby_priority_comparator(const void *a, const void *b)
     833              : {
     834           39 :     const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
     835           39 :     const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
     836              : 
     837              :     /* First, sort by increasing priority value */
     838           39 :     if (sa->sync_standby_priority != sb->sync_standby_priority)
     839           21 :         return sa->sync_standby_priority - sb->sync_standby_priority;
     840              : 
     841              :     /*
     842              :      * We might have equal priority values; arbitrarily break ties by position
     843              :      * in the WalSnd array.  (This is utterly bogus, since that is arrival
     844              :      * order dependent, but there are regression tests that rely on it.)
     845              :      */
     846           18 :     return sa->walsnd_index - sb->walsnd_index;
     847              : }
     848              : 
     849              : 
     850              : /*
     851              :  * Check if we are in the list of sync standbys, and if so, determine
     852              :  * priority sequence. Return priority if set, or zero to indicate that
     853              :  * we are not a potential sync standby.
     854              :  *
     855              :  * Compare the parameter SyncRepStandbyNames against the application_name
     856              :  * for this WALSender, or allow any name if we find a wildcard "*".
     857              :  */
     858              : static int
     859          746 : SyncRepGetStandbyPriority(void)
     860              : {
     861              :     const char *standby_name;
     862              :     int         priority;
     863          746 :     bool        found = false;
     864              : 
     865              :     /*
     866              :      * Since synchronous cascade replication is not allowed, we always set the
     867              :      * priority of a cascading walsender to zero.
     868              :      */
     869          746 :     if (am_cascading_walsender)
     870           26 :         return 0;
     871              : 
     872          720 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     873          696 :         return 0;
     874              : 
     875           24 :     standby_name = SyncRepConfig->member_names;
     876           32 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     877              :     {
     878           31 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     879           18 :             strcmp(standby_name, "*") == 0)
     880              :         {
     881           23 :             found = true;
     882           23 :             break;
     883              :         }
     884            8 :         standby_name += strlen(standby_name) + 1;
     885              :     }
     886              : 
     887           24 :     if (!found)
     888            1 :         return 0;
     889              : 
     890              :     /*
     891              :      * In quorum-based sync replication, all the standbys in the list have the
     892              :      * same priority, one.
     893              :      */
     894           23 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     895              : }
     896              : 
     897              : /*
     898              :  * Walk the specified queue from head.  Set the state of any backends that
     899              :  * need to be woken, remove them from the queue, and then wake them.
     900              :  * Pass all = true to wake whole queue; otherwise, just wake up to
     901              :  * the walsender's LSN.
     902              :  *
     903              :  * The caller must hold SyncRepLock in exclusive mode.
     904              :  */
     905              : static int
     906          146 : SyncRepWakeQueue(bool all, int mode)
     907              : {
     908          146 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     909          146 :     int         numprocs = 0;
     910              :     dlist_mutable_iter iter;
     911              : 
     912              :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     913              :     Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
     914              :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     915              : 
     916          165 :     dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
     917              :     {
     918           21 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     919              : 
     920              :         /*
     921              :          * Assume the queue is ordered by LSN
     922              :          */
     923           21 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
     924            2 :             return numprocs;
     925              : 
     926              :         /*
     927              :          * Remove from queue.
     928              :          */
     929           19 :         dlist_delete_thoroughly(&proc->syncRepLinks);
     930              : 
     931              :         /*
     932              :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
     933              :          * make sure that it sees the queue link being removed before the
     934              :          * syncRepState change.
     935              :          */
     936           19 :         pg_write_barrier();
     937              : 
     938              :         /*
     939              :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
     940              :          * the various states.
     941              :          */
     942           19 :         proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
     943              : 
     944              :         /*
     945              :          * Wake only when we have set state and removed from queue.
     946              :          */
     947           19 :         SetLatch(&(proc->procLatch));
     948              : 
     949           19 :         numprocs++;
     950              :     }
     951              : 
     952          144 :     return numprocs;
     953              : }
     954              : 
     955              : /*
     956              :  * The checkpointer calls this as needed to update the shared
     957              :  * sync_standbys_status flag, so that backends don't remain permanently wedged
     958              :  * if synchronous_standby_names is unset.  It's safe to check the current value
     959              :  * without the lock, because it's only ever updated by one process.  But we
     960              :  * must take the lock to change it.
     961              :  */
     962              : void
     963          648 : SyncRepUpdateSyncStandbysDefined(void)
     964              : {
     965          648 :     bool        sync_standbys_defined = SyncStandbysDefined();
     966              : 
     967          648 :     if (sync_standbys_defined !=
     968          648 :         ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0))
     969              :     {
     970           13 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     971              : 
     972              :         /*
     973              :          * If synchronous_standby_names has been reset to empty, it's futile
     974              :          * for backends to continue waiting.  Since the user no longer wants
     975              :          * synchronous replication, we'd better wake them up.
     976              :          */
     977           13 :         if (!sync_standbys_defined)
     978              :         {
     979              :             int         i;
     980              : 
     981            4 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
     982            3 :                 SyncRepWakeQueue(true, i);
     983              :         }
     984              : 
     985              :         /*
     986              :          * Only allow people to join the queue when there are synchronous
     987              :          * standbys defined.  Without this interlock, there's a race
     988              :          * condition: we might wake up all the current waiters; then, some
     989              :          * backend that hasn't yet reloaded its config might go to sleep on
     990              :          * the queue (and never wake up).  This prevents that.
     991              :          */
     992           13 :         WalSndCtl->sync_standbys_status = SYNC_STANDBY_INIT |
     993              :             (sync_standbys_defined ? SYNC_STANDBY_DEFINED : 0);
     994              : 
     995           13 :         LWLockRelease(SyncRepLock);
     996              :     }
     997          635 :     else if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT) == 0)
     998              :     {
     999          572 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    1000              : 
    1001              :         /*
    1002              :          * Note that there is no need to wake up the queues here.  We
    1003              :          * reach this path only if SyncStandbysDefined() returns false;
    1004              :          * otherwise it would mean that some backends are waiting with the GUC
    1005              :          * set.  See SyncRepWaitForLSN().
    1006              :          */
    1007              :         Assert(!SyncStandbysDefined());
    1008              : 
    1009              :         /*
    1010              :          * Even if there is no sync standby defined, let the readers of this
    1011              :          * information know that the sync standby data has been initialized.
    1012              :          * This can just be done once, hence the previous check on
    1013              :          * SYNC_STANDBY_INIT to avoid useless work.
    1014              :          */
    1015          572 :         WalSndCtl->sync_standbys_status |= SYNC_STANDBY_INIT;
    1016              : 
    1017          572 :         LWLockRelease(SyncRepLock);
    1018              :     }
    1019          648 : }
    1020              : 
    1021              : #ifdef USE_ASSERT_CHECKING
    1022              : static bool
    1023              : SyncRepQueueIsOrderedByLSN(int mode)
    1024              : {
    1025              :     XLogRecPtr  lastLSN;
    1026              :     dlist_iter  iter;
    1027              : 
    1028              :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    1029              : 
    1030              :     lastLSN = InvalidXLogRecPtr;
    1031              : 
    1032              :     dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
    1033              :     {
    1034              :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
    1035              : 
    1036              :         /*
    1037              :          * Check the queue is ordered by LSN and that multiple procs don't
    1038              :          * have matching LSNs
    1039              :          */
    1040              :         if (proc->waitLSN <= lastLSN)
    1041              :             return false;
    1042              : 
    1043              :         lastLSN = proc->waitLSN;
    1044              :     }
    1045              : 
    1046              :     return true;
    1047              : }
    1048              : #endif
    1049              : 
    1050              : /*
    1051              :  * ===========================================================
    1052              :  * Synchronous Replication functions executed by any process
    1053              :  * ===========================================================
    1054              :  */
    1055              : 
    1056              : bool
    1057         1279 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
    1058              : {
    1059         1279 :     if (*newval != NULL && (*newval)[0] != '\0')
    1060           78 :     {
    1061              :         yyscan_t    scanner;
    1062              :         int         parse_rc;
    1063              :         SyncRepConfigData *pconf;
    1064              : 
    1065              :         /* Result of parsing is returned in one of these two variables */
    1066           78 :         SyncRepConfigData *syncrep_parse_result = NULL;
    1067           78 :         char       *syncrep_parse_error_msg = NULL;
    1068              : 
    1069              :         /* Parse the synchronous_standby_names string */
    1070           78 :         syncrep_scanner_init(*newval, &scanner);
    1071           78 :         parse_rc = syncrep_yyparse(&syncrep_parse_result, &syncrep_parse_error_msg, scanner);
    1072           78 :         syncrep_scanner_finish(scanner);
    1073              : 
    1074           78 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1075              :         {
    1076            0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1077            0 :             if (syncrep_parse_error_msg)
    1078            0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1079              :             else
    1080              :                 /* translator: %s is a GUC name */
    1081            0 :                 GUC_check_errdetail("\"%s\" parser failed.",
    1082              :                                     "synchronous_standby_names");
    1083            0 :             return false;
    1084              :         }
    1085              : 
    1086           78 :         if (syncrep_parse_result->num_sync <= 0)
    1087              :         {
    1088            0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1089            0 :                              syncrep_parse_result->num_sync);
    1090            0 :             return false;
    1091              :         }
    1092              : 
    1093              :         /* GUC extra value must be guc_malloc'd, not palloc'd */
    1094              :         pconf = (SyncRepConfigData *)
    1095           78 :             guc_malloc(LOG, syncrep_parse_result->config_size);
    1096           78 :         if (pconf == NULL)
    1097            0 :             return false;
    1098           78 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1099              : 
    1100           78 :         *extra = pconf;
    1101              : 
    1102              :         /*
    1103              :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1104              :          * other cruft generated during parsing, will be freed when the
    1105              :          * current memory context is deleted.  (This code is generally run in
    1106              :          * a short-lived context used for config file processing, so that will
    1107              :          * not be very long.)
    1108              :          */
    1109              :     }
    1110              :     else
    1111         1201 :         *extra = NULL;
    1112              : 
    1113         1279 :     return true;
    1114              : }
    1115              : 
    1116              : void
    1117         1269 : assign_synchronous_standby_names(const char *newval, void *extra)
    1118              : {
    1119         1269 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1120         1269 : }
    1121              : 
    1122              : void
    1123         3451 : assign_synchronous_commit(int newval, void *extra)
    1124              : {
    1125         3451 :     switch (newval)
    1126              :     {
    1127            0 :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1128            0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1129            0 :             break;
    1130         1327 :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1131         1327 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1132         1327 :             break;
    1133            2 :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1134            2 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1135            2 :             break;
    1136         2122 :         default:
    1137         2122 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1138         2122 :             break;
    1139              :     }
    1140         3451 : }
        

Generated by: LCOV version 2.0-1