LCOV - code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 228 304 75.0 %
Date: 2020-06-03 11:07:14 Functions: 15 18 83.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * syncrep.c
       4             :  *
       5             :  * Synchronous replication is new as of PostgreSQL 9.1.
       6             :  *
       7             :  * If requested, transaction commits wait until their commit LSNs are
       8             :  * acknowledged by the synchronous standbys.
       9             :  *
      10             :  * This module contains the code for waiting and release of backends.
      11             :  * All code in this module executes on the primary. The core streaming
      12             :  * replication transport remains within WALreceiver/WALsender modules.
      13             :  *
      14             :  * The essence of this design is that it isolates all logic about
      15             :  * waiting/releasing onto the primary. The primary defines which standbys
      16             :  * it wishes to wait for. The standbys are completely unaware of the
      17             :  * durability requirements of transactions on the primary, reducing the
      18             :  * complexity of the code and streamlining both standby operations and
      19             :  * network bandwidth because there is no requirement to ship
      20             :  * per-transaction state information.
      21             :  *
      22             :  * Replication is either synchronous or not synchronous (async). If it is
      23             :  * async, we just fastpath out of here. If it is sync, then we wait for
      24             :  * the write, flush or apply location on the standby before releasing
      25             :  * the waiting backend. Further complexity in that interaction is
      26             :  * expected in later releases.
      27             :  *
      28             :  * The best performing way to manage the waiting backends is to have a
      29             :  * single ordered queue of waiting backends, so that we can avoid
      30             :  * searching through all waiters each time we receive a reply.
      31             :  *
      32             :  * In 9.5 or before only a single standby could be considered as
      33             :  * synchronous. In 9.6 we support priority-based multiple synchronous
      34             :  * standbys, and in 10.0 quorum-based multiple synchronous standbys are
      35             :  * also supported. The number of synchronous standbys that transactions
      36             :  * must wait for replies from is specified in synchronous_standby_names.
      37             :  * This parameter also specifies a list of standby names and the method
      38             :  * (FIRST or ANY) to choose synchronous standbys from the listed ones.
      39             :  *
      40             :  * The method FIRST specifies a priority-based synchronous replication
      41             :  * and makes transaction commits wait until their WAL records are
      42             :  * replicated to the requested number of synchronous standbys chosen based
      43             :  * on their priorities. The standbys whose names appear earlier in the list
      44             :  * are given higher priority and will be considered as synchronous.
      45             :  * Other standby servers appearing later in this list represent potential
      46             :  * synchronous standbys. If any of the current synchronous standbys
      47             :  * disconnects for whatever reason, it will be replaced immediately with
      48             :  * the next-highest-priority standby.
      49             :  *
      50             :  * The method ANY specifies a quorum-based synchronous replication
      51             :  * and makes transaction commits wait until their WAL records are
      52             :  * replicated to at least the requested number of synchronous standbys
      53             :  * in the list. All the standbys appearing in the list are considered as
      54             :  * candidates for quorum synchronous standbys.
      55             :  *
      56             :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57             :  * This is for backward compatibility with 9.6 or before where only a
      58             :  * priority-based sync replication was supported.
      59             :  *
      60             :  * Before the standbys chosen from synchronous_standby_names can
      61             :  * become the synchronous standbys they must have caught up with
      62             :  * the primary; that may take some time. Once caught up,
      63             :  * the standbys which are considered as synchronous at that moment
      64             :  * will release waiters from the queue.
      65             :  *
      66             :  * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
      67             :  *
      68             :  * IDENTIFICATION
      69             :  *    src/backend/replication/syncrep.c
      70             :  *
      71             :  *-------------------------------------------------------------------------
      72             :  */
      73             : #include "postgres.h"
      74             : 
      75             : #include <unistd.h>
      76             : 
      77             : #include "access/xact.h"
      78             : #include "miscadmin.h"
      79             : #include "pgstat.h"
      80             : #include "replication/syncrep.h"
      81             : #include "replication/walsender.h"
      82             : #include "replication/walsender_private.h"
      83             : #include "storage/pmsignal.h"
      84             : #include "storage/proc.h"
      85             : #include "tcop/tcopprot.h"
      86             : #include "utils/builtins.h"
      87             : #include "utils/ps_status.h"
      88             : 
      89             : /* User-settable parameters for sync rep */
      90             : char       *SyncRepStandbyNames;
      91             : 
      92             : #define SyncStandbysDefined() \
      93             :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      94             : 
      95             : static bool announce_next_takeover = true;
      96             : 
      97             : SyncRepConfigData *SyncRepConfig = NULL;
      98             : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
      99             : 
     100             : static void SyncRepQueueInsert(int mode);
     101             : static void SyncRepCancelWait(void);
     102             : static int  SyncRepWakeQueue(bool all, int mode);
     103             : 
     104             : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     105             :                                  XLogRecPtr *flushPtr,
     106             :                                  XLogRecPtr *applyPtr,
     107             :                                  bool *am_sync);
     108             : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     109             :                                        XLogRecPtr *flushPtr,
     110             :                                        XLogRecPtr *applyPtr,
     111             :                                        SyncRepStandbyData *sync_standbys,
     112             :                                        int num_standbys);
     113             : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     114             :                                           XLogRecPtr *flushPtr,
     115             :                                           XLogRecPtr *applyPtr,
     116             :                                           SyncRepStandbyData *sync_standbys,
     117             :                                           int num_standbys,
     118             :                                           uint8 nth);
     119             : static int  SyncRepGetStandbyPriority(void);
     120             : static int  standby_priority_comparator(const void *a, const void *b);
     121             : static int  cmp_lsn(const void *a, const void *b);
     122             : 
     123             : #ifdef USE_ASSERT_CHECKING
     124             : static bool SyncRepQueueIsOrderedByLSN(int mode);
     125             : #endif
     126             : 
     127             : /*
     128             :  * ===========================================================
     129             :  * Synchronous Replication functions for normal user backends
     130             :  * ===========================================================
     131             :  */
     132             : 
     133             : /*
     134             :  * Wait for synchronous replication, if requested by user.
     135             :  *
     136             :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     137             :  * change that state to SYNC_REP_WAITING before adding ourselves
     138             :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     139             :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     140             :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     141             :  *
     142             :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     143             :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     144             :  * to be flushed if synchronous_commit is set to the higher level of
     145             :  * remote_apply, because only commit records provide apply feedback.
     146             :  */
     147             : void
     148      262048 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     149             : {
     150      262048 :     char       *new_status = NULL;
     151             :     const char *old_status;
     152             :     int         mode;
     153             : 
     154             :     /*
     155             :      * This should be called while holding interrupts during a transaction
     156             :      * commit to prevent the follow-up shared memory queue cleanups to be
     157             :      * influenced by external interruptions.
     158             :      */
     159             :     Assert(InterruptHoldoffCount > 0);
     160             : 
     161             :     /* Cap the level for anything other than commit to remote flush only. */
     162      262048 :     if (commit)
     163      261962 :         mode = SyncRepWaitMode;
     164             :     else
     165          86 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     166             : 
     167             :     /*
     168             :      * Fast exit if user has not requested sync replication.
     169             :      */
     170      262048 :     if (!SyncRepRequested())
     171       10578 :         return;
     172             : 
     173             :     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
     174             :     Assert(WalSndCtl != NULL);
     175             : 
     176      251470 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     177             :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     178             : 
     179             :     /*
     180             :      * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     181             :      * set.  See SyncRepUpdateSyncStandbysDefined.
     182             :      *
     183             :      * Also check that the standby hasn't already replied. Unlikely race
     184             :      * condition but we'll be fetching that cache line anyway so it's likely
     185             :      * to be a low cost check.
     186             :      */
     187      251470 :     if (!WalSndCtl->sync_standbys_defined ||
     188          88 :         lsn <= WalSndCtl->lsn[mode])
     189             :     {
     190      251382 :         LWLockRelease(SyncRepLock);
     191      251382 :         return;
     192             :     }
     193             : 
     194             :     /*
     195             :      * Set our waitLSN so WALSender will know when to wake us, and add
     196             :      * ourselves to the queue.
     197             :      */
     198          88 :     MyProc->waitLSN = lsn;
     199          88 :     MyProc->syncRepState = SYNC_REP_WAITING;
     200          88 :     SyncRepQueueInsert(mode);
     201             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     202          88 :     LWLockRelease(SyncRepLock);
     203             : 
     204             :     /* Alter ps display to show waiting for sync rep. */
     205          88 :     if (update_process_title)
     206             :     {
     207             :         int         len;
     208             : 
     209          88 :         old_status = get_ps_display(&len);
     210          88 :         new_status = (char *) palloc(len + 32 + 1);
     211          88 :         memcpy(new_status, old_status, len);
     212         176 :         sprintf(new_status + len, " waiting for %X/%X",
     213          88 :                 (uint32) (lsn >> 32), (uint32) lsn);
     214          88 :         set_ps_display(new_status);
     215          88 :         new_status[len] = '\0'; /* truncate off " waiting ..." */
     216             :     }
     217             : 
     218             :     /*
     219             :      * Wait for specified LSN to be confirmed.
     220             :      *
     221             :      * Each proc has its own wait latch, so we perform a normal latch
     222             :      * check/wait loop here.
     223             :      */
     224             :     for (;;)
     225          88 :     {
     226             :         int         rc;
     227             : 
     228             :         /* Must reset the latch before testing state. */
     229         176 :         ResetLatch(MyLatch);
     230             : 
     231             :         /*
     232             :          * Acquiring the lock is not needed, the latch ensures proper
     233             :          * barriers. If it looks like we're done, we must really be done,
     234             :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     235             :          * it will never update it again, so we can't be seeing a stale value
     236             :          * in that case.
     237             :          */
     238         176 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     239          88 :             break;
     240             : 
     241             :         /*
     242             :          * If a wait for synchronous replication is pending, we can neither
     243             :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     244             :          * lead the client to believe that the transaction aborted, which is
     245             :          * not true: it's already committed locally. The former is no good
     246             :          * either: the client has requested synchronous replication, and is
     247             :          * entitled to assume that an acknowledged commit is also replicated,
     248             :          * which might not be true. So in this case we issue a WARNING (which
     249             :          * some clients may be able to interpret) and shut off further output.
     250             :          * We do NOT reset ProcDiePending, so that the process will die after
     251             :          * the commit is cleaned up.
     252             :          */
     253          88 :         if (ProcDiePending)
     254             :         {
     255           0 :             ereport(WARNING,
     256             :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     257             :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     258             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     259           0 :             whereToSendOutput = DestNone;
     260           0 :             SyncRepCancelWait();
     261           0 :             break;
     262             :         }
     263             : 
     264             :         /*
     265             :          * It's unclear what to do if a query cancel interrupt arrives.  We
     266             :          * can't actually abort at this point, but ignoring the interrupt
     267             :          * altogether is not helpful, so we just terminate the wait with a
     268             :          * suitable warning.
     269             :          */
     270          88 :         if (QueryCancelPending)
     271             :         {
     272           0 :             QueryCancelPending = false;
     273           0 :             ereport(WARNING,
     274             :                     (errmsg("canceling wait for synchronous replication due to user request"),
     275             :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     276           0 :             SyncRepCancelWait();
     277           0 :             break;
     278             :         }
     279             : 
     280             :         /*
     281             :          * Wait on latch.  Any condition that should wake us up will set the
     282             :          * latch, so no need for timeout.
     283             :          */
     284          88 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     285             :                        WAIT_EVENT_SYNC_REP);
     286             : 
     287             :         /*
     288             :          * If the postmaster dies, we'll probably never get an acknowledgment,
     289             :          * because all the wal sender processes will exit. So just bail out.
     290             :          */
     291          88 :         if (rc & WL_POSTMASTER_DEATH)
     292             :         {
     293           0 :             ProcDiePending = true;
     294           0 :             whereToSendOutput = DestNone;
     295           0 :             SyncRepCancelWait();
     296           0 :             break;
     297             :         }
     298             :     }
     299             : 
     300             :     /*
     301             :      * WalSender has checked our LSN and has removed us from queue. Clean up
     302             :      * state and leave.  It's OK to reset these shared memory fields without
     303             :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     304             :      * we're not on the queue.  We need a read barrier to make sure we see the
     305             :      * changes to the queue link (this might be unnecessary without
     306             :      * assertions, but better safe than sorry).
     307             :      */
     308          88 :     pg_read_barrier();
     309             :     Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
     310          88 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     311          88 :     MyProc->waitLSN = 0;
     312             : 
     313          88 :     if (new_status)
     314             :     {
     315             :         /* Reset ps display */
     316          88 :         set_ps_display(new_status);
     317          88 :         pfree(new_status);
     318             :     }
     319             : }
     320             : 
     321             : /*
     322             :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     323             :  *
     324             :  * Usually we will go at tail of queue, though it's possible that we arrive
     325             :  * here out of order, so start at tail and work back to insertion point.
     326             :  */
     327             : static void
     328          88 : SyncRepQueueInsert(int mode)
     329             : {
     330             :     PGPROC     *proc;
     331             : 
     332             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     333          88 :     proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
     334          88 :                                    &(WalSndCtl->SyncRepQueue[mode]),
     335             :                                    offsetof(PGPROC, syncRepLinks));
     336             : 
     337          88 :     while (proc)
     338             :     {
     339             :         /*
     340             :          * Stop at the queue element that we should after to ensure the queue
     341             :          * is ordered by LSN.
     342             :          */
     343           0 :         if (proc->waitLSN < MyProc->waitLSN)
     344           0 :             break;
     345             : 
     346           0 :         proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
     347           0 :                                        &(proc->syncRepLinks),
     348             :                                        offsetof(PGPROC, syncRepLinks));
     349             :     }
     350             : 
     351          88 :     if (proc)
     352           0 :         SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
     353             :     else
     354          88 :         SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
     355          88 : }
     356             : 
     357             : /*
     358             :  * Acquire SyncRepLock and cancel any wait currently in progress.
     359             :  */
     360             : static void
     361           0 : SyncRepCancelWait(void)
     362             : {
     363           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     364           0 :     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     365           0 :         SHMQueueDelete(&(MyProc->syncRepLinks));
     366           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     367           0 :     LWLockRelease(SyncRepLock);
     368           0 : }
     369             : 
     370             : void
     371       11076 : SyncRepCleanupAtProcExit(void)
     372             : {
     373             :     /*
     374             :      * First check if we are removed from the queue without the lock to not
     375             :      * slow down backend exit.
     376             :      */
     377       11076 :     if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     378             :     {
     379           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     380             : 
     381             :         /* maybe we have just been removed, so recheck */
     382           0 :         if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
     383           0 :             SHMQueueDelete(&(MyProc->syncRepLinks));
     384             : 
     385           0 :         LWLockRelease(SyncRepLock);
     386             :     }
     387       11076 : }
     388             : 
     389             : /*
     390             :  * ===========================================================
     391             :  * Synchronous Replication functions for wal sender processes
     392             :  * ===========================================================
     393             :  */
     394             : 
     395             : /*
     396             :  * Take any action required to initialise sync rep state from config
     397             :  * data. Called at WALSender startup and after each SIGHUP.
     398             :  */
     399             : void
     400         316 : SyncRepInitConfig(void)
     401             : {
     402             :     int         priority;
     403             : 
     404             :     /*
     405             :      * Determine if we are a potential sync standby and remember the result
     406             :      * for handling replies from standby.
     407             :      */
     408         316 :     priority = SyncRepGetStandbyPriority();
     409         316 :     if (MyWalSnd->sync_standby_priority != priority)
     410             :     {
     411          32 :         SpinLockAcquire(&MyWalSnd->mutex);
     412          32 :         MyWalSnd->sync_standby_priority = priority;
     413          32 :         SpinLockRelease(&MyWalSnd->mutex);
     414             : 
     415          32 :         ereport(DEBUG1,
     416             :                 (errmsg("standby \"%s\" now has synchronous standby priority %u",
     417             :                         application_name, priority)));
     418             :     }
     419         316 : }
     420             : 
     421             : /*
     422             :  * Update the LSNs on each queue based upon our latest state. This
     423             :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     424             :  *
     425             :  * Other policies are possible, which would change what we do here and
     426             :  * perhaps also which information we store as well.
     427             :  */
     428             : void
     429        1634 : SyncRepReleaseWaiters(void)
     430             : {
     431        1634 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     432             :     XLogRecPtr  writePtr;
     433             :     XLogRecPtr  flushPtr;
     434             :     XLogRecPtr  applyPtr;
     435             :     bool        got_recptr;
     436             :     bool        am_sync;
     437        1634 :     int         numwrite = 0;
     438        1634 :     int         numflush = 0;
     439        1634 :     int         numapply = 0;
     440             : 
     441             :     /*
     442             :      * If this WALSender is serving a standby that is not on the list of
     443             :      * potential sync standbys then we have nothing to do. If we are still
     444             :      * starting up, still running base backup or the current flush position is
     445             :      * still invalid, then leave quickly also.  Streaming or stopping WAL
     446             :      * senders are allowed to release waiters.
     447             :      */
     448        1634 :     if (MyWalSnd->sync_standby_priority == 0 ||
     449         202 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
     450          44 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
     451         202 :         XLogRecPtrIsInvalid(MyWalSnd->flush))
     452             :     {
     453        1432 :         announce_next_takeover = true;
     454        1434 :         return;
     455             :     }
     456             : 
     457             :     /*
     458             :      * We're a potential sync standby. Release waiters if there are enough
     459             :      * sync standbys and we are considered as sync.
     460             :      */
     461         202 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     462             : 
     463             :     /*
     464             :      * Check whether we are a sync standby or not, and calculate the synced
     465             :      * positions among all sync standbys.  (Note: although this step does not
     466             :      * of itself require holding SyncRepLock, it seems like a good idea to do
     467             :      * it after acquiring the lock.  This ensures that the WAL pointers we use
     468             :      * to release waiters are newer than any previous execution of this
     469             :      * routine used.)
     470             :      */
     471         202 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     472             : 
     473             :     /*
     474             :      * If we are managing a sync standby, though we weren't prior to this,
     475             :      * then announce we are now a sync standby.
     476             :      */
     477         202 :     if (announce_next_takeover && am_sync)
     478             :     {
     479          16 :         announce_next_takeover = false;
     480             : 
     481          16 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     482          16 :             ereport(LOG,
     483             :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
     484             :                             application_name, MyWalSnd->sync_standby_priority)));
     485             :         else
     486           0 :             ereport(LOG,
     487             :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     488             :                             application_name)));
     489             :     }
     490             : 
     491             :     /*
     492             :      * If the number of sync standbys is less than requested or we aren't
     493             :      * managing a sync standby then just leave.
     494             :      */
     495         202 :     if (!got_recptr || !am_sync)
     496             :     {
     497           2 :         LWLockRelease(SyncRepLock);
     498           2 :         announce_next_takeover = !am_sync;
     499           2 :         return;
     500             :     }
     501             : 
     502             :     /*
     503             :      * Set the lsn first so that when we wake backends they will release up to
     504             :      * this location.
     505             :      */
     506         200 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     507             :     {
     508          72 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     509          72 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     510             :     }
     511         200 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     512             :     {
     513          80 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     514          80 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     515             :     }
     516         200 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     517             :     {
     518          78 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     519          78 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     520             :     }
     521             : 
     522         200 :     LWLockRelease(SyncRepLock);
     523             : 
     524         200 :     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
     525             :          numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
     526             :          numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
     527             :          numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
     528             : }
     529             : 
     530             : /*
     531             :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     532             :  *
     533             :  * Return false if the number of sync standbys is less than
     534             :  * synchronous_standby_names specifies. Otherwise return true and
     535             :  * store the positions into *writePtr, *flushPtr and *applyPtr.
     536             :  *
     537             :  * On return, *am_sync is set to true if this walsender is connecting to
     538             :  * sync standby. Otherwise it's set to false.
     539             :  */
     540             : static bool
     541         202 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     542             :                      XLogRecPtr *applyPtr, bool *am_sync)
     543             : {
     544             :     SyncRepStandbyData *sync_standbys;
     545             :     int         num_standbys;
     546             :     int         i;
     547             : 
     548             :     /* Initialize default results */
     549         202 :     *writePtr = InvalidXLogRecPtr;
     550         202 :     *flushPtr = InvalidXLogRecPtr;
     551         202 :     *applyPtr = InvalidXLogRecPtr;
     552         202 :     *am_sync = false;
     553             : 
     554             :     /* Quick out if not even configured to be synchronous */
     555         202 :     if (SyncRepConfig == NULL)
     556           0 :         return false;
     557             : 
     558             :     /* Get standbys that are considered as synchronous at this moment */
     559         202 :     num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
     560             : 
     561             :     /* Am I among the candidate sync standbys? */
     562         204 :     for (i = 0; i < num_standbys; i++)
     563             :     {
     564         202 :         if (sync_standbys[i].is_me)
     565             :         {
     566         200 :             *am_sync = true;
     567         200 :             break;
     568             :         }
     569             :     }
     570             : 
     571             :     /*
     572             :      * Nothing more to do if we are not managing a sync standby or there are
     573             :      * not enough synchronous standbys.
     574             :      */
     575         202 :     if (!(*am_sync) ||
     576         200 :         num_standbys < SyncRepConfig->num_sync)
     577             :     {
     578           2 :         pfree(sync_standbys);
     579           2 :         return false;
     580             :     }
     581             : 
     582             :     /*
     583             :      * In a priority-based sync replication, the synced positions are the
     584             :      * oldest ones among sync standbys. In a quorum-based, they are the Nth
     585             :      * latest ones.
     586             :      *
     587             :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     588             :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     589             :      * because it's a bit more efficient.
     590             :      *
     591             :      * XXX If the numbers of current and requested sync standbys are the same,
     592             :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     593             :      * positions even in a quorum-based sync replication.
     594             :      */
     595         200 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     596             :     {
     597         200 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     598             :                                    sync_standbys, num_standbys);
     599             :     }
     600             :     else
     601             :     {
     602           0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     603             :                                       sync_standbys, num_standbys,
     604           0 :                                       SyncRepConfig->num_sync);
     605             :     }
     606             : 
     607         200 :     pfree(sync_standbys);
     608         200 :     return true;
     609             : }
     610             : 
     611             : /*
     612             :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
     613             :  */
     614             : static void
     615         200 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     616             :                            XLogRecPtr *flushPtr,
     617             :                            XLogRecPtr *applyPtr,
     618             :                            SyncRepStandbyData *sync_standbys,
     619             :                            int num_standbys)
     620             : {
     621             :     int         i;
     622             : 
     623             :     /*
     624             :      * Scan through all sync standbys and calculate the oldest Write, Flush
     625             :      * and Apply positions.  We assume *writePtr et al were initialized to
     626             :      * InvalidXLogRecPtr.
     627             :      */
     628         400 :     for (i = 0; i < num_standbys; i++)
     629             :     {
     630         200 :         XLogRecPtr  write = sync_standbys[i].write;
     631         200 :         XLogRecPtr  flush = sync_standbys[i].flush;
     632         200 :         XLogRecPtr  apply = sync_standbys[i].apply;
     633             : 
     634         200 :         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
     635         200 :             *writePtr = write;
     636         200 :         if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
     637         200 :             *flushPtr = flush;
     638         200 :         if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
     639         200 :             *applyPtr = apply;
     640             :     }
     641         200 : }
     642             : 
     643             : /*
     644             :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     645             :  * standbys.
     646             :  */
     647             : static void
     648           0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     649             :                               XLogRecPtr *flushPtr,
     650             :                               XLogRecPtr *applyPtr,
     651             :                               SyncRepStandbyData *sync_standbys,
     652             :                               int num_standbys,
     653             :                               uint8 nth)
     654             : {
     655             :     XLogRecPtr *write_array;
     656             :     XLogRecPtr *flush_array;
     657             :     XLogRecPtr *apply_array;
     658             :     int         i;
     659             : 
     660             :     /* Should have enough candidates, or somebody messed up */
     661             :     Assert(nth > 0 && nth <= num_standbys);
     662             : 
     663           0 :     write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     664           0 :     flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     665           0 :     apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     666             : 
     667           0 :     for (i = 0; i < num_standbys; i++)
     668             :     {
     669           0 :         write_array[i] = sync_standbys[i].write;
     670           0 :         flush_array[i] = sync_standbys[i].flush;
     671           0 :         apply_array[i] = sync_standbys[i].apply;
     672             :     }
     673             : 
     674             :     /* Sort each array in descending order */
     675           0 :     qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     676           0 :     qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     677           0 :     qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     678             : 
     679             :     /* Get Nth latest Write, Flush, Apply positions */
     680           0 :     *writePtr = write_array[nth - 1];
     681           0 :     *flushPtr = flush_array[nth - 1];
     682           0 :     *applyPtr = apply_array[nth - 1];
     683             : 
     684           0 :     pfree(write_array);
     685           0 :     pfree(flush_array);
     686           0 :     pfree(apply_array);
     687           0 : }
     688             : 
     689             : /*
     690             :  * Compare lsn in order to sort array in descending order.
     691             :  */
     692             : static int
     693           0 : cmp_lsn(const void *a, const void *b)
     694             : {
     695           0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     696           0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     697             : 
     698           0 :     if (lsn1 > lsn2)
     699           0 :         return -1;
     700           0 :     else if (lsn1 == lsn2)
     701           0 :         return 0;
     702             :     else
     703           0 :         return 1;
     704             : }
     705             : 
     706             : /*
     707             :  * Return data about walsenders that are candidates to be sync standbys.
     708             :  *
     709             :  * *standbys is set to a palloc'd array of structs of per-walsender data,
     710             :  * and the number of valid entries (candidate sync senders) is returned.
     711             :  * (This might be more or fewer than num_sync; caller must check.)
     712             :  */
     713             : int
     714         534 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
     715             : {
     716             :     int         i;
     717             :     int         n;
     718             : 
     719             :     /* Create result array */
     720         534 :     *standbys = (SyncRepStandbyData *)
     721         534 :         palloc(max_wal_senders * sizeof(SyncRepStandbyData));
     722             : 
     723             :     /* Quick exit if sync replication is not requested */
     724         534 :     if (SyncRepConfig == NULL)
     725         294 :         return 0;
     726             : 
     727             :     /* Collect raw data from shared memory */
     728         240 :     n = 0;
     729        1440 :     for (i = 0; i < max_wal_senders; i++)
     730             :     {
     731             :         volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     732             :                                      * rearrangement */
     733             :         SyncRepStandbyData *stby;
     734             :         WalSndState state;      /* not included in SyncRepStandbyData */
     735             : 
     736        1200 :         walsnd = &WalSndCtl->walsnds[i];
     737        1200 :         stby = *standbys + n;
     738             : 
     739        1200 :         SpinLockAcquire(&walsnd->mutex);
     740        1200 :         stby->pid = walsnd->pid;
     741        1200 :         state = walsnd->state;
     742        1200 :         stby->write = walsnd->write;
     743        1200 :         stby->flush = walsnd->flush;
     744        1200 :         stby->apply = walsnd->apply;
     745        1200 :         stby->sync_standby_priority = walsnd->sync_standby_priority;
     746        1200 :         SpinLockRelease(&walsnd->mutex);
     747             : 
     748             :         /* Must be active */
     749        1200 :         if (stby->pid == 0)
     750         892 :             continue;
     751             : 
     752             :         /* Must be streaming or stopping */
     753         308 :         if (state != WALSNDSTATE_STREAMING &&
     754             :             state != WALSNDSTATE_STOPPING)
     755           0 :             continue;
     756             : 
     757             :         /* Must be synchronous */
     758         308 :         if (stby->sync_standby_priority == 0)
     759          12 :             continue;
     760             : 
     761             :         /* Must have a valid flush position */
     762         296 :         if (XLogRecPtrIsInvalid(stby->flush))
     763           6 :             continue;
     764             : 
     765             :         /* OK, it's a candidate */
     766         290 :         stby->walsnd_index = i;
     767         290 :         stby->is_me = (walsnd == MyWalSnd);
     768         290 :         n++;
     769             :     }
     770             : 
     771             :     /*
     772             :      * In quorum mode, we return all the candidates.  In priority mode, if we
     773             :      * have too many candidates then return only the num_sync ones of highest
     774             :      * priority.
     775             :      */
     776         240 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
     777         236 :         n > SyncRepConfig->num_sync)
     778             :     {
     779             :         /* Sort by priority ... */
     780          18 :         qsort(*standbys, n, sizeof(SyncRepStandbyData),
     781             :               standby_priority_comparator);
     782             :         /* ... then report just the first num_sync ones */
     783          18 :         n = SyncRepConfig->num_sync;
     784             :     }
     785             : 
     786         240 :     return n;
     787             : }
     788             : 
     789             : /*
     790             :  * qsort comparator to sort SyncRepStandbyData entries by priority
     791             :  */
     792             : static int
     793          40 : standby_priority_comparator(const void *a, const void *b)
     794             : {
     795          40 :     const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
     796          40 :     const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
     797             : 
     798             :     /* First, sort by increasing priority value */
     799          40 :     if (sa->sync_standby_priority != sb->sync_standby_priority)
     800          18 :         return sa->sync_standby_priority - sb->sync_standby_priority;
     801             : 
     802             :     /*
     803             :      * We might have equal priority values; arbitrarily break ties by position
     804             :      * in the WALSnd array.  (This is utterly bogus, since that is arrival
     805             :      * order dependent, but there are regression tests that rely on it.)
     806             :      */
     807          22 :     return sa->walsnd_index - sb->walsnd_index;
     808             : }
     809             : 
     810             : 
     811             : /*
     812             :  * Check if we are in the list of sync standbys, and if so, determine
     813             :  * priority sequence. Return priority if set, or zero to indicate that
     814             :  * we are not a potential sync standby.
     815             :  *
     816             :  * Compare the parameter SyncRepStandbyNames against the application_name
     817             :  * for this WALSender, or allow any name if we find a wildcard "*".
     818             :  */
     819             : static int
     820         316 : SyncRepGetStandbyPriority(void)
     821             : {
     822             :     const char *standby_name;
     823             :     int         priority;
     824         316 :     bool        found = false;
     825             : 
     826             :     /*
     827             :      * Since synchronous cascade replication is not allowed, we always set the
     828             :      * priority of cascading walsender to zero.
     829             :      */
     830         316 :     if (am_cascading_walsender)
     831          12 :         return 0;
     832             : 
     833         304 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     834         260 :         return 0;
     835             : 
     836          44 :     standby_name = SyncRepConfig->member_names;
     837          60 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     838             :     {
     839          58 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     840          36 :             strcmp(standby_name, "*") == 0)
     841             :         {
     842          42 :             found = true;
     843          42 :             break;
     844             :         }
     845          16 :         standby_name += strlen(standby_name) + 1;
     846             :     }
     847             : 
     848          44 :     if (!found)
     849           2 :         return 0;
     850             : 
     851             :     /*
     852             :      * In quorum-based sync replication, all the standbys in the list have the
     853             :      * same priority, one.
     854             :      */
     855          42 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     856             : }
     857             : 
     858             : /*
     859             :  * Walk the specified queue from head.  Set the state of any backends that
     860             :  * need to be woken, remove them from the queue, and then wake them.
     861             :  * Pass all = true to wake whole queue; otherwise, just wake up to
     862             :  * the walsender's LSN.
     863             :  *
     864             :  * The caller must hold SyncRepLock in exclusive mode.
     865             :  */
     866             : static int
     867         230 : SyncRepWakeQueue(bool all, int mode)
     868             : {
     869         230 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     870         230 :     PGPROC     *proc = NULL;
     871         230 :     PGPROC     *thisproc = NULL;
     872         230 :     int         numprocs = 0;
     873             : 
     874             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     875             :     Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
     876             :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     877             : 
     878         230 :     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
     879         230 :                                    &(WalSndCtl->SyncRepQueue[mode]),
     880             :                                    offsetof(PGPROC, syncRepLinks));
     881             : 
     882         270 :     while (proc)
     883             :     {
     884             :         /*
     885             :          * Assume the queue is ordered by LSN
     886             :          */
     887          50 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
     888          10 :             return numprocs;
     889             : 
     890             :         /*
     891             :          * Move to next proc, so we can delete thisproc from the queue.
     892             :          * thisproc is valid, proc may be NULL after this.
     893             :          */
     894          40 :         thisproc = proc;
     895          40 :         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
     896          40 :                                        &(proc->syncRepLinks),
     897             :                                        offsetof(PGPROC, syncRepLinks));
     898             : 
     899             :         /*
     900             :          * Remove thisproc from queue.
     901             :          */
     902          40 :         SHMQueueDelete(&(thisproc->syncRepLinks));
     903             : 
     904             :         /*
     905             :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
     906             :          * make sure that it sees the queue link being removed before the
     907             :          * syncRepState change.
     908             :          */
     909          40 :         pg_write_barrier();
     910             : 
     911             :         /*
     912             :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
     913             :          * the various states.
     914             :          */
     915          40 :         thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
     916             : 
     917             :         /*
     918             :          * Wake only when we have set state and removed from queue.
     919             :          */
     920          40 :         SetLatch(&(thisproc->procLatch));
     921             : 
     922          40 :         numprocs++;
     923             :     }
     924             : 
     925         220 :     return numprocs;
     926             : }
     927             : 
     928             : /*
     929             :  * The checkpointer calls this as needed to update the shared
     930             :  * sync_standbys_defined flag, so that backends don't remain permanently wedged
     931             :  * if synchronous_standby_names is unset.  It's safe to check the current value
     932             :  * without the lock, because it's only ever updated by one process.  But we
     933             :  * must take the lock to change it.
     934             :  */
     935             : void
     936         422 : SyncRepUpdateSyncStandbysDefined(void)
     937             : {
     938         422 :     bool        sync_standbys_defined = SyncStandbysDefined();
     939             : 
     940         422 :     if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
     941             :     {
     942          16 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     943             : 
     944             :         /*
     945             :          * If synchronous_standby_names has been reset to empty, it's futile
     946             :          * for backends to continue waiting.  Since the user no longer wants
     947             :          * synchronous replication, we'd better wake them up.
     948             :          */
     949          16 :         if (!sync_standbys_defined)
     950             :         {
     951             :             int         i;
     952             : 
     953           0 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
     954           0 :                 SyncRepWakeQueue(true, i);
     955             :         }
     956             : 
     957             :         /*
     958             :          * Only allow people to join the queue when there are synchronous
     959             :          * standbys defined.  Without this interlock, there's a race
     960             :          * condition: we might wake up all the current waiters; then, some
     961             :          * backend that hasn't yet reloaded its config might go to sleep on
     962             :          * the queue (and never wake up).  This prevents that.
     963             :          */
     964          16 :         WalSndCtl->sync_standbys_defined = sync_standbys_defined;
     965             : 
     966          16 :         LWLockRelease(SyncRepLock);
     967             :     }
     968         422 : }
     969             : 
     970             : #ifdef USE_ASSERT_CHECKING
     971             : static bool
     972             : SyncRepQueueIsOrderedByLSN(int mode)
     973             : {
     974             :     PGPROC     *proc = NULL;
     975             :     XLogRecPtr  lastLSN;
     976             : 
     977             :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     978             : 
     979             :     lastLSN = 0;
     980             : 
     981             :     proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
     982             :                                    &(WalSndCtl->SyncRepQueue[mode]),
     983             :                                    offsetof(PGPROC, syncRepLinks));
     984             : 
     985             :     while (proc)
     986             :     {
     987             :         /*
     988             :          * Check the queue is ordered by LSN and that multiple procs don't
     989             :          * have matching LSNs
     990             :          */
     991             :         if (proc->waitLSN <= lastLSN)
     992             :             return false;
     993             : 
     994             :         lastLSN = proc->waitLSN;
     995             : 
     996             :         proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
     997             :                                        &(proc->syncRepLinks),
     998             :                                        offsetof(PGPROC, syncRepLinks));
     999             :     }
    1000             : 
    1001             :     return true;
    1002             : }
    1003             : #endif
    1004             : 
    1005             : /*
    1006             :  * ===========================================================
    1007             :  * Synchronous Replication functions executed by any process
    1008             :  * ===========================================================
    1009             :  */
    1010             : 
    1011             : bool
    1012        2326 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
    1013             : {
    1014        2326 :     if (*newval != NULL && (*newval)[0] != '\0')
    1015         142 :     {
    1016             :         int         parse_rc;
    1017             :         SyncRepConfigData *pconf;
    1018             : 
    1019             :         /* Reset communication variables to ensure a fresh start */
    1020         142 :         syncrep_parse_result = NULL;
    1021         142 :         syncrep_parse_error_msg = NULL;
    1022             : 
    1023             :         /* Parse the synchronous_standby_names string */
    1024         142 :         syncrep_scanner_init(*newval);
    1025         142 :         parse_rc = syncrep_yyparse();
    1026         142 :         syncrep_scanner_finish();
    1027             : 
    1028         142 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1029             :         {
    1030           0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1031           0 :             if (syncrep_parse_error_msg)
    1032           0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1033             :             else
    1034           0 :                 GUC_check_errdetail("synchronous_standby_names parser failed");
    1035           0 :             return false;
    1036             :         }
    1037             : 
    1038         142 :         if (syncrep_parse_result->num_sync <= 0)
    1039             :         {
    1040           0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1041           0 :                              syncrep_parse_result->num_sync);
    1042           0 :             return false;
    1043             :         }
    1044             : 
    1045             :         /* GUC extra value must be malloc'd, not palloc'd */
    1046             :         pconf = (SyncRepConfigData *)
    1047         142 :             malloc(syncrep_parse_result->config_size);
    1048         142 :         if (pconf == NULL)
    1049           0 :             return false;
    1050         142 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1051             : 
    1052         142 :         *extra = (void *) pconf;
    1053             : 
    1054             :         /*
    1055             :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1056             :          * other cruft generated during parsing, will be freed when the
    1057             :          * current memory context is deleted.  (This code is generally run in
    1058             :          * a short-lived context used for config file processing, so that will
    1059             :          * not be very long.)
    1060             :          */
    1061             :     }
    1062             :     else
    1063        2184 :         *extra = NULL;
    1064             : 
    1065        2326 :     return true;
    1066             : }
    1067             : 
    1068             : void
    1069        2308 : assign_synchronous_standby_names(const char *newval, void *extra)
    1070             : {
    1071        2308 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1072        2308 : }
    1073             : 
    1074             : void
    1075        2604 : assign_synchronous_commit(int newval, void *extra)
    1076             : {
    1077        2604 :     switch (newval)
    1078             :     {
    1079           0 :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1080           0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1081           0 :             break;
    1082        2360 :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1083        2360 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1084        2360 :             break;
    1085           0 :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1086           0 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1087           0 :             break;
    1088         244 :         default:
    1089         244 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1090         244 :             break;
    1091             :     }
    1092        2604 : }

Generated by: LCOV version 1.13