LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalctl.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 97.8 % 136 133
Test Date: 2026-03-14 14:14:39 Functions: 100.0 % 17 17
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * logicalctl.c
       3              :  *      Functionality to control logical decoding status online.
       4              :  *
       5              :  * This module enables dynamic control of logical decoding availability.
       6              :  * Logical decoding becomes active under two conditions: when the wal_level
       7              :  * parameter is set to 'logical', or when at least one valid logical replication
       8              :  * slot exists with wal_level set to 'replica'. The system disables logical
       9              :  * decoding when neither condition is met. Therefore, the dynamic control
      10              :  * of logical decoding availability is required only when wal_level is set
      11              :  * to 'replica'. Logical decoding is always enabled when wal_level='logical'
      12              :  * and always disabled when wal_level='minimal'.
      13              :  *
      14              :  * The core concept of dynamically enabling and disabling logical decoding
      15              :  * is to separately control two aspects: writing information required for
      16              :  * logical decoding to WAL records, and using logical decoding itself. During
      17              :  * activation, we first enable logical WAL writing while keeping logical
      18              :  * decoding disabled. This change is reflected in the read-only
      19              :  * effective_wal_level GUC parameter. Once we ensure that all processes have
      20              :  * updated to the latest effective_wal_level value, we then enable logical
      21              :  * decoding. Deactivation follows a similar careful, multi-step process
      22              :  * in reverse order.
      23              :  *
      24              :  * While activation occurs synchronously right after creating the first
      25              :  * logical slot, deactivation happens asynchronously through the checkpointer
      26              :  * process. This design avoids a race condition at the end of recovery; see
      27              :  * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details.
      28              :  * Asynchronous deactivation also avoids excessive toggling of the logical
      29              :  * decoding status in workloads that repeatedly create and drop a single
      30              :  * logical slot. On the other hand, this lazy approach can delay changes
      31              :  * to effective_wal_level and the disabling of logical decoding, especially
      32              :  * when the checkpointer is busy with other tasks. We chose this lazy approach
      33              :  * in all deactivation paths to keep the implementation simple, even though
      34              :  * laziness is strictly required only for end-of-recovery cases. Future work
      35              :  * might address this limitation either by using a dedicated worker instead
      36              :  * of the checkpointer, or by implementing synchronous waiting during slot
      37              :  * drops if workloads are significantly affected by the lazy deactivation
      38              :  * of logical decoding.
      39              :  *
      40              :  * Standby servers use the primary server's effective_wal_level and logical
      41              :  * decoding status. Unlike normal activation and deactivation, these
      42              :  * are updated simultaneously without status change coordination, solely by
      43              :  * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level
      44              :  * setting has no effect during this time. Upon promotion, we update the
      45              :  * logical decoding status based on local conditions: the wal_level value and
      46              :  * the presence of logical slots.
      47              :  *
      48              :  * In the future, we could extend support to include automatic transitions
      49              :  * of effective_wal_level between 'minimal' and 'logical' WAL levels. However,
      50              :  * this enhancement would require additional coordination mechanisms and
      51              :  * careful implementation of operations such as terminating walsenders and
      52              :  * archiver processes while carefully considering the sequence of operations
      53              :  * to ensure system stability during these transitions.
      54              :  *
      55              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      56              :  * Portions Copyright (c) 1994, Regents of the University of California
      57              :  *
      58              :  * IDENTIFICATION
      59              :  *    src/backend/replication/logical/logicalctl.c
      60              :  *
      61              :  *-------------------------------------------------------------------------
      62              :  */
      63              : 
      64              : #include "postgres.h"
      65              : 
      66              : #include "access/xloginsert.h"
      67              : #include "catalog/pg_control.h"
      68              : #include "miscadmin.h"
      69              : #include "replication/slot.h"
      70              : #include "storage/ipc.h"
      71              : #include "storage/lmgr.h"
      72              : #include "storage/proc.h"
      73              : #include "storage/procarray.h"
      74              : #include "storage/procsignal.h"
      75              : #include "utils/injection_point.h"
      76              : 
      77              : /*
      78              :  * Struct for controlling the logical decoding status. A single instance lives
      79              :  * in shared memory and is reached through the LogicalDecodingCtl pointer.
      80              :  * This struct is protected by LogicalDecodingControlLock.
      81              :  */
      82              : typedef struct LogicalDecodingCtlData
      83              : {
      84              :     /*
      85              :      * This is the authoritative value used by all processes to determine
      86              :      * whether to write additional information required by logical decoding to
      87              :      * WAL. Since this information could be checked frequently, each process
      88              :      * caches this value in XLogLogicalInfo for better performance.
      89              :      */
      90              :     bool        xlog_logical_info;
      91              : 
      92              :     /* True if logical decoding is available in the system; see IsLogicalDecodingEnabled() */
      93              :     bool        logical_decoding_enabled;
      94              : 
      95              :     /* True if logical decoding might need to be disabled; set by RequestDisableLogicalDecoding() */
      96              :     bool        pending_disable;
      97              : } LogicalDecodingCtlData;
      98              : 
      99              : static LogicalDecodingCtlData *LogicalDecodingCtl = NULL;   /* assigned in LogicalDecodingCtlShmemInit() */
     100              : 
     101              : /*
     102              :  * A process-local cache of LogicalDecodingCtl->xlog_logical_info. This is
     103              :  * initialized at process startup by InitializeProcessXLogLogicalInfo(), and
     104              :  * updated when processing the process barrier signal in
     105              :  * ProcessBarrierUpdateXLogLogicalInfo(). If the process is in an XID-assigned
     106              :  * transaction, the update is delayed until the transaction ends; see XLogLogicalInfoUpdatePending.
     107              :  */
     108              : bool        XLogLogicalInfo = false;
     109              : 
     110              : /*
     111              :  * When receiving the PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO signal, if
     112              :  * an XID is assigned to the current transaction, the process sets this flag and
     113              :  * delays the XLogLogicalInfo update until AtEOXact_LogicalCtl() runs at the end
     114              :  * of the transaction. This ensures that the XLogLogicalInfo value (typically
     115              :  * accessed via XLogLogicalInfoActive) remains consistent throughout the transaction.
     116              :  */
     117              : static bool XLogLogicalInfoUpdatePending = false;
     118              : 
     119              : static void update_xlog_logical_info(void);     /* file-local helpers, defined further down */
     120              : static void abort_logical_decoding_activation(int code, Datum arg);
     121              : static void write_logical_decoding_status_update_record(bool status);
     122              : 
     123              : Size    /* Returns the number of bytes of shared memory used by the logical decoding control struct. */
     124         4483 : LogicalDecodingCtlShmemSize(void)
     125              : {
     126         4483 :     return sizeof(LogicalDecodingCtlData);   /* the state is this one fixed-size struct */
     127              : }
     128              : 
     129              : void    /* Obtains the shared control struct via ShmemInitStruct() and points LogicalDecodingCtl at it. */
     130         1159 : LogicalDecodingCtlShmemInit(void)
     131              : {
     132              :     bool        found;      /* set by ShmemInitStruct(); presumably true if the struct already existed */
     133              : 
     134         1159 :     LogicalDecodingCtl = ShmemInitStruct("Logical decoding control",
     135              :                                          LogicalDecodingCtlShmemSize(),
     136              :                                          &found);
     137              : 
     138         1159 :     if (!found)             /* not found: zero the struct so that all three flags start out false */
     139         1159 :         MemSet(LogicalDecodingCtl, 0, LogicalDecodingCtlShmemSize());
     140         1159 : }
     141              : 
     142              : /*
     143              :  * Initialize the logical decoding status in shmem at server startup from
     144              :  * last_status. This must be called ONCE during postmaster or standalone-backend startup.
     145              :  */
     146              : void
     147         1010 : StartupLogicalDecodingStatus(bool last_status)
     148              : {
     149              :     /* Logical decoding is always disabled when 'minimal' WAL level */
     150         1010 :     if (wal_level == WAL_LEVEL_MINIMAL)
     151          360 :         return;             /* last_status is ignored; the shmem flags are left untouched */
     152              : 
     153              :     /*
     154              :      * Set the initial logical decoding status based on the last status. If
     155              :      * logical decoding was enabled before the last shutdown, it remains
     156              :      * enabled as we might have set wal_level='logical' or have at least one
     157              :      * logical slot. Both flags are written here without taking LogicalDecodingControlLock.
     158              :      */
     159          650 :     LogicalDecodingCtl->xlog_logical_info = last_status;
     160          650 :     LogicalDecodingCtl->logical_decoding_enabled = last_status;
     161              : }
     162              : 
     163              : /*
     164              :  * Refresh the process-local XLogLogicalInfo cache from the shared-memory value (via IsXLogLogicalInfoEnabled()).
     165              :  */
     166              : static inline void
     167        25377 : update_xlog_logical_info(void)
     168              : {
     169        25377 :     XLogLogicalInfo = IsXLogLogicalInfoEnabled();
     170        25377 : }
     171              : 
     172              : /*
     173              :  * Initialize XLogLogicalInfo backend-private cache from the current shared-memory
     174              :  * value. This routine is called during process initialization.
     175              :  */
     176              : void
     177        23507 : InitializeProcessXLogLogicalInfo(void)
     178              : {
     179        23507 :     update_xlog_logical_info();
     180        23507 : }
     181              : 
     182              : /*
     183              :  * This routine is called when we are told to update XLogLogicalInfo
     184              :  * by a ProcSignalBarrier. It always returns true, whether the cache is updated now or deferred.
     185              :  */
     186              : bool
     187         1870 : ProcessBarrierUpdateXLogLogicalInfo(void)
     188              : {
     189         1870 :     if (GetTopTransactionIdIfAny() != InvalidTransactionId)
     190              :     {
     191              :         /* Delay updating XLogLogicalInfo until the transaction end; see AtEOXact_LogicalCtl() */
     192            0 :         XLogLogicalInfoUpdatePending = true;
     193              :     }
     194              :     else
     195         1870 :         update_xlog_logical_info();     /* no XID assigned: refresh the cache right away */
     196              : 
     197         1870 :     return true;
     198              : }
     199              : 
     200              : /*
     201              :  * Check the shared memory state and return true if logical decoding is
     202              :  * enabled on the system. The flag is read while holding LogicalDecodingControlLock in shared mode.
     203              :  */
     204              : bool
     205        17611 : IsLogicalDecodingEnabled(void)
     206              : {
     207              :     bool        enabled;
     208              : 
     209        17611 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     210        17611 :     enabled = LogicalDecodingCtl->logical_decoding_enabled;
     211        17611 :     LWLockRelease(LogicalDecodingControlLock);
     212              : 
     213        17611 :     return enabled;
     214              : }
     215              : 
     216              : /*
     217              :  * Returns true if logical WAL logging is enabled based on the shared memory
     218              :  * status (not the process-local XLogLogicalInfo cache). Takes LogicalDecodingControlLock in shared mode.
     219              :  */
     220              : bool
     221        25399 : IsXLogLogicalInfoEnabled(void)
     222              : {
     223              :     bool        xlog_logical_info;
     224              : 
     225        25399 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     226        25399 :     xlog_logical_info = LogicalDecodingCtl->xlog_logical_info;
     227        25399 :     LWLockRelease(LogicalDecodingControlLock);
     228              : 
     229        25399 :     return xlog_logical_info;
     230              : }
     231              : 
     232              : /*
     233              :  * Apply a delayed update of the local XLogLogicalInfo cache, if any, at end of the transaction.
     234              :  */
     235              : void
     236       565533 : AtEOXact_LogicalCtl(void)
     237              : {
     238              :     /* Update the local cache if ProcessBarrierUpdateXLogLogicalInfo() left a pending update */
     239       565533 :     if (XLogLogicalInfoUpdatePending)
     240              :     {
     241            0 :         update_xlog_logical_info();
     242            0 :         XLogLogicalInfoUpdatePending = false;
     243              :     }
     244       565533 : }
     245              : 
     246              : /*
     247              :  * Writes an XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given
     248              :  * status as its only registered data, then flushes WAL up to the position returned by XLogInsert().
     249              :  */
     250              : static void
     251           83 : write_logical_decoding_status_update_record(bool status)
     252              : {
     253              :     XLogRecPtr  recptr;
     254              : 
     255           83 :     XLogBeginInsert();
     256           83 :     XLogRegisterData(&status, sizeof(bool));
     257           83 :     recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
     258           83 :     XLogFlush(recptr);
     259           83 : }
     260              : 
     261              : /*
     262              :  * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting the
     263              :  * shared xlog_logical_info flag to revert the activation process. 'code' and 'arg' are unused.
     264              :  */
     265              : static void
     266            1 : abort_logical_decoding_activation(int code, Datum arg)
     267              : {
     268              :     Assert(MyReplicationSlot);
     269              :     Assert(!LogicalDecodingCtl->logical_decoding_enabled);
     270              : 
     271            1 :     elog(DEBUG1, "aborting logical decoding activation process");
     272              : 
     273              :     /*
     274              :      * Abort the change to xlog_logical_info. We don't need to check
     275              :      * CheckLogicalSlotExists() as we're still holding a logical slot.
     276              :      */
     277            1 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     278            1 :     LogicalDecodingCtl->xlog_logical_info = false;
     279            1 :     LWLockRelease(LogicalDecodingControlLock);
     280              : 
     281              :     /*
     282              :      * Some processes might have already started logical info WAL logging, so
     283              :      * tell all running processes to update their caches. We don't need to
     284              :      * wait for all processes to disable xlog_logical_info locally as it's
     285              :      * always safe to write logical information to WAL records, even when not
     286              :      * strictly required. Hence the barrier is emitted but not waited for.
     287              :      */
     288            1 :     EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
     289            1 : }
     290              : 
     291              : /*
     292              :  * Enable logical decoding if disabled. The caller must hold a replication slot (MyReplicationSlot is asserted).
     293              :  *
     294              :  * If this function is called during recovery, it simply returns without
     295              :  * action since the logical decoding status change is not allowed during
     296              :  * this time. The logical decoding status depends on the status on the primary.
     297              :  * The caller should use CheckLogicalDecodingRequirements() before calling this
     298              :  * function to make sure that the logical decoding status can be modified.
     299              :  *
     300              :  * Note that there is no interlock between logical decoding activation
     301              :  * and slot creation. To ensure enabling logical decoding, the caller
     302              :  * needs to call this function after creating a logical slot before
     303              :  * initializing the logical decoding context.
     304              :  */
     305              : void
     306          494 : EnsureLogicalDecodingEnabled(void)
     307              : {
     308              :     Assert(MyReplicationSlot);
     309              :     Assert(wal_level >= WAL_LEVEL_REPLICA);
     310              : 
     311              :     /* Logical decoding is always enabled when wal_level is 'logical' or higher */
     312          494 :     if (wal_level >= WAL_LEVEL_LOGICAL)
     313          484 :         return;
     314              : 
     315           10 :     if (RecoveryInProgress())
     316              :     {
     317              :         /*
     318              :          * CheckLogicalDecodingRequirements() must have already errored out if
     319              :          * logical decoding is not enabled since we cannot enable the logical
     320              :          * decoding status during recovery.
     321              :          */
     322              :         Assert(IsLogicalDecodingEnabled());
     323            3 :         return;
     324              :     }
     325              : 
     326              :     /*
     327              :      * Ensure to abort the activation process in cases where there is an
     328              :      * interruption during the wait inside EnableLogicalDecoding().
     329              :      */
     330            7 :     PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
     331              :     {
     332            7 :         EnableLogicalDecoding();
     333              :     }
     334            7 :     PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
     335              : }
     336              : 
     337              : /*
     338              :  * A workhorse function to enable logical decoding: logical-info WAL logging first, then decoding itself.
     339              :  */
     340              : void
     341           16 : EnableLogicalDecoding(void)
     342              : {
     343              :     bool        in_recovery;
     344              : 
     345           16 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     346              : 
     347              :     /* Return if it is already enabled, after cancelling any pending disable request */
     348           16 :     if (LogicalDecodingCtl->logical_decoding_enabled)
     349              :     {
     350            2 :         LogicalDecodingCtl->pending_disable = false;
     351            2 :         LWLockRelease(LogicalDecodingControlLock);
     352            2 :         return;
     353              :     }
     354              : 
     355              :     /*
     356              :      * Set logical info WAL logging in shmem. All processes started after this
     357              :      * point will include the information required by logical decoding in WAL
     358              :      * records.
     359              :      */
     360           14 :     LogicalDecodingCtl->xlog_logical_info = true;
     361              : 
     362           14 :     LWLockRelease(LogicalDecodingControlLock);
     363              : 
     364              :     /*
     365              :      * Tell all running processes to reflect the xlog_logical_info update, and
     366              :      * wait. This ensures that all running processes have enabled logical
     367              :      * information WAL logging. The lock is not held during this wait.
     368              :      */
     369           14 :     WaitForProcSignalBarrier(
     370              :                              EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
     371              : 
     372           14 :     INJECTION_POINT("logical-decoding-activation", NULL);
     373              : 
     374           13 :     in_recovery = RecoveryInProgress();     /* sampled once; gates the WAL record and the LOG message below */
     375              : 
     376              :     /*
     377              :      * There could be some transactions that might have started with the old
     378              :      * status, but we don't need to wait for these transactions to complete as
     379              :      * long as they have valid XIDs. These transactions will appear in the
     380              :      * xl_running_xacts record and therefore the snapshot builder will not try
     381              :      * to decode the transaction during the logical decoding initialization.
     382              :      *
     383              :      * There is a theoretical case where a transaction decides whether to
     384              :      * include logical-info to WAL records before getting an XID. In this
     385              :      * case, the transaction won't appear in xl_running_xacts.
     386              :      *
     387              :      * For operations that do not require an XID assignment, the process
     388              :      * starts including logical-info immediately upon receiving the signal
     389              :      * (barrier). If such an operation checks the effective_wal_level multiple
     390              :      * times within a single execution, the resulting WAL records might be
     391              :      * inconsistent (i.e., logical-info is included in some records but not in
     392              :      * others). However, this is harmless because logical decoding generally
     393              :      * ignores WAL records that are not associated with an assigned XID.
     394              :      *
     395              :      * One might think we need to wait for all running transactions, including
     396              :      * those without XIDs and read-only transactions, to finish before
     397              :      * enabling logical decoding. However, such a requirement would force the
     398              :      * slot creation to wait for a potentially very long time due to
     399              :      * long-running read queries, which is practically unacceptable.
     400              :      */
     401              : 
     402           13 :     START_CRIT_SECTION();
     403              : 
     404              :     /*
     405              :      * We enable logical decoding first, followed by writing the WAL record.
     406              :      * This sequence ensures logical decoding becomes available on the primary
     407              :      * first. The status-change record is written only when not in recovery.
     408              :      */
     409           13 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     410              : 
     411           13 :     LogicalDecodingCtl->logical_decoding_enabled = true;
     412              : 
     413           13 :     if (!in_recovery)
     414            4 :         write_logical_decoding_status_update_record(true);
     415              : 
     416           13 :     LogicalDecodingCtl->pending_disable = false;
     417              : 
     418           13 :     LWLockRelease(LogicalDecodingControlLock);
     419              : 
     420           13 :     END_CRIT_SECTION();
     421              : 
     422           13 :     if (!in_recovery)
     423            4 :         ereport(LOG,
     424              :                 errmsg("logical decoding is enabled upon creating a new logical replication slot"));
     425              : }
     426              : 
     427              : /*
     428              :  * Initiate a request for disabling logical decoding: set pending_disable and wake up the checkpointer.
     429              :  *
     430              :  * Note that this function does not verify whether logical slots exist. The
     431              :  * checkpointer will verify if logical decoding should actually be disabled.
     432              :  */
     433              : void
     434          428 : RequestDisableLogicalDecoding(void)
     435              : {
     436          428 :     if (wal_level != WAL_LEVEL_REPLICA)
     437          417 :         return;             /* nothing to request unless wal_level is 'replica' */
     438              : 
     439              :     /*
     440              :      * It's possible that we might not actually need to disable logical
     441              :      * decoding if someone creates a new logical slot concurrently. We set the
     442              :      * flag anyway and the checkpointer will check it and disable logical
     443              :      * decoding if necessary.
     444              :      */
     445           11 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     446           11 :     LogicalDecodingCtl->pending_disable = true;
     447           11 :     LWLockRelease(LogicalDecodingControlLock);
     448              : 
     449           11 :     WakeupCheckpointer();
     450              : 
     451           11 :     elog(DEBUG1, "requested disabling logical decoding");
     452              : }
     453              : 
     454              : /*
     455              :  * Disable logical decoding if necessary.
     456              :  *
     457              :  * This function disables logical decoding upon a request initiated by
     458              :  * RequestDisableLogicalDecoding(). Otherwise, it performs no action. It also does nothing unless wal_level is 'replica', or during recovery.
     459              :  */
     460              : void
     461         4411 : DisableLogicalDecodingIfNecessary(void)
     462              : {
     463              :     bool        pending_disable;
     464              : 
     465         4411 :     if (wal_level != WAL_LEVEL_REPLICA)
     466         1170 :         return;
     467              : 
     468              :     /*
     469              :      * Sanity check as we cannot disable logical decoding while holding a
     470              :      * logical slot.
     471              :      */
     472              :     Assert(!MyReplicationSlot);
     473              : 
     474         3241 :     if (RecoveryInProgress())
     475         1577 :         return;
     476              : 
     477         1664 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
     478         1664 :     pending_disable = LogicalDecodingCtl->pending_disable;
     479         1664 :     LWLockRelease(LogicalDecodingControlLock);
     480              : 
     481              :     /* Quick return if no pending disable request */
     482         1664 :     if (!pending_disable)
     483         1654 :         return;
     484              : 
     485           10 :     DisableLogicalDecoding();   /* re-checks the shared state under the exclusive lock */
     486              : }
     487              : 
     488              : /*
     489              :  * A workhorse function to disable logical decoding; it returns without change if disabling is not possible.
     490              :  */
     491              : void
     492           19 : DisableLogicalDecoding(void)
     493              : {
     494           19 :     bool        in_recovery = RecoveryInProgress();
     495              : 
     496           19 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     497              : 
     498              :     /*
     499              :      * Check if we can disable logical decoding. If it is already disabled, or a
     500              :      * logical slot exists, just clear the pending request and return.
     501              :      * Skip CheckLogicalSlotExists() check during recovery because the
     502              :      * existing slots will be invalidated after disabling logical decoding.
     503              :      */
     504           19 :     if (!LogicalDecodingCtl->logical_decoding_enabled ||
     505           16 :         (!in_recovery && CheckLogicalSlotExists()))
     506              :     {
     507            4 :         LogicalDecodingCtl->pending_disable = false;
     508            4 :         LWLockRelease(LogicalDecodingControlLock);
     509            4 :         return;
     510              :     }
     511              : 
     512           15 :     START_CRIT_SECTION();
     513              : 
     514              :     /*
     515              :      * We need to disable logical decoding first and then disable logical
     516              :      * information WAL logging in order to ensure that no logical decoding
     517              :      * processes WAL records with insufficient information.
     518              :      */
     519           15 :     LogicalDecodingCtl->logical_decoding_enabled = false;
     520              : 
     521              :     /* Write the WAL to disable logical decoding on standbys too (skipped during recovery) */
     522           15 :     if (!in_recovery)
     523            6 :         write_logical_decoding_status_update_record(false);
     524              : 
     525              :     /* Now disable logical information WAL logging, and clear the pending request */
     526           15 :     LogicalDecodingCtl->xlog_logical_info = false;
     527           15 :     LogicalDecodingCtl->pending_disable = false;
     528              : 
     529           15 :     END_CRIT_SECTION();
     530              : 
     531           15 :     if (!in_recovery)       /* note: this message is logged while the lock is still held */
     532            6 :         ereport(LOG,
     533              :                 errmsg("logical decoding is disabled because there are no valid logical replication slots"));
     534              : 
     535           15 :     LWLockRelease(LogicalDecodingControlLock);
     536              : 
     537              :     /*
     538              :      * Tell all running processes to reflect the xlog_logical_info update.
     539              :      * Unlike when enabling logical decoding, we don't need to wait for all
     540              :      * processes to complete it in this case. We already disabled logical
     541              :      * decoding and it's always safe to write logical information to WAL
     542              :      * records, even when not strictly required. Therefore, we don't need to
     543              :      * wait for all running transactions to finish either.
     544              :      */
     545           15 :     EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
     546              : }
     547              : 
     548              : /*
     549              :  * Updates the logical decoding status at the end of recovery, and ensures
     550              :  * that all running processes have the updated XLogLogicalInfo status. This
     551              :  * function must be called before accepting writes.
     552              :  */
     553              : void
     554          949 : UpdateLogicalDecodingStatusEndOfRecovery(void)
     555              : {
     556          949 :     bool        new_status = false; /* desired logical decoding status */
     557              : 
     558              :     Assert(RecoveryInProgress());
     559              : 
     560              :     /*
     561              :      * With 'minimal' WAL level, there are no logical replication slots during
     562              :      * recovery. Logical decoding is always disabled, so there is no need to
     563              :      * synchronize XLogLogicalInfo.
     564              :      */
     565          949 :     if (wal_level == WAL_LEVEL_MINIMAL)
     566              :     {
     567              :         Assert(!IsXLogLogicalInfoEnabled() && !IsLogicalDecodingEnabled());
     568          360 :         return;
     569              :     }
     570              : 
     571          589 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
     572              : 
     573          589 :     if (wal_level == WAL_LEVEL_LOGICAL || CheckLogicalSlotExists())
     574          131 :         new_status = true;
     575              : 
     576              :     /*
     577              :      * When recovery ends, we need to either enable or disable logical
     578              :      * decoding based on the wal_level setting and the presence of logical
     579              :      * slots. Note that concurrent slot creation and deletion could happen,
     580              :      * but WAL writes are still not permitted until recovery fully
     581              :      * completes. Here's how we handle concurrent toggling of logical
     582              :      * decoding:
     583              :      *
     584              :      * For the 'enable' case, if there's a concurrent disable request before
     585              :      * recovery fully completes, the checkpointer will handle it after
     586              :      * recovery is done. This means there might be a brief period after
     587              :      * recovery where logical decoding remains enabled even with no logical
     588              :      * replication slots present. This temporary state is not new - it can
     589              :      * already occur due to the checkpointer's asynchronous deactivation
     590              :      * process.
     591              :      *
     592              :      * For the 'disable' case, backends cannot create logical replication
     593              :      * slots during recovery (see checks in
     594              :      * CheckLogicalDecodingRequirements()), which prevents a race condition
     595              :      * between disabling logical decoding and concurrent slot creation.
     596              :      */
     597          589 :     if (new_status != LogicalDecodingCtl->logical_decoding_enabled)
     598              :     {
     599              :         /*
     600              :          * Update both the logical decoding status and logical WAL logging
     601              :          * status. Unlike toggling these statuses outside of recovery, we
     602              :          * don't need to worry about the operation order as WAL writes are
     603              :          * still not permitted.
     604              :          */
     605           73 :         LogicalDecodingCtl->xlog_logical_info = new_status;
     606           73 :         LogicalDecodingCtl->logical_decoding_enabled = new_status;
     607              : 
     608           73 :         elog(DEBUG1,
     609              :              "update logical decoding status to %d at the end of recovery",
     610              :              new_status);
     611              : 
     612              :         /*
     613              :          * Now that we have updated the logical decoding status, clear the
     614              :          * pending disable flag. It's possible that a concurrent process drops
     615              :          * the last logical slot and initiates the pending disable again. The
     616              :          * checkpointer process will check it.
     617              :          */
     618           73 :         LogicalDecodingCtl->pending_disable = false;
     619              : 
     620           73 :         LWLockRelease(LogicalDecodingControlLock);
     621              : 
     622           73 :         write_logical_decoding_status_update_record(new_status);
     623              :     }
     624              :     else
     625          516 :         LWLockRelease(LogicalDecodingControlLock);
     626              : 
     627              :     /*
     628              :      * Ensure all running processes have the updated status. We don't need to
     629              :      * wait for running transactions to finish as we don't accept any writes
     630              :      * yet. On the other hand, we need to wait for XLogLogicalInfo to be
     631              :      * synchronized even if we've not updated the status above, as the status
     632              :      * may have been turned on and off during recovery, leaving running
     633              :      * processes with different statuses in their local caches.
     634              :      */
     635          589 :     if (IsUnderPostmaster)
     636          468 :         WaitForProcSignalBarrier(
     637              :                                  EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
     638              : 
     639          589 :     INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
     640              : }
        

Generated by: LCOV version 2.0-1