LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalctl.c (source / functions)
Test: PostgreSQL 19beta1
Test Date: 2026-06-26 23:16:30

            Coverage  Total  Hit
Lines:       100.0 %    131  131
Functions:   100.0 %     16   16

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * logicalctl.c
       3              :  *      Functionality to control logical decoding status online.
       4              :  *
       5              :  * This module enables dynamic control of logical decoding availability.
       6              :  * Logical decoding becomes active under two conditions: when the wal_level
       7              :  * parameter is set to 'logical', or when at least one valid logical replication
       8              :  * slot exists with wal_level set to 'replica'. The system disables logical
       9              :  * decoding when neither condition is met. Therefore, the dynamic control
      10              :  * of logical decoding availability is required only when wal_level is set
      11              :  * to 'replica'. Logical decoding is always enabled when wal_level='logical'
      12              :  * and always disabled when wal_level='minimal'.
      13              :  *
      14              :  * The core concept of dynamically enabling and disabling logical decoding
      15              :  * is to separately control two aspects: writing information required for
      16              :  * logical decoding to WAL records, and using logical decoding itself. During
      17              :  * activation, we first enable logical WAL writing while keeping logical
      18              :  * decoding disabled. This change is reflected in the read-only
      19              :  * effective_wal_level GUC parameter. Once we ensure that all processes have
      20              :  * updated to the latest effective_wal_level value, we then enable logical
      21              :  * decoding. Deactivation follows a similar careful, multi-step process
      22              :  * in reverse order.
      23              :  *
      24              :  * While activation occurs synchronously right after creating the first
      25              :  * logical slot, deactivation happens asynchronously through the checkpointer
      26              :  * process. This design avoids a race condition at the end of recovery; see
      27              :  * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details.
      28              :  * Asynchronous deactivation also avoids excessive toggling of the logical
      29              :  * decoding status in workloads that repeatedly create and drop a single
      30              :  * logical slot. On the other hand, this lazy approach can delay changes
      31              :  * to effective_wal_level and the disabling of logical decoding, especially
      32              :  * when the checkpointer is busy with other tasks. We chose this lazy approach
      33              :  * in all deactivation paths to keep the implementation simple, even though
      34              :  * laziness is strictly required only for end-of-recovery cases. Future work
      35              :  * might address this limitation either by using a dedicated worker instead
      36              :  * of the checkpointer, or by implementing synchronous waiting during slot
      37              :  * drops if workloads are significantly affected by the lazy deactivation
      38              :  * of logical decoding.
      39              :  *
      40              :  * Standby servers use the primary server's effective_wal_level and logical
      41              :  * decoding status. Unlike normal activation and deactivation, these
      42              :  * are updated simultaneously without status change coordination, solely by
      43              :  * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level
      44              :  * setting has no effect during this time. Upon promotion, we update the
      45              :  * logical decoding status based on local conditions: the wal_level value and
      46              :  * the presence of logical slots.
      47              :  *
      48              :  * In the future, we could extend support to include automatic transitions
      49              :  * of effective_wal_level between 'minimal' and 'logical' WAL levels. However,
      50              :  * this enhancement would require additional coordination mechanisms and
      51              :  * careful implementation of operations such as terminating walsenders and
      52              :  * archiver processes while carefully considering the sequence of operations
      53              :  * to ensure system stability during these transitions.
      54              :  *
      55              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      56              :  * Portions Copyright (c) 1994, Regents of the University of California
      57              :  *
      58              :  * IDENTIFICATION
      59              :  *    src/backend/replication/logical/logicalctl.c
      60              :  *
      61              :  *-------------------------------------------------------------------------
      62              :  */
      63              : 
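
The ordering described in the header comment above can be illustrated with a
small standalone model. This is only a sketch under stated assumptions, not the
PostgreSQL implementation: DemoCtl, demo_invariant_holds(), demo_enable() and
demo_disable() are hypothetical names, and locking, the process barrier wait,
and WAL logging are reduced to comments.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the shared control struct; not the real one. */
typedef struct DemoCtl
{
    bool        xlog_logical_info;          /* logical info is written to WAL */
    bool        logical_decoding_enabled;   /* decoding itself may be used */
} DemoCtl;

/* Decoding must never be enabled while logical WAL writing is off. */
static bool
demo_invariant_holds(const DemoCtl *ctl)
{
    return !ctl->logical_decoding_enabled || ctl->xlog_logical_info;
}

/* Activation: turn on logical WAL writing first, then decoding. */
static void
demo_enable(DemoCtl *ctl)
{
    ctl->xlog_logical_info = true;
    assert(demo_invariant_holds(ctl));

    /* The real code waits here until every process has seen the change. */

    ctl->logical_decoding_enabled = true;
    assert(demo_invariant_holds(ctl));
}

/* Deactivation: the reverse order, decoding off first, then WAL writing. */
static void
demo_disable(DemoCtl *ctl)
{
    ctl->logical_decoding_enabled = false;
    assert(demo_invariant_holds(ctl));

    ctl->xlog_logical_info = false;
    assert(demo_invariant_holds(ctl));
}
```

Calling demo_enable() and then demo_disable() on a zero-initialized DemoCtl
passes through every intermediate state without tripping an assertion. Swapping
the two assignments in either function would break the invariant at the
intermediate step, which is the situation the multi-step ordering is meant to
rule out.
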
      64              : #include "postgres.h"
      65              : 
      66              : #include "access/xloginsert.h"
      67              : #include "catalog/pg_control.h"
      68              : #include "miscadmin.h"
      69              : #include "replication/slot.h"
      70              : #include "storage/ipc.h"
      71              : #include "storage/lmgr.h"
      72              : #include "storage/proc.h"
      73              : #include "storage/procarray.h"
      74              : #include "storage/procsignal.h"
      75              : #include "storage/subsystems.h"
      76              : #include "utils/injection_point.h"
      77              : 
      78              : /*
      79              :  * Struct for controlling the logical decoding status.
      80              :  *
      81              :  * This struct is protected by LogicalDecodingControlLock.
      82              :  */
      83              : typedef struct LogicalDecodingCtlData
      84              : {
      85              :     /*
      86              :      * This is the authoritative value used by all processes to determine
      87              :      * whether to write additional information required by logical decoding to
      88              :      * WAL. Since this information could be checked frequently, each process
      89              :      * caches this value in XLogLogicalInfo for better performance.
      90              :      */
      91              :     bool        xlog_logical_info;
      92              : 
      93              :     /* True if logical decoding is available in the system */
      94              :     bool        logical_decoding_enabled;
      95              : 
      96              :     /* True if logical decoding might need to be disabled */
      97              :     bool        pending_disable;
      98              : } LogicalDecodingCtlData;
      99              : 
     100              : static LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
     101              : 
     102              : static void LogicalDecodingCtlShmemRequest(void *arg);
     103              : 
     104              : const ShmemCallbacks LogicalDecodingCtlShmemCallbacks = {
     105              :     .request_fn = LogicalDecodingCtlShmemRequest,
     106              : };
     107              : 
     108              : /*
     109              :  * A process-local cache of LogicalDecodingCtl->xlog_logical_info. This is
     110              :  * initialized at process startup, and updated when processing the process
     111              :  * barrier signal in ProcessBarrierUpdateXLogLogicalInfo(). If the process
     112              :  * is in an XID-assigned transaction, the cache update is delayed until the
     113              :  * transaction ends. See the comments for XLogLogicalInfoUpdatePending for details.
     114              :  */
     115              : bool        XLogLogicalInfo = false;
     116              : 
     117              : /*
     118              :  * When receiving the PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO signal, if
     119              :  * an XID is assigned to the current transaction, the process sets this flag and
     120              :  * delays the XLogLogicalInfo update until the transaction ends. This ensures
     121              :  * that the XLogLogicalInfo value (typically accessed via XLogLogicalInfoActive)
     122              :  * remains consistent throughout the transaction.
     123              :  */
     124              : static bool XLogLogicalInfoUpdatePending = false;
     125              : 
     126              : static void update_xlog_logical_info(void);
     127              : static void abort_logical_decoding_activation(int code, Datum arg);
     128              : static void write_logical_decoding_status_update_record(bool status);
     129              : 
     130              : static void
     131         1245 : LogicalDecodingCtlShmemRequest(void *arg)
     132              : {
     133         1245 :     ShmemRequestStruct(.name = "Logical decoding control",
     134              :                        .size = sizeof(LogicalDecodingCtlData),
     135              :                        .ptr = (void **) &LogicalDecodingCtl,
     136              :         );
     137         1245 : }
     138              : 
     139              : /*
     140              :  * Initialize the logical decoding status in shmem at server startup. This
     141              :  * must be called ONCE during postmaster or standalone-backend startup.
     142              :  */
     143              : void
     144         1082 : StartupLogicalDecodingStatus(bool last_status)
     145              : {
     146              :     /* Logical decoding is always disabled at the 'minimal' WAL level */
     147         1082 :     if (wal_level == WAL_LEVEL_MINIMAL)
     148          392 :         return;
     149              : 
     150              :     /*
     151              :      * Set the initial logical decoding status based on the last status. If
     152              :      * logical decoding was enabled before the last shutdown, it remains
     153              :      * enabled as we might have set wal_level='logical' or have at least one
     154              :      * logical slot.
     155              :      */
     156          690 :     LogicalDecodingCtl->xlog_logical_info = last_status;
     157          690 :     LogicalDecodingCtl->logical_decoding_enabled = last_status;
     158              : }
     159              : 
     160              : /*
     161              :  * Update the XLogLogicalInfo cache.
     162              :  */
     163              : static inline void
     164        26323 : update_xlog_logical_info(void)
     165              : {
     166        26323 :     XLogLogicalInfo = IsXLogLogicalInfoEnabled();
     167        26323 : }
     168              : 
     169              : /*
     170              :  * Initialize XLogLogicalInfo backend-private cache. This routine is called
     171              :  * during process initialization.
     172              :  */
     173              : void
     174        24568 : InitializeProcessXLogLogicalInfo(void)
     175              : {
     176        24568 :     update_xlog_logical_info();
     177        24568 : }
     178              : 
     179              : /*
     180              :  * This routine is called when we are told to update XLogLogicalInfo
     181              :  * by a ProcSignalBarrier.
     182              :  */
     183              : bool
     184         1755 : ProcessBarrierUpdateXLogLogicalInfo(void)
     185              : {
     186         1755 :     if (GetTopTransactionIdIfAny() != InvalidTransactionId)
     187              :     {
     188              :         /* Delay updating XLogLogicalInfo until the transaction ends */
     189            5 :         XLogLogicalInfoUpdatePending = true;
     190              :     }
     191              :     else
     192         1750 :         update_xlog_logical_info();
     193              : 
     194         1755 :     return true;
     195              : }
     196              : 
     197              : /*
     198              :  * Check the shared memory state and return true if logical decoding is
     199              :  * enabled on the system.
     200              :  */
     201              : bool
     202        19745 : IsLogicalDecodingEnabled(void)
     203              : {
     204              :     bool        enabled;
     205              : 
     206        19745 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     207        19745 :     enabled = LogicalDecodingCtl->logical_decoding_enabled;
     208        19745 :     LWLockRelease(LogicalDecodingControlLock);
     209              : 
     210        19745 :     return enabled;
     211              : }
     212              : 
     213              : /*
     214              :  * Returns true if logical WAL logging is enabled based on the shared memory
     215              :  * status.
     216              :  */
     217              : bool
     218        26354 : IsXLogLogicalInfoEnabled(void)
     219              : {
     220              :     bool        xlog_logical_info;
     221              : 
     222        26354 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     223        26354 :     xlog_logical_info = LogicalDecodingCtl->xlog_logical_info;
     224        26354 :     LWLockRelease(LogicalDecodingControlLock);
     225              : 
     226        26354 :     return xlog_logical_info;
     227              : }
     228              : 
     229              : /*
     230              :  * Reset the local cache at end of the transaction.
     231              :  */
     232              : void
     233       653088 : AtEOXact_LogicalCtl(void)
     234              : {
     235              :     /* Update the local cache if there is a pending update */
     236       653088 :     if (XLogLogicalInfoUpdatePending)
     237              :     {
     238            5 :         update_xlog_logical_info();
     239            5 :         XLogLogicalInfoUpdatePending = false;
     240              :     }
     241       653088 : }
     242              : 
     243              : /*
     244              :  * Writes an XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given
     245              :  * status.
     246              :  */
     247              : static void
     248           95 : write_logical_decoding_status_update_record(bool status)
     249              : {
     250              :     XLogRecPtr  recptr;
     251              : 
     252           95 :     XLogBeginInsert();
     253           95 :     XLogRegisterData(&status, sizeof(bool));
     254           95 :     recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
     255           95 :     XLogFlush(recptr);
     256           95 : }
     257              : 
     258              : /*
     259              :  * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding.
     260              :  *
     261              :  * Rather than directly reverting xlog_logical_info here, we request
     262              :  * that the checkpointer handle it via the normal disable path. This
     263              :  * avoids race conditions when multiple backends attempt concurrent
     264              :  * activation: the checkpointer will reset xlog_logical_info when
     265              :  * no valid logical slots exist.
     266              :  */
     267              : static void
     268            2 : abort_logical_decoding_activation(int code, Datum arg)
     269              : {
     270            2 :     elog(DEBUG1, "aborting logical decoding activation process");
     271            2 :     RequestDisableLogicalDecoding();
     272            2 : }
     273              : 
     274              : /*
     275              :  * Enable logical decoding if disabled.
     276              :  *
     277              :  * If this function is called during recovery, it simply returns without
     278              :  * action since the logical decoding status change is not allowed during
     279              :  * this time. The logical decoding status depends on the status on the primary.
     280              :  * The caller should use CheckLogicalDecodingRequirements() before calling this
     281              :  * function to make sure that the logical decoding status can be modified.
     282              :  *
     283              :  * Note that there is no interlock between logical decoding activation
     284              :  * and slot creation. To ensure that logical decoding is enabled, the
     285              :  * caller must call this function after creating a logical slot and before
     286              :  * initializing the logical decoding context.
     287              :  */
     288              : void
     289          514 : EnsureLogicalDecodingEnabled(void)
     290              : {
     291              :     Assert(MyReplicationSlot);
     292              :     Assert(wal_level >= WAL_LEVEL_REPLICA);
     293              : 
     294              :     /* Logical decoding is always enabled */
     295          514 :     if (wal_level >= WAL_LEVEL_LOGICAL)
     296          497 :         return;
     297              : 
     298           17 :     if (RecoveryInProgress())
     299              :     {
     300              :         /*
     301              :          * CheckLogicalDecodingRequirements() must have already errored out if
     302              :          * logical decoding is not enabled since we cannot enable the logical
     303              :          * decoding status during recovery.
     304              :          */
     305              :         Assert(IsLogicalDecodingEnabled());
     306            3 :         return;
     307              :     }
     308              : 
     309              :     /*
     310              :      * Ensure that the activation process is aborted if there is an
     311              :      * interruption during the wait.
     312              :      */
     313           14 :     PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
     314              :     {
     315           14 :         EnableLogicalDecoding();
     316              :     }
     317           14 :     PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
     318              : }
     319              : 
     320              : /*
     321              :  * A workhorse function to enable logical decoding.
     322              :  */
     323              : void
     324           23 : EnableLogicalDecoding(void)
     325              : {
     326              :     bool        in_recovery;
     327              : 
     328           23 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     329              : 
     330              :     /* Return if it is already enabled */
     331           23 :     if (LogicalDecodingCtl->logical_decoding_enabled)
     332              :     {
     333            2 :         LogicalDecodingCtl->pending_disable = false;
     334            2 :         LWLockRelease(LogicalDecodingControlLock);
     335            2 :         return;
     336              :     }
     337              : 
     338              :     /*
     339              :      * Set logical info WAL logging in shmem. All processes that start after
     340              :      * this point will include the information required by logical decoding
     341              :      * in WAL records.
     342              :      */
     343           21 :     LogicalDecodingCtl->xlog_logical_info = true;
     344              : 
     345           21 :     LWLockRelease(LogicalDecodingControlLock);
     346              : 
     347              :     /*
     348              :      * Tell all running processes to reflect the xlog_logical_info update, and
     349              :      * wait. This ensures that all running processes have enabled logical
     350              :      * information WAL logging.
     351              :      */
     352           21 :     WaitForProcSignalBarrier(
     353              :                              EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
     354              : 
     355           21 :     INJECTION_POINT("logical-decoding-activation", NULL);
     356              : 
     357           19 :     in_recovery = RecoveryInProgress();
     358              : 
     359              :     /*
     360              :      * There could be some transactions that might have started with the old
     361              :      * status, but we don't need to wait for these transactions to complete as
     362              :      * long as they have valid XIDs. These transactions will appear in the
     363              :      * xl_running_xacts record and therefore the snapshot builder will not try
     364              :      * to decode the transaction during the logical decoding initialization.
     365              :      *
     366              :      * There is a theoretical case where a transaction decides whether to
     367              :      * include logical-info in WAL records before getting an XID. In this
     368              :      * case, the transaction won't appear in xl_running_xacts.
     369              :      *
     370              :      * For operations that do not require an XID assignment, the process
     371              :      * starts including logical-info immediately upon receiving the signal
     372              :      * (barrier). If such an operation checks the effective_wal_level multiple
     373              :      * times within a single execution, the resulting WAL records might be
     374              :      * inconsistent (i.e., logical-info is included in some records but not in
     375              :      * others). However, this is harmless because logical decoding generally
     376              :      * ignores WAL records that are not associated with an assigned XID.
     377              :      *
     378              :      * One might think we need to wait for all running transactions, including
     379              :      * those without XIDs and read-only transactions, to finish before
     380              :      * enabling logical decoding. However, such a requirement would force the
     381              :      * slot creation to wait for a potentially very long time due to
     382              :      * long-running read queries, which is practically unacceptable.
     383              :      */
     384              : 
     385           19 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     386              : 
     387           19 :     START_CRIT_SECTION();
     388              : 
     389              :     /*
     390              :      * We enable logical decoding first, followed by writing the WAL record.
     391              :      * This sequence ensures logical decoding becomes available on the primary
     392              :      * first.
     393              :      */
     394           19 :     LogicalDecodingCtl->logical_decoding_enabled = true;
     395              : 
     396           19 :     if (!in_recovery)
     397           10 :         write_logical_decoding_status_update_record(true);
     398              : 
     399           19 :     LogicalDecodingCtl->pending_disable = false;
     400              : 
     401           19 :     END_CRIT_SECTION();
     402              : 
     403           19 :     LWLockRelease(LogicalDecodingControlLock);
     404              : 
     405              :     /*
     406              :      * We log the activation message after releasing LogicalDecodingControlLock.
     407              :      * This is safe because the activation is performed while holding a logical
     408              :      * slot, which means that a concurrent deactivation cannot interleave its log
     409              :      * message ahead of ours.
     410              :      */
     411           19 :     if (!in_recovery)
     412           10 :         ereport(LOG,
     413              :                 errmsg("logical decoding is enabled upon creating a new logical replication slot"));
     414              : }
     415              : 
     416              : /*
     417              :  * Initiate a request for disabling logical decoding.
     418              :  *
     419              :  * Note that this function does not verify whether logical slots exist. The
     420              :  * checkpointer will verify if logical decoding should actually be disabled.
     421              :  */
     422              : void
     423          446 : RequestDisableLogicalDecoding(void)
     424              : {
     425          446 :     if (wal_level != WAL_LEVEL_REPLICA)
     426          427 :         return;
     427              : 
     428              :     /*
     429              :      * It's possible that we might not actually need to disable logical
     430              :      * decoding if someone creates a new logical slot concurrently. We set the
     431              :      * flag anyway and the checkpointer will check it and disable logical
     432              :      * decoding if necessary.
     433              :      */
     434           19 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     435           19 :     LogicalDecodingCtl->pending_disable = true;
     436           19 :     LWLockRelease(LogicalDecodingControlLock);
     437              : 
     438           19 :     WakeupCheckpointer();
     439              : 
     440           19 :     elog(DEBUG1, "requested disabling logical decoding");
     441              : }
     442              : 
     443              : /*
     444              :  * Disable logical decoding if necessary.
     445              :  *
     446              :  * This function disables logical decoding upon a request initiated by
     447              :  * RequestDisableLogicalDecoding(). Otherwise, it performs no action.
     448              :  */
     449              : void
     450         5483 : DisableLogicalDecodingIfNecessary(void)
     451              : {
     452              :     bool        pending_disable;
     453              : 
     454         5483 :     if (wal_level != WAL_LEVEL_REPLICA)
     455         1271 :         return;
     456              : 
     457              :     /*
     458              :      * Sanity check as we cannot disable logical decoding while holding a
     459              :      * logical slot.
     460              :      */
     461              :     Assert(!MyReplicationSlot);
     462              : 
     463         4212 :     if (RecoveryInProgress())
     464         1692 :         return;
     465              : 
     466         2520 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     467         2520 :     pending_disable = LogicalDecodingCtl->pending_disable;
     468         2520 :     LWLockRelease(LogicalDecodingControlLock);
     469              : 
     470              :     /* Quick return if no pending disable request */
     471         2520 :     if (!pending_disable)
     472         2504 :         return;
     473              : 
     474           16 :     DisableLogicalDecoding();
     475              : }
     476              : 
     477              : /*
     478              :  * A workhorse function to disable logical decoding.
     479              :  */
     480              : void
     481           25 : DisableLogicalDecoding(void)
     482              : {
     483           25 :     bool        in_recovery = RecoveryInProgress();
     484              :     bool        was_enabled;
     485              : 
     486           25 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     487              : 
     488              :     /*
     489              :      * Check if we can disable logical decoding.
     490              :      *
     491              :      * Nothing to do if both flags are already off, or if valid slots exist
     492              :      * (skip the slot check during recovery because the existing slots will be
     493              :      * invalidated after disabling logical decoding.)
     494              :      */
     495           25 :     if ((!LogicalDecodingCtl->logical_decoding_enabled &&
     496            3 :          !LogicalDecodingCtl->xlog_logical_info) ||
     497           23 :         (!in_recovery && CheckLogicalSlotExists()))
     498              :     {
     499            4 :         LogicalDecodingCtl->pending_disable = false;
     500            4 :         LWLockRelease(LogicalDecodingControlLock);
     501            4 :         return;
     502              :     }
     503              : 
     504              :     /*
     505              :      * Remember if logical decoding was enabled. An interrupted activation can
     506              :      * leave xlog_logical_info=true while logical_decoding_enabled remains
     507              :      * false.
     508              :      */
     509           21 :     was_enabled = LogicalDecodingCtl->logical_decoding_enabled;
     510              : 
     511           21 :     START_CRIT_SECTION();
     512              : 
     513              :     /*
     514              :      * We need to disable logical decoding first and then disable logical
     515              :      * information WAL logging in order to ensure that no logical decoding
     516              :      * processes WAL records with insufficient information.
     517              :      */
     518           21 :     LogicalDecodingCtl->logical_decoding_enabled = false;
     519              : 
     520              :     /* Write the WAL to disable logical decoding on standbys too */
     521           21 :     if (!in_recovery && was_enabled)
     522           11 :         write_logical_decoding_status_update_record(false);
     523              : 
     524              :     /* Now disable logical information WAL logging */
     525           21 :     LogicalDecodingCtl->xlog_logical_info = false;
     526           21 :     LogicalDecodingCtl->pending_disable = false;
     527              : 
     528           21 :     END_CRIT_SECTION();
     529              : 
     530              :     /*
     531              :      * Logging under the lock guarantees that our "is disabled" message appears
     532              :      * in the server log before any subsequent "is enabled" message, which keeps
     533              :      * the server log easy to follow.
     534              :      */
     535           21 :     if (!in_recovery && was_enabled)
     536           11 :         ereport(LOG,
     537              :                 errmsg("logical decoding is disabled because there are no valid logical replication slots"));
     538              : 
     539           21 :     LWLockRelease(LogicalDecodingControlLock);
     540              : 
     541              :     /*
     542              :      * Tell all running processes to reflect the xlog_logical_info update.
     543              :      * Unlike when enabling logical decoding, we don't need to wait for all
     544              :      * processes to complete it in this case. We already disabled logical
     545              :      * decoding and it's always safe to write logical information to WAL
     546              :      * records, even when not strictly required. Therefore, we don't need to
     547              :      * wait for all running transactions to finish either.
     548              :      */
     549           21 :     EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
     550              : }
     551              : 
     552              : /*
     553              :  * Updates the logical decoding status at end of recovery, and ensures that
     554              :  * all running processes have the updated XLogLogicalInfo status. This
     555              :  * function must be called before accepting writes.
     556              :  */
     557              : void
     558         1015 : UpdateLogicalDecodingStatusEndOfRecovery(void)
     559              : {
     560         1015 :     bool        new_status = false;
     561              : 
     562              :     Assert(RecoveryInProgress());
     563              : 
     564              :     /*
     565              :      * With 'minimal' WAL level, there are no logical replication slots during
     566              :      * recovery. Logical decoding is always disabled, so there is no need to
     567              :      * synchronize XLogLogicalInfo.
     568              :      */
     569         1015 :     if (wal_level == WAL_LEVEL_MINIMAL)
     570              :     {
     571              :         Assert(!IsXLogLogicalInfoEnabled() && !IsLogicalDecodingEnabled());
     572          392 :         return;
     573              :     }
     574              : 
     575          623 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     576              : 
     577          623 :     if (wal_level == WAL_LEVEL_LOGICAL || CheckLogicalSlotExists())
     578          137 :         new_status = true;
     579              : 
     580              :     /*
     581              :      * When recovery ends, we need to either enable or disable logical
     582              :      * decoding based on the wal_level setting and the presence of logical
     583              :      * slots. We need to note that concurrent slot creation and deletion could
     584              :      * happen but WAL writes are still not permitted until recovery fully
     585              :      * completes. Here's how we handle concurrent toggling of logical
     586              :      * decoding:
     587              :      *
     588              :      * For the 'enable' case, if there's a concurrent disable request
     589              :      * before recovery fully completes, the checkpointer will handle it
     590              :      * after recovery is done. This means there might be a brief period
     591              :      * after recovery where logical decoding remains enabled even with no
     592              :      * logical replication slots present. This temporary state is not new;
     593              :      * it can already occur due to the checkpointer's asynchronous
     594              :      * deactivation process.
     595              :      *
     596              :      * For the 'disable' case, backends cannot create logical replication
     597              :      * slots during recovery (see checks in
     598              :      * CheckLogicalDecodingRequirements()), which prevents a race condition
     599              :      * between disabling logical decoding and concurrent slot creation.
     600              :      */
     601          623 :     if (new_status != LogicalDecodingCtl->logical_decoding_enabled)
     602              :     {
     603              :         /*
     604              :          * Update both the logical decoding status and logical WAL logging
     605              :          * status. Unlike toggling these statuses outside of recovery, we
     606              :          * don't need to worry about the operation order as WAL writes are
     607              :          * still not permitted.
     608              :          */
     609           74 :         LogicalDecodingCtl->xlog_logical_info = new_status;
     610           74 :         LogicalDecodingCtl->logical_decoding_enabled = new_status;
     611              : 
     612           74 :         elog(DEBUG1,
     613              :              "update logical decoding status to %d at the end of recovery",
     614              :              new_status);
     615              : 
     616              :         /*
     617              :          * Now that we updated the logical decoding status, clear the pending
     618              :          * disable flag. It's possible that a concurrent process drops the
     619              :          * last logical slot and initiates the pending disable again. The
     620              :          * checkpointer process will check it.
     621              :          */
     622           74 :         LogicalDecodingCtl->pending_disable = false;
     623              : 
     624           74 :         LWLockRelease(LogicalDecodingControlLock);
     625              : 
     626           74 :         write_logical_decoding_status_update_record(new_status);
     627              :     }
     628              :     else
     629          549 :         LWLockRelease(LogicalDecodingControlLock);
     630              : 
     631              :     /*
     632              :      * Ensure all running processes have the updated status. We don't need to
     633              :      * wait for running transactions to finish as we don't accept any writes
     634              :      * yet. On the other hand, we need to wait for XLogLogicalInfo to be
     635              :      * synchronized even if we've not updated the status above, as the status
     636              :      * may have been turned on and off during recovery, leaving running
     637              :      * processes with different values in their local caches.
     638              :      */
     639          623 :     if (IsUnderPostmaster)
     640          490 :         WaitForProcSignalBarrier(
     641              :                                  EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
     642              : 
     643          623 :     INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
     644              : }
        
