LCOV - code coverage report
Current view: top level - src/backend/replication/logical - worker.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18beta1 Lines: 1476 1587 93.0 %
Date: 2025-06-27 21:17:17 Functions: 82 82 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * worker.c
       3             :  *     PostgreSQL logical replication worker (apply)
       4             :  *
       5             :  * Copyright (c) 2016-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/worker.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains the worker which applies logical changes as they come
      12             :  *    from remote logical replication stream.
      13             :  *
      14             :  *    The main worker (apply) is started by logical replication worker
      15             :  *    launcher for every enabled subscription in a database. It uses
      16             :  *    walsender protocol to communicate with publisher.
      17             :  *
      18             :  *    This module includes server facing code and shares libpqwalreceiver
      19             :  *    module with walreceiver for providing the libpq specific functionality.
      20             :  *
      21             :  *
      22             :  * STREAMED TRANSACTIONS
      23             :  * ---------------------
      24             :  * Streamed transactions (large transactions exceeding a memory limit on the
      25             :  * upstream) are applied using one of two approaches:
      26             :  *
      27             :  * 1) Write to temporary files and apply when the final commit arrives
      28             :  *
      29             :  * This approach is used when the user has set the subscription's streaming
      30             :  * option as on.
      31             :  *
      32             :  * Unlike the regular (non-streamed) case, handling streamed transactions has
      33             :  * to handle aborts of both the toplevel transaction and subtransactions. This
      34             :  * is achieved by tracking offsets for subtransactions, which is then used
      35             :  * to truncate the file with serialized changes.
      36             :  *
      37             :  * The files are placed in the temporary file directory by default, and the filenames
      38             :  * include both the XID of the toplevel transaction and OID of the
      39             :  * subscription. This is necessary so that different workers processing a
      40             :  * remote transaction with the same XID don't interfere.
      41             :  *
      42             :  * We use BufFiles instead of using normal temporary files because (a) the
      43             :  * BufFile infrastructure supports temporary files that exceed the OS file size
      44             :  * limit, (b) provides automatic cleanup on error, and (c) allows these files to
      45             :  * survive across local transactions so that they can be opened and closed at
      46             :  * each stream start and stop. We decided to use FileSet
      47             :  * infrastructure as without that it deletes the files on the closure of the
      48             :  * file and if we decide to keep stream files open across the start/stop stream
      49             :  * then it will consume a lot of memory (more than 8K for each BufFile and
      50             :  * there could be multiple such BufFiles as the subscriber could receive
      51             :  * multiple start/stop streams for different transactions before getting the
      52             :  * commit). Moreover, if we don't use FileSet then we also need to invent
      53             :  * a new way to pass filenames to BufFile APIs so that we are allowed to open
      54             :  * the file we desired across multiple stream-open calls for the same
      55             :  * transaction.
      56             :  *
      57             :  * 2) Parallel apply workers.
      58             :  *
      59             :  * This approach is used when the user has set the subscription's streaming
      60             :  * option as parallel. See logical/applyparallelworker.c for information about
      61             :  * this approach.
      62             :  *
      63             :  * TWO_PHASE TRANSACTIONS
      64             :  * ----------------------
      65             :  * Two phase transactions are replayed at prepare and then committed or
      66             :  * rolled back at commit prepared and rollback prepared respectively. It is
      67             :  * possible to have a prepared transaction that arrives at the apply worker
      68             :  * when the tablesync is busy doing the initial copy. In this case, the apply
      69             :  * worker skips all the prepared operations [e.g. inserts] while the tablesync
      70             :  * is still busy (see the condition of should_apply_changes_for_rel). The
      71             :  * tablesync worker might not get such a prepared transaction because say it
      72             :  * was prior to the initial consistent point but might have got some later
      73             :  * commits. Now, the tablesync worker will exit without doing anything for the
      74             :  * prepared transaction skipped by the apply worker as the sync location for it
      75             :  * will already be ahead of the apply worker's current location. This would lead
      76             :  * to an "empty prepare", because later when the apply worker does the commit
      77             :  * prepare, there is nothing in it (the inserts were skipped earlier).
      78             :  *
      79             :  * To avoid this, and similar prepare confusions the subscription's two_phase
      80             :  * commit is enabled only after the initial sync is over. The two_phase option
      81             :  * has been implemented as a tri-state with values DISABLED, PENDING, and
      82             :  * ENABLED.
      83             :  *
      84             :  * Even if the user specifies they want a subscription with two_phase = on,
      85             :  * internally it will start with a tri-state of PENDING which only becomes
      86             :  * ENABLED after all tablesync initializations are completed - i.e. when all
      87             :  * tablesync workers have reached their READY state. In other words, the value
      88             :  * PENDING is only a temporary state for subscription start-up.
      89             :  *
      90             :  * Until the two_phase is properly available (ENABLED) the subscription will
      91             :  * behave as if two_phase = off. When the apply worker detects that all
      92             :  * tablesyncs have become READY (while the tri-state was PENDING) it will
      93             :  * restart the apply worker process. This happens in
      94             :  * process_syncing_tables_for_apply.
      95             :  *
      96             :  * When the (re-started) apply worker finds that all tablesyncs are READY for a
      97             :  * two_phase tri-state of PENDING it starts streaming messages with the
      98             :  * two_phase option which in turn enables the decoding of two-phase commits at
      99             :  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
     100             :  * Now, it is possible that during the time we have not enabled two_phase, the
     101             :  * publisher (replication server) would have skipped some prepares but we
     102             :  * ensure that such prepares are sent along with commit prepare, see
     103             :  * ReorderBufferFinishPrepared.
     104             :  *
     105             :  * If the subscription has no tables then a two_phase tri-state PENDING is
     106             :  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
     107             :  * PUBLICATION which might otherwise be disallowed (see below).
     108             :  *
     109             :  * If ever a user needs to be aware of the tri-state value, they can fetch it
     110             :  * from the pg_subscription catalog (see column subtwophasestate).
     111             :  *
     112             :  * Finally, to avoid problems mentioned in previous paragraphs from any
     113             :  * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
     114             :  * to 'off' and then again back to 'on') there is a restriction for
     115             :  * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
     116             :  * the two_phase tri-state is ENABLED, except when copy_data = false.
     117             :  *
     118             :  * We can get prepare of the same GID more than once for the genuine cases
     119             :  * where we have defined multiple subscriptions for publications on the same
     120             :  * server and prepared transaction has operations on tables subscribed to those
     121             :  * subscriptions. For such cases, if we use the GID sent by publisher one of
     122             :  * the prepares will be successful and others will fail, in which case the
     123             :  * server will send them again. Now, this can lead to a deadlock if user has
     124             :  * set synchronous_standby_names for all the subscriptions on subscriber. To
     125             :  * avoid such deadlocks, we generate a unique GID (consisting of the
     126             :  * subscription oid and the xid of the prepared transaction) for each prepare
     127             :  * transaction on the subscriber.
     128             :  *
     129             :  * FAILOVER
     130             :  * ----------------------
     131             :  * The logical slot on the primary can be synced to the standby by specifying
     132             :  * failover = true when creating the subscription. Enabling failover allows us
     133             :  * to smoothly transition to the promoted standby, ensuring that we can
     134             :  * subscribe to the new primary without losing any data.
     135             :  *-------------------------------------------------------------------------
     136             :  */
     137             : 
     138             : #include "postgres.h"
     139             : 
     140             : #include <sys/stat.h>
     141             : #include <unistd.h>
     142             : 
     143             : #include "access/table.h"
     144             : #include "access/tableam.h"
     145             : #include "access/twophase.h"
     146             : #include "access/xact.h"
     147             : #include "catalog/indexing.h"
     148             : #include "catalog/pg_inherits.h"
     149             : #include "catalog/pg_subscription.h"
     150             : #include "catalog/pg_subscription_rel.h"
     151             : #include "commands/tablecmds.h"
     152             : #include "commands/trigger.h"
     153             : #include "executor/executor.h"
     154             : #include "executor/execPartition.h"
     155             : #include "libpq/pqformat.h"
     156             : #include "miscadmin.h"
     157             : #include "optimizer/optimizer.h"
     158             : #include "parser/parse_relation.h"
     159             : #include "pgstat.h"
     160             : #include "postmaster/bgworker.h"
     161             : #include "postmaster/interrupt.h"
     162             : #include "postmaster/walwriter.h"
     163             : #include "replication/conflict.h"
     164             : #include "replication/logicallauncher.h"
     165             : #include "replication/logicalproto.h"
     166             : #include "replication/logicalrelation.h"
     167             : #include "replication/logicalworker.h"
     168             : #include "replication/origin.h"
     169             : #include "replication/walreceiver.h"
     170             : #include "replication/worker_internal.h"
     171             : #include "rewrite/rewriteHandler.h"
     172             : #include "storage/buffile.h"
     173             : #include "storage/ipc.h"
     174             : #include "storage/lmgr.h"
     175             : #include "tcop/tcopprot.h"
     176             : #include "utils/acl.h"
     177             : #include "utils/dynahash.h"
     178             : #include "utils/guc.h"
     179             : #include "utils/inval.h"
     180             : #include "utils/lsyscache.h"
     181             : #include "utils/memutils.h"
     182             : #include "utils/pg_lsn.h"
     183             : #include "utils/rel.h"
     184             : #include "utils/rls.h"
     185             : #include "utils/snapmgr.h"
     186             : #include "utils/syscache.h"
     187             : #include "utils/usercontext.h"
     188             : 
     189             : #define NAPTIME_PER_CYCLE 1000  /* max sleep time between cycles (1s) */
     190             : 
     191             : typedef struct FlushPosition
     192             : {
     193             :     dlist_node  node;
     194             :     XLogRecPtr  local_end;
     195             :     XLogRecPtr  remote_end;
     196             : } FlushPosition;
     197             : 
     198             : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
     199             : 
     200             : typedef struct ApplyExecutionData
     201             : {
     202             :     EState     *estate;         /* executor state, used to track resources */
     203             : 
     204             :     LogicalRepRelMapEntry *targetRel;   /* replication target rel */
     205             :     ResultRelInfo *targetRelInfo;   /* ResultRelInfo for same */
     206             : 
     207             :     /* These fields are used when the target relation is partitioned: */
     208             :     ModifyTableState *mtstate;  /* dummy ModifyTable state */
     209             :     PartitionTupleRouting *proute;  /* partition routing info */
     210             : } ApplyExecutionData;
     211             : 
     212             : /* Struct for saving and restoring apply errcontext information */
     213             : typedef struct ApplyErrorCallbackArg
     214             : {
     215             :     LogicalRepMsgType command;  /* 0 if invalid */
     216             :     LogicalRepRelMapEntry *rel;
     217             : 
     218             :     /* Remote node information */
     219             :     int         remote_attnum;  /* -1 if invalid */
     220             :     TransactionId remote_xid;
     221             :     XLogRecPtr  finish_lsn;
     222             :     char       *origin_name;
     223             : } ApplyErrorCallbackArg;
     224             : 
     225             : /*
     226             :  * The action to be taken for the changes in the transaction.
     227             :  *
     228             :  * TRANS_LEADER_APPLY:
     229             :  * This action means that we are in the leader apply worker or table sync
     230             :  * worker. The changes of the transaction are either directly applied or
     231             :  * are read from temporary files (for streaming transactions) and then
     232             :  * applied by the worker.
     233             :  *
     234             :  * TRANS_LEADER_SERIALIZE:
     235             :  * This action means that we are in the leader apply worker or table sync
     236             :  * worker. Changes are written to temporary files and then applied when the
     237             :  * final commit arrives.
     238             :  *
     239             :  * TRANS_LEADER_SEND_TO_PARALLEL:
     240             :  * This action means that we are in the leader apply worker and need to send
     241             :  * the changes to the parallel apply worker.
     242             :  *
     243             :  * TRANS_LEADER_PARTIAL_SERIALIZE:
     244             :  * This action means that we are in the leader apply worker and have sent some
     245             :  * changes directly to the parallel apply worker and the remaining changes are
     246             :  * serialized to a file, due to timeout while sending data. The parallel apply
     247             :  * worker will apply these serialized changes when the final commit arrives.
     248             :  *
     249             :  * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
     250             :  * serializing changes, the leader worker also needs to serialize the
     251             :  * STREAM_XXX message to a file, and wait for the parallel apply worker to
     252             :  * finish the transaction when processing the transaction finish command. So
     253             :  * this new action was introduced to keep the code and logic clear.
     254             :  *
     255             :  * TRANS_PARALLEL_APPLY:
     256             :  * This action means that we are in the parallel apply worker and changes of
     257             :  * the transaction are applied directly by the worker.
     258             :  */
     259             : typedef enum
     260             : {
     261             :     /* The action for non-streaming transactions. */
     262             :     TRANS_LEADER_APPLY,
     263             : 
     264             :     /* Actions for streaming transactions. */
     265             :     TRANS_LEADER_SERIALIZE,
     266             :     TRANS_LEADER_SEND_TO_PARALLEL,
     267             :     TRANS_LEADER_PARTIAL_SERIALIZE,
     268             :     TRANS_PARALLEL_APPLY,
     269             : } TransApplyAction;
     270             : 
     271             : /* errcontext tracker */
     272             : static ApplyErrorCallbackArg apply_error_callback_arg =
     273             : {
     274             :     .command = 0,
     275             :     .rel = NULL,
     276             :     .remote_attnum = -1,
     277             :     .remote_xid = InvalidTransactionId,
     278             :     .finish_lsn = InvalidXLogRecPtr,
     279             :     .origin_name = NULL,
     280             : };
     281             : 
     282             : ErrorContextCallback *apply_error_context_stack = NULL;
     283             : 
     284             : MemoryContext ApplyMessageContext = NULL;
     285             : MemoryContext ApplyContext = NULL;
     286             : 
     287             : /* per stream context for streaming transactions */
     288             : static MemoryContext LogicalStreamingContext = NULL;
     289             : 
     290             : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
     291             : 
     292             : Subscription *MySubscription = NULL;
     293             : static bool MySubscriptionValid = false;
     294             : 
     295             : static List *on_commit_wakeup_workers_subids = NIL;
     296             : 
     297             : bool        in_remote_transaction = false;
     298             : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
     299             : 
     300             : /* fields valid only when processing streamed transaction */
     301             : static bool in_streamed_transaction = false;
     302             : 
     303             : static TransactionId stream_xid = InvalidTransactionId;
     304             : 
     305             : /*
     306             :  * The number of changes applied by parallel apply worker during one streaming
     307             :  * block.
     308             :  */
     309             : static uint32 parallel_stream_nchanges = 0;
     310             : 
     311             : /* Are we initializing an apply worker? */
     312             : bool        InitializingApplyWorker = false;
     313             : 
     314             : /*
     315             :  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
     316             :  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
     317             :  * Once we start skipping changes, we don't stop it until we skip all changes of
     318             :  * the transaction even if pg_subscription is updated and MySubscription->skiplsn
     319             :  * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
     320             :  * we don't skip receiving and spooling the changes since we decide whether or not
     321             :  * to skip applying the changes when starting to apply changes. The subskiplsn is
     322             :  * cleared after successfully skipping the transaction or applying non-empty
     323             :  * transaction. The latter prevents the mistakenly specified subskiplsn from
     324             :  * being left. Note that we cannot skip the streaming transactions when using
     325             :  * parallel apply workers because we cannot get the finish LSN before applying
     326             :  * the changes. So, we don't start parallel apply worker when finish LSN is set
     327             :  * by the user.
     328             :  */
     329             : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
     330             : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
     331             : 
     332             : /* BufFile handle of the current streaming file */
     333             : static BufFile *stream_fd = NULL;
     334             : 
     335             : typedef struct SubXactInfo
     336             : {
     337             :     TransactionId xid;          /* XID of the subxact */
     338             :     int         fileno;         /* file number in the buffile */
     339             :     off_t       offset;         /* offset in the file */
     340             : } SubXactInfo;
     341             : 
     342             : /* Sub-transaction data for the current streaming transaction */
     343             : typedef struct ApplySubXactData
     344             : {
     345             :     uint32      nsubxacts;      /* number of sub-transactions */
     346             :     uint32      nsubxacts_max;  /* current capacity of subxacts */
     347             :     TransactionId subxact_last; /* xid of the last sub-transaction */
     348             :     SubXactInfo *subxacts;      /* sub-xact offset in changes file */
     349             : } ApplySubXactData;
     350             : 
     351             : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
     352             : 
     353             : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
     354             : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
     355             : 
     356             : /*
     357             :  * Information about subtransactions of a given toplevel transaction.
     358             :  */
     359             : static void subxact_info_write(Oid subid, TransactionId xid);
     360             : static void subxact_info_read(Oid subid, TransactionId xid);
     361             : static void subxact_info_add(TransactionId xid);
     362             : static inline void cleanup_subxact_info(void);
     363             : 
     364             : /*
     365             :  * Serialize and deserialize changes for a toplevel transaction.
     366             :  */
     367             : static void stream_open_file(Oid subid, TransactionId xid,
     368             :                              bool first_segment);
     369             : static void stream_write_change(char action, StringInfo s);
     370             : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
     371             : static void stream_close_file(void);
     372             : 
     373             : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
     374             : 
     375             : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
     376             : static void apply_handle_insert_internal(ApplyExecutionData *edata,
     377             :                                          ResultRelInfo *relinfo,
     378             :                                          TupleTableSlot *remoteslot);
     379             : static void apply_handle_update_internal(ApplyExecutionData *edata,
     380             :                                          ResultRelInfo *relinfo,
     381             :                                          TupleTableSlot *remoteslot,
     382             :                                          LogicalRepTupleData *newtup,
     383             :                                          Oid localindexoid);
     384             : static void apply_handle_delete_internal(ApplyExecutionData *edata,
     385             :                                          ResultRelInfo *relinfo,
     386             :                                          TupleTableSlot *remoteslot,
     387             :                                          Oid localindexoid);
     388             : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
     389             :                                     LogicalRepRelation *remoterel,
     390             :                                     Oid localidxoid,
     391             :                                     TupleTableSlot *remoteslot,
     392             :                                     TupleTableSlot **localslot);
     393             : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
     394             :                                        TupleTableSlot *remoteslot,
     395             :                                        LogicalRepTupleData *newtup,
     396             :                                        CmdType operation);
     397             : 
     398             : /* Functions for skipping changes */
     399             : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
     400             : static void stop_skipping_changes(void);
     401             : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
     402             : 
     403             : /* Functions for apply error callback */
     404             : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
     405             : static inline void reset_apply_error_context_info(void);
     406             : 
     407             : static TransApplyAction get_transaction_apply_action(TransactionId xid,
     408             :                                                      ParallelApplyWorkerInfo **winfo);
     409             : 
     410             : static void replorigin_reset(int code, Datum arg);
     411             : 
     412             : /*
     413             :  * Form the origin name for the subscription.
     414             :  *
     415             :  * This is a common function for tablesync and other workers. Tablesync workers
     416             :  * must pass a valid relid. Other callers must pass relid = InvalidOid.
     417             :  *
     418             :  * Return the name in the supplied buffer.
     419             :  */
     420             : void
     421        2488 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
     422             :                                    char *originname, Size szoriginname)
     423             : {
     424        2488 :     if (OidIsValid(relid))
     425             :     {
     426             :         /* Replication origin name for tablesync workers. */
     427        1484 :         snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
     428             :     }
     429             :     else
     430             :     {
     431             :         /* Replication origin name for non-tablesync workers. */
     432        1004 :         snprintf(originname, szoriginname, "pg_%u", suboid);
     433             :     }
     434        2488 : }
     435             : 
     436             : /*
     437             :  * Should this worker apply changes for the given relation?
     438             :  *
     439             :  * This is mainly needed for initial relation data sync as that runs in
     440             :  * separate worker process running in parallel and we need some way to skip
     441             :  * changes coming to the leader apply worker during the sync of a table.
     442             :  *
     443             :  * Note we need to do a less-than-or-equal comparison for SYNCDONE state because
     444             :  * it might hold position of end of initial slot consistent point WAL
     445             :  * record + 1 (ie start of next record) and next record can be COMMIT of
     446             :  * transaction we are now processing (which is what we set remote_final_lsn
     447             :  * to in apply_handle_begin).
     448             :  *
     449             :  * Note that for streaming transactions that are being applied in the parallel
     450             :  * apply worker, we disallow applying changes if the target table in the
     451             :  * subscription is not in the READY state, because we cannot decide whether to
     452             :  * apply the change as we won't know remote_final_lsn by that time.
     453             :  *
     454             :  * We already checked this in pa_can_start() before assigning the
     455             :  * streaming transaction to the parallel worker, but it also needs to be
     456             :  * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
     457             :  * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
     458             :  * while applying this transaction.
     459             :  */
     460             : static bool
     461      296266 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
     462             : {
     463      296266 :     switch (MyLogicalRepWorker->type)
     464             :     {
     465          20 :         case WORKERTYPE_TABLESYNC:
     466          20 :             return MyLogicalRepWorker->relid == rel->localreloid;
     467             : 
     468      136786 :         case WORKERTYPE_PARALLEL_APPLY:
     469             :             /* We don't synchronize rel's that are in unknown state. */
     470      136786 :             if (rel->state != SUBREL_STATE_READY &&
     471           0 :                 rel->state != SUBREL_STATE_UNKNOWN)
     472           0 :                 ereport(ERROR,
     473             :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     474             :                          errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
     475             :                                 MySubscription->name),
     476             :                          errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
     477             : 
     478      136786 :             return rel->state == SUBREL_STATE_READY;
     479             : 
     480      159460 :         case WORKERTYPE_APPLY:
     481      159584 :             return (rel->state == SUBREL_STATE_READY ||
     482         124 :                     (rel->state == SUBREL_STATE_SYNCDONE &&
     483           6 :                      rel->statelsn <= remote_final_lsn));
     484             : 
     485           0 :         case WORKERTYPE_UNKNOWN:
     486             :             /* Should never happen. */
     487           0 :             elog(ERROR, "Unknown worker type");
     488             :     }
     489             : 
     490           0 :     return false;               /* dummy for compiler */
     491             : }
     492             : 
     493             : /*
     494             :  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
     495             :  *
     496             :  * Start a transaction, if this is the first step (else we keep using the
     497             :  * existing transaction).
     498             :  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
     499             :  */
     500             : static void
     501      297176 : begin_replication_step(void)
     502             : {
     503      297176 :     SetCurrentStatementStartTimestamp();
     504             : 
     505      297176 :     if (!IsTransactionState())
     506             :     {
     507        1856 :         StartTransactionCommand();
     508        1856 :         maybe_reread_subscription();
     509             :     }
     510             : 
     511      297170 :     PushActiveSnapshot(GetTransactionSnapshot());
     512             : 
     513      297170 :     MemoryContextSwitchTo(ApplyMessageContext);
     514      297170 : }
     515             : 
     516             : /*
     517             :  * Finish up one step of a replication transaction.
     518             :  * Callers of begin_replication_step() must also call this.
     519             :  *
     520             :  * We don't close out the transaction here, but we should increment
     521             :  * the command counter to make the effects of this step visible.
     522             :  */
     523             : static void
     524      297112 : end_replication_step(void)
     525             : {
     526      297112 :     PopActiveSnapshot();
     527             : 
     528      297112 :     CommandCounterIncrement();
     529      297112 : }
     530             : 
     531             : /*
     532             :  * Handle streamed transactions for both the leader apply worker and the
     533             :  * parallel apply workers.
     534             :  *
     535             :  * In the streaming case (receiving a block of the streamed transaction), for
     536             :  * serialize mode, simply redirect it to a file for the proper toplevel
     537             :  * transaction, and for parallel mode, the leader apply worker will send the
     538             :  * changes to parallel apply workers and the parallel apply worker will define
     539             :  * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
     540             :  * messages will be applied by both leader apply worker and parallel apply
     541             :  * workers).
     542             :  *
     543             :  * Returns true for streamed transactions (when the change is either serialized
     544             :  * to file or sent to parallel apply worker), false otherwise (regular mode or
     545             :  * needs to be processed by parallel apply worker).
     546             :  *
     547             :  * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
     548             :  * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
     549             :  * to a parallel apply worker.
     550             :  */
     551             : static bool
     552      648856 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
     553             : {
     554             :     TransactionId current_xid;
     555             :     ParallelApplyWorkerInfo *winfo;
     556             :     TransApplyAction apply_action;
     557             :     StringInfoData original_msg;
     558             : 
     559      648856 :     apply_action = get_transaction_apply_action(stream_xid, &winfo);
     560             : 
     561             :     /* not in streaming mode */
     562      648856 :     if (apply_action == TRANS_LEADER_APPLY)
     563      160204 :         return false;
     564             : 
     565             :     Assert(TransactionIdIsValid(stream_xid));
     566             : 
     567             :     /*
     568             :      * The parallel apply worker needs the xid in this message to decide
     569             :      * whether to define a savepoint, so save the original message that has
     570             :      * not moved the cursor after the xid. We will serialize this message to a
     571             :      * file in PARTIAL_SERIALIZE mode.
     572             :      */
     573      488652 :     original_msg = *s;
     574             : 
     575             :     /*
     576             :      * We should have received XID of the subxact as the first part of the
     577             :      * message, so extract it.
     578             :      */
     579      488652 :     current_xid = pq_getmsgint(s, 4);
     580             : 
     581      488652 :     if (!TransactionIdIsValid(current_xid))
     582           0 :         ereport(ERROR,
     583             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     584             :                  errmsg_internal("invalid transaction ID in streamed replication transaction")));
     585             : 
     586      488652 :     switch (apply_action)
     587             :     {
     588      205026 :         case TRANS_LEADER_SERIALIZE:
     589             :             Assert(stream_fd);
     590             : 
     591             :             /* Add the new subxact to the array (unless already there). */
     592      205026 :             subxact_info_add(current_xid);
     593             : 
     594             :             /* Write the change to the current file */
     595      205026 :             stream_write_change(action, s);
     596      205026 :             return true;
     597             : 
     598      136772 :         case TRANS_LEADER_SEND_TO_PARALLEL:
     599             :             Assert(winfo);
     600             : 
     601             :             /*
     602             :              * XXX The publisher side doesn't always send relation/type update
     603             :              * messages after the streaming transaction, so also update the
     604             :              * relation/type in leader apply worker. See function
     605             :              * cleanup_rel_sync_cache.
     606             :              */
     607      136772 :             if (pa_send_data(winfo, s->len, s->data))
     608      136772 :                 return (action != LOGICAL_REP_MSG_RELATION &&
     609             :                         action != LOGICAL_REP_MSG_TYPE);
     610             : 
     611             :             /*
     612             :              * Switch to serialize mode when we are not able to send the
     613             :              * change to parallel apply worker.
     614             :              */
     615           0 :             pa_switch_to_partial_serialize(winfo, false);
     616             : 
     617             :             /* fall through */
     618       10012 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
     619       10012 :             stream_write_change(action, &original_msg);
     620             : 
     621             :             /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
     622       10012 :             return (action != LOGICAL_REP_MSG_RELATION &&
     623             :                     action != LOGICAL_REP_MSG_TYPE);
     624             : 
     625      136842 :         case TRANS_PARALLEL_APPLY:
     626      136842 :             parallel_stream_nchanges += 1;
     627             : 
     628             :             /* Define a savepoint for a subxact if needed. */
     629      136842 :             pa_start_subtrans(current_xid, stream_xid);
     630      136842 :             return false;
     631             : 
     632           0 :         default:
     633           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
     634             :             return false;       /* silence compiler warning */
     635             :     }
     636             : }
     637             : 
     638             : /*
     639             :  * Executor state preparation for evaluation of constraint expressions,
     640             :  * indexes and triggers for the specified relation.
     641             :  *
     642             :  * Note that the caller must open and close any indexes to be updated.
     643             :  */
     644             : static ApplyExecutionData *
     645      296090 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
     646             : {
     647             :     ApplyExecutionData *edata;
     648             :     EState     *estate;
     649             :     RangeTblEntry *rte;
     650      296090 :     List       *perminfos = NIL;
     651             :     ResultRelInfo *resultRelInfo;
     652             : 
     653      296090 :     edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
     654      296090 :     edata->targetRel = rel;
     655             : 
     656      296090 :     edata->estate = estate = CreateExecutorState();
     657             : 
     658      296090 :     rte = makeNode(RangeTblEntry);
     659      296090 :     rte->rtekind = RTE_RELATION;
     660      296090 :     rte->relid = RelationGetRelid(rel->localrel);
     661      296090 :     rte->relkind = rel->localrel->rd_rel->relkind;
     662      296090 :     rte->rellockmode = AccessShareLock;
     663             : 
     664      296090 :     addRTEPermissionInfo(&perminfos, rte);
     665             : 
     666      296090 :     ExecInitRangeTable(estate, list_make1(rte), perminfos,
     667             :                        bms_make_singleton(1));
     668             : 
     669      296090 :     edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
     670             : 
     671             :     /*
     672             :      * Use Relation opened by logicalrep_rel_open() instead of opening it
     673             :      * again.
     674             :      */
     675      296090 :     InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
     676             : 
     677             :     /*
     678             :      * We put the ResultRelInfo in the es_opened_result_relations list, even
     679             :      * though we don't populate the es_result_relations array.  That's a bit
     680             :      * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
     681             :      *
     682             :      * ExecOpenIndices() is not called here either, each execution path doing
     683             :      * an apply operation being responsible for that.
     684             :      */
     685      296090 :     estate->es_opened_result_relations =
     686      296090 :         lappend(estate->es_opened_result_relations, resultRelInfo);
     687             : 
     688      296090 :     estate->es_output_cid = GetCurrentCommandId(true);
     689             : 
     690             :     /* Prepare to catch AFTER triggers. */
     691      296090 :     AfterTriggerBeginQuery();
     692             : 
     693             :     /* other fields of edata remain NULL for now */
     694             : 
     695      296090 :     return edata;
     696             : }
     697             : 
     698             : /*
     699             :  * Finish any operations related to the executor state created by
     700             :  * create_edata_for_relation().
     701             :  */
     702             : static void
     703      296048 : finish_edata(ApplyExecutionData *edata)
     704             : {
     705      296048 :     EState     *estate = edata->estate;
     706             : 
     707             :     /* Handle any queued AFTER triggers. */
     708      296048 :     AfterTriggerEndQuery(estate);
     709             : 
     710             :     /* Shut down tuple routing, if any was done. */
     711      296048 :     if (edata->proute)
     712         148 :         ExecCleanupTupleRouting(edata->mtstate, edata->proute);
     713             : 
     714             :     /*
     715             :      * Cleanup.  It might seem that we should call ExecCloseResultRelations()
     716             :      * here, but we intentionally don't.  It would close the rel we added to
     717             :      * es_opened_result_relations above, which is wrong because we took no
     718             :      * corresponding refcount.  We rely on ExecCleanupTupleRouting() to close
     719             :      * any other relations opened during execution.
     720             :      */
     721      296048 :     ExecResetTupleTable(estate->es_tupleTable, false);
     722      296048 :     FreeExecutorState(estate);
     723      296048 :     pfree(edata);
     724      296048 : }
     725             : 
     726             : /*
     727             :  * Executes default values for columns for which we can't map to remote
     728             :  * relation columns.
     729             :  *
     730             :  * This allows us to support tables which have more columns on the downstream
     731             :  * than on the upstream.
     732             :  */
     733             : static void
     734      151586 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
     735             :                    TupleTableSlot *slot)
     736             : {
     737      151586 :     TupleDesc   desc = RelationGetDescr(rel->localrel);
     738      151586 :     int         num_phys_attrs = desc->natts;
     739             :     int         i;
     740             :     int         attnum,
     741      151586 :                 num_defaults = 0;
     742             :     int        *defmap;
     743             :     ExprState **defexprs;
     744             :     ExprContext *econtext;
     745             : 
     746      151586 :     econtext = GetPerTupleExprContext(estate);
     747             : 
     748             :     /* We got all the data via replication, no need to evaluate anything. */
     749      151586 :     if (num_phys_attrs == rel->remoterel.natts)
     750       71296 :         return;
     751             : 
     752       80290 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
     753       80290 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
     754             : 
     755             :     Assert(rel->attrmap->maplen == num_phys_attrs);
     756      421326 :     for (attnum = 0; attnum < num_phys_attrs; attnum++)
     757             :     {
     758             :         Expr       *defexpr;
     759             : 
     760      341036 :         if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
     761          18 :             continue;
     762             : 
     763      341018 :         if (rel->attrmap->attnums[attnum] >= 0)
     764      184536 :             continue;
     765             : 
     766      156482 :         defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
     767             : 
     768      156482 :         if (defexpr != NULL)
     769             :         {
     770             :             /* Run the expression through planner */
     771      140262 :             defexpr = expression_planner(defexpr);
     772             : 
     773             :             /* Initialize executable expression in copycontext */
     774      140262 :             defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
     775      140262 :             defmap[num_defaults] = attnum;
     776      140262 :             num_defaults++;
     777             :         }
     778             :     }
     779             : 
     780      220552 :     for (i = 0; i < num_defaults; i++)
     781      140262 :         slot->tts_values[defmap[i]] =
     782      140262 :             ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
     783             : }
     784             : 
     785             : /*
     786             :  * Store tuple data into slot.
     787             :  *
     788             :  * Incoming data can be either text or binary format.
     789             :  */
     790             : static void
     791      296108 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
     792             :                 LogicalRepTupleData *tupleData)
     793             : {
     794      296108 :     int         natts = slot->tts_tupleDescriptor->natts;
     795             :     int         i;
     796             : 
     797      296108 :     ExecClearTuple(slot);
     798             : 
     799             :     /* Call the "in" function for each non-dropped, non-null attribute */
     800             :     Assert(natts == rel->attrmap->maplen);
     801     1315154 :     for (i = 0; i < natts; i++)
     802             :     {
     803     1019046 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     804     1019046 :         int         remoteattnum = rel->attrmap->attnums[i];
     805             : 
     806     1019046 :         if (!att->attisdropped && remoteattnum >= 0)
     807      605212 :         {
     808      605212 :             StringInfo  colvalue = &tupleData->colvalues[remoteattnum];
     809             : 
     810             :             Assert(remoteattnum < tupleData->ncols);
     811             : 
     812             :             /* Set attnum for error callback */
     813      605212 :             apply_error_callback_arg.remote_attnum = remoteattnum;
     814             : 
     815      605212 :             if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
     816             :             {
     817             :                 Oid         typinput;
     818             :                 Oid         typioparam;
     819             : 
     820      284542 :                 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     821      569084 :                 slot->tts_values[i] =
     822      284542 :                     OidInputFunctionCall(typinput, colvalue->data,
     823             :                                          typioparam, att->atttypmod);
     824      284542 :                 slot->tts_isnull[i] = false;
     825             :             }
     826      320670 :             else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
     827             :             {
     828             :                 Oid         typreceive;
     829             :                 Oid         typioparam;
     830             : 
     831             :                 /*
     832             :                  * In some code paths we may be asked to re-parse the same
     833             :                  * tuple data.  Reset the StringInfo's cursor so that works.
     834             :                  */
     835      220016 :                 colvalue->cursor = 0;
     836             : 
     837      220016 :                 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
     838      440032 :                 slot->tts_values[i] =
     839      220016 :                     OidReceiveFunctionCall(typreceive, colvalue,
     840             :                                            typioparam, att->atttypmod);
     841             : 
     842             :                 /* Trouble if it didn't eat the whole buffer */
     843      220016 :                 if (colvalue->cursor != colvalue->len)
     844           0 :                     ereport(ERROR,
     845             :                             (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
     846             :                              errmsg("incorrect binary data format in logical replication column %d",
     847             :                                     remoteattnum + 1)));
     848      220016 :                 slot->tts_isnull[i] = false;
     849             :             }
     850             :             else
     851             :             {
     852             :                 /*
     853             :                  * NULL value from remote.  (We don't expect to see
     854             :                  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
     855             :                  * NULL.)
     856             :                  */
     857      100654 :                 slot->tts_values[i] = (Datum) 0;
     858      100654 :                 slot->tts_isnull[i] = true;
     859             :             }
     860             : 
     861             :             /* Reset attnum for error callback */
     862      605212 :             apply_error_callback_arg.remote_attnum = -1;
     863             :         }
     864             :         else
     865             :         {
     866             :             /*
     867             :              * We assign NULL to dropped attributes and missing values
     868             :              * (missing values should be later filled using
     869             :              * slot_fill_defaults).
     870             :              */
     871      413834 :             slot->tts_values[i] = (Datum) 0;
     872      413834 :             slot->tts_isnull[i] = true;
     873             :         }
     874             :     }
     875             : 
     876      296108 :     ExecStoreVirtualTuple(slot);
     877      296108 : }
     878             : 
     879             : /*
     880             :  * Replace updated columns with data from the LogicalRepTupleData struct.
     881             :  * This is somewhat similar to heap_modify_tuple but also calls the type
     882             :  * input functions on the user data.
     883             :  *
     884             :  * "slot" is filled with a copy of the tuple in "srcslot", replacing
     885             :  * columns provided in "tupleData" and leaving others as-is.
     886             :  *
     887             :  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
     888             :  * storage for "srcslot".  This is OK for current usage, but someday we may
     889             :  * need to materialize "slot" at the end to make it independent of "srcslot".
     890             :  */
     891             : static void
     892       63848 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
     893             :                  LogicalRepRelMapEntry *rel,
     894             :                  LogicalRepTupleData *tupleData)
     895             : {
     896       63848 :     int         natts = slot->tts_tupleDescriptor->natts;
     897             :     int         i;
     898             : 
     899             :     /* We'll fill "slot" with a virtual tuple, so we must start with ... */
     900       63848 :     ExecClearTuple(slot);
     901             : 
     902             :     /*
     903             :      * Copy all the column data from srcslot, so that we'll have valid values
     904             :      * for unreplaced columns.
     905             :      */
     906             :     Assert(natts == srcslot->tts_tupleDescriptor->natts);
     907       63848 :     slot_getallattrs(srcslot);
     908       63848 :     memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
     909       63848 :     memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
     910             : 
     911             :     /* Call the "in" function for each replaced attribute */
     912             :     Assert(natts == rel->attrmap->maplen);
     913      318560 :     for (i = 0; i < natts; i++)
     914             :     {
     915      254712 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     916      254712 :         int         remoteattnum = rel->attrmap->attnums[i];
     917             : 
     918      254712 :         if (remoteattnum < 0)
     919      117038 :             continue;
     920             : 
     921             :         Assert(remoteattnum < tupleData->ncols);
     922             : 
     923      137674 :         if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
     924             :         {
     925      137668 :             StringInfo  colvalue = &tupleData->colvalues[remoteattnum];
     926             : 
     927             :             /* Set attnum for error callback */
     928      137668 :             apply_error_callback_arg.remote_attnum = remoteattnum;
     929             : 
     930      137668 :             if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
     931             :             {
     932             :                 Oid         typinput;
     933             :                 Oid         typioparam;
     934             : 
     935       50860 :                 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     936      101720 :                 slot->tts_values[i] =
     937       50860 :                     OidInputFunctionCall(typinput, colvalue->data,
     938             :                                          typioparam, att->atttypmod);
     939       50860 :                 slot->tts_isnull[i] = false;
     940             :             }
     941       86808 :             else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
     942             :             {
     943             :                 Oid         typreceive;
     944             :                 Oid         typioparam;
     945             : 
     946             :                 /*
     947             :                  * In some code paths we may be asked to re-parse the same
     948             :                  * tuple data.  Reset the StringInfo's cursor so that works.
     949             :                  */
     950       86712 :                 colvalue->cursor = 0;
     951             : 
     952       86712 :                 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
     953      173424 :                 slot->tts_values[i] =
     954       86712 :                     OidReceiveFunctionCall(typreceive, colvalue,
     955             :                                            typioparam, att->atttypmod);
     956             : 
     957             :                 /* Trouble if it didn't eat the whole buffer */
     958       86712 :                 if (colvalue->cursor != colvalue->len)
     959           0 :                     ereport(ERROR,
     960             :                             (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
     961             :                              errmsg("incorrect binary data format in logical replication column %d",
     962             :                                     remoteattnum + 1)));
     963       86712 :                 slot->tts_isnull[i] = false;
     964             :             }
     965             :             else
     966             :             {
     967             :                 /* must be LOGICALREP_COLUMN_NULL */
     968          96 :                 slot->tts_values[i] = (Datum) 0;
     969          96 :                 slot->tts_isnull[i] = true;
     970             :             }
     971             : 
     972             :             /* Reset attnum for error callback */
     973      137668 :             apply_error_callback_arg.remote_attnum = -1;
     974             :         }
     975             :     }
     976             : 
     977             :     /* And finally, declare that "slot" contains a valid virtual tuple */
     978       63848 :     ExecStoreVirtualTuple(slot);
     979       63848 : }
     980             : 
     981             : /*
     982             :  * Handle BEGIN message.
     983             :  */
     984             : static void
     985         908 : apply_handle_begin(StringInfo s)
     986             : {
     987             :     LogicalRepBeginData begin_data;
     988             : 
     989             :     /* There must not be an active streaming transaction. */
     990             :     Assert(!TransactionIdIsValid(stream_xid));
     991             : 
     992         908 :     logicalrep_read_begin(s, &begin_data);
     993         908 :     set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
     994             : 
     995         908 :     remote_final_lsn = begin_data.final_lsn;
     996             : 
     997         908 :     maybe_start_skipping_changes(begin_data.final_lsn);
     998             : 
     999         908 :     in_remote_transaction = true;
    1000             : 
    1001         908 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1002         908 : }
    1003             : 
    1004             : /*
    1005             :  * Handle COMMIT message.
    1006             :  *
    1007             :  * TODO, support tracking of multiple origins
    1008             :  */
    1009             : static void
    1010         848 : apply_handle_commit(StringInfo s)
    1011             : {
    1012             :     LogicalRepCommitData commit_data;
    1013             : 
    1014         848 :     logicalrep_read_commit(s, &commit_data);
    1015             : 
    1016         848 :     if (commit_data.commit_lsn != remote_final_lsn)
    1017           0 :         ereport(ERROR,
    1018             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1019             :                  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
    1020             :                                  LSN_FORMAT_ARGS(commit_data.commit_lsn),
    1021             :                                  LSN_FORMAT_ARGS(remote_final_lsn))));
    1022             : 
    1023         848 :     apply_handle_commit_internal(&commit_data);
    1024             : 
    1025             :     /* Process any tables that are being synchronized in parallel. */
    1026         848 :     process_syncing_tables(commit_data.end_lsn);
    1027             : 
    1028         848 :     pgstat_report_activity(STATE_IDLE, NULL);
    1029         848 :     reset_apply_error_context_info();
    1030         848 : }
    1031             : 
    1032             : /*
    1033             :  * Handle BEGIN PREPARE message.
    1034             :  */
    1035             : static void
    1036          32 : apply_handle_begin_prepare(StringInfo s)
    1037             : {
    1038             :     LogicalRepPreparedTxnData begin_data;
    1039             : 
    1040             :     /* Tablesync should never receive prepare. */
    1041          32 :     if (am_tablesync_worker())
    1042           0 :         ereport(ERROR,
    1043             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1044             :                  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
    1045             : 
    1046             :     /* There must not be an active streaming transaction. */
    1047             :     Assert(!TransactionIdIsValid(stream_xid));
    1048             : 
    1049          32 :     logicalrep_read_begin_prepare(s, &begin_data);
    1050          32 :     set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
    1051             : 
    1052          32 :     remote_final_lsn = begin_data.prepare_lsn;
    1053             : 
    1054          32 :     maybe_start_skipping_changes(begin_data.prepare_lsn);
    1055             : 
    1056          32 :     in_remote_transaction = true;
    1057             : 
    1058          32 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1059          32 : }
    1060             : 
    1061             : /*
    1062             :  * Common function to prepare the GID.
    1063             :  */
    1064             : static void
    1065          46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
    1066             : {
    1067             :     char        gid[GIDSIZE];
    1068             : 
    1069             :     /*
    1070             :      * Compute unique GID for two_phase transactions. We don't use GID of
    1071             :      * prepared transaction sent by server as that can lead to deadlock when
    1072             :      * we have multiple subscriptions from same node point to publications on
    1073             :      * the same node. See comments atop worker.c
    1074             :      */
    1075          46 :     TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
    1076             :                            gid, sizeof(gid));
    1077             : 
    1078             :     /*
    1079             :      * BeginTransactionBlock is necessary to balance the EndTransactionBlock
    1080             :      * called within the PrepareTransactionBlock below.
    1081             :      */
    1082          46 :     if (!IsTransactionBlock())
    1083             :     {
    1084          46 :         BeginTransactionBlock();
    1085          46 :         CommitTransactionCommand(); /* Completes the preceding Begin command. */
    1086             :     }
    1087             : 
    1088             :     /*
    1089             :      * Update origin state so we can restart streaming from correct position
    1090             :      * in case of crash.
    1091             :      */
    1092          46 :     replorigin_session_origin_lsn = prepare_data->end_lsn;
    1093          46 :     replorigin_session_origin_timestamp = prepare_data->prepare_time;
    1094             : 
    1095          46 :     PrepareTransactionBlock(gid);
    1096          46 : }
    1097             : 
    1098             : /*
    1099             :  * Handle PREPARE message.
    1100             :  */
    1101             : static void
    1102          30 : apply_handle_prepare(StringInfo s)
    1103             : {
    1104             :     LogicalRepPreparedTxnData prepare_data;
    1105             : 
    1106          30 :     logicalrep_read_prepare(s, &prepare_data);
    1107             : 
    1108          30 :     if (prepare_data.prepare_lsn != remote_final_lsn)
    1109           0 :         ereport(ERROR,
    1110             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1111             :                  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
    1112             :                                  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
    1113             :                                  LSN_FORMAT_ARGS(remote_final_lsn))));
    1114             : 
    1115             :     /*
    1116             :      * Unlike commit, here, we always prepare the transaction even though no
    1117             :      * change has happened in this transaction or all changes are skipped. It
    1118             :      * is done this way because at commit prepared time, we won't know whether
    1119             :      * we have skipped preparing a transaction because of those reasons.
    1120             :      *
    1121             :      * XXX, We can optimize such that at commit prepared time, we first check
    1122             :      * whether we have prepared the transaction or not but that doesn't seem
    1123             :      * worthwhile because such cases shouldn't be common.
    1124             :      */
    1125          30 :     begin_replication_step();
    1126             : 
    1127          30 :     apply_handle_prepare_internal(&prepare_data);
    1128             : 
    1129          30 :     end_replication_step();
    1130          30 :     CommitTransactionCommand();
    1131          28 :     pgstat_report_stat(false);
    1132             : 
    1133             :     /*
    1134             :      * It is okay not to set the local_end LSN for the prepare because we
    1135             :      * always flush the prepare record. So, we can send the acknowledgment of
    1136             :      * the remote_end LSN as soon as prepare is finished.
    1137             :      *
    1138             :      * XXX For the sake of consistency with commit, we could have set it with
    1139             :      * the LSN of prepare but as of now we don't track that value similar to
    1140             :      * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
    1141             :      * it.
    1142             :      */
    1143          28 :     store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
    1144             : 
    1145          28 :     in_remote_transaction = false;
    1146             : 
    1147             :     /* Process any tables that are being synchronized in parallel. */
    1148          28 :     process_syncing_tables(prepare_data.end_lsn);
    1149             : 
    1150             :     /*
    1151             :      * Since we have already prepared the transaction, in a case where the
    1152             :      * server crashes before clearing the subskiplsn, it will be left but the
    1153             :      * transaction won't be resent. But that's okay because it's a rare case
    1154             :      * and the subskiplsn will be cleared when finishing the next transaction.
    1155             :      */
    1156          28 :     stop_skipping_changes();
    1157          28 :     clear_subscription_skip_lsn(prepare_data.prepare_lsn);
    1158             : 
    1159          28 :     pgstat_report_activity(STATE_IDLE, NULL);
    1160          28 :     reset_apply_error_context_info();
    1161          28 : }
    1162             : 
    1163             : /*
    1164             :  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
    1165             :  *
    1166             :  * Note that we don't need to wait here if the transaction was prepared in a
    1167             :  * parallel apply worker. In that case, we have already waited for the prepare
    1168             :  * to finish in apply_handle_stream_prepare() which will ensure all the
    1169             :  * operations in that transaction have happened in the subscriber, so no
    1170             :  * concurrent transaction can cause deadlock or transaction dependency issues.
    1171             :  */
    1172             : static void
    1173          40 : apply_handle_commit_prepared(StringInfo s)
    1174             : {
    1175             :     LogicalRepCommitPreparedTxnData prepare_data;
    1176             :     char        gid[GIDSIZE];
    1177             : 
    1178          40 :     logicalrep_read_commit_prepared(s, &prepare_data);
    1179          40 :     set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
    1180             : 
    1181             :     /* Compute GID for two_phase transactions. */
    1182          40 :     TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
    1183             :                            gid, sizeof(gid));
    1184             : 
    1185             :     /* There is no transaction when COMMIT PREPARED is called */
    1186          40 :     begin_replication_step();
    1187             : 
    1188             :     /*
    1189             :      * Update origin state so we can restart streaming from correct position
    1190             :      * in case of crash.
    1191             :      */
    1192          40 :     replorigin_session_origin_lsn = prepare_data.end_lsn;
    1193          40 :     replorigin_session_origin_timestamp = prepare_data.commit_time;
    1194             : 
    1195          40 :     FinishPreparedTransaction(gid, true);
    1196          40 :     end_replication_step();
    1197          40 :     CommitTransactionCommand();
    1198          40 :     pgstat_report_stat(false);
    1199             : 
    1200          40 :     store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
    1201          40 :     in_remote_transaction = false;
    1202             : 
    1203             :     /* Process any tables that are being synchronized in parallel. */
    1204          40 :     process_syncing_tables(prepare_data.end_lsn);
    1205             : 
    1206          40 :     clear_subscription_skip_lsn(prepare_data.end_lsn);
    1207             : 
    1208          40 :     pgstat_report_activity(STATE_IDLE, NULL);
    1209          40 :     reset_apply_error_context_info();
    1210          40 : }
    1211             : 
    1212             : /*
    1213             :  * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
    1214             :  *
    1215             :  * Note that we don't need to wait here if the transaction was prepared in a
    1216             :  * parallel apply worker. In that case, we have already waited for the prepare
    1217             :  * to finish in apply_handle_stream_prepare() which will ensure all the
    1218             :  * operations in that transaction have happened in the subscriber, so no
    1219             :  * concurrent transaction can cause deadlock or transaction dependency issues.
    1220             :  */
    1221             : static void
    1222          10 : apply_handle_rollback_prepared(StringInfo s)
    1223             : {
    1224             :     LogicalRepRollbackPreparedTxnData rollback_data;
    1225             :     char        gid[GIDSIZE];
    1226             : 
    1227          10 :     logicalrep_read_rollback_prepared(s, &rollback_data);
    1228          10 :     set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
    1229             : 
    1230             :     /* Compute GID for two_phase transactions. */
    1231          10 :     TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
    1232             :                            gid, sizeof(gid));
    1233             : 
    1234             :     /*
    1235             :      * It is possible that we haven't received prepare because it occurred
    1236             :      * before walsender reached a consistent point or the two_phase was still
    1237             :      * not enabled by that time, so in such cases, we need to skip rollback
    1238             :      * prepared.
    1239             :      */
    1240          10 :     if (LookupGXact(gid, rollback_data.prepare_end_lsn,
    1241             :                     rollback_data.prepare_time))
    1242             :     {
    1243             :         /*
    1244             :          * Update origin state so we can restart streaming from correct
    1245             :          * position in case of crash.
    1246             :          */
    1247          10 :         replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
    1248          10 :         replorigin_session_origin_timestamp = rollback_data.rollback_time;
    1249             : 
    1250             :         /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
    1251          10 :         begin_replication_step();
    1252          10 :         FinishPreparedTransaction(gid, false);
    1253          10 :         end_replication_step();
    1254          10 :         CommitTransactionCommand();
    1255             : 
    1256          10 :         clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
    1257             :     }
    1258             : 
    1259          10 :     pgstat_report_stat(false);
    1260             : 
    1261             :     /*
    1262             :      * It is okay not to set the local_end LSN for the rollback of prepared
    1263             :      * transaction because we always flush the WAL record for it. See
    1264             :      * apply_handle_prepare.
    1265             :      */
    1266          10 :     store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
    1267          10 :     in_remote_transaction = false;
    1268             : 
    1269             :     /* Process any tables that are being synchronized in parallel. */
    1270          10 :     process_syncing_tables(rollback_data.rollback_end_lsn);
    1271             : 
    1272          10 :     pgstat_report_activity(STATE_IDLE, NULL);
    1273          10 :     reset_apply_error_context_info();
    1274          10 : }
    1275             : 
    1276             : /*
    1277             :  * Handle STREAM PREPARE.
    1278             :  */
    1279             : static void
    1280          22 : apply_handle_stream_prepare(StringInfo s)
    1281             : {
    1282             :     LogicalRepPreparedTxnData prepare_data;
    1283             :     ParallelApplyWorkerInfo *winfo;
    1284             :     TransApplyAction apply_action;
    1285             : 
    1286             :     /* Save the message before it is consumed. */
    1287          22 :     StringInfoData original_msg = *s;
    1288             : 
    1289          22 :     if (in_streamed_transaction)
    1290           0 :         ereport(ERROR,
    1291             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1292             :                  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
    1293             : 
    1294             :     /* Tablesync should never receive prepare. */
    1295          22 :     if (am_tablesync_worker())
    1296           0 :         ereport(ERROR,
    1297             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1298             :                  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
    1299             : 
    1300          22 :     logicalrep_read_stream_prepare(s, &prepare_data);
    1301          22 :     set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
    1302             : 
    1303          22 :     apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
    1304             : 
    1305          22 :     switch (apply_action)
    1306             :     {
    1307          10 :         case TRANS_LEADER_APPLY:
    1308             : 
    1309             :             /*
    1310             :              * The transaction has been serialized to file, so replay all the
    1311             :              * spooled operations.
    1312             :              */
    1313          10 :             apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
    1314             :                                    prepare_data.xid, prepare_data.prepare_lsn);
    1315             : 
    1316             :             /* Mark the transaction as prepared. */
    1317          10 :             apply_handle_prepare_internal(&prepare_data);
    1318             : 
    1319          10 :             CommitTransactionCommand();
    1320             : 
    1321             :             /*
    1322             :              * It is okay not to set the local_end LSN for the prepare because
    1323             :              * we always flush the prepare record. See apply_handle_prepare.
    1324             :              */
    1325          10 :             store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
    1326             : 
    1327          10 :             in_remote_transaction = false;
    1328             : 
    1329             :             /* Unlink the files with serialized changes and subxact info. */
    1330          10 :             stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
    1331             : 
    1332          10 :             elog(DEBUG1, "finished processing the STREAM PREPARE command");
    1333          10 :             break;
    1334             : 
    1335           4 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1336             :             Assert(winfo);
    1337             : 
    1338           4 :             if (pa_send_data(winfo, s->len, s->data))
    1339             :             {
    1340             :                 /* Finish processing the streaming transaction. */
    1341           4 :                 pa_xact_finish(winfo, prepare_data.end_lsn);
    1342           4 :                 break;
    1343             :             }
    1344             : 
    1345             :             /*
    1346             :              * Switch to serialize mode when we are not able to send the
    1347             :              * change to parallel apply worker.
    1348             :              */
    1349           0 :             pa_switch_to_partial_serialize(winfo, true);
    1350             : 
    1351             :             /* fall through */
    1352           2 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1353             :             Assert(winfo);
    1354             : 
    1355           2 :             stream_open_and_write_change(prepare_data.xid,
    1356             :                                          LOGICAL_REP_MSG_STREAM_PREPARE,
    1357             :                                          &original_msg);
    1358             : 
    1359           2 :             pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
    1360             : 
    1361             :             /* Finish processing the streaming transaction. */
    1362           2 :             pa_xact_finish(winfo, prepare_data.end_lsn);
    1363           2 :             break;
    1364             : 
    1365           6 :         case TRANS_PARALLEL_APPLY:
    1366             : 
    1367             :             /*
    1368             :              * If the parallel apply worker is applying spooled messages then
    1369             :              * close the file before preparing.
    1370             :              */
    1371           6 :             if (stream_fd)
    1372           2 :                 stream_close_file();
    1373             : 
    1374           6 :             begin_replication_step();
    1375             : 
    1376             :             /* Mark the transaction as prepared. */
    1377           6 :             apply_handle_prepare_internal(&prepare_data);
    1378             : 
    1379           6 :             end_replication_step();
    1380             : 
    1381           6 :             CommitTransactionCommand();
    1382             : 
    1383             :             /*
    1384             :              * It is okay not to set the local_end LSN for the prepare because
    1385             :              * we always flush the prepare record. See apply_handle_prepare.
    1386             :              */
    1387           6 :             MyParallelShared->last_commit_end = InvalidXLogRecPtr;
    1388             : 
    1389           6 :             pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    1390           6 :             pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
    1391             : 
    1392           6 :             pa_reset_subtrans();
    1393             : 
    1394           6 :             elog(DEBUG1, "finished processing the STREAM PREPARE command");
    1395           6 :             break;
    1396             : 
    1397           0 :         default:
    1398           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1399             :             break;
    1400             :     }
    1401             : 
    1402          22 :     pgstat_report_stat(false);
    1403             : 
    1404             :     /* Process any tables that are being synchronized in parallel. */
    1405          22 :     process_syncing_tables(prepare_data.end_lsn);
    1406             : 
    1407             :     /*
    1408             :      * Similar to prepare case, the subskiplsn could be left in a case of
    1409             :      * server crash but it's okay. See the comments in apply_handle_prepare().
    1410             :      */
    1411          22 :     stop_skipping_changes();
    1412          22 :     clear_subscription_skip_lsn(prepare_data.prepare_lsn);
    1413             : 
    1414          22 :     pgstat_report_activity(STATE_IDLE, NULL);
    1415             : 
    1416          22 :     reset_apply_error_context_info();
    1417          22 : }
    1418             : 
    1419             : /*
    1420             :  * Handle ORIGIN message.
    1421             :  *
    1422             :  * TODO, support tracking of multiple origins
    1423             :  */
    1424             : static void
    1425          16 : apply_handle_origin(StringInfo s)
    1426             : {
    1427             :     /*
    1428             :      * ORIGIN message can only come inside streaming transaction or inside
    1429             :      * remote transaction and before any actual writes.
    1430             :      */
    1431          16 :     if (!in_streamed_transaction &&
    1432          24 :         (!in_remote_transaction ||
    1433          12 :          (IsTransactionState() && !am_tablesync_worker())))
    1434           0 :         ereport(ERROR,
    1435             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1436             :                  errmsg_internal("ORIGIN message sent out of order")));
    1437          16 : }
    1438             : 
    1439             : /*
    1440             :  * Initialize fileset (if not already done).
    1441             :  *
    1442             :  * Create a new file when first_segment is true, otherwise open the existing
    1443             :  * file.
    1444             :  */
    1445             : void
    1446         726 : stream_start_internal(TransactionId xid, bool first_segment)
    1447             : {
    1448         726 :     begin_replication_step();
    1449             : 
    1450             :     /*
    1451             :      * Initialize the worker's stream_fileset if we haven't yet. This will be
    1452             :      * used for the entire duration of the worker so create it in a permanent
    1453             :      * context. We create this on the very first streaming message from any
    1454             :      * transaction and then use it for this and other streaming transactions.
    1455             :      * Now, we could create a fileset at the start of the worker as well but
    1456             :      * then we won't be sure that it will ever be used.
    1457             :      */
    1458         726 :     if (!MyLogicalRepWorker->stream_fileset)
    1459             :     {
    1460             :         MemoryContext oldctx;
    1461             : 
    1462          28 :         oldctx = MemoryContextSwitchTo(ApplyContext);
    1463             : 
    1464          28 :         MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
    1465          28 :         FileSetInit(MyLogicalRepWorker->stream_fileset);
    1466             : 
    1467          28 :         MemoryContextSwitchTo(oldctx);
    1468             :     }
    1469             : 
    1470             :     /* Open the spool file for this transaction. */
    1471         726 :     stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
    1472             : 
    1473             :     /* If this is not the first segment, open existing subxact file. */
    1474         726 :     if (!first_segment)
    1475         662 :         subxact_info_read(MyLogicalRepWorker->subid, xid);
    1476             : 
    1477         726 :     end_replication_step();
    1478         726 : }
    1479             : 
    1480             : /*
    1481             :  * Handle STREAM START message.
    1482             :  */
    1483             : static void
    1484        1674 : apply_handle_stream_start(StringInfo s)
    1485             : {
    1486             :     bool        first_segment;
    1487             :     ParallelApplyWorkerInfo *winfo;
    1488             :     TransApplyAction apply_action;
    1489             : 
    1490             :     /* Save the message before it is consumed. */
    1491        1674 :     StringInfoData original_msg = *s;
    1492             : 
    1493        1674 :     if (in_streamed_transaction)
    1494           0 :         ereport(ERROR,
    1495             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1496             :                  errmsg_internal("duplicate STREAM START message")));
    1497             : 
    1498             :     /* There must not be an active streaming transaction. */
    1499             :     Assert(!TransactionIdIsValid(stream_xid));
    1500             : 
    1501             :     /* notify handle methods we're processing a remote transaction */
    1502        1674 :     in_streamed_transaction = true;
    1503             : 
    1504             :     /* extract XID of the top-level transaction */
    1505        1674 :     stream_xid = logicalrep_read_stream_start(s, &first_segment);
    1506             : 
    1507        1674 :     if (!TransactionIdIsValid(stream_xid))
    1508           0 :         ereport(ERROR,
    1509             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1510             :                  errmsg_internal("invalid transaction ID in streamed replication transaction")));
    1511             : 
    1512        1674 :     set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
    1513             : 
    1514             :     /* Try to allocate a worker for the streaming transaction. */
    1515        1674 :     if (first_segment)
    1516         164 :         pa_allocate_worker(stream_xid);
    1517             : 
    1518        1674 :     apply_action = get_transaction_apply_action(stream_xid, &winfo);
    1519             : 
    1520        1674 :     switch (apply_action)
    1521             :     {
    1522         686 :         case TRANS_LEADER_SERIALIZE:
    1523             : 
    1524             :             /*
    1525             :              * Function stream_start_internal starts a transaction. This
    1526             :              * transaction will be committed on the stream stop unless it is a
    1527             :              * tablesync worker in which case it will be committed after
    1528             :              * processing all the messages. We need this transaction for
    1529             :              * handling the BufFile, used for serializing the streaming data
    1530             :              * and subxact info.
    1531             :              */
    1532         686 :             stream_start_internal(stream_xid, first_segment);
    1533         686 :             break;
    1534             : 
    1535         482 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1536             :             Assert(winfo);
    1537             : 
    1538             :             /*
    1539             :              * Once we start serializing the changes, the parallel apply
    1540             :              * worker will wait for the leader to release the stream lock
    1541             :              * until the end of the transaction. So, we don't need to release
    1542             :              * the lock or increment the stream count in that case.
    1543             :              */
    1544         482 :             if (pa_send_data(winfo, s->len, s->data))
    1545             :             {
    1546             :                 /*
    1547             :                  * Unlock the shared object lock so that the parallel apply
    1548             :                  * worker can continue to receive changes.
    1549             :                  */
    1550         474 :                 if (!first_segment)
    1551         428 :                     pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
    1552             : 
    1553             :                 /*
    1554             :                  * Increment the number of streaming blocks waiting to be
    1555             :                  * processed by parallel apply worker.
    1556             :                  */
    1557         474 :                 pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
    1558             : 
    1559             :                 /* Cache the parallel apply worker for this transaction. */
    1560         474 :                 pa_set_stream_apply_worker(winfo);
    1561         474 :                 break;
    1562             :             }
    1563             : 
    1564             :             /*
    1565             :              * Switch to serialize mode when we are not able to send the
    1566             :              * change to parallel apply worker.
    1567             :              */
    1568           8 :             pa_switch_to_partial_serialize(winfo, !first_segment);
    1569             : 
    1570             :             /* fall through */
    1571          30 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1572             :             Assert(winfo);
    1573             : 
    1574             :             /*
    1575             :              * Open the spool file unless it was already opened when switching
    1576             :              * to serialize mode. The transaction started in
    1577             :              * stream_start_internal will be committed on the stream stop.
    1578             :              */
    1579          30 :             if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
    1580          22 :                 stream_start_internal(stream_xid, first_segment);
    1581             : 
    1582          30 :             stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
    1583             : 
    1584             :             /* Cache the parallel apply worker for this transaction. */
    1585          30 :             pa_set_stream_apply_worker(winfo);
    1586          30 :             break;
    1587             : 
    1588         484 :         case TRANS_PARALLEL_APPLY:
    1589         484 :             if (first_segment)
    1590             :             {
    1591             :                 /* Hold the lock until the end of the transaction. */
    1592          54 :                 pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
    1593          54 :                 pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
    1594             : 
    1595             :                 /*
    1596             :                  * Signal the leader apply worker, as it may be waiting for
    1597             :                  * us.
    1598             :                  */
    1599          54 :                 logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
    1600             :             }
    1601             : 
    1602         484 :             parallel_stream_nchanges = 0;
    1603         484 :             break;
    1604             : 
    1605           0 :         default:
    1606           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1607             :             break;
    1608             :     }
    1609             : 
    1610        1674 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1611        1674 : }
    1612             : 
    1613             : /*
    1614             :  * Update the information about subxacts and close the file.
    1615             :  *
    1616             :  * This function should be called when the stream_start_internal function has
    1617             :  * been called.
    1618             :  */
    1619             : void
    1620         726 : stream_stop_internal(TransactionId xid)
    1621             : {
    1622             :     /*
    1623             :      * Serialize information about subxacts for the toplevel transaction, then
    1624             :      * close the stream messages spool file.
    1625             :      */
    1626         726 :     subxact_info_write(MyLogicalRepWorker->subid, xid);
    1627         726 :     stream_close_file();
    1628             : 
    1629             :     /* We must be in a valid transaction state */
    1630             :     Assert(IsTransactionState());
    1631             : 
    1632             :     /* Commit the per-stream transaction */
    1633         726 :     CommitTransactionCommand();
    1634             : 
    1635             :     /* Reset per-stream context */
    1636         726 :     MemoryContextReset(LogicalStreamingContext);
    1637         726 : }
    1638             : 
    1639             : /*
    1640             :  * Handle STREAM STOP message.
    1641             :  */
    1642             : static void
    1643        1672 : apply_handle_stream_stop(StringInfo s)
    1644             : {
    1645             :     ParallelApplyWorkerInfo *winfo;
    1646             :     TransApplyAction apply_action;
    1647             : 
    1648        1672 :     if (!in_streamed_transaction)
    1649           0 :         ereport(ERROR,
    1650             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1651             :                  errmsg_internal("STREAM STOP message without STREAM START")));
    1652             : 
    1653        1672 :     apply_action = get_transaction_apply_action(stream_xid, &winfo);
    1654             : 
    1655        1672 :     switch (apply_action)
    1656             :     {
    1657         686 :         case TRANS_LEADER_SERIALIZE:
    1658         686 :             stream_stop_internal(stream_xid);
    1659         686 :             break;
    1660             : 
    1661         474 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1662             :             Assert(winfo);
    1663             : 
    1664             :             /*
    1665             :              * Lock before sending the STREAM_STOP message so that the leader
    1666             :              * can hold the lock first and the parallel apply worker will wait
    1667             :              * for leader to release the lock. See Locking Considerations atop
    1668             :              * applyparallelworker.c.
    1669             :              */
    1670         474 :             pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
    1671             : 
    1672         474 :             if (pa_send_data(winfo, s->len, s->data))
    1673             :             {
    1674         474 :                 pa_set_stream_apply_worker(NULL);
    1675         474 :                 break;
    1676             :             }
    1677             : 
    1678             :             /*
    1679             :              * Switch to serialize mode when we are not able to send the
    1680             :              * change to parallel apply worker.
    1681             :              */
    1682           0 :             pa_switch_to_partial_serialize(winfo, true);
    1683             : 
    1684             :             /* fall through */
    1685          30 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1686          30 :             stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
    1687          30 :             stream_stop_internal(stream_xid);
    1688          30 :             pa_set_stream_apply_worker(NULL);
    1689          30 :             break;
    1690             : 
    1691         482 :         case TRANS_PARALLEL_APPLY:
    1692         482 :             elog(DEBUG1, "applied %u changes in the streaming chunk",
    1693             :                  parallel_stream_nchanges);
    1694             : 
    1695             :             /*
    1696             :              * By the time parallel apply worker is processing the changes in
    1697             :              * the current streaming block, the leader apply worker may have
    1698             :              * sent multiple streaming blocks. This can lead to parallel apply
    1699             :              * worker start waiting even when there are more chunk of streams
    1700             :              * in the queue. So, try to lock only if there is no message left
    1701             :              * in the queue. See Locking Considerations atop
    1702             :              * applyparallelworker.c.
    1703             :              *
    1704             :              * Note that here we have a race condition where we can start
    1705             :              * waiting even when there are pending streaming chunks. This can
    1706             :              * happen if the leader sends another streaming block and acquires
    1707             :              * the stream lock again after the parallel apply worker checks
    1708             :              * that there is no pending streaming block and before it actually
    1709             :              * starts waiting on a lock. We can handle this case by not
    1710             :              * allowing the leader to increment the stream block count during
    1711             :              * the time parallel apply worker acquires the lock but it is not
    1712             :              * clear whether that is worth the complexity.
    1713             :              *
    1714             :              * Now, if this missed chunk contains rollback to savepoint, then
    1715             :              * there is a risk of deadlock which probably shouldn't happen
    1716             :              * after restart.
    1717             :              */
    1718         482 :             pa_decr_and_wait_stream_block();
    1719         478 :             break;
    1720             : 
    1721           0 :         default:
    1722           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1723             :             break;
    1724             :     }
    1725             : 
    1726        1668 :     in_streamed_transaction = false;
    1727        1668 :     stream_xid = InvalidTransactionId;
    1728             : 
    1729             :     /*
    1730             :      * The parallel apply worker could be in a transaction in which case we
    1731             :      * need to report the state as STATE_IDLEINTRANSACTION.
    1732             :      */
    1733        1668 :     if (IsTransactionOrTransactionBlock())
    1734         478 :         pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
    1735             :     else
    1736        1190 :         pgstat_report_activity(STATE_IDLE, NULL);
    1737             : 
    1738        1668 :     reset_apply_error_context_info();
    1739        1668 : }
    1740             : 
    1741             : /*
    1742             :  * Helper function to handle STREAM ABORT message when the transaction was
    1743             :  * serialized to file.
    1744             :  */
    1745             : static void
    1746          28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
    1747             : {
    1748             :     /*
    1749             :      * If the two XIDs are the same, it's in fact abort of toplevel xact, so
    1750             :      * just delete the files with serialized info.
    1751             :      */
    1752          28 :     if (xid == subxid)
    1753           2 :         stream_cleanup_files(MyLogicalRepWorker->subid, xid);
    1754             :     else
    1755             :     {
    1756             :         /*
    1757             :          * OK, so it's a subxact. We need to read the subxact file for the
    1758             :          * toplevel transaction, determine the offset tracked for the subxact,
    1759             :          * and truncate the file with changes. We also remove the subxacts
    1760             :          * with higher offsets (or rather higher XIDs).
    1761             :          *
    1762             :          * We intentionally scan the array from the tail, because we're likely
    1763             :          * aborting a change for the most recent subtransactions.
    1764             :          *
    1765             :          * We can't use the binary search here as subxact XIDs won't
    1766             :          * necessarily arrive in sorted order, consider the case where we have
    1767             :          * released the savepoint for multiple subtransactions and then
    1768             :          * performed rollback to savepoint for one of the earlier
    1769             :          * sub-transaction.
    1770             :          */
    1771             :         int64       i;
    1772             :         int64       subidx;
    1773             :         BufFile    *fd;
    1774          26 :         bool        found = false;
    1775             :         char        path[MAXPGPATH];
    1776             : 
    1777          26 :         subidx = -1;
    1778          26 :         begin_replication_step();
    1779          26 :         subxact_info_read(MyLogicalRepWorker->subid, xid);
    1780             : 
    1781          30 :         for (i = subxact_data.nsubxacts; i > 0; i--)
    1782             :         {
    1783          22 :             if (subxact_data.subxacts[i - 1].xid == subxid)
    1784             :             {
    1785          18 :                 subidx = (i - 1);
    1786          18 :                 found = true;
    1787          18 :                 break;
    1788             :             }
    1789             :         }
    1790             : 
    1791             :         /*
    1792             :          * If it's an empty sub-transaction then we will not find the subxid
    1793             :          * here so just cleanup the subxact info and return.
    1794             :          */
    1795          26 :         if (!found)
    1796             :         {
    1797             :             /* Cleanup the subxact info */
    1798           8 :             cleanup_subxact_info();
    1799           8 :             end_replication_step();
    1800           8 :             CommitTransactionCommand();
    1801           8 :             return;
    1802             :         }
    1803             : 
    1804             :         /* open the changes file */
    1805          18 :         changes_filename(path, MyLogicalRepWorker->subid, xid);
    1806          18 :         fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
    1807             :                                 O_RDWR, false);
    1808             : 
    1809             :         /* OK, truncate the file at the right offset */
    1810          18 :         BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
    1811          18 :                                subxact_data.subxacts[subidx].offset);
    1812          18 :         BufFileClose(fd);
    1813             : 
    1814             :         /* discard the subxacts added later */
    1815          18 :         subxact_data.nsubxacts = subidx;
    1816             : 
    1817             :         /* write the updated subxact list */
    1818          18 :         subxact_info_write(MyLogicalRepWorker->subid, xid);
    1819             : 
    1820          18 :         end_replication_step();
    1821          18 :         CommitTransactionCommand();
    1822             :     }
    1823             : }
    1824             : 
    1825             : /*
    1826             :  * Handle STREAM ABORT message.
    1827             :  */
    1828             : static void
    1829          76 : apply_handle_stream_abort(StringInfo s)
    1830             : {
    1831             :     TransactionId xid;
    1832             :     TransactionId subxid;
    1833             :     LogicalRepStreamAbortData abort_data;
    1834             :     ParallelApplyWorkerInfo *winfo;
    1835             :     TransApplyAction apply_action;
    1836             : 
    1837             :     /* Save the message before it is consumed. */
    1838          76 :     StringInfoData original_msg = *s;
    1839             :     bool        toplevel_xact;
    1840             : 
    1841          76 :     if (in_streamed_transaction)
    1842           0 :         ereport(ERROR,
    1843             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1844             :                  errmsg_internal("STREAM ABORT message without STREAM STOP")));
    1845             : 
    1846             :     /* We receive abort information only when we can apply in parallel. */
    1847          76 :     logicalrep_read_stream_abort(s, &abort_data,
    1848          76 :                                  MyLogicalRepWorker->parallel_apply);
    1849             : 
    1850          76 :     xid = abort_data.xid;
    1851          76 :     subxid = abort_data.subxid;
    1852          76 :     toplevel_xact = (xid == subxid);
    1853             : 
    1854          76 :     set_apply_error_context_xact(subxid, abort_data.abort_lsn);
    1855             : 
    1856          76 :     apply_action = get_transaction_apply_action(xid, &winfo);
    1857             : 
    1858          76 :     switch (apply_action)
    1859             :     {
    1860          28 :         case TRANS_LEADER_APPLY:
    1861             : 
    1862             :             /*
    1863             :              * We are in the leader apply worker and the transaction has been
    1864             :              * serialized to file.
    1865             :              */
    1866          28 :             stream_abort_internal(xid, subxid);
    1867             : 
    1868          28 :             elog(DEBUG1, "finished processing the STREAM ABORT command");
    1869          28 :             break;
    1870             : 
    1871          20 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1872             :             Assert(winfo);
    1873             : 
    1874             :             /*
    1875             :              * For the case of aborting the subtransaction, we increment the
    1876             :              * number of streaming blocks and take the lock again before
    1877             :              * sending the STREAM_ABORT to ensure that the parallel apply
    1878             :              * worker will wait on the lock for the next set of changes after
    1879             :              * processing the STREAM_ABORT message if it is not already
    1880             :              * waiting for STREAM_STOP message.
    1881             :              *
    1882             :              * It is important to perform this locking before sending the
    1883             :              * STREAM_ABORT message so that the leader can hold the lock first
    1884             :              * and the parallel apply worker will wait for the leader to
    1885             :              * release the lock. This is the same as what we do in
    1886             :              * apply_handle_stream_stop. See Locking Considerations atop
    1887             :              * applyparallelworker.c.
    1888             :              */
    1889          20 :             if (!toplevel_xact)
    1890             :             {
    1891          18 :                 pa_unlock_stream(xid, AccessExclusiveLock);
    1892          18 :                 pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
    1893          18 :                 pa_lock_stream(xid, AccessExclusiveLock);
    1894             :             }
    1895             : 
    1896          20 :             if (pa_send_data(winfo, s->len, s->data))
    1897             :             {
    1898             :                 /*
    1899             :                  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
    1900             :                  * wait here for the parallel apply worker to finish as that
    1901             :                  * is not required to maintain the commit order and won't have
    1902             :                  * the risk of failures due to transaction dependencies and
    1903             :                  * deadlocks. However, it is possible that before the parallel
    1904             :                  * worker finishes and we clear the worker info, the xid
    1905             :                  * wraparound happens on the upstream and a new transaction
    1906             :                  * with the same xid can appear and that can lead to duplicate
    1907             :                  * entries in ParallelApplyTxnHash. Yet another problem could
    1908             :                  * be that we may have serialized the changes in partial
    1909             :                  * serialize mode and the file containing xact changes may
    1910             :                  * already exist, and after xid wraparound trying to create
    1911             :                  * the file for the same xid can lead to an error. To avoid
    1912             :                  * these problems, we decide to wait for the aborts to finish.
    1913             :                  *
    1914             :                  * Note, it is okay to not update the flush location position
    1915             :                  * for aborts as in worst case that means such a transaction
    1916             :                  * won't be sent again after restart.
    1917             :                  */
    1918          20 :                 if (toplevel_xact)
    1919           2 :                     pa_xact_finish(winfo, InvalidXLogRecPtr);
    1920             : 
    1921          20 :                 break;
    1922             :             }
    1923             : 
    1924             :             /*
    1925             :              * Switch to serialize mode when we are not able to send the
    1926             :              * change to parallel apply worker.
    1927             :              */
    1928           0 :             pa_switch_to_partial_serialize(winfo, true);
    1929             : 
    1930             :             /* fall through */
    1931           4 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1932             :             Assert(winfo);
    1933             : 
    1934             :             /*
    1935             :              * Parallel apply worker might have applied some changes, so write
    1936             :              * the STREAM_ABORT message so that it can rollback the
    1937             :              * subtransaction if needed.
    1938             :              */
    1939           4 :             stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
    1940             :                                          &original_msg);
    1941             : 
    1942           4 :             if (toplevel_xact)
    1943             :             {
    1944           2 :                 pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
    1945           2 :                 pa_xact_finish(winfo, InvalidXLogRecPtr);
    1946             :             }
    1947           4 :             break;
    1948             : 
    1949          24 :         case TRANS_PARALLEL_APPLY:
    1950             : 
    1951             :             /*
    1952             :              * If the parallel apply worker is applying spooled messages then
    1953             :              * close the file before aborting.
    1954             :              */
    1955          24 :             if (toplevel_xact && stream_fd)
    1956           2 :                 stream_close_file();
    1957             : 
    1958          24 :             pa_stream_abort(&abort_data);
    1959             : 
    1960             :             /*
    1961             :              * We need to wait after processing rollback to savepoint for the
    1962             :              * next set of changes.
    1963             :              *
    1964             :              * We have a race condition here due to which we can start waiting
    1965             :              * here when there are more chunk of streams in the queue. See
    1966             :              * apply_handle_stream_stop.
    1967             :              */
    1968          24 :             if (!toplevel_xact)
    1969          20 :                 pa_decr_and_wait_stream_block();
    1970             : 
    1971          24 :             elog(DEBUG1, "finished processing the STREAM ABORT command");
    1972          24 :             break;
    1973             : 
    1974           0 :         default:
    1975           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1976             :             break;
    1977             :     }
    1978             : 
    1979          76 :     reset_apply_error_context_info();
    1980          76 : }
    1981             : 
    1982             : /*
    1983             :  * Ensure that the passed location is fileset's end.
    1984             :  */
    1985             : static void
    1986           8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
    1987             :                     off_t offset)
    1988             : {
    1989             :     char        path[MAXPGPATH];
    1990             :     BufFile    *fd;
    1991             :     int         last_fileno;
    1992             :     off_t       last_offset;
    1993             : 
    1994             :     Assert(!IsTransactionState());
    1995             : 
    1996           8 :     begin_replication_step();
    1997             : 
    1998           8 :     changes_filename(path, MyLogicalRepWorker->subid, xid);
    1999             : 
    2000           8 :     fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
    2001             : 
    2002           8 :     BufFileSeek(fd, 0, 0, SEEK_END);
    2003           8 :     BufFileTell(fd, &last_fileno, &last_offset);
    2004             : 
    2005           8 :     BufFileClose(fd);
    2006             : 
    2007           8 :     end_replication_step();
    2008             : 
    2009           8 :     if (last_fileno != fileno || last_offset != offset)
    2010           0 :         elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
    2011             :              path);
    2012           8 : }
    2013             : 
    2014             : /*
    2015             :  * Common spoolfile processing.
    2016             :  */
    2017             : void
    2018          62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
    2019             :                        XLogRecPtr lsn)
    2020             : {
    2021             :     int         nchanges;
    2022             :     char        path[MAXPGPATH];
    2023          62 :     char       *buffer = NULL;
    2024             :     MemoryContext oldcxt;
    2025             :     ResourceOwner oldowner;
    2026             :     int         fileno;
    2027             :     off_t       offset;
    2028             : 
    2029          62 :     if (!am_parallel_apply_worker())
    2030          54 :         maybe_start_skipping_changes(lsn);
    2031             : 
    2032             :     /* Make sure we have an open transaction */
    2033          62 :     begin_replication_step();
    2034             : 
    2035             :     /*
    2036             :      * Allocate file handle and memory required to process all the messages in
    2037             :      * TopTransactionContext to avoid them getting reset after each message is
    2038             :      * processed.
    2039             :      */
    2040          62 :     oldcxt = MemoryContextSwitchTo(TopTransactionContext);
    2041             : 
    2042             :     /* Open the spool file for the committed/prepared transaction */
    2043          62 :     changes_filename(path, MyLogicalRepWorker->subid, xid);
    2044          62 :     elog(DEBUG1, "replaying changes from file \"%s\"", path);
    2045             : 
    2046             :     /*
    2047             :      * Make sure the file is owned by the toplevel transaction so that the
    2048             :      * file will not be accidentally closed when aborting a subtransaction.
    2049             :      */
    2050          62 :     oldowner = CurrentResourceOwner;
    2051          62 :     CurrentResourceOwner = TopTransactionResourceOwner;
    2052             : 
    2053          62 :     stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
    2054             : 
    2055          62 :     CurrentResourceOwner = oldowner;
    2056             : 
    2057          62 :     buffer = palloc(BLCKSZ);
    2058             : 
    2059          62 :     MemoryContextSwitchTo(oldcxt);
    2060             : 
    2061          62 :     remote_final_lsn = lsn;
    2062             : 
    2063             :     /*
    2064             :      * Make sure the handle apply_dispatch methods are aware we're in a remote
    2065             :      * transaction.
    2066             :      */
    2067          62 :     in_remote_transaction = true;
    2068          62 :     pgstat_report_activity(STATE_RUNNING, NULL);
    2069             : 
    2070          62 :     end_replication_step();
    2071             : 
    2072             :     /*
    2073             :      * Read the entries one by one and pass them through the same logic as in
    2074             :      * apply_dispatch.
    2075             :      */
    2076          62 :     nchanges = 0;
    2077             :     while (true)
    2078      176940 :     {
    2079             :         StringInfoData s2;
    2080             :         size_t      nbytes;
    2081             :         int         len;
    2082             : 
    2083      177002 :         CHECK_FOR_INTERRUPTS();
    2084             : 
    2085             :         /* read length of the on-disk record */
    2086      177002 :         nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
    2087             : 
    2088             :         /* have we reached end of the file? */
    2089      177002 :         if (nbytes == 0)
    2090          52 :             break;
    2091             : 
    2092             :         /* do we have a correct length? */
    2093      176950 :         if (len <= 0)
    2094           0 :             elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
    2095             :                  len, path);
    2096             : 
    2097             :         /* make sure we have sufficiently large buffer */
    2098      176950 :         buffer = repalloc(buffer, len);
    2099             : 
    2100             :         /* and finally read the data into the buffer */
    2101      176950 :         BufFileReadExact(stream_fd, buffer, len);
    2102             : 
    2103      176950 :         BufFileTell(stream_fd, &fileno, &offset);
    2104             : 
    2105             :         /* init a stringinfo using the buffer and call apply_dispatch */
    2106      176950 :         initReadOnlyStringInfo(&s2, buffer, len);
    2107             : 
    2108             :         /* Ensure we are reading the data into our memory context. */
    2109      176950 :         oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
    2110             : 
    2111      176950 :         apply_dispatch(&s2);
    2112             : 
    2113      176948 :         MemoryContextReset(ApplyMessageContext);
    2114             : 
    2115      176948 :         MemoryContextSwitchTo(oldcxt);
    2116             : 
    2117      176948 :         nchanges++;
    2118             : 
    2119             :         /*
    2120             :          * It is possible the file has been closed because we have processed
    2121             :          * the transaction end message like stream_commit in which case that
    2122             :          * must be the last message.
    2123             :          */
    2124      176948 :         if (!stream_fd)
    2125             :         {
    2126           8 :             ensure_last_message(stream_fileset, xid, fileno, offset);
    2127           8 :             break;
    2128             :         }
    2129             : 
    2130      176940 :         if (nchanges % 1000 == 0)
    2131         166 :             elog(DEBUG1, "replayed %d changes from file \"%s\"",
    2132             :                  nchanges, path);
    2133             :     }
    2134             : 
    2135          60 :     if (stream_fd)
    2136          52 :         stream_close_file();
    2137             : 
    2138          60 :     elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
    2139             :          nchanges, path);
    2140             : 
    2141          60 :     return;
    2142             : }
    2143             : 
    2144             : /*
    2145             :  * Handle STREAM COMMIT message.
    2146             :  */
    2147             : static void
    2148         122 : apply_handle_stream_commit(StringInfo s)
    2149             : {
    2150             :     TransactionId xid;
    2151             :     LogicalRepCommitData commit_data;
    2152             :     ParallelApplyWorkerInfo *winfo;
    2153             :     TransApplyAction apply_action;
    2154             : 
    2155             :     /* Save the message before it is consumed. */
    2156         122 :     StringInfoData original_msg = *s;
    2157             : 
    2158         122 :     if (in_streamed_transaction)
    2159           0 :         ereport(ERROR,
    2160             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    2161             :                  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
    2162             : 
    2163         122 :     xid = logicalrep_read_stream_commit(s, &commit_data);
    2164         122 :     set_apply_error_context_xact(xid, commit_data.commit_lsn);
    2165             : 
    2166         122 :     apply_action = get_transaction_apply_action(xid, &winfo);
    2167             : 
    2168         122 :     switch (apply_action)
    2169             :     {
    2170          44 :         case TRANS_LEADER_APPLY:
    2171             : 
    2172             :             /*
    2173             :              * The transaction has been serialized to file, so replay all the
    2174             :              * spooled operations.
    2175             :              */
    2176          44 :             apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
    2177             :                                    commit_data.commit_lsn);
    2178             : 
    2179          42 :             apply_handle_commit_internal(&commit_data);
    2180             : 
    2181             :             /* Unlink the files with serialized changes and subxact info. */
    2182          42 :             stream_cleanup_files(MyLogicalRepWorker->subid, xid);
    2183             : 
    2184          42 :             elog(DEBUG1, "finished processing the STREAM COMMIT command");
    2185          42 :             break;
    2186             : 
    2187          36 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    2188             :             Assert(winfo);
    2189             : 
    2190          36 :             if (pa_send_data(winfo, s->len, s->data))
    2191             :             {
    2192             :                 /* Finish processing the streaming transaction. */
    2193          36 :                 pa_xact_finish(winfo, commit_data.end_lsn);
    2194          34 :                 break;
    2195             :             }
    2196             : 
    2197             :             /*
    2198             :              * Switch to serialize mode when we are not able to send the
    2199             :              * change to parallel apply worker.
    2200             :              */
    2201           0 :             pa_switch_to_partial_serialize(winfo, true);
    2202             : 
    2203             :             /* fall through */
    2204           4 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    2205             :             Assert(winfo);
    2206             : 
    2207           4 :             stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
    2208             :                                          &original_msg);
    2209             : 
    2210           4 :             pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
    2211             : 
    2212             :             /* Finish processing the streaming transaction. */
    2213           4 :             pa_xact_finish(winfo, commit_data.end_lsn);
    2214           4 :             break;
    2215             : 
    2216          38 :         case TRANS_PARALLEL_APPLY:
    2217             : 
    2218             :             /*
    2219             :              * If the parallel apply worker is applying spooled messages then
    2220             :              * close the file before committing.
    2221             :              */
    2222          38 :             if (stream_fd)
    2223           4 :                 stream_close_file();
    2224             : 
    2225          38 :             apply_handle_commit_internal(&commit_data);
    2226             : 
    2227          38 :             MyParallelShared->last_commit_end = XactLastCommitEnd;
    2228             : 
    2229             :             /*
    2230             :              * It is important to set the transaction state as finished before
    2231             :              * releasing the lock. See pa_wait_for_xact_finish.
    2232             :              */
    2233          38 :             pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    2234          38 :             pa_unlock_transaction(xid, AccessExclusiveLock);
    2235             : 
    2236          38 :             pa_reset_subtrans();
    2237             : 
    2238          38 :             elog(DEBUG1, "finished processing the STREAM COMMIT command");
    2239          38 :             break;
    2240             : 
    2241           0 :         default:
    2242           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    2243             :             break;
    2244             :     }
    2245             : 
    2246             :     /* Process any tables that are being synchronized in parallel. */
    2247         118 :     process_syncing_tables(commit_data.end_lsn);
    2248             : 
    2249         118 :     pgstat_report_activity(STATE_IDLE, NULL);
    2250             : 
    2251         118 :     reset_apply_error_context_info();
    2252         118 : }
    2253             : 
    2254             : /*
    2255             :  * Helper function for apply_handle_commit and apply_handle_stream_commit.
    2256             :  */
    2257             : static void
    2258         928 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
    2259             : {
    2260         928 :     if (is_skipping_changes())
    2261             :     {
    2262           4 :         stop_skipping_changes();
    2263             : 
    2264             :         /*
    2265             :          * Start a new transaction to clear the subskiplsn, if not started
    2266             :          * yet.
    2267             :          */
    2268           4 :         if (!IsTransactionState())
    2269           2 :             StartTransactionCommand();
    2270             :     }
    2271             : 
    2272         928 :     if (IsTransactionState())
    2273             :     {
    2274             :         /*
    2275             :          * The transaction is either non-empty or skipped, so we clear the
    2276             :          * subskiplsn.
    2277             :          */
    2278         928 :         clear_subscription_skip_lsn(commit_data->commit_lsn);
    2279             : 
    2280             :         /*
    2281             :          * Update origin state so we can restart streaming from correct
    2282             :          * position in case of crash.
    2283             :          */
    2284         928 :         replorigin_session_origin_lsn = commit_data->end_lsn;
    2285         928 :         replorigin_session_origin_timestamp = commit_data->committime;
    2286             : 
    2287         928 :         CommitTransactionCommand();
    2288             : 
    2289         928 :         if (IsTransactionBlock())
    2290             :         {
    2291           8 :             EndTransactionBlock(false);
    2292           8 :             CommitTransactionCommand();
    2293             :         }
    2294             : 
    2295         928 :         pgstat_report_stat(false);
    2296             : 
    2297         928 :         store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
    2298             :     }
    2299             :     else
    2300             :     {
    2301             :         /* Process any invalidation messages that might have accumulated. */
    2302           0 :         AcceptInvalidationMessages();
    2303           0 :         maybe_reread_subscription();
    2304             :     }
    2305             : 
    2306         928 :     in_remote_transaction = false;
    2307         928 : }
    2308             : 
    2309             : /*
    2310             :  * Handle RELATION message.
    2311             :  *
    2312             :  * Note we don't do validation against local schema here. The validation
    2313             :  * against local schema is postponed until first change for given relation
    2314             :  * comes as we only care about it when applying changes for it anyway and we
    2315             :  * do less locking this way.
    2316             :  */
    2317             : static void
    2318         870 : apply_handle_relation(StringInfo s)
    2319             : {
    2320             :     LogicalRepRelation *rel;
    2321             : 
    2322         870 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
    2323          72 :         return;
    2324             : 
    2325         798 :     rel = logicalrep_read_rel(s);
    2326         798 :     logicalrep_relmap_update(rel);
    2327             : 
    2328             :     /* Also reset all entries in the partition map that refer to remoterel. */
    2329         798 :     logicalrep_partmap_reset_relmap(rel);
    2330             : }
    2331             : 
    2332             : /*
    2333             :  * Handle TYPE message.
    2334             :  *
    2335             :  * This implementation pays no attention to TYPE messages; we expect the user
    2336             :  * to have set things up so that the incoming data is acceptable to the input
    2337             :  * functions for the locally subscribed tables.  Hence, we just read and
    2338             :  * discard the message.
    2339             :  */
    2340             : static void
    2341          36 : apply_handle_type(StringInfo s)
    2342             : {
    2343             :     LogicalRepTyp typ;
    2344             : 
    2345          36 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
    2346           0 :         return;
    2347             : 
    2348          36 :     logicalrep_read_typ(s, &typ);
    2349             : }
    2350             : 
    2351             : /*
    2352             :  * Check that we (the subscription owner) have sufficient privileges on the
    2353             :  * target relation to perform the given operation.
    2354             :  */
    2355             : static void
    2356      440628 : TargetPrivilegesCheck(Relation rel, AclMode mode)
    2357             : {
    2358             :     Oid         relid;
    2359             :     AclResult   aclresult;
    2360             : 
    2361      440628 :     relid = RelationGetRelid(rel);
    2362      440628 :     aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
    2363      440628 :     if (aclresult != ACLCHECK_OK)
    2364          14 :         aclcheck_error(aclresult,
    2365          14 :                        get_relkind_objtype(rel->rd_rel->relkind),
    2366          14 :                        get_rel_name(relid));
    2367             : 
    2368             :     /*
    2369             :      * We lack the infrastructure to honor RLS policies.  It might be possible
    2370             :      * to add such infrastructure here, but tablesync workers lack it, too, so
    2371             :      * we don't bother.  RLS does not ordinarily apply to TRUNCATE commands,
    2372             :      * but it seems dangerous to replicate a TRUNCATE and then refuse to
    2373             :      * replicate subsequent INSERTs, so we forbid all commands the same.
    2374             :      */
    2375      440614 :     if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
    2376           6 :         ereport(ERROR,
    2377             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2378             :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
    2379             :                         GetUserNameFromId(GetUserId(), true),
    2380             :                         RelationGetRelationName(rel))));
    2381      440608 : }
    2382             : 
    2383             : /*
    2384             :  * Handle INSERT message.
    2385             :  */
    2386             : 
    2387             : static void
    2388      371736 : apply_handle_insert(StringInfo s)
    2389             : {
    2390             :     LogicalRepRelMapEntry *rel;
    2391             :     LogicalRepTupleData newtup;
    2392             :     LogicalRepRelId relid;
    2393             :     UserContext ucxt;
    2394             :     ApplyExecutionData *edata;
    2395             :     EState     *estate;
    2396             :     TupleTableSlot *remoteslot;
    2397             :     MemoryContext oldctx;
    2398             :     bool        run_as_owner;
    2399             : 
    2400             :     /*
    2401             :      * Quick return if we are skipping data modification changes or handling
    2402             :      * streamed transactions.
    2403             :      */
    2404      723470 :     if (is_skipping_changes() ||
    2405      351734 :         handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
    2406      220132 :         return;
    2407             : 
    2408      151722 :     begin_replication_step();
    2409             : 
    2410      151718 :     relid = logicalrep_read_insert(s, &newtup);
    2411      151718 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    2412      151704 :     if (!should_apply_changes_for_rel(rel))
    2413             :     {
    2414             :         /*
    2415             :          * The relation can't become interesting in the middle of the
    2416             :          * transaction so it's safe to unlock it.
    2417             :          */
    2418         118 :         logicalrep_rel_close(rel, RowExclusiveLock);
    2419         118 :         end_replication_step();
    2420         118 :         return;
    2421             :     }
    2422             : 
    2423             :     /*
    2424             :      * Make sure that any user-supplied code runs as the table owner, unless
    2425             :      * the user has opted out of that behavior.
    2426             :      */
    2427      151586 :     run_as_owner = MySubscription->runasowner;
    2428      151586 :     if (!run_as_owner)
    2429      151570 :         SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
    2430             : 
    2431             :     /* Set relation for error callback */
    2432      151586 :     apply_error_callback_arg.rel = rel;
    2433             : 
    2434             :     /* Initialize the executor state. */
    2435      151586 :     edata = create_edata_for_relation(rel);
    2436      151586 :     estate = edata->estate;
    2437      151586 :     remoteslot = ExecInitExtraTupleSlot(estate,
    2438      151586 :                                         RelationGetDescr(rel->localrel),
    2439             :                                         &TTSOpsVirtual);
    2440             : 
    2441             :     /* Process and store remote tuple in the slot */
    2442      151586 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2443      151586 :     slot_store_data(remoteslot, rel, &newtup);
    2444      151586 :     slot_fill_defaults(rel, estate, remoteslot);
    2445      151586 :     MemoryContextSwitchTo(oldctx);
    2446             : 
    2447             :     /* For a partitioned table, insert the tuple into a partition. */
    2448      151586 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2449          90 :         apply_handle_tuple_routing(edata,
    2450             :                                    remoteslot, NULL, CMD_INSERT);
    2451             :     else
    2452             :     {
    2453      151496 :         ResultRelInfo *relinfo = edata->targetRelInfo;
    2454             : 
    2455      151496 :         ExecOpenIndices(relinfo, false);
    2456      151496 :         apply_handle_insert_internal(edata, relinfo, remoteslot);
    2457      151468 :         ExecCloseIndices(relinfo);
    2458             :     }
    2459             : 
    2460      151556 :     finish_edata(edata);
    2461             : 
    2462             :     /* Reset relation for error callback */
    2463      151556 :     apply_error_callback_arg.rel = NULL;
    2464             : 
    2465      151556 :     if (!run_as_owner)
    2466      151546 :         RestoreUserContext(&ucxt);
    2467             : 
    2468      151556 :     logicalrep_rel_close(rel, NoLock);
    2469             : 
    2470      151556 :     end_replication_step();
    2471             : }
    2472             : 
    2473             : /*
    2474             :  * Workhorse for apply_handle_insert()
    2475             :  * relinfo is for the relation we're actually inserting into
    2476             :  * (could be a child partition of edata->targetRelInfo)
    2477             :  */
    2478             : static void
    2479      151588 : apply_handle_insert_internal(ApplyExecutionData *edata,
    2480             :                              ResultRelInfo *relinfo,
    2481             :                              TupleTableSlot *remoteslot)
    2482             : {
    2483      151588 :     EState     *estate = edata->estate;
    2484             : 
    2485             :     /* Caller should have opened indexes already. */
    2486             :     Assert(relinfo->ri_IndexRelationDescs != NULL ||
    2487             :            !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
    2488             :            RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
    2489             : 
    2490             :     /* Caller will not have done this bit. */
    2491             :     Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
    2492      151588 :     InitConflictIndexes(relinfo);
    2493             : 
    2494             :     /* Do the insert. */
    2495      151588 :     TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
    2496      151576 :     ExecSimpleRelationInsert(relinfo, estate, remoteslot);
    2497      151558 : }
    2498             : 
    2499             : /*
    2500             :  * Check if the logical replication relation is updatable and throw
    2501             :  * appropriate error if it isn't.
    2502             :  */
    2503             : static void
    2504      144566 : check_relation_updatable(LogicalRepRelMapEntry *rel)
    2505             : {
    2506             :     /*
    2507             :      * For partitioned tables, we only need to care if the target partition is
    2508             :      * updatable (aka has PK or RI defined for it).
    2509             :      */
    2510      144566 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2511          60 :         return;
    2512             : 
    2513             :     /* Updatable, no error. */
    2514      144506 :     if (rel->updatable)
    2515      144506 :         return;
    2516             : 
    2517             :     /*
    2518             :      * We are in error mode so it's fine this is somewhat slow. It's better to
    2519             :      * give user correct error.
    2520             :      */
    2521           0 :     if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
    2522             :     {
    2523           0 :         ereport(ERROR,
    2524             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2525             :                  errmsg("publisher did not send replica identity column "
    2526             :                         "expected by the logical replication target relation \"%s.%s\"",
    2527             :                         rel->remoterel.nspname, rel->remoterel.relname)));
    2528             :     }
    2529             : 
    2530           0 :     ereport(ERROR,
    2531             :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2532             :              errmsg("logical replication target relation \"%s.%s\" has "
    2533             :                     "neither REPLICA IDENTITY index nor PRIMARY "
    2534             :                     "KEY and published relation does not have "
    2535             :                     "REPLICA IDENTITY FULL",
    2536             :                     rel->remoterel.nspname, rel->remoterel.relname)));
    2537             : }
    2538             : 
    2539             : /*
    2540             :  * Handle UPDATE message.
    2541             :  *
    2542             :  * TODO: FDW support
    2543             :  */
    2544             : static void
    2545      132318 : apply_handle_update(StringInfo s)
    2546             : {
    2547             :     LogicalRepRelMapEntry *rel;
    2548             :     LogicalRepRelId relid;
    2549             :     UserContext ucxt;
    2550             :     ApplyExecutionData *edata;
    2551             :     EState     *estate;
    2552             :     LogicalRepTupleData oldtup;
    2553             :     LogicalRepTupleData newtup;
    2554             :     bool        has_oldtup;
    2555             :     TupleTableSlot *remoteslot;
    2556             :     RTEPermissionInfo *target_perminfo;
    2557             :     MemoryContext oldctx;
    2558             :     bool        run_as_owner;
    2559             : 
    2560             :     /*
    2561             :      * Quick return if we are skipping data modification changes or handling
    2562             :      * streamed transactions.
    2563             :      */
    2564      264630 :     if (is_skipping_changes() ||
    2565      132312 :         handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
    2566       68446 :         return;
    2567             : 
    2568       63872 :     begin_replication_step();
    2569             : 
    2570       63870 :     relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
    2571             :                                    &newtup);
    2572       63870 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    2573       63870 :     if (!should_apply_changes_for_rel(rel))
    2574             :     {
    2575             :         /*
    2576             :          * The relation can't become interesting in the middle of the
    2577             :          * transaction so it's safe to unlock it.
    2578             :          */
    2579           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
    2580           0 :         end_replication_step();
    2581           0 :         return;
    2582             :     }
    2583             : 
    2584             :     /* Set relation for error callback */
    2585       63870 :     apply_error_callback_arg.rel = rel;
    2586             : 
    2587             :     /* Check if we can do the update. */
    2588       63870 :     check_relation_updatable(rel);
    2589             : 
    2590             :     /*
    2591             :      * Make sure that any user-supplied code runs as the table owner, unless
    2592             :      * the user has opted out of that behavior.
    2593             :      */
    2594       63870 :     run_as_owner = MySubscription->runasowner;
    2595       63870 :     if (!run_as_owner)
    2596       63864 :         SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
    2597             : 
    2598             :     /* Initialize the executor state. */
    2599       63868 :     edata = create_edata_for_relation(rel);
    2600       63868 :     estate = edata->estate;
    2601       63868 :     remoteslot = ExecInitExtraTupleSlot(estate,
    2602       63868 :                                         RelationGetDescr(rel->localrel),
    2603             :                                         &TTSOpsVirtual);
    2604             : 
    2605             :     /*
    2606             :      * Populate updatedCols so that per-column triggers can fire, and so
    2607             :      * executor can correctly pass down indexUnchanged hint.  This could
    2608             :      * include more columns than were actually changed on the publisher
    2609             :      * because the logical replication protocol doesn't contain that
    2610             :      * information.  But it would for example exclude columns that only exist
    2611             :      * on the subscriber, since we are not touching those.
    2612             :      */
    2613       63868 :     target_perminfo = list_nth(estate->es_rteperminfos, 0);
    2614      318608 :     for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
    2615             :     {
    2616      254740 :         Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
    2617      254740 :         int         remoteattnum = rel->attrmap->attnums[i];
    2618             : 
    2619      254740 :         if (!att->attisdropped && remoteattnum >= 0)
    2620             :         {
    2621             :             Assert(remoteattnum < newtup.ncols);
    2622      137706 :             if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
    2623      137700 :                 target_perminfo->updatedCols =
    2624      137700 :                     bms_add_member(target_perminfo->updatedCols,
    2625             :                                    i + 1 - FirstLowInvalidHeapAttributeNumber);
    2626             :         }
    2627             :     }
    2628             : 
    2629             :     /* Build the search tuple. */
    2630       63868 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2631       63868 :     slot_store_data(remoteslot, rel,
    2632       63868 :                     has_oldtup ? &oldtup : &newtup);
    2633       63868 :     MemoryContextSwitchTo(oldctx);
    2634             : 
    2635             :     /* For a partitioned table, apply update to correct partition. */
    2636       63868 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2637          26 :         apply_handle_tuple_routing(edata,
    2638             :                                    remoteslot, &newtup, CMD_UPDATE);
    2639             :     else
    2640       63842 :         apply_handle_update_internal(edata, edata->targetRelInfo,
    2641             :                                      remoteslot, &newtup, rel->localindexoid);
    2642             : 
    2643       63856 :     finish_edata(edata);
    2644             : 
    2645             :     /* Reset relation for error callback */
    2646       63856 :     apply_error_callback_arg.rel = NULL;
    2647             : 
    2648       63856 :     if (!run_as_owner)
    2649       63852 :         RestoreUserContext(&ucxt);
    2650             : 
    2651       63856 :     logicalrep_rel_close(rel, NoLock);
    2652             : 
    2653       63856 :     end_replication_step();
    2654             : }
    2655             : 
    2656             : /*
    2657             :  * Workhorse for apply_handle_update()
    2658             :  * relinfo is for the relation we're actually updating in
    2659             :  * (could be a child partition of edata->targetRelInfo)
    2660             :  */
    2661             : static void
    2662       63842 : apply_handle_update_internal(ApplyExecutionData *edata,
    2663             :                              ResultRelInfo *relinfo,
    2664             :                              TupleTableSlot *remoteslot,
    2665             :                              LogicalRepTupleData *newtup,
    2666             :                              Oid localindexoid)
    2667             : {
    2668       63842 :     EState     *estate = edata->estate;
    2669       63842 :     LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    2670       63842 :     Relation    localrel = relinfo->ri_RelationDesc;
    2671             :     EPQState    epqstate;
    2672       63842 :     TupleTableSlot *localslot = NULL;
    2673       63842 :     ConflictTupleInfo conflicttuple = {0};
    2674             :     bool        found;
    2675             :     MemoryContext oldctx;
    2676             : 
    2677       63842 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
    2678       63842 :     ExecOpenIndices(relinfo, false);
    2679             : 
    2680       63842 :     found = FindReplTupleInLocalRel(edata, localrel,
    2681             :                                     &relmapentry->remoterel,
    2682             :                                     localindexoid,
    2683             :                                     remoteslot, &localslot);
    2684             : 
    2685             :     /*
    2686             :      * Tuple found.
    2687             :      *
    2688             :      * Note this will fail if there are other conflicting unique indexes.
    2689             :      */
    2690       63834 :     if (found)
    2691             :     {
    2692             :         /*
    2693             :          * Report the conflict if the tuple was modified by a different
    2694             :          * origin.
    2695             :          */
    2696       63826 :         if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
    2697           4 :                                     &conflicttuple.origin, &conflicttuple.ts) &&
    2698           4 :             conflicttuple.origin != replorigin_session_origin)
    2699             :         {
    2700             :             TupleTableSlot *newslot;
    2701             : 
    2702             :             /* Store the new tuple for conflict reporting */
    2703           4 :             newslot = table_slot_create(localrel, &estate->es_tupleTable);
    2704           4 :             slot_store_data(newslot, relmapentry, newtup);
    2705             : 
    2706           4 :             conflicttuple.slot = localslot;
    2707             : 
    2708           4 :             ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
    2709             :                                 remoteslot, newslot,
    2710           4 :                                 list_make1(&conflicttuple));
    2711             :         }
    2712             : 
    2713             :         /* Process and store remote tuple in the slot */
    2714       63826 :         oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2715       63826 :         slot_modify_data(remoteslot, localslot, relmapentry, newtup);
    2716       63826 :         MemoryContextSwitchTo(oldctx);
    2717             : 
    2718       63826 :         EvalPlanQualSetSlot(&epqstate, remoteslot);
    2719             : 
    2720       63826 :         InitConflictIndexes(relinfo);
    2721             : 
    2722             :         /* Do the actual update. */
    2723       63826 :         TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
    2724       63826 :         ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
    2725             :                                  remoteslot);
    2726             :     }
    2727             :     else
    2728             :     {
    2729           8 :         TupleTableSlot *newslot = localslot;
    2730             : 
    2731             :         /* Store the new tuple for conflict reporting */
    2732           8 :         slot_store_data(newslot, relmapentry, newtup);
    2733             : 
    2734             :         /*
    2735             :          * The tuple to be updated could not be found.  Do nothing except for
    2736             :          * emitting a log message.
    2737             :          */
    2738           8 :         ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
    2739           8 :                             remoteslot, newslot, list_make1(&conflicttuple));
    2740             :     }
    2741             : 
    2742             :     /* Cleanup. */
    2743       63830 :     ExecCloseIndices(relinfo);
    2744       63830 :     EvalPlanQualEnd(&epqstate);
    2745       63830 : }
    2746             : 
    2747             : /*
    2748             :  * Handle DELETE message.
    2749             :  *
    2750             :  * TODO: FDW support
    2751             :  */
    2752             : static void
    2753      163866 : apply_handle_delete(StringInfo s)
    2754             : {
    2755             :     LogicalRepRelMapEntry *rel;
    2756             :     LogicalRepTupleData oldtup;
    2757             :     LogicalRepRelId relid;
    2758             :     UserContext ucxt;
    2759             :     ApplyExecutionData *edata;
    2760             :     EState     *estate;
    2761             :     TupleTableSlot *remoteslot;
    2762             :     MemoryContext oldctx;
    2763             :     bool        run_as_owner;
    2764             : 
    2765             :     /*
    2766             :      * Quick return if we are skipping data modification changes or handling
    2767             :      * streamed transactions.
    2768             :      */
    2769      327732 :     if (is_skipping_changes() ||
    2770      163866 :         handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
    2771       83230 :         return;
    2772             : 
    2773       80636 :     begin_replication_step();
    2774             : 
    2775       80636 :     relid = logicalrep_read_delete(s, &oldtup);
    2776       80636 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    2777       80636 :     if (!should_apply_changes_for_rel(rel))
    2778             :     {
    2779             :         /*
    2780             :          * The relation can't become interesting in the middle of the
    2781             :          * transaction so it's safe to unlock it.
    2782             :          */
    2783           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
    2784           0 :         end_replication_step();
    2785           0 :         return;
    2786             :     }
    2787             : 
    2788             :     /* Set relation for error callback */
    2789       80636 :     apply_error_callback_arg.rel = rel;
    2790             : 
    2791             :     /* Check if we can do the delete. */
    2792       80636 :     check_relation_updatable(rel);
    2793             : 
    2794             :     /*
    2795             :      * Make sure that any user-supplied code runs as the table owner, unless
    2796             :      * the user has opted out of that behavior.
    2797             :      */
    2798       80636 :     run_as_owner = MySubscription->runasowner;
    2799       80636 :     if (!run_as_owner)
    2800       80632 :         SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
    2801             : 
    2802             :     /* Initialize the executor state. */
    2803       80636 :     edata = create_edata_for_relation(rel);
    2804       80636 :     estate = edata->estate;
    2805       80636 :     remoteslot = ExecInitExtraTupleSlot(estate,
    2806       80636 :                                         RelationGetDescr(rel->localrel),
    2807             :                                         &TTSOpsVirtual);
    2808             : 
    2809             :     /* Build the search tuple. */
    2810       80636 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2811       80636 :     slot_store_data(remoteslot, rel, &oldtup);
    2812       80636 :     MemoryContextSwitchTo(oldctx);
    2813             : 
    2814             :     /* For a partitioned table, apply delete to correct partition. */
    2815       80636 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2816          34 :         apply_handle_tuple_routing(edata,
    2817             :                                    remoteslot, NULL, CMD_DELETE);
    2818             :     else
    2819             :     {
    2820       80602 :         ResultRelInfo *relinfo = edata->targetRelInfo;
    2821             : 
    2822       80602 :         ExecOpenIndices(relinfo, false);
    2823       80602 :         apply_handle_delete_internal(edata, relinfo,
    2824             :                                      remoteslot, rel->localindexoid);
    2825       80602 :         ExecCloseIndices(relinfo);
    2826             :     }
    2827             : 
    2828       80636 :     finish_edata(edata);
    2829             : 
    2830             :     /* Reset relation for error callback */
    2831       80636 :     apply_error_callback_arg.rel = NULL;
    2832             : 
    2833       80636 :     if (!run_as_owner)
    2834       80632 :         RestoreUserContext(&ucxt);
    2835             : 
    2836       80636 :     logicalrep_rel_close(rel, NoLock);
    2837             : 
    2838       80636 :     end_replication_step();
    2839             : }
    2840             : 
    2841             : /*
    2842             :  * Workhorse for apply_handle_delete()
    2843             :  * relinfo is for the relation we're actually deleting from
    2844             :  * (could be a child partition of edata->targetRelInfo)
    2845             :  */
    2846             : static void
    2847       80636 : apply_handle_delete_internal(ApplyExecutionData *edata,
    2848             :                              ResultRelInfo *relinfo,
    2849             :                              TupleTableSlot *remoteslot,
    2850             :                              Oid localindexoid)
    2851             : {
    2852       80636 :     EState     *estate = edata->estate;
    2853       80636 :     Relation    localrel = relinfo->ri_RelationDesc;
    2854       80636 :     LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
    2855             :     EPQState    epqstate;
    2856             :     TupleTableSlot *localslot;
    2857       80636 :     ConflictTupleInfo conflicttuple = {0};
    2858             :     bool        found;
    2859             : 
    2860       80636 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
    2861             : 
    2862             :     /* Caller should have opened indexes already. */
    2863             :     Assert(relinfo->ri_IndexRelationDescs != NULL ||
    2864             :            !localrel->rd_rel->relhasindex ||
    2865             :            RelationGetIndexList(localrel) == NIL);
    2866             : 
    2867       80636 :     found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
    2868             :                                     remoteslot, &localslot);
    2869             : 
    2870             :     /* If found delete it. */
    2871       80636 :     if (found)
    2872             :     {
    2873             :         /*
    2874             :          * Report the conflict if the tuple was modified by a different
    2875             :          * origin.
    2876             :          */
    2877       80618 :         if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
    2878           6 :                                     &conflicttuple.origin, &conflicttuple.ts) &&
    2879           6 :             conflicttuple.origin != replorigin_session_origin)
    2880             :         {
    2881           4 :             conflicttuple.slot = localslot;
    2882           4 :             ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
    2883             :                                 remoteslot, NULL,
    2884           4 :                                 list_make1(&conflicttuple));
    2885             :         }
    2886             : 
    2887       80618 :         EvalPlanQualSetSlot(&epqstate, localslot);
    2888             : 
    2889             :         /* Do the actual delete. */
    2890       80618 :         TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
    2891       80618 :         ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
    2892             :     }
    2893             :     else
    2894             :     {
    2895             :         /*
    2896             :          * The tuple to be deleted could not be found.  Do nothing except for
    2897             :          * emitting a log message.
    2898             :          */
    2899          18 :         ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
    2900          18 :                             remoteslot, NULL, list_make1(&conflicttuple));
    2901             :     }
    2902             : 
    2903             :     /* Cleanup. */
    2904       80636 :     EvalPlanQualEnd(&epqstate);
    2905       80636 : }
    2906             : 
    2907             : /*
    2908             :  * Try to find a tuple received from the publication side (in 'remoteslot') in
    2909             :  * the corresponding local relation using either replica identity index,
    2910             :  * primary key, index or if needed, sequential scan.
    2911             :  *
    2912             :  * Local tuple, if found, is returned in '*localslot'.
    2913             :  */
    2914             : static bool
    2915      144504 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
    2916             :                         LogicalRepRelation *remoterel,
    2917             :                         Oid localidxoid,
    2918             :                         TupleTableSlot *remoteslot,
    2919             :                         TupleTableSlot **localslot)
    2920             : {
    2921      144504 :     EState     *estate = edata->estate;
    2922             :     bool        found;
    2923             : 
    2924             :     /*
    2925             :      * Regardless of the top-level operation, we're performing a read here, so
    2926             :      * check for SELECT privileges.
    2927             :      */
    2928      144504 :     TargetPrivilegesCheck(localrel, ACL_SELECT);
    2929             : 
    2930      144496 :     *localslot = table_slot_create(localrel, &estate->es_tupleTable);
    2931             : 
    2932             :     Assert(OidIsValid(localidxoid) ||
    2933             :            (remoterel->replident == REPLICA_IDENTITY_FULL));
    2934             : 
    2935      144496 :     if (OidIsValid(localidxoid))
    2936             :     {
    2937             : #ifdef USE_ASSERT_CHECKING
    2938             :         Relation    idxrel = index_open(localidxoid, AccessShareLock);
    2939             : 
    2940             :         /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
    2941             :         Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
    2942             :                (remoterel->replident == REPLICA_IDENTITY_FULL &&
    2943             :                 IsIndexUsableForReplicaIdentityFull(idxrel,
    2944             :                                                     edata->targetRel->attrmap)));
    2945             :         index_close(idxrel, AccessShareLock);
    2946             : #endif
    2947             : 
    2948      144198 :         found = RelationFindReplTupleByIndex(localrel, localidxoid,
    2949             :                                              LockTupleExclusive,
    2950             :                                              remoteslot, *localslot);
    2951             :     }
    2952             :     else
    2953         298 :         found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
    2954             :                                          remoteslot, *localslot);
    2955             : 
    2956      144496 :     return found;
    2957             : }
    2958             : 
    2959             : /*
    2960             :  * This handles insert, update, delete on a partitioned table.
    2961             :  */
    2962             : static void
    2963         150 : apply_handle_tuple_routing(ApplyExecutionData *edata,
    2964             :                            TupleTableSlot *remoteslot,
    2965             :                            LogicalRepTupleData *newtup,
    2966             :                            CmdType operation)
    2967             : {
    2968         150 :     EState     *estate = edata->estate;
    2969         150 :     LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    2970         150 :     ResultRelInfo *relinfo = edata->targetRelInfo;
    2971         150 :     Relation    parentrel = relinfo->ri_RelationDesc;
    2972             :     ModifyTableState *mtstate;
    2973             :     PartitionTupleRouting *proute;
    2974             :     ResultRelInfo *partrelinfo;
    2975             :     Relation    partrel;
    2976             :     TupleTableSlot *remoteslot_part;
    2977             :     TupleConversionMap *map;
    2978             :     MemoryContext oldctx;
    2979         150 :     LogicalRepRelMapEntry *part_entry = NULL;
    2980         150 :     AttrMap    *attrmap = NULL;
    2981             : 
    2982             :     /* ModifyTableState is needed for ExecFindPartition(). */
    2983         150 :     edata->mtstate = mtstate = makeNode(ModifyTableState);
    2984         150 :     mtstate->ps.plan = NULL;
    2985         150 :     mtstate->ps.state = estate;
    2986         150 :     mtstate->operation = operation;
    2987         150 :     mtstate->resultRelInfo = relinfo;
    2988             : 
    2989             :     /* ... as is PartitionTupleRouting. */
    2990         150 :     edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
    2991             : 
    2992             :     /*
    2993             :      * Find the partition to which the "search tuple" belongs.
    2994             :      */
    2995             :     Assert(remoteslot != NULL);
    2996         150 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2997         150 :     partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
    2998             :                                     remoteslot, estate);
    2999             :     Assert(partrelinfo != NULL);
    3000         150 :     partrel = partrelinfo->ri_RelationDesc;
    3001             : 
    3002             :     /*
    3003             :      * Check for supported relkind.  We need this since partitions might be of
    3004             :      * unsupported relkinds; and the set of partitions can change, so checking
    3005             :      * at CREATE/ALTER SUBSCRIPTION would be insufficient.
    3006             :      */
    3007         150 :     CheckSubscriptionRelkind(partrel->rd_rel->relkind,
    3008         150 :                              get_namespace_name(RelationGetNamespace(partrel)),
    3009         150 :                              RelationGetRelationName(partrel));
    3010             : 
    3011             :     /*
    3012             :      * To perform any of the operations below, the tuple must match the
    3013             :      * partition's rowtype. Convert if needed or just copy, using a dedicated
    3014             :      * slot to store the tuple in any case.
    3015             :      */
    3016         150 :     remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
    3017         150 :     if (remoteslot_part == NULL)
    3018          84 :         remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
    3019         150 :     map = ExecGetRootToChildMap(partrelinfo, estate);
    3020         150 :     if (map != NULL)
    3021             :     {
    3022          66 :         attrmap = map->attrMap;
    3023          66 :         remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
    3024             :                                                 remoteslot_part);
    3025             :     }
    3026             :     else
    3027             :     {
    3028          84 :         remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
    3029          84 :         slot_getallattrs(remoteslot_part);
    3030             :     }
    3031         150 :     MemoryContextSwitchTo(oldctx);
    3032             : 
    3033             :     /* Check if we can do the update or delete on the leaf partition. */
    3034         150 :     if (operation == CMD_UPDATE || operation == CMD_DELETE)
    3035             :     {
    3036          60 :         part_entry = logicalrep_partition_open(relmapentry, partrel,
    3037             :                                                attrmap);
    3038          60 :         check_relation_updatable(part_entry);
    3039             :     }
    3040             : 
    3041         150 :     switch (operation)
    3042             :     {
    3043          90 :         case CMD_INSERT:
    3044          90 :             apply_handle_insert_internal(edata, partrelinfo,
    3045             :                                          remoteslot_part);
    3046          88 :             break;
    3047             : 
    3048          34 :         case CMD_DELETE:
    3049          34 :             apply_handle_delete_internal(edata, partrelinfo,
    3050             :                                          remoteslot_part,
    3051             :                                          part_entry->localindexoid);
    3052          34 :             break;
    3053             : 
    3054          26 :         case CMD_UPDATE:
    3055             : 
    3056             :             /*
    3057             :              * For UPDATE, depending on whether or not the updated tuple
    3058             :              * satisfies the partition's constraint, perform a simple UPDATE
    3059             :              * of the partition or move the updated tuple into a different
    3060             :              * suitable partition.
    3061             :              */
    3062             :             {
    3063             :                 TupleTableSlot *localslot;
    3064             :                 ResultRelInfo *partrelinfo_new;
    3065             :                 Relation    partrel_new;
    3066             :                 bool        found;
    3067             :                 EPQState    epqstate;
    3068          26 :                 ConflictTupleInfo conflicttuple = {0};
    3069             : 
    3070             :                 /* Get the matching local tuple from the partition. */
    3071          26 :                 found = FindReplTupleInLocalRel(edata, partrel,
    3072             :                                                 &part_entry->remoterel,
    3073             :                                                 part_entry->localindexoid,
    3074             :                                                 remoteslot_part, &localslot);
    3075          26 :                 if (!found)
    3076             :                 {
    3077           4 :                     TupleTableSlot *newslot = localslot;
    3078             : 
    3079             :                     /* Store the new tuple for conflict reporting */
    3080           4 :                     slot_store_data(newslot, part_entry, newtup);
    3081             : 
    3082             :                     /*
    3083             :                      * The tuple to be updated could not be found.  Do nothing
    3084             :                      * except for emitting a log message.
    3085             :                      */
    3086           4 :                     ReportApplyConflict(estate, partrelinfo, LOG,
    3087             :                                         CT_UPDATE_MISSING, remoteslot_part,
    3088           4 :                                         newslot, list_make1(&conflicttuple));
    3089             : 
    3090           4 :                     return;
    3091             :                 }
    3092             : 
    3093             :                 /*
    3094             :                  * Report the conflict if the tuple was modified by a
    3095             :                  * different origin.
    3096             :                  */
    3097          22 :                 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
    3098             :                                             &conflicttuple.origin,
    3099           2 :                                             &conflicttuple.ts) &&
    3100           2 :                     conflicttuple.origin != replorigin_session_origin)
    3101             :                 {
    3102             :                     TupleTableSlot *newslot;
    3103             : 
    3104             :                     /* Store the new tuple for conflict reporting */
    3105           2 :                     newslot = table_slot_create(partrel, &estate->es_tupleTable);
    3106           2 :                     slot_store_data(newslot, part_entry, newtup);
    3107             : 
    3108           2 :                     conflicttuple.slot = localslot;
    3109             : 
    3110           2 :                     ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
    3111             :                                         remoteslot_part, newslot,
    3112           2 :                                         list_make1(&conflicttuple));
    3113             :                 }
    3114             : 
    3115             :                 /*
    3116             :                  * Apply the update to the local tuple, putting the result in
    3117             :                  * remoteslot_part.
    3118             :                  */
    3119          22 :                 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    3120          22 :                 slot_modify_data(remoteslot_part, localslot, part_entry,
    3121             :                                  newtup);
    3122          22 :                 MemoryContextSwitchTo(oldctx);
    3123             : 
    3124          22 :                 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
    3125             : 
    3126             :                 /*
    3127             :                  * Does the updated tuple still satisfy the current
    3128             :                  * partition's constraint?
    3129             :                  */
    3130          44 :                 if (!partrel->rd_rel->relispartition ||
    3131          22 :                     ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
    3132             :                                        false))
    3133             :                 {
    3134             :                     /*
    3135             :                      * Yes, so simply UPDATE the partition.  We don't call
    3136             :                      * apply_handle_update_internal() here, which would
    3137             :                      * normally do the following work, to avoid repeating some
    3138             :                      * work already done above to find the local tuple in the
    3139             :                      * partition.
    3140             :                      */
    3141          20 :                     InitConflictIndexes(partrelinfo);
    3142             : 
    3143          20 :                     EvalPlanQualSetSlot(&epqstate, remoteslot_part);
    3144          20 :                     TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
    3145             :                                           ACL_UPDATE);
    3146          20 :                     ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
    3147             :                                              localslot, remoteslot_part);
    3148             :                 }
    3149             :                 else
    3150             :                 {
    3151             :                     /* Move the tuple into the new partition. */
    3152             : 
    3153             :                     /*
    3154             :                      * New partition will be found using tuple routing, which
    3155             :                      * can only occur via the parent table.  We might need to
    3156             :                      * convert the tuple to the parent's rowtype.  Note that
    3157             :                      * this is the tuple found in the partition, not the
    3158             :                      * original search tuple received by this function.
    3159             :                      */
    3160           2 :                     if (map)
    3161             :                     {
    3162             :                         TupleConversionMap *PartitionToRootMap =
    3163           2 :                             convert_tuples_by_name(RelationGetDescr(partrel),
    3164             :                                                    RelationGetDescr(parentrel));
    3165             : 
    3166             :                         remoteslot =
    3167           2 :                             execute_attr_map_slot(PartitionToRootMap->attrMap,
    3168             :                                                   remoteslot_part, remoteslot);
    3169             :                     }
    3170             :                     else
    3171             :                     {
    3172           0 :                         remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
    3173           0 :                         slot_getallattrs(remoteslot);
    3174             :                     }
    3175             : 
    3176             :                     /* Find the new partition. */
    3177           2 :                     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    3178           2 :                     partrelinfo_new = ExecFindPartition(mtstate, relinfo,
    3179             :                                                         proute, remoteslot,
    3180             :                                                         estate);
    3181           2 :                     MemoryContextSwitchTo(oldctx);
    3182             :                     Assert(partrelinfo_new != partrelinfo);
    3183           2 :                     partrel_new = partrelinfo_new->ri_RelationDesc;
    3184             : 
    3185             :                     /* Check that new partition also has supported relkind. */
    3186           2 :                     CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
    3187           2 :                                              get_namespace_name(RelationGetNamespace(partrel_new)),
    3188           2 :                                              RelationGetRelationName(partrel_new));
    3189             : 
    3190             :                     /* DELETE old tuple found in the old partition. */
    3191           2 :                     EvalPlanQualSetSlot(&epqstate, localslot);
    3192           2 :                     TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
    3193           2 :                     ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
    3194             : 
    3195             :                     /* INSERT new tuple into the new partition. */
    3196             : 
    3197             :                     /*
    3198             :                      * Convert the replacement tuple to match the destination
    3199             :                      * partition rowtype.
    3200             :                      */
    3201           2 :                     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    3202           2 :                     remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
    3203           2 :                     if (remoteslot_part == NULL)
    3204           2 :                         remoteslot_part = table_slot_create(partrel_new,
    3205             :                                                             &estate->es_tupleTable);
    3206           2 :                     map = ExecGetRootToChildMap(partrelinfo_new, estate);
    3207           2 :                     if (map != NULL)
    3208             :                     {
    3209           0 :                         remoteslot_part = execute_attr_map_slot(map->attrMap,
    3210             :                                                                 remoteslot,
    3211             :                                                                 remoteslot_part);
    3212             :                     }
    3213             :                     else
    3214             :                     {
    3215           2 :                         remoteslot_part = ExecCopySlot(remoteslot_part,
    3216             :                                                        remoteslot);
    3217           2 :                         slot_getallattrs(remoteslot);
    3218             :                     }
    3219           2 :                     MemoryContextSwitchTo(oldctx);
    3220           2 :                     apply_handle_insert_internal(edata, partrelinfo_new,
    3221             :                                                  remoteslot_part);
    3222             :                 }
    3223             : 
    3224          22 :                 EvalPlanQualEnd(&epqstate);
    3225             :             }
    3226          22 :             break;
    3227             : 
    3228           0 :         default:
    3229           0 :             elog(ERROR, "unrecognized CmdType: %d", (int) operation);
    3230             :             break;
    3231             :     }
    3232             : }
    3233             : 
    3234             : /*
    3235             :  * Handle TRUNCATE message.
    3236             :  *
    3237             :  * TODO: FDW support
    3238             :  */
    3239             : static void
    3240          38 : apply_handle_truncate(StringInfo s)
    3241             : {
    3242          38 :     bool        cascade = false;
    3243          38 :     bool        restart_seqs = false;
    3244          38 :     List       *remote_relids = NIL;
    3245          38 :     List       *remote_rels = NIL;
    3246          38 :     List       *rels = NIL;
    3247          38 :     List       *part_rels = NIL;
    3248          38 :     List       *relids = NIL;
    3249          38 :     List       *relids_logged = NIL;
    3250             :     ListCell   *lc;
    3251          38 :     LOCKMODE    lockmode = AccessExclusiveLock;
    3252             : 
    3253             :     /*
    3254             :      * Quick return if we are skipping data modification changes or handling
    3255             :      * streamed transactions.
    3256             :      */
    3257          76 :     if (is_skipping_changes() ||
    3258          38 :         handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
    3259           0 :         return;
    3260             : 
    3261          38 :     begin_replication_step();
    3262             : 
    3263          38 :     remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
    3264             : 
    3265          94 :     foreach(lc, remote_relids)
    3266             :     {
    3267          56 :         LogicalRepRelId relid = lfirst_oid(lc);
    3268             :         LogicalRepRelMapEntry *rel;
    3269             : 
    3270          56 :         rel = logicalrep_rel_open(relid, lockmode);
    3271          56 :         if (!should_apply_changes_for_rel(rel))
    3272             :         {
    3273             :             /*
    3274             :              * The relation can't become interesting in the middle of the
    3275             :              * transaction so it's safe to unlock it.
    3276             :              */
    3277           0 :             logicalrep_rel_close(rel, lockmode);
    3278           0 :             continue;
    3279             :         }
    3280             : 
    3281          56 :         remote_rels = lappend(remote_rels, rel);
    3282          56 :         TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
    3283          56 :         rels = lappend(rels, rel->localrel);
    3284          56 :         relids = lappend_oid(relids, rel->localreloid);
    3285          56 :         if (RelationIsLogicallyLogged(rel->localrel))
    3286           0 :             relids_logged = lappend_oid(relids_logged, rel->localreloid);
    3287             : 
    3288             :         /*
    3289             :          * Truncate partitions if we got a message to truncate a partitioned
    3290             :          * table.
    3291             :          */
    3292          56 :         if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    3293             :         {
    3294             :             ListCell   *child;
    3295           8 :             List       *children = find_all_inheritors(rel->localreloid,
    3296             :                                                        lockmode,
    3297             :                                                        NULL);
    3298             : 
    3299          30 :             foreach(child, children)
    3300             :             {
    3301          22 :                 Oid         childrelid = lfirst_oid(child);
    3302             :                 Relation    childrel;
    3303             : 
    3304          22 :                 if (list_member_oid(relids, childrelid))
    3305           8 :                     continue;
    3306             : 
    3307             :                 /* find_all_inheritors already got lock */
    3308          14 :                 childrel = table_open(childrelid, NoLock);
    3309             : 
    3310             :                 /*
    3311             :                  * Ignore temp tables of other backends.  See similar code in
    3312             :                  * ExecuteTruncate().
    3313             :                  */
    3314          14 :                 if (RELATION_IS_OTHER_TEMP(childrel))
    3315             :                 {
    3316           0 :                     table_close(childrel, lockmode);
    3317           0 :                     continue;
    3318             :                 }
    3319             : 
    3320          14 :                 TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
    3321          14 :                 rels = lappend(rels, childrel);
    3322          14 :                 part_rels = lappend(part_rels, childrel);
    3323          14 :                 relids = lappend_oid(relids, childrelid);
    3324             :                 /* Log this relation only if needed for logical decoding */
    3325          14 :                 if (RelationIsLogicallyLogged(childrel))
    3326           0 :                     relids_logged = lappend_oid(relids_logged, childrelid);
    3327             :             }
    3328             :         }
    3329             :     }
    3330             : 
    3331             :     /*
    3332             :      * Even if we used CASCADE on the upstream primary we explicitly default
    3333             :      * to replaying changes without further cascading. This might be later
    3334             :      * changeable with a user specified option.
    3335             :      *
    3336             :      * MySubscription->runasowner tells us whether we want to execute
    3337             :      * replication actions as the subscription owner; the last argument to
    3338             :      * TruncateGuts tells it whether we want to switch to the table owner.
    3339             :      * Those are exactly opposite conditions.
    3340             :      */
    3341          38 :     ExecuteTruncateGuts(rels,
    3342             :                         relids,
    3343             :                         relids_logged,
    3344             :                         DROP_RESTRICT,
    3345             :                         restart_seqs,
    3346          38 :                         !MySubscription->runasowner);
    3347          94 :     foreach(lc, remote_rels)
    3348             :     {
    3349          56 :         LogicalRepRelMapEntry *rel = lfirst(lc);
    3350             : 
    3351          56 :         logicalrep_rel_close(rel, NoLock);
    3352             :     }
    3353          52 :     foreach(lc, part_rels)
    3354             :     {
    3355          14 :         Relation    rel = lfirst(lc);
    3356             : 
    3357          14 :         table_close(rel, NoLock);
    3358             :     }
    3359             : 
    3360          38 :     end_replication_step();
    3361             : }
    3362             : 
    3363             : 
    3364             : /*
    3365             :  * Logical replication protocol message dispatcher.
    3366             :  */
    3367             : void
    3368      674314 : apply_dispatch(StringInfo s)
    3369             : {
    3370      674314 :     LogicalRepMsgType action = pq_getmsgbyte(s);
    3371             :     LogicalRepMsgType saved_command;
    3372             : 
    3373             :     /*
    3374             :      * Set the current command being applied. Since this function can be
    3375             :      * called recursively when applying spooled changes, save the current
    3376             :      * command.
    3377             :      */
    3378      674314 :     saved_command = apply_error_callback_arg.command;
    3379      674314 :     apply_error_callback_arg.command = action;
    3380             : 
    3381      674314 :     switch (action)
    3382             :     {
    3383         908 :         case LOGICAL_REP_MSG_BEGIN:
    3384         908 :             apply_handle_begin(s);
    3385         908 :             break;
    3386             : 
    3387         848 :         case LOGICAL_REP_MSG_COMMIT:
    3388         848 :             apply_handle_commit(s);
    3389         848 :             break;
    3390             : 
    3391      371736 :         case LOGICAL_REP_MSG_INSERT:
    3392      371736 :             apply_handle_insert(s);
    3393      371688 :             break;
    3394             : 
    3395      132318 :         case LOGICAL_REP_MSG_UPDATE:
    3396      132318 :             apply_handle_update(s);
    3397      132302 :             break;
    3398             : 
    3399      163866 :         case LOGICAL_REP_MSG_DELETE:
    3400      163866 :             apply_handle_delete(s);
    3401      163866 :             break;
    3402             : 
    3403          38 :         case LOGICAL_REP_MSG_TRUNCATE:
    3404          38 :             apply_handle_truncate(s);
    3405          38 :             break;
    3406             : 
    3407         870 :         case LOGICAL_REP_MSG_RELATION:
    3408         870 :             apply_handle_relation(s);
    3409         870 :             break;
    3410             : 
    3411          36 :         case LOGICAL_REP_MSG_TYPE:
    3412          36 :             apply_handle_type(s);
    3413          36 :             break;
    3414             : 
    3415          16 :         case LOGICAL_REP_MSG_ORIGIN:
    3416          16 :             apply_handle_origin(s);
    3417          16 :             break;
    3418             : 
    3419           0 :         case LOGICAL_REP_MSG_MESSAGE:
    3420             : 
    3421             :             /*
    3422             :              * Logical replication does not use generic logical messages yet.
    3423             :              * Although, it could be used by other applications that use this
    3424             :              * output plugin.
    3425             :              */
    3426           0 :             break;
    3427             : 
    3428        1674 :         case LOGICAL_REP_MSG_STREAM_START:
    3429        1674 :             apply_handle_stream_start(s);
    3430        1674 :             break;
    3431             : 
    3432        1672 :         case LOGICAL_REP_MSG_STREAM_STOP:
    3433        1672 :             apply_handle_stream_stop(s);
    3434        1668 :             break;
    3435             : 
    3436          76 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    3437          76 :             apply_handle_stream_abort(s);
    3438          76 :             break;
    3439             : 
    3440         122 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    3441         122 :             apply_handle_stream_commit(s);
    3442         118 :             break;
    3443             : 
    3444          32 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    3445          32 :             apply_handle_begin_prepare(s);
    3446          32 :             break;
    3447             : 
    3448          30 :         case LOGICAL_REP_MSG_PREPARE:
    3449          30 :             apply_handle_prepare(s);
    3450          28 :             break;
    3451             : 
    3452          40 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    3453          40 :             apply_handle_commit_prepared(s);
    3454          40 :             break;
    3455             : 
    3456          10 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    3457          10 :             apply_handle_rollback_prepared(s);
    3458          10 :             break;
    3459             : 
    3460          22 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    3461          22 :             apply_handle_stream_prepare(s);
    3462          22 :             break;
    3463             : 
    3464           0 :         default:
    3465           0 :             ereport(ERROR,
    3466             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    3467             :                      errmsg("invalid logical replication message type \"??? (%d)\"", action)));
    3468             :     }
    3469             : 
    3470             :     /* Reset the current command */
    3471      674240 :     apply_error_callback_arg.command = saved_command;
    3472      674240 : }
    3473             : 
    3474             : /*
    3475             :  * Figure out which write/flush positions to report to the walsender process.
    3476             :  *
    3477             :  * We can't simply report back the last LSN the walsender sent us because the
    3478             :  * local transaction might not yet be flushed to disk locally. Instead we
    3479             :  * build a list that associates local with remote LSNs for every commit. When
    3480             :  * reporting back the flush position to the sender we iterate that list and
    3481             :  * check which entries on it are already locally flushed. Those we can report
    3482             :  * as having been flushed.
    3483             :  *
    3484             :  * The have_pending_txes is true if there are outstanding transactions that
    3485             :  * need to be flushed.
    3486             :  */
    3487             : static void
    3488      130688 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
    3489             :                    bool *have_pending_txes)
    3490             : {
    3491             :     dlist_mutable_iter iter;
    3492      130688 :     XLogRecPtr  local_flush = GetFlushRecPtr(NULL);
    3493             : 
    3494      130688 :     *write = InvalidXLogRecPtr;
    3495      130688 :     *flush = InvalidXLogRecPtr;
    3496             : 
    3497      131640 :     dlist_foreach_modify(iter, &lsn_mapping)
    3498             :     {
    3499       24944 :         FlushPosition *pos =
    3500       24944 :             dlist_container(FlushPosition, node, iter.cur);
    3501             : 
    3502       24944 :         *write = pos->remote_end;
    3503             : 
    3504       24944 :         if (pos->local_end <= local_flush)
    3505             :         {
    3506         952 :             *flush = pos->remote_end;
    3507         952 :             dlist_delete(iter.cur);
    3508         952 :             pfree(pos);
    3509             :         }
    3510             :         else
    3511             :         {
    3512             :             /*
    3513             :              * Don't want to uselessly iterate over the rest of the list which
    3514             :              * could potentially be long. Instead get the last element and
    3515             :              * grab the write position from there.
    3516             :              */
    3517       23992 :             pos = dlist_tail_element(FlushPosition, node,
    3518             :                                      &lsn_mapping);
    3519       23992 :             *write = pos->remote_end;
    3520       23992 :             *have_pending_txes = true;
    3521       23992 :             return;
    3522             :         }
    3523             :     }
    3524             : 
    3525      106696 :     *have_pending_txes = !dlist_is_empty(&lsn_mapping);
    3526             : }
    3527             : 
    3528             : /*
    3529             :  * Store current remote/local lsn pair in the tracking list.
    3530             :  */
    3531             : void
    3532        1060 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
    3533             : {
    3534             :     FlushPosition *flushpos;
    3535             : 
    3536             :     /*
    3537             :      * Skip for parallel apply workers, because the lsn_mapping is maintained
    3538             :      * by the leader apply worker.
    3539             :      */
    3540        1060 :     if (am_parallel_apply_worker())
    3541          38 :         return;
    3542             : 
    3543             :     /* Need to do this in permanent context */
    3544        1022 :     MemoryContextSwitchTo(ApplyContext);
    3545             : 
    3546             :     /* Track commit lsn  */
    3547        1022 :     flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
    3548        1022 :     flushpos->local_end = local_lsn;
    3549        1022 :     flushpos->remote_end = remote_lsn;
    3550             : 
    3551        1022 :     dlist_push_tail(&lsn_mapping, &flushpos->node);
    3552        1022 :     MemoryContextSwitchTo(ApplyMessageContext);
    3553             : }
    3554             : 
    3555             : 
    3556             : /* Update statistics of the worker. */
    3557             : static void
    3558      374344 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
    3559             : {
    3560      374344 :     MyLogicalRepWorker->last_lsn = last_lsn;
    3561      374344 :     MyLogicalRepWorker->last_send_time = send_time;
    3562      374344 :     MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
    3563      374344 :     if (reply)
    3564             :     {
    3565        4774 :         MyLogicalRepWorker->reply_lsn = last_lsn;
    3566        4774 :         MyLogicalRepWorker->reply_time = send_time;
    3567             :     }
    3568      374344 : }
    3569             : 
    3570             : /*
    3571             :  * Apply main loop.
    3572             :  */
    3573             : static void
    3574         746 : LogicalRepApplyLoop(XLogRecPtr last_received)
    3575             : {
    3576         746 :     TimestampTz last_recv_timestamp = GetCurrentTimestamp();
    3577         746 :     bool        ping_sent = false;
    3578             :     TimeLineID  tli;
    3579             :     ErrorContextCallback errcallback;
    3580             : 
    3581             :     /*
    3582             :      * Init the ApplyMessageContext which we clean up after each replication
    3583             :      * protocol message.
    3584             :      */
    3585         746 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
    3586             :                                                 "ApplyMessageContext",
    3587             :                                                 ALLOCSET_DEFAULT_SIZES);
    3588             : 
    3589             :     /*
    3590             :      * This memory context is used for per-stream data when the streaming mode
    3591             :      * is enabled. This context is reset on each stream stop.
    3592             :      */
    3593         746 :     LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
    3594             :                                                     "LogicalStreamingContext",
    3595             :                                                     ALLOCSET_DEFAULT_SIZES);
    3596             : 
    3597             :     /* mark as idle, before starting to loop */
    3598         746 :     pgstat_report_activity(STATE_IDLE, NULL);
    3599             : 
    3600             :     /*
    3601             :      * Push apply error context callback. Fields will be filled while applying
    3602             :      * a change.
    3603             :      */
    3604         746 :     errcallback.callback = apply_error_callback;
    3605         746 :     errcallback.previous = error_context_stack;
    3606         746 :     error_context_stack = &errcallback;
    3607         746 :     apply_error_context_stack = error_context_stack;
    3608             : 
    3609             :     /* This outer loop iterates once per wait. */
    3610             :     for (;;)
    3611      124956 :     {
    3612      125702 :         pgsocket    fd = PGINVALID_SOCKET;
    3613             :         int         rc;
    3614             :         int         len;
    3615      125702 :         char       *buf = NULL;
    3616      125702 :         bool        endofstream = false;
    3617             :         long        wait_time;
    3618             : 
    3619      125702 :         CHECK_FOR_INTERRUPTS();
    3620             : 
    3621      125700 :         MemoryContextSwitchTo(ApplyMessageContext);
    3622             : 
    3623      125700 :         len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
    3624             : 
    3625      125668 :         if (len != 0)
    3626             :         {
    3627             :             /* Loop to process all available data (without blocking). */
    3628             :             for (;;)
    3629             :             {
    3630      497986 :                 CHECK_FOR_INTERRUPTS();
    3631             : 
    3632      497986 :                 if (len == 0)
    3633             :                 {
    3634      123624 :                     break;
    3635             :                 }
    3636      374362 :                 else if (len < 0)
    3637             :                 {
    3638          18 :                     ereport(LOG,
    3639             :                             (errmsg("data stream from publisher has ended")));
    3640          18 :                     endofstream = true;
    3641          18 :                     break;
    3642             :                 }
    3643             :                 else
    3644             :                 {
    3645             :                     int         c;
    3646             :                     StringInfoData s;
    3647             : 
    3648      374344 :                     if (ConfigReloadPending)
    3649             :                     {
    3650           0 :                         ConfigReloadPending = false;
    3651           0 :                         ProcessConfigFile(PGC_SIGHUP);
    3652             :                     }
    3653             : 
    3654             :                     /* Reset timeout. */
    3655      374344 :                     last_recv_timestamp = GetCurrentTimestamp();
    3656      374344 :                     ping_sent = false;
    3657             : 
    3658             :                     /* Ensure we are reading the data into our memory context. */
    3659      374344 :                     MemoryContextSwitchTo(ApplyMessageContext);
    3660             : 
    3661      374344 :                     initReadOnlyStringInfo(&s, buf, len);
    3662             : 
    3663      374344 :                     c = pq_getmsgbyte(&s);
    3664             : 
    3665      374344 :                     if (c == 'w')
    3666             :                     {
    3667             :                         XLogRecPtr  start_lsn;
    3668             :                         XLogRecPtr  end_lsn;
    3669             :                         TimestampTz send_time;
    3670             : 
    3671      369570 :                         start_lsn = pq_getmsgint64(&s);
    3672      369570 :                         end_lsn = pq_getmsgint64(&s);
    3673      369570 :                         send_time = pq_getmsgint64(&s);
    3674             : 
    3675      369570 :                         if (last_received < start_lsn)
    3676      297208 :                             last_received = start_lsn;
    3677             : 
    3678      369570 :                         if (last_received < end_lsn)
    3679           0 :                             last_received = end_lsn;
    3680             : 
    3681      369570 :                         UpdateWorkerStats(last_received, send_time, false);
    3682             : 
    3683      369570 :                         apply_dispatch(&s);
    3684             :                     }
    3685        4774 :                     else if (c == 'k')
    3686             :                     {
    3687             :                         XLogRecPtr  end_lsn;
    3688             :                         TimestampTz timestamp;
    3689             :                         bool        reply_requested;
    3690             : 
    3691        4774 :                         end_lsn = pq_getmsgint64(&s);
    3692        4774 :                         timestamp = pq_getmsgint64(&s);
    3693        4774 :                         reply_requested = pq_getmsgbyte(&s);
    3694             : 
    3695        4774 :                         if (last_received < end_lsn)
    3696        1748 :                             last_received = end_lsn;
    3697             : 
    3698        4774 :                         send_feedback(last_received, reply_requested, false);
    3699        4774 :                         UpdateWorkerStats(last_received, timestamp, true);
    3700             :                     }
    3701             :                     /* other message types are purposefully ignored */
    3702             : 
    3703      374276 :                     MemoryContextReset(ApplyMessageContext);
    3704             :                 }
    3705             : 
    3706      374276 :                 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
    3707             :             }
    3708             :         }
    3709             : 
    3710             :         /* confirm all writes so far */
    3711      125600 :         send_feedback(last_received, false, false);
    3712             : 
    3713      125600 :         if (!in_remote_transaction && !in_streamed_transaction)
    3714             :         {
    3715             :             /*
    3716             :              * If we didn't get any transactions for a while there might be
    3717             :              * unconsumed invalidation messages in the queue, consume them
    3718             :              * now.
    3719             :              */
    3720        6692 :             AcceptInvalidationMessages();
    3721        6692 :             maybe_reread_subscription();
    3722             : 
    3723             :             /* Process any table synchronization changes. */
    3724        6624 :             process_syncing_tables(last_received);
    3725             :         }
    3726             : 
    3727             :         /* Cleanup the memory. */
    3728      125154 :         MemoryContextReset(ApplyMessageContext);
    3729      125154 :         MemoryContextSwitchTo(TopMemoryContext);
    3730             : 
    3731             :         /* Check if we need to exit the streaming loop. */
    3732      125154 :         if (endofstream)
    3733          18 :             break;
    3734             : 
    3735             :         /*
    3736             :          * Wait for more data or latch.  If we have unflushed transactions,
    3737             :          * wake up after WalWriterDelay to see if they've been flushed yet (in
    3738             :          * which case we should send a feedback message).  Otherwise, there's
    3739             :          * no particular urgency about waking up unless we get data or a
    3740             :          * signal.
    3741             :          */
    3742      125136 :         if (!dlist_is_empty(&lsn_mapping))
    3743       21338 :             wait_time = WalWriterDelay;
    3744             :         else
    3745      103798 :             wait_time = NAPTIME_PER_CYCLE;
    3746             : 
    3747      125136 :         rc = WaitLatchOrSocket(MyLatch,
    3748             :                                WL_SOCKET_READABLE | WL_LATCH_SET |
    3749             :                                WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    3750             :                                fd, wait_time,
    3751             :                                WAIT_EVENT_LOGICAL_APPLY_MAIN);
    3752             : 
    3753      125136 :         if (rc & WL_LATCH_SET)
    3754             :         {
    3755        1378 :             ResetLatch(MyLatch);
    3756        1378 :             CHECK_FOR_INTERRUPTS();
    3757             :         }
    3758             : 
    3759      124956 :         if (ConfigReloadPending)
    3760             :         {
    3761          16 :             ConfigReloadPending = false;
    3762          16 :             ProcessConfigFile(PGC_SIGHUP);
    3763             :         }
    3764             : 
    3765      124956 :         if (rc & WL_TIMEOUT)
    3766             :         {
    3767             :             /*
    3768             :              * We didn't receive anything new. If we haven't heard anything
    3769             :              * from the server for more than wal_receiver_timeout / 2, ping
    3770             :              * the server. Also, if it's been longer than
    3771             :              * wal_receiver_status_interval since the last update we sent,
    3772             :              * send a status update to the primary anyway, to report any
    3773             :              * progress in applying WAL.
    3774             :              */
    3775         314 :             bool        requestReply = false;
    3776             : 
    3777             :             /*
    3778             :              * Check if time since last receive from primary has reached the
    3779             :              * configured limit.
    3780             :              */
    3781         314 :             if (wal_receiver_timeout > 0)
    3782             :             {
    3783         314 :                 TimestampTz now = GetCurrentTimestamp();
    3784             :                 TimestampTz timeout;
    3785             : 
    3786         314 :                 timeout =
    3787         314 :                     TimestampTzPlusMilliseconds(last_recv_timestamp,
    3788             :                                                 wal_receiver_timeout);
    3789             : 
    3790         314 :                 if (now >= timeout)
    3791           0 :                     ereport(ERROR,
    3792             :                             (errcode(ERRCODE_CONNECTION_FAILURE),
    3793             :                              errmsg("terminating logical replication worker due to timeout")));
    3794             : 
    3795             :                 /* Check to see if it's time for a ping. */
    3796         314 :                 if (!ping_sent)
    3797             :                 {
    3798         314 :                     timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
    3799             :                                                           (wal_receiver_timeout / 2));
    3800         314 :                     if (now >= timeout)
    3801             :                     {
    3802           0 :                         requestReply = true;
    3803           0 :                         ping_sent = true;
    3804             :                     }
    3805             :                 }
    3806             :             }
    3807             : 
    3808         314 :             send_feedback(last_received, requestReply, requestReply);
    3809             : 
    3810             :             /*
    3811             :              * Force reporting to ensure long idle periods don't lead to
    3812             :              * arbitrarily delayed stats. Stats can only be reported outside
    3813             :              * of (implicit or explicit) transactions. That shouldn't lead to
    3814             :              * stats being delayed for long, because transactions are either
    3815             :              * sent as a whole on commit or streamed. Streamed transactions
    3816             :              * are spilled to disk and applied on commit.
    3817             :              */
    3818         314 :             if (!IsTransactionState())
    3819         314 :                 pgstat_report_stat(true);
    3820             :         }
    3821             :     }
    3822             : 
    3823             :     /* Pop the error context stack */
    3824          18 :     error_context_stack = errcallback.previous;
    3825          18 :     apply_error_context_stack = error_context_stack;
    3826             : 
    3827             :     /* All done */
    3828          18 :     walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
    3829           0 : }
    3830             : 
    3831             : /*
    3832             :  * Send a Standby Status Update message to server.
    3833             :  *
    3834             :  * 'recvpos' is the latest LSN we've received data to, force is set if we need
    3835             :  * to send a response to avoid timeouts.
    3836             :  */
    3837             : static void
    3838      130688 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
    3839             : {
    3840             :     static StringInfo reply_message = NULL;
    3841             :     static TimestampTz send_time = 0;
    3842             : 
    3843             :     static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
    3844             :     static XLogRecPtr last_writepos = InvalidXLogRecPtr;
    3845             :     static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
    3846             : 
    3847             :     XLogRecPtr  writepos;
    3848             :     XLogRecPtr  flushpos;
    3849             :     TimestampTz now;
    3850             :     bool        have_pending_txes;
    3851             : 
    3852             :     /*
    3853             :      * If the user doesn't want status to be reported to the publisher, be
    3854             :      * sure to exit before doing anything at all.
    3855             :      */
    3856      130688 :     if (!force && wal_receiver_status_interval <= 0)
    3857       43560 :         return;
    3858             : 
    3859             :     /* It's legal to not pass a recvpos */
    3860      130688 :     if (recvpos < last_recvpos)
    3861           0 :         recvpos = last_recvpos;
    3862             : 
    3863      130688 :     get_flush_position(&writepos, &flushpos, &have_pending_txes);
    3864             : 
    3865             :     /*
    3866             :      * No outstanding transactions to flush, we can report the latest received
    3867             :      * position. This is important for synchronous replication.
    3868             :      */
    3869      130688 :     if (!have_pending_txes)
    3870      106696 :         flushpos = writepos = recvpos;
    3871             : 
    3872      130688 :     if (writepos < last_writepos)
    3873           0 :         writepos = last_writepos;
    3874             : 
    3875      130688 :     if (flushpos < last_flushpos)
    3876       23910 :         flushpos = last_flushpos;
    3877             : 
    3878      130688 :     now = GetCurrentTimestamp();
    3879             : 
    3880             :     /* if we've already reported everything we're good */
    3881      130688 :     if (!force &&
    3882      129016 :         writepos == last_writepos &&
    3883       44062 :         flushpos == last_flushpos &&
    3884       43804 :         !TimestampDifferenceExceeds(send_time, now,
    3885             :                                     wal_receiver_status_interval * 1000))
    3886       43560 :         return;
    3887       87128 :     send_time = now;
    3888             : 
    3889       87128 :     if (!reply_message)
    3890             :     {
    3891         746 :         MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
    3892             : 
    3893         746 :         reply_message = makeStringInfo();
    3894         746 :         MemoryContextSwitchTo(oldctx);
    3895             :     }
    3896             :     else
    3897       86382 :         resetStringInfo(reply_message);
    3898             : 
    3899       87128 :     pq_sendbyte(reply_message, 'r');
    3900       87128 :     pq_sendint64(reply_message, recvpos);   /* write */
    3901       87128 :     pq_sendint64(reply_message, flushpos);  /* flush */
    3902       87128 :     pq_sendint64(reply_message, writepos);  /* apply */
    3903       87128 :     pq_sendint64(reply_message, now);   /* sendTime */
    3904       87128 :     pq_sendbyte(reply_message, requestReply);   /* replyRequested */
    3905             : 
    3906       87128 :     elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
    3907             :          force,
    3908             :          LSN_FORMAT_ARGS(recvpos),
    3909             :          LSN_FORMAT_ARGS(writepos),
    3910             :          LSN_FORMAT_ARGS(flushpos));
    3911             : 
    3912       87128 :     walrcv_send(LogRepWorkerWalRcvConn,
    3913             :                 reply_message->data, reply_message->len);
    3914             : 
    3915       87128 :     if (recvpos > last_recvpos)
    3916       84962 :         last_recvpos = recvpos;
    3917       87128 :     if (writepos > last_writepos)
    3918       84960 :         last_writepos = writepos;
    3919       87128 :     if (flushpos > last_flushpos)
    3920       84444 :         last_flushpos = flushpos;
    3921             : }
    3922             : 
    3923             : /*
    3924             :  * Exit routine for apply workers due to subscription parameter changes.
    3925             :  */
    3926             : static void
    3927          74 : apply_worker_exit(void)
    3928             : {
    3929          74 :     if (am_parallel_apply_worker())
    3930             :     {
    3931             :         /*
    3932             :          * Don't stop the parallel apply worker as the leader will detect the
    3933             :          * subscription parameter change and restart logical replication later
    3934             :          * anyway. This also prevents the leader from reporting errors when
    3935             :          * trying to communicate with a stopped parallel apply worker, which
    3936             :          * would accidentally disable subscriptions if disable_on_error was
    3937             :          * set.
    3938             :          */
    3939           0 :         return;
    3940             :     }
    3941             : 
    3942             :     /*
    3943             :      * Reset the last-start time for this apply worker so that the launcher
    3944             :      * will restart it without waiting for wal_retrieve_retry_interval if the
    3945             :      * subscription is still active, and so that we won't leak that hash table
    3946             :      * entry if it isn't.
    3947             :      */
    3948          74 :     if (am_leader_apply_worker())
    3949          74 :         ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
    3950             : 
    3951          74 :     proc_exit(0);
    3952             : }
    3953             : 
    3954             : /*
    3955             :  * Reread subscription info if needed.
    3956             :  *
    3957             :  * For significant changes, we react by exiting the current process; a new
    3958             :  * one will be launched afterwards if needed.
    3959             :  */
    3960             : void
    3961        8658 : maybe_reread_subscription(void)
    3962             : {
    3963             :     MemoryContext oldctx;
    3964             :     Subscription *newsub;
    3965        8658 :     bool        started_tx = false;
    3966             : 
    3967             :     /* When cache state is valid there is nothing to do here. */
    3968        8658 :     if (MySubscriptionValid)
    3969        8508 :         return;
    3970             : 
    3971             :     /* This function might be called inside or outside of transaction. */
    3972         150 :     if (!IsTransactionState())
    3973             :     {
    3974         142 :         StartTransactionCommand();
    3975         142 :         started_tx = true;
    3976             :     }
    3977             : 
    3978             :     /* Ensure allocations in permanent context. */
    3979         150 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    3980             : 
    3981         150 :     newsub = GetSubscription(MyLogicalRepWorker->subid, true);
    3982             : 
    3983             :     /*
    3984             :      * Exit if the subscription was removed. This normally should not happen
    3985             :      * as the worker gets killed during DROP SUBSCRIPTION.
    3986             :      */
    3987         150 :     if (!newsub)
    3988             :     {
    3989           0 :         ereport(LOG,
    3990             :                 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
    3991             :                         MySubscription->name)));
    3992             : 
    3993             :         /* Ensure we remove no-longer-useful entry for worker's start time */
    3994           0 :         if (am_leader_apply_worker())
    3995           0 :             ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
    3996             : 
    3997           0 :         proc_exit(0);
    3998             :     }
    3999             : 
    4000             :     /* Exit if the subscription was disabled. */
    4001         150 :     if (!newsub->enabled)
    4002             :     {
    4003          18 :         ereport(LOG,
    4004             :                 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
    4005             :                         MySubscription->name)));
    4006             : 
    4007          18 :         apply_worker_exit();
    4008             :     }
    4009             : 
    4010             :     /* !slotname should never happen when enabled is true. */
    4011             :     Assert(newsub->slotname);
    4012             : 
    4013             :     /* two-phase cannot be altered while the worker is running */
    4014             :     Assert(newsub->twophasestate == MySubscription->twophasestate);
    4015             : 
    4016             :     /*
    4017             :      * Exit if any parameter that affects the remote connection was changed.
    4018             :      * The launcher will start a new worker but note that the parallel apply
    4019             :      * worker won't restart if the streaming option's value is changed from
    4020             :      * 'parallel' to any other value or the server decides not to stream the
    4021             :      * in-progress transaction.
    4022             :      */
    4023         132 :     if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
    4024         128 :         strcmp(newsub->name, MySubscription->name) != 0 ||
    4025         126 :         strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
    4026         126 :         newsub->binary != MySubscription->binary ||
    4027         114 :         newsub->stream != MySubscription->stream ||
    4028         104 :         newsub->passwordrequired != MySubscription->passwordrequired ||
    4029         104 :         strcmp(newsub->origin, MySubscription->origin) != 0 ||
    4030         104 :         newsub->owner != MySubscription->owner ||
    4031         102 :         !equal(newsub->publications, MySubscription->publications))
    4032             :     {
    4033          48 :         if (am_parallel_apply_worker())
    4034           0 :             ereport(LOG,
    4035             :                     (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
    4036             :                             MySubscription->name)));
    4037             :         else
    4038          48 :             ereport(LOG,
    4039             :                     (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
    4040             :                             MySubscription->name)));
    4041             : 
    4042          48 :         apply_worker_exit();
    4043             :     }
    4044             : 
    4045             :     /*
    4046             :      * Exit if the subscription owner's superuser privileges have been
    4047             :      * revoked.
    4048             :      */
    4049          84 :     if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
    4050             :     {
    4051           8 :         if (am_parallel_apply_worker())
    4052           0 :             ereport(LOG,
    4053             :                     errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
    4054             :                            MySubscription->name));
    4055             :         else
    4056           8 :             ereport(LOG,
    4057             :                     errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
    4058             :                            MySubscription->name));
    4059             : 
    4060           8 :         apply_worker_exit();
    4061             :     }
    4062             : 
    4063             :     /* Check for other changes that should never happen too. */
    4064          76 :     if (newsub->dbid != MySubscription->dbid)
    4065             :     {
    4066           0 :         elog(ERROR, "subscription %u changed unexpectedly",
    4067             :              MyLogicalRepWorker->subid);
    4068             :     }
    4069             : 
    4070             :     /* Clean old subscription info and switch to new one. */
    4071          76 :     FreeSubscription(MySubscription);
    4072          76 :     MySubscription = newsub;
    4073             : 
    4074          76 :     MemoryContextSwitchTo(oldctx);
    4075             : 
    4076             :     /* Change synchronous commit according to the user's wishes */
    4077          76 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    4078             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    4079             : 
    4080          76 :     if (started_tx)
    4081          74 :         CommitTransactionCommand();
    4082             : 
    4083          76 :     MySubscriptionValid = true;
    4084             : }
    4085             : 
    4086             : /*
    4087             :  * Callback from subscription syscache invalidation.
    4088             :  */
    4089             : static void
    4090         160 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
    4091             : {
    4092         160 :     MySubscriptionValid = false;
    4093         160 : }
    4094             : 
    4095             : /*
    4096             :  * subxact_info_write
    4097             :  *    Store information about subxacts for a toplevel transaction.
    4098             :  *
    4099             :  * For each subxact we store offset of its first change in the main file.
    4100             :  * The file is always over-written as a whole.
    4101             :  *
    4102             :  * XXX We should only store subxacts that were not aborted yet.
    4103             :  */
    4104             : static void
    4105         744 : subxact_info_write(Oid subid, TransactionId xid)
    4106             : {
    4107             :     char        path[MAXPGPATH];
    4108             :     Size        len;
    4109             :     BufFile    *fd;
    4110             : 
    4111             :     Assert(TransactionIdIsValid(xid));
    4112             : 
    4113             :     /* construct the subxact filename */
    4114         744 :     subxact_filename(path, subid, xid);
    4115             : 
    4116             :     /* Delete the subxacts file, if exists. */
    4117         744 :     if (subxact_data.nsubxacts == 0)
    4118             :     {
    4119         580 :         cleanup_subxact_info();
    4120         580 :         BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
    4121             : 
    4122         580 :         return;
    4123             :     }
    4124             : 
    4125             :     /*
    4126             :      * Create the subxact file if it not already created, otherwise open the
    4127             :      * existing file.
    4128             :      */
    4129         164 :     fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
    4130             :                             true);
    4131         164 :     if (fd == NULL)
    4132          16 :         fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
    4133             : 
    4134         164 :     len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
    4135             : 
    4136             :     /* Write the subxact count and subxact info */
    4137         164 :     BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
    4138         164 :     BufFileWrite(fd, subxact_data.subxacts, len);
    4139             : 
    4140         164 :     BufFileClose(fd);
    4141             : 
    4142             :     /* free the memory allocated for subxact info */
    4143         164 :     cleanup_subxact_info();
    4144             : }
    4145             : 
    4146             : /*
    4147             :  * subxact_info_read
    4148             :  *    Restore information about subxacts of a streamed transaction.
    4149             :  *
    4150             :  * Read information about subxacts into the structure subxact_data that can be
    4151             :  * used later.
    4152             :  */
    4153             : static void
    4154         688 : subxact_info_read(Oid subid, TransactionId xid)
    4155             : {
    4156             :     char        path[MAXPGPATH];
    4157             :     Size        len;
    4158             :     BufFile    *fd;
    4159             :     MemoryContext oldctx;
    4160             : 
    4161             :     Assert(!subxact_data.subxacts);
    4162             :     Assert(subxact_data.nsubxacts == 0);
    4163             :     Assert(subxact_data.nsubxacts_max == 0);
    4164             : 
    4165             :     /*
    4166             :      * If the subxact file doesn't exist that means we don't have any subxact
    4167             :      * info.
    4168             :      */
    4169         688 :     subxact_filename(path, subid, xid);
    4170         688 :     fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
    4171             :                             true);
    4172         688 :     if (fd == NULL)
    4173         530 :         return;
    4174             : 
    4175             :     /* read number of subxact items */
    4176         158 :     BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
    4177             : 
    4178         158 :     len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
    4179             : 
    4180             :     /* we keep the maximum as a power of 2 */
    4181         158 :     subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
    4182             : 
    4183             :     /*
    4184             :      * Allocate subxact information in the logical streaming context. We need
    4185             :      * this information during the complete stream so that we can add the sub
    4186             :      * transaction info to this. On stream stop we will flush this information
    4187             :      * to the subxact file and reset the logical streaming context.
    4188             :      */
    4189         158 :     oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
    4190         158 :     subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
    4191             :                                    sizeof(SubXactInfo));
    4192         158 :     MemoryContextSwitchTo(oldctx);
    4193             : 
    4194         158 :     if (len > 0)
    4195         158 :         BufFileReadExact(fd, subxact_data.subxacts, len);
    4196             : 
    4197         158 :     BufFileClose(fd);
    4198             : }
    4199             : 
    4200             : /*
    4201             :  * subxact_info_add
    4202             :  *    Add information about a subxact (offset in the main file).
    4203             :  */
    4204             : static void
    4205      205026 : subxact_info_add(TransactionId xid)
    4206             : {
    4207      205026 :     SubXactInfo *subxacts = subxact_data.subxacts;
    4208             :     int64       i;
    4209             : 
    4210             :     /* We must have a valid top level stream xid and a stream fd. */
    4211             :     Assert(TransactionIdIsValid(stream_xid));
    4212             :     Assert(stream_fd != NULL);
    4213             : 
    4214             :     /*
    4215             :      * If the XID matches the toplevel transaction, we don't want to add it.
    4216             :      */
    4217      205026 :     if (stream_xid == xid)
    4218      184778 :         return;
    4219             : 
    4220             :     /*
    4221             :      * In most cases we're checking the same subxact as we've already seen in
    4222             :      * the last call, so make sure to ignore it (this change comes later).
    4223             :      */
    4224       20248 :     if (subxact_data.subxact_last == xid)
    4225       20096 :         return;
    4226             : 
    4227             :     /* OK, remember we're processing this XID. */
    4228         152 :     subxact_data.subxact_last = xid;
    4229             : 
    4230             :     /*
    4231             :      * Check if the transaction is already present in the array of subxact. We
    4232             :      * intentionally scan the array from the tail, because we're likely adding
    4233             :      * a change for the most recent subtransactions.
    4234             :      *
    4235             :      * XXX Can we rely on the subxact XIDs arriving in sorted order? That
    4236             :      * would allow us to use binary search here.
    4237             :      */
    4238         190 :     for (i = subxact_data.nsubxacts; i > 0; i--)
    4239             :     {
    4240             :         /* found, so we're done */
    4241         152 :         if (subxacts[i - 1].xid == xid)
    4242         114 :             return;
    4243             :     }
    4244             : 
    4245             :     /* This is a new subxact, so we need to add it to the array. */
    4246          38 :     if (subxact_data.nsubxacts == 0)
    4247             :     {
    4248             :         MemoryContext oldctx;
    4249             : 
    4250          16 :         subxact_data.nsubxacts_max = 128;
    4251             : 
    4252             :         /*
    4253             :          * Allocate this memory for subxacts in per-stream context, see
    4254             :          * subxact_info_read.
    4255             :          */
    4256          16 :         oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
    4257          16 :         subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
    4258          16 :         MemoryContextSwitchTo(oldctx);
    4259             :     }
    4260          22 :     else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
    4261             :     {
    4262          20 :         subxact_data.nsubxacts_max *= 2;
    4263          20 :         subxacts = repalloc(subxacts,
    4264          20 :                             subxact_data.nsubxacts_max * sizeof(SubXactInfo));
    4265             :     }
    4266             : 
    4267          38 :     subxacts[subxact_data.nsubxacts].xid = xid;
    4268             : 
    4269             :     /*
    4270             :      * Get the current offset of the stream file and store it as offset of
    4271             :      * this subxact.
    4272             :      */
    4273          38 :     BufFileTell(stream_fd,
    4274          38 :                 &subxacts[subxact_data.nsubxacts].fileno,
    4275          38 :                 &subxacts[subxact_data.nsubxacts].offset);
    4276             : 
    4277          38 :     subxact_data.nsubxacts++;
    4278          38 :     subxact_data.subxacts = subxacts;
    4279             : }
    4280             : 
    4281             : /* format filename for file containing the info about subxacts */
    4282             : static inline void
    4283        1494 : subxact_filename(char *path, Oid subid, TransactionId xid)
    4284             : {
    4285        1494 :     snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
    4286        1494 : }
    4287             : 
    4288             : /* format filename for file containing serialized changes */
    4289             : static inline void
    4290         876 : changes_filename(char *path, Oid subid, TransactionId xid)
    4291             : {
    4292         876 :     snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
    4293         876 : }
    4294             : 
    4295             : /*
    4296             :  * stream_cleanup_files
    4297             :  *    Cleanup files for a subscription / toplevel transaction.
    4298             :  *
    4299             :  * Remove files with serialized changes and subxact info for a particular
    4300             :  * toplevel transaction. Each subscription has a separate set of files
    4301             :  * for any toplevel transaction.
    4302             :  */
    4303             : void
    4304          62 : stream_cleanup_files(Oid subid, TransactionId xid)
    4305             : {
    4306             :     char        path[MAXPGPATH];
    4307             : 
    4308             :     /* Delete the changes file. */
    4309          62 :     changes_filename(path, subid, xid);
    4310          62 :     BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
    4311             : 
    4312             :     /* Delete the subxact file, if it exists. */
    4313          62 :     subxact_filename(path, subid, xid);
    4314          62 :     BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
    4315          62 : }
    4316             : 
    4317             : /*
    4318             :  * stream_open_file
    4319             :  *    Open a file that we'll use to serialize changes for a toplevel
    4320             :  * transaction.
    4321             :  *
    4322             :  * Open a file for streamed changes from a toplevel transaction identified
    4323             :  * by stream_xid (global variable). If it's the first chunk of streamed
    4324             :  * changes for this transaction, create the buffile, otherwise open the
    4325             :  * previously created file.
    4326             :  */
    4327             : static void
    4328         726 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
    4329             : {
    4330             :     char        path[MAXPGPATH];
    4331             :     MemoryContext oldcxt;
    4332             : 
    4333             :     Assert(OidIsValid(subid));
    4334             :     Assert(TransactionIdIsValid(xid));
    4335             :     Assert(stream_fd == NULL);
    4336             : 
    4337             : 
    4338         726 :     changes_filename(path, subid, xid);
    4339         726 :     elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
    4340             : 
    4341             :     /*
    4342             :      * Create/open the buffiles under the logical streaming context so that we
    4343             :      * have those files until stream stop.
    4344             :      */
    4345         726 :     oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
    4346             : 
    4347             :     /*
    4348             :      * If this is the first streamed segment, create the changes file.
    4349             :      * Otherwise, just open the file for writing, in append mode.
    4350             :      */
    4351         726 :     if (first_segment)
    4352          64 :         stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
    4353             :                                          path);
    4354             :     else
    4355             :     {
    4356             :         /*
    4357             :          * Open the file and seek to the end of the file because we always
    4358             :          * append the changes file.
    4359             :          */
    4360         662 :         stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
    4361             :                                        path, O_RDWR, false);
    4362         662 :         BufFileSeek(stream_fd, 0, 0, SEEK_END);
    4363             :     }
    4364             : 
    4365         726 :     MemoryContextSwitchTo(oldcxt);
    4366         726 : }
    4367             : 
    4368             : /*
    4369             :  * stream_close_file
    4370             :  *    Close the currently open file with streamed changes.
    4371             :  */
    4372             : static void
    4373         786 : stream_close_file(void)
    4374             : {
    4375             :     Assert(stream_fd != NULL);
    4376             : 
    4377         786 :     BufFileClose(stream_fd);
    4378             : 
    4379         786 :     stream_fd = NULL;
    4380         786 : }
    4381             : 
    4382             : /*
    4383             :  * stream_write_change
    4384             :  *    Serialize a change to a file for the current toplevel transaction.
    4385             :  *
    4386             :  * The change is serialized in a simple format, with length (not including
    4387             :  * the length), action code (identifying the message type) and message
    4388             :  * contents (without the subxact TransactionId value).
    4389             :  */
    4390             : static void
    4391      215108 : stream_write_change(char action, StringInfo s)
    4392             : {
    4393             :     int         len;
    4394             : 
    4395             :     Assert(stream_fd != NULL);
    4396             : 
    4397             :     /* total on-disk size, including the action type character */
    4398      215108 :     len = (s->len - s->cursor) + sizeof(char);
    4399             : 
    4400             :     /* first write the size */
    4401      215108 :     BufFileWrite(stream_fd, &len, sizeof(len));
    4402             : 
    4403             :     /* then the action */
    4404      215108 :     BufFileWrite(stream_fd, &action, sizeof(action));
    4405             : 
    4406             :     /* and finally the remaining part of the buffer (after the XID) */
    4407      215108 :     len = (s->len - s->cursor);
    4408             : 
    4409      215108 :     BufFileWrite(stream_fd, &s->data[s->cursor], len);
    4410      215108 : }
    4411             : 
    4412             : /*
    4413             :  * stream_open_and_write_change
    4414             :  *    Serialize a message to a file for the given transaction.
    4415             :  *
    4416             :  * This function is similar to stream_write_change except that it will open the
    4417             :  * target file if not already before writing the message and close the file at
    4418             :  * the end.
    4419             :  */
    4420             : static void
    4421          10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
    4422             : {
    4423             :     Assert(!in_streamed_transaction);
    4424             : 
    4425          10 :     if (!stream_fd)
    4426          10 :         stream_start_internal(xid, false);
    4427             : 
    4428          10 :     stream_write_change(action, s);
    4429          10 :     stream_stop_internal(xid);
    4430          10 : }
    4431             : 
    4432             : /*
    4433             :  * Sets streaming options including replication slot name and origin start
    4434             :  * position. Workers need these options for logical replication.
    4435             :  */
    4436             : void
    4437         746 : set_stream_options(WalRcvStreamOptions *options,
    4438             :                    char *slotname,
    4439             :                    XLogRecPtr *origin_startpos)
    4440             : {
    4441             :     int         server_version;
    4442             : 
    4443         746 :     options->logical = true;
    4444         746 :     options->startpoint = *origin_startpos;
    4445         746 :     options->slotname = slotname;
    4446             : 
    4447         746 :     server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
    4448         746 :     options->proto.logical.proto_version =
    4449         746 :         server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
    4450             :         server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
    4451             :         server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
    4452             :         LOGICALREP_PROTO_VERSION_NUM;
    4453             : 
    4454         746 :     options->proto.logical.publication_names = MySubscription->publications;
    4455         746 :     options->proto.logical.binary = MySubscription->binary;
    4456             : 
    4457             :     /*
    4458             :      * Assign the appropriate option value for streaming option according to
    4459             :      * the 'streaming' mode and the publisher's ability to support that mode.
    4460             :      */
    4461         746 :     if (server_version >= 160000 &&
    4462         746 :         MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
    4463             :     {
    4464         678 :         options->proto.logical.streaming_str = "parallel";
    4465         678 :         MyLogicalRepWorker->parallel_apply = true;
    4466             :     }
    4467          68 :     else if (server_version >= 140000 &&
    4468          68 :              MySubscription->stream != LOGICALREP_STREAM_OFF)
    4469             :     {
    4470          52 :         options->proto.logical.streaming_str = "on";
    4471          52 :         MyLogicalRepWorker->parallel_apply = false;
    4472             :     }
    4473             :     else
    4474             :     {
    4475          16 :         options->proto.logical.streaming_str = NULL;
    4476          16 :         MyLogicalRepWorker->parallel_apply = false;
    4477             :     }
    4478             : 
    4479         746 :     options->proto.logical.twophase = false;
    4480         746 :     options->proto.logical.origin = pstrdup(MySubscription->origin);
    4481         746 : }
    4482             : 
    4483             : /*
    4484             :  * Cleanup the memory for subxacts and reset the related variables.
    4485             :  */
    4486             : static inline void
    4487         752 : cleanup_subxact_info()
    4488             : {
    4489         752 :     if (subxact_data.subxacts)
    4490         174 :         pfree(subxact_data.subxacts);
    4491             : 
    4492         752 :     subxact_data.subxacts = NULL;
    4493         752 :     subxact_data.subxact_last = InvalidTransactionId;
    4494         752 :     subxact_data.nsubxacts = 0;
    4495         752 :     subxact_data.nsubxacts_max = 0;
    4496         752 : }
    4497             : 
    4498             : /*
    4499             :  * Common function to run the apply loop with error handling. Disable the
    4500             :  * subscription, if necessary.
    4501             :  *
    4502             :  * Note that we don't handle FATAL errors which are probably because
    4503             :  * of system resource error and are not repeatable.
    4504             :  */
    4505             : void
    4506         746 : start_apply(XLogRecPtr origin_startpos)
    4507             : {
    4508         746 :     PG_TRY();
    4509             :     {
    4510         746 :         LogicalRepApplyLoop(origin_startpos);
    4511             :     }
    4512         112 :     PG_CATCH();
    4513             :     {
    4514             :         /*
    4515             :          * Reset the origin state to prevent the advancement of origin
    4516             :          * progress if we fail to apply. Otherwise, this will result in
    4517             :          * transaction loss as that transaction won't be sent again by the
    4518             :          * server.
    4519             :          */
    4520         112 :         replorigin_reset(0, (Datum) 0);
    4521             : 
    4522         112 :         if (MySubscription->disableonerr)
    4523           6 :             DisableSubscriptionAndExit();
    4524             :         else
    4525             :         {
    4526             :             /*
    4527             :              * Report the worker failed while applying changes. Abort the
    4528             :              * current transaction so that the stats message is sent in an
    4529             :              * idle state.
    4530             :              */
    4531         106 :             AbortOutOfAnyTransaction();
    4532         106 :             pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
    4533             : 
    4534         106 :             PG_RE_THROW();
    4535             :         }
    4536             :     }
    4537           0 :     PG_END_TRY();
    4538           0 : }
    4539             : 
    4540             : /*
    4541             :  * Runs the leader apply worker.
    4542             :  *
    4543             :  * It sets up replication origin, streaming options and then starts streaming.
    4544             :  */
    4545             : static void
    4546         444 : run_apply_worker()
    4547             : {
    4548             :     char        originname[NAMEDATALEN];
    4549         444 :     XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
    4550         444 :     char       *slotname = NULL;
    4551             :     WalRcvStreamOptions options;
    4552             :     RepOriginId originid;
    4553             :     TimeLineID  startpointTLI;
    4554             :     char       *err;
    4555             :     bool        must_use_password;
    4556             : 
    4557         444 :     slotname = MySubscription->slotname;
    4558             : 
    4559             :     /*
    4560             :      * This shouldn't happen if the subscription is enabled, but guard against
    4561             :      * DDL bugs or manual catalog changes.  (libpqwalreceiver will crash if
    4562             :      * slot is NULL.)
    4563             :      */
    4564         444 :     if (!slotname)
    4565           0 :         ereport(ERROR,
    4566             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    4567             :                  errmsg("subscription has no replication slot set")));
    4568             : 
    4569             :     /* Setup replication origin tracking. */
    4570         444 :     ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
    4571             :                                        originname, sizeof(originname));
    4572         444 :     StartTransactionCommand();
    4573         444 :     originid = replorigin_by_name(originname, true);
    4574         444 :     if (!OidIsValid(originid))
    4575           0 :         originid = replorigin_create(originname);
    4576         444 :     replorigin_session_setup(originid, 0);
    4577         444 :     replorigin_session_origin = originid;
    4578         444 :     origin_startpos = replorigin_session_get_progress(false);
    4579         444 :     CommitTransactionCommand();
    4580             : 
    4581             :     /* Is the use of a password mandatory? */
    4582         852 :     must_use_password = MySubscription->passwordrequired &&
    4583         408 :         !MySubscription->ownersuperuser;
    4584             : 
    4585         444 :     LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
    4586             :                                             true, must_use_password,
    4587             :                                             MySubscription->name, &err);
    4588             : 
    4589         428 :     if (LogRepWorkerWalRcvConn == NULL)
    4590          46 :         ereport(ERROR,
    4591             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    4592             :                  errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
    4593             :                         MySubscription->name, err)));
    4594             : 
    4595             :     /*
    4596             :      * We don't really use the output identify_system for anything but it does
    4597             :      * some initializations on the upstream so let's still call it.
    4598             :      */
    4599         382 :     (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
    4600             : 
    4601         382 :     set_apply_error_context_origin(originname);
    4602             : 
    4603         382 :     set_stream_options(&options, slotname, &origin_startpos);
    4604             : 
    4605             :     /*
    4606             :      * Even when the two_phase mode is requested by the user, it remains as
    4607             :      * the tri-state PENDING until all tablesyncs have reached READY state.
    4608             :      * Only then, can it become ENABLED.
    4609             :      *
    4610             :      * Note: If the subscription has no tables then leave the state as
    4611             :      * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
    4612             :      * work.
    4613             :      */
    4614         412 :     if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
    4615          30 :         AllTablesyncsReady())
    4616             :     {
    4617             :         /* Start streaming with two_phase enabled */
    4618          16 :         options.proto.logical.twophase = true;
    4619          16 :         walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    4620             : 
    4621          16 :         StartTransactionCommand();
    4622             : 
    4623             :         /*
    4624             :          * Updating pg_subscription might involve TOAST table access, so
    4625             :          * ensure we have a valid snapshot.
    4626             :          */
    4627          16 :         PushActiveSnapshot(GetTransactionSnapshot());
    4628             : 
    4629          16 :         UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
    4630          16 :         MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
    4631          16 :         PopActiveSnapshot();
    4632          16 :         CommitTransactionCommand();
    4633             :     }
    4634             :     else
    4635             :     {
    4636         366 :         walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    4637             :     }
    4638             : 
    4639         382 :     ereport(DEBUG1,
    4640             :             (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
    4641             :                              MySubscription->name,
    4642             :                              MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
    4643             :                              MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
    4644             :                              MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
    4645             :                              "?")));
    4646             : 
    4647             :     /* Run the main loop. */
    4648         382 :     start_apply(origin_startpos);
    4649           0 : }
    4650             : 
    4651             : /*
    4652             :  * Common initialization for leader apply worker, parallel apply worker and
    4653             :  * tablesync worker.
    4654             :  *
    4655             :  * Initialize the database connection, in-memory subscription and necessary
    4656             :  * config options.
    4657             :  */
    4658             : void
    4659         860 : InitializeLogRepWorker(void)
    4660             : {
    4661             :     MemoryContext oldctx;
    4662             : 
    4663             :     /* Run as replica session replication role. */
    4664         860 :     SetConfigOption("session_replication_role", "replica",
    4665             :                     PGC_SUSET, PGC_S_OVERRIDE);
    4666             : 
    4667             :     /* Connect to our database. */
    4668         860 :     BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
    4669         860 :                                               MyLogicalRepWorker->userid,
    4670             :                                               0);
    4671             : 
    4672             :     /*
    4673             :      * Set always-secure search path, so malicious users can't redirect user
    4674             :      * code (e.g. pg_index.indexprs).
    4675             :      */
    4676         854 :     SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
    4677             : 
    4678             :     /* Load the subscription into persistent memory context. */
    4679         854 :     ApplyContext = AllocSetContextCreate(TopMemoryContext,
    4680             :                                          "ApplyContext",
    4681             :                                          ALLOCSET_DEFAULT_SIZES);
    4682         854 :     StartTransactionCommand();
    4683         854 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    4684             : 
    4685         854 :     MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
    4686         852 :     if (!MySubscription)
    4687             :     {
    4688           0 :         ereport(LOG,
    4689             :                 (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
    4690             :                         MyLogicalRepWorker->subid)));
    4691             : 
    4692             :         /* Ensure we remove no-longer-useful entry for worker's start time */
    4693           0 :         if (am_leader_apply_worker())
    4694           0 :             ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
    4695             : 
    4696           0 :         proc_exit(0);
    4697             :     }
    4698             : 
    4699         852 :     MySubscriptionValid = true;
    4700         852 :     MemoryContextSwitchTo(oldctx);
    4701             : 
    4702         852 :     if (!MySubscription->enabled)
    4703             :     {
    4704           0 :         ereport(LOG,
    4705             :                 (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
    4706             :                         MySubscription->name)));
    4707             : 
    4708           0 :         apply_worker_exit();
    4709             :     }
    4710             : 
    4711             :     /* Setup synchronous commit according to the user's wishes */
    4712         852 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    4713             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    4714             : 
    4715             :     /*
    4716             :      * Keep us informed about subscription or role changes. Note that the
    4717             :      * role's superuser privilege can be revoked.
    4718             :      */
    4719         852 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
    4720             :                                   subscription_change_cb,
    4721             :                                   (Datum) 0);
    4722             : 
    4723         852 :     CacheRegisterSyscacheCallback(AUTHOID,
    4724             :                                   subscription_change_cb,
    4725             :                                   (Datum) 0);
    4726             : 
    4727         852 :     if (am_tablesync_worker())
    4728         388 :         ereport(LOG,
    4729             :                 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
    4730             :                         MySubscription->name,
    4731             :                         get_rel_name(MyLogicalRepWorker->relid))));
    4732             :     else
    4733         464 :         ereport(LOG,
    4734             :                 (errmsg("logical replication apply worker for subscription \"%s\" has started",
    4735             :                         MySubscription->name)));
    4736             : 
    4737         852 :     CommitTransactionCommand();
    4738         852 : }
    4739             : 
    4740             : /*
    4741             :  * Reset the origin state.
    4742             :  */
    4743             : static void
    4744         944 : replorigin_reset(int code, Datum arg)
    4745             : {
    4746         944 :     replorigin_session_origin = InvalidRepOriginId;
    4747         944 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    4748         944 :     replorigin_session_origin_timestamp = 0;
    4749         944 : }
    4750             : 
    4751             : /* Common function to setup the leader apply or tablesync worker. */
    4752             : void
    4753         840 : SetupApplyOrSyncWorker(int worker_slot)
    4754             : {
    4755             :     /* Attach to slot */
    4756         840 :     logicalrep_worker_attach(worker_slot);
    4757             : 
    4758             :     Assert(am_tablesync_worker() || am_leader_apply_worker());
    4759             : 
    4760             :     /* Setup signal handling */
    4761         840 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    4762         840 :     pqsignal(SIGTERM, die);
    4763         840 :     BackgroundWorkerUnblockSignals();
    4764             : 
    4765             :     /*
    4766             :      * We don't currently need any ResourceOwner in a walreceiver process, but
    4767             :      * if we did, we could call CreateAuxProcessResourceOwner here.
    4768             :      */
    4769             : 
    4770             :     /* Initialise stats to a sanish value */
    4771         840 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
    4772         840 :         MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
    4773             : 
    4774             :     /* Load the libpq-specific functions */
    4775         840 :     load_file("libpqwalreceiver", false);
    4776             : 
    4777         840 :     InitializeLogRepWorker();
    4778             : 
    4779             :     /*
    4780             :      * Register a callback to reset the origin state before aborting any
    4781             :      * pending transaction during shutdown (see ShutdownPostgres()). This will
    4782             :      * avoid origin advancement for an in-complete transaction which could
    4783             :      * otherwise lead to its loss as such a transaction won't be sent by the
    4784             :      * server again.
    4785             :      *
    4786             :      * Note that even a LOG or DEBUG statement placed after setting the origin
    4787             :      * state may process a shutdown signal before committing the current apply
    4788             :      * operation. So, it is important to register such a callback here.
    4789             :      */
    4790         832 :     before_shmem_exit(replorigin_reset, (Datum) 0);
    4791             : 
    4792             :     /* Connect to the origin and start the replication. */
    4793         832 :     elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
    4794             :          MySubscription->conninfo);
    4795             : 
    4796             :     /*
    4797             :      * Setup callback for syscache so that we know when something changes in
    4798             :      * the subscription relation state.
    4799             :      */
    4800         832 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
    4801             :                                   invalidate_syncing_table_states,
    4802             :                                   (Datum) 0);
    4803         832 : }
    4804             : 
    4805             : /* Logical Replication Apply worker entry point */
    4806             : void
    4807         450 : ApplyWorkerMain(Datum main_arg)
    4808             : {
    4809         450 :     int         worker_slot = DatumGetInt32(main_arg);
    4810             : 
    4811         450 :     InitializingApplyWorker = true;
    4812             : 
    4813         450 :     SetupApplyOrSyncWorker(worker_slot);
    4814             : 
    4815         444 :     InitializingApplyWorker = false;
    4816             : 
    4817         444 :     run_apply_worker();
    4818             : 
    4819           0 :     proc_exit(0);
    4820             : }
    4821             : 
    4822             : /*
    4823             :  * After error recovery, disable the subscription in a new transaction
    4824             :  * and exit cleanly.
    4825             :  */
    4826             : void
    4827           8 : DisableSubscriptionAndExit(void)
    4828             : {
    4829             :     /*
    4830             :      * Emit the error message, and recover from the error state to an idle
    4831             :      * state
    4832             :      */
    4833           8 :     HOLD_INTERRUPTS();
    4834             : 
    4835           8 :     EmitErrorReport();
    4836           8 :     AbortOutOfAnyTransaction();
    4837           8 :     FlushErrorState();
    4838             : 
    4839           8 :     RESUME_INTERRUPTS();
    4840             : 
    4841             :     /* Report the worker failed during either table synchronization or apply */
    4842           8 :     pgstat_report_subscription_error(MyLogicalRepWorker->subid,
    4843           8 :                                      !am_tablesync_worker());
    4844             : 
    4845             :     /* Disable the subscription */
    4846           8 :     StartTransactionCommand();
    4847             : 
    4848             :     /*
    4849             :      * Updating pg_subscription might involve TOAST table access, so ensure we
    4850             :      * have a valid snapshot.
    4851             :      */
    4852           8 :     PushActiveSnapshot(GetTransactionSnapshot());
    4853             : 
    4854           8 :     DisableSubscription(MySubscription->oid);
    4855           8 :     PopActiveSnapshot();
    4856           8 :     CommitTransactionCommand();
    4857             : 
    4858             :     /* Ensure we remove no-longer-useful entry for worker's start time */
    4859           8 :     if (am_leader_apply_worker())
    4860           6 :         ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
    4861             : 
    4862             :     /* Notify the subscription has been disabled and exit */
    4863           8 :     ereport(LOG,
    4864             :             errmsg("subscription \"%s\" has been disabled because of an error",
    4865             :                    MySubscription->name));
    4866             : 
    4867           8 :     proc_exit(0);
    4868             : }
    4869             : 
    4870             : /*
    4871             :  * Is current process a logical replication worker?
    4872             :  */
    4873             : bool
    4874        3988 : IsLogicalWorker(void)
    4875             : {
    4876        3988 :     return MyLogicalRepWorker != NULL;
    4877             : }
    4878             : 
    4879             : /*
    4880             :  * Is current process a logical replication parallel apply worker?
    4881             :  */
    4882             : bool
    4883        2760 : IsLogicalParallelApplyWorker(void)
    4884             : {
    4885        2760 :     return IsLogicalWorker() && am_parallel_apply_worker();
    4886             : }
    4887             : 
    4888             : /*
    4889             :  * Start skipping changes of the transaction if the given LSN matches the
    4890             :  * LSN specified by subscription's skiplsn.
    4891             :  */
    4892             : static void
    4893         994 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
    4894             : {
    4895             :     Assert(!is_skipping_changes());
    4896             :     Assert(!in_remote_transaction);
    4897             :     Assert(!in_streamed_transaction);
    4898             : 
    4899             :     /*
    4900             :      * Quick return if it's not requested to skip this transaction. This
    4901             :      * function is called for every remote transaction and we assume that
    4902             :      * skipping the transaction is not used often.
    4903             :      */
    4904         994 :     if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
    4905             :                MySubscription->skiplsn != finish_lsn))
    4906         988 :         return;
    4907             : 
    4908             :     /* Start skipping all changes of this transaction */
    4909           6 :     skip_xact_finish_lsn = finish_lsn;
    4910             : 
    4911           6 :     ereport(LOG,
    4912             :             errmsg("logical replication starts skipping transaction at LSN %X/%X",
    4913             :                    LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
    4914             : }
    4915             : 
    4916             : /*
    4917             :  * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
    4918             :  */
    4919             : static void
    4920          54 : stop_skipping_changes(void)
    4921             : {
    4922          54 :     if (!is_skipping_changes())
    4923          48 :         return;
    4924             : 
    4925           6 :     ereport(LOG,
    4926             :             (errmsg("logical replication completed skipping transaction at LSN %X/%X",
    4927             :                     LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
    4928             : 
    4929             :     /* Stop skipping changes */
    4930           6 :     skip_xact_finish_lsn = InvalidXLogRecPtr;
    4931             : }
    4932             : 
    4933             : /*
    4934             :  * Clear subskiplsn of pg_subscription catalog.
    4935             :  *
    4936             :  * finish_lsn is the transaction's finish LSN that is used to check if the
    4937             :  * subskiplsn matches it. If not matched, we raise a warning when clearing the
    4938             :  * subskiplsn in order to inform users for cases e.g., where the user mistakenly
    4939             :  * specified the wrong subskiplsn.
    4940             :  */
    4941             : static void
    4942        1028 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
    4943             : {
    4944             :     Relation    rel;
    4945             :     Form_pg_subscription subform;
    4946             :     HeapTuple   tup;
    4947        1028 :     XLogRecPtr  myskiplsn = MySubscription->skiplsn;
    4948        1028 :     bool        started_tx = false;
    4949             : 
    4950        1028 :     if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
    4951        1022 :         return;
    4952             : 
    4953           6 :     if (!IsTransactionState())
    4954             :     {
    4955           2 :         StartTransactionCommand();
    4956           2 :         started_tx = true;
    4957             :     }
    4958             : 
    4959             :     /*
    4960             :      * Updating pg_subscription might involve TOAST table access, so ensure we
    4961             :      * have a valid snapshot.
    4962             :      */
    4963           6 :     PushActiveSnapshot(GetTransactionSnapshot());
    4964             : 
    4965             :     /*
    4966             :      * Protect subskiplsn of pg_subscription from being concurrently updated
    4967             :      * while clearing it.
    4968             :      */
    4969           6 :     LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
    4970             :                      AccessShareLock);
    4971             : 
    4972           6 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    4973             : 
    4974             :     /* Fetch the existing tuple. */
    4975           6 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
    4976             :                               ObjectIdGetDatum(MySubscription->oid));
    4977             : 
    4978           6 :     if (!HeapTupleIsValid(tup))
    4979           0 :         elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
    4980             : 
    4981           6 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
    4982             : 
    4983             :     /*
    4984             :      * Clear the subskiplsn. If the user has already changed subskiplsn before
    4985             :      * clearing it we don't update the catalog and the replication origin
    4986             :      * state won't get advanced. So in the worst case, if the server crashes
    4987             :      * before sending an acknowledgment of the flush position the transaction
    4988             :      * will be sent again and the user needs to set subskiplsn again. We can
    4989             :      * reduce the possibility by logging a replication origin WAL record to
    4990             :      * advance the origin LSN instead but there is no way to advance the
    4991             :      * origin timestamp and it doesn't seem to be worth doing anything about
    4992             :      * it since it's a very rare case.
    4993             :      */
    4994           6 :     if (subform->subskiplsn == myskiplsn)
    4995             :     {
    4996             :         bool        nulls[Natts_pg_subscription];
    4997             :         bool        replaces[Natts_pg_subscription];
    4998             :         Datum       values[Natts_pg_subscription];
    4999             : 
    5000           6 :         memset(values, 0, sizeof(values));
    5001           6 :         memset(nulls, false, sizeof(nulls));
    5002           6 :         memset(replaces, false, sizeof(replaces));
    5003             : 
    5004             :         /* reset subskiplsn */
    5005           6 :         values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
    5006           6 :         replaces[Anum_pg_subscription_subskiplsn - 1] = true;
    5007             : 
    5008           6 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    5009             :                                 replaces);
    5010           6 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    5011             : 
    5012           6 :         if (myskiplsn != finish_lsn)
    5013           0 :             ereport(WARNING,
    5014             :                     errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
    5015             :                     errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
    5016             :                               LSN_FORMAT_ARGS(finish_lsn),
    5017             :                               LSN_FORMAT_ARGS(myskiplsn)));
    5018             :     }
    5019             : 
    5020           6 :     heap_freetuple(tup);
    5021           6 :     table_close(rel, NoLock);
    5022             : 
    5023           6 :     PopActiveSnapshot();
    5024             : 
    5025           6 :     if (started_tx)
    5026           2 :         CommitTransactionCommand();
    5027             : }
    5028             : 
    5029             : /* Error callback to give more context info about the change being applied */
    5030             : void
    5031        1406 : apply_error_callback(void *arg)
    5032             : {
    5033        1406 :     ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
    5034             : 
    5035        1406 :     if (apply_error_callback_arg.command == 0)
    5036         694 :         return;
    5037             : 
    5038             :     Assert(errarg->origin_name);
    5039             : 
    5040         712 :     if (errarg->rel == NULL)
    5041             :     {
    5042         626 :         if (!TransactionIdIsValid(errarg->remote_xid))
    5043           0 :             errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
    5044             :                        errarg->origin_name,
    5045             :                        logicalrep_message_type(errarg->command));
    5046         626 :         else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
    5047         514 :             errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
    5048             :                        errarg->origin_name,
    5049             :                        logicalrep_message_type(errarg->command),
    5050             :                        errarg->remote_xid);
    5051             :         else
    5052         224 :             errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
    5053             :                        errarg->origin_name,
    5054             :                        logicalrep_message_type(errarg->command),
    5055             :                        errarg->remote_xid,
    5056         112 :                        LSN_FORMAT_ARGS(errarg->finish_lsn));
    5057             :     }
    5058             :     else
    5059             :     {
    5060          86 :         if (errarg->remote_attnum < 0)
    5061             :         {
    5062          86 :             if (XLogRecPtrIsInvalid(errarg->finish_lsn))
    5063           0 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
    5064             :                            errarg->origin_name,
    5065             :                            logicalrep_message_type(errarg->command),
    5066           0 :                            errarg->rel->remoterel.nspname,
    5067           0 :                            errarg->rel->remoterel.relname,
    5068             :                            errarg->remote_xid);
    5069             :             else
    5070         172 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
    5071             :                            errarg->origin_name,
    5072             :                            logicalrep_message_type(errarg->command),
    5073          86 :                            errarg->rel->remoterel.nspname,
    5074          86 :                            errarg->rel->remoterel.relname,
    5075             :                            errarg->remote_xid,
    5076          86 :                            LSN_FORMAT_ARGS(errarg->finish_lsn));
    5077             :         }
    5078             :         else
    5079             :         {
    5080           0 :             if (XLogRecPtrIsInvalid(errarg->finish_lsn))
    5081           0 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
    5082             :                            errarg->origin_name,
    5083             :                            logicalrep_message_type(errarg->command),
    5084           0 :                            errarg->rel->remoterel.nspname,
    5085           0 :                            errarg->rel->remoterel.relname,
    5086           0 :                            errarg->rel->remoterel.attnames[errarg->remote_attnum],
    5087             :                            errarg->remote_xid);
    5088             :             else
    5089           0 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
    5090             :                            errarg->origin_name,
    5091             :                            logicalrep_message_type(errarg->command),
    5092           0 :                            errarg->rel->remoterel.nspname,
    5093           0 :                            errarg->rel->remoterel.relname,
    5094           0 :                            errarg->rel->remoterel.attnames[errarg->remote_attnum],
    5095             :                            errarg->remote_xid,
    5096           0 :                            LSN_FORMAT_ARGS(errarg->finish_lsn));
    5097             :         }
    5098             :     }
    5099             : }
    5100             : 
    5101             : /* Set transaction information of apply error callback */
    5102             : static inline void
    5103        5694 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
    5104             : {
    5105        5694 :     apply_error_callback_arg.remote_xid = xid;
    5106        5694 :     apply_error_callback_arg.finish_lsn = lsn;
    5107        5694 : }
    5108             : 
    5109             : /* Reset all information of apply error callback */
    5110             : static inline void
    5111        2810 : reset_apply_error_context_info(void)
    5112             : {
    5113        2810 :     apply_error_callback_arg.command = 0;
    5114        2810 :     apply_error_callback_arg.rel = NULL;
    5115        2810 :     apply_error_callback_arg.remote_attnum = -1;
    5116        2810 :     set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
    5117        2810 : }
    5118             : 
    5119             : /*
    5120             :  * Request wakeup of the workers for the given subscription OID
    5121             :  * at commit of the current transaction.
    5122             :  *
    5123             :  * This is used to ensure that the workers process assorted changes
    5124             :  * as soon as possible.
    5125             :  */
    5126             : void
    5127         386 : LogicalRepWorkersWakeupAtCommit(Oid subid)
    5128             : {
    5129             :     MemoryContext oldcxt;
    5130             : 
    5131         386 :     oldcxt = MemoryContextSwitchTo(TopTransactionContext);
    5132         386 :     on_commit_wakeup_workers_subids =
    5133         386 :         list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
    5134         386 :     MemoryContextSwitchTo(oldcxt);
    5135         386 : }
    5136             : 
    5137             : /*
    5138             :  * Wake up the workers of any subscriptions that were changed in this xact.
    5139             :  */
    5140             : void
    5141     1042496 : AtEOXact_LogicalRepWorkers(bool isCommit)
    5142             : {
    5143     1042496 :     if (isCommit && on_commit_wakeup_workers_subids != NIL)
    5144             :     {
    5145             :         ListCell   *lc;
    5146             : 
    5147         376 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    5148         752 :         foreach(lc, on_commit_wakeup_workers_subids)
    5149             :         {
    5150         376 :             Oid         subid = lfirst_oid(lc);
    5151             :             List       *workers;
    5152             :             ListCell   *lc2;
    5153             : 
    5154         376 :             workers = logicalrep_workers_find(subid, true, false);
    5155         486 :             foreach(lc2, workers)
    5156             :             {
    5157         110 :                 LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
    5158             : 
    5159         110 :                 logicalrep_worker_wakeup_ptr(worker);
    5160             :             }
    5161             :         }
    5162         376 :         LWLockRelease(LogicalRepWorkerLock);
    5163             :     }
    5164             : 
    5165             :     /* The List storage will be reclaimed automatically in xact cleanup. */
    5166     1042496 :     on_commit_wakeup_workers_subids = NIL;
    5167     1042496 : }
    5168             : 
    5169             : /*
    5170             :  * Allocate the origin name in long-lived context for error context message.
    5171             :  */
    5172             : void
    5173         766 : set_apply_error_context_origin(char *originname)
    5174             : {
    5175         766 :     apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
    5176             :                                                                originname);
    5177         766 : }
    5178             : 
    5179             : /*
    5180             :  * Return the action to be taken for the given transaction. See
    5181             :  * TransApplyAction for information on each of the actions.
    5182             :  *
    5183             :  * *winfo is assigned to the destination parallel worker info when the leader
    5184             :  * apply worker has to pass all the transaction's changes to the parallel
    5185             :  * apply worker.
    5186             :  */
    5187             : static TransApplyAction
    5188      652422 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
    5189             : {
    5190      652422 :     *winfo = NULL;
    5191             : 
    5192      652422 :     if (am_parallel_apply_worker())
    5193             :     {
    5194      137876 :         return TRANS_PARALLEL_APPLY;
    5195             :     }
    5196             : 
    5197             :     /*
    5198             :      * If we are processing this transaction using a parallel apply worker
    5199             :      * then either we send the changes to the parallel worker or if the worker
    5200             :      * is busy then serialize the changes to the file which will later be
    5201             :      * processed by the parallel worker.
    5202             :      */
    5203      514546 :     *winfo = pa_find_worker(xid);
    5204             : 
    5205      514546 :     if (*winfo && (*winfo)->serialize_changes)
    5206             :     {
    5207       10074 :         return TRANS_LEADER_PARTIAL_SERIALIZE;
    5208             :     }
    5209      504472 :     else if (*winfo)
    5210             :     {
    5211      137788 :         return TRANS_LEADER_SEND_TO_PARALLEL;
    5212             :     }
    5213             : 
    5214             :     /*
    5215             :      * If there is no parallel worker involved to process this transaction
    5216             :      * then we either directly apply the change or serialize it to a file
    5217             :      * which will later be applied when the transaction finish message is
    5218             :      * processed.
    5219             :      */
    5220      366684 :     else if (in_streamed_transaction)
    5221             :     {
    5222      206398 :         return TRANS_LEADER_SERIALIZE;
    5223             :     }
    5224             :     else
    5225             :     {
    5226      160286 :         return TRANS_LEADER_APPLY;
    5227             :     }
    5228             : }

Generated by: LCOV version 1.16