LCOV - code coverage report
Current view: top level - src/include/replication - worker_internal.h (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 8 8 100.0 %
Date: 2025-11-13 05:17:35 Functions: 4 4 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * worker_internal.h
       4             :  *    Internal headers shared by logical replication workers.
       5             :  *
       6             :  * Portions Copyright (c) 2016-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * src/include/replication/worker_internal.h
       9             :  *
      10             :  *-------------------------------------------------------------------------
      11             :  */
      12             : #ifndef WORKER_INTERNAL_H
      13             : #define WORKER_INTERNAL_H
      14             : 
      15             : #include "access/xlogdefs.h"
      16             : #include "catalog/pg_subscription.h"
      17             : #include "datatype/timestamp.h"
      18             : #include "miscadmin.h"
      19             : #include "replication/logicalrelation.h"
      20             : #include "replication/walreceiver.h"
      21             : #include "storage/buffile.h"
      22             : #include "storage/fileset.h"
      23             : #include "storage/lock.h"
      24             : #include "storage/shm_mq.h"
      25             : #include "storage/shm_toc.h"
      26             : #include "storage/spin.h"
      27             : 
      28             : /* Different types of worker */
      29             : typedef enum LogicalRepWorkerType
      30             : {
      31             :     WORKERTYPE_UNKNOWN = 0,
      32             :     WORKERTYPE_TABLESYNC,
      33             :     WORKERTYPE_SEQUENCESYNC,
      34             :     WORKERTYPE_APPLY,
      35             :     WORKERTYPE_PARALLEL_APPLY,
      36             : } LogicalRepWorkerType;
      37             : 
      38             : typedef struct LogicalRepWorker
      39             : {
      40             :     /* What type of worker is this? */
      41             :     LogicalRepWorkerType type;
      42             : 
      43             :     /* Time at which this worker was launched. */
      44             :     TimestampTz launch_time;
      45             : 
      46             :     /* Indicates if this slot is used or free. */
      47             :     bool        in_use;
      48             : 
      49             :     /* Increased every time the slot is taken by new worker. */
      50             :     uint16      generation;
      51             : 
      52             :     /* Pointer to proc array. NULL if not running. */
      53             :     PGPROC     *proc;
      54             : 
      55             :     /* Database id to connect to. */
      56             :     Oid         dbid;
      57             : 
      58             :     /* User to use for connection (will be same as owner of subscription). */
      59             :     Oid         userid;
      60             : 
      61             :     /* Subscription id for the worker. */
      62             :     Oid         subid;
      63             : 
      64             :     /* Used for initial table synchronization. */
      65             :     Oid         relid;
      66             :     char        relstate;
      67             :     XLogRecPtr  relstate_lsn;
      68             :     slock_t     relmutex;
      69             : 
      70             :     /*
      71             :      * Used to create the changes and subxact files for the streaming
      72             :      * transactions.  Upon the arrival of the first streaming transaction or
      73             :      * when the first-time leader apply worker times out while sending changes
      74             :      * to the parallel apply worker, the fileset will be initialized, and it
      75             :      * will be deleted when the worker exits.  Under this, separate buffiles
      76             :      * would be created for each transaction which will be deleted after the
      77             :      * transaction is finished.
      78             :      */
      79             :     FileSet    *stream_fileset;
      80             : 
      81             :     /*
      82             :      * PID of leader apply worker if this slot is used for a parallel apply
      83             :      * worker, InvalidPid otherwise.
      84             :      */
      85             :     pid_t       leader_pid;
      86             : 
      87             :     /* Indicates whether apply can be performed in parallel. */
      88             :     bool        parallel_apply;
      89             : 
      90             :     /*
      91             :      * Changes made by this transaction and subsequent ones must be preserved.
      92             :      * This ensures that update_deleted conflicts can be accurately detected
      93             :      * during the apply phase of logical replication by this worker.
      94             :      *
      95             :      * The logical replication launcher manages an internal replication slot
      96             :      * named "pg_conflict_detection". It asynchronously collects this ID to
      97             :      * decide when to advance the xmin value of the slot.
      98             :      *
      99             :      * This ID is set to InvalidTransactionId when the apply worker stops
     100             :      * retaining information needed for conflict detection.
     101             :      */
     102             :     TransactionId oldest_nonremovable_xid;
     103             : 
     104             :     /* Stats. */
     105             :     XLogRecPtr  last_lsn;
     106             :     TimestampTz last_send_time;
     107             :     TimestampTz last_recv_time;
     108             :     XLogRecPtr  reply_lsn;
     109             :     TimestampTz reply_time;
     110             : 
     111             :     TimestampTz last_seqsync_start_time;
     112             : } LogicalRepWorker;
     113             : 
     114             : /*
     115             :  * State of the transaction in parallel apply worker.
     116             :  *
     117             :  * The enum values must have the same order as the transaction state
     118             :  * transitions.
     119             :  */
     120             : typedef enum ParallelTransState
     121             : {
     122             :     PARALLEL_TRANS_UNKNOWN,
     123             :     PARALLEL_TRANS_STARTED,
     124             :     PARALLEL_TRANS_FINISHED,
     125             : } ParallelTransState;
     126             : 
     127             : /*
     128             :  * State of fileset used to communicate changes from leader to parallel
     129             :  * apply worker.
     130             :  *
     131             :  * FS_EMPTY indicates an initial state where the leader doesn't need to use
     132             :  * the file to communicate with the parallel apply worker.
     133             :  *
     134             :  * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
     135             :  * to the file.
     136             :  *
     137             :  * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
     138             :  * the file.
     139             :  *
     140             :  * FS_READY indicates that it is now ok for a parallel apply worker to
     141             :  * read the file.
     142             :  */
     143             : typedef enum PartialFileSetState
     144             : {
     145             :     FS_EMPTY,
     146             :     FS_SERIALIZE_IN_PROGRESS,
     147             :     FS_SERIALIZE_DONE,
     148             :     FS_READY,
     149             : } PartialFileSetState;
     150             : 
     151             : /*
     152             :  * Struct for sharing information between leader apply worker and parallel
     153             :  * apply workers.
     154             :  */
     155             : typedef struct ParallelApplyWorkerShared
     156             : {
     157             :     slock_t     mutex;
     158             : 
     159             :     TransactionId xid;
     160             : 
     161             :     /*
     162             :      * State used to ensure commit ordering.
     163             :      *
     164             :      * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
     165             :      * handling the transaction finish commands while the apply leader will
     166             :      * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
     167             :      * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
     168             :      * STREAM_ABORT).
     169             :      */
     170             :     ParallelTransState xact_state;
     171             : 
     172             :     /* Information from the corresponding LogicalRepWorker slot. */
     173             :     uint16      logicalrep_worker_generation;
     174             :     int         logicalrep_worker_slot_no;
     175             : 
     176             :     /*
     177             :      * Indicates whether there are pending streaming blocks in the queue. The
     178             :      * parallel apply worker will check it before starting to wait.
     179             :      */
     180             :     pg_atomic_uint32 pending_stream_count;
     181             : 
     182             :     /*
     183             :      * XactLastCommitEnd from the parallel apply worker. This is required by
     184             :      * the leader worker so it can update the lsn_mappings.
     185             :      */
     186             :     XLogRecPtr  last_commit_end;
     187             : 
     188             :     /*
     189             :      * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
     190             :      * serialize changes to the file, and share the fileset with the parallel
     191             :      * apply worker when processing the transaction finish command. Then the
     192             :      * parallel apply worker will apply all the spooled messages.
     193             :      *
     194             :      * FileSet is used here instead of SharedFileSet because we need it to
     195             :      * survive after releasing the shared memory so that the leader apply
     196             :      * worker can re-use the same fileset for the next streaming transaction.
     197             :      */
     198             :     PartialFileSetState fileset_state;
     199             :     FileSet     fileset;
     200             : } ParallelApplyWorkerShared;
     201             : 
     202             : /*
     203             :  * Information which is used to manage the parallel apply worker.
     204             :  */
     205             : typedef struct ParallelApplyWorkerInfo
     206             : {
     207             :     /*
     208             :      * This queue is used to send changes from the leader apply worker to the
     209             :      * parallel apply worker.
     210             :      */
     211             :     shm_mq_handle *mq_handle;
     212             : 
     213             :     /*
     214             :      * This queue is used to transfer error messages from the parallel apply
     215             :      * worker to the leader apply worker.
     216             :      */
     217             :     shm_mq_handle *error_mq_handle;
     218             : 
     219             :     dsm_segment *dsm_seg;
     220             : 
     221             :     /*
     222             :      * Indicates whether the leader apply worker needs to serialize the
     223             :      * remaining changes to a file due to timeout when attempting to send data
     224             :      * to the parallel apply worker via shared memory.
     225             :      */
     226             :     bool        serialize_changes;
     227             : 
     228             :     /*
     229             :      * True if the worker is being used to process a parallel apply
     230             :      * transaction. False indicates this worker is available for re-use.
     231             :      */
     232             :     bool        in_use;
     233             : 
     234             :     ParallelApplyWorkerShared *shared;
     235             : } ParallelApplyWorkerInfo;
     236             : 
     237             : /* Main memory context for apply worker. Permanent during worker lifetime. */
     238             : extern PGDLLIMPORT MemoryContext ApplyContext;
     239             : 
     240             : extern PGDLLIMPORT MemoryContext ApplyMessageContext;
     241             : 
     242             : extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
     243             : 
     244             : extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
     245             : 
     246             : /* libpqreceiver connection */
     247             : extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
     248             : 
     249             : /* Worker and subscription objects. */
     250             : extern PGDLLIMPORT Subscription *MySubscription;
     251             : extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
     252             : 
     253             : extern PGDLLIMPORT bool in_remote_transaction;
     254             : 
     255             : extern PGDLLIMPORT bool InitializingApplyWorker;
     256             : 
     257             : extern PGDLLIMPORT List *table_states_not_ready;
     258             : 
     259             : extern void logicalrep_worker_attach(int slot);
     260             : extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
     261             :                                                 Oid subid, Oid relid,
     262             :                                                 bool only_running);
     263             : extern List *logicalrep_workers_find(Oid subid, bool only_running,
     264             :                                      bool acquire_lock);
     265             : extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
     266             :                                      Oid dbid, Oid subid, const char *subname,
     267             :                                      Oid userid, Oid relid,
     268             :                                      dsm_handle subworker_dsm,
     269             :                                      bool retain_dead_tuples);
     270             : extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
     271             :                                    Oid relid);
     272             : extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
     273             : extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
     274             :                                      Oid relid);
     275             : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
     276             : 
     277             : extern void logicalrep_reset_seqsync_start_time(void);
     278             : extern int  logicalrep_sync_worker_count(Oid subid);
     279             : 
     280             : extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
     281             :                                                char *originname, Size szoriginname);
     282             : 
     283             : extern bool AllTablesyncsReady(void);
     284             : extern bool HasSubscriptionTablesCached(void);
     285             : extern void UpdateTwoPhaseState(Oid suboid, char new_state);
     286             : 
     287             : extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
     288             : extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
     289             : extern void ProcessSequencesForSync(void);
     290             : 
     291             : pg_noreturn extern void FinishSyncWorker(void);
     292             : extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
     293             : extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
     294             :                                Oid relid, TimestampTz *last_start_time);
     295             : extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
     296             : extern void FetchRelationStates(bool *has_pending_subtables,
     297             :                                 bool *has_pending_sequences, bool *started_tx);
     298             : 
     299             : extern void stream_start_internal(TransactionId xid, bool first_segment);
     300             : extern void stream_stop_internal(TransactionId xid);
     301             : 
     302             : /* Common streaming function to apply all the spooled messages */
     303             : extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
     304             :                                    XLogRecPtr lsn);
     305             : 
     306             : extern void apply_dispatch(StringInfo s);
     307             : 
     308             : extern void maybe_reread_subscription(void);
     309             : 
     310             : extern void stream_cleanup_files(Oid subid, TransactionId xid);
     311             : 
     312             : extern void set_stream_options(WalRcvStreamOptions *options,
     313             :                                char *slotname,
     314             :                                XLogRecPtr *origin_startpos);
     315             : 
     316             : extern void start_apply(XLogRecPtr origin_startpos);
     317             : 
     318             : extern void InitializeLogRepWorker(void);
     319             : 
     320             : extern void SetupApplyOrSyncWorker(int worker_slot);
     321             : 
     322             : extern void DisableSubscriptionAndExit(void);
     323             : 
     324             : extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
     325             : 
     326             : /* Function for apply error callback */
     327             : extern void apply_error_callback(void *arg);
     328             : extern void set_apply_error_context_origin(char *originname);
     329             : 
     330             : /* Parallel apply worker setup and interactions */
     331             : extern void pa_allocate_worker(TransactionId xid);
     332             : extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
     333             : extern void pa_detach_all_error_mq(void);
     334             : 
     335             : extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
     336             :                          const void *data);
     337             : extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
     338             :                                            bool stream_locked);
     339             : 
     340             : extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
     341             :                               ParallelTransState xact_state);
     342             : extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
     343             : 
     344             : extern void pa_start_subtrans(TransactionId current_xid,
     345             :                               TransactionId top_xid);
     346             : extern void pa_reset_subtrans(void);
     347             : extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
     348             : extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
     349             :                                  PartialFileSetState fileset_state);
     350             : 
     351             : extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
     352             : extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
     353             : 
     354             : extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
     355             : extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
     356             : 
     357             : extern void pa_decr_and_wait_stream_block(void);
     358             : 
     359             : extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
     360             :                            XLogRecPtr remote_lsn);
     361             : 
     362             : #define isParallelApplyWorker(worker) ((worker)->in_use && \
     363             :                                        (worker)->type == WORKERTYPE_PARALLEL_APPLY)
     364             : #define isTableSyncWorker(worker) ((worker)->in_use && \
     365             :                                    (worker)->type == WORKERTYPE_TABLESYNC)
     366             : #define isSequenceSyncWorker(worker) ((worker)->in_use && \
     367             :                                       (worker)->type == WORKERTYPE_SEQUENCESYNC)
     368             : 
     369             : static inline bool
     370        1040 : am_tablesync_worker(void)
     371             : {
     372        1040 :     return isTableSyncWorker(MyLogicalRepWorker);
     373             : }
     374             : 
     375             : static inline bool
     376         968 : am_sequencesync_worker(void)
     377             : {
     378         968 :     return isSequenceSyncWorker(MyLogicalRepWorker);
     379             : }
     380             : 
     381             : static inline bool
     382      545760 : am_leader_apply_worker(void)
     383             : {
     384             :     Assert(MyLogicalRepWorker->in_use);
     385      545760 :     return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
     386             : }
     387             : 
     388             : static inline bool
     389      663456 : am_parallel_apply_worker(void)
     390             : {
     391             :     Assert(MyLogicalRepWorker->in_use);
     392      663456 :     return isParallelApplyWorker(MyLogicalRepWorker);
     393             : }
     394             : 
     395             : #endif                          /* WORKER_INTERNAL_H */

Generated by: LCOV version 1.16