LCOV - code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 76.6 % 295 226
Test Date: 2026-03-24 01:16:09 Functions: 83.3 % 18 15
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * syncrep.c
       4              :  *
       5              :  * Synchronous replication is new as of PostgreSQL 9.1.
       6              :  *
       7              :  * If requested, transaction commits wait until their commit LSN are
       8              :  * acknowledged by the synchronous standbys.
       9              :  *
      10              :  * This module contains the code for waiting and release of backends.
      11              :  * All code in this module executes on the primary. The core streaming
      12              :  * replication transport remains within WALreceiver/WALsender modules.
      13              :  *
      14              :  * The essence of this design is that it isolates all logic about
      15              :  * waiting/releasing onto the primary. The primary defines which standbys
      16              :  * it wishes to wait for. The standbys are completely unaware of the
      17              :  * durability requirements of transactions on the primary, reducing the
      18              :  * complexity of the code and streamlining both standby operations and
      19              :  * network bandwidth because there is no requirement to ship
      20              :  * per-transaction state information.
      21              :  *
      22              :  * Replication is either synchronous or not synchronous (async). If it is
      23              :  * async, we just fastpath out of here. If it is sync, then we wait for
      24              :  * the write, flush or apply location on the standby before releasing
      25              :  * the waiting backend. Further complexity in that interaction is
      26              :  * expected in later releases.
      27              :  *
      28              :  * The best performing way to manage the waiting backends is to have a
      29              :  * single ordered queue of waiting backends, so that we can avoid
      30              :  * searching the through all waiters each time we receive a reply.
      31              :  *
      32              :  * In 9.5 or before only a single standby could be considered as
      33              :  * synchronous. In 9.6 we support a priority-based multiple synchronous
      34              :  * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
      35              :  * supported. The number of synchronous standbys that transactions
      36              :  * must wait for replies from is specified in synchronous_standby_names.
      37              :  * This parameter also specifies a list of standby names and the method
      38              :  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
      39              :  *
      40              :  * The method FIRST specifies a priority-based synchronous replication
      41              :  * and makes transaction commits wait until their WAL records are
      42              :  * replicated to the requested number of synchronous standbys chosen based
      43              :  * on their priorities. The standbys whose names appear earlier in the list
      44              :  * are given higher priority and will be considered as synchronous.
      45              :  * Other standby servers appearing later in this list represent potential
      46              :  * synchronous standbys. If any of the current synchronous standbys
      47              :  * disconnects for whatever reason, it will be replaced immediately with
      48              :  * the next-highest-priority standby.
      49              :  *
      50              :  * The method ANY specifies a quorum-based synchronous replication
      51              :  * and makes transaction commits wait until their WAL records are
      52              :  * replicated to at least the requested number of synchronous standbys
      53              :  * in the list. All the standbys appearing in the list are considered as
      54              :  * candidates for quorum synchronous standbys.
      55              :  *
      56              :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57              :  * This is for backward compatibility with 9.6 or before where only a
      58              :  * priority-based sync replication was supported.
      59              :  *
      60              :  * Before the standbys chosen from synchronous_standby_names can
      61              :  * become the synchronous standbys they must have caught up with
      62              :  * the primary; that may take some time. Once caught up,
      63              :  * the standbys which are considered as synchronous at that moment
      64              :  * will release waiters from the queue.
      65              :  *
      66              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
      67              :  *
      68              :  * IDENTIFICATION
      69              :  *    src/backend/replication/syncrep.c
      70              :  *
      71              :  *-------------------------------------------------------------------------
      72              :  */
      73              : #include "postgres.h"
      74              : 
      75              : #include <unistd.h>
      76              : 
      77              : #include "access/xact.h"
      78              : #include "common/int.h"
      79              : #include "miscadmin.h"
      80              : #include "pgstat.h"
      81              : #include "replication/syncrep.h"
      82              : #include "replication/walsender.h"
      83              : #include "replication/walsender_private.h"
      84              : #include "storage/proc.h"
      85              : #include "tcop/tcopprot.h"
      86              : #include "utils/guc_hooks.h"
      87              : #include "utils/ps_status.h"
      88              : #include "utils/wait_event.h"
      89              : 
      90              : /* User-settable parameters for sync rep */
      91              : char       *SyncRepStandbyNames;
      92              : 
      93              : #define SyncStandbysDefined() \
      94              :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      95              : 
      96              : static bool announce_next_takeover = true;
      97              : 
      98              : SyncRepConfigData *SyncRepConfig = NULL;
      99              : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
     100              : 
     101              : static void SyncRepQueueInsert(int mode);
     102              : static void SyncRepCancelWait(void);
     103              : static int  SyncRepWakeQueue(bool all, int mode);
     104              : 
     105              : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     106              :                                  XLogRecPtr *flushPtr,
     107              :                                  XLogRecPtr *applyPtr,
     108              :                                  bool *am_sync);
     109              : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     110              :                                        XLogRecPtr *flushPtr,
     111              :                                        XLogRecPtr *applyPtr,
     112              :                                        SyncRepStandbyData *sync_standbys,
     113              :                                        int num_standbys);
     114              : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     115              :                                           XLogRecPtr *flushPtr,
     116              :                                           XLogRecPtr *applyPtr,
     117              :                                           SyncRepStandbyData *sync_standbys,
     118              :                                           int num_standbys,
     119              :                                           uint8 nth);
     120              : static int  SyncRepGetStandbyPriority(void);
     121              : static int  standby_priority_comparator(const void *a, const void *b);
     122              : static int  cmp_lsn(const void *a, const void *b);
     123              : 
     124              : #ifdef USE_ASSERT_CHECKING
     125              : static bool SyncRepQueueIsOrderedByLSN(int mode);
     126              : #endif
     127              : 
     128              : /*
     129              :  * ===========================================================
     130              :  * Synchronous Replication functions for normal user backends
     131              :  * ===========================================================
     132              :  */
     133              : 
     134              : /*
     135              :  * Wait for synchronous replication, if requested by user.
     136              :  *
     137              :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     138              :  * change that state to SYNC_REP_WAITING before adding ourselves
     139              :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     140              :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     141              :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     142              :  *
     143              :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     144              :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     145              :  * to be flushed if synchronous_commit is set to the higher level of
     146              :  * remote_apply, because only commit records provide apply feedback.
     147              :  */
     148              : void
     149       147465 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     150              : {
     151              :     int         mode;
     152              : 
     153              :     /*
     154              :      * This should be called while holding interrupts during a transaction
     155              :      * commit to prevent the follow-up shared memory queue cleanups to be
     156              :      * influenced by external interruptions.
     157              :      */
     158              :     Assert(InterruptHoldoffCount > 0);
     159              : 
     160              :     /*
     161              :      * Fast exit if user has not requested sync replication, or there are no
     162              :      * sync replication standby names defined.
     163              :      *
     164              :      * Since this routine gets called every commit time, it's important to
     165              :      * exit quickly if sync replication is not requested.
     166              :      *
     167              :      * We check WalSndCtl->sync_standbys_status flag without the lock and exit
     168              :      * immediately if SYNC_STANDBY_INIT is set (the checkpointer has
     169              :      * initialized this data) but SYNC_STANDBY_DEFINED is missing (no sync
     170              :      * replication requested).
     171              :      *
     172              :      * If SYNC_STANDBY_DEFINED is set, we need to check the status again later
     173              :      * while holding the lock, to check the flag and operate the sync rep
     174              :      * queue atomically.  This is necessary to avoid the race condition
     175              :      * described in SyncRepUpdateSyncStandbysDefined().  On the other hand, if
     176              :      * SYNC_STANDBY_DEFINED is not set, the lock is not necessary because we
     177              :      * don't touch the queue.
     178              :      */
     179       147465 :     if (!SyncRepRequested() ||
     180        94942 :         ((((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status) &
     181              :          (SYNC_STANDBY_INIT | SYNC_STANDBY_DEFINED)) == SYNC_STANDBY_INIT)
     182       113659 :         return;
     183              : 
     184              :     /* Cap the level for anything other than commit to remote flush only. */
     185        33806 :     if (commit)
     186        33786 :         mode = SyncRepWaitMode;
     187              :     else
     188           20 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     189              : 
     190              :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     191              :     Assert(WalSndCtl != NULL);
     192              : 
     193        33806 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     194              :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     195              : 
     196              :     /*
     197              :      * We don't wait for sync rep if SYNC_STANDBY_DEFINED is not set.  See
     198              :      * SyncRepUpdateSyncStandbysDefined().
     199              :      *
     200              :      * Also check that the standby hasn't already replied. Unlikely race
     201              :      * condition but we'll be fetching that cache line anyway so it's likely
     202              :      * to be a low cost check.
     203              :      *
     204              :      * If the sync standby data has not been initialized yet
     205              :      * (SYNC_STANDBY_INIT is not set), fall back to a check based on the LSN,
     206              :      * then do a direct GUC check.
     207              :      */
     208        33806 :     if (WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT)
     209              :     {
     210           42 :         if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) == 0 ||
     211           42 :             lsn <= WalSndCtl->lsn[mode])
     212              :         {
     213            7 :             LWLockRelease(SyncRepLock);
     214            7 :             return;
     215              :         }
     216              :     }
     217        33764 :     else if (lsn <= WalSndCtl->lsn[mode])
     218              :     {
     219              :         /*
     220              :          * The LSN is older than what we need to wait for.  The sync standby
     221              :          * data has not been initialized yet, but we are OK to not wait
     222              :          * because we know that there is no point in doing so based on the
     223              :          * LSN.
     224              :          */
     225            0 :         LWLockRelease(SyncRepLock);
     226            0 :         return;
     227              :     }
     228        33764 :     else if (!SyncStandbysDefined())
     229              :     {
     230              :         /*
     231              :          * If we are here, the sync standby data has not been initialized yet,
     232              :          * and the LSN is newer than what need to wait for, so we have fallen
     233              :          * back to the best thing we could do in this case: a check on
     234              :          * SyncStandbysDefined() to see if the GUC is set or not.
     235              :          *
     236              :          * When the GUC has a value, we wait until the checkpointer updates
     237              :          * the status data because we cannot be sure yet if we should wait or
     238              :          * not. Here, the GUC has *no* value, we are sure that there is no
     239              :          * point to wait; this matters for example when initializing a
     240              :          * cluster, where we should never wait, and no sync standbys is the
     241              :          * default behavior.
     242              :          */
     243        33764 :         LWLockRelease(SyncRepLock);
     244        33764 :         return;
     245              :     }
     246              : 
     247              :     /*
     248              :      * Set our waitLSN so WALSender will know when to wake us, and add
     249              :      * ourselves to the queue.
     250              :      */
     251           35 :     MyProc->waitLSN = lsn;
     252           35 :     MyProc->syncRepState = SYNC_REP_WAITING;
     253           35 :     SyncRepQueueInsert(mode);
     254              :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     255           35 :     LWLockRelease(SyncRepLock);
     256              : 
     257              :     /* Alter ps display to show waiting for sync rep. */
     258           35 :     if (update_process_title)
     259              :     {
     260              :         char        buffer[32];
     261              : 
     262           35 :         sprintf(buffer, "waiting for %X/%08X", LSN_FORMAT_ARGS(lsn));
     263           35 :         set_ps_display_suffix(buffer);
     264              :     }
     265              : 
     266              :     /*
     267              :      * Wait for specified LSN to be confirmed.
     268              :      *
     269              :      * Each proc has its own wait latch, so we perform a normal latch
     270              :      * check/wait loop here.
     271              :      */
     272              :     for (;;)
     273           35 :     {
     274              :         int         rc;
     275              : 
     276              :         /* Must reset the latch before testing state. */
     277           70 :         ResetLatch(MyLatch);
     278              : 
     279              :         /*
     280              :          * Acquiring the lock is not needed, the latch ensures proper
     281              :          * barriers. If it looks like we're done, we must really be done,
     282              :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     283              :          * it will never update it again, so we can't be seeing a stale value
     284              :          * in that case.
     285              :          */
     286           70 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     287           35 :             break;
     288              : 
     289              :         /*
     290              :          * If a wait for synchronous replication is pending, we can neither
     291              :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     292              :          * lead the client to believe that the transaction aborted, which is
     293              :          * not true: it's already committed locally. The former is no good
     294              :          * either: the client has requested synchronous replication, and is
     295              :          * entitled to assume that an acknowledged commit is also replicated,
     296              :          * which might not be true. So in this case we issue a WARNING (which
     297              :          * some clients may be able to interpret) and shut off further output.
     298              :          * We do NOT reset ProcDiePending, so that the process will die after
     299              :          * the commit is cleaned up.
     300              :          */
     301           35 :         if (ProcDiePending)
     302              :         {
     303            0 :             ereport(WARNING,
     304              :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     305              :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     306              :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     307            0 :             whereToSendOutput = DestNone;
     308            0 :             SyncRepCancelWait();
     309            0 :             break;
     310              :         }
     311              : 
     312              :         /*
     313              :          * It's unclear what to do if a query cancel interrupt arrives.  We
     314              :          * can't actually abort at this point, but ignoring the interrupt
     315              :          * altogether is not helpful, so we just terminate the wait with a
     316              :          * suitable warning.
     317              :          */
     318           35 :         if (QueryCancelPending)
     319              :         {
     320            0 :             QueryCancelPending = false;
     321            0 :             ereport(WARNING,
     322              :                     (errmsg("canceling wait for synchronous replication due to user request"),
     323              :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     324            0 :             SyncRepCancelWait();
     325            0 :             break;
     326              :         }
     327              : 
     328              :         /*
     329              :          * Wait on latch.  Any condition that should wake us up will set the
     330              :          * latch, so no need for timeout.
     331              :          */
     332           35 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     333              :                        WAIT_EVENT_SYNC_REP);
     334              : 
     335              :         /*
     336              :          * If the postmaster dies, we'll probably never get an acknowledgment,
     337              :          * because all the wal sender processes will exit. So just bail out.
     338              :          */
     339           35 :         if (rc & WL_POSTMASTER_DEATH)
     340              :         {
     341            0 :             ProcDiePending = true;
     342            0 :             whereToSendOutput = DestNone;
     343            0 :             SyncRepCancelWait();
     344            0 :             break;
     345              :         }
     346              :     }
     347              : 
     348              :     /*
     349              :      * WalSender has checked our LSN and has removed us from queue. Clean up
     350              :      * state and leave.  It's OK to reset these shared memory fields without
     351              :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     352              :      * we're not on the queue.  We need a read barrier to make sure we see the
     353              :      * changes to the queue link (this might be unnecessary without
     354              :      * assertions, but better safe than sorry).
     355              :      */
     356           35 :     pg_read_barrier();
     357              :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     358           35 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     359           35 :     MyProc->waitLSN = InvalidXLogRecPtr;
     360              : 
     361              :     /* reset ps display to remove the suffix */
     362           35 :     if (update_process_title)
     363           35 :         set_ps_display_remove_suffix();
     364              : }
     365              : 
     366              : /*
     367              :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     368              :  *
     369              :  * Usually we will go at tail of queue, though it's possible that we arrive
     370              :  * here out of order, so start at tail and work back to insertion point.
     371              :  */
     372              : static void
     373           35 : SyncRepQueueInsert(int mode)
     374              : {
     375              :     dlist_head *queue;
     376              :     dlist_iter  iter;
     377              : 
     378              :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     379           35 :     queue = &WalSndCtl->SyncRepQueue[mode];
     380              : 
     381           35 :     dlist_reverse_foreach(iter, queue)
     382              :     {
     383            0 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     384              : 
     385              :         /*
     386              :          * Stop at the queue element that we should insert after to ensure the
     387              :          * queue is ordered by LSN.
     388              :          */
     389            0 :         if (proc->waitLSN < MyProc->waitLSN)
     390              :         {
     391            0 :             dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
     392            0 :             return;
     393              :         }
     394              :     }
     395              : 
     396              :     /*
     397              :      * If we get here, the list was either empty, or this process needs to be
     398              :      * at the head.
     399              :      */
     400           35 :     dlist_push_head(queue, &MyProc->syncRepLinks);
     401              : }
     402              : 
     403              : /*
     404              :  * Acquire SyncRepLock and cancel any wait currently in progress.
     405              :  */
     406              : static void
     407            0 : SyncRepCancelWait(void)
     408              : {
     409            0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     410            0 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     411            0 :         dlist_delete_thoroughly(&MyProc->syncRepLinks);
     412            0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     413            0 :     LWLockRelease(SyncRepLock);
     414            0 : }
     415              : 
     416              : void
     417        19358 : SyncRepCleanupAtProcExit(void)
     418              : {
     419              :     /*
     420              :      * First check if we are removed from the queue without the lock to not
     421              :      * slow down backend exit.
     422              :      */
     423        19358 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     424              :     {
     425            0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     426              : 
     427              :         /* maybe we have just been removed, so recheck */
     428            0 :         if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     429            0 :             dlist_delete_thoroughly(&MyProc->syncRepLinks);
     430              : 
     431            0 :         LWLockRelease(SyncRepLock);
     432              :     }
     433        19358 : }
     434              : 
     435              : /*
     436              :  * ===========================================================
     437              :  * Synchronous Replication functions for wal sender processes
     438              :  * ===========================================================
     439              :  */
     440              : 
     441              : /*
     442              :  * Take any action required to initialise sync rep state from config
     443              :  * data. Called at WALSender startup and after each SIGHUP.
     444              :  */
     445              : void
     446          789 : SyncRepInitConfig(void)
     447              : {
     448              :     int         priority;
     449              : 
     450              :     /*
     451              :      * Determine if we are a potential sync standby and remember the result
     452              :      * for handling replies from standby.
     453              :      */
     454          789 :     priority = SyncRepGetStandbyPriority();
     455          789 :     if (MyWalSnd->sync_standby_priority != priority)
     456              :     {
     457           18 :         SpinLockAcquire(&MyWalSnd->mutex);
     458           18 :         MyWalSnd->sync_standby_priority = priority;
     459           18 :         SpinLockRelease(&MyWalSnd->mutex);
     460              : 
     461           18 :         ereport(DEBUG1,
     462              :                 (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
     463              :                                  application_name, priority)));
     464              :     }
     465          789 : }
     466              : 
     467              : /*
     468              :  * Update the LSNs on each queue based upon our latest state. This
     469              :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     470              :  *
     471              :  * Other policies are possible, which would change what we do here and
     472              :  * perhaps also which information we store as well.
     473              :  */
     474              : void
     475       112838 : SyncRepReleaseWaiters(void)
     476              : {
     477       112838 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     478              :     XLogRecPtr  writePtr;
     479              :     XLogRecPtr  flushPtr;
     480              :     XLogRecPtr  applyPtr;
     481              :     bool        got_recptr;
     482              :     bool        am_sync;
     483       112838 :     int         numwrite = 0;
     484       112838 :     int         numflush = 0;
     485       112838 :     int         numapply = 0;
     486              : 
     487              :     /*
     488              :      * If this WALSender is serving a standby that is not on the list of
     489              :      * potential sync standbys then we have nothing to do. If we are still
     490              :      * starting up, still running base backup or the current flush position is
     491              :      * still invalid, then leave quickly also.  Streaming or stopping WAL
     492              :      * senders are allowed to release waiters.
     493              :      */
     494       112838 :     if (MyWalSnd->sync_standby_priority == 0 ||
     495          366 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
     496          233 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
     497          357 :         !XLogRecPtrIsValid(MyWalSnd->flush))
     498              :     {
     499       112481 :         announce_next_takeover = true;
     500       112486 :         return;
     501              :     }
     502              : 
     503              :     /*
     504              :      * We're a potential sync standby. Release waiters if there are enough
     505              :      * sync standbys and we are considered as sync.
     506              :      */
     507          357 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     508              : 
     509              :     /*
     510              :      * Check whether we are a sync standby or not, and calculate the synced
     511              :      * positions among all sync standbys.  (Note: although this step does not
     512              :      * of itself require holding SyncRepLock, it seems like a good idea to do
     513              :      * it after acquiring the lock.  This ensures that the WAL pointers we use
     514              :      * to release waiters are newer than any previous execution of this
     515              :      * routine used.)
     516              :      */
     517          357 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     518              : 
     519              :     /*
     520              :      * If we are managing a sync standby, though we weren't prior to this,
     521              :      * then announce we are now a sync standby.
     522              :      */
     523          357 :     if (announce_next_takeover && am_sync)
     524              :     {
     525           13 :         announce_next_takeover = false;
     526              : 
     527           13 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     528           13 :             ereport(LOG,
     529              :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
     530              :                             application_name, MyWalSnd->sync_standby_priority)));
     531              :         else
     532            0 :             ereport(LOG,
     533              :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     534              :                             application_name)));
     535              :     }
     536              : 
     537              :     /*
     538              :      * If the number of sync standbys is less than requested or we aren't
     539              :      * managing a sync standby then just leave.
     540              :      */
     541          357 :     if (!got_recptr || !am_sync)
     542              :     {
     543            5 :         LWLockRelease(SyncRepLock);
     544            5 :         announce_next_takeover = !am_sync;
     545            5 :         return;
     546              :     }
     547              : 
     548              :     /*
     549              :      * Set the lsn first so that when we wake backends they will release up to
     550              :      * this location.
     551              :      */
     552          352 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     553              :     {
     554           54 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     555           54 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     556              :     }
     557          352 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     558              :     {
     559           59 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     560           59 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     561              :     }
     562          352 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     563              :     {
     564           54 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     565           54 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     566              :     }
     567              : 
     568          352 :     LWLockRelease(SyncRepLock);
     569              : 
     570          352 :     elog(DEBUG3, "released %d procs up to write %X/%08X, %d procs up to flush %X/%08X, %d procs up to apply %X/%08X",
     571              :          numwrite, LSN_FORMAT_ARGS(writePtr),
     572              :          numflush, LSN_FORMAT_ARGS(flushPtr),
     573              :          numapply, LSN_FORMAT_ARGS(applyPtr));
     574              : }
     575              : 
     576              : /*
     577              :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     578              :  *
     579              :  * Return false if the number of sync standbys is less than
     580              :  * synchronous_standby_names specifies. Otherwise return true and
     581              :  * store the positions into *writePtr, *flushPtr and *applyPtr.
     582              :  *
     583              :  * On return, *am_sync is set to true if this walsender is connecting to
     584              :  * sync standby. Otherwise it's set to false.
     585              :  */
     586              : static bool
     587          357 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     588              :                      XLogRecPtr *applyPtr, bool *am_sync)
     589              : {
     590              :     SyncRepStandbyData *sync_standbys;
     591              :     int         num_standbys;
     592              :     int         i;
     593              : 
     594              :     /* Initialize default results */
     595          357 :     *writePtr = InvalidXLogRecPtr;
     596          357 :     *flushPtr = InvalidXLogRecPtr;
     597          357 :     *applyPtr = InvalidXLogRecPtr;
     598          357 :     *am_sync = false;
     599              : 
     600              :     /* Quick out if not even configured to be synchronous */
     601          357 :     if (SyncRepConfig == NULL)
     602            0 :         return false;
     603              : 
     604              :     /* Get standbys that are considered as synchronous at this moment */
     605          357 :     num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
     606              : 
     607              :     /* Am I among the candidate sync standbys? */
     608          364 :     for (i = 0; i < num_standbys; i++)
     609              :     {
     610          360 :         if (sync_standbys[i].is_me)
     611              :         {
     612          353 :             *am_sync = true;
     613          353 :             break;
     614              :         }
     615              :     }
     616              : 
     617              :     /*
     618              :      * Nothing more to do if we are not managing a sync standby or there are
     619              :      * not enough synchronous standbys.
     620              :      */
     621          357 :     if (!(*am_sync) ||
     622          353 :         num_standbys < SyncRepConfig->num_sync)
     623              :     {
     624            5 :         pfree(sync_standbys);
     625            5 :         return false;
     626              :     }
     627              : 
     628              :     /*
     629              :      * In a priority-based sync replication, the synced positions are the
     630              :      * oldest ones among sync standbys. In a quorum-based, they are the Nth
     631              :      * latest ones.
     632              :      *
     633              :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     634              :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     635              :      * because it's a bit more efficient.
     636              :      *
     637              :      * XXX If the numbers of current and requested sync standbys are the same,
     638              :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     639              :      * positions even in a quorum-based sync replication.
     640              :      */
     641          352 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     642              :     {
     643          352 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     644              :                                    sync_standbys, num_standbys);
     645              :     }
     646              :     else
     647              :     {
     648            0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     649              :                                       sync_standbys, num_standbys,
     650            0 :                                       SyncRepConfig->num_sync);
     651              :     }
     652              : 
     653          352 :     pfree(sync_standbys);
     654          352 :     return true;
     655              : }
     656              : 
     657              : /*
     658              :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
     659              :  */
     660              : static void
     661          352 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     662              :                            XLogRecPtr *flushPtr,
     663              :                            XLogRecPtr *applyPtr,
     664              :                            SyncRepStandbyData *sync_standbys,
     665              :                            int num_standbys)
     666              : {
     667              :     int         i;
     668              : 
     669              :     /*
     670              :      * Scan through all sync standbys and calculate the oldest Write, Flush
     671              :      * and Apply positions.  We assume *writePtr et al were initialized to
     672              :      * InvalidXLogRecPtr.
     673              :      */
     674          707 :     for (i = 0; i < num_standbys; i++)
     675              :     {
     676          355 :         XLogRecPtr  write = sync_standbys[i].write;
     677          355 :         XLogRecPtr  flush = sync_standbys[i].flush;
     678          355 :         XLogRecPtr  apply = sync_standbys[i].apply;
     679              : 
     680          355 :         if (!XLogRecPtrIsValid(*writePtr) || *writePtr > write)
     681          352 :             *writePtr = write;
     682          355 :         if (!XLogRecPtrIsValid(*flushPtr) || *flushPtr > flush)
     683          352 :             *flushPtr = flush;
     684          355 :         if (!XLogRecPtrIsValid(*applyPtr) || *applyPtr > apply)
     685          352 :             *applyPtr = apply;
     686              :     }
     687          352 : }
     688              : 
     689              : /*
     690              :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     691              :  * standbys.
     692              :  */
     693              : static void
     694            0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     695              :                               XLogRecPtr *flushPtr,
     696              :                               XLogRecPtr *applyPtr,
     697              :                               SyncRepStandbyData *sync_standbys,
     698              :                               int num_standbys,
     699              :                               uint8 nth)
     700              : {
     701              :     XLogRecPtr *write_array;
     702              :     XLogRecPtr *flush_array;
     703              :     XLogRecPtr *apply_array;
     704              :     int         i;
     705              : 
     706              :     /* Should have enough candidates, or somebody messed up */
     707              :     Assert(nth > 0 && nth <= num_standbys);
     708              : 
     709            0 :     write_array = palloc_array(XLogRecPtr, num_standbys);
     710            0 :     flush_array = palloc_array(XLogRecPtr, num_standbys);
     711            0 :     apply_array = palloc_array(XLogRecPtr, num_standbys);
     712              : 
     713            0 :     for (i = 0; i < num_standbys; i++)
     714              :     {
     715            0 :         write_array[i] = sync_standbys[i].write;
     716            0 :         flush_array[i] = sync_standbys[i].flush;
     717            0 :         apply_array[i] = sync_standbys[i].apply;
     718              :     }
     719              : 
     720              :     /* Sort each array in descending order */
     721            0 :     qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     722            0 :     qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     723            0 :     qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     724              : 
     725              :     /* Get Nth latest Write, Flush, Apply positions */
     726            0 :     *writePtr = write_array[nth - 1];
     727            0 :     *flushPtr = flush_array[nth - 1];
     728            0 :     *applyPtr = apply_array[nth - 1];
     729              : 
     730            0 :     pfree(write_array);
     731            0 :     pfree(flush_array);
     732            0 :     pfree(apply_array);
     733            0 : }
     734              : 
     735              : /*
     736              :  * Compare lsn in order to sort array in descending order.
     737              :  */
     738              : static int
     739            0 : cmp_lsn(const void *a, const void *b)
     740              : {
     741            0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     742            0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     743              : 
     744            0 :     return pg_cmp_u64(lsn2, lsn1);
     745              : }
     746              : 
     747              : /*
     748              :  * Return data about walsenders that are candidates to be sync standbys.
     749              :  *
     750              :  * *standbys is set to a palloc'd array of structs of per-walsender data,
     751              :  * and the number of valid entries (candidate sync senders) is returned.
     752              :  * (This might be more or fewer than num_sync; caller must check.)
     753              :  */
     754              : int
     755          991 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
     756              : {
     757              :     int         i;
     758              :     int         n;
     759              : 
     760              :     /* Create result array */
     761          991 :     *standbys = palloc_array(SyncRepStandbyData, max_wal_senders);
     762              : 
     763              :     /* Quick exit if sync replication is not requested */
     764          991 :     if (SyncRepConfig == NULL)
     765          617 :         return 0;
     766              : 
     767              :     /* Collect raw data from shared memory */
     768          374 :     n = 0;
     769         4114 :     for (i = 0; i < max_wal_senders; i++)
     770              :     {
     771              :         volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     772              :                                      * rearrangement */
     773              :         SyncRepStandbyData *stby;
     774              :         WalSndState state;      /* not included in SyncRepStandbyData */
     775              : 
     776         3740 :         walsnd = &WalSndCtl->walsnds[i];
     777         3740 :         stby = *standbys + n;
     778              : 
     779         3740 :         SpinLockAcquire(&walsnd->mutex);
     780         3740 :         stby->pid = walsnd->pid;
     781         3740 :         state = walsnd->state;
     782         3740 :         stby->write = walsnd->write;
     783         3740 :         stby->flush = walsnd->flush;
     784         3740 :         stby->apply = walsnd->apply;
     785         3740 :         stby->sync_standby_priority = walsnd->sync_standby_priority;
     786         3740 :         SpinLockRelease(&walsnd->mutex);
     787              : 
     788              :         /* Must be active */
     789         3740 :         if (stby->pid == 0)
     790         3317 :             continue;
     791              : 
     792              :         /* Must be streaming or stopping */
     793          423 :         if (state != WALSNDSTATE_STREAMING &&
     794              :             state != WALSNDSTATE_STOPPING)
     795            0 :             continue;
     796              : 
     797              :         /* Must be synchronous */
     798          423 :         if (stby->sync_standby_priority == 0)
     799           12 :             continue;
     800              : 
     801              :         /* Must have a valid flush position */
     802          411 :         if (!XLogRecPtrIsValid(stby->flush))
     803            0 :             continue;
     804              : 
     805              :         /* OK, it's a candidate */
     806          411 :         stby->walsnd_index = i;
     807          411 :         stby->is_me = (walsnd == MyWalSnd);
     808          411 :         n++;
     809              :     }
     810              : 
     811              :     /*
     812              :      * In quorum mode, we return all the candidates.  In priority mode, if we
     813              :      * have too many candidates then return only the num_sync ones of highest
     814              :      * priority.
     815              :      */
     816          374 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
     817          373 :         n > SyncRepConfig->num_sync)
     818              :     {
     819              :         /* Sort by priority ... */
     820           15 :         qsort(*standbys, n, sizeof(SyncRepStandbyData),
     821              :               standby_priority_comparator);
     822              :         /* ... then report just the first num_sync ones */
     823           15 :         n = SyncRepConfig->num_sync;
     824              :     }
     825              : 
     826          374 :     return n;
     827              : }
     828              : 
     829              : /*
     830              :  * qsort comparator to sort SyncRepStandbyData entries by priority
     831              :  */
     832              : static int
     833           36 : standby_priority_comparator(const void *a, const void *b)
     834              : {
     835           36 :     const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
     836           36 :     const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
     837              : 
     838              :     /* First, sort by increasing priority value */
     839           36 :     if (sa->sync_standby_priority != sb->sync_standby_priority)
     840           19 :         return sa->sync_standby_priority - sb->sync_standby_priority;
     841              : 
     842              :     /*
     843              :      * We might have equal priority values; arbitrarily break ties by position
     844              :      * in the WalSnd array.  (This is utterly bogus, since that is arrival
     845              :      * order dependent, but there are regression tests that rely on it.)
     846              :      */
     847           17 :     return sa->walsnd_index - sb->walsnd_index;
     848              : }
     849              : 
     850              : 
     851              : /*
     852              :  * Check if we are in the list of sync standbys, and if so, determine
     853              :  * priority sequence. Return priority if set, or zero to indicate that
     854              :  * we are not a potential sync standby.
     855              :  *
     856              :  * Compare the parameter SyncRepStandbyNames against the application_name
     857              :  * for this WALSender, or allow any name if we find a wildcard "*".
     858              :  */
     859              : static int
     860          789 : SyncRepGetStandbyPriority(void)
     861              : {
     862              :     const char *standby_name;
     863              :     int         priority;
     864          789 :     bool        found = false;
     865              : 
     866              :     /*
     867              :      * Since synchronous cascade replication is not allowed, we always set the
     868              :      * priority of cascading walsender to zero.
     869              :      */
     870          789 :     if (am_cascading_walsender)
     871           27 :         return 0;
     872              : 
     873          762 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     874          737 :         return 0;
     875              : 
     876           25 :     standby_name = SyncRepConfig->member_names;
     877           33 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     878              :     {
     879           32 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     880           18 :             strcmp(standby_name, "*") == 0)
     881              :         {
     882           24 :             found = true;
     883           24 :             break;
     884              :         }
     885            8 :         standby_name += strlen(standby_name) + 1;
     886              :     }
     887              : 
     888           25 :     if (!found)
     889            1 :         return 0;
     890              : 
     891              :     /*
     892              :      * In quorum-based sync replication, all the standbys in the list have the
     893              :      * same priority, one.
     894              :      */
     895           24 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     896              : }
     897              : 
     898              : /*
     899              :  * Walk the specified queue from head.  Set the state of any backends that
     900              :  * need to be woken, remove them from the queue, and then wake them.
     901              :  * Pass all = true to wake whole queue; otherwise, just wake up to
     902              :  * the walsender's LSN.
     903              :  *
     904              :  * The caller must hold SyncRepLock in exclusive mode.
     905              :  */
     906              : static int
     907          170 : SyncRepWakeQueue(bool all, int mode)
     908              : {
     909          170 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     910          170 :     int         numprocs = 0;
     911              :     dlist_mutable_iter iter;
     912              : 
     913              :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     914              :     Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
     915              :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     916              : 
     917          195 :     dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
     918              :     {
     919           28 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     920              : 
     921              :         /*
     922              :          * Assume the queue is ordered by LSN
     923              :          */
     924           28 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
     925            3 :             return numprocs;
     926              : 
     927              :         /*
     928              :          * Remove from queue.
     929              :          */
     930           25 :         dlist_delete_thoroughly(&proc->syncRepLinks);
     931              : 
     932              :         /*
     933              :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
     934              :          * make sure that it sees the queue link being removed before the
     935              :          * syncRepState change.
     936              :          */
     937           25 :         pg_write_barrier();
     938              : 
     939              :         /*
     940              :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
     941              :          * the various states.
     942              :          */
     943           25 :         proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
     944              : 
     945              :         /*
     946              :          * Wake only when we have set state and removed from queue.
     947              :          */
     948           25 :         SetLatch(&(proc->procLatch));
     949              : 
     950           25 :         numprocs++;
     951              :     }
     952              : 
     953          167 :     return numprocs;
     954              : }
     955              : 
     956              : /*
     957              :  * The checkpointer calls this as needed to update the shared
     958              :  * sync_standbys_status flag, so that backends don't remain permanently wedged
     959              :  * if synchronous_standby_names is unset.  It's safe to check the current value
     960              :  * without the lock, because it's only ever updated by one process.  But we
     961              :  * must take the lock to change it.
     962              :  */
     963              : void
     964          685 : SyncRepUpdateSyncStandbysDefined(void)
     965              : {
     966          685 :     bool        sync_standbys_defined = SyncStandbysDefined();
     967              : 
     968          685 :     if (sync_standbys_defined !=
     969          685 :         ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0))
     970              :     {
     971           14 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     972              : 
     973              :         /*
     974              :          * If synchronous_standby_names has been reset to empty, it's futile
     975              :          * for backends to continue waiting.  Since the user no longer wants
     976              :          * synchronous replication, we'd better wake them up.
     977              :          */
     978           14 :         if (!sync_standbys_defined)
     979              :         {
     980              :             int         i;
     981              : 
     982            4 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
     983            3 :                 SyncRepWakeQueue(true, i);
     984              :         }
     985              : 
     986              :         /*
     987              :          * Only allow people to join the queue when there are synchronous
     988              :          * standbys defined.  Without this interlock, there's a race
     989              :          * condition: we might wake up all the current waiters; then, some
     990              :          * backend that hasn't yet reloaded its config might go to sleep on
     991              :          * the queue (and never wake up).  This prevents that.
     992              :          */
     993           14 :         WalSndCtl->sync_standbys_status = SYNC_STANDBY_INIT |
     994              :             (sync_standbys_defined ? SYNC_STANDBY_DEFINED : 0);
     995              : 
     996           14 :         LWLockRelease(SyncRepLock);
     997              :     }
     998          671 :     else if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT) == 0)
     999              :     {
    1000          597 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    1001              : 
    1002              :         /*
    1003              :          * Note that there is no need to wake up the queues here.  We would
    1004              :          * reach this path only if SyncStandbysDefined() returns false, or it
    1005              :          * would mean that some backends are waiting with the GUC set.  See
    1006              :          * SyncRepWaitForLSN().
    1007              :          */
    1008              :         Assert(!SyncStandbysDefined());
    1009              : 
    1010              :         /*
    1011              :          * Even if there is no sync standby defined, let the readers of this
    1012              :          * information know that the sync standby data has been initialized.
    1013              :          * This can just be done once, hence the previous check on
    1014              :          * SYNC_STANDBY_INIT to avoid useless work.
    1015              :          */
    1016          597 :         WalSndCtl->sync_standbys_status |= SYNC_STANDBY_INIT;
    1017              : 
    1018          597 :         LWLockRelease(SyncRepLock);
    1019              :     }
    1020          685 : }
    1021              : 
    1022              : #ifdef USE_ASSERT_CHECKING
    1023              : static bool
    1024              : SyncRepQueueIsOrderedByLSN(int mode)
    1025              : {
    1026              :     XLogRecPtr  lastLSN;
    1027              :     dlist_iter  iter;
    1028              : 
    1029              :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    1030              : 
    1031              :     lastLSN = InvalidXLogRecPtr;
    1032              : 
    1033              :     dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
    1034              :     {
    1035              :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
    1036              : 
    1037              :         /*
    1038              :          * Check the queue is ordered by LSN and that multiple procs don't
    1039              :          * have matching LSNs
    1040              :          */
    1041              :         if (proc->waitLSN <= lastLSN)
    1042              :             return false;
    1043              : 
    1044              :         lastLSN = proc->waitLSN;
    1045              :     }
    1046              : 
    1047              :     return true;
    1048              : }
    1049              : #endif
    1050              : 
    1051              : /*
    1052              :  * ===========================================================
    1053              :  * Synchronous Replication functions executed by any process
    1054              :  * ===========================================================
    1055              :  */
    1056              : 
    1057              : bool
    1058         1312 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
    1059              : {
    1060         1312 :     if (*newval != NULL && (*newval)[0] != '\0')
    1061           77 :     {
    1062              :         yyscan_t    scanner;
    1063              :         int         parse_rc;
    1064              :         SyncRepConfigData *pconf;
    1065              : 
    1066              :         /* Result of parsing is returned in one of these two variables */
    1067           77 :         SyncRepConfigData *syncrep_parse_result = NULL;
    1068           77 :         char       *syncrep_parse_error_msg = NULL;
    1069              : 
    1070              :         /* Parse the synchronous_standby_names string */
    1071           77 :         syncrep_scanner_init(*newval, &scanner);
    1072           77 :         parse_rc = syncrep_yyparse(&syncrep_parse_result, &syncrep_parse_error_msg, scanner);
    1073           77 :         syncrep_scanner_finish(scanner);
    1074              : 
    1075           77 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1076              :         {
    1077            0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1078            0 :             if (syncrep_parse_error_msg)
    1079            0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1080              :             else
    1081              :                 /* translator: %s is a GUC name */
    1082            0 :                 GUC_check_errdetail("\"%s\" parser failed.",
    1083              :                                     "synchronous_standby_names");
    1084            0 :             return false;
    1085              :         }
    1086              : 
    1087           77 :         if (syncrep_parse_result->num_sync <= 0)
    1088              :         {
    1089            0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1090            0 :                              syncrep_parse_result->num_sync);
    1091            0 :             return false;
    1092              :         }
    1093              : 
    1094              :         /* GUC extra value must be guc_malloc'd, not palloc'd */
    1095              :         pconf = (SyncRepConfigData *)
    1096           77 :             guc_malloc(LOG, syncrep_parse_result->config_size);
    1097           77 :         if (pconf == NULL)
    1098            0 :             return false;
    1099           77 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1100              : 
    1101           77 :         *extra = pconf;
    1102              : 
    1103              :         /*
    1104              :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1105              :          * other cruft generated during parsing, will be freed when the
    1106              :          * current memory context is deleted.  (This code is generally run in
    1107              :          * a short-lived context used for config file processing, so that will
    1108              :          * not be very long.)
    1109              :          */
    1110              :     }
    1111              :     else
    1112         1235 :         *extra = NULL;
    1113              : 
    1114         1312 :     return true;
    1115              : }
    1116              : 
    1117              : void
    1118         1302 : assign_synchronous_standby_names(const char *newval, void *extra)
    1119              : {
    1120         1302 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1121         1302 : }
    1122              : 
    1123              : void
    1124         3397 : assign_synchronous_commit(int newval, void *extra)
    1125              : {
    1126         3397 :     switch (newval)
    1127              :     {
    1128            0 :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1129            0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1130            0 :             break;
    1131         1362 :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1132         1362 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1133         1362 :             break;
    1134            2 :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1135            2 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1136            2 :             break;
    1137         2033 :         default:
    1138         2033 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1139         2033 :             break;
    1140              :     }
    1141         3397 : }
        

Generated by: LCOV version 2.0-1