LCOV - code coverage report
Current view: top level - src/include/replication - worker_internal.h (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 100.0 % 10 10
Test Date: 2026-03-10 18:15:00 Functions: 100.0 % 5 5
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * worker_internal.h
       4              :  *    Internal headers shared by logical replication workers.
       5              :  *
       6              :  * Portions Copyright (c) 2016-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * src/include/replication/worker_internal.h
       9              :  *
      10              :  *-------------------------------------------------------------------------
      11              :  */
      12              : #ifndef WORKER_INTERNAL_H
      13              : #define WORKER_INTERNAL_H
      14              : 
      15              : #include "access/xlogdefs.h"
      16              : #include "catalog/pg_subscription.h"
      17              : #include "datatype/timestamp.h"
      18              : #include "miscadmin.h"
      19              : #include "replication/logicalrelation.h"
      20              : #include "replication/walreceiver.h"
      21              : #include "storage/buffile.h"
      22              : #include "storage/fileset.h"
      23              : #include "storage/lock.h"
      24              : #include "storage/shm_mq.h"
      25              : #include "storage/shm_toc.h"
      26              : #include "storage/spin.h"
      27              : 
      28              : /* The kinds of logical replication worker; stored in LogicalRepWorker.type. */
      29              : typedef enum LogicalRepWorkerType
      30              : {
      31              :     WORKERTYPE_UNKNOWN = 0,
      32              :     WORKERTYPE_TABLESYNC,
      33              :     WORKERTYPE_SEQUENCESYNC,
      34              :     WORKERTYPE_APPLY,           /* the leader apply worker */
      35              :     WORKERTYPE_PARALLEL_APPLY,
      36              : } LogicalRepWorkerType;
      37              : /* State of one logical replication worker slot. */
      38              : typedef struct LogicalRepWorker
      39              : {
      40              :     /* What type of worker is this? */
      41              :     LogicalRepWorkerType type;
      42              : 
      43              :     /* Time at which this worker was launched. */
      44              :     TimestampTz launch_time;
      45              : 
      46              :     /* Indicates if this slot is used or free. */
      47              :     bool        in_use;
      48              : 
      49              :     /* Incremented every time the slot is taken by a new worker. */
      50              :     uint16      generation;
      51              : 
      52              :     /* Pointer to the worker's entry in the proc array. NULL if not running. */
      53              :     PGPROC     *proc;
      54              : 
      55              :     /* Database id to connect to. */
      56              :     Oid         dbid;
      57              : 
      58              :     /* User to use for connection (will be same as owner of subscription). */
      59              :     Oid         userid;
      60              : 
      61              :     /* Subscription id for the worker. */
      62              :     Oid         subid;
      63              : 
      64              :     /* Used for initial table synchronization. */
      65              :     Oid         relid;
      66              :     char        relstate;
      67              :     XLogRecPtr  relstate_lsn;
      68              :     slock_t     relmutex;       /* NOTE(review): presumably guards the relstate fields - confirm */
      69              : 
      70              :     /*
      71              :      * Used to create the changes and subxact files for the streaming
      72              :      * transactions.  On arrival of the first streaming transaction, or
      73              :      * the first time the leader apply worker times out while sending changes
      74              :      * to the parallel apply worker, the fileset will be initialized, and it
      75              :      * will be deleted when the worker exits.  Within it, separate buffiles
      76              :      * are created for each transaction and deleted after that transaction
      77              :      * is finished.
      78              :      */
      79              :     FileSet    *stream_fileset;
      80              : 
      81              :     /*
      82              :      * PID of leader apply worker if this slot is used for a parallel apply
      83              :      * worker, InvalidPid otherwise.
      84              :      */
      85              :     pid_t       leader_pid;
      86              : 
      87              :     /* Indicates whether apply can be performed in parallel. */
      88              :     bool        parallel_apply;
      89              : 
      90              :     /*
      91              :      * Changes made by this transaction and subsequent ones must be preserved.
      92              :      * This ensures that update_deleted conflicts can be accurately detected
      93              :      * during the apply phase of logical replication by this worker.
      94              :      *
      95              :      * The logical replication launcher manages an internal replication slot
      96              :      * named "pg_conflict_detection". It asynchronously collects this ID to
      97              :      * decide when to advance the xmin value of the slot.
      98              :      *
      99              :      * This ID is set to InvalidTransactionId when the apply worker stops
     100              :      * retaining information needed for conflict detection.
     101              :      */
     102              :     TransactionId oldest_nonremovable_xid;
     103              : 
     104              :     /* Stats. */
     105              :     XLogRecPtr  last_lsn;
     106              :     TimestampTz last_send_time;
     107              :     TimestampTz last_recv_time;
     108              :     XLogRecPtr  reply_lsn;
     109              :     TimestampTz reply_time;
     110              :     /* NOTE(review): presumably when sequence sync was last started - confirm. */
     111              :     TimestampTz last_seqsync_start_time;
     112              : } LogicalRepWorker;
     113              : 
     114              : /*
     115              :  * State of the transaction in a parallel apply worker.
     116              :  *
     117              :  * The enum values must stay in the same order as the transaction state
     118              :  * transitions (UNKNOWN, then STARTED, then FINISHED).
     119              :  */
     120              : typedef enum ParallelTransState
     121              : {
     122              :     PARALLEL_TRANS_UNKNOWN,
     123              :     PARALLEL_TRANS_STARTED,
     124              :     PARALLEL_TRANS_FINISHED,    /* set by the parallel apply worker; the leader waits for it */
     125              : } ParallelTransState;
     126              : 
     127              : /*
     128              :  * State of the fileset used to communicate changes from the leader to a
     129              :  * parallel apply worker.
     130              :  *
     131              :  * FS_EMPTY indicates the initial state, where the leader doesn't need to use
     132              :  * the file to communicate with the parallel apply worker.
     133              :  *
     134              :  * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
     135              :  * to the file.
     136              :  *
     137              :  * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
     138              :  * the file.
     139              :  *
     140              :  * FS_READY indicates that it is now OK for a parallel apply worker to
     141              :  * read the file.
     142              :  */
     143              : typedef enum PartialFileSetState
     144              : {
     145              :     FS_EMPTY,
     146              :     FS_SERIALIZE_IN_PROGRESS,
     147              :     FS_SERIALIZE_DONE,
     148              :     FS_READY,
     149              : } PartialFileSetState;
     150              : 
     151              : /*
     152              :  * Struct for sharing information between the leader apply worker and
     153              :  * parallel apply workers.
     154              :  */
     155              : typedef struct ParallelApplyWorkerShared
     156              : {
     157              :     slock_t     mutex;          /* NOTE(review): presumably protects the fields below - confirm */
     158              : 
     159              :     TransactionId xid;          /* NOTE(review): presumably the transaction being applied - confirm */
     160              : 
     161              :     /*
     162              :      * State used to ensure commit ordering.
     163              :      *
     164              :      * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
     165              :      * handling the transaction finish commands while the apply leader will
     166              :      * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
     167              :      * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
     168              :      * STREAM_ABORT).
     169              :      */
     170              :     ParallelTransState xact_state;
     171              : 
     172              :     /* Information from the corresponding LogicalRepWorker slot. */
     173              :     uint16      logicalrep_worker_generation;
     174              :     int         logicalrep_worker_slot_no;
     175              : 
     176              :     /*
     177              :      * Indicates whether there are pending streaming blocks in the queue. The
     178              :      * parallel apply worker will check it before starting to wait.
     179              :      */
     180              :     pg_atomic_uint32 pending_stream_count;
     181              : 
     182              :     /*
     183              :      * XactLastCommitEnd from the parallel apply worker. This is required by
     184              :      * the leader worker so it can update the lsn_mappings.
     185              :      */
     186              :     XLogRecPtr  last_commit_end;
     187              : 
     188              :     /*
     189              :      * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
     190              :      * serialize changes to the file, and share the fileset with the parallel
     191              :      * apply worker when processing the transaction finish command. Then the
     192              :      * parallel apply worker will apply all the spooled messages.
     193              :      *
     194              :      * FileSet is used here instead of SharedFileSet because we need it to
     195              :      * survive after releasing the shared memory so that the leader apply
     196              :      * worker can re-use the same fileset for the next streaming transaction.
     197              :      */
     198              :     PartialFileSetState fileset_state;
     199              :     FileSet     fileset;
     200              : } ParallelApplyWorkerShared;
     201              : 
     202              : /*
     203              :  * Information which is used to manage a parallel apply worker.
     204              :  */
     205              : typedef struct ParallelApplyWorkerInfo
     206              : {
     207              :     /*
     208              :      * This queue is used to send changes from the leader apply worker to the
     209              :      * parallel apply worker.
     210              :      */
     211              :     shm_mq_handle *mq_handle;
     212              : 
     213              :     /*
     214              :      * This queue is used to transfer error messages from the parallel apply
     215              :      * worker to the leader apply worker.
     216              :      */
     217              :     shm_mq_handle *error_mq_handle;
     218              :     /* NOTE(review): presumably the DSM segment backing the queues - confirm. */
     219              :     dsm_segment *dsm_seg;
     220              : 
     221              :     /*
     222              :      * Indicates whether the leader apply worker needs to serialize the
     223              :      * remaining changes to a file due to timeout when attempting to send data
     224              :      * to the parallel apply worker via shared memory.
     225              :      */
     226              :     bool        serialize_changes;
     227              : 
     228              :     /*
     229              :      * True if the worker is being used to process a parallel apply
     230              :      * transaction. False indicates this worker is available for re-use.
     231              :      */
     232              :     bool        in_use;
     233              :     /* Information shared with the worker; see ParallelApplyWorkerShared. */
     234              :     ParallelApplyWorkerShared *shared;
     235              : } ParallelApplyWorkerInfo;
     236              : 
     237              : /* Main memory context for the apply worker. Lives for the worker's lifetime. */
     238              : extern PGDLLIMPORT MemoryContext ApplyContext;
     239              : 
     240              : extern PGDLLIMPORT MemoryContext ApplyMessageContext;
     241              : 
     242              : extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
     243              : /* NOTE(review): presumably only set in a parallel apply worker - confirm. */
     244              : extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
     245              : 
     246              : /* libpqreceiver connection */
     247              : extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
     248              : 
     249              : /* This process's subscription and worker objects. */
     250              : extern PGDLLIMPORT Subscription *MySubscription;
     251              : extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
     252              : 
     253              : extern PGDLLIMPORT bool in_remote_transaction;
     254              : 
     255              : extern PGDLLIMPORT bool InitializingApplyWorker;
     256              : 
     257              : extern PGDLLIMPORT List *table_states_not_ready;
     258              : /* Worker slot management: attach, find, launch, stop and wake up workers. */
     259              : extern void logicalrep_worker_attach(int slot);
     260              : extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
     261              :                                                 Oid subid, Oid relid,
     262              :                                                 bool only_running);
     263              : extern List *logicalrep_workers_find(Oid subid, bool only_running,
     264              :                                      bool acquire_lock);
     265              : extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
     266              :                                      Oid dbid, Oid subid, const char *subname,
     267              :                                      Oid userid, Oid relid,
     268              :                                      dsm_handle subworker_dsm,
     269              :                                      bool retain_dead_tuples);
     270              : extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
     271              :                                    Oid relid);
     272              : extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
     273              : extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
     274              :                                      Oid relid);
     275              : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
     276              : 
     277              : extern void logicalrep_reset_seqsync_start_time(void);
     278              : extern int  logicalrep_sync_worker_count(Oid subid);
     279              : 
     280              : extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
     281              :                                                char *originname, Size szoriginname);
     282              : 
     283              : extern bool AllTablesyncsReady(void);
     284              : extern bool HasSubscriptionTablesCached(void);
     285              : extern void UpdateTwoPhaseState(Oid suboid, char new_state);
     286              : 
     287              : extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
     288              : extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
     289              : extern void ProcessSequencesForSync(void);
     290              : /* FinishSyncWorker is declared pg_noreturn: it does not return to the caller. */
     291              : pg_noreturn extern void FinishSyncWorker(void);
     292              : extern void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid,
     293              :                                        uint32 hashvalue);
     294              : extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
     295              :                                Oid relid, TimestampTz *last_start_time);
     296              : extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
     297              : extern void FetchRelationStates(bool *has_pending_subtables,
     298              :                                 bool *has_pending_subsequences, bool *started_tx);
     299              : /* Stream start/stop helpers. */
     300              : extern void stream_start_internal(TransactionId xid, bool first_segment);
     301              : extern void stream_stop_internal(TransactionId xid);
     302              : 
     303              : /* Common streaming function to apply all the spooled messages of a transaction. */
     304              : extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
     305              :                                    XLogRecPtr lsn);
     306              : 
     307              : extern void apply_dispatch(StringInfo s);
     308              : 
     309              : extern void maybe_reread_subscription(void);
     310              : 
     311              : extern void stream_cleanup_files(Oid subid, TransactionId xid);
     312              : 
     313              : extern void set_stream_options(WalRcvStreamOptions *options,
     314              :                                char *slotname,
     315              :                                XLogRecPtr *origin_startpos);
     316              : 
     317              : extern void start_apply(XLogRecPtr origin_startpos);
     318              : 
     319              : extern void InitializeLogRepWorker(void);
     320              : 
     321              : extern void SetupApplyOrSyncWorker(int worker_slot);
     322              : 
     323              : extern void DisableSubscriptionAndExit(void);
     324              : 
     325              : extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
     326              : 
     327              : /* Apply error context callback, and a setter for its origin name. */
     328              : extern void apply_error_callback(void *arg);
     329              : extern void set_apply_error_context_origin(char *originname);
     330              : 
     331              : /* Parallel apply worker setup and interactions (the pa_* functions). */
     332              : extern void pa_allocate_worker(TransactionId xid);
     333              : extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
     334              : extern void pa_detach_all_error_mq(void);
     335              : 
     336              : extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
     337              :                          const void *data);
     338              : extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
     339              :                                            bool stream_locked);
     340              : 
     341              : extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
     342              :                               ParallelTransState xact_state);
     343              : extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
     344              : 
     345              : extern void pa_start_subtrans(TransactionId current_xid,
     346              :                               TransactionId top_xid);
     347              : extern void pa_reset_subtrans(void);
     348              : extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
     349              : extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
     350              :                                  PartialFileSetState fileset_state);
     351              : /* Stream lock and unlock, keyed by transaction id and lock mode. */
     352              : extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
     353              : extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
     354              : /* Transaction lock and unlock, keyed by transaction id and lock mode. */
     355              : extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
     356              : extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
     357              : 
     358              : extern void pa_decr_and_wait_stream_block(void);
     359              : 
     360              : extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
     361              :                            XLogRecPtr remote_lsn);
     362              : /* True only if the slot is in use and has the given type; (worker) is evaluated twice. */
     363              : #define isParallelApplyWorker(worker) ((worker)->in_use && \
     364              :                                        (worker)->type == WORKERTYPE_PARALLEL_APPLY)
     365              : #define isTableSyncWorker(worker) ((worker)->in_use && \
     366              :                                    (worker)->type == WORKERTYPE_TABLESYNC)
     367              : #define isSequenceSyncWorker(worker) ((worker)->in_use && \
     368              :                                       (worker)->type == WORKERTYPE_SEQUENCESYNC)
     369              : /* True if MyLogicalRepWorker is an in-use slot of type WORKERTYPE_TABLESYNC. */
     370              : static inline bool
     371          571 : am_tablesync_worker(void)
     372              : {
     373          571 :     return isTableSyncWorker(MyLogicalRepWorker);
     374              : }
     375              : /* True if MyLogicalRepWorker is an in-use slot of type WORKERTYPE_SEQUENCESYNC. */
     376              : static inline bool
     377          526 : am_sequencesync_worker(void)
     378              : {
     379          526 :     return isSequenceSyncWorker(MyLogicalRepWorker);
     380              : }
     381              : /* True if MyLogicalRepWorker has type WORKERTYPE_APPLY; asserts the slot is in use. */
     382              : static inline bool
     383       229914 : am_leader_apply_worker(void)
     384              : {
     385              :     Assert(MyLogicalRepWorker->in_use);
     386       229914 :     return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
     387              : }
     388              : /* True if MyLogicalRepWorker has type WORKERTYPE_PARALLEL_APPLY; asserts the slot is in use. */
     389              : static inline bool
     390       340062 : am_parallel_apply_worker(void)
     391              : {
     392              :     Assert(MyLogicalRepWorker->in_use);
     393       340062 :     return isParallelApplyWorker(MyLogicalRepWorker);
     394              : }
     395              : /* Returns MyLogicalRepWorker->type; asserts the slot is in use. */
     396              : static inline LogicalRepWorkerType
     397          120 : get_logical_worker_type(void)
     398              : {
     399              :     Assert(MyLogicalRepWorker->in_use);
     400          120 :     return MyLogicalRepWorker->type;
     401              : }
     402              : 
     403              : #endif                          /* WORKER_INTERNAL_H */
        

Generated by: LCOV version 2.0-1