LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalctl.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 97.7 % 132 129
Test Date: 2026-04-06 10:16:19 Functions: 100.0 % 16 16
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * logicalctl.c
       3              :  *      Functionality to control logical decoding status online.
       4              :  *
       5              :  * This module enables dynamic control of logical decoding availability.
       6              :  * Logical decoding becomes active under two conditions: when the wal_level
       7              :  * parameter is set to 'logical', or when at least one valid logical replication
       8              :  * slot exists with wal_level set to 'replica'. The system disables logical
       9              :  * decoding when neither condition is met. Therefore, the dynamic control
      10              :  * of logical decoding availability is required only when wal_level is set
      11              :  * to 'replica'. Logical decoding is always enabled when wal_level='logical'
      12              :  * and always disabled when wal_level='minimal'.
      13              :  *
      14              :  * The core concept of dynamically enabling and disabling logical decoding
      15              :  * is to separately control two aspects: writing information required for
      16              :  * logical decoding to WAL records, and using logical decoding itself. During
      17              :  * activation, we first enable logical WAL writing while keeping logical
      18              :  * decoding disabled. This change is reflected in the read-only
      19              :  * effective_wal_level GUC parameter. Once we ensure that all processes have
      20              :  * updated to the latest effective_wal_level value, we then enable logical
      21              :  * decoding. Deactivation follows a similar careful, multi-step process
      22              :  * in reverse order.
      23              :  *
      24              :  * While activation occurs synchronously right after creating the first
      25              :  * logical slot, deactivation happens asynchronously through the checkpointer
      26              :  * process. This design avoids a race condition at the end of recovery; see
      27              :  * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details.
      28              :  * Asynchronous deactivation also avoids excessive toggling of the logical
      29              :  * decoding status in workloads that repeatedly create and drop a single
      30              :  * logical slot. On the other hand, this lazy approach can delay changes
      31              :  * to effective_wal_level and the disabling logical decoding, especially
      32              :  * when the checkpointer is busy with other tasks. We chose this lazy approach
      33              :  * in all deactivation paths to keep the implementation simple, even though
      34              :  * laziness is strictly required only for end-of-recovery cases. Future work
      35              :  * might address this limitation either by using a dedicated worker instead
      36              :  * of the checkpointer, or by implementing synchronous waiting during slot
      37              :  * drops if workloads are significantly affected by the lazy deactivation
      38              :  * of logical decoding.
      39              :  *
      40              :  * Standby servers use the primary server's effective_wal_level and logical
      41              :  * decoding status. Unlike normal activation and deactivation, these
      42              :  * are updated simultaneously without status change coordination, solely by
      43              :  * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level
      44              :  * setting has no effect during this time. Upon promotion, we update the
      45              :  * logical decoding status based on local conditions: the wal_level value and
      46              :  * the presence of logical slots.
      47              :  *
      48              :  * In the future, we could extend support to include automatic transitions
      49              :  * of effective_wal_level between 'minimal' and 'logical' WAL levels. However,
      50              :  * this enhancement would require additional coordination mechanisms and
      51              :  * careful implementation of operations such as terminating walsenders and
      52              :  * archiver processes while carefully considering the sequence of operations
      53              :  * to ensure system stability during these transitions.
      54              :  *
      55              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      56              :  * Portions Copyright (c) 1994, Regents of the University of California
      57              :  *
      58              :  * IDENTIFICATION
      59              :  *    src/backend/replication/logical/logicalctl.c
      60              :  *
      61              :  *-------------------------------------------------------------------------
      62              :  */
      63              : 
      64              : #include "postgres.h"
      65              : 
      66              : #include "access/xloginsert.h"
      67              : #include "catalog/pg_control.h"
      68              : #include "miscadmin.h"
      69              : #include "replication/slot.h"
      70              : #include "storage/ipc.h"
      71              : #include "storage/lmgr.h"
      72              : #include "storage/proc.h"
      73              : #include "storage/procarray.h"
      74              : #include "storage/procsignal.h"
      75              : #include "storage/subsystems.h"
      76              : #include "utils/injection_point.h"
      77              : 
      78              : /*
      79              :  * Struct for controlling the logical decoding status.
      80              :  *
      81              :  * This struct is protected by LogicalDecodingControlLock.
      82              :  */
      83              : typedef struct LogicalDecodingCtlData
      84              : {
      85              :     /*
      86              :      * This is the authoritative value used by all processes to determine
      87              :      * whether to write additional information required by logical decoding to
      88              :      * WAL. Since this information could be checked frequently, each process
      89              :      * caches this value in XLogLogicalInfo for better performance.
      90              :      */
      91              :     bool        xlog_logical_info;
      92              : 
      93              :     /* True if logical decoding is available in the system */
      94              :     bool        logical_decoding_enabled;
      95              : 
      96              :     /* True if logical decoding might need to be disabled */
      97              :     bool        pending_disable;
      98              : } LogicalDecodingCtlData;
      99              : 
     100              : static LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
     101              : 
     102              : static void LogicalDecodingCtlShmemRequest(void *arg);
     103              : 
     104              : const ShmemCallbacks LogicalDecodingCtlShmemCallbacks = {
     105              :     .request_fn = LogicalDecodingCtlShmemRequest,
     106              : };
     107              : 
     108              : /*
     109              :  * A process-local cache of LogicalDecodingCtl->xlog_logical_info. This is
     110              :  * initialized at process startup, and updated when processing the process
     111              :  * barrier signal in ProcessBarrierUpdateXLogLogicalInfo(). If the process
     112              :  * is in an XID-assigned transaction, the cache update is delayed until the
     113              :  * transaction ends. See the comments for XLogLogicalInfoUpdatePending for details.
     114              :  */
     115              : bool        XLogLogicalInfo = false;
     116              : 
     117              : /*
     118              :  * When receiving the PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO signal, if
     119              :  * an XID is assigned to the current transaction, the process sets this flag and
     120              :  * delays the XLogLogicalInfo update until the transaction ends. This ensures
     121              :  * that the XLogLogicalInfo value (typically accessed via XLogLogicalInfoActive)
     122              :  * remains consistent throughout the transaction.
     123              :  */
     124              : static bool XLogLogicalInfoUpdatePending = false;
     125              : 
     126              : static void update_xlog_logical_info(void);
     127              : static void abort_logical_decoding_activation(int code, Datum arg);
     128              : static void write_logical_decoding_status_update_record(bool status);
     129              : 
     130              : static void
     131         1232 : LogicalDecodingCtlShmemRequest(void *arg)
     132              : {
     133         1232 :     ShmemRequestStruct(.name = "Logical decoding control",
     134              :                        .size = sizeof(LogicalDecodingCtlData),
     135              :                        .ptr = (void **) &LogicalDecodingCtl,
     136              :         );
     137         1232 : }
     138              : 
     139              : /*
     140              :  * Initialize the logical decoding status in shmem at server startup. This
     141              :  * must be called ONCE during postmaster or standalone-backend startup.
     142              :  */
     143              : void
     144         1068 : StartupLogicalDecodingStatus(bool last_status)
     145              : {
     146              :     /* Logical decoding is always disabled when 'minimal' WAL level */
     147         1068 :     if (wal_level == WAL_LEVEL_MINIMAL)
     148          396 :         return;
     149              : 
     150              :     /*
     151              :      * Set the initial logical decoding status based on the last status. If
     152              :      * logical decoding was enabled before the last shutdown, it remains
     153              :      * enabled as we might have set wal_level='logical' or have at least one
     154              :      * logical slot.
     155              :      */
     156          672 :     LogicalDecodingCtl->xlog_logical_info = last_status;
     157          672 :     LogicalDecodingCtl->logical_decoding_enabled = last_status;
     158              : }
     159              : 
     160              : /*
     161              :  * Update the XLogLogicalInfo cache.
     162              :  */
     163              : static inline void
     164        26678 : update_xlog_logical_info(void)
     165              : {
     166        26678 :     XLogLogicalInfo = IsXLogLogicalInfoEnabled();
     167        26678 : }
     168              : 
     169              : /*
     170              :  * Initialize XLogLogicalInfo backend-private cache. This routine is called
     171              :  * during process initialization.
     172              :  */
     173              : void
     174        24774 : InitializeProcessXLogLogicalInfo(void)
     175              : {
     176        24774 :     update_xlog_logical_info();
     177        24774 : }
     178              : 
     179              : /*
     180              :  * This routine is called when we are told to update XLogLogicalInfo
     181              :  * by a ProcSignalBarrier.
     182              :  */
     183              : bool
     184         1904 : ProcessBarrierUpdateXLogLogicalInfo(void)
     185              : {
     186         1904 :     if (GetTopTransactionIdIfAny() != InvalidTransactionId)
     187              :     {
     188              :         /* Delay updating XLogLogicalInfo until the transaction end */
     189            0 :         XLogLogicalInfoUpdatePending = true;
     190              :     }
     191              :     else
     192         1904 :         update_xlog_logical_info();
     193              : 
     194         1904 :     return true;
     195              : }
     196              : 
     197              : /*
     198              :  * Check the shared memory state and return true if logical decoding is
     199              :  * enabled on the system.
     200              :  */
     201              : bool
     202        19756 : IsLogicalDecodingEnabled(void)
     203              : {
     204              :     bool        enabled;
     205              : 
     206        19756 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     207        19756 :     enabled = LogicalDecodingCtl->logical_decoding_enabled;
     208        19756 :     LWLockRelease(LogicalDecodingControlLock);
     209              : 
     210        19756 :     return enabled;
     211              : }
     212              : 
     213              : /*
     214              :  * Returns true if logical WAL logging is enabled based on the shared memory
     215              :  * status.
     216              :  */
     217              : bool
     218        26705 : IsXLogLogicalInfoEnabled(void)
     219              : {
     220              :     bool        xlog_logical_info;
     221              : 
     222        26705 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     223        26705 :     xlog_logical_info = LogicalDecodingCtl->xlog_logical_info;
     224        26705 :     LWLockRelease(LogicalDecodingControlLock);
     225              : 
     226        26705 :     return xlog_logical_info;
     227              : }
     228              : 
     229              : /*
     230              :  * Reset the local cache at end of the transaction.
     231              :  */
     232              : void
     233       635886 : AtEOXact_LogicalCtl(void)
     234              : {
     235              :     /* Update the local cache if there is a pending update */
     236       635886 :     if (XLogLogicalInfoUpdatePending)
     237              :     {
     238            0 :         update_xlog_logical_info();
     239            0 :         XLogLogicalInfoUpdatePending = false;
     240              :     }
     241       635886 : }
     242              : 
     243              : /*
     244              :  * Writes an XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given
     245              :  * status.
     246              :  */
     247              : static void
     248           84 : write_logical_decoding_status_update_record(bool status)
     249              : {
     250              :     XLogRecPtr  recptr;
     251              : 
     252           84 :     XLogBeginInsert();
     253           84 :     XLogRegisterData(&status, sizeof(bool));
     254           84 :     recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
     255           84 :     XLogFlush(recptr);
     256           84 : }
     257              : 
     258              : /*
     259              :  * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
     260              :  * the shared flags to revert the logical decoding activation process.
     261              :  */
     262              : static void
     263            1 : abort_logical_decoding_activation(int code, Datum arg)
     264              : {
     265              :     Assert(MyReplicationSlot);
     266              :     Assert(!LogicalDecodingCtl->logical_decoding_enabled);
     267              : 
     268            1 :     elog(DEBUG1, "aborting logical decoding activation process");
     269              : 
     270              :     /*
     271              :      * Abort the change to xlog_logical_info. We don't need to check
     272              :      * CheckLogicalSlotExists() as we're still holding a logical slot.
     273              :      */
     274            1 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     275            1 :     LogicalDecodingCtl->xlog_logical_info = false;
     276            1 :     LWLockRelease(LogicalDecodingControlLock);
     277              : 
     278              :     /*
     279              :      * Some processes might have already started logical info WAL logging, so
     280              :      * tell all running processes to update their caches. We don't need to
     281              :      * wait for all processes to disable xlog_logical_info locally as it's
     282              :      * always safe to write logical information to WAL records, even when not
     283              :      * strictly required.
     284              :      */
     285            1 :     EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
     286            1 : }
     287              : 
     288              : /*
     289              :  * Enable logical decoding if disabled.
     290              :  *
     291              :  * If this function is called during recovery, it simply returns without
     292              :  * action since the logical decoding status change is not allowed during
     293              :  * this time. The logical decoding status depends on the status on the primary.
     294              :  * The caller should use CheckLogicalDecodingRequirements() before calling this
     295              :  * function to make sure that the logical decoding status can be modified.
     296              :  *
     297              :  * Note that there is no interlock between logical decoding activation
     298              :  * and slot creation. To ensure enabling logical decoding, the caller
     299              :  * needs to call this function after creating a logical slot before
     300              :  * initializing the logical decoding context.
     301              :  */
     302              : void
     303          500 : EnsureLogicalDecodingEnabled(void)
     304              : {
     305              :     Assert(MyReplicationSlot);
     306              :     Assert(wal_level >= WAL_LEVEL_REPLICA);
     307              : 
     308              :     /* Logical decoding is always enabled */
     309          500 :     if (wal_level >= WAL_LEVEL_LOGICAL)
     310          490 :         return;
     311              : 
     312           10 :     if (RecoveryInProgress())
     313              :     {
     314              :         /*
     315              :          * CheckLogicalDecodingRequirements() must have already errored out if
     316              :          * logical decoding is not enabled since we cannot enable the logical
     317              :          * decoding status during recovery.
     318              :          */
     319              :         Assert(IsLogicalDecodingEnabled());
     320            3 :         return;
     321              :     }
     322              : 
     323              :     /*
     324              :      * Ensure to abort the activation process in cases where there in an
     325              :      * interruption during the wait.
     326              :      */
     327            7 :     PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
     328              :     {
     329            7 :         EnableLogicalDecoding();
     330              :     }
     331            7 :     PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
     332              : }
     333              : 
     334              : /*
     335              :  * A workhorse function to enable logical decoding.
     336              :  */
     337              : void
     338           16 : EnableLogicalDecoding(void)
     339              : {
     340              :     bool        in_recovery;
     341              : 
     342           16 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     343              : 
     344              :     /* Return if it is already enabled */
     345           16 :     if (LogicalDecodingCtl->logical_decoding_enabled)
     346              :     {
     347            2 :         LogicalDecodingCtl->pending_disable = false;
     348            2 :         LWLockRelease(LogicalDecodingControlLock);
     349            2 :         return;
     350              :     }
     351              : 
     352              :     /*
     353              :      * Set logical info WAL logging in shmem. All process starts after this
     354              :      * point will include the information required by logical decoding to WAL
     355              :      * records.
     356              :      */
     357           14 :     LogicalDecodingCtl->xlog_logical_info = true;
     358              : 
     359           14 :     LWLockRelease(LogicalDecodingControlLock);
     360              : 
     361              :     /*
     362              :      * Tell all running processes to reflect the xlog_logical_info update, and
     363              :      * wait. This ensures that all running processes have enabled logical
     364              :      * information WAL logging.
     365              :      */
     366           14 :     WaitForProcSignalBarrier(
     367              :                              EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
     368              : 
     369           14 :     INJECTION_POINT("logical-decoding-activation", NULL);
     370              : 
     371           13 :     in_recovery = RecoveryInProgress();
     372              : 
     373              :     /*
     374              :      * There could be some transactions that might have started with the old
     375              :      * status, but we don't need to wait for these transactions to complete as
     376              :      * long as they have valid XIDs. These transactions will appear in the
     377              :      * xl_running_xacts record and therefore the snapshot builder will not try
     378              :      * to decode the transaction during the logical decoding initialization.
     379              :      *
     380              :      * There is a theoretical case where a transaction decides whether to
     381              :      * include logical-info to WAL records before getting an XID. In this
     382              :      * case, the transaction won't appear in xl_running_xacts.
     383              :      *
     384              :      * For operations that do not require an XID assignment, the process
     385              :      * starts including logical-info immediately upon receiving the signal
     386              :      * (barrier). If such an operation checks the effective_wal_level multiple
     387              :      * times within a single execution, the resulting WAL records might be
     388              :      * inconsistent (i.e., logical-info is included in some records but not in
     389              :      * others). However, this is harmless because logical decoding generally
     390              :      * ignores WAL records that are not associated with an assigned XID.
     391              :      *
     392              :      * One might think we need to wait for all running transactions, including
     393              :      * those without XIDs and read-only transactions, to finish before
     394              :      * enabling logical decoding. However, such a requirement would force the
     395              :      * slot creation to wait for a potentially very long time due to
     396              :      * long-running read queries, which is practically unacceptable.
     397              :      */
     398              : 
     399           13 :     START_CRIT_SECTION();
     400              : 
     401              :     /*
     402              :      * We enable logical decoding first, followed by writing the WAL record.
     403              :      * This sequence ensures logical decoding becomes available on the primary
     404              :      * first.
     405              :      */
     406           13 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     407              : 
     408           13 :     LogicalDecodingCtl->logical_decoding_enabled = true;
     409              : 
     410           13 :     if (!in_recovery)
     411            4 :         write_logical_decoding_status_update_record(true);
     412              : 
     413           13 :     LogicalDecodingCtl->pending_disable = false;
     414              : 
     415           13 :     LWLockRelease(LogicalDecodingControlLock);
     416              : 
     417           13 :     END_CRIT_SECTION();
     418              : 
     419           13 :     if (!in_recovery)
     420            4 :         ereport(LOG,
     421              :                 errmsg("logical decoding is enabled upon creating a new logical replication slot"));
     422              : }
     423              : 
     424              : /*
     425              :  * Initiate a request for disabling logical decoding.
     426              :  *
     427              :  * Note that this function does not verify whether logical slots exist. The
     428              :  * checkpointer will verify if logical decoding should actually be disabled.
     429              :  */
     430              : void
     431          432 : RequestDisableLogicalDecoding(void)
     432              : {
     433          432 :     if (wal_level != WAL_LEVEL_REPLICA)
     434          421 :         return;
     435              : 
     436              :     /*
     437              :      * It's possible that we might not actually need to disable logical
     438              :      * decoding if someone creates a new logical slot concurrently. We set the
     439              :      * flag anyway and the checkpointer will check it and disable logical
     440              :      * decoding if necessary.
     441              :      */
     442           11 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     443           11 :     LogicalDecodingCtl->pending_disable = true;
     444           11 :     LWLockRelease(LogicalDecodingControlLock);
     445              : 
     446           11 :     WakeupCheckpointer();
     447              : 
     448           11 :     elog(DEBUG1, "requested disabling logical decoding");
     449              : }
     450              : 
     451              : /*
     452              :  * Disable logical decoding if necessary.
     453              :  *
     454              :  * This function disables logical decoding upon a request initiated by
     455              :  * RequestDisableLogicalDecoding(). Otherwise, it performs no action.
     456              :  */
     457              : void
     458         5516 : DisableLogicalDecodingIfNecessary(void)
     459              : {
     460              :     bool        pending_disable;
     461              : 
     462         5516 :     if (wal_level != WAL_LEVEL_REPLICA)
     463         1251 :         return;
     464              : 
     465              :     /*
     466              :      * Sanity check as we cannot disable logical decoding while holding a
     467              :      * logical slot.
     468              :      */
     469              :     Assert(!MyReplicationSlot);
     470              : 
     471         4265 :     if (RecoveryInProgress())
     472         1696 :         return;
     473              : 
     474         2569 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     475         2569 :     pending_disable = LogicalDecodingCtl->pending_disable;
     476         2569 :     LWLockRelease(LogicalDecodingControlLock);
     477              : 
     478              :     /* Quick return if no pending disable request */
     479         2569 :     if (!pending_disable)
     480         2559 :         return;
     481              : 
     482           10 :     DisableLogicalDecoding();
     483              : }
     484              : 
     485              : /*
     486              :  * A workhorse function to disable logical decoding.
     487              :  */
     488              : void
     489           19 : DisableLogicalDecoding(void)
     490              : {
     491           19 :     bool        in_recovery = RecoveryInProgress();
     492              : 
     493           19 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     494              : 
     495              :     /*
     496              :      * Check if we can disable logical decoding.
     497              :      *
     498              :      * Skip CheckLogicalSlotExists() check during recovery because the
     499              :      * existing slots will be invalidated after disabling logical decoding.
     500              :      */
     501           19 :     if (!LogicalDecodingCtl->logical_decoding_enabled ||
     502           16 :         (!in_recovery && CheckLogicalSlotExists()))
     503              :     {
     504            4 :         LogicalDecodingCtl->pending_disable = false;
     505            4 :         LWLockRelease(LogicalDecodingControlLock);
     506            4 :         return;
     507              :     }
     508              : 
     509           15 :     START_CRIT_SECTION();
     510              : 
     511              :     /*
     512              :      * We need to disable logical decoding first and then disable logical
     513              :      * information WAL logging in order to ensure that no logical decoding
     514              :      * processes WAL records with insufficient information.
     515              :      */
     516           15 :     LogicalDecodingCtl->logical_decoding_enabled = false;
     517              : 
     518              :     /* Write the WAL to disable logical decoding on standbys too */
     519           15 :     if (!in_recovery)
     520            6 :         write_logical_decoding_status_update_record(false);
     521              : 
     522              :     /* Now disable logical information WAL logging */
     523           15 :     LogicalDecodingCtl->xlog_logical_info = false;
     524           15 :     LogicalDecodingCtl->pending_disable = false;
     525              : 
     526           15 :     END_CRIT_SECTION();
     527              : 
     528           15 :     if (!in_recovery)
     529            6 :         ereport(LOG,
     530              :                 errmsg("logical decoding is disabled because there are no valid logical replication slots"));
     531              : 
     532           15 :     LWLockRelease(LogicalDecodingControlLock);
     533              : 
     534              :     /*
     535              :      * Tell all running processes to reflect the xlog_logical_info update.
     536              :      * Unlike when enabling logical decoding, we don't need to wait for all
     537              :      * processes to complete it in this case. We already disabled logical
     538              :      * decoding and it's always safe to write logical information to WAL
     539              :      * records, even when not strictly required. Therefore, we don't need to
     540              :      * wait for all running transactions to finish either.
     541              :      */
     542           15 :     EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
     543              : }
     544              : 
     545              : /*
     546              :  * Updates the logical decoding status at end of recovery, and ensures that
     547              :  * all running processes have the updated XLogLogicalInfo status. This
     548              :  * function must be called before accepting writes.
     549              :  */
     550              : void
     551         1005 : UpdateLogicalDecodingStatusEndOfRecovery(void)
     552              : {
     553         1005 :     bool        new_status = false;
     554              : 
     555              :     Assert(RecoveryInProgress());
     556              : 
     557              :     /*
     558              :      * With 'minimal' WAL level, there are no logical replication slots during
     559              :      * recovery. Logical decoding is always disabled, so there is no need to
     560              :      * synchronize XLogLogicalInfo.
     561              :      */
     562         1005 :     if (wal_level == WAL_LEVEL_MINIMAL)
     563              :     {
     564              :         Assert(!IsXLogLogicalInfoEnabled() && !IsLogicalDecodingEnabled());
     565          396 :         return;
     566              :     }
     567              : 
     568          609 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     569              : 
     570          609 :     if (wal_level == WAL_LEVEL_LOGICAL || CheckLogicalSlotExists())
     571          134 :         new_status = true;
     572              : 
     573              :     /*
     574              :      * When recovery ends, we need to either enable or disable logical
     575              :      * decoding based on the wal_level setting and the presence of logical
     576              :      * slots. We need to note that concurrent slot creation and deletion could
     577              :      * happen but WAL writes are still not permitted until recovery fully
     578              :      * completes. Here's how we handle concurrent toggling of logical
     579              :      * decoding:
     580              :      *
     581              :      * For 'enable' case, if there's a concurrent disable request before
     582              :      * recovery fully completes, the checkpointer will handle it after
     583              :      * recovery is done. This means there might be a brief period after
     584              :      * recovery where logical decoding remains enabled even with no logical
     585              :      * replication slots present. This temporary state is not new - it can
     586              :      * already occur due to the checkpointer's asynchronous deactivation
     587              :      * process.
     588              :      *
     589              :      * For 'disable' case, backend cannot create logical replication slots
     590              :      * during recovery (see checks in CheckLogicalDecodingRequirements()),
     591              :      * which prevents a race condition between disabling logical decoding and
     592              :      * concurrent slot creation.
     593              :      */
     594          609 :     if (new_status != LogicalDecodingCtl->logical_decoding_enabled)
     595              :     {
     596              :         /*
     597              :          * Update both the logical decoding status and logical WAL logging
     598              :          * status. Unlike toggling these status during non-recovery, we don't
     599              :          * need to worry about the operation order as WAL writes are still not
     600              :          * permitted.
     601              :          */
     602           74 :         LogicalDecodingCtl->xlog_logical_info = new_status;
     603           74 :         LogicalDecodingCtl->logical_decoding_enabled = new_status;
     604              : 
     605           74 :         elog(DEBUG1,
     606              :              "update logical decoding status to %d at the end of recovery",
     607              :              new_status);
     608              : 
     609              :         /*
     610              :          * Now that we updated the logical decoding status, clear the pending
     611              :          * disable flag. It's possible that a concurrent process drops the
     612              :          * last logical slot and initiates the pending disable again. The
     613              :          * checkpointer process will check it.
     614              :          */
     615           74 :         LogicalDecodingCtl->pending_disable = false;
     616              : 
     617           74 :         LWLockRelease(LogicalDecodingControlLock);
     618              : 
     619           74 :         write_logical_decoding_status_update_record(new_status);
     620              :     }
     621              :     else
     622          535 :         LWLockRelease(LogicalDecodingControlLock);
     623              : 
     624              :     /*
     625              :      * Ensure all running processes have the updated status. We don't need to
     626              :      * wait for running transactions to finish as we don't accept any writes
     627              :      * yet. On the other hand, we need to wait for synchronizing
     628              :      * XLogLogicalInfo even if we've not updated the status above as the
     629              :      * status have been turned on and off during recovery, having running
     630              :      * processes have different status on their local caches.
     631              :      */
     632          609 :     if (IsUnderPostmaster)
     633          476 :         WaitForProcSignalBarrier(
     634              :                                  EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
     635              : 
     636          609 :     INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
     637              : }
        

Generated by: LCOV version 2.0-1