LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalctl.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 133 136 97.8 %
Date: 2025-12-25 21:18:31 Functions: 17 17 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * logicalctl.c
       3             :  *      Functionality to control logical decoding status online.
       4             :  *
       5             :  * This module enables dynamic control of logical decoding availability.
       6             :  * Logical decoding becomes active under either of two conditions: when
       7             :  * wal_level is set to 'logical', or when wal_level is set to 'replica' and
       8             :  * at least one valid logical replication slot exists. The system disables
       9             :  * logical decoding when neither condition is met. Therefore, dynamic control
      10             :  * of logical decoding availability is required only when wal_level is set
      11             :  * to 'replica'. Logical decoding is always enabled when wal_level='logical'
      12             :  * and always disabled when wal_level='minimal'.
      13             :  *
      14             :  * The core concept of dynamically enabling and disabling logical decoding
      15             :  * is to separately control two aspects: writing information required for
      16             :  * logical decoding to WAL records, and using logical decoding itself. During
      17             :  * activation, we first enable logical WAL writing while keeping logical
      18             :  * decoding disabled. This change is reflected in the read-only
      19             :  * effective_wal_level GUC parameter. Once we ensure that all processes have
      20             :  * updated to the latest effective_wal_level value, we then enable logical
      21             :  * decoding. Deactivation follows a similar careful, multi-step process
      22             :  * in reverse order.
      23             :  *
      24             :  * While activation occurs synchronously right after creating the first
      25             :  * logical slot, deactivation happens asynchronously through the checkpointer
      26             :  * process. This design avoids a race condition at the end of recovery; see
      27             :  * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details.
      28             :  * Asynchronous deactivation also avoids excessive toggling of the logical
      29             :  * decoding status in workloads that repeatedly create and drop a single
      30             :  * logical slot. On the other hand, this lazy approach can delay changes
      31             :  * to effective_wal_level and the disabling of logical decoding, especially
      32             :  * when the checkpointer is busy with other tasks. We chose this lazy approach
      33             :  * in all deactivation paths to keep the implementation simple, even though
      34             :  * laziness is strictly required only for end-of-recovery cases. Future work
      35             :  * might address this limitation either by using a dedicated worker instead
      36             :  * of the checkpointer, or by implementing synchronous waiting during slot
      37             :  * drops if workloads are significantly affected by the lazy deactivation
      38             :  * of logical decoding.
      39             :  *
      40             :  * Standby servers use the primary server's effective_wal_level and logical
      41             :  * decoding status. Unlike normal activation and deactivation, these
      42             :  * are updated simultaneously without status change coordination, solely by
      43             :  * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level
      44             :  * setting has no effect during this time. Upon promotion, we update the
      45             :  * logical decoding status based on local conditions: the wal_level value and
      46             :  * the presence of logical slots.
      47             :  *
      48             :  * In the future, we could extend support to include automatic transitions
      49             :  * of effective_wal_level between 'minimal' and 'logical' WAL levels. However,
      50             :  * this enhancement would require additional coordination mechanisms and
      51             :  * careful implementation of operations such as terminating walsenders and
      52             :  * archiver processes while carefully considering the sequence of operations
      53             :  * to ensure system stability during these transitions.
      54             :  *
      55             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      56             :  * Portions Copyright (c) 1994, Regents of the University of California
      57             :  *
      58             :  * IDENTIFICATION
      59             :  *    src/backend/replication/logical/logicalctl.c
      60             :  *
      61             :  *-------------------------------------------------------------------------
      62             :  */
      63             : 
      64             : #include "postgres.h"
      65             : 
      66             : #include "access/xloginsert.h"
      67             : #include "catalog/pg_control.h"
      68             : #include "miscadmin.h"
      69             : #include "replication/slot.h"
      70             : #include "storage/ipc.h"
      71             : #include "storage/lmgr.h"
      72             : #include "storage/proc.h"
      73             : #include "storage/procarray.h"
      74             : #include "utils/injection_point.h"
      75             : 
      76             : /*
      77             :  * Struct for controlling the logical decoding status.
      78             :  *
      79             :  * This struct is protected by LogicalDecodingControlLock.
      80             :  */
      81             : typedef struct LogicalDecodingCtlData
      82             : {
      83             :     /*
      84             :      * This is the authoritative value used by all processes to determine
      85             :      * whether to write additional information required by logical decoding to
      86             :      * WAL. Since this information could be checked frequently, each process
      87             :      * caches this value in XLogLogicalInfo for better performance.
      88             :      */
      89             :     bool        xlog_logical_info;
      90             : 
      91             :     /* True if logical decoding is available in the system */
      92             :     bool        logical_decoding_enabled;
      93             : 
      94             :     /* True if logical decoding might need to be disabled */
      95             :     bool        pending_disable;
      96             : } LogicalDecodingCtlData;
      97             : 
      98             : static LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
      99             : 
     100             : /*
     101             :  * A process-local cache of LogicalDecodingCtl->xlog_logical_info. This is
     102             :  * initialized at process startup, and updated when processing the process
     103             :  * barrier signal in ProcessBarrierUpdateXLogLogicalInfo(). If the process
     104             :  * is in an XID-assigned transaction, the cache update is delayed until the
     105             :  * transaction ends. See the comments for XLogLogicalInfoUpdatePending for details.
     106             :  */
     107             : bool        XLogLogicalInfo = false;
     108             : 
     109             : /*
     110             :  * When receiving the PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO signal, if
     111             :  * an XID is assigned to the current transaction, the process sets this flag and
     112             :  * delays the XLogLogicalInfo update until the transaction ends. This ensures
     113             :  * that the XLogLogicalInfo value (typically accessed via XLogLogicalInfoActive)
     114             :  * remains consistent throughout the transaction.
     115             :  */
     116             : static bool XLogLogicalInfoUpdatePending = false;
     117             : 
     118             : static void update_xlog_logical_info(void);
     119             : static void abort_logical_decoding_activation(int code, Datum arg);
     120             : static void write_logical_decoding_status_update_record(bool status);
     121             : 
     122             : Size
     123        8758 : LogicalDecodingCtlShmemSize(void)
     124             : {
     125        8758 :     return sizeof(LogicalDecodingCtlData);
     126             : }
     127             : 
     128             : void
     129        2266 : LogicalDecodingCtlShmemInit(void)
     130             : {
     131             :     bool        found;
     132             : 
     133        2266 :     LogicalDecodingCtl = ShmemInitStruct("Logical decoding control",
     134             :                                          LogicalDecodingCtlShmemSize(),
     135             :                                          &found);
     136             : 
     137        2266 :     if (!found)
     138        2266 :         MemSet(LogicalDecodingCtl, 0, LogicalDecodingCtlShmemSize());
     139        2266 : }
     140             : 
     141             : /*
     142             :  * Initialize the logical decoding status in shmem at server startup. This
     143             :  * must be called ONCE during postmaster or standalone-backend startup.
     144             :  */
     145             : void
     146        1974 : StartupLogicalDecodingStatus(bool last_status)
     147             : {
     148             :     /* Logical decoding is always disabled when wal_level is 'minimal' */
     149        1974 :     if (wal_level == WAL_LEVEL_MINIMAL)
     150         688 :         return;
     151             : 
     152             :     /*
     153             :      * Set the initial logical decoding status based on the last status. If
     154             :      * logical decoding was enabled before the last shutdown, it remains
     155             :      * enabled as we might have set wal_level='logical' or have at least one
     156             :      * logical slot.
     157             :      */
     158        1286 :     LogicalDecodingCtl->xlog_logical_info = last_status;
     159        1286 :     LogicalDecodingCtl->logical_decoding_enabled = last_status;
     160             : }
     161             : 
     162             : /*
     163             :  * Update the XLogLogicalInfo cache.
     164             :  */
     165             : static inline void
     166       49458 : update_xlog_logical_info(void)
     167             : {
     168       49458 :     XLogLogicalInfo = IsXLogLogicalInfoEnabled();
     169       49458 : }
     170             : 
     171             : /*
     172             :  * Initialize XLogLogicalInfo backend-private cache. This routine is called
     173             :  * during process initialization.
     174             :  */
     175             : void
     176       45994 : InitializeProcessXLogLogicalInfo(void)
     177             : {
     178       45994 :     update_xlog_logical_info();
     179       45994 : }
     180             : 
     181             : /*
     182             :  * This routine is called when we are told to update XLogLogicalInfo
     183             :  * by a ProcSignalBarrier.
     184             :  */
     185             : bool
     186        3464 : ProcessBarrierUpdateXLogLogicalInfo(void)
     187             : {
     188        3464 :     if (GetTopTransactionIdIfAny() != InvalidTransactionId)
     189             :     {
     190             :         /* Delay updating XLogLogicalInfo until the transaction end */
     191           0 :         XLogLogicalInfoUpdatePending = true;
     192             :     }
     193             :     else
     194        3464 :         update_xlog_logical_info();
     195             : 
     196        3464 :     return true;
     197             : }
     198             : 
     199             : /*
     200             :  * Check the shared memory state and return true if logical decoding is
     201             :  * enabled on the system.
     202             :  */
     203             : bool
     204       34934 : IsLogicalDecodingEnabled(void)
     205             : {
     206             :     bool        enabled;
     207             : 
     208       34934 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     209       34934 :     enabled = LogicalDecodingCtl->logical_decoding_enabled;
     210       34934 :     LWLockRelease(LogicalDecodingControlLock);
     211             : 
     212       34934 :     return enabled;
     213             : }
     214             : 
     215             : /*
     216             :  * Returns true if logical WAL logging is enabled based on the shared memory
     217             :  * status.
     218             :  */
     219             : bool
     220       49496 : IsXLogLogicalInfoEnabled(void)
     221             : {
     222             :     bool        xlog_logical_info;
     223             : 
     224       49496 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     225       49496 :     xlog_logical_info = LogicalDecodingCtl->xlog_logical_info;
     226       49496 :     LWLockRelease(LogicalDecodingControlLock);
     227             : 
     228       49496 :     return xlog_logical_info;
     229             : }
     230             : 
     231             : /*
     232             :  * Apply any pending update to the local cache at the end of the transaction.
     233             :  */
     234             : void
     235     1145718 : AtEOXact_LogicalCtl(void)
     236             : {
     237             :     /* Update the local cache if there is a pending update */
     238     1145718 :     if (XLogLogicalInfoUpdatePending)
     239             :     {
     240           0 :         update_xlog_logical_info();
     241           0 :         XLogLogicalInfoUpdatePending = false;
     242             :     }
     243     1145718 : }
     244             : 
     245             : /*
     246             :  * Writes an XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given
     247             :  * status.
     248             :  */
     249             : static void
     250         162 : write_logical_decoding_status_update_record(bool status)
     251             : {
     252             :     XLogRecPtr  recptr;
     253             : 
     254         162 :     XLogBeginInsert();
     255         162 :     XLogRegisterData(&status, sizeof(bool));
     256         162 :     recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
     257         162 :     XLogFlush(recptr);
     258         162 : }
     259             : 
     260             : /*
     261             :  * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
     262             :  * the shared flags to revert the logical decoding activation process.
     263             :  */
     264             : static void
     265           2 : abort_logical_decoding_activation(int code, Datum arg)
     266             : {
     267             :     Assert(MyReplicationSlot);
     268             :     Assert(!LogicalDecodingCtl->logical_decoding_enabled);
     269             : 
     270           2 :     elog(DEBUG1, "aborting logical decoding activation process");
     271             : 
     272             :     /*
     273             :      * Abort the change to xlog_logical_info. We don't need to check
     274             :      * CheckLogicalSlotExists() as we're still holding a logical slot.
     275             :      */
     276           2 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     277           2 :     LogicalDecodingCtl->xlog_logical_info = false;
     278           2 :     LWLockRelease(LogicalDecodingControlLock);
     279             : 
     280             :     /*
     281             :      * Some processes might have already started logical info WAL logging, so
     282             :      * tell all running processes to update their caches. We don't need to
     283             :      * wait for all processes to disable xlog_logical_info locally as it's
     284             :      * always safe to write logical information to WAL records, even when not
     285             :      * strictly required.
     286             :      */
     287           2 :     EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
     288           2 : }
     289             : 
     290             : /*
     291             :  * Enable logical decoding if disabled.
     292             :  *
     293             :  * If this function is called during recovery, it simply returns without
     294             :  * action since the logical decoding status change is not allowed during
     295             :  * this time. The logical decoding status depends on the status on the primary.
     296             :  * The caller should use CheckLogicalDecodingRequirements() before calling this
     297             :  * function to make sure that the logical decoding status can be modified.
     298             :  *
     299             :  * Note that there is no interlock between logical decoding activation
     300             :  * and slot creation. To ensure enabling logical decoding, the caller
     301             :  * needs to call this function after creating a logical slot before
     302             :  * initializing the logical decoding context.
     303             :  */
     304             : void
     305         954 : EnsureLogicalDecodingEnabled(void)
     306             : {
     307             :     Assert(MyReplicationSlot);
     308             :     Assert(wal_level >= WAL_LEVEL_REPLICA);
     309             : 
     310             :     /* Logical decoding is always enabled */
     311         954 :     if (wal_level >= WAL_LEVEL_LOGICAL)
     312         934 :         return;
     313             : 
     314          20 :     if (RecoveryInProgress())
     315             :     {
     316             :         /*
     317             :          * CheckLogicalDecodingRequirements() must have already errored out if
     318             :          * logical decoding is not enabled since we cannot enable the logical
     319             :          * decoding status during recovery.
     320             :          */
     321             :         Assert(IsLogicalDecodingEnabled());
     322           6 :         return;
     323             :     }
     324             : 
     325             :     /*
     326             :      * Make sure the activation process is aborted if there is an
     327             :      * interruption during the wait.
     328             :      */
     329          14 :     PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
     330             :     {
     331          14 :         EnableLogicalDecoding();
     332             :     }
     333          14 :     PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
     334             : }
     335             : 
     336             : /*
     337             :  * A workhorse function to enable logical decoding.
     338             :  */
     339             : void
     340          32 : EnableLogicalDecoding(void)
     341             : {
     342             :     bool        in_recovery;
     343             : 
     344          32 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     345             : 
     346             :     /* Return if it is already enabled */
     347          32 :     if (LogicalDecodingCtl->logical_decoding_enabled)
     348             :     {
     349           4 :         LogicalDecodingCtl->pending_disable = false;
     350           4 :         LWLockRelease(LogicalDecodingControlLock);
     351           4 :         return;
     352             :     }
     353             : 
     354             :     /*
     355             :      * Set logical info WAL logging in shmem. All processes started after
     356             :      * this point will include the information required by logical decoding
     357             :      * in WAL records.
     358             :      */
     359          28 :     LogicalDecodingCtl->xlog_logical_info = true;
     360             : 
     361          28 :     LWLockRelease(LogicalDecodingControlLock);
     362             : 
     363             :     /*
     364             :      * Tell all running processes to reflect the xlog_logical_info update, and
     365             :      * wait. This ensures that all running processes have enabled logical
     366             :      * information WAL logging.
     367             :      */
     368          28 :     WaitForProcSignalBarrier(
     369             :                              EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
     370             : 
     371          28 :     INJECTION_POINT("logical-decoding-activation", NULL);
     372             : 
     373          26 :     in_recovery = RecoveryInProgress();
     374             : 
     375             :     /*
     376             :      * There could be some transactions that might have started with the old
     377             :      * status, but we don't need to wait for these transactions to complete as
     378             :      * long as they have valid XIDs. These transactions will appear in the
     379             :      * xl_running_xacts record and therefore the snapshot builder will not try
     380             :      * to decode the transaction during the logical decoding initialization.
     381             :      *
     382             :      * There is a theoretical case where a transaction decides whether to
     383             :      * include logical-info in WAL records before getting an XID. In this
     384             :      * case, the transaction won't appear in xl_running_xacts.
     385             :      *
     386             :      * For operations that do not require an XID assignment, the process
     387             :      * starts including logical-info immediately upon receiving the signal
     388             :      * (barrier). If such an operation checks the effective_wal_level multiple
     389             :      * times within a single execution, the resulting WAL records might be
     390             :      * inconsistent (i.e., logical-info is included in some records but not in
     391             :      * others). However, this is harmless because logical decoding generally
     392             :      * ignores WAL records that are not associated with an assigned XID.
     393             :      *
     394             :      * One might think we need to wait for all running transactions, including
     395             :      * those without XIDs and read-only transactions, to finish before
     396             :      * enabling logical decoding. However, such a requirement would force the
     397             :      * slot creation to wait for a potentially very long time due to
     398             :      * long-running read queries, which is practically unacceptable.
     399             :      */
     400             : 
     401          26 :     START_CRIT_SECTION();
     402             : 
     403             :     /*
     404             :      * We enable logical decoding first, followed by writing the WAL record.
     405             :      * This sequence ensures logical decoding becomes available on the primary
     406             :      * first.
     407             :      */
     408          26 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     409             : 
     410          26 :     LogicalDecodingCtl->logical_decoding_enabled = true;
     411             : 
     412          26 :     if (!in_recovery)
     413           8 :         write_logical_decoding_status_update_record(true);
     414             : 
     415          26 :     LogicalDecodingCtl->pending_disable = false;
     416             : 
     417          26 :     LWLockRelease(LogicalDecodingControlLock);
     418             : 
     419          26 :     END_CRIT_SECTION();
     420             : 
     421          26 :     if (!in_recovery)
     422           8 :         ereport(LOG,
     423             :                 errmsg("logical decoding is enabled upon creating a new logical replication slot"));
     424             : }
     425             : 
     426             : /*
     427             :  * Initiate a request for disabling logical decoding.
     428             :  *
     429             :  * Note that this function does not verify whether logical slots exist. The
     430             :  * checkpointer will verify if logical decoding should actually be disabled.
     431             :  */
     432             : void
     433         826 : RequestDisableLogicalDecoding(void)
     434             : {
     435         826 :     if (wal_level != WAL_LEVEL_REPLICA)
     436         804 :         return;
     437             : 
     438             :     /*
     439             :      * It's possible that we might not actually need to disable logical
     440             :      * decoding if someone creates a new logical slot concurrently. We set the
     441             :      * flag anyway and the checkpointer will check it and disable logical
     442             :      * decoding if necessary.
     443             :      */
     444          22 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     445          22 :     LogicalDecodingCtl->pending_disable = true;
     446          22 :     LWLockRelease(LogicalDecodingControlLock);
     447             : 
     448          22 :     WakeupCheckpointer();
     449             : 
     450          22 :     elog(DEBUG1, "requested disabling logical decoding");
     451             : }
     452             : 
     453             : /*
     454             :  * Disable logical decoding if necessary.
     455             :  *
     456             :  * This function disables logical decoding upon a request initiated by
     457             :  * RequestDisableLogicalDecoding(). Otherwise, it performs no action.
     458             :  */
     459             : void
     460        8776 : DisableLogicalDecodingIfNecessary(void)
     461             : {
     462             :     bool        pending_disable;
     463             : 
     464        8776 :     if (wal_level != WAL_LEVEL_REPLICA)
     465        2290 :         return;
     466             : 
     467             :     /*
     468             :      * Sanity check as we cannot disable logical decoding while holding a
     469             :      * logical slot.
     470             :      */
     471             :     Assert(!MyReplicationSlot);
     472             : 
     473        6486 :     if (RecoveryInProgress())
     474        2982 :         return;
     475             : 
     476        3504 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     477        3504 :     pending_disable = LogicalDecodingCtl->pending_disable;
     478        3504 :     LWLockRelease(LogicalDecodingControlLock);
     479             : 
     480             :     /* Quick return if no pending disable request */
     481        3504 :     if (!pending_disable)
     482        3484 :         return;
     483             : 
     484          20 :     DisableLogicalDecoding();
     485             : }
     486             : 
     487             : /*
     488             :  * A workhorse function to disable logical decoding.
     489             :  */
     490             : void
     491          38 : DisableLogicalDecoding(void)
     492             : {
     493          38 :     bool        in_recovery = RecoveryInProgress();
     494             : 
     495          38 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     496             : 
     497             :     /*
     498             :      * Check if we can disable logical decoding.
     499             :      *
     500             :      * Skip CheckLogicalSlotExists() check during recovery because the
     501             :      * existing slots will be invalidated after disabling logical decoding.
     502             :      */
     503          38 :     if (!LogicalDecodingCtl->logical_decoding_enabled ||
     504          32 :         (!in_recovery && CheckLogicalSlotExists()))
     505             :     {
     506           8 :         LogicalDecodingCtl->pending_disable = false;
     507           8 :         LWLockRelease(LogicalDecodingControlLock);
     508           8 :         return;
     509             :     }
     510             : 
     511          30 :     START_CRIT_SECTION();
     512             : 
     513             :     /*
     514             :      * We need to disable logical decoding first and then disable logical
     515             :      * information WAL logging in order to ensure that logical decoding never
     516             :      * processes WAL records with insufficient information.
     517             :      */
     518          30 :     LogicalDecodingCtl->logical_decoding_enabled = false;
     519             : 
     520             :     /* Write the WAL record to disable logical decoding on standbys too */
     521          30 :     if (!in_recovery)
     522          12 :         write_logical_decoding_status_update_record(false);
     523             : 
     524             :     /* Now disable logical information WAL logging */
     525          30 :     LogicalDecodingCtl->xlog_logical_info = false;
     526          30 :     LogicalDecodingCtl->pending_disable = false;
     527             : 
     528          30 :     END_CRIT_SECTION();
     529             : 
     530          30 :     if (!in_recovery)
     531          12 :         ereport(LOG,
     532             :                 errmsg("logical decoding is disabled because there are no valid logical replication slots"));
     533             : 
     534          30 :     LWLockRelease(LogicalDecodingControlLock);
     535             : 
     536             :     /*
     537             :      * Tell all running processes to reflect the xlog_logical_info update.
     538             :      * Unlike when enabling logical decoding, we don't need to wait for all
     539             :      * processes to complete it in this case. We already disabled logical
     540             :      * decoding and it's always safe to write logical information to WAL
     541             :      * records, even when not strictly required. Therefore, we don't need to
     542             :      * wait for all running transactions to finish either.
     543             :      */
     544          30 :     EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
     545             : }
     546             : 
     547             : /*
     548             :  * Settles the logical decoding status at end of recovery: unless wal_level is
     549             :  * 'minimal', enable it iff wal_level is 'logical' or a logical slot exists,
     550             :  * then sync XLogLogicalInfo to all processes. Call before accepting writes.
     551             :  */
     552             : void
     553        1852 : UpdateLogicalDecodingStatusEndOfRecovery(void)
     554             : {
     555        1852 :     bool        new_status = false;
     556             : 
     557             :     Assert(RecoveryInProgress());
     558             : 
     559             :     /*
     560             :      * With 'minimal' WAL level, there are no logical replication slots during
     561             :      * recovery. Logical decoding is always disabled, so there is no need to
     562             :      * synchronize XLogLogicalInfo.
     563             :      */
     564        1852 :     if (wal_level == WAL_LEVEL_MINIMAL)
     565             :     {
     566             :         Assert(!IsXLogLogicalInfoEnabled() && !IsLogicalDecodingEnabled());
     567         688 :         return;
     568             :     }
     569             : 
     570        1164 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     571             : 
     572        1164 :     if (wal_level == WAL_LEVEL_LOGICAL || CheckLogicalSlotExists())
     573         258 :         new_status = true;
     574             : 
     575             :     /*
     576             :      * When recovery ends, we need to either enable or disable logical
     577             :      * decoding based on the wal_level setting and the presence of logical
     578             :      * slots. Note that concurrent slot creation and deletion could happen,
     579             :      * but WAL writes are still not permitted until recovery fully
     580             :      * completes. Here's how we handle concurrent toggling of logical
     581             :      * decoding:
     582             :      *
     583             :      * For the 'enable' case, if there's a concurrent disable request before
     584             :      * recovery fully completes, the checkpointer will handle it after
     585             :      * recovery is done. This means there might be a brief period after
     586             :      * recovery where logical decoding remains enabled even with no logical
     587             :      * replication slots present. This temporary state is not new - it can
     588             :      * already occur due to the checkpointer's asynchronous deactivation
     589             :      * process.
     590             :      *
     591             :      * For the 'disable' case, backends cannot create logical replication
     592             :      * slots during recovery (see checks in CheckLogicalDecodingRequirements()),
     593             :      * which prevents a race condition between disabling logical decoding and
     594             :      * concurrent slot creation.
     595             :      */
     596        1164 :     if (new_status != LogicalDecodingCtl->logical_decoding_enabled)
     597             :     {
     598             :         /*
     599             :          * Update both the logical decoding status and logical WAL logging
     600             :          * status. Unlike toggling these statuses outside recovery, we don't
     601             :          * need to worry about the operation order as WAL writes are still not
     602             :          * permitted.
     603             :          */
     604         142 :         LogicalDecodingCtl->xlog_logical_info = new_status;
     605         142 :         LogicalDecodingCtl->logical_decoding_enabled = new_status;
     606             : 
     607         142 :         elog(DEBUG1,
     608             :              "update logical decoding status to %d at the end of recovery",
     609             :              new_status);
     610             : 
     611             :         /*
     612             :          * Now that we updated the logical decoding status, clear the pending
     613             :          * disable flag. It's possible that a concurrent process drops the
     614             :          * last logical slot and requests the pending disable again; the
     615             :          * checkpointer process will check it.
     616             :          */
     617         142 :         LogicalDecodingCtl->pending_disable = false;
     618             : 
     619         142 :         LWLockRelease(LogicalDecodingControlLock);
     620             : 
     621         142 :         write_logical_decoding_status_update_record(new_status);
     622             :     }
     623             :     else
     624        1022 :         LWLockRelease(LogicalDecodingControlLock);
     625             : 
     626             :     /*
     627             :      * When under postmaster, ensure all running processes have the updated
     628             :      * status. We don't need to wait for running transactions to finish as we
     629             :      * don't accept any writes yet. On the other hand, we must synchronize
     630             :      * XLogLogicalInfo even if we did not update the status above, because it
     631             :      * may have been turned on and off during recovery, leaving running
     632             :      * processes with different values in their local caches.
     633             :      */
     634        1164 :     if (IsUnderPostmaster)
     635         922 :         WaitForProcSignalBarrier(
     636             :                                  EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
     637             : 
     638        1164 :     INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
     639             : }

Generated by: LCOV version 1.16