LCOV - code coverage report
Current view: top level - src/backend/replication/logical - worker.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 1592 1707 93.3 %
Date: 2025-07-27 09:17:26 Functions: 90 90 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * worker.c
       3             :  *     PostgreSQL logical replication worker (apply)
       4             :  *
       5             :  * Copyright (c) 2016-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/worker.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains the worker which applies logical changes as they come
      12             :  *    from remote logical replication stream.
      13             :  *
      14             :  *    The main worker (apply) is started by logical replication worker
      15             :  *    launcher for every enabled subscription in a database. It uses
      16             :  *    walsender protocol to communicate with publisher.
      17             :  *
      18             :  *    This module includes server facing code and shares libpqwalreceiver
      19             :  *    module with walreceiver for providing the libpq specific functionality.
      20             :  *
      21             :  *
      22             :  * STREAMED TRANSACTIONS
      23             :  * ---------------------
      24             :  * Streamed transactions (large transactions exceeding a memory limit on the
      25             :  * upstream) are applied using one of two approaches:
      26             :  *
      27             :  * 1) Write to temporary files and apply when the final commit arrives
      28             :  *
      29             :  * This approach is used when the user has set the subscription's streaming
      30             :  * option as on.
      31             :  *
      32             :  * Unlike the regular (non-streamed) case, handling streamed transactions has
      33             :  * to handle aborts of both the toplevel transaction and subtransactions. This
      34             :  * is achieved by tracking offsets for subtransactions, which is then used
      35             :  * to truncate the file with serialized changes.
      36             :  *
      37             :  * The files are placed in tmp file directory by default, and the filenames
      38             :  * include both the XID of the toplevel transaction and OID of the
      39             :  * subscription. This is necessary so that different workers processing a
      40             :  * remote transaction with the same XID don't interfere.
      41             :  *
      42             :  * We use BufFiles instead of using normal temporary files because (a) the
      43             :  * BufFile infrastructure supports temporary files that exceed the OS file size
      44             :  * limit, (b) provides automatic cleanup on error, and (c) allows these files
      45             :  * to survive across local transactions so that they can be opened and closed
      46             :  * at stream start and stop. We decided to use FileSet
      47             :  * infrastructure as without that it deletes the files on the closure of the
      48             :  * file and if we decide to keep stream files open across the start/stop stream
      49             :  * then it will consume a lot of memory (more than 8K for each BufFile and
      50             :  * there could be multiple such BufFiles as the subscriber could receive
      51             :  * multiple start/stop streams for different transactions before getting the
      52             :  * commit). Moreover, if we don't use FileSet then we also need to invent
      53             :  * a new way to pass filenames to BufFile APIs so that we are allowed to open
      54             :  * the file we desired across multiple stream-open calls for the same
      55             :  * transaction.
      56             :  *
      57             :  * 2) Parallel apply workers.
      58             :  *
      59             :  * This approach is used when the user has set the subscription's streaming
      60             :  * option as parallel. See logical/applyparallelworker.c for information about
      61             :  * this approach.
      62             :  *
      63             :  * TWO_PHASE TRANSACTIONS
      64             :  * ----------------------
      65             :  * Two phase transactions are replayed at prepare and then committed or
      66             :  * rolled back at commit prepared and rollback prepared respectively. It is
      67             :  * possible to have a prepared transaction that arrives at the apply worker
      68             :  * when the tablesync is busy doing the initial copy. In this case, the apply
      69             :  * worker skips all the prepared operations [e.g. inserts] while the tablesync
      70             :  * is still busy (see the condition of should_apply_changes_for_rel). The
      71             :  * tablesync worker might not get such a prepared transaction because say it
      72             :  * was prior to the initial consistent point but might have got some later
      73             :  * commits. Now, the tablesync worker will exit without doing anything for the
      74             :  * prepared transaction skipped by the apply worker as the sync location for it
      75             :  * will be already ahead of the apply worker's current location. This would lead
      76             :  * to an "empty prepare", because later when the apply worker does the commit
      77             :  * prepare, there is nothing in it (the inserts were skipped earlier).
      78             :  *
      79             :  * To avoid this and similar prepare confusions, the subscription's two_phase
      80             :  * commit is enabled only after the initial sync is over. The two_phase option
      81             :  * has been implemented as a tri-state with values DISABLED, PENDING, and
      82             :  * ENABLED.
      83             :  *
      84             :  * Even if the user specifies they want a subscription with two_phase = on,
      85             :  * internally it will start with a tri-state of PENDING which only becomes
      86             :  * ENABLED after all tablesync initializations are completed - i.e. when all
      87             :  * tablesync workers have reached their READY state. In other words, the value
      88             :  * PENDING is only a temporary state for subscription start-up.
      89             :  *
      90             :  * Until the two_phase is properly available (ENABLED) the subscription will
      91             :  * behave as if two_phase = off. When the apply worker detects that all
      92             :  * tablesyncs have become READY (while the tri-state was PENDING) it will
      93             :  * restart the apply worker process. This happens in
      94             :  * process_syncing_tables_for_apply.
      95             :  *
      96             :  * When the (re-started) apply worker finds that all tablesyncs are READY for a
      97             :  * two_phase tri-state of PENDING, it starts streaming messages with the
      98             :  * two_phase option which in turn enables the decoding of two-phase commits at
      99             :  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
     100             :  * Now, it is possible that during the time we have not enabled two_phase, the
     101             :  * publisher (replication server) would have skipped some prepares but we
     102             :  * ensure that such prepares are sent along with commit prepare, see
     103             :  * ReorderBufferFinishPrepared.
     104             :  *
     105             :  * If the subscription has no tables then a two_phase tri-state PENDING is
     106             :  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
     107             :  * PUBLICATION which might otherwise be disallowed (see below).
     108             :  *
     109             :  * If ever a user needs to be aware of the tri-state value, they can fetch it
     110             :  * from the pg_subscription catalog (see column subtwophasestate).
     111             :  *
     112             :  * Finally, to avoid problems mentioned in previous paragraphs from any
     113             :  * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
     114             :  * to 'off' and then again back to 'on') there is a restriction for
     115             :  * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
     116             :  * the two_phase tri-state is ENABLED, except when copy_data = false.
     117             :  *
     118             :  * We can get prepare of the same GID more than once for the genuine cases
     119             :  * where we have defined multiple subscriptions for publications on the same
     120             :  * server and prepared transaction has operations on tables subscribed to those
     121             :  * subscriptions. For such cases, if we use the GID sent by publisher one of
     122             :  * the prepares will be successful and others will fail, in which case the
     123             :  * server will send them again. Now, this can lead to a deadlock if user has
     124             :  * set synchronous_standby_names for all the subscriptions on subscriber. To
     125             :  * avoid such deadlocks, we generate a unique GID (consisting of the
     126             :  * subscription oid and the xid of the prepared transaction) for each prepare
     127             :  * transaction on the subscriber.
     128             :  *
     129             :  * FAILOVER
     130             :  * ----------------------
     131             :  * The logical slot on the primary can be synced to the standby by specifying
     132             :  * failover = true when creating the subscription. Enabling failover allows us
     133             :  * to smoothly transition to the promoted standby, ensuring that we can
     134             :  * subscribe to the new primary without losing any data.
     135             :  *
     136             :  * RETAIN DEAD TUPLES
     137             :  * ----------------------
     138             :  * Each apply worker that has enabled the retain_dead_tuples option maintains a
     139             :  * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
     140             :  * prevent dead rows from being removed prematurely when the apply worker still
     141             :  * needs them to detect conflicts reliably. This helps to retain the required
     142             :  * commit_ts module information, which further helps to detect
     143             :  * update_origin_differs and delete_origin_differs conflicts reliably, as
     144             :  * otherwise, vacuum freeze could remove the required information.
     145             :  *
     146             :  * The logical replication launcher manages an internal replication slot named
     147             :  * "pg_conflict_detection". It asynchronously aggregates the non-removable
     148             :  * transaction ID from all apply workers to determine the appropriate xmin for
     149             :  * the slot, thereby retaining necessary tuples.
     150             :  *
     151             :  * The non-removable transaction ID in the apply worker is advanced to the
     152             :  * oldest running transaction ID once all concurrent transactions on the
     153             :  * publisher have been applied and flushed locally. The process involves:
     154             :  *
     155             :  * - RDT_GET_CANDIDATE_XID:
     156             :  *   Call GetOldestActiveTransactionId() to take oldestRunningXid as the
     157             :  *   candidate xid.
     158             :  *
     159             :  * - RDT_REQUEST_PUBLISHER_STATUS:
     160             :  *   Send a message to the walsender requesting the publisher status, which
     161             :  *   includes the latest WAL write position and information about transactions
     162             :  *   that are in the commit phase.
     163             :  *
     164             :  * - RDT_WAIT_FOR_PUBLISHER_STATUS:
     165             :  *   Wait for the status from the walsender. After receiving the first status,
     166             :  *   do not proceed if there are concurrent remote transactions that are still
     167             :  *   in the commit phase. These transactions might have been assigned an
     168             :  *   earlier commit timestamp but have not yet written the commit WAL record.
     169             :  *   Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
     170             :  *   until all these transactions have completed.
     171             :  *
     172             :  * - RDT_WAIT_FOR_LOCAL_FLUSH:
     173             :  *   Advance the non-removable transaction ID if the current flush location has
     174             :  *   reached or surpassed the last received WAL position.
     175             :  *
     176             :  * The overall state progression is: GET_CANDIDATE_XID ->
     177             :  * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
     178             :  * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
     179             :  * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
     180             :  *
     181             :  * Retaining the dead tuples for this period is sufficient for ensuring
     182             :  * eventual consistency using last-update-wins strategy, as dead tuples are
     183             :  * useful for detecting conflicts only during the application of concurrent
     184             :  * transactions from remote nodes. After applying and flushing all remote
     185             :  * transactions that occurred concurrently with the tuple DELETE, any
     186             :  * subsequent UPDATE from a remote node should have a later timestamp. In such
     187             :  * cases, it is acceptable to detect an update_missing scenario and convert the
     188             :  * UPDATE to an INSERT when applying it. But, detecting concurrent remote
     189             :  * transactions with earlier timestamps than the DELETE is necessary, as the
     190             :  * UPDATEs in remote transactions should be ignored if their timestamp is
     191             :  * earlier than that of the dead tuples.
     192             :  *
     193             :  * Note that advancing the non-removable transaction ID is not supported if the
     194             :  * publisher is also a physical standby. This is because the logical walsender
     195             :  * on the standby can only get the WAL replay position but there may be more
     196             :  * WALs that are being replicated from the primary and those WALs could have
     197             :  * earlier commit timestamps.
     198             :  *
     199             :  * Similarly, when the publisher has subscribed to another publisher,
     200             :  * information necessary for conflict detection cannot be retained for
     201             :  * changes from origins other than the publisher. This is because the publisher
     202             :  * lacks the information on concurrent transactions of other publishers to
     203             :  * which it subscribes. As the information on concurrent transactions is
     204             :  * unavailable beyond subscriber's immediate publishers, the non-removable
     205             :  * transaction ID might be advanced prematurely before changes from other
     206             :  * origins have been fully applied.
     207             :  *
     208             :  * XXX Retaining information for changes from other origins might be possible
     209             :  * by requesting the subscription on that origin to enable retain_dead_tuples
     210             :  * and fetching the conflict detection slot.xmin along with the publisher's
     211             :  * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
     212             :  * wait for the remote slot's xmin to reach the oldest active transaction ID,
     213             :  * ensuring that all transactions from other origins have been applied on the
     214             :  * publisher, thereby getting the latest WAL position that includes all
     215             :  * concurrent changes. However, this approach may impact performance, so it
     216             :  * might not be worth the effort.
     217             :  *
     218             :  * XXX It seems feasible to get the latest commit's WAL location from the
     219             :  * publisher and wait till that is applied. However, we can't do that
     220             :  * because commit timestamps can regress as a commit with a later LSN is not
     221             :  * guaranteed to have a later timestamp than those with earlier LSNs. Having
     222             :  * said that, even if that is possible, it won't improve performance much as
     223             :  * the apply always lags and moves slowly compared with the transactions
     224             :  * on the publisher.
     225             :  *-------------------------------------------------------------------------
     226             :  */
     227             : 
     228             : #include "postgres.h"
     229             : 
     230             : #include <sys/stat.h>
     231             : #include <unistd.h>
     232             : 
     233             : #include "access/commit_ts.h"
     234             : #include "access/table.h"
     235             : #include "access/tableam.h"
     236             : #include "access/twophase.h"
     237             : #include "access/xact.h"
     238             : #include "catalog/indexing.h"
     239             : #include "catalog/pg_inherits.h"
     240             : #include "catalog/pg_subscription.h"
     241             : #include "catalog/pg_subscription_rel.h"
     242             : #include "commands/subscriptioncmds.h"
     243             : #include "commands/tablecmds.h"
     244             : #include "commands/trigger.h"
     245             : #include "executor/executor.h"
     246             : #include "executor/execPartition.h"
     247             : #include "libpq/pqformat.h"
     248             : #include "miscadmin.h"
     249             : #include "optimizer/optimizer.h"
     250             : #include "parser/parse_relation.h"
     251             : #include "pgstat.h"
     252             : #include "postmaster/bgworker.h"
     253             : #include "postmaster/interrupt.h"
     254             : #include "postmaster/walwriter.h"
     255             : #include "replication/conflict.h"
     256             : #include "replication/logicallauncher.h"
     257             : #include "replication/logicalproto.h"
     258             : #include "replication/logicalrelation.h"
     259             : #include "replication/logicalworker.h"
     260             : #include "replication/origin.h"
     261             : #include "replication/slot.h"
     262             : #include "replication/walreceiver.h"
     263             : #include "replication/worker_internal.h"
     264             : #include "rewrite/rewriteHandler.h"
     265             : #include "storage/buffile.h"
     266             : #include "storage/ipc.h"
     267             : #include "storage/lmgr.h"
     268             : #include "storage/procarray.h"
     269             : #include "tcop/tcopprot.h"
     270             : #include "utils/acl.h"
     271             : #include "utils/dynahash.h"
     272             : #include "utils/guc.h"
     273             : #include "utils/inval.h"
     274             : #include "utils/lsyscache.h"
     275             : #include "utils/memutils.h"
     276             : #include "utils/pg_lsn.h"
     277             : #include "utils/rel.h"
     278             : #include "utils/rls.h"
     279             : #include "utils/snapmgr.h"
     280             : #include "utils/syscache.h"
     281             : #include "utils/usercontext.h"
     282             : 
     283             : #define NAPTIME_PER_CYCLE 1000  /* max sleep time between cycles (1s) */
     284             : 
     285             : typedef struct FlushPosition
     286             : {
     287             :     dlist_node  node;
     288             :     XLogRecPtr  local_end;
     289             :     XLogRecPtr  remote_end;
     290             : } FlushPosition;
     291             : 
     292             : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
     293             : 
     294             : typedef struct ApplyExecutionData
     295             : {
     296             :     EState     *estate;         /* executor state, used to track resources */
     297             : 
     298             :     LogicalRepRelMapEntry *targetRel;   /* replication target rel */
     299             :     ResultRelInfo *targetRelInfo;   /* ResultRelInfo for same */
     300             : 
     301             :     /* These fields are used when the target relation is partitioned: */
     302             :     ModifyTableState *mtstate;  /* dummy ModifyTable state */
     303             :     PartitionTupleRouting *proute;  /* partition routing info */
     304             : } ApplyExecutionData;
     305             : 
     306             : /* Struct for saving and restoring apply errcontext information */
     307             : typedef struct ApplyErrorCallbackArg
     308             : {
     309             :     LogicalRepMsgType command;  /* 0 if invalid */
     310             :     LogicalRepRelMapEntry *rel;
     311             : 
     312             :     /* Remote node information */
     313             :     int         remote_attnum;  /* -1 if invalid */
     314             :     TransactionId remote_xid;
     315             :     XLogRecPtr  finish_lsn;
     316             :     char       *origin_name;
     317             : } ApplyErrorCallbackArg;
     318             : 
     319             : /*
     320             :  * The action to be taken for the changes in the transaction.
     321             :  *
     322             :  * TRANS_LEADER_APPLY:
     323             :  * This action means that we are in the leader apply worker or table sync
     324             :  * worker. The changes of the transaction are either directly applied or
     325             :  * are read from temporary files (for streaming transactions) and then
     326             :  * applied by the worker.
     327             :  *
     328             :  * TRANS_LEADER_SERIALIZE:
     329             :  * This action means that we are in the leader apply worker or table sync
     330             :  * worker. Changes are written to temporary files and then applied when the
     331             :  * final commit arrives.
     332             :  *
     333             :  * TRANS_LEADER_SEND_TO_PARALLEL:
     334             :  * This action means that we are in the leader apply worker and need to send
     335             :  * the changes to the parallel apply worker.
     336             :  *
     337             :  * TRANS_LEADER_PARTIAL_SERIALIZE:
     338             :  * This action means that we are in the leader apply worker and have sent some
     339             :  * changes directly to the parallel apply worker and the remaining changes are
     340             :  * serialized to a file, due to timeout while sending data. The parallel apply
     341             :  * worker will apply these serialized changes when the final commit arrives.
     342             :  *
     343             :  * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
     344             :  * serializing changes, the leader worker also needs to serialize the
     345             :  * STREAM_XXX message to a file, and wait for the parallel apply worker to
     346             :  * finish the transaction when processing the transaction finish command. So
     347             :  * this new action was introduced to keep the code and logic clear.
     348             :  *
     349             :  * TRANS_PARALLEL_APPLY:
     350             :  * This action means that we are in the parallel apply worker and changes of
     351             :  * the transaction are applied directly by the worker.
     352             :  */
     353             : typedef enum
     354             : {
     355             :     /* The action for non-streaming transactions. */
     356             :     TRANS_LEADER_APPLY,
     357             : 
     358             :     /* Actions for streaming transactions. */
     359             :     TRANS_LEADER_SERIALIZE,
     360             :     TRANS_LEADER_SEND_TO_PARALLEL,
     361             :     TRANS_LEADER_PARTIAL_SERIALIZE,
     362             :     TRANS_PARALLEL_APPLY,
     363             : } TransApplyAction;
     364             : 
     365             : /*
     366             :  * The phases involved in advancing the non-removable transaction ID.
     367             :  *
     368             :  * See comments atop worker.c for details of the transition between these
     369             :  * phases.
     370             :  */
     371             : typedef enum
     372             : {
     373             :     RDT_GET_CANDIDATE_XID,
     374             :     RDT_REQUEST_PUBLISHER_STATUS,
     375             :     RDT_WAIT_FOR_PUBLISHER_STATUS,
     376             :     RDT_WAIT_FOR_LOCAL_FLUSH
     377             : } RetainDeadTuplesPhase;
     378             : 
     379             : /*
     380             :  * Critical information for managing phase transitions within the
     381             :  * RetainDeadTuplesPhase.
     382             :  */
     383             : typedef struct RetainDeadTuplesData
     384             : {
     385             :     RetainDeadTuplesPhase phase;    /* current phase */
     386             :     XLogRecPtr  remote_lsn;     /* WAL write position on the publisher */
     387             : 
     388             :     /*
     389             :      * Oldest transaction ID that was in the commit phase on the publisher.
     390             :      * Use FullTransactionId to prevent issues with transaction ID wraparound,
     391             :      * where a new remote_oldestxid could falsely appear to originate from the
     392             :      * past and block advancement.
     393             :      */
     394             :     FullTransactionId remote_oldestxid;
     395             : 
     396             :     /*
     397             :      * Next transaction ID to be assigned on the publisher. Use
     398             :      * FullTransactionId for consistency and to allow straightforward
     399             :      * comparisons with remote_oldestxid.
     400             :      */
     401             :     FullTransactionId remote_nextxid;
     402             : 
     403             :     TimestampTz reply_time;     /* when the publisher responds with status */
     404             : 
     405             :     /*
     406             :      * Publisher transaction ID that must be awaited to complete before
     407             :      * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
     408             :      * FullTransactionId for the same reason as remote_nextxid.
     409             :      */
     410             :     FullTransactionId remote_wait_for;
     411             : 
     412             :     TransactionId candidate_xid;    /* candidate for the non-removable
     413             :                                      * transaction ID */
     414             :     TimestampTz flushpos_update_time;   /* when the remote flush position was
     415             :                                          * updated in final phase
     416             :                                          * (RDT_WAIT_FOR_LOCAL_FLUSH) */
     417             : 
     418             :     /*
     419             :      * The following fields are used to determine the timing for the next
     420             :      * round of transaction ID advancement.
     421             :      */
     422             :     TimestampTz last_recv_time; /* when the last message was received */
     423             :     TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
     424             :     int         xid_advance_interval;   /* how much time (ms) to wait before
     425             :                                          * attempting to advance the
     426             :                                          * non-removable transaction ID */
     427             : } RetainDeadTuplesData;
     428             : 
     429             : /*
     430             :  * The minimum (100ms) and maximum (3 minutes) intervals for advancing
     431             :  * non-removable transaction IDs. The maximum interval is a bit arbitrary but
     432             :  * is sufficient to not cause any undue network traffic.
     433             :  */
     434             : #define MIN_XID_ADVANCE_INTERVAL 100
     435             : #define MAX_XID_ADVANCE_INTERVAL 180000
     436             : 
     437             : /* errcontext tracker */
     438             : static ApplyErrorCallbackArg apply_error_callback_arg =
     439             : {
     440             :     .command = 0,
     441             :     .rel = NULL,
     442             :     .remote_attnum = -1,
     443             :     .remote_xid = InvalidTransactionId,
     444             :     .finish_lsn = InvalidXLogRecPtr,
     445             :     .origin_name = NULL,
     446             : };
     447             : 
     448             : ErrorContextCallback *apply_error_context_stack = NULL;
     449             : 
     450             : MemoryContext ApplyMessageContext = NULL;
     451             : MemoryContext ApplyContext = NULL;
     452             : 
     453             : /* per stream context for streaming transactions */
     454             : static MemoryContext LogicalStreamingContext = NULL;
     455             : 
     456             : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
     457             : 
     458             : Subscription *MySubscription = NULL;
     459             : static bool MySubscriptionValid = false;
     460             : 
     461             : static List *on_commit_wakeup_workers_subids = NIL;
     462             : 
     463             : bool        in_remote_transaction = false;
     464             : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
     465             : 
     466             : /* fields valid only when processing streamed transaction */
     467             : static bool in_streamed_transaction = false;
     468             : 
     469             : static TransactionId stream_xid = InvalidTransactionId;
     470             : 
     471             : /*
     472             :  * The number of changes applied by parallel apply worker during one streaming
     473             :  * block.
     474             :  */
     475             : static uint32 parallel_stream_nchanges = 0;
     476             : 
     477             : /* Are we initializing an apply worker? */
     478             : bool        InitializingApplyWorker = false;
     479             : 
     480             : /*
     481             :  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
     482             :  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
     483             :  * Once we start skipping changes, we don't stop it until we skip all changes of
     484             :  * the transaction even if pg_subscription is updated and MySubscription->skiplsn
     485             :  * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
     486             :  * we don't skip receiving and spooling the changes since we decide whether or not
     487             :  * to skip applying the changes when starting to apply changes. The subskiplsn is
     488             :  * cleared after successfully skipping the transaction or applying non-empty
     489             :  * transaction. The latter prevents the mistakenly specified subskiplsn from
     490             :  * being left. Note that we cannot skip the streaming transactions when using
     491             :  * parallel apply workers because we cannot get the finish LSN before applying
     492             :  * the changes. So, we don't start parallel apply worker when finish LSN is set
     493             :  * by the user.
     494             :  */
     495             : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
     496             : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
     497             : 
     498             : /* BufFile handle of the current streaming file */
     499             : static BufFile *stream_fd = NULL;
     500             : 
     501             : /*
     502             :  * The remote WAL position that has been applied and flushed locally. We record
     503             :  * and use this information both while sending feedback to the server and
     504             :  * advancing oldest_nonremovable_xid.
     505             :  */
     506             : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
     507             : 
     508             : typedef struct SubXactInfo
     509             : {
     510             :     TransactionId xid;          /* XID of the subxact */
     511             :     int         fileno;         /* file number in the buffile */
     512             :     off_t       offset;         /* offset in the file */
     513             : } SubXactInfo;
     514             : 
     515             : /* Sub-transaction data for the current streaming transaction */
     516             : typedef struct ApplySubXactData
     517             : {
     518             :     uint32      nsubxacts;      /* number of sub-transactions */
     519             :     uint32      nsubxacts_max;  /* current capacity of subxacts */
     520             :     TransactionId subxact_last; /* xid of the last sub-transaction */
     521             :     SubXactInfo *subxacts;      /* sub-xact offset in changes file */
     522             : } ApplySubXactData;
     523             : 
     524             : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
     525             : 
     526             : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
     527             : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
     528             : 
     529             : /*
     530             :  * Information about subtransactions of a given toplevel transaction.
     531             :  */
     532             : static void subxact_info_write(Oid subid, TransactionId xid);
     533             : static void subxact_info_read(Oid subid, TransactionId xid);
     534             : static void subxact_info_add(TransactionId xid);
     535             : static inline void cleanup_subxact_info(void);
     536             : 
     537             : /*
     538             :  * Serialize and deserialize changes for a toplevel transaction.
     539             :  */
     540             : static void stream_open_file(Oid subid, TransactionId xid,
     541             :                              bool first_segment);
     542             : static void stream_write_change(char action, StringInfo s);
     543             : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
     544             : static void stream_close_file(void);
     545             : 
     546             : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
     547             : 
     548             : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
     549             :                                            bool status_received);
     550             : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
     551             : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
     552             :                                          bool status_received);
     553             : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
     554             : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
     555             : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
     556             :                                       bool status_received);
     557             : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
     558             : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
     559             :                                         bool new_xid_found);
     560             : 
     561             : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
     562             : static void apply_handle_insert_internal(ApplyExecutionData *edata,
     563             :                                          ResultRelInfo *relinfo,
     564             :                                          TupleTableSlot *remoteslot);
     565             : static void apply_handle_update_internal(ApplyExecutionData *edata,
     566             :                                          ResultRelInfo *relinfo,
     567             :                                          TupleTableSlot *remoteslot,
     568             :                                          LogicalRepTupleData *newtup,
     569             :                                          Oid localindexoid);
     570             : static void apply_handle_delete_internal(ApplyExecutionData *edata,
     571             :                                          ResultRelInfo *relinfo,
     572             :                                          TupleTableSlot *remoteslot,
     573             :                                          Oid localindexoid);
     574             : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
     575             :                                     LogicalRepRelation *remoterel,
     576             :                                     Oid localidxoid,
     577             :                                     TupleTableSlot *remoteslot,
     578             :                                     TupleTableSlot **localslot);
     579             : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
     580             :                                        TupleTableSlot *remoteslot,
     581             :                                        LogicalRepTupleData *newtup,
     582             :                                        CmdType operation);
     583             : 
     584             : /* Functions for skipping changes */
     585             : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
     586             : static void stop_skipping_changes(void);
     587             : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
     588             : 
     589             : /* Functions for apply error callback */
     590             : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
     591             : static inline void reset_apply_error_context_info(void);
     592             : 
     593             : static TransApplyAction get_transaction_apply_action(TransactionId xid,
     594             :                                                      ParallelApplyWorkerInfo **winfo);
     595             : 
     596             : static void replorigin_reset(int code, Datum arg);
     597             : 
     598             : /*
     599             :  * Form the origin name for the subscription.
     600             :  *
     601             :  * This is a common function for tablesync and other workers. Tablesync workers
     602             :  * must pass a valid relid. Other callers must pass relid = InvalidOid.
     603             :  *
     604             :  * Return the name in the supplied buffer.
     605             :  */
     606             : void
     607        2546 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
     608             :                                    char *originname, Size szoriginname)
     609             : {
     610        2546 :     if (OidIsValid(relid))
     611             :     {
     612             :         /* Replication origin name for tablesync workers. */
     613        1490 :         snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
     614             :     }
     615             :     else
     616             :     {
     617             :         /* Replication origin name for non-tablesync workers. */
     618        1056 :         snprintf(originname, szoriginname, "pg_%u", suboid);
     619             :     }
     620        2546 : }
     621             : 
     622             : /*
     623             :  * Should this worker apply changes for given relation.
     624             :  *
     625             :  * This is mainly needed for initial relation data sync as that runs in
     626             :  * separate worker process running in parallel and we need some way to skip
     627             :  * changes coming to the leader apply worker during the sync of a table.
     628             :  *
     629             :  * Note we need to do smaller or equals comparison for SYNCDONE state because
     630             :  * it might hold position of end of initial slot consistent point WAL
     631             :  * record + 1 (ie start of next record) and next record can be COMMIT of
     632             :  * transaction we are now processing (which is what we set remote_final_lsn
     633             :  * to in apply_handle_begin).
     634             :  *
     635             :  * Note that for streaming transactions that are being applied in the parallel
     636             :  * apply worker, we disallow applying changes if the target table in the
     637             :  * subscription is not in the READY state, because we cannot decide whether to
     638             :  * apply the change as we won't know remote_final_lsn by that time.
     639             :  *
     640             :  * We already checked this in pa_can_start() before assigning the
     641             :  * streaming transaction to the parallel worker, but it also needs to be
     642             :  * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
     643             :  * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
     644             :  * while applying this transaction.
     645             :  */
     646             : static bool
     647      296234 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
     648             : {
     649      296234 :     switch (MyLogicalRepWorker->type)
     650             :     {
     651          20 :         case WORKERTYPE_TABLESYNC:
     652          20 :             return MyLogicalRepWorker->relid == rel->localreloid;
     653             : 
     654      136730 :         case WORKERTYPE_PARALLEL_APPLY:
     655             :             /* We don't synchronize rel's that are in unknown state. */
     656      136730 :             if (rel->state != SUBREL_STATE_READY &&
     657           0 :                 rel->state != SUBREL_STATE_UNKNOWN)
     658           0 :                 ereport(ERROR,
     659             :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     660             :                          errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
     661             :                                 MySubscription->name),
     662             :                          errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
     663             : 
     664      136730 :             return rel->state == SUBREL_STATE_READY;
     665             : 
     666      159484 :         case WORKERTYPE_APPLY:
     667      159628 :             return (rel->state == SUBREL_STATE_READY ||
     668         144 :                     (rel->state == SUBREL_STATE_SYNCDONE &&
     669          22 :                      rel->statelsn <= remote_final_lsn));
     670             : 
     671           0 :         case WORKERTYPE_UNKNOWN:
     672             :             /* Should never happen. */
     673           0 :             elog(ERROR, "Unknown worker type");
     674             :     }
     675             : 
     676           0 :     return false;               /* dummy for compiler */
     677             : }
     678             : 
     679             : /*
     680             :  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
     681             :  *
     682             :  * Start a transaction, if this is the first step (else we keep using the
     683             :  * existing transaction).
     684             :  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
     685             :  */
     686             : static void
     687      297138 : begin_replication_step(void)
     688             : {
     689      297138 :     SetCurrentStatementStartTimestamp();
     690             : 
     691      297138 :     if (!IsTransactionState())
     692             :     {
     693        1870 :         StartTransactionCommand();
     694        1870 :         maybe_reread_subscription();
     695             :     }
     696             : 
     697      297134 :     PushActiveSnapshot(GetTransactionSnapshot());
     698             : 
     699      297134 :     MemoryContextSwitchTo(ApplyMessageContext);
     700      297134 : }
     701             : 
     702             : /*
     703             :  * Finish up one step of a replication transaction.
     704             :  * Callers of begin_replication_step() must also call this.
     705             :  *
     706             :  * We don't close out the transaction here, but we should increment
     707             :  * the command counter to make the effects of this step visible.
     708             :  */
     709             : static void
     710      297066 : end_replication_step(void)
     711             : {
     712      297066 :     PopActiveSnapshot();
     713             : 
     714      297066 :     CommandCounterIncrement();
     715      297066 : }
     716             : 
     717             : /*
     718             :  * Handle streamed transactions for both the leader apply worker and the
     719             :  * parallel apply workers.
     720             :  *
     721             :  * In the streaming case (receiving a block of the streamed transaction), for
     722             :  * serialize mode, simply redirect it to a file for the proper toplevel
     723             :  * transaction, and for parallel mode, the leader apply worker will send the
     724             :  * changes to parallel apply workers and the parallel apply worker will define
     725             :  * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
     726             :  * messages will be applied by both leader apply worker and parallel apply
     727             :  * workers).
     728             :  *
     729             :  * Returns true for streamed transactions (when the change is either serialized
     730             :  * to file or sent to parallel apply worker), false otherwise (regular mode or
     731             :  * needs to be processed by parallel apply worker).
     732             :  *
     733             :  * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
     734             :  * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
     735             :  * to a parallel apply worker.
     736             :  */
     737             : static bool
     738      648832 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
     739             : {
     740             :     TransactionId current_xid;
     741             :     ParallelApplyWorkerInfo *winfo;
     742             :     TransApplyAction apply_action;
     743             :     StringInfoData original_msg;
     744             : 
     745      648832 :     apply_action = get_transaction_apply_action(stream_xid, &winfo);
     746             : 
     747             :     /* not in streaming mode */
     748      648832 :     if (apply_action == TRANS_LEADER_APPLY)
     749      160238 :         return false;
     750             : 
     751             :     Assert(TransactionIdIsValid(stream_xid));
     752             : 
     753             :     /*
     754             :      * The parallel apply worker needs the xid in this message to decide
     755             :      * whether to define a savepoint, so save the original message that has
     756             :      * not moved the cursor after the xid. We will serialize this message to a
     757             :      * file in PARTIAL_SERIALIZE mode.
     758             :      */
     759      488594 :     original_msg = *s;
     760             : 
     761             :     /*
     762             :      * We should have received XID of the subxact as the first part of the
     763             :      * message, so extract it.
     764             :      */
     765      488594 :     current_xid = pq_getmsgint(s, 4);
     766             : 
     767      488594 :     if (!TransactionIdIsValid(current_xid))
     768           0 :         ereport(ERROR,
     769             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     770             :                  errmsg_internal("invalid transaction ID in streamed replication transaction")));
     771             : 
     772      488594 :     switch (apply_action)
     773             :     {
     774      205024 :         case TRANS_LEADER_SERIALIZE:
     775             :             Assert(stream_fd);
     776             : 
     777             :             /* Add the new subxact to the array (unless already there). */
     778      205024 :             subxact_info_add(current_xid);
     779             : 
     780             :             /* Write the change to the current file */
     781      205024 :             stream_write_change(action, s);
     782      205024 :             return true;
     783             : 
     784      136772 :         case TRANS_LEADER_SEND_TO_PARALLEL:
     785             :             Assert(winfo);
     786             : 
     787             :             /*
     788             :              * XXX The publisher side doesn't always send relation/type update
     789             :              * messages after the streaming transaction, so also update the
     790             :              * relation/type in leader apply worker. See function
     791             :              * cleanup_rel_sync_cache.
     792             :              */
     793      136772 :             if (pa_send_data(winfo, s->len, s->data))
     794      136772 :                 return (action != LOGICAL_REP_MSG_RELATION &&
     795             :                         action != LOGICAL_REP_MSG_TYPE);
     796             : 
     797             :             /*
     798             :              * Switch to serialize mode when we are not able to send the
     799             :              * change to parallel apply worker.
     800             :              */
     801           0 :             pa_switch_to_partial_serialize(winfo, false);
     802             : 
     803             :             /* fall through */
     804       10012 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
     805       10012 :             stream_write_change(action, &original_msg);
     806             : 
     807             :             /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
     808       10012 :             return (action != LOGICAL_REP_MSG_RELATION &&
     809             :                     action != LOGICAL_REP_MSG_TYPE);
     810             : 
     811      136786 :         case TRANS_PARALLEL_APPLY:
     812      136786 :             parallel_stream_nchanges += 1;
     813             : 
     814             :             /* Define a savepoint for a subxact if needed. */
     815      136786 :             pa_start_subtrans(current_xid, stream_xid);
     816      136786 :             return false;
     817             : 
     818           0 :         default:
     819           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
     820             :             return false;       /* silence compiler warning */
     821             :     }
     822             : }
     823             : 
     824             : /*
     825             :  * Executor state preparation for evaluation of constraint expressions,
     826             :  * indexes and triggers for the specified relation.
     827             :  *
     828             :  * Note that the caller must open and close any indexes to be updated.
     829             :  */
     830             : static ApplyExecutionData *
     831      296054 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
     832             : {
     833             :     ApplyExecutionData *edata;
     834             :     EState     *estate;
     835             :     RangeTblEntry *rte;
     836      296054 :     List       *perminfos = NIL;
     837             :     ResultRelInfo *resultRelInfo;
     838             : 
     839      296054 :     edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
     840      296054 :     edata->targetRel = rel;
     841             : 
     842      296054 :     edata->estate = estate = CreateExecutorState();
     843             : 
     844      296054 :     rte = makeNode(RangeTblEntry);
     845      296054 :     rte->rtekind = RTE_RELATION;
     846      296054 :     rte->relid = RelationGetRelid(rel->localrel);
     847      296054 :     rte->relkind = rel->localrel->rd_rel->relkind;
     848      296054 :     rte->rellockmode = AccessShareLock;
     849             : 
     850      296054 :     addRTEPermissionInfo(&perminfos, rte);
     851             : 
     852      296054 :     ExecInitRangeTable(estate, list_make1(rte), perminfos,
     853             :                        bms_make_singleton(1));
     854             : 
     855      296054 :     edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
     856             : 
     857             :     /*
     858             :      * Use Relation opened by logicalrep_rel_open() instead of opening it
     859             :      * again.
     860             :      */
     861      296054 :     InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
     862             : 
     863             :     /*
     864             :      * We put the ResultRelInfo in the es_opened_result_relations list, even
     865             :      * though we don't populate the es_result_relations array.  That's a bit
     866             :      * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
     867             :      *
     868             :      * ExecOpenIndices() is not called here either, each execution path doing
     869             :      * an apply operation being responsible for that.
     870             :      */
     871      296054 :     estate->es_opened_result_relations =
     872      296054 :         lappend(estate->es_opened_result_relations, resultRelInfo);
     873             : 
     874      296054 :     estate->es_output_cid = GetCurrentCommandId(true);
     875             : 
     876             :     /* Prepare to catch AFTER triggers. */
     877      296054 :     AfterTriggerBeginQuery();
     878             : 
     879             :     /* other fields of edata remain NULL for now */
     880             : 
     881      296054 :     return edata;
     882             : }
     883             : 
     884             : /*
     885             :  * Finish any operations related to the executor state created by
     886             :  * create_edata_for_relation().
     887             :  */
     888             : static void
     889      296000 : finish_edata(ApplyExecutionData *edata)
     890             : {
     891      296000 :     EState     *estate = edata->estate;
     892             : 
     893             :     /* Handle any queued AFTER triggers. */
     894      296000 :     AfterTriggerEndQuery(estate);
     895             : 
     896             :     /* Shut down tuple routing, if any was done. */
     897      296000 :     if (edata->proute)
     898         148 :         ExecCleanupTupleRouting(edata->mtstate, edata->proute);
     899             : 
     900             :     /*
     901             :      * Cleanup.  It might seem that we should call ExecCloseResultRelations()
     902             :      * here, but we intentionally don't.  It would close the rel we added to
     903             :      * es_opened_result_relations above, which is wrong because we took no
     904             :      * corresponding refcount.  We rely on ExecCleanupTupleRouting() to close
     905             :      * any other relations opened during execution.
     906             :      */
     907      296000 :     ExecResetTupleTable(estate->es_tupleTable, false);
     908      296000 :     FreeExecutorState(estate);
     909      296000 :     pfree(edata);
     910      296000 : }
     911             : 
     912             : /*
     913             :  * Executes default values for columns for which we can't map to remote
     914             :  * relation columns.
     915             :  *
     916             :  * This allows us to support tables which have more columns on the downstream
     917             :  * than on the upstream.
     918             :  */
     919             : static void
     920      151546 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
     921             :                    TupleTableSlot *slot)
     922             : {
     923      151546 :     TupleDesc   desc = RelationGetDescr(rel->localrel);
     924      151546 :     int         num_phys_attrs = desc->natts;
     925             :     int         i;
     926             :     int         attnum,
     927      151546 :                 num_defaults = 0;
     928             :     int        *defmap;
     929             :     ExprState **defexprs;
     930             :     ExprContext *econtext;
     931             : 
     932      151546 :     econtext = GetPerTupleExprContext(estate);
     933             : 
     934             :     /* We got all the data via replication, no need to evaluate anything. */
     935      151546 :     if (num_phys_attrs == rel->remoterel.natts)
     936       71256 :         return;
     937             : 
     938       80290 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
     939       80290 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
     940             : 
     941             :     Assert(rel->attrmap->maplen == num_phys_attrs);
     942      421326 :     for (attnum = 0; attnum < num_phys_attrs; attnum++)
     943             :     {
     944             :         Expr       *defexpr;
     945             : 
     946      341036 :         if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
     947          18 :             continue;
     948             : 
     949      341018 :         if (rel->attrmap->attnums[attnum] >= 0)
     950      184536 :             continue;
     951             : 
     952      156482 :         defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
     953             : 
     954      156482 :         if (defexpr != NULL)
     955             :         {
     956             :             /* Run the expression through planner */
     957      140262 :             defexpr = expression_planner(defexpr);
     958             : 
     959             :             /* Initialize executable expression in copycontext */
     960      140262 :             defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
     961      140262 :             defmap[num_defaults] = attnum;
     962      140262 :             num_defaults++;
     963             :         }
     964             :     }
     965             : 
     966      220552 :     for (i = 0; i < num_defaults; i++)
     967      140262 :         slot->tts_values[defmap[i]] =
     968      140262 :             ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
     969             : }
     970             : 
     971             : /*
     972             :  * Store tuple data into slot.
     973             :  *
     974             :  * Incoming data can be either text or binary format.
     975             :  */
     976             : static void
     977      296074 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
     978             :                 LogicalRepTupleData *tupleData)
     979             : {
     980      296074 :     int         natts = slot->tts_tupleDescriptor->natts;
     981             :     int         i;
     982             : 
     983      296074 :     ExecClearTuple(slot);
     984             : 
     985             :     /* Call the "in" function for each non-dropped, non-null attribute */
     986             :     Assert(natts == rel->attrmap->maplen);
     987     1315116 :     for (i = 0; i < natts; i++)
     988             :     {
     989     1019042 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     990     1019042 :         int         remoteattnum = rel->attrmap->attnums[i];
     991             : 
     992     1019042 :         if (!att->attisdropped && remoteattnum >= 0)
     993      605208 :         {
     994      605208 :             StringInfo  colvalue = &tupleData->colvalues[remoteattnum];
     995             : 
     996             :             Assert(remoteattnum < tupleData->ncols);
     997             : 
     998             :             /* Set attnum for error callback */
     999      605208 :             apply_error_callback_arg.remote_attnum = remoteattnum;
    1000             : 
    1001      605208 :             if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
    1002             :             {
    1003             :                 Oid         typinput;
    1004             :                 Oid         typioparam;
    1005             : 
    1006      284592 :                 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
    1007      569184 :                 slot->tts_values[i] =
    1008      284592 :                     OidInputFunctionCall(typinput, colvalue->data,
    1009             :                                          typioparam, att->atttypmod);
    1010      284592 :                 slot->tts_isnull[i] = false;
    1011             :             }
    1012      320616 :             else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
    1013             :             {
    1014             :                 Oid         typreceive;
    1015             :                 Oid         typioparam;
    1016             : 
    1017             :                 /*
    1018             :                  * In some code paths we may be asked to re-parse the same
    1019             :                  * tuple data.  Reset the StringInfo's cursor so that works.
    1020             :                  */
    1021      219960 :                 colvalue->cursor = 0;
    1022             : 
    1023      219960 :                 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
    1024      439920 :                 slot->tts_values[i] =
    1025      219960 :                     OidReceiveFunctionCall(typreceive, colvalue,
    1026             :                                            typioparam, att->atttypmod);
    1027             : 
    1028             :                 /* Trouble if it didn't eat the whole buffer */
    1029      219960 :                 if (colvalue->cursor != colvalue->len)
    1030           0 :                     ereport(ERROR,
    1031             :                             (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
    1032             :                              errmsg("incorrect binary data format in logical replication column %d",
    1033             :                                     remoteattnum + 1)));
    1034      219960 :                 slot->tts_isnull[i] = false;
    1035             :             }
    1036             :             else
    1037             :             {
    1038             :                 /*
    1039             :                  * NULL value from remote.  (We don't expect to see
    1040             :                  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
    1041             :                  * NULL.)
    1042             :                  */
    1043      100656 :                 slot->tts_values[i] = (Datum) 0;
    1044      100656 :                 slot->tts_isnull[i] = true;
    1045             :             }
    1046             : 
    1047             :             /* Reset attnum for error callback */
    1048      605208 :             apply_error_callback_arg.remote_attnum = -1;
    1049             :         }
    1050             :         else
    1051             :         {
    1052             :             /*
    1053             :              * We assign NULL to dropped attributes and missing values
    1054             :              * (missing values should be later filled using
    1055             :              * slot_fill_defaults).
    1056             :              */
    1057      413834 :             slot->tts_values[i] = (Datum) 0;
    1058      413834 :             slot->tts_isnull[i] = true;
    1059             :         }
    1060             :     }
    1061             : 
    1062      296074 :     ExecStoreVirtualTuple(slot);
    1063      296074 : }
    1064             : 
    1065             : /*
    1066             :  * Replace updated columns with data from the LogicalRepTupleData struct.
    1067             :  * This is somewhat similar to heap_modify_tuple but also calls the type
    1068             :  * input functions on the user data.
    1069             :  *
    1070             :  * "slot" is filled with a copy of the tuple in "srcslot", replacing
    1071             :  * columns provided in "tupleData" and leaving others as-is.
    1072             :  *
    1073             :  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
    1074             :  * storage for "srcslot".  This is OK for current usage, but someday we may
    1075             :  * need to materialize "slot" at the end to make it independent of "srcslot".
    1076             :  */
    1077             : static void
    1078       63848 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
    1079             :                  LogicalRepRelMapEntry *rel,
    1080             :                  LogicalRepTupleData *tupleData)
    1081             : {
    1082       63848 :     int         natts = slot->tts_tupleDescriptor->natts;
    1083             :     int         i;
    1084             : 
    1085             :     /* We'll fill "slot" with a virtual tuple, so we must start with ... */
    1086       63848 :     ExecClearTuple(slot);
    1087             : 
    1088             :     /*
    1089             :      * Copy all the column data from srcslot, so that we'll have valid values
    1090             :      * for unreplaced columns.
    1091             :      */
    1092             :     Assert(natts == srcslot->tts_tupleDescriptor->natts);
    1093       63848 :     slot_getallattrs(srcslot);
    1094       63848 :     memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
    1095       63848 :     memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
    1096             : 
    1097             :     /* Call the "in" function for each replaced attribute */
    1098             :     Assert(natts == rel->attrmap->maplen);
    1099      318560 :     for (i = 0; i < natts; i++)
    1100             :     {
    1101      254712 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
    1102      254712 :         int         remoteattnum = rel->attrmap->attnums[i];
    1103             : 
    1104      254712 :         if (remoteattnum < 0)
    1105      117038 :             continue;
    1106             : 
    1107             :         Assert(remoteattnum < tupleData->ncols);
    1108             : 
    1109      137674 :         if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
    1110             :         {
    1111      137668 :             StringInfo  colvalue = &tupleData->colvalues[remoteattnum];
    1112             : 
    1113             :             /* Set attnum for error callback */
    1114      137668 :             apply_error_callback_arg.remote_attnum = remoteattnum;
    1115             : 
    1116      137668 :             if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
    1117             :             {
    1118             :                 Oid         typinput;
    1119             :                 Oid         typioparam;
    1120             : 
    1121       50860 :                 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
    1122      101720 :                 slot->tts_values[i] =
    1123       50860 :                     OidInputFunctionCall(typinput, colvalue->data,
    1124             :                                          typioparam, att->atttypmod);
    1125       50860 :                 slot->tts_isnull[i] = false;
    1126             :             }
    1127       86808 :             else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
    1128             :             {
    1129             :                 Oid         typreceive;
    1130             :                 Oid         typioparam;
    1131             : 
    1132             :                 /*
    1133             :                  * In some code paths we may be asked to re-parse the same
    1134             :                  * tuple data.  Reset the StringInfo's cursor so that works.
    1135             :                  */
    1136       86712 :                 colvalue->cursor = 0;
    1137             : 
    1138       86712 :                 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
    1139      173424 :                 slot->tts_values[i] =
    1140       86712 :                     OidReceiveFunctionCall(typreceive, colvalue,
    1141             :                                            typioparam, att->atttypmod);
    1142             : 
    1143             :                 /* Trouble if it didn't eat the whole buffer */
    1144       86712 :                 if (colvalue->cursor != colvalue->len)
    1145           0 :                     ereport(ERROR,
    1146             :                             (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
    1147             :                              errmsg("incorrect binary data format in logical replication column %d",
    1148             :                                     remoteattnum + 1)));
    1149       86712 :                 slot->tts_isnull[i] = false;
    1150             :             }
    1151             :             else
    1152             :             {
    1153             :                 /* must be LOGICALREP_COLUMN_NULL */
    1154          96 :                 slot->tts_values[i] = (Datum) 0;
    1155          96 :                 slot->tts_isnull[i] = true;
    1156             :             }
    1157             : 
    1158             :             /* Reset attnum for error callback */
    1159      137668 :             apply_error_callback_arg.remote_attnum = -1;
    1160             :         }
    1161             :     }
    1162             : 
    1163             :     /* And finally, declare that "slot" contains a valid virtual tuple */
    1164       63848 :     ExecStoreVirtualTuple(slot);
    1165       63848 : }
    1166             : 
    1167             : /*
    1168             :  * Handle BEGIN message.
    1169             :  */
    1170             : static void
    1171         924 : apply_handle_begin(StringInfo s)
    1172             : {
    1173             :     LogicalRepBeginData begin_data;
    1174             : 
    1175             :     /* There must not be an active streaming transaction. */
    1176             :     Assert(!TransactionIdIsValid(stream_xid));
    1177             : 
    1178         924 :     logicalrep_read_begin(s, &begin_data);
    1179         924 :     set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
    1180             : 
    1181         924 :     remote_final_lsn = begin_data.final_lsn;
    1182             : 
    1183         924 :     maybe_start_skipping_changes(begin_data.final_lsn);
    1184             : 
    1185         924 :     in_remote_transaction = true;
    1186             : 
    1187         924 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1188         924 : }
    1189             : 
    1190             : /*
    1191             :  * Handle COMMIT message.
    1192             :  *
    1193             :  * TODO, support tracking of multiple origins
    1194             :  */
    1195             : static void
    1196         858 : apply_handle_commit(StringInfo s)
    1197             : {
    1198             :     LogicalRepCommitData commit_data;
    1199             : 
    1200         858 :     logicalrep_read_commit(s, &commit_data);
    1201             : 
    1202         858 :     if (commit_data.commit_lsn != remote_final_lsn)
    1203           0 :         ereport(ERROR,
    1204             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1205             :                  errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
    1206             :                                  LSN_FORMAT_ARGS(commit_data.commit_lsn),
    1207             :                                  LSN_FORMAT_ARGS(remote_final_lsn))));
    1208             : 
    1209         858 :     apply_handle_commit_internal(&commit_data);
    1210             : 
    1211             :     /* Process any tables that are being synchronized in parallel. */
    1212         858 :     process_syncing_tables(commit_data.end_lsn);
    1213             : 
    1214         858 :     pgstat_report_activity(STATE_IDLE, NULL);
    1215         858 :     reset_apply_error_context_info();
    1216         858 : }
    1217             : 
    1218             : /*
    1219             :  * Handle BEGIN PREPARE message.
    1220             :  */
    1221             : static void
    1222          32 : apply_handle_begin_prepare(StringInfo s)
    1223             : {
    1224             :     LogicalRepPreparedTxnData begin_data;
    1225             : 
    1226             :     /* Tablesync should never receive prepare. */
    1227          32 :     if (am_tablesync_worker())
    1228           0 :         ereport(ERROR,
    1229             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1230             :                  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
    1231             : 
    1232             :     /* There must not be an active streaming transaction. */
    1233             :     Assert(!TransactionIdIsValid(stream_xid));
    1234             : 
    1235          32 :     logicalrep_read_begin_prepare(s, &begin_data);
    1236          32 :     set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
    1237             : 
    1238          32 :     remote_final_lsn = begin_data.prepare_lsn;
    1239             : 
    1240          32 :     maybe_start_skipping_changes(begin_data.prepare_lsn);
    1241             : 
    1242          32 :     in_remote_transaction = true;
    1243             : 
    1244          32 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1245          32 : }
    1246             : 
    1247             : /*
    1248             :  * Common function to prepare the GID.
    1249             :  */
    1250             : static void
    1251          46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
    1252             : {
    1253             :     char        gid[GIDSIZE];
    1254             : 
    1255             :     /*
    1256             :      * Compute unique GID for two_phase transactions. We don't use GID of
    1257             :      * prepared transaction sent by server as that can lead to deadlock when
    1258             :      * we have multiple subscriptions from same node point to publications on
    1259             :      * the same node. See comments atop worker.c
    1260             :      */
    1261          46 :     TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
    1262             :                            gid, sizeof(gid));
    1263             : 
    1264             :     /*
    1265             :      * BeginTransactionBlock is necessary to balance the EndTransactionBlock
    1266             :      * called within the PrepareTransactionBlock below.
    1267             :      */
    1268          46 :     if (!IsTransactionBlock())
    1269             :     {
    1270          46 :         BeginTransactionBlock();
    1271          46 :         CommitTransactionCommand(); /* Completes the preceding Begin command. */
    1272             :     }
    1273             : 
    1274             :     /*
    1275             :      * Update origin state so we can restart streaming from correct position
    1276             :      * in case of crash.
    1277             :      */
    1278          46 :     replorigin_session_origin_lsn = prepare_data->end_lsn;
    1279          46 :     replorigin_session_origin_timestamp = prepare_data->prepare_time;
    1280             : 
    1281          46 :     PrepareTransactionBlock(gid);
    1282          46 : }
    1283             : 
    1284             : /*
    1285             :  * Handle PREPARE message.
    1286             :  */
    1287             : static void
    1288          30 : apply_handle_prepare(StringInfo s)
    1289             : {
    1290             :     LogicalRepPreparedTxnData prepare_data;
    1291             : 
    1292          30 :     logicalrep_read_prepare(s, &prepare_data);
    1293             : 
    1294          30 :     if (prepare_data.prepare_lsn != remote_final_lsn)
    1295           0 :         ereport(ERROR,
    1296             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1297             :                  errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
    1298             :                                  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
    1299             :                                  LSN_FORMAT_ARGS(remote_final_lsn))));
    1300             : 
    1301             :     /*
    1302             :      * Unlike commit, here, we always prepare the transaction even though no
    1303             :      * change has happened in this transaction or all changes are skipped. It
    1304             :      * is done this way because at commit prepared time, we won't know whether
    1305             :      * we have skipped preparing a transaction because of those reasons.
    1306             :      *
    1307             :      * XXX, We can optimize such that at commit prepared time, we first check
    1308             :      * whether we have prepared the transaction or not but that doesn't seem
    1309             :      * worthwhile because such cases shouldn't be common.
    1310             :      */
    1311          30 :     begin_replication_step();
    1312             : 
    1313          30 :     apply_handle_prepare_internal(&prepare_data);
    1314             : 
    1315          30 :     end_replication_step();
    1316          30 :     CommitTransactionCommand();
    1317          28 :     pgstat_report_stat(false);
    1318             : 
    1319             :     /*
    1320             :      * It is okay not to set the local_end LSN for the prepare because we
    1321             :      * always flush the prepare record. So, we can send the acknowledgment of
    1322             :      * the remote_end LSN as soon as prepare is finished.
    1323             :      *
    1324             :      * XXX For the sake of consistency with commit, we could have set it with
    1325             :      * the LSN of prepare but as of now we don't track that value similar to
    1326             :      * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
    1327             :      * it.
    1328             :      */
    1329          28 :     store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
    1330             : 
    1331          28 :     in_remote_transaction = false;
    1332             : 
    1333             :     /* Process any tables that are being synchronized in parallel. */
    1334          28 :     process_syncing_tables(prepare_data.end_lsn);
    1335             : 
    1336             :     /*
    1337             :      * Since we have already prepared the transaction, in a case where the
    1338             :      * server crashes before clearing the subskiplsn, it will be left but the
    1339             :      * transaction won't be resent. But that's okay because it's a rare case
    1340             :      * and the subskiplsn will be cleared when finishing the next transaction.
    1341             :      */
    1342          28 :     stop_skipping_changes();
    1343          28 :     clear_subscription_skip_lsn(prepare_data.prepare_lsn);
    1344             : 
    1345          28 :     pgstat_report_activity(STATE_IDLE, NULL);
    1346          28 :     reset_apply_error_context_info();
    1347          28 : }
    1348             : 
    1349             : /*
    1350             :  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
    1351             :  *
    1352             :  * Note that we don't need to wait here if the transaction was prepared in a
    1353             :  * parallel apply worker. In that case, we have already waited for the prepare
    1354             :  * to finish in apply_handle_stream_prepare() which will ensure all the
    1355             :  * operations in that transaction have happened in the subscriber, so no
    1356             :  * concurrent transaction can cause deadlock or transaction dependency issues.
    1357             :  */
    1358             : static void
    1359          40 : apply_handle_commit_prepared(StringInfo s)
    1360             : {
    1361             :     LogicalRepCommitPreparedTxnData prepare_data;
    1362             :     char        gid[GIDSIZE];
    1363             : 
    1364          40 :     logicalrep_read_commit_prepared(s, &prepare_data);
    1365          40 :     set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
    1366             : 
    1367             :     /* Compute GID for two_phase transactions. */
    1368          40 :     TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
    1369             :                            gid, sizeof(gid));
    1370             : 
    1371             :     /* There is no transaction when COMMIT PREPARED is called */
    1372          40 :     begin_replication_step();
    1373             : 
    1374             :     /*
    1375             :      * Update origin state so we can restart streaming from correct position
    1376             :      * in case of crash.
    1377             :      */
    1378          40 :     replorigin_session_origin_lsn = prepare_data.end_lsn;
    1379          40 :     replorigin_session_origin_timestamp = prepare_data.commit_time;
    1380             : 
    1381          40 :     FinishPreparedTransaction(gid, true);
    1382          40 :     end_replication_step();
    1383          40 :     CommitTransactionCommand();
    1384          40 :     pgstat_report_stat(false);
    1385             : 
    1386          40 :     store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
    1387          40 :     in_remote_transaction = false;
    1388             : 
    1389             :     /* Process any tables that are being synchronized in parallel. */
    1390          40 :     process_syncing_tables(prepare_data.end_lsn);
    1391             : 
    1392          40 :     clear_subscription_skip_lsn(prepare_data.end_lsn);
    1393             : 
    1394          40 :     pgstat_report_activity(STATE_IDLE, NULL);
    1395          40 :     reset_apply_error_context_info();
    1396          40 : }
    1397             : 
    1398             : /*
    1399             :  * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
    1400             :  *
    1401             :  * Note that we don't need to wait here if the transaction was prepared in a
    1402             :  * parallel apply worker. In that case, we have already waited for the prepare
    1403             :  * to finish in apply_handle_stream_prepare() which will ensure all the
    1404             :  * operations in that transaction have happened in the subscriber, so no
    1405             :  * concurrent transaction can cause deadlock or transaction dependency issues.
    1406             :  */
    1407             : static void
    1408          10 : apply_handle_rollback_prepared(StringInfo s)
    1409             : {
    1410             :     LogicalRepRollbackPreparedTxnData rollback_data;
    1411             :     char        gid[GIDSIZE];
    1412             : 
    1413          10 :     logicalrep_read_rollback_prepared(s, &rollback_data);
    1414          10 :     set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
    1415             : 
    1416             :     /* Compute GID for two_phase transactions. */
    1417          10 :     TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
    1418             :                            gid, sizeof(gid));
    1419             : 
    1420             :     /*
    1421             :      * It is possible that we haven't received prepare because it occurred
    1422             :      * before walsender reached a consistent point or the two_phase was still
    1423             :      * not enabled by that time, so in such cases, we need to skip rollback
    1424             :      * prepared.
    1425             :      */
    1426          10 :     if (LookupGXact(gid, rollback_data.prepare_end_lsn,
    1427             :                     rollback_data.prepare_time))
    1428             :     {
    1429             :         /*
    1430             :          * Update origin state so we can restart streaming from correct
    1431             :          * position in case of crash.
    1432             :          */
    1433          10 :         replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
    1434          10 :         replorigin_session_origin_timestamp = rollback_data.rollback_time;
    1435             : 
    1436             :         /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
    1437          10 :         begin_replication_step();
    1438          10 :         FinishPreparedTransaction(gid, false);
    1439          10 :         end_replication_step();
    1440          10 :         CommitTransactionCommand();
    1441             : 
    1442          10 :         clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
    1443             :     }
    1444             : 
    1445          10 :     pgstat_report_stat(false);
    1446             : 
    1447             :     /*
    1448             :      * It is okay not to set the local_end LSN for the rollback of prepared
    1449             :      * transaction because we always flush the WAL record for it. See
    1450             :      * apply_handle_prepare.
    1451             :      */
    1452          10 :     store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
    1453          10 :     in_remote_transaction = false;
    1454             : 
    1455             :     /* Process any tables that are being synchronized in parallel. */
    1456          10 :     process_syncing_tables(rollback_data.rollback_end_lsn);
    1457             : 
    1458          10 :     pgstat_report_activity(STATE_IDLE, NULL);
    1459          10 :     reset_apply_error_context_info();
    1460          10 : }
    1461             : 
    1462             : /*
    1463             :  * Handle STREAM PREPARE.
    1464             :  */
    1465             : static void
    1466          22 : apply_handle_stream_prepare(StringInfo s)
    1467             : {
    1468             :     LogicalRepPreparedTxnData prepare_data;
    1469             :     ParallelApplyWorkerInfo *winfo;
    1470             :     TransApplyAction apply_action;
    1471             : 
    1472             :     /* Save the message before it is consumed. */
    1473          22 :     StringInfoData original_msg = *s;
    1474             : 
    1475          22 :     if (in_streamed_transaction)
    1476           0 :         ereport(ERROR,
    1477             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1478             :                  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
    1479             : 
    1480             :     /* Tablesync should never receive prepare. */
    1481          22 :     if (am_tablesync_worker())
    1482           0 :         ereport(ERROR,
    1483             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1484             :                  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
    1485             : 
    1486          22 :     logicalrep_read_stream_prepare(s, &prepare_data);
    1487          22 :     set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
    1488             : 
    1489          22 :     apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
    1490             : 
    1491          22 :     switch (apply_action)
    1492             :     {
    1493          10 :         case TRANS_LEADER_APPLY:
    1494             : 
    1495             :             /*
    1496             :              * The transaction has been serialized to file, so replay all the
    1497             :              * spooled operations.
    1498             :              */
    1499          10 :             apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
    1500             :                                    prepare_data.xid, prepare_data.prepare_lsn);
    1501             : 
    1502             :             /* Mark the transaction as prepared. */
    1503          10 :             apply_handle_prepare_internal(&prepare_data);
    1504             : 
    1505          10 :             CommitTransactionCommand();
    1506             : 
    1507             :             /*
    1508             :              * It is okay not to set the local_end LSN for the prepare because
    1509             :              * we always flush the prepare record. See apply_handle_prepare.
    1510             :              */
    1511          10 :             store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
    1512             : 
    1513          10 :             in_remote_transaction = false;
    1514             : 
    1515             :             /* Unlink the files with serialized changes and subxact info. */
    1516          10 :             stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
    1517             : 
    1518          10 :             elog(DEBUG1, "finished processing the STREAM PREPARE command");
    1519          10 :             break;
    1520             : 
    1521           4 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1522             :             Assert(winfo);
    1523             : 
    1524           4 :             if (pa_send_data(winfo, s->len, s->data))
    1525             :             {
    1526             :                 /* Finish processing the streaming transaction. */
    1527           4 :                 pa_xact_finish(winfo, prepare_data.end_lsn);
    1528           4 :                 break;
    1529             :             }
    1530             : 
    1531             :             /*
    1532             :              * Switch to serialize mode when we are not able to send the
    1533             :              * change to parallel apply worker.
    1534             :              */
    1535           0 :             pa_switch_to_partial_serialize(winfo, true);
    1536             : 
    1537             :             /* fall through */
    1538           2 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1539             :             Assert(winfo);
    1540             : 
    1541           2 :             stream_open_and_write_change(prepare_data.xid,
    1542             :                                          LOGICAL_REP_MSG_STREAM_PREPARE,
    1543             :                                          &original_msg);
    1544             : 
    1545           2 :             pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
    1546             : 
    1547             :             /* Finish processing the streaming transaction. */
    1548           2 :             pa_xact_finish(winfo, prepare_data.end_lsn);
    1549           2 :             break;
    1550             : 
    1551           6 :         case TRANS_PARALLEL_APPLY:
    1552             : 
    1553             :             /*
    1554             :              * If the parallel apply worker is applying spooled messages then
    1555             :              * close the file before preparing.
    1556             :              */
    1557           6 :             if (stream_fd)
    1558           2 :                 stream_close_file();
    1559             : 
    1560           6 :             begin_replication_step();
    1561             : 
    1562             :             /* Mark the transaction as prepared. */
    1563           6 :             apply_handle_prepare_internal(&prepare_data);
    1564             : 
    1565           6 :             end_replication_step();
    1566             : 
    1567           6 :             CommitTransactionCommand();
    1568             : 
    1569             :             /*
    1570             :              * It is okay not to set the local_end LSN for the prepare because
    1571             :              * we always flush the prepare record. See apply_handle_prepare.
    1572             :              */
    1573           6 :             MyParallelShared->last_commit_end = InvalidXLogRecPtr;
    1574             : 
    1575           6 :             pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    1576           6 :             pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
    1577             : 
    1578           6 :             pa_reset_subtrans();
    1579             : 
    1580           6 :             elog(DEBUG1, "finished processing the STREAM PREPARE command");
    1581           6 :             break;
    1582             : 
    1583           0 :         default:
    1584           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1585             :             break;
    1586             :     }
    1587             : 
    1588          22 :     pgstat_report_stat(false);
    1589             : 
    1590             :     /* Process any tables that are being synchronized in parallel. */
    1591          22 :     process_syncing_tables(prepare_data.end_lsn);
    1592             : 
    1593             :     /*
    1594             :      * Similar to prepare case, the subskiplsn could be left in a case of
    1595             :      * server crash but it's okay. See the comments in apply_handle_prepare().
    1596             :      */
    1597          22 :     stop_skipping_changes();
    1598          22 :     clear_subscription_skip_lsn(prepare_data.prepare_lsn);
    1599             : 
    1600          22 :     pgstat_report_activity(STATE_IDLE, NULL);
    1601             : 
    1602          22 :     reset_apply_error_context_info();
    1603          22 : }
    1604             : 
    1605             : /*
    1606             :  * Handle ORIGIN message.
    1607             :  *
    1608             :  * TODO, support tracking of multiple origins
    1609             :  */
    1610             : static void
    1611          18 : apply_handle_origin(StringInfo s)
    1612             : {
    1613             :     /*
    1614             :      * ORIGIN message can only come inside streaming transaction or inside
    1615             :      * remote transaction and before any actual writes.
    1616             :      */
    1617          18 :     if (!in_streamed_transaction &&
    1618          28 :         (!in_remote_transaction ||
    1619          14 :          (IsTransactionState() && !am_tablesync_worker())))
    1620           0 :         ereport(ERROR,
    1621             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1622             :                  errmsg_internal("ORIGIN message sent out of order")));
    1623          18 : }
    1624             : 
    1625             : /*
    1626             :  * Initialize fileset (if not already done).
    1627             :  *
    1628             :  * Create a new file when first_segment is true, otherwise open the existing
    1629             :  * file.
    1630             :  */
    1631             : void
    1632         724 : stream_start_internal(TransactionId xid, bool first_segment)
    1633             : {
    1634         724 :     begin_replication_step();
    1635             : 
    1636             :     /*
    1637             :      * Initialize the worker's stream_fileset if we haven't yet. This will be
    1638             :      * used for the entire duration of the worker so create it in a permanent
    1639             :      * context. We create this on the very first streaming message from any
    1640             :      * transaction and then use it for this and other streaming transactions.
    1641             :      * Now, we could create a fileset at the start of the worker as well but
    1642             :      * then we won't be sure that it will ever be used.
    1643             :      */
    1644         724 :     if (!MyLogicalRepWorker->stream_fileset)
    1645             :     {
    1646             :         MemoryContext oldctx;
    1647             : 
    1648          28 :         oldctx = MemoryContextSwitchTo(ApplyContext);
    1649             : 
    1650          28 :         MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
    1651          28 :         FileSetInit(MyLogicalRepWorker->stream_fileset);
    1652             : 
    1653          28 :         MemoryContextSwitchTo(oldctx);
    1654             :     }
    1655             : 
    1656             :     /* Open the spool file for this transaction. */
    1657         724 :     stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
    1658             : 
    1659             :     /* If this is not the first segment, open existing subxact file. */
    1660         724 :     if (!first_segment)
    1661         660 :         subxact_info_read(MyLogicalRepWorker->subid, xid);
    1662             : 
    1663         724 :     end_replication_step();
    1664         724 : }
    1665             : 
    1666             : /*
    1667             :  * Handle STREAM START message.
    1668             :  */
    1669             : static void
    1670        1676 : apply_handle_stream_start(StringInfo s)
    1671             : {
    1672             :     bool        first_segment;
    1673             :     ParallelApplyWorkerInfo *winfo;
    1674             :     TransApplyAction apply_action;
    1675             : 
    1676             :     /* Save the message before it is consumed. */
    1677        1676 :     StringInfoData original_msg = *s;
    1678             : 
    1679        1676 :     if (in_streamed_transaction)
    1680           0 :         ereport(ERROR,
    1681             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1682             :                  errmsg_internal("duplicate STREAM START message")));
    1683             : 
    1684             :     /* There must not be an active streaming transaction. */
    1685             :     Assert(!TransactionIdIsValid(stream_xid));
    1686             : 
    1687             :     /* notify handle methods we're processing a remote transaction */
    1688        1676 :     in_streamed_transaction = true;
    1689             : 
    1690             :     /* extract XID of the top-level transaction */
    1691        1676 :     stream_xid = logicalrep_read_stream_start(s, &first_segment);
    1692             : 
    1693        1676 :     if (!TransactionIdIsValid(stream_xid))
    1694           0 :         ereport(ERROR,
    1695             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1696             :                  errmsg_internal("invalid transaction ID in streamed replication transaction")));
    1697             : 
    1698        1676 :     set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
    1699             : 
    1700             :     /* Try to allocate a worker for the streaming transaction. */
    1701        1676 :     if (first_segment)
    1702         164 :         pa_allocate_worker(stream_xid);
    1703             : 
    1704        1676 :     apply_action = get_transaction_apply_action(stream_xid, &winfo);
    1705             : 
    1706        1676 :     switch (apply_action)
    1707             :     {
    1708         684 :         case TRANS_LEADER_SERIALIZE:
    1709             : 
    1710             :             /*
    1711             :              * Function stream_start_internal starts a transaction. This
    1712             :              * transaction will be committed on the stream stop unless it is a
    1713             :              * tablesync worker in which case it will be committed after
    1714             :              * processing all the messages. We need this transaction for
    1715             :              * handling the BufFile, used for serializing the streaming data
    1716             :              * and subxact info.
    1717             :              */
    1718         684 :             stream_start_internal(stream_xid, first_segment);
    1719         684 :             break;
    1720             : 
    1721         484 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1722             :             Assert(winfo);
    1723             : 
    1724             :             /*
    1725             :              * Once we start serializing the changes, the parallel apply
    1726             :              * worker will wait for the leader to release the stream lock
    1727             :              * until the end of the transaction. So, we don't need to release
    1728             :              * the lock or increment the stream count in that case.
    1729             :              */
    1730         484 :             if (pa_send_data(winfo, s->len, s->data))
    1731             :             {
    1732             :                 /*
    1733             :                  * Unlock the shared object lock so that the parallel apply
    1734             :                  * worker can continue to receive changes.
    1735             :                  */
    1736         476 :                 if (!first_segment)
    1737         430 :                     pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
    1738             : 
    1739             :                 /*
    1740             :                  * Increment the number of streaming blocks waiting to be
    1741             :                  * processed by parallel apply worker.
    1742             :                  */
    1743         476 :                 pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
    1744             : 
    1745             :                 /* Cache the parallel apply worker for this transaction. */
    1746         476 :                 pa_set_stream_apply_worker(winfo);
    1747         476 :                 break;
    1748             :             }
    1749             : 
    1750             :             /*
    1751             :              * Switch to serialize mode when we are not able to send the
    1752             :              * change to parallel apply worker.
    1753             :              */
    1754           8 :             pa_switch_to_partial_serialize(winfo, !first_segment);
    1755             : 
    1756             :             /* fall through */
    1757          30 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1758             :             Assert(winfo);
    1759             : 
    1760             :             /*
    1761             :              * Open the spool file unless it was already opened when switching
    1762             :              * to serialize mode. The transaction started in
    1763             :              * stream_start_internal will be committed on the stream stop.
    1764             :              */
    1765          30 :             if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
    1766          22 :                 stream_start_internal(stream_xid, first_segment);
    1767             : 
    1768          30 :             stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
    1769             : 
    1770             :             /* Cache the parallel apply worker for this transaction. */
    1771          30 :             pa_set_stream_apply_worker(winfo);
    1772          30 :             break;
    1773             : 
    1774         486 :         case TRANS_PARALLEL_APPLY:
    1775         486 :             if (first_segment)
    1776             :             {
    1777             :                 /* Hold the lock until the end of the transaction. */
    1778          54 :                 pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
    1779          54 :                 pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
    1780             : 
    1781             :                 /*
    1782             :                  * Signal the leader apply worker, as it may be waiting for
    1783             :                  * us.
    1784             :                  */
    1785          54 :                 logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
    1786             :             }
    1787             : 
    1788         486 :             parallel_stream_nchanges = 0;
    1789         486 :             break;
    1790             : 
    1791           0 :         default:
    1792           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1793             :             break;
    1794             :     }
    1795             : 
    1796        1676 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1797        1676 : }
    1798             : 
    1799             : /*
    1800             :  * Update the information about subxacts and close the file.
    1801             :  *
    1802             :  * This function should be called when the stream_start_internal function has
    1803             :  * been called.
    1804             :  */
    1805             : void
    1806         724 : stream_stop_internal(TransactionId xid)
    1807             : {
    1808             :     /*
    1809             :      * Serialize information about subxacts for the toplevel transaction, then
    1810             :      * close the stream messages spool file.
    1811             :      */
    1812         724 :     subxact_info_write(MyLogicalRepWorker->subid, xid);
    1813         724 :     stream_close_file();
    1814             : 
    1815             :     /* We must be in a valid transaction state */
    1816             :     Assert(IsTransactionState());
    1817             : 
    1818             :     /* Commit the per-stream transaction */
    1819         724 :     CommitTransactionCommand();
    1820             : 
    1821             :     /* Reset per-stream context */
    1822         724 :     MemoryContextReset(LogicalStreamingContext);
    1823         724 : }
    1824             : 
    1825             : /*
    1826             :  * Handle STREAM STOP message.
    1827             :  */
    1828             : static void
    1829        1674 : apply_handle_stream_stop(StringInfo s)
    1830             : {
    1831             :     ParallelApplyWorkerInfo *winfo;
    1832             :     TransApplyAction apply_action;
    1833             : 
    1834        1674 :     if (!in_streamed_transaction)
    1835           0 :         ereport(ERROR,
    1836             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1837             :                  errmsg_internal("STREAM STOP message without STREAM START")));
    1838             : 
    1839        1674 :     apply_action = get_transaction_apply_action(stream_xid, &winfo);
    1840             : 
    1841        1674 :     switch (apply_action)
    1842             :     {
    1843         684 :         case TRANS_LEADER_SERIALIZE:
    1844         684 :             stream_stop_internal(stream_xid);
    1845         684 :             break;
    1846             : 
    1847         476 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    1848             :             Assert(winfo);
    1849             : 
    1850             :             /*
    1851             :              * Lock before sending the STREAM_STOP message so that the leader
    1852             :              * can hold the lock first and the parallel apply worker will wait
    1853             :              * for leader to release the lock. See Locking Considerations atop
    1854             :              * applyparallelworker.c.
    1855             :              */
    1856         476 :             pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
    1857             : 
    1858         476 :             if (pa_send_data(winfo, s->len, s->data))
    1859             :             {
    1860         476 :                 pa_set_stream_apply_worker(NULL);
    1861         476 :                 break;
    1862             :             }
    1863             : 
    1864             :             /*
    1865             :              * Switch to serialize mode when we are not able to send the
    1866             :              * change to parallel apply worker.
    1867             :              */
    1868           0 :             pa_switch_to_partial_serialize(winfo, true);
    1869             : 
    1870             :             /* fall through */
    1871          30 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    1872          30 :             stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
    1873          30 :             stream_stop_internal(stream_xid);
    1874          30 :             pa_set_stream_apply_worker(NULL);
    1875          30 :             break;
    1876             : 
    1877         484 :         case TRANS_PARALLEL_APPLY:
    1878         484 :             elog(DEBUG1, "applied %u changes in the streaming chunk",
    1879             :                  parallel_stream_nchanges);
    1880             : 
    1881             :             /*
    1882             :              * By the time parallel apply worker is processing the changes in
    1883             :              * the current streaming block, the leader apply worker may have
    1884             :              * sent multiple streaming blocks. This can lead to parallel apply
    1885             :              * worker start waiting even when there are more chunk of streams
    1886             :              * in the queue. So, try to lock only if there is no message left
    1887             :              * in the queue. See Locking Considerations atop
    1888             :              * applyparallelworker.c.
    1889             :              *
    1890             :              * Note that here we have a race condition where we can start
    1891             :              * waiting even when there are pending streaming chunks. This can
    1892             :              * happen if the leader sends another streaming block and acquires
    1893             :              * the stream lock again after the parallel apply worker checks
    1894             :              * that there is no pending streaming block and before it actually
    1895             :              * starts waiting on a lock. We can handle this case by not
    1896             :              * allowing the leader to increment the stream block count during
    1897             :              * the time parallel apply worker acquires the lock but it is not
    1898             :              * clear whether that is worth the complexity.
    1899             :              *
    1900             :              * Now, if this missed chunk contains rollback to savepoint, then
    1901             :              * there is a risk of deadlock which probably shouldn't happen
    1902             :              * after restart.
    1903             :              */
    1904         484 :             pa_decr_and_wait_stream_block();
    1905         480 :             break;
    1906             : 
    1907           0 :         default:
    1908           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    1909             :             break;
    1910             :     }
    1911             : 
    1912        1670 :     in_streamed_transaction = false;
    1913        1670 :     stream_xid = InvalidTransactionId;
    1914             : 
    1915             :     /*
    1916             :      * The parallel apply worker could be in a transaction in which case we
    1917             :      * need to report the state as STATE_IDLEINTRANSACTION.
    1918             :      */
    1919        1670 :     if (IsTransactionOrTransactionBlock())
    1920         480 :         pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
    1921             :     else
    1922        1190 :         pgstat_report_activity(STATE_IDLE, NULL);
    1923             : 
    1924        1670 :     reset_apply_error_context_info();
    1925        1670 : }
    1926             : 
    1927             : /*
    1928             :  * Helper function to handle STREAM ABORT message when the transaction was
    1929             :  * serialized to file.
    1930             :  */
    1931             : static void
    1932          28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
    1933             : {
    1934             :     /*
    1935             :      * If the two XIDs are the same, it's in fact abort of toplevel xact, so
    1936             :      * just delete the files with serialized info.
    1937             :      */
    1938          28 :     if (xid == subxid)
    1939           2 :         stream_cleanup_files(MyLogicalRepWorker->subid, xid);
    1940             :     else
    1941             :     {
    1942             :         /*
    1943             :          * OK, so it's a subxact. We need to read the subxact file for the
    1944             :          * toplevel transaction, determine the offset tracked for the subxact,
    1945             :          * and truncate the file with changes. We also remove the subxacts
    1946             :          * with higher offsets (or rather higher XIDs).
    1947             :          *
    1948             :          * We intentionally scan the array from the tail, because we're likely
    1949             :          * aborting a change for the most recent subtransactions.
    1950             :          *
    1951             :          * We can't use the binary search here as subxact XIDs won't
    1952             :          * necessarily arrive in sorted order, consider the case where we have
    1953             :          * released the savepoint for multiple subtransactions and then
    1954             :          * performed rollback to savepoint for one of the earlier
    1955             :          * sub-transaction.
    1956             :          */
    1957             :         int64       i;
    1958             :         int64       subidx;
    1959             :         BufFile    *fd;
    1960          26 :         bool        found = false;
    1961             :         char        path[MAXPGPATH];
    1962             : 
    1963          26 :         subidx = -1;
    1964          26 :         begin_replication_step();
    1965          26 :         subxact_info_read(MyLogicalRepWorker->subid, xid);
    1966             : 
    1967          30 :         for (i = subxact_data.nsubxacts; i > 0; i--)
    1968             :         {
    1969          22 :             if (subxact_data.subxacts[i - 1].xid == subxid)
    1970             :             {
    1971          18 :                 subidx = (i - 1);
    1972          18 :                 found = true;
    1973          18 :                 break;
    1974             :             }
    1975             :         }
    1976             : 
    1977             :         /*
    1978             :          * If it's an empty sub-transaction then we will not find the subxid
    1979             :          * here so just cleanup the subxact info and return.
    1980             :          */
    1981          26 :         if (!found)
    1982             :         {
    1983             :             /* Cleanup the subxact info */
    1984           8 :             cleanup_subxact_info();
    1985           8 :             end_replication_step();
    1986           8 :             CommitTransactionCommand();
    1987           8 :             return;
    1988             :         }
    1989             : 
    1990             :         /* open the changes file */
    1991          18 :         changes_filename(path, MyLogicalRepWorker->subid, xid);
    1992          18 :         fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
    1993             :                                 O_RDWR, false);
    1994             : 
    1995             :         /* OK, truncate the file at the right offset */
    1996          18 :         BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
    1997          18 :                                subxact_data.subxacts[subidx].offset);
    1998          18 :         BufFileClose(fd);
    1999             : 
    2000             :         /* discard the subxacts added later */
    2001          18 :         subxact_data.nsubxacts = subidx;
    2002             : 
    2003             :         /* write the updated subxact list */
    2004          18 :         subxact_info_write(MyLogicalRepWorker->subid, xid);
    2005             : 
    2006          18 :         end_replication_step();
    2007          18 :         CommitTransactionCommand();
    2008             :     }
    2009             : }
    2010             : 
    2011             : /*
    2012             :  * Handle STREAM ABORT message.
    2013             :  */
    2014             : static void
    2015          76 : apply_handle_stream_abort(StringInfo s)
    2016             : {
    2017             :     TransactionId xid;
    2018             :     TransactionId subxid;
    2019             :     LogicalRepStreamAbortData abort_data;
    2020             :     ParallelApplyWorkerInfo *winfo;
    2021             :     TransApplyAction apply_action;
    2022             : 
    2023             :     /* Save the message before it is consumed. */
    2024          76 :     StringInfoData original_msg = *s;
    2025             :     bool        toplevel_xact;
    2026             : 
    2027          76 :     if (in_streamed_transaction)
    2028           0 :         ereport(ERROR,
    2029             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    2030             :                  errmsg_internal("STREAM ABORT message without STREAM STOP")));
    2031             : 
    2032             :     /* We receive abort information only when we can apply in parallel. */
    2033          76 :     logicalrep_read_stream_abort(s, &abort_data,
    2034          76 :                                  MyLogicalRepWorker->parallel_apply);
    2035             : 
    2036          76 :     xid = abort_data.xid;
    2037          76 :     subxid = abort_data.subxid;
    2038          76 :     toplevel_xact = (xid == subxid);
    2039             : 
    2040          76 :     set_apply_error_context_xact(subxid, abort_data.abort_lsn);
    2041             : 
    2042          76 :     apply_action = get_transaction_apply_action(xid, &winfo);
    2043             : 
    2044          76 :     switch (apply_action)
    2045             :     {
    2046          28 :         case TRANS_LEADER_APPLY:
    2047             : 
    2048             :             /*
    2049             :              * We are in the leader apply worker and the transaction has been
    2050             :              * serialized to file.
    2051             :              */
    2052          28 :             stream_abort_internal(xid, subxid);
    2053             : 
    2054          28 :             elog(DEBUG1, "finished processing the STREAM ABORT command");
    2055          28 :             break;
    2056             : 
    2057          20 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    2058             :             Assert(winfo);
    2059             : 
    2060             :             /*
    2061             :              * For the case of aborting the subtransaction, we increment the
    2062             :              * number of streaming blocks and take the lock again before
    2063             :              * sending the STREAM_ABORT to ensure that the parallel apply
    2064             :              * worker will wait on the lock for the next set of changes after
    2065             :              * processing the STREAM_ABORT message if it is not already
    2066             :              * waiting for STREAM_STOP message.
    2067             :              *
    2068             :              * It is important to perform this locking before sending the
    2069             :              * STREAM_ABORT message so that the leader can hold the lock first
    2070             :              * and the parallel apply worker will wait for the leader to
    2071             :              * release the lock. This is the same as what we do in
    2072             :              * apply_handle_stream_stop. See Locking Considerations atop
    2073             :              * applyparallelworker.c.
    2074             :              */
    2075          20 :             if (!toplevel_xact)
    2076             :             {
    2077          18 :                 pa_unlock_stream(xid, AccessExclusiveLock);
    2078          18 :                 pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
    2079          18 :                 pa_lock_stream(xid, AccessExclusiveLock);
    2080             :             }
    2081             : 
    2082          20 :             if (pa_send_data(winfo, s->len, s->data))
    2083             :             {
    2084             :                 /*
    2085             :                  * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
    2086             :                  * wait here for the parallel apply worker to finish as that
    2087             :                  * is not required to maintain the commit order and won't have
    2088             :                  * the risk of failures due to transaction dependencies and
    2089             :                  * deadlocks. However, it is possible that before the parallel
    2090             :                  * worker finishes and we clear the worker info, the xid
    2091             :                  * wraparound happens on the upstream and a new transaction
    2092             :                  * with the same xid can appear and that can lead to duplicate
    2093             :                  * entries in ParallelApplyTxnHash. Yet another problem could
    2094             :                  * be that we may have serialized the changes in partial
    2095             :                  * serialize mode and the file containing xact changes may
    2096             :                  * already exist, and after xid wraparound trying to create
    2097             :                  * the file for the same xid can lead to an error. To avoid
    2098             :                  * these problems, we decide to wait for the aborts to finish.
    2099             :                  *
    2100             :                  * Note, it is okay to not update the flush location position
    2101             :                  * for aborts as in worst case that means such a transaction
    2102             :                  * won't be sent again after restart.
    2103             :                  */
    2104          20 :                 if (toplevel_xact)
    2105           2 :                     pa_xact_finish(winfo, InvalidXLogRecPtr);
    2106             : 
    2107          20 :                 break;
    2108             :             }
    2109             : 
    2110             :             /*
    2111             :              * Switch to serialize mode when we are not able to send the
    2112             :              * change to parallel apply worker.
    2113             :              */
    2114           0 :             pa_switch_to_partial_serialize(winfo, true);
    2115             : 
    2116             :             /* fall through */
    2117           4 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    2118             :             Assert(winfo);
    2119             : 
    2120             :             /*
    2121             :              * Parallel apply worker might have applied some changes, so write
    2122             :              * the STREAM_ABORT message so that it can rollback the
    2123             :              * subtransaction if needed.
    2124             :              */
    2125           4 :             stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
    2126             :                                          &original_msg);
    2127             : 
    2128           4 :             if (toplevel_xact)
    2129             :             {
    2130           2 :                 pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
    2131           2 :                 pa_xact_finish(winfo, InvalidXLogRecPtr);
    2132             :             }
    2133           4 :             break;
    2134             : 
    2135          24 :         case TRANS_PARALLEL_APPLY:
    2136             : 
    2137             :             /*
    2138             :              * If the parallel apply worker is applying spooled messages then
    2139             :              * close the file before aborting.
    2140             :              */
    2141          24 :             if (toplevel_xact && stream_fd)
    2142           2 :                 stream_close_file();
    2143             : 
    2144          24 :             pa_stream_abort(&abort_data);
    2145             : 
    2146             :             /*
    2147             :              * We need to wait after processing rollback to savepoint for the
    2148             :              * next set of changes.
    2149             :              *
    2150             :              * We have a race condition here due to which we can start waiting
    2151             :              * here when there are more chunk of streams in the queue. See
    2152             :              * apply_handle_stream_stop.
    2153             :              */
    2154          24 :             if (!toplevel_xact)
    2155          20 :                 pa_decr_and_wait_stream_block();
    2156             : 
    2157          24 :             elog(DEBUG1, "finished processing the STREAM ABORT command");
    2158          24 :             break;
    2159             : 
    2160           0 :         default:
    2161           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    2162             :             break;
    2163             :     }
    2164             : 
    2165          76 :     reset_apply_error_context_info();
    2166          76 : }
    2167             : 
    2168             : /*
    2169             :  * Ensure that the passed location is fileset's end.
    2170             :  */
    2171             : static void
    2172           8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
    2173             :                     off_t offset)
    2174             : {
    2175             :     char        path[MAXPGPATH];
    2176             :     BufFile    *fd;
    2177             :     int         last_fileno;
    2178             :     off_t       last_offset;
    2179             : 
    2180             :     Assert(!IsTransactionState());
    2181             : 
    2182           8 :     begin_replication_step();
    2183             : 
    2184           8 :     changes_filename(path, MyLogicalRepWorker->subid, xid);
    2185             : 
    2186           8 :     fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
    2187             : 
    2188           8 :     BufFileSeek(fd, 0, 0, SEEK_END);
    2189           8 :     BufFileTell(fd, &last_fileno, &last_offset);
    2190             : 
    2191           8 :     BufFileClose(fd);
    2192             : 
    2193           8 :     end_replication_step();
    2194             : 
    2195           8 :     if (last_fileno != fileno || last_offset != offset)
    2196           0 :         elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
    2197             :              path);
    2198           8 : }
    2199             : 
    2200             : /*
    2201             :  * Common spoolfile processing.
    2202             :  */
    2203             : void
    2204          62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
    2205             :                        XLogRecPtr lsn)
    2206             : {
    2207             :     int         nchanges;
    2208             :     char        path[MAXPGPATH];
    2209          62 :     char       *buffer = NULL;
    2210             :     MemoryContext oldcxt;
    2211             :     ResourceOwner oldowner;
    2212             :     int         fileno;
    2213             :     off_t       offset;
    2214             : 
    2215          62 :     if (!am_parallel_apply_worker())
    2216          54 :         maybe_start_skipping_changes(lsn);
    2217             : 
    2218             :     /* Make sure we have an open transaction */
    2219          62 :     begin_replication_step();
    2220             : 
    2221             :     /*
    2222             :      * Allocate file handle and memory required to process all the messages in
    2223             :      * TopTransactionContext to avoid them getting reset after each message is
    2224             :      * processed.
    2225             :      */
    2226          62 :     oldcxt = MemoryContextSwitchTo(TopTransactionContext);
    2227             : 
    2228             :     /* Open the spool file for the committed/prepared transaction */
    2229          62 :     changes_filename(path, MyLogicalRepWorker->subid, xid);
    2230          62 :     elog(DEBUG1, "replaying changes from file \"%s\"", path);
    2231             : 
    2232             :     /*
    2233             :      * Make sure the file is owned by the toplevel transaction so that the
    2234             :      * file will not be accidentally closed when aborting a subtransaction.
    2235             :      */
    2236          62 :     oldowner = CurrentResourceOwner;
    2237          62 :     CurrentResourceOwner = TopTransactionResourceOwner;
    2238             : 
    2239          62 :     stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
    2240             : 
    2241          62 :     CurrentResourceOwner = oldowner;
    2242             : 
    2243          62 :     buffer = palloc(BLCKSZ);
    2244             : 
    2245          62 :     MemoryContextSwitchTo(oldcxt);
    2246             : 
    2247          62 :     remote_final_lsn = lsn;
    2248             : 
    2249             :     /*
    2250             :      * Make sure the handle apply_dispatch methods are aware we're in a remote
    2251             :      * transaction.
    2252             :      */
    2253          62 :     in_remote_transaction = true;
    2254          62 :     pgstat_report_activity(STATE_RUNNING, NULL);
    2255             : 
    2256          62 :     end_replication_step();
    2257             : 
    2258             :     /*
    2259             :      * Read the entries one by one and pass them through the same logic as in
    2260             :      * apply_dispatch.
    2261             :      */
    2262          62 :     nchanges = 0;
    2263             :     while (true)
    2264      176938 :     {
    2265             :         StringInfoData s2;
    2266             :         size_t      nbytes;
    2267             :         int         len;
    2268             : 
    2269      177000 :         CHECK_FOR_INTERRUPTS();
    2270             : 
    2271             :         /* read length of the on-disk record */
    2272      177000 :         nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
    2273             : 
    2274             :         /* have we reached end of the file? */
    2275      177000 :         if (nbytes == 0)
    2276          52 :             break;
    2277             : 
    2278             :         /* do we have a correct length? */
    2279      176948 :         if (len <= 0)
    2280           0 :             elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
    2281             :                  len, path);
    2282             : 
    2283             :         /* make sure we have sufficiently large buffer */
    2284      176948 :         buffer = repalloc(buffer, len);
    2285             : 
    2286             :         /* and finally read the data into the buffer */
    2287      176948 :         BufFileReadExact(stream_fd, buffer, len);
    2288             : 
    2289      176948 :         BufFileTell(stream_fd, &fileno, &offset);
    2290             : 
    2291             :         /* init a stringinfo using the buffer and call apply_dispatch */
    2292      176948 :         initReadOnlyStringInfo(&s2, buffer, len);
    2293             : 
    2294             :         /* Ensure we are reading the data into our memory context. */
    2295      176948 :         oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
    2296             : 
    2297      176948 :         apply_dispatch(&s2);
    2298             : 
    2299      176946 :         MemoryContextReset(ApplyMessageContext);
    2300             : 
    2301      176946 :         MemoryContextSwitchTo(oldcxt);
    2302             : 
    2303      176946 :         nchanges++;
    2304             : 
    2305             :         /*
    2306             :          * It is possible the file has been closed because we have processed
    2307             :          * the transaction end message like stream_commit in which case that
    2308             :          * must be the last message.
    2309             :          */
    2310      176946 :         if (!stream_fd)
    2311             :         {
    2312           8 :             ensure_last_message(stream_fileset, xid, fileno, offset);
    2313           8 :             break;
    2314             :         }
    2315             : 
    2316      176938 :         if (nchanges % 1000 == 0)
    2317         166 :             elog(DEBUG1, "replayed %d changes from file \"%s\"",
    2318             :                  nchanges, path);
    2319             :     }
    2320             : 
    2321          60 :     if (stream_fd)
    2322          52 :         stream_close_file();
    2323             : 
    2324          60 :     elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
    2325             :          nchanges, path);
    2326             : 
    2327          60 :     return;
    2328             : }
    2329             : 
    2330             : /*
    2331             :  * Handle STREAM COMMIT message.
    2332             :  */
    2333             : static void
    2334         122 : apply_handle_stream_commit(StringInfo s)
    2335             : {
    2336             :     TransactionId xid;
    2337             :     LogicalRepCommitData commit_data;
    2338             :     ParallelApplyWorkerInfo *winfo;
    2339             :     TransApplyAction apply_action;
    2340             : 
    2341             :     /* Save the message before it is consumed. */
    2342         122 :     StringInfoData original_msg = *s;
    2343             : 
    2344         122 :     if (in_streamed_transaction)
    2345           0 :         ereport(ERROR,
    2346             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    2347             :                  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
    2348             : 
    2349         122 :     xid = logicalrep_read_stream_commit(s, &commit_data);
    2350         122 :     set_apply_error_context_xact(xid, commit_data.commit_lsn);
    2351             : 
    2352         122 :     apply_action = get_transaction_apply_action(xid, &winfo);
    2353             : 
    2354         122 :     switch (apply_action)
    2355             :     {
    2356          44 :         case TRANS_LEADER_APPLY:
    2357             : 
    2358             :             /*
    2359             :              * The transaction has been serialized to file, so replay all the
    2360             :              * spooled operations.
    2361             :              */
    2362          44 :             apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
    2363             :                                    commit_data.commit_lsn);
    2364             : 
    2365          42 :             apply_handle_commit_internal(&commit_data);
    2366             : 
    2367             :             /* Unlink the files with serialized changes and subxact info. */
    2368          42 :             stream_cleanup_files(MyLogicalRepWorker->subid, xid);
    2369             : 
    2370          42 :             elog(DEBUG1, "finished processing the STREAM COMMIT command");
    2371          42 :             break;
    2372             : 
    2373          36 :         case TRANS_LEADER_SEND_TO_PARALLEL:
    2374             :             Assert(winfo);
    2375             : 
    2376          36 :             if (pa_send_data(winfo, s->len, s->data))
    2377             :             {
    2378             :                 /* Finish processing the streaming transaction. */
    2379          36 :                 pa_xact_finish(winfo, commit_data.end_lsn);
    2380          34 :                 break;
    2381             :             }
    2382             : 
    2383             :             /*
    2384             :              * Switch to serialize mode when we are not able to send the
    2385             :              * change to parallel apply worker.
    2386             :              */
    2387           0 :             pa_switch_to_partial_serialize(winfo, true);
    2388             : 
    2389             :             /* fall through */
    2390           4 :         case TRANS_LEADER_PARTIAL_SERIALIZE:
    2391             :             Assert(winfo);
    2392             : 
    2393           4 :             stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
    2394             :                                          &original_msg);
    2395             : 
    2396           4 :             pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
    2397             : 
    2398             :             /* Finish processing the streaming transaction. */
    2399           4 :             pa_xact_finish(winfo, commit_data.end_lsn);
    2400           4 :             break;
    2401             : 
    2402          38 :         case TRANS_PARALLEL_APPLY:
    2403             : 
    2404             :             /*
    2405             :              * If the parallel apply worker is applying spooled messages then
    2406             :              * close the file before committing.
    2407             :              */
    2408          38 :             if (stream_fd)
    2409           4 :                 stream_close_file();
    2410             : 
    2411          38 :             apply_handle_commit_internal(&commit_data);
    2412             : 
    2413          38 :             MyParallelShared->last_commit_end = XactLastCommitEnd;
    2414             : 
    2415             :             /*
    2416             :              * It is important to set the transaction state as finished before
    2417             :              * releasing the lock. See pa_wait_for_xact_finish.
    2418             :              */
    2419          38 :             pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    2420          38 :             pa_unlock_transaction(xid, AccessExclusiveLock);
    2421             : 
    2422          38 :             pa_reset_subtrans();
    2423             : 
    2424          38 :             elog(DEBUG1, "finished processing the STREAM COMMIT command");
    2425          38 :             break;
    2426             : 
    2427           0 :         default:
    2428           0 :             elog(ERROR, "unexpected apply action: %d", (int) apply_action);
    2429             :             break;
    2430             :     }
    2431             : 
    2432             :     /* Process any tables that are being synchronized in parallel. */
    2433         118 :     process_syncing_tables(commit_data.end_lsn);
    2434             : 
    2435         118 :     pgstat_report_activity(STATE_IDLE, NULL);
    2436             : 
    2437         118 :     reset_apply_error_context_info();
    2438         118 : }
    2439             : 
    2440             : /*
    2441             :  * Helper function for apply_handle_commit and apply_handle_stream_commit.
    2442             :  */
    2443             : static void
    2444         938 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
    2445             : {
    2446         938 :     if (is_skipping_changes())
    2447             :     {
    2448           4 :         stop_skipping_changes();
    2449             : 
    2450             :         /*
    2451             :          * Start a new transaction to clear the subskiplsn, if not started
    2452             :          * yet.
    2453             :          */
    2454           4 :         if (!IsTransactionState())
    2455           2 :             StartTransactionCommand();
    2456             :     }
    2457             : 
    2458         938 :     if (IsTransactionState())
    2459             :     {
    2460             :         /*
    2461             :          * The transaction is either non-empty or skipped, so we clear the
    2462             :          * subskiplsn.
    2463             :          */
    2464         938 :         clear_subscription_skip_lsn(commit_data->commit_lsn);
    2465             : 
    2466             :         /*
    2467             :          * Update origin state so we can restart streaming from correct
    2468             :          * position in case of crash.
    2469             :          */
    2470         938 :         replorigin_session_origin_lsn = commit_data->end_lsn;
    2471         938 :         replorigin_session_origin_timestamp = commit_data->committime;
    2472             : 
    2473         938 :         CommitTransactionCommand();
    2474             : 
    2475         938 :         if (IsTransactionBlock())
    2476             :         {
    2477           8 :             EndTransactionBlock(false);
    2478           8 :             CommitTransactionCommand();
    2479             :         }
    2480             : 
    2481         938 :         pgstat_report_stat(false);
    2482             : 
    2483         938 :         store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
    2484             :     }
    2485             :     else
    2486             :     {
    2487             :         /* Process any invalidation messages that might have accumulated. */
    2488           0 :         AcceptInvalidationMessages();
    2489           0 :         maybe_reread_subscription();
    2490             :     }
    2491             : 
    2492         938 :     in_remote_transaction = false;
    2493         938 : }
    2494             : 
    2495             : /*
    2496             :  * Handle RELATION message.
    2497             :  *
    2498             :  * Note we don't do validation against local schema here. The validation
    2499             :  * against local schema is postponed until first change for given relation
    2500             :  * comes as we only care about it when applying changes for it anyway and we
    2501             :  * do less locking this way.
    2502             :  */
    2503             : static void
    2504         882 : apply_handle_relation(StringInfo s)
    2505             : {
    2506             :     LogicalRepRelation *rel;
    2507             : 
    2508         882 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
    2509          70 :         return;
    2510             : 
    2511         812 :     rel = logicalrep_read_rel(s);
    2512         812 :     logicalrep_relmap_update(rel);
    2513             : 
    2514             :     /* Also reset all entries in the partition map that refer to remoterel. */
    2515         812 :     logicalrep_partmap_reset_relmap(rel);
    2516             : }
    2517             : 
    2518             : /*
    2519             :  * Handle TYPE message.
    2520             :  *
    2521             :  * This implementation pays no attention to TYPE messages; we expect the user
    2522             :  * to have set things up so that the incoming data is acceptable to the input
    2523             :  * functions for the locally subscribed tables.  Hence, we just read and
    2524             :  * discard the message.
    2525             :  */
    2526             : static void
    2527          36 : apply_handle_type(StringInfo s)
    2528             : {
    2529             :     LogicalRepTyp typ;
    2530             : 
    2531          36 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
    2532           0 :         return;
    2533             : 
    2534          36 :     logicalrep_read_typ(s, &typ);
    2535             : }
    2536             : 
    2537             : /*
    2538             :  * Check that we (the subscription owner) have sufficient privileges on the
    2539             :  * target relation to perform the given operation.
    2540             :  */
    2541             : static void
    2542      440594 : TargetPrivilegesCheck(Relation rel, AclMode mode)
    2543             : {
    2544             :     Oid         relid;
    2545             :     AclResult   aclresult;
    2546             : 
    2547      440594 :     relid = RelationGetRelid(rel);
    2548      440594 :     aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
    2549      440594 :     if (aclresult != ACLCHECK_OK)
    2550          14 :         aclcheck_error(aclresult,
    2551          14 :                        get_relkind_objtype(rel->rd_rel->relkind),
    2552          14 :                        get_rel_name(relid));
    2553             : 
    2554             :     /*
    2555             :      * We lack the infrastructure to honor RLS policies.  It might be possible
    2556             :      * to add such infrastructure here, but tablesync workers lack it, too, so
    2557             :      * we don't bother.  RLS does not ordinarily apply to TRUNCATE commands,
    2558             :      * but it seems dangerous to replicate a TRUNCATE and then refuse to
    2559             :      * replicate subsequent INSERTs, so we forbid all commands the same.
    2560             :      */
    2561      440580 :     if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
    2562           6 :         ereport(ERROR,
    2563             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    2564             :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
    2565             :                         GetUserNameFromId(GetUserId(), true),
    2566             :                         RelationGetRelationName(rel))));
    2567      440574 : }
    2568             : 
    2569             : /*
    2570             :  * Handle INSERT message.
    2571             :  */
    2572             : 
    2573             : static void
    2574      371696 : apply_handle_insert(StringInfo s)
    2575             : {
    2576             :     LogicalRepRelMapEntry *rel;
    2577             :     LogicalRepTupleData newtup;
    2578             :     LogicalRepRelId relid;
    2579             :     UserContext ucxt;
    2580             :     ApplyExecutionData *edata;
    2581             :     EState     *estate;
    2582             :     TupleTableSlot *remoteslot;
    2583             :     MemoryContext oldctx;
    2584             :     bool        run_as_owner;
    2585             : 
    2586             :     /*
    2587             :      * Quick return if we are skipping data modification changes or handling
    2588             :      * streamed transactions.
    2589             :      */
    2590      723390 :     if (is_skipping_changes() ||
    2591      351694 :         handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
    2592      220136 :         return;
    2593             : 
    2594      151682 :     begin_replication_step();
    2595             : 
    2596      151680 :     relid = logicalrep_read_insert(s, &newtup);
    2597      151680 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    2598      151668 :     if (!should_apply_changes_for_rel(rel))
    2599             :     {
    2600             :         /*
    2601             :          * The relation can't become interesting in the middle of the
    2602             :          * transaction so it's safe to unlock it.
    2603             :          */
    2604         122 :         logicalrep_rel_close(rel, RowExclusiveLock);
    2605         122 :         end_replication_step();
    2606         122 :         return;
    2607             :     }
    2608             : 
    2609             :     /*
    2610             :      * Make sure that any user-supplied code runs as the table owner, unless
    2611             :      * the user has opted out of that behavior.
    2612             :      */
    2613      151546 :     run_as_owner = MySubscription->runasowner;
    2614      151546 :     if (!run_as_owner)
    2615      151530 :         SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
    2616             : 
    2617             :     /* Set relation for error callback */
    2618      151546 :     apply_error_callback_arg.rel = rel;
    2619             : 
    2620             :     /* Initialize the executor state. */
    2621      151546 :     edata = create_edata_for_relation(rel);
    2622      151546 :     estate = edata->estate;
    2623      151546 :     remoteslot = ExecInitExtraTupleSlot(estate,
    2624      151546 :                                         RelationGetDescr(rel->localrel),
    2625             :                                         &TTSOpsVirtual);
    2626             : 
    2627             :     /* Process and store remote tuple in the slot */
    2628      151546 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2629      151546 :     slot_store_data(remoteslot, rel, &newtup);
    2630      151546 :     slot_fill_defaults(rel, estate, remoteslot);
    2631      151546 :     MemoryContextSwitchTo(oldctx);
    2632             : 
    2633             :     /* For a partitioned table, insert the tuple into a partition. */
    2634      151546 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2635         100 :         apply_handle_tuple_routing(edata,
    2636             :                                    remoteslot, NULL, CMD_INSERT);
    2637             :     else
    2638             :     {
    2639      151446 :         ResultRelInfo *relinfo = edata->targetRelInfo;
    2640             : 
    2641      151446 :         ExecOpenIndices(relinfo, false);
    2642      151446 :         apply_handle_insert_internal(edata, relinfo, remoteslot);
    2643      151416 :         ExecCloseIndices(relinfo);
    2644             :     }
    2645             : 
    2646      151504 :     finish_edata(edata);
    2647             : 
    2648             :     /* Reset relation for error callback */
    2649      151504 :     apply_error_callback_arg.rel = NULL;
    2650             : 
    2651      151504 :     if (!run_as_owner)
    2652      151494 :         RestoreUserContext(&ucxt);
    2653             : 
    2654      151504 :     logicalrep_rel_close(rel, NoLock);
    2655             : 
    2656      151504 :     end_replication_step();
    2657             : }
    2658             : 
    2659             : /*
    2660             :  * Workhorse for apply_handle_insert()
    2661             :  * relinfo is for the relation we're actually inserting into
    2662             :  * (could be a child partition of edata->targetRelInfo)
    2663             :  */
    2664             : static void
    2665      151548 : apply_handle_insert_internal(ApplyExecutionData *edata,
    2666             :                              ResultRelInfo *relinfo,
    2667             :                              TupleTableSlot *remoteslot)
    2668             : {
    2669      151548 :     EState     *estate = edata->estate;
    2670             : 
    2671             :     /* Caller should have opened indexes already. */
    2672             :     Assert(relinfo->ri_IndexRelationDescs != NULL ||
    2673             :            !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
    2674             :            RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
    2675             : 
    2676             :     /* Caller will not have done this bit. */
    2677             :     Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
    2678      151548 :     InitConflictIndexes(relinfo);
    2679             : 
    2680             :     /* Do the insert. */
    2681      151548 :     TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
    2682      151536 :     ExecSimpleRelationInsert(relinfo, estate, remoteslot);
    2683      151506 : }
    2684             : 
    2685             : /*
    2686             :  * Check if the logical replication relation is updatable and throw
    2687             :  * appropriate error if it isn't.
    2688             :  */
    2689             : static void
    2690      144570 : check_relation_updatable(LogicalRepRelMapEntry *rel)
    2691             : {
    2692             :     /*
    2693             :      * For partitioned tables, we only need to care if the target partition is
    2694             :      * updatable (aka has PK or RI defined for it).
    2695             :      */
    2696      144570 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2697          60 :         return;
    2698             : 
    2699             :     /* Updatable, no error. */
    2700      144510 :     if (rel->updatable)
    2701      144510 :         return;
    2702             : 
    2703             :     /*
    2704             :      * We are in error mode so it's fine this is somewhat slow. It's better to
    2705             :      * give user correct error.
    2706             :      */
    2707           0 :     if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
    2708             :     {
    2709           0 :         ereport(ERROR,
    2710             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2711             :                  errmsg("publisher did not send replica identity column "
    2712             :                         "expected by the logical replication target relation \"%s.%s\"",
    2713             :                         rel->remoterel.nspname, rel->remoterel.relname)));
    2714             :     }
    2715             : 
    2716           0 :     ereport(ERROR,
    2717             :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2718             :              errmsg("logical replication target relation \"%s.%s\" has "
    2719             :                     "neither REPLICA IDENTITY index nor PRIMARY "
    2720             :                     "KEY and published relation does not have "
    2721             :                     "REPLICA IDENTITY FULL",
    2722             :                     rel->remoterel.nspname, rel->remoterel.relname)));
    2723             : }
    2724             : 
    2725             : /*
    2726             :  * Handle UPDATE message.
    2727             :  *
    2728             :  * TODO: FDW support
    2729             :  */
    2730             : static void
    2731      132320 : apply_handle_update(StringInfo s)
    2732             : {
    2733             :     LogicalRepRelMapEntry *rel;
    2734             :     LogicalRepRelId relid;
    2735             :     UserContext ucxt;
    2736             :     ApplyExecutionData *edata;
    2737             :     EState     *estate;
    2738             :     LogicalRepTupleData oldtup;
    2739             :     LogicalRepTupleData newtup;
    2740             :     bool        has_oldtup;
    2741             :     TupleTableSlot *remoteslot;
    2742             :     RTEPermissionInfo *target_perminfo;
    2743             :     MemoryContext oldctx;
    2744             :     bool        run_as_owner;
    2745             : 
    2746             :     /*
    2747             :      * Quick return if we are skipping data modification changes or handling
    2748             :      * streamed transactions.
    2749             :      */
    2750      264634 :     if (is_skipping_changes() ||
    2751      132314 :         handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
    2752       68446 :         return;
    2753             : 
    2754       63874 :     begin_replication_step();
    2755             : 
    2756       63872 :     relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
    2757             :                                    &newtup);
    2758       63872 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    2759       63872 :     if (!should_apply_changes_for_rel(rel))
    2760             :     {
    2761             :         /*
    2762             :          * The relation can't become interesting in the middle of the
    2763             :          * transaction so it's safe to unlock it.
    2764             :          */
    2765           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
    2766           0 :         end_replication_step();
    2767           0 :         return;
    2768             :     }
    2769             : 
    2770             :     /* Set relation for error callback */
    2771       63872 :     apply_error_callback_arg.rel = rel;
    2772             : 
    2773             :     /* Check if we can do the update. */
    2774       63872 :     check_relation_updatable(rel);
    2775             : 
    2776             :     /*
    2777             :      * Make sure that any user-supplied code runs as the table owner, unless
    2778             :      * the user has opted out of that behavior.
    2779             :      */
    2780       63872 :     run_as_owner = MySubscription->runasowner;
    2781       63872 :     if (!run_as_owner)
    2782       63866 :         SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
    2783             : 
    2784             :     /* Initialize the executor state. */
    2785       63870 :     edata = create_edata_for_relation(rel);
    2786       63870 :     estate = edata->estate;
    2787       63870 :     remoteslot = ExecInitExtraTupleSlot(estate,
    2788       63870 :                                         RelationGetDescr(rel->localrel),
    2789             :                                         &TTSOpsVirtual);
    2790             : 
    2791             :     /*
    2792             :      * Populate updatedCols so that per-column triggers can fire, and so
    2793             :      * executor can correctly pass down indexUnchanged hint.  This could
    2794             :      * include more columns than were actually changed on the publisher
    2795             :      * because the logical replication protocol doesn't contain that
    2796             :      * information.  But it would for example exclude columns that only exist
    2797             :      * on the subscriber, since we are not touching those.
    2798             :      */
    2799       63870 :     target_perminfo = list_nth(estate->es_rteperminfos, 0);
    2800      318614 :     for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
    2801             :     {
    2802      254744 :         Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
    2803      254744 :         int         remoteattnum = rel->attrmap->attnums[i];
    2804             : 
    2805      254744 :         if (!att->attisdropped && remoteattnum >= 0)
    2806             :         {
    2807             :             Assert(remoteattnum < newtup.ncols);
    2808      137710 :             if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
    2809      137704 :                 target_perminfo->updatedCols =
    2810      137704 :                     bms_add_member(target_perminfo->updatedCols,
    2811             :                                    i + 1 - FirstLowInvalidHeapAttributeNumber);
    2812             :         }
    2813             :     }
    2814             : 
    2815             :     /* Build the search tuple. */
    2816       63870 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2817       63870 :     slot_store_data(remoteslot, rel,
    2818       63870 :                     has_oldtup ? &oldtup : &newtup);
    2819       63870 :     MemoryContextSwitchTo(oldctx);
    2820             : 
    2821             :     /* For a partitioned table, apply update to correct partition. */
    2822       63870 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2823          26 :         apply_handle_tuple_routing(edata,
    2824             :                                    remoteslot, &newtup, CMD_UPDATE);
    2825             :     else
    2826       63844 :         apply_handle_update_internal(edata, edata->targetRelInfo,
    2827             :                                      remoteslot, &newtup, rel->localindexoid);
    2828             : 
    2829       63858 :     finish_edata(edata);
    2830             : 
    2831             :     /* Reset relation for error callback */
    2832       63858 :     apply_error_callback_arg.rel = NULL;
    2833             : 
    2834       63858 :     if (!run_as_owner)
    2835       63854 :         RestoreUserContext(&ucxt);
    2836             : 
    2837       63858 :     logicalrep_rel_close(rel, NoLock);
    2838             : 
    2839       63858 :     end_replication_step();
    2840             : }
    2841             : 
    2842             : /*
    2843             :  * Workhorse for apply_handle_update()
    2844             :  * relinfo is for the relation we're actually updating in
    2845             :  * (could be a child partition of edata->targetRelInfo)
    2846             :  */
    2847             : static void
    2848       63844 : apply_handle_update_internal(ApplyExecutionData *edata,
    2849             :                              ResultRelInfo *relinfo,
    2850             :                              TupleTableSlot *remoteslot,
    2851             :                              LogicalRepTupleData *newtup,
    2852             :                              Oid localindexoid)
    2853             : {
    2854       63844 :     EState     *estate = edata->estate;
    2855       63844 :     LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    2856       63844 :     Relation    localrel = relinfo->ri_RelationDesc;
    2857             :     EPQState    epqstate;
    2858       63844 :     TupleTableSlot *localslot = NULL;
    2859       63844 :     ConflictTupleInfo conflicttuple = {0};
    2860             :     bool        found;
    2861             :     MemoryContext oldctx;
    2862             : 
    2863       63844 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
    2864       63844 :     ExecOpenIndices(relinfo, false);
    2865             : 
    2866       63844 :     found = FindReplTupleInLocalRel(edata, localrel,
    2867             :                                     &relmapentry->remoterel,
    2868             :                                     localindexoid,
    2869             :                                     remoteslot, &localslot);
    2870             : 
    2871             :     /*
    2872             :      * Tuple found.
    2873             :      *
    2874             :      * Note this will fail if there are other conflicting unique indexes.
    2875             :      */
    2876       63836 :     if (found)
    2877             :     {
    2878             :         /*
    2879             :          * Report the conflict if the tuple was modified by a different
    2880             :          * origin.
    2881             :          */
    2882       63826 :         if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
    2883           4 :                                     &conflicttuple.origin, &conflicttuple.ts) &&
    2884           4 :             conflicttuple.origin != replorigin_session_origin)
    2885             :         {
    2886             :             TupleTableSlot *newslot;
    2887             : 
    2888             :             /* Store the new tuple for conflict reporting */
    2889           4 :             newslot = table_slot_create(localrel, &estate->es_tupleTable);
    2890           4 :             slot_store_data(newslot, relmapentry, newtup);
    2891             : 
    2892           4 :             conflicttuple.slot = localslot;
    2893             : 
    2894           4 :             ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
    2895             :                                 remoteslot, newslot,
    2896           4 :                                 list_make1(&conflicttuple));
    2897             :         }
    2898             : 
    2899             :         /* Process and store remote tuple in the slot */
    2900       63826 :         oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2901       63826 :         slot_modify_data(remoteslot, localslot, relmapentry, newtup);
    2902       63826 :         MemoryContextSwitchTo(oldctx);
    2903             : 
    2904       63826 :         EvalPlanQualSetSlot(&epqstate, remoteslot);
    2905             : 
    2906       63826 :         InitConflictIndexes(relinfo);
    2907             : 
    2908             :         /* Do the actual update. */
    2909       63826 :         TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
    2910       63826 :         ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
    2911             :                                  remoteslot);
    2912             :     }
    2913             :     else
    2914             :     {
    2915          10 :         TupleTableSlot *newslot = localslot;
    2916             : 
    2917             :         /* Store the new tuple for conflict reporting */
    2918          10 :         slot_store_data(newslot, relmapentry, newtup);
    2919             : 
    2920             :         /*
    2921             :          * The tuple to be updated could not be found.  Do nothing except for
    2922             :          * emitting a log message.
    2923             :          */
    2924          10 :         ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
    2925          10 :                             remoteslot, newslot, list_make1(&conflicttuple));
    2926             :     }
    2927             : 
    2928             :     /* Cleanup. */
    2929       63832 :     ExecCloseIndices(relinfo);
    2930       63832 :     EvalPlanQualEnd(&epqstate);
    2931       63832 : }
    2932             : 
    2933             : /*
    2934             :  * Handle DELETE message.
    2935             :  *
    2936             :  * TODO: FDW support
    2937             :  */
    2938             : static void
    2939      163868 : apply_handle_delete(StringInfo s)
    2940             : {
    2941             :     LogicalRepRelMapEntry *rel;
    2942             :     LogicalRepTupleData oldtup;
    2943             :     LogicalRepRelId relid;
    2944             :     UserContext ucxt;
    2945             :     ApplyExecutionData *edata;
    2946             :     EState     *estate;
    2947             :     TupleTableSlot *remoteslot;
    2948             :     MemoryContext oldctx;
    2949             :     bool        run_as_owner;
    2950             : 
    2951             :     /*
    2952             :      * Quick return if we are skipping data modification changes or handling
    2953             :      * streamed transactions.
    2954             :      */
    2955      327736 :     if (is_skipping_changes() ||
    2956      163868 :         handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
    2957       83230 :         return;
    2958             : 
    2959       80638 :     begin_replication_step();
    2960             : 
    2961       80638 :     relid = logicalrep_read_delete(s, &oldtup);
    2962       80638 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    2963       80638 :     if (!should_apply_changes_for_rel(rel))
    2964             :     {
    2965             :         /*
    2966             :          * The relation can't become interesting in the middle of the
    2967             :          * transaction so it's safe to unlock it.
    2968             :          */
    2969           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
    2970           0 :         end_replication_step();
    2971           0 :         return;
    2972             :     }
    2973             : 
    2974             :     /* Set relation for error callback */
    2975       80638 :     apply_error_callback_arg.rel = rel;
    2976             : 
    2977             :     /* Check if we can do the delete. */
    2978       80638 :     check_relation_updatable(rel);
    2979             : 
    2980             :     /*
    2981             :      * Make sure that any user-supplied code runs as the table owner, unless
    2982             :      * the user has opted out of that behavior.
    2983             :      */
    2984       80638 :     run_as_owner = MySubscription->runasowner;
    2985       80638 :     if (!run_as_owner)
    2986       80634 :         SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
    2987             : 
    2988             :     /* Initialize the executor state. */
    2989       80638 :     edata = create_edata_for_relation(rel);
    2990       80638 :     estate = edata->estate;
    2991       80638 :     remoteslot = ExecInitExtraTupleSlot(estate,
    2992       80638 :                                         RelationGetDescr(rel->localrel),
    2993             :                                         &TTSOpsVirtual);
    2994             : 
    2995             :     /* Build the search tuple. */
    2996       80638 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2997       80638 :     slot_store_data(remoteslot, rel, &oldtup);
    2998       80638 :     MemoryContextSwitchTo(oldctx);
    2999             : 
    3000             :     /* For a partitioned table, apply delete to correct partition. */
    3001       80638 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    3002          34 :         apply_handle_tuple_routing(edata,
    3003             :                                    remoteslot, NULL, CMD_DELETE);
    3004             :     else
    3005             :     {
    3006       80604 :         ResultRelInfo *relinfo = edata->targetRelInfo;
    3007             : 
    3008       80604 :         ExecOpenIndices(relinfo, false);
    3009       80604 :         apply_handle_delete_internal(edata, relinfo,
    3010             :                                      remoteslot, rel->localindexoid);
    3011       80604 :         ExecCloseIndices(relinfo);
    3012             :     }
    3013             : 
    3014       80638 :     finish_edata(edata);
    3015             : 
    3016             :     /* Reset relation for error callback */
    3017       80638 :     apply_error_callback_arg.rel = NULL;
    3018             : 
    3019       80638 :     if (!run_as_owner)
    3020       80634 :         RestoreUserContext(&ucxt);
    3021             : 
    3022       80638 :     logicalrep_rel_close(rel, NoLock);
    3023             : 
    3024       80638 :     end_replication_step();
    3025             : }
    3026             : 
    3027             : /*
    3028             :  * Workhorse for apply_handle_delete()
    3029             :  * relinfo is for the relation we're actually deleting from
    3030             :  * (could be a child partition of edata->targetRelInfo)
    3031             :  */
    3032             : static void
    3033       80638 : apply_handle_delete_internal(ApplyExecutionData *edata,
    3034             :                              ResultRelInfo *relinfo,
    3035             :                              TupleTableSlot *remoteslot,
    3036             :                              Oid localindexoid)
    3037             : {
    3038       80638 :     EState     *estate = edata->estate;
    3039       80638 :     Relation    localrel = relinfo->ri_RelationDesc;
    3040       80638 :     LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
    3041             :     EPQState    epqstate;
    3042             :     TupleTableSlot *localslot;
    3043       80638 :     ConflictTupleInfo conflicttuple = {0};
    3044             :     bool        found;
    3045             : 
    3046       80638 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
    3047             : 
    3048             :     /* Caller should have opened indexes already. */
    3049             :     Assert(relinfo->ri_IndexRelationDescs != NULL ||
    3050             :            !localrel->rd_rel->relhasindex ||
    3051             :            RelationGetIndexList(localrel) == NIL);
    3052             : 
    3053       80638 :     found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
    3054             :                                     remoteslot, &localslot);
    3055             : 
    3056             :     /* If found delete it. */
    3057       80638 :     if (found)
    3058             :     {
    3059             :         /*
    3060             :          * Report the conflict if the tuple was modified by a different
    3061             :          * origin.
    3062             :          */
    3063       80620 :         if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
    3064           8 :                                     &conflicttuple.origin, &conflicttuple.ts) &&
    3065           8 :             conflicttuple.origin != replorigin_session_origin)
    3066             :         {
    3067           6 :             conflicttuple.slot = localslot;
    3068           6 :             ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
    3069             :                                 remoteslot, NULL,
    3070           6 :                                 list_make1(&conflicttuple));
    3071             :         }
    3072             : 
    3073       80620 :         EvalPlanQualSetSlot(&epqstate, localslot);
    3074             : 
    3075             :         /* Do the actual delete. */
    3076       80620 :         TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
    3077       80620 :         ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
    3078             :     }
    3079             :     else
    3080             :     {
    3081             :         /*
    3082             :          * The tuple to be deleted could not be found.  Do nothing except for
    3083             :          * emitting a log message.
    3084             :          */
    3085          18 :         ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
    3086          18 :                             remoteslot, NULL, list_make1(&conflicttuple));
    3087             :     }
    3088             : 
    3089             :     /* Cleanup. */
    3090       80638 :     EvalPlanQualEnd(&epqstate);
    3091       80638 : }
    3092             : 
    3093             : /*
    3094             :  * Try to find a tuple received from the publication side (in 'remoteslot') in
    3095             :  * the corresponding local relation using either replica identity index,
    3096             :  * primary key, index or if needed, sequential scan.
    3097             :  *
    3098             :  * Local tuple, if found, is returned in '*localslot'.
    3099             :  */
    3100             : static bool
    3101      144508 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
    3102             :                         LogicalRepRelation *remoterel,
    3103             :                         Oid localidxoid,
    3104             :                         TupleTableSlot *remoteslot,
    3105             :                         TupleTableSlot **localslot)
    3106             : {
    3107      144508 :     EState     *estate = edata->estate;
    3108             :     bool        found;
    3109             : 
    3110             :     /*
    3111             :      * Regardless of the top-level operation, we're performing a read here, so
    3112             :      * check for SELECT privileges.
    3113             :      */
    3114      144508 :     TargetPrivilegesCheck(localrel, ACL_SELECT);
    3115             : 
    3116      144500 :     *localslot = table_slot_create(localrel, &estate->es_tupleTable);
    3117             : 
    3118             :     Assert(OidIsValid(localidxoid) ||
    3119             :            (remoterel->replident == REPLICA_IDENTITY_FULL));
    3120             : 
    3121      144500 :     if (OidIsValid(localidxoid))
    3122             :     {
    3123             : #ifdef USE_ASSERT_CHECKING
    3124             :         Relation    idxrel = index_open(localidxoid, AccessShareLock);
    3125             : 
    3126             :         /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
    3127             :         Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
    3128             :                (remoterel->replident == REPLICA_IDENTITY_FULL &&
    3129             :                 IsIndexUsableForReplicaIdentityFull(idxrel,
    3130             :                                                     edata->targetRel->attrmap)));
    3131             :         index_close(idxrel, AccessShareLock);
    3132             : #endif
    3133             : 
    3134      144202 :         found = RelationFindReplTupleByIndex(localrel, localidxoid,
    3135             :                                              LockTupleExclusive,
    3136             :                                              remoteslot, *localslot);
    3137             :     }
    3138             :     else
    3139         298 :         found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
    3140             :                                          remoteslot, *localslot);
    3141             : 
    3142      144500 :     return found;
    3143             : }
    3144             : 
    3145             : /*
    3146             :  * This handles insert, update, delete on a partitioned table.
    3147             :  */
    3148             : static void
    3149         160 : apply_handle_tuple_routing(ApplyExecutionData *edata,
    3150             :                            TupleTableSlot *remoteslot,
    3151             :                            LogicalRepTupleData *newtup,
    3152             :                            CmdType operation)
    3153             : {
    3154         160 :     EState     *estate = edata->estate;
    3155         160 :     LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    3156         160 :     ResultRelInfo *relinfo = edata->targetRelInfo;
    3157         160 :     Relation    parentrel = relinfo->ri_RelationDesc;
    3158             :     ModifyTableState *mtstate;
    3159             :     PartitionTupleRouting *proute;
    3160             :     ResultRelInfo *partrelinfo;
    3161             :     Relation    partrel;
    3162             :     TupleTableSlot *remoteslot_part;
    3163             :     TupleConversionMap *map;
    3164             :     MemoryContext oldctx;
    3165         160 :     LogicalRepRelMapEntry *part_entry = NULL;
    3166         160 :     AttrMap    *attrmap = NULL;
    3167             : 
    3168             :     /* ModifyTableState is needed for ExecFindPartition(). */
    3169         160 :     edata->mtstate = mtstate = makeNode(ModifyTableState);
    3170         160 :     mtstate->ps.plan = NULL;
    3171         160 :     mtstate->ps.state = estate;
    3172         160 :     mtstate->operation = operation;
    3173         160 :     mtstate->resultRelInfo = relinfo;
    3174             : 
    3175             :     /* ... as is PartitionTupleRouting. */
    3176         160 :     edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
    3177             : 
    3178             :     /*
    3179             :      * Find the partition to which the "search tuple" belongs.
    3180             :      */
    3181             :     Assert(remoteslot != NULL);
    3182         160 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    3183         160 :     partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
    3184             :                                     remoteslot, estate);
    3185             :     Assert(partrelinfo != NULL);
    3186         160 :     partrel = partrelinfo->ri_RelationDesc;
    3187             : 
    3188             :     /*
    3189             :      * Check for supported relkind.  We need this since partitions might be of
    3190             :      * unsupported relkinds; and the set of partitions can change, so checking
    3191             :      * at CREATE/ALTER SUBSCRIPTION would be insufficient.
    3192             :      */
    3193         160 :     CheckSubscriptionRelkind(partrel->rd_rel->relkind,
    3194         160 :                              get_namespace_name(RelationGetNamespace(partrel)),
    3195         160 :                              RelationGetRelationName(partrel));
    3196             : 
    3197             :     /*
    3198             :      * To perform any of the operations below, the tuple must match the
    3199             :      * partition's rowtype. Convert if needed or just copy, using a dedicated
    3200             :      * slot to store the tuple in any case.
    3201             :      */
    3202         160 :     remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
    3203         160 :     if (remoteslot_part == NULL)
    3204          94 :         remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
    3205         160 :     map = ExecGetRootToChildMap(partrelinfo, estate);
    3206         160 :     if (map != NULL)
    3207             :     {
    3208          66 :         attrmap = map->attrMap;
    3209          66 :         remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
    3210             :                                                 remoteslot_part);
    3211             :     }
    3212             :     else
    3213             :     {
    3214          94 :         remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
    3215          94 :         slot_getallattrs(remoteslot_part);
    3216             :     }
    3217         160 :     MemoryContextSwitchTo(oldctx);
    3218             : 
    3219             :     /* Check if we can do the update or delete on the leaf partition. */
    3220         160 :     if (operation == CMD_UPDATE || operation == CMD_DELETE)
    3221             :     {
    3222          60 :         part_entry = logicalrep_partition_open(relmapentry, partrel,
    3223             :                                                attrmap);
    3224          60 :         check_relation_updatable(part_entry);
    3225             :     }
    3226             : 
    3227         160 :     switch (operation)
    3228             :     {
    3229         100 :         case CMD_INSERT:
    3230         100 :             apply_handle_insert_internal(edata, partrelinfo,
    3231             :                                          remoteslot_part);
    3232          88 :             break;
    3233             : 
    3234          34 :         case CMD_DELETE:
    3235          34 :             apply_handle_delete_internal(edata, partrelinfo,
    3236             :                                          remoteslot_part,
    3237             :                                          part_entry->localindexoid);
    3238          34 :             break;
    3239             : 
    3240          26 :         case CMD_UPDATE:
    3241             : 
    3242             :             /*
    3243             :              * For UPDATE, depending on whether or not the updated tuple
    3244             :              * satisfies the partition's constraint, perform a simple UPDATE
    3245             :              * of the partition or move the updated tuple into a different
    3246             :              * suitable partition.
    3247             :              */
    3248             :             {
    3249             :                 TupleTableSlot *localslot;
    3250             :                 ResultRelInfo *partrelinfo_new;
    3251             :                 Relation    partrel_new;
    3252             :                 bool        found;
    3253             :                 EPQState    epqstate;
    3254          26 :                 ConflictTupleInfo conflicttuple = {0};
    3255             : 
    3256             :                 /* Get the matching local tuple from the partition. */
    3257          26 :                 found = FindReplTupleInLocalRel(edata, partrel,
    3258             :                                                 &part_entry->remoterel,
    3259             :                                                 part_entry->localindexoid,
    3260             :                                                 remoteslot_part, &localslot);
    3261          26 :                 if (!found)
    3262             :                 {
    3263           4 :                     TupleTableSlot *newslot = localslot;
    3264             : 
    3265             :                     /* Store the new tuple for conflict reporting */
    3266           4 :                     slot_store_data(newslot, part_entry, newtup);
    3267             : 
    3268             :                     /*
    3269             :                      * The tuple to be updated could not be found.  Do nothing
    3270             :                      * except for emitting a log message.
    3271             :                      */
    3272           4 :                     ReportApplyConflict(estate, partrelinfo, LOG,
    3273             :                                         CT_UPDATE_MISSING, remoteslot_part,
    3274           4 :                                         newslot, list_make1(&conflicttuple));
    3275             : 
    3276           4 :                     return;
    3277             :                 }
    3278             : 
    3279             :                 /*
    3280             :                  * Report the conflict if the tuple was modified by a
    3281             :                  * different origin.
    3282             :                  */
    3283          22 :                 if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
    3284             :                                             &conflicttuple.origin,
    3285           2 :                                             &conflicttuple.ts) &&
    3286           2 :                     conflicttuple.origin != replorigin_session_origin)
    3287             :                 {
    3288             :                     TupleTableSlot *newslot;
    3289             : 
    3290             :                     /* Store the new tuple for conflict reporting */
    3291           2 :                     newslot = table_slot_create(partrel, &estate->es_tupleTable);
    3292           2 :                     slot_store_data(newslot, part_entry, newtup);
    3293             : 
    3294           2 :                     conflicttuple.slot = localslot;
    3295             : 
    3296           2 :                     ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
    3297             :                                         remoteslot_part, newslot,
    3298           2 :                                         list_make1(&conflicttuple));
    3299             :                 }
    3300             : 
    3301             :                 /*
    3302             :                  * Apply the update to the local tuple, putting the result in
    3303             :                  * remoteslot_part.
    3304             :                  */
    3305          22 :                 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    3306          22 :                 slot_modify_data(remoteslot_part, localslot, part_entry,
    3307             :                                  newtup);
    3308          22 :                 MemoryContextSwitchTo(oldctx);
    3309             : 
    3310          22 :                 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
    3311             : 
    3312             :                 /*
    3313             :                  * Does the updated tuple still satisfy the current
    3314             :                  * partition's constraint?
    3315             :                  */
    3316          44 :                 if (!partrel->rd_rel->relispartition ||
    3317          22 :                     ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
    3318             :                                        false))
    3319             :                 {
    3320             :                     /*
    3321             :                      * Yes, so simply UPDATE the partition.  We don't call
    3322             :                      * apply_handle_update_internal() here, which would
    3323             :                      * normally do the following work, to avoid repeating some
    3324             :                      * work already done above to find the local tuple in the
    3325             :                      * partition.
    3326             :                      */
    3327          20 :                     InitConflictIndexes(partrelinfo);
    3328             : 
    3329          20 :                     EvalPlanQualSetSlot(&epqstate, remoteslot_part);
    3330          20 :                     TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
    3331             :                                           ACL_UPDATE);
    3332          20 :                     ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
    3333             :                                              localslot, remoteslot_part);
    3334             :                 }
    3335             :                 else
    3336             :                 {
    3337             :                     /* Move the tuple into the new partition. */
    3338             : 
    3339             :                     /*
    3340             :                      * New partition will be found using tuple routing, which
    3341             :                      * can only occur via the parent table.  We might need to
    3342             :                      * convert the tuple to the parent's rowtype.  Note that
    3343             :                      * this is the tuple found in the partition, not the
    3344             :                      * original search tuple received by this function.
    3345             :                      */
    3346           2 :                     if (map)
    3347             :                     {
    3348             :                         TupleConversionMap *PartitionToRootMap =
    3349           2 :                             convert_tuples_by_name(RelationGetDescr(partrel),
    3350             :                                                    RelationGetDescr(parentrel));
    3351             : 
    3352             :                         remoteslot =
    3353           2 :                             execute_attr_map_slot(PartitionToRootMap->attrMap,
    3354             :                                                   remoteslot_part, remoteslot);
    3355             :                     }
    3356             :                     else
    3357             :                     {
    3358           0 :                         remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
    3359           0 :                         slot_getallattrs(remoteslot);
    3360             :                     }
    3361             : 
    3362             :                     /* Find the new partition. */
    3363           2 :                     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    3364           2 :                     partrelinfo_new = ExecFindPartition(mtstate, relinfo,
    3365             :                                                         proute, remoteslot,
    3366             :                                                         estate);
    3367           2 :                     MemoryContextSwitchTo(oldctx);
    3368             :                     Assert(partrelinfo_new != partrelinfo);
    3369           2 :                     partrel_new = partrelinfo_new->ri_RelationDesc;
    3370             : 
    3371             :                     /* Check that new partition also has supported relkind. */
    3372           2 :                     CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
    3373           2 :                                              get_namespace_name(RelationGetNamespace(partrel_new)),
    3374           2 :                                              RelationGetRelationName(partrel_new));
    3375             : 
    3376             :                     /* DELETE old tuple found in the old partition. */
    3377           2 :                     EvalPlanQualSetSlot(&epqstate, localslot);
    3378           2 :                     TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
    3379           2 :                     ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
    3380             : 
    3381             :                     /* INSERT new tuple into the new partition. */
    3382             : 
    3383             :                     /*
    3384             :                      * Convert the replacement tuple to match the destination
    3385             :                      * partition rowtype.
    3386             :                      */
    3387           2 :                     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    3388           2 :                     remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
    3389           2 :                     if (remoteslot_part == NULL)
    3390           2 :                         remoteslot_part = table_slot_create(partrel_new,
    3391             :                                                             &estate->es_tupleTable);
    3392           2 :                     map = ExecGetRootToChildMap(partrelinfo_new, estate);
    3393           2 :                     if (map != NULL)
    3394             :                     {
    3395           0 :                         remoteslot_part = execute_attr_map_slot(map->attrMap,
    3396             :                                                                 remoteslot,
    3397             :                                                                 remoteslot_part);
    3398             :                     }
    3399             :                     else
    3400             :                     {
    3401           2 :                         remoteslot_part = ExecCopySlot(remoteslot_part,
    3402             :                                                        remoteslot);
    3403           2 :                         slot_getallattrs(remoteslot);
    3404             :                     }
    3405           2 :                     MemoryContextSwitchTo(oldctx);
    3406           2 :                     apply_handle_insert_internal(edata, partrelinfo_new,
    3407             :                                                  remoteslot_part);
    3408             :                 }
    3409             : 
    3410          22 :                 EvalPlanQualEnd(&epqstate);
    3411             :             }
    3412          22 :             break;
    3413             : 
    3414           0 :         default:
    3415           0 :             elog(ERROR, "unrecognized CmdType: %d", (int) operation);
    3416             :             break;
    3417             :     }
    3418             : }
    3419             : 
    3420             : /*
    3421             :  * Handle TRUNCATE message.
    3422             :  *
    3423             :  * TODO: FDW support
    3424             :  */
    3425             : static void
    3426          38 : apply_handle_truncate(StringInfo s)
    3427             : {
    3428          38 :     bool        cascade = false;
    3429          38 :     bool        restart_seqs = false;
    3430          38 :     List       *remote_relids = NIL;
    3431          38 :     List       *remote_rels = NIL;
    3432          38 :     List       *rels = NIL;
    3433          38 :     List       *part_rels = NIL;
    3434          38 :     List       *relids = NIL;
    3435          38 :     List       *relids_logged = NIL;
    3436             :     ListCell   *lc;
    3437          38 :     LOCKMODE    lockmode = AccessExclusiveLock;
    3438             : 
    3439             :     /*
    3440             :      * Quick return if we are skipping data modification changes or handling
    3441             :      * streamed transactions.
    3442             :      */
    3443          76 :     if (is_skipping_changes() ||
    3444          38 :         handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
    3445           0 :         return;
    3446             : 
    3447          38 :     begin_replication_step();
    3448             : 
    3449          38 :     remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
    3450             : 
    3451          94 :     foreach(lc, remote_relids)
    3452             :     {
    3453          56 :         LogicalRepRelId relid = lfirst_oid(lc);
    3454             :         LogicalRepRelMapEntry *rel;
    3455             : 
    3456          56 :         rel = logicalrep_rel_open(relid, lockmode);
    3457          56 :         if (!should_apply_changes_for_rel(rel))
    3458             :         {
    3459             :             /*
    3460             :              * The relation can't become interesting in the middle of the
    3461             :              * transaction so it's safe to unlock it.
    3462             :              */
    3463           0 :             logicalrep_rel_close(rel, lockmode);
    3464           0 :             continue;
    3465             :         }
    3466             : 
    3467          56 :         remote_rels = lappend(remote_rels, rel);
    3468          56 :         TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
    3469          56 :         rels = lappend(rels, rel->localrel);
    3470          56 :         relids = lappend_oid(relids, rel->localreloid);
    3471          56 :         if (RelationIsLogicallyLogged(rel->localrel))
    3472           0 :             relids_logged = lappend_oid(relids_logged, rel->localreloid);
    3473             : 
    3474             :         /*
    3475             :          * Truncate partitions if we got a message to truncate a partitioned
    3476             :          * table.
    3477             :          */
    3478          56 :         if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    3479             :         {
    3480             :             ListCell   *child;
    3481           8 :             List       *children = find_all_inheritors(rel->localreloid,
    3482             :                                                        lockmode,
    3483             :                                                        NULL);
    3484             : 
    3485          30 :             foreach(child, children)
    3486             :             {
    3487          22 :                 Oid         childrelid = lfirst_oid(child);
    3488             :                 Relation    childrel;
    3489             : 
    3490          22 :                 if (list_member_oid(relids, childrelid))
    3491           8 :                     continue;
    3492             : 
    3493             :                 /* find_all_inheritors already got lock */
    3494          14 :                 childrel = table_open(childrelid, NoLock);
    3495             : 
    3496             :                 /*
    3497             :                  * Ignore temp tables of other backends.  See similar code in
    3498             :                  * ExecuteTruncate().
    3499             :                  */
    3500          14 :                 if (RELATION_IS_OTHER_TEMP(childrel))
    3501             :                 {
    3502           0 :                     table_close(childrel, lockmode);
    3503           0 :                     continue;
    3504             :                 }
    3505             : 
    3506          14 :                 TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
    3507          14 :                 rels = lappend(rels, childrel);
    3508          14 :                 part_rels = lappend(part_rels, childrel);
    3509          14 :                 relids = lappend_oid(relids, childrelid);
    3510             :                 /* Log this relation only if needed for logical decoding */
    3511          14 :                 if (RelationIsLogicallyLogged(childrel))
    3512           0 :                     relids_logged = lappend_oid(relids_logged, childrelid);
    3513             :             }
    3514             :         }
    3515             :     }
    3516             : 
    3517             :     /*
    3518             :      * Even if we used CASCADE on the upstream primary we explicitly default
    3519             :      * to replaying changes without further cascading. This might be later
    3520             :      * changeable with a user specified option.
    3521             :      *
    3522             :      * MySubscription->runasowner tells us whether we want to execute
    3523             :      * replication actions as the subscription owner; the last argument to
    3524             :      * TruncateGuts tells it whether we want to switch to the table owner.
    3525             :      * Those are exactly opposite conditions.
    3526             :      */
    3527          38 :     ExecuteTruncateGuts(rels,
    3528             :                         relids,
    3529             :                         relids_logged,
    3530             :                         DROP_RESTRICT,
    3531             :                         restart_seqs,
    3532          38 :                         !MySubscription->runasowner);
    3533          94 :     foreach(lc, remote_rels)
    3534             :     {
    3535          56 :         LogicalRepRelMapEntry *rel = lfirst(lc);
    3536             : 
    3537          56 :         logicalrep_rel_close(rel, NoLock);
    3538             :     }
    3539          52 :     foreach(lc, part_rels)
    3540             :     {
    3541          14 :         Relation    rel = lfirst(lc);
    3542             : 
    3543          14 :         table_close(rel, NoLock);
    3544             :     }
    3545             : 
    3546          38 :     end_replication_step();
    3547             : }
    3548             : 
    3549             : 
    3550             : /*
    3551             :  * Logical replication protocol message dispatcher.
    3552             :  */
    3553             : void
    3554      674322 : apply_dispatch(StringInfo s)
    3555             : {
    3556      674322 :     LogicalRepMsgType action = pq_getmsgbyte(s);
    3557             :     LogicalRepMsgType saved_command;
    3558             : 
    3559             :     /*
    3560             :      * Set the current command being applied. Since this function can be
    3561             :      * called recursively when applying spooled changes, save the current
    3562             :      * command.
    3563             :      */
    3564      674322 :     saved_command = apply_error_callback_arg.command;
    3565      674322 :     apply_error_callback_arg.command = action;
    3566             : 
    3567      674322 :     switch (action)
    3568             :     {
    3569         924 :         case LOGICAL_REP_MSG_BEGIN:
    3570         924 :             apply_handle_begin(s);
    3571         924 :             break;
    3572             : 
    3573         858 :         case LOGICAL_REP_MSG_COMMIT:
    3574         858 :             apply_handle_commit(s);
    3575         858 :             break;
    3576             : 
    3577      371696 :         case LOGICAL_REP_MSG_INSERT:
    3578      371696 :             apply_handle_insert(s);
    3579      371640 :             break;
    3580             : 
    3581      132320 :         case LOGICAL_REP_MSG_UPDATE:
    3582      132320 :             apply_handle_update(s);
    3583      132304 :             break;
    3584             : 
    3585      163868 :         case LOGICAL_REP_MSG_DELETE:
    3586      163868 :             apply_handle_delete(s);
    3587      163868 :             break;
    3588             : 
    3589          38 :         case LOGICAL_REP_MSG_TRUNCATE:
    3590          38 :             apply_handle_truncate(s);
    3591          38 :             break;
    3592             : 
    3593         882 :         case LOGICAL_REP_MSG_RELATION:
    3594         882 :             apply_handle_relation(s);
    3595         882 :             break;
    3596             : 
    3597          36 :         case LOGICAL_REP_MSG_TYPE:
    3598          36 :             apply_handle_type(s);
    3599          36 :             break;
    3600             : 
    3601          18 :         case LOGICAL_REP_MSG_ORIGIN:
    3602          18 :             apply_handle_origin(s);
    3603          18 :             break;
    3604             : 
    3605           0 :         case LOGICAL_REP_MSG_MESSAGE:
    3606             : 
    3607             :             /*
    3608             :              * Logical replication does not use generic logical messages yet.
    3609             :              * Although, it could be used by other applications that use this
    3610             :              * output plugin.
    3611             :              */
    3612           0 :             break;
    3613             : 
    3614        1676 :         case LOGICAL_REP_MSG_STREAM_START:
    3615        1676 :             apply_handle_stream_start(s);
    3616        1676 :             break;
    3617             : 
    3618        1674 :         case LOGICAL_REP_MSG_STREAM_STOP:
    3619        1674 :             apply_handle_stream_stop(s);
    3620        1670 :             break;
    3621             : 
    3622          76 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    3623          76 :             apply_handle_stream_abort(s);
    3624          76 :             break;
    3625             : 
    3626         122 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    3627         122 :             apply_handle_stream_commit(s);
    3628         118 :             break;
    3629             : 
    3630          32 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    3631          32 :             apply_handle_begin_prepare(s);
    3632          32 :             break;
    3633             : 
    3634          30 :         case LOGICAL_REP_MSG_PREPARE:
    3635          30 :             apply_handle_prepare(s);
    3636          28 :             break;
    3637             : 
    3638          40 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    3639          40 :             apply_handle_commit_prepared(s);
    3640          40 :             break;
    3641             : 
    3642          10 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    3643          10 :             apply_handle_rollback_prepared(s);
    3644          10 :             break;
    3645             : 
    3646          22 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    3647          22 :             apply_handle_stream_prepare(s);
    3648          22 :             break;
    3649             : 
    3650           0 :         default:
    3651           0 :             ereport(ERROR,
    3652             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    3653             :                      errmsg("invalid logical replication message type \"??? (%d)\"", action)));
    3654             :     }
    3655             : 
    3656             :     /* Reset the current command */
    3657      674240 :     apply_error_callback_arg.command = saved_command;
    3658      674240 : }
    3659             : 
    3660             : /*
    3661             :  * Figure out which write/flush positions to report to the walsender process.
    3662             :  *
    3663             :  * We can't simply report back the last LSN the walsender sent us because the
    3664             :  * local transaction might not yet be flushed to disk locally. Instead we
    3665             :  * build a list that associates local with remote LSNs for every commit. When
    3666             :  * reporting back the flush position to the sender we iterate that list and
    3667             :  * check which entries on it are already locally flushed. Those we can report
    3668             :  * as having been flushed.
    3669             :  *
    3670             :  * The have_pending_txes is true if there are outstanding transactions that
    3671             :  * need to be flushed.
    3672             :  */
    3673             : static void
    3674      125282 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
    3675             :                    bool *have_pending_txes)
    3676             : {
    3677             :     dlist_mutable_iter iter;
    3678      125282 :     XLogRecPtr  local_flush = GetFlushRecPtr(NULL);
    3679             : 
    3680      125282 :     *write = InvalidXLogRecPtr;
    3681      125282 :     *flush = InvalidXLogRecPtr;
    3682             : 
    3683      126230 :     dlist_foreach_modify(iter, &lsn_mapping)
    3684             :     {
    3685       16736 :         FlushPosition *pos =
    3686       16736 :             dlist_container(FlushPosition, node, iter.cur);
    3687             : 
    3688       16736 :         *write = pos->remote_end;
    3689             : 
    3690       16736 :         if (pos->local_end <= local_flush)
    3691             :         {
    3692         948 :             *flush = pos->remote_end;
    3693         948 :             dlist_delete(iter.cur);
    3694         948 :             pfree(pos);
    3695             :         }
    3696             :         else
    3697             :         {
    3698             :             /*
    3699             :              * Don't want to uselessly iterate over the rest of the list which
    3700             :              * could potentially be long. Instead get the last element and
    3701             :              * grab the write position from there.
    3702             :              */
    3703       15788 :             pos = dlist_tail_element(FlushPosition, node,
    3704             :                                      &lsn_mapping);
    3705       15788 :             *write = pos->remote_end;
    3706       15788 :             *have_pending_txes = true;
    3707       15788 :             return;
    3708             :         }
    3709             :     }
    3710             : 
    3711      109494 :     *have_pending_txes = !dlist_is_empty(&lsn_mapping);
    3712             : }
    3713             : 
    3714             : /*
    3715             :  * Store current remote/local lsn pair in the tracking list.
    3716             :  */
    3717             : void
    3718        1070 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
    3719             : {
    3720             :     FlushPosition *flushpos;
    3721             : 
    3722             :     /*
    3723             :      * Skip for parallel apply workers, because the lsn_mapping is maintained
    3724             :      * by the leader apply worker.
    3725             :      */
    3726        1070 :     if (am_parallel_apply_worker())
    3727          38 :         return;
    3728             : 
    3729             :     /* Need to do this in permanent context */
    3730        1032 :     MemoryContextSwitchTo(ApplyContext);
    3731             : 
    3732             :     /* Track commit lsn  */
    3733        1032 :     flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
    3734        1032 :     flushpos->local_end = local_lsn;
    3735        1032 :     flushpos->remote_end = remote_lsn;
    3736             : 
    3737        1032 :     dlist_push_tail(&lsn_mapping, &flushpos->node);
    3738        1032 :     MemoryContextSwitchTo(ApplyMessageContext);
    3739             : }
    3740             : 
    3741             : 
    3742             : /* Update statistics of the worker. */
    3743             : static void
    3744      372860 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
    3745             : {
    3746      372860 :     MyLogicalRepWorker->last_lsn = last_lsn;
    3747      372860 :     MyLogicalRepWorker->last_send_time = send_time;
    3748      372860 :     MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
    3749      372860 :     if (reply)
    3750             :     {
    3751        3192 :         MyLogicalRepWorker->reply_lsn = last_lsn;
    3752        3192 :         MyLogicalRepWorker->reply_time = send_time;
    3753             :     }
    3754      372860 : }
    3755             : 
    3756             : /*
    3757             :  * Apply main loop.
    3758             :  */
    3759             : static void
    3760         770 : LogicalRepApplyLoop(XLogRecPtr last_received)
    3761             : {
    3762         770 :     TimestampTz last_recv_timestamp = GetCurrentTimestamp();
    3763         770 :     bool        ping_sent = false;
    3764             :     TimeLineID  tli;
    3765             :     ErrorContextCallback errcallback;
    3766         770 :     RetainDeadTuplesData rdt_data = {0};
    3767             : 
    3768             :     /*
    3769             :      * Init the ApplyMessageContext which we clean up after each replication
    3770             :      * protocol message.
    3771             :      */
    3772         770 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
    3773             :                                                 "ApplyMessageContext",
    3774             :                                                 ALLOCSET_DEFAULT_SIZES);
    3775             : 
    3776             :     /*
    3777             :      * This memory context is used for per-stream data when the streaming mode
    3778             :      * is enabled. This context is reset on each stream stop.
    3779             :      */
    3780         770 :     LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
    3781             :                                                     "LogicalStreamingContext",
    3782             :                                                     ALLOCSET_DEFAULT_SIZES);
    3783             : 
    3784             :     /* mark as idle, before starting to loop */
    3785         770 :     pgstat_report_activity(STATE_IDLE, NULL);
    3786             : 
    3787             :     /*
    3788             :      * Push apply error context callback. Fields will be filled while applying
    3789             :      * a change.
    3790             :      */
    3791         770 :     errcallback.callback = apply_error_callback;
    3792         770 :     errcallback.previous = error_context_stack;
    3793         770 :     error_context_stack = &errcallback;
    3794         770 :     apply_error_context_stack = error_context_stack;
    3795             : 
    3796             :     /* This outer loop iterates once per wait. */
    3797             :     for (;;)
    3798      121060 :     {
    3799      121830 :         pgsocket    fd = PGINVALID_SOCKET;
    3800             :         int         rc;
    3801             :         int         len;
    3802      121830 :         char       *buf = NULL;
    3803      121830 :         bool        endofstream = false;
    3804             :         long        wait_time;
    3805             : 
    3806      121830 :         CHECK_FOR_INTERRUPTS();
    3807             : 
    3808      121830 :         MemoryContextSwitchTo(ApplyMessageContext);
    3809             : 
    3810      121830 :         len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
    3811             : 
    3812      121796 :         if (len != 0)
    3813             :         {
    3814             :             /* Loop to process all available data (without blocking). */
    3815             :             for (;;)
    3816             :             {
    3817      492568 :                 CHECK_FOR_INTERRUPTS();
    3818             : 
    3819      492568 :                 if (len == 0)
    3820             :                 {
    3821      119690 :                     break;
    3822             :                 }
    3823      372878 :                 else if (len < 0)
    3824             :                 {
    3825          18 :                     ereport(LOG,
    3826             :                             (errmsg("data stream from publisher has ended")));
    3827          18 :                     endofstream = true;
    3828          18 :                     break;
    3829             :                 }
    3830             :                 else
    3831             :                 {
    3832             :                     int         c;
    3833             :                     StringInfoData s;
    3834             : 
    3835      372860 :                     if (ConfigReloadPending)
    3836             :                     {
    3837           0 :                         ConfigReloadPending = false;
    3838           0 :                         ProcessConfigFile(PGC_SIGHUP);
    3839             :                     }
    3840             : 
    3841             :                     /* Reset timeout. */
    3842      372860 :                     last_recv_timestamp = GetCurrentTimestamp();
    3843      372860 :                     ping_sent = false;
    3844             : 
    3845      372860 :                     rdt_data.last_recv_time = last_recv_timestamp;
    3846             : 
    3847             :                     /* Ensure we are reading the data into our memory context. */
    3848      372860 :                     MemoryContextSwitchTo(ApplyMessageContext);
    3849             : 
    3850      372860 :                     initReadOnlyStringInfo(&s, buf, len);
    3851             : 
    3852      372860 :                     c = pq_getmsgbyte(&s);
    3853             : 
    3854      372860 :                     if (c == 'w')
    3855             :                     {
    3856             :                         XLogRecPtr  start_lsn;
    3857             :                         XLogRecPtr  end_lsn;
    3858             :                         TimestampTz send_time;
    3859             : 
    3860      369632 :                         start_lsn = pq_getmsgint64(&s);
    3861      369632 :                         end_lsn = pq_getmsgint64(&s);
    3862      369632 :                         send_time = pq_getmsgint64(&s);
    3863             : 
    3864      369632 :                         if (last_received < start_lsn)
    3865      296826 :                             last_received = start_lsn;
    3866             : 
    3867      369632 :                         if (last_received < end_lsn)
    3868           0 :                             last_received = end_lsn;
    3869             : 
    3870      369632 :                         UpdateWorkerStats(last_received, send_time, false);
    3871             : 
    3872      369632 :                         apply_dispatch(&s);
    3873             : 
    3874      369558 :                         maybe_advance_nonremovable_xid(&rdt_data, false);
    3875             :                     }
    3876        3228 :                     else if (c == 'k')
    3877             :                     {
    3878             :                         XLogRecPtr  end_lsn;
    3879             :                         TimestampTz timestamp;
    3880             :                         bool        reply_requested;
    3881             : 
    3882        3192 :                         end_lsn = pq_getmsgint64(&s);
    3883        3192 :                         timestamp = pq_getmsgint64(&s);
    3884        3192 :                         reply_requested = pq_getmsgbyte(&s);
    3885             : 
    3886        3192 :                         if (last_received < end_lsn)
    3887        1836 :                             last_received = end_lsn;
    3888             : 
    3889        3192 :                         send_feedback(last_received, reply_requested, false);
    3890             : 
    3891        3192 :                         maybe_advance_nonremovable_xid(&rdt_data, false);
    3892             : 
    3893        3192 :                         UpdateWorkerStats(last_received, timestamp, true);
    3894             :                     }
    3895          36 :                     else if (c == 's')  /* Primary status update */
    3896             :                     {
    3897          36 :                         rdt_data.remote_lsn = pq_getmsgint64(&s);
    3898          36 :                         rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
    3899          36 :                         rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
    3900          36 :                         rdt_data.reply_time = pq_getmsgint64(&s);
    3901             : 
    3902             :                         /*
    3903             :                          * This should never happen, see
    3904             :                          * ProcessStandbyPSRequestMessage. But if it happens
    3905             :                          * due to a bug, we don't want to proceed as it can
    3906             :                          * incorrectly advance oldest_nonremovable_xid.
    3907             :                          */
    3908          36 :                         if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
    3909           0 :                             elog(ERROR, "cannot get the latest WAL position from the publisher");
    3910             : 
    3911          36 :                         maybe_advance_nonremovable_xid(&rdt_data, true);
    3912             : 
    3913          36 :                         UpdateWorkerStats(last_received, rdt_data.reply_time, false);
    3914             :                     }
    3915             :                     /* other message types are purposefully ignored */
    3916             : 
    3917      372786 :                     MemoryContextReset(ApplyMessageContext);
    3918             :                 }
    3919             : 
    3920      372786 :                 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
    3921             :             }
    3922             :         }
    3923             : 
    3924             :         /* confirm all writes so far */
    3925      121722 :         send_feedback(last_received, false, false);
    3926             : 
    3927             :         /* Reset the timestamp if no message was received */
    3928      121722 :         rdt_data.last_recv_time = 0;
    3929             : 
    3930      121722 :         maybe_advance_nonremovable_xid(&rdt_data, false);
    3931             : 
    3932      121722 :         if (!in_remote_transaction && !in_streamed_transaction)
    3933             :         {
    3934             :             /*
    3935             :              * If we didn't get any transactions for a while there might be
    3936             :              * unconsumed invalidation messages in the queue, consume them
    3937             :              * now.
    3938             :              */
    3939        5198 :             AcceptInvalidationMessages();
    3940        5198 :             maybe_reread_subscription();
    3941             : 
    3942             :             /* Process any table synchronization changes. */
    3943        5120 :             process_syncing_tables(last_received);
    3944             :         }
    3945             : 
    3946             :         /* Cleanup the memory. */
    3947      121262 :         MemoryContextReset(ApplyMessageContext);
    3948      121262 :         MemoryContextSwitchTo(TopMemoryContext);
    3949             : 
    3950             :         /* Check if we need to exit the streaming loop. */
    3951      121262 :         if (endofstream)
    3952          18 :             break;
    3953             : 
    3954             :         /*
    3955             :          * Wait for more data or latch.  If we have unflushed transactions,
    3956             :          * wake up after WalWriterDelay to see if they've been flushed yet (in
    3957             :          * which case we should send a feedback message).  Otherwise, there's
    3958             :          * no particular urgency about waking up unless we get data or a
    3959             :          * signal.
    3960             :          */
    3961      121244 :         if (!dlist_is_empty(&lsn_mapping))
    3962       14734 :             wait_time = WalWriterDelay;
    3963             :         else
    3964      106510 :             wait_time = NAPTIME_PER_CYCLE;
    3965             : 
    3966             :         /*
    3967             :          * Ensure to wake up when it's possible to advance the non-removable
    3968             :          * transaction ID.
    3969             :          */
    3970      121244 :         if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
    3971      121172 :             rdt_data.xid_advance_interval)
    3972          80 :             wait_time = Min(wait_time, rdt_data.xid_advance_interval);
    3973             : 
    3974      121244 :         rc = WaitLatchOrSocket(MyLatch,
    3975             :                                WL_SOCKET_READABLE | WL_LATCH_SET |
    3976             :                                WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    3977             :                                fd, wait_time,
    3978             :                                WAIT_EVENT_LOGICAL_APPLY_MAIN);
    3979             : 
    3980      121244 :         if (rc & WL_LATCH_SET)
    3981             :         {
    3982        1346 :             ResetLatch(MyLatch);
    3983        1346 :             CHECK_FOR_INTERRUPTS();
    3984             :         }
    3985             : 
    3986      121060 :         if (ConfigReloadPending)
    3987             :         {
    3988          16 :             ConfigReloadPending = false;
    3989          16 :             ProcessConfigFile(PGC_SIGHUP);
    3990             :         }
    3991             : 
    3992      121060 :         if (rc & WL_TIMEOUT)
    3993             :         {
    3994             :             /*
    3995             :              * We didn't receive anything new. If we haven't heard anything
    3996             :              * from the server for more than wal_receiver_timeout / 2, ping
    3997             :              * the server. Also, if it's been longer than
    3998             :              * wal_receiver_status_interval since the last update we sent,
    3999             :              * send a status update to the primary anyway, to report any
    4000             :              * progress in applying WAL.
    4001             :              */
    4002         352 :             bool        requestReply = false;
    4003             : 
    4004             :             /*
    4005             :              * Check if time since last receive from primary has reached the
    4006             :              * configured limit.
    4007             :              */
    4008         352 :             if (wal_receiver_timeout > 0)
    4009             :             {
    4010         352 :                 TimestampTz now = GetCurrentTimestamp();
    4011             :                 TimestampTz timeout;
    4012             : 
    4013         352 :                 timeout =
    4014         352 :                     TimestampTzPlusMilliseconds(last_recv_timestamp,
    4015             :                                                 wal_receiver_timeout);
    4016             : 
    4017         352 :                 if (now >= timeout)
    4018           0 :                     ereport(ERROR,
    4019             :                             (errcode(ERRCODE_CONNECTION_FAILURE),
    4020             :                              errmsg("terminating logical replication worker due to timeout")));
    4021             : 
    4022             :                 /* Check to see if it's time for a ping. */
    4023         352 :                 if (!ping_sent)
    4024             :                 {
    4025         352 :                     timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
    4026             :                                                           (wal_receiver_timeout / 2));
    4027         352 :                     if (now >= timeout)
    4028             :                     {
    4029           0 :                         requestReply = true;
    4030           0 :                         ping_sent = true;
    4031             :                     }
    4032             :                 }
    4033             :             }
    4034             : 
    4035         352 :             send_feedback(last_received, requestReply, requestReply);
    4036             : 
    4037         352 :             maybe_advance_nonremovable_xid(&rdt_data, false);
    4038             : 
    4039             :             /*
    4040             :              * Force reporting to ensure long idle periods don't lead to
    4041             :              * arbitrarily delayed stats. Stats can only be reported outside
    4042             :              * of (implicit or explicit) transactions. That shouldn't lead to
    4043             :              * stats being delayed for long, because transactions are either
    4044             :              * sent as a whole on commit or streamed. Streamed transactions
    4045             :              * are spilled to disk and applied on commit.
    4046             :              */
    4047         352 :             if (!IsTransactionState())
    4048         352 :                 pgstat_report_stat(true);
    4049             :         }
    4050             :     }
    4051             : 
    4052             :     /* Pop the error context stack */
    4053          18 :     error_context_stack = errcallback.previous;
    4054          18 :     apply_error_context_stack = error_context_stack;
    4055             : 
    4056             :     /* All done */
    4057          18 :     walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
    4058           0 : }
    4059             : 
    4060             : /*
    4061             :  * Send a Standby Status Update message to server.
    4062             :  *
    4063             :  * 'recvpos' is the latest LSN we've received data to, force is set if we need
    4064             :  * to send a response to avoid timeouts.
    4065             :  */
    4066             : static void
    4067      125266 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
    4068             : {
    4069             :     static StringInfo reply_message = NULL;
    4070             :     static TimestampTz send_time = 0;
    4071             : 
    4072             :     static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
    4073             :     static XLogRecPtr last_writepos = InvalidXLogRecPtr;
    4074             : 
    4075             :     XLogRecPtr  writepos;
    4076             :     XLogRecPtr  flushpos;
    4077             :     TimestampTz now;
    4078             :     bool        have_pending_txes;
    4079             : 
    4080             :     /*
    4081             :      * If the user doesn't want status to be reported to the publisher, be
    4082             :      * sure to exit before doing anything at all.
    4083             :      */
    4084      125266 :     if (!force && wal_receiver_status_interval <= 0)
    4085       33078 :         return;
    4086             : 
    4087             :     /* It's legal to not pass a recvpos */
    4088      125266 :     if (recvpos < last_recvpos)
    4089           0 :         recvpos = last_recvpos;
    4090             : 
    4091      125266 :     get_flush_position(&writepos, &flushpos, &have_pending_txes);
    4092             : 
    4093             :     /*
    4094             :      * No outstanding transactions to flush, we can report the latest received
    4095             :      * position. This is important for synchronous replication.
    4096             :      */
    4097      125266 :     if (!have_pending_txes)
    4098      109484 :         flushpos = writepos = recvpos;
    4099             : 
    4100      125266 :     if (writepos < last_writepos)
    4101           0 :         writepos = last_writepos;
    4102             : 
    4103      125266 :     if (flushpos < last_flushpos)
    4104       15694 :         flushpos = last_flushpos;
    4105             : 
    4106      125266 :     now = GetCurrentTimestamp();
    4107             : 
    4108             :     /* if we've already reported everything we're good */
    4109      125266 :     if (!force &&
    4110      125262 :         writepos == last_writepos &&
    4111       33606 :         flushpos == last_flushpos &&
    4112       33340 :         !TimestampDifferenceExceeds(send_time, now,
    4113             :                                     wal_receiver_status_interval * 1000))
    4114       33078 :         return;
    4115       92188 :     send_time = now;
    4116             : 
    4117       92188 :     if (!reply_message)
    4118             :     {
    4119         770 :         MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
    4120             : 
    4121         770 :         reply_message = makeStringInfo();
    4122         770 :         MemoryContextSwitchTo(oldctx);
    4123             :     }
    4124             :     else
    4125       91418 :         resetStringInfo(reply_message);
    4126             : 
    4127       92188 :     pq_sendbyte(reply_message, 'r');
    4128       92188 :     pq_sendint64(reply_message, recvpos);   /* write */
    4129       92188 :     pq_sendint64(reply_message, flushpos);  /* flush */
    4130       92188 :     pq_sendint64(reply_message, writepos);  /* apply */
    4131       92188 :     pq_sendint64(reply_message, now);   /* sendTime */
    4132       92188 :     pq_sendbyte(reply_message, requestReply);   /* replyRequested */
    4133             : 
    4134       92188 :     elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
    4135             :          force,
    4136             :          LSN_FORMAT_ARGS(recvpos),
    4137             :          LSN_FORMAT_ARGS(writepos),
    4138             :          LSN_FORMAT_ARGS(flushpos));
    4139             : 
    4140       92188 :     walrcv_send(LogRepWorkerWalRcvConn,
    4141             :                 reply_message->data, reply_message->len);
    4142             : 
    4143       92188 :     if (recvpos > last_recvpos)
    4144       91656 :         last_recvpos = recvpos;
    4145       92188 :     if (writepos > last_writepos)
    4146       91658 :         last_writepos = writepos;
    4147       92188 :     if (flushpos > last_flushpos)
    4148       91156 :         last_flushpos = flushpos;
    4149             : }
    4150             : 
    4151             : /*
    4152             :  * Attempt to advance the non-removable transaction ID.
    4153             :  *
    4154             :  * See comments atop worker.c for details.
    4155             :  */
    4156             : static void
    4157      494860 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
    4158             :                                bool status_received)
    4159             : {
    4160      494860 :     if (!can_advance_nonremovable_xid(rdt_data))
    4161      494434 :         return;
    4162             : 
    4163         426 :     process_rdt_phase_transition(rdt_data, status_received);
    4164             : }
    4165             : 
    4166             : /*
    4167             :  * Preliminary check to determine if advancing the non-removable transaction ID
    4168             :  * is allowed.
    4169             :  */
    4170             : static bool
    4171      494860 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
    4172             : {
    4173             :     /*
    4174             :      * It is sufficient to manage non-removable transaction ID for a
    4175             :      * subscription by the main apply worker to detect conflicts reliably even
    4176             :      * for table sync or parallel apply workers.
    4177             :      */
    4178      494860 :     if (!am_leader_apply_worker())
    4179         592 :         return false;
    4180             : 
    4181             :     /* No need to advance if retaining dead tuples is not required */
    4182      494268 :     if (!MySubscription->retaindeadtuples)
    4183      493842 :         return false;
    4184             : 
    4185         426 :     return true;
    4186             : }
    4187             : 
    4188             : /*
    4189             :  * Process phase transitions during the non-removable transaction ID
    4190             :  * advancement. See comments atop worker.c for details of the transition.
    4191             :  */
    4192             : static void
    4193         532 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
    4194             :                              bool status_received)
    4195             : {
    4196         532 :     switch (rdt_data->phase)
    4197             :     {
    4198         206 :         case RDT_GET_CANDIDATE_XID:
    4199         206 :             get_candidate_xid(rdt_data);
    4200         206 :             break;
    4201          38 :         case RDT_REQUEST_PUBLISHER_STATUS:
    4202          38 :             request_publisher_status(rdt_data);
    4203          38 :             break;
    4204         182 :         case RDT_WAIT_FOR_PUBLISHER_STATUS:
    4205         182 :             wait_for_publisher_status(rdt_data, status_received);
    4206         182 :             break;
    4207         106 :         case RDT_WAIT_FOR_LOCAL_FLUSH:
    4208         106 :             wait_for_local_flush(rdt_data);
    4209         106 :             break;
    4210             :     }
    4211         532 : }
    4212             : 
    4213             : /*
    4214             :  * Workhorse for the RDT_GET_CANDIDATE_XID phase.
    4215             :  */
    4216             : static void
    4217         206 : get_candidate_xid(RetainDeadTuplesData *rdt_data)
    4218             : {
    4219             :     TransactionId oldest_running_xid;
    4220             :     TimestampTz now;
    4221             : 
    4222             :     /*
    4223             :      * Use last_recv_time when applying changes in the loop to avoid
    4224             :      * unnecessary system time retrieval. If last_recv_time is not available,
    4225             :      * obtain the current timestamp.
    4226             :      */
    4227         206 :     now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
    4228             : 
    4229             :     /*
    4230             :      * Compute the candidate_xid and request the publisher status at most once
    4231             :      * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
    4232             :      * details on how this value is dynamically adjusted. This is to avoid
    4233             :      * using CPU and network resources without making much progress.
    4234             :      */
    4235         206 :     if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
    4236             :                                     rdt_data->xid_advance_interval))
    4237         150 :         return;
    4238             : 
    4239             :     /*
    4240             :      * Immediately update the timer, even if the function returns later
    4241             :      * without setting candidate_xid due to inactivity on the subscriber. This
    4242             :      * avoids frequent calls to GetOldestActiveTransactionId.
    4243             :      */
    4244          56 :     rdt_data->candidate_xid_time = now;
    4245             : 
    4246             :     /*
    4247             :      * Consider transactions in the current database, as only dead tuples from
    4248             :      * this database are required for conflict detection.
    4249             :      */
    4250          56 :     oldest_running_xid = GetOldestActiveTransactionId(false, false);
    4251             : 
    4252             :     /*
    4253             :      * Oldest active transaction ID (oldest_running_xid) can't be behind any
    4254             :      * of its previously computed value.
    4255             :      */
    4256             :     Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
    4257             :                                          oldest_running_xid));
    4258             : 
    4259             :     /* Return if the oldest_nonremovable_xid cannot be advanced */
    4260          56 :     if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
    4261             :                             oldest_running_xid))
    4262             :     {
    4263          18 :         adjust_xid_advance_interval(rdt_data, false);
    4264          18 :         return;
    4265             :     }
    4266             : 
    4267          38 :     adjust_xid_advance_interval(rdt_data, true);
    4268             : 
    4269          38 :     rdt_data->candidate_xid = oldest_running_xid;
    4270          38 :     rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
    4271             : 
    4272             :     /* process the next phase */
    4273          38 :     process_rdt_phase_transition(rdt_data, false);
    4274             : }
    4275             : 
    4276             : /*
    4277             :  * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
    4278             :  */
    4279             : static void
    4280          38 : request_publisher_status(RetainDeadTuplesData *rdt_data)
    4281             : {
    4282             :     static StringInfo request_message = NULL;
    4283             : 
    4284          38 :     if (!request_message)
    4285             :     {
    4286          14 :         MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
    4287             : 
    4288          14 :         request_message = makeStringInfo();
    4289          14 :         MemoryContextSwitchTo(oldctx);
    4290             :     }
    4291             :     else
    4292          24 :         resetStringInfo(request_message);
    4293             : 
    4294             :     /*
    4295             :      * Send the current time to update the remote walsender's latest reply
    4296             :      * message received time.
    4297             :      */
    4298          38 :     pq_sendbyte(request_message, 'p');
    4299          38 :     pq_sendint64(request_message, GetCurrentTimestamp());
    4300             : 
    4301          38 :     elog(DEBUG2, "sending publisher status request message");
    4302             : 
    4303             :     /* Send a request for the publisher status */
    4304          38 :     walrcv_send(LogRepWorkerWalRcvConn,
    4305             :                 request_message->data, request_message->len);
    4306             : 
    4307          38 :     rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
    4308             : 
    4309             :     /*
    4310             :      * Skip calling maybe_advance_nonremovable_xid() since further transition
    4311             :      * is possible only once we receive the publisher status message.
    4312             :      */
    4313          38 : }
    4314             : 
    4315             : /*
    4316             :  * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
    4317             :  */
    4318             : static void
    4319         182 : wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
    4320             :                           bool status_received)
    4321             : {
    4322             :     /*
    4323             :      * Return if we have requested but not yet received the publisher status.
    4324             :      */
    4325         182 :     if (!status_received)
    4326         146 :         return;
    4327             : 
    4328          36 :     if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
    4329          36 :         rdt_data->remote_wait_for = rdt_data->remote_nextxid;
    4330             : 
    4331             :     /*
    4332             :      * Check if all remote concurrent transactions that were active at the
    4333             :      * first status request have now completed. If completed, proceed to the
    4334             :      * next phase; otherwise, continue checking the publisher status until
    4335             :      * these transactions finish.
    4336             :      *
    4337             :      * It's possible that transactions in the commit phase during the last
    4338             :      * cycle have now finished committing, but remote_oldestxid remains older
    4339             :      * than remote_wait_for. This can happen if some old transaction came in
    4340             :      * the commit phase when we requested status in this cycle. We do not
    4341             :      * handle this case explicitly as it's rare and the benefit doesn't
    4342             :      * justify the required complexity. Tracking would require either caching
    4343             :      * all xids at the publisher or sending them to subscribers. The condition
    4344             :      * will resolve naturally once the remaining transactions are finished.
    4345             :      *
    4346             :      * Directly advancing the non-removable transaction ID is possible if
    4347             :      * there are no activities on the publisher since the last advancement
    4348             :      * cycle. However, it requires maintaining two fields, last_remote_nextxid
    4349             :      * and last_remote_lsn, within the structure for comparison with the
    4350             :      * current cycle's values. Considering the minimal cost of continuing in
    4351             :      * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
    4352             :      * advance the transaction ID here.
    4353             :      */
    4354          36 :     if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
    4355             :                                           rdt_data->remote_oldestxid))
    4356          36 :         rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
    4357             :     else
    4358           0 :         rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
    4359             : 
    4360             :     /* process the next phase */
    4361          36 :     process_rdt_phase_transition(rdt_data, false);
    4362             : }
    4363             : 
    4364             : /*
    4365             :  * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
    4366             :  */
    4367             : static void
    4368         106 : wait_for_local_flush(RetainDeadTuplesData *rdt_data)
    4369             : {
    4370             :     Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
    4371             :            TransactionIdIsValid(rdt_data->candidate_xid));
    4372             : 
    4373             :     /*
    4374             :      * We expect the publisher and subscriber clocks to be in sync using time
    4375             :      * sync service like NTP. Otherwise, we will advance this worker's
    4376             :      * oldest_nonremovable_xid prematurely, leading to the removal of rows
    4377             :      * required to detect conflicts reliably. This check primarily addresses
    4378             :      * scenarios where the publisher's clock falls behind; if the publisher's
    4379             :      * clock is ahead, subsequent transactions will naturally bear later
    4380             :      * commit timestamps, conforming to the design outlined atop worker.c.
    4381             :      *
    4382             :      * XXX Consider waiting for the publisher's clock to catch up with the
    4383             :      * subscriber's before proceeding to the next phase.
    4384             :      */
    4385         106 :     if (TimestampDifferenceExceeds(rdt_data->reply_time,
    4386             :                                    rdt_data->candidate_xid_time, 0))
    4387           0 :         ereport(ERROR,
    4388             :                 errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
    4389             :                 errdetail_internal("The clock on the publisher is behind that of the subscriber."));
    4390             : 
    4391             :     /*
    4392             :      * Do not attempt to advance the non-removable transaction ID when table
    4393             :      * sync is in progress. During this time, changes from a single
    4394             :      * transaction may be applied by multiple table sync workers corresponding
    4395             :      * to the target tables. So, it's necessary for all table sync workers to
    4396             :      * apply and flush the corresponding changes before advancing the
    4397             :      * transaction ID, otherwise, dead tuples that are still needed for
    4398             :      * conflict detection in table sync workers could be removed prematurely.
    4399             :      * However, confirming the apply and flush progress across all table sync
    4400             :      * workers is complex and not worth the effort, so we simply return if not
    4401             :      * all tables are in the READY state.
    4402             :      *
    4403             :      * It is safe to add new tables with initial states to the subscription
    4404             :      * after this check because any changes applied to these tables should
    4405             :      * have a WAL position greater than the rdt_data->remote_lsn.
    4406             :      */
    4407         106 :     if (!AllTablesyncsReady())
    4408           6 :         return;
    4409             : 
    4410             :     /*
    4411             :      * Update and check the remote flush position if we are applying changes
    4412             :      * in a loop. This is done at most once per WalWriterDelay to avoid
    4413             :      * performing costly operations in get_flush_position() too frequently
    4414             :      * during change application.
    4415             :      */
    4416         138 :     if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
    4417          38 :         TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
    4418             :                                    rdt_data->last_recv_time, WalWriterDelay))
    4419             :     {
    4420             :         XLogRecPtr  writepos;
    4421             :         XLogRecPtr  flushpos;
    4422             :         bool        have_pending_txes;
    4423             : 
    4424             :         /* Fetch the latest remote flush position */
    4425          16 :         get_flush_position(&writepos, &flushpos, &have_pending_txes);
    4426             : 
    4427          16 :         if (flushpos > last_flushpos)
    4428           0 :             last_flushpos = flushpos;
    4429             : 
    4430          16 :         rdt_data->flushpos_update_time = rdt_data->last_recv_time;
    4431             :     }
    4432             : 
    4433             :     /* Return to wait for the changes to be applied */
    4434         100 :     if (last_flushpos < rdt_data->remote_lsn)
    4435          68 :         return;
    4436             : 
    4437             :     /*
    4438             :      * Reaching here means the remote WAL position has been received, and all
    4439             :      * transactions up to that position on the publisher have been applied and
    4440             :      * flushed locally. So, we can advance the non-removable transaction ID.
    4441             :      */
    4442          32 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    4443          32 :     MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
    4444          32 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
    4445             : 
    4446          32 :     elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u",
    4447             :          LSN_FORMAT_ARGS(rdt_data->remote_lsn),
    4448             :          rdt_data->candidate_xid);
    4449             : 
    4450             :     /* Notify launcher to update the xmin of the conflict slot */
    4451          32 :     ApplyLauncherWakeup();
    4452             : 
    4453             :     /*
    4454             :      * Reset all data fields except those used to determine the timing for the
    4455             :      * next round of transaction ID advancement. We can even use
    4456             :      * flushpos_update_time in the next round to decide whether to get the
    4457             :      * latest flush position.
    4458             :      */
    4459          32 :     rdt_data->phase = RDT_GET_CANDIDATE_XID;
    4460          32 :     rdt_data->remote_lsn = InvalidXLogRecPtr;
    4461          32 :     rdt_data->remote_oldestxid = InvalidFullTransactionId;
    4462          32 :     rdt_data->remote_nextxid = InvalidFullTransactionId;
    4463          32 :     rdt_data->reply_time = 0;
    4464          32 :     rdt_data->remote_wait_for = InvalidFullTransactionId;
    4465          32 :     rdt_data->candidate_xid = InvalidTransactionId;
    4466             : 
    4467             :     /* process the next phase */
    4468          32 :     process_rdt_phase_transition(rdt_data, false);
    4469             : }
    4470             : 
    4471             : /*
    4472             :  * Adjust the interval for advancing non-removable transaction IDs.
    4473             :  *
    4474             :  * We double the interval to try advancing the non-removable transaction IDs
    4475             :  * if there is no activity on the node. The maximum value of the interval is
    4476             :  * capped by wal_receiver_status_interval if it is not zero, otherwise to a
    4477             :  * 3 minutes which should be sufficient to avoid using CPU or network
    4478             :  * resources without much benefit.
    4479             :  *
    4480             :  * The interval is reset to a minimum value of 100ms once there is some
    4481             :  * activity on the node.
    4482             :  *
    4483             :  * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
    4484             :  * consider the other interval or a separate GUC if the need arises.
    4485             :  */
    4486             : static void
    4487          56 : adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
    4488             : {
    4489          56 :     if (!new_xid_found && rdt_data->xid_advance_interval)
    4490          14 :     {
    4491          14 :         int         max_interval = wal_receiver_status_interval
    4492          28 :             ? wal_receiver_status_interval * 1000
    4493          14 :             : MAX_XID_ADVANCE_INTERVAL;
    4494             : 
    4495             :         /*
    4496             :          * No new transaction ID has been assigned since the last check, so
    4497             :          * double the interval, but not beyond the maximum allowable value.
    4498             :          */
    4499          14 :         rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
    4500             :                                              max_interval);
    4501             :     }
    4502             :     else
    4503             :     {
    4504             :         /*
    4505             :          * A new transaction ID was found or the interval is not yet
    4506             :          * initialized, so set the interval to the minimum value.
    4507             :          */
    4508          42 :         rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
    4509             :     }
    4510          56 : }
    4511             : 
    4512             : /*
    4513             :  * Exit routine for apply workers due to subscription parameter changes.
    4514             :  */
    4515             : static void
    4516          82 : apply_worker_exit(void)
    4517             : {
    4518          82 :     if (am_parallel_apply_worker())
    4519             :     {
    4520             :         /*
    4521             :          * Don't stop the parallel apply worker as the leader will detect the
    4522             :          * subscription parameter change and restart logical replication later
    4523             :          * anyway. This also prevents the leader from reporting errors when
    4524             :          * trying to communicate with a stopped parallel apply worker, which
    4525             :          * would accidentally disable subscriptions if disable_on_error was
    4526             :          * set.
    4527             :          */
    4528           0 :         return;
    4529             :     }
    4530             : 
    4531             :     /*
    4532             :      * Reset the last-start time for this apply worker so that the launcher
    4533             :      * will restart it without waiting for wal_retrieve_retry_interval if the
    4534             :      * subscription is still active, and so that we won't leak that hash table
    4535             :      * entry if it isn't.
    4536             :      */
    4537          82 :     if (am_leader_apply_worker())
    4538          82 :         ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
    4539             : 
    4540          82 :     proc_exit(0);
    4541             : }
    4542             : 
    4543             : /*
    4544             :  * Reread subscription info if needed.
    4545             :  *
    4546             :  * For significant changes, we react by exiting the current process; a new
    4547             :  * one will be launched afterwards if needed.
    4548             :  */
    4549             : void
    4550        7178 : maybe_reread_subscription(void)
    4551             : {
    4552             :     MemoryContext oldctx;
    4553             :     Subscription *newsub;
    4554        7178 :     bool        started_tx = false;
    4555             : 
    4556             :     /* When cache state is valid there is nothing to do here. */
    4557        7178 :     if (MySubscriptionValid)
    4558        7014 :         return;
    4559             : 
    4560             :     /* This function might be called inside or outside of transaction. */
    4561         164 :     if (!IsTransactionState())
    4562             :     {
    4563         156 :         StartTransactionCommand();
    4564         156 :         started_tx = true;
    4565             :     }
    4566             : 
    4567             :     /* Ensure allocations in permanent context. */
    4568         164 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    4569             : 
    4570         164 :     newsub = GetSubscription(MyLogicalRepWorker->subid, true);
    4571             : 
    4572             :     /*
    4573             :      * Exit if the subscription was removed. This normally should not happen
    4574             :      * as the worker gets killed during DROP SUBSCRIPTION.
    4575             :      */
    4576         164 :     if (!newsub)
    4577             :     {
    4578           0 :         ereport(LOG,
    4579             :                 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
    4580             :                         MySubscription->name)));
    4581             : 
    4582             :         /* Ensure we remove no-longer-useful entry for worker's start time */
    4583           0 :         if (am_leader_apply_worker())
    4584           0 :             ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
    4585             : 
    4586           0 :         proc_exit(0);
    4587             :     }
    4588             : 
    4589             :     /* Exit if the subscription was disabled. */
    4590         164 :     if (!newsub->enabled)
    4591             :     {
    4592          22 :         ereport(LOG,
    4593             :                 (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
    4594             :                         MySubscription->name)));
    4595             : 
    4596          22 :         apply_worker_exit();
    4597             :     }
    4598             : 
    4599             :     /* !slotname should never happen when enabled is true. */
    4600             :     Assert(newsub->slotname);
    4601             : 
    4602             :     /* two-phase cannot be altered while the worker is running */
    4603             :     Assert(newsub->twophasestate == MySubscription->twophasestate);
    4604             : 
    4605             :     /*
    4606             :      * Exit if any parameter that affects the remote connection was changed.
    4607             :      * The launcher will start a new worker but note that the parallel apply
    4608             :      * worker won't restart if the streaming option's value is changed from
    4609             :      * 'parallel' to any other value or the server decides not to stream the
    4610             :      * in-progress transaction.
    4611             :      */
    4612         142 :     if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
    4613         138 :         strcmp(newsub->name, MySubscription->name) != 0 ||
    4614         136 :         strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
    4615         136 :         newsub->binary != MySubscription->binary ||
    4616         124 :         newsub->stream != MySubscription->stream ||
    4617         114 :         newsub->passwordrequired != MySubscription->passwordrequired ||
    4618         114 :         strcmp(newsub->origin, MySubscription->origin) != 0 ||
    4619         110 :         newsub->owner != MySubscription->owner ||
    4620         108 :         !equal(newsub->publications, MySubscription->publications))
    4621             :     {
    4622          52 :         if (am_parallel_apply_worker())
    4623           0 :             ereport(LOG,
    4624             :                     (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
    4625             :                             MySubscription->name)));
    4626             :         else
    4627          52 :             ereport(LOG,
    4628             :                     (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
    4629             :                             MySubscription->name)));
    4630             : 
    4631          52 :         apply_worker_exit();
    4632             :     }
    4633             : 
    4634             :     /*
    4635             :      * Exit if the subscription owner's superuser privileges have been
    4636             :      * revoked.
    4637             :      */
    4638          90 :     if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
    4639             :     {
    4640           8 :         if (am_parallel_apply_worker())
    4641           0 :             ereport(LOG,
    4642             :                     errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
    4643             :                            MySubscription->name));
    4644             :         else
    4645           8 :             ereport(LOG,
    4646             :                     errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
    4647             :                            MySubscription->name));
    4648             : 
    4649           8 :         apply_worker_exit();
    4650             :     }
    4651             : 
    4652             :     /* Check for other changes that should never happen too. */
    4653          82 :     if (newsub->dbid != MySubscription->dbid)
    4654             :     {
    4655           0 :         elog(ERROR, "subscription %u changed unexpectedly",
    4656             :              MyLogicalRepWorker->subid);
    4657             :     }
    4658             : 
    4659             :     /* Clean old subscription info and switch to new one. */
    4660          82 :     FreeSubscription(MySubscription);
    4661          82 :     MySubscription = newsub;
    4662             : 
    4663          82 :     MemoryContextSwitchTo(oldctx);
    4664             : 
    4665             :     /* Change synchronous commit according to the user's wishes */
    4666          82 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    4667             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    4668             : 
    4669          82 :     if (started_tx)
    4670          78 :         CommitTransactionCommand();
    4671             : 
    4672          82 :     MySubscriptionValid = true;
    4673             : }
    4674             : 
    4675             : /*
    4676             :  * Callback from subscription syscache invalidation.
    4677             :  */
    4678             : static void
    4679         170 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
    4680             : {
    4681         170 :     MySubscriptionValid = false;
    4682         170 : }
    4683             : 
    4684             : /*
    4685             :  * subxact_info_write
    4686             :  *    Store information about subxacts for a toplevel transaction.
    4687             :  *
    4688             :  * For each subxact we store offset of its first change in the main file.
    4689             :  * The file is always over-written as a whole.
    4690             :  *
    4691             :  * XXX We should only store subxacts that were not aborted yet.
    4692             :  */
    4693             : static void
    4694         742 : subxact_info_write(Oid subid, TransactionId xid)
    4695             : {
    4696             :     char        path[MAXPGPATH];
    4697             :     Size        len;
    4698             :     BufFile    *fd;
    4699             : 
    4700             :     Assert(TransactionIdIsValid(xid));
    4701             : 
    4702             :     /* construct the subxact filename */
    4703         742 :     subxact_filename(path, subid, xid);
    4704             : 
    4705             :     /* Delete the subxacts file, if exists. */
    4706         742 :     if (subxact_data.nsubxacts == 0)
    4707             :     {
    4708         578 :         cleanup_subxact_info();
    4709         578 :         BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
    4710             : 
    4711         578 :         return;
    4712             :     }
    4713             : 
    4714             :     /*
    4715             :      * Create the subxact file if it not already created, otherwise open the
    4716             :      * existing file.
    4717             :      */
    4718         164 :     fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
    4719             :                             true);
    4720         164 :     if (fd == NULL)
    4721          16 :         fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
    4722             : 
    4723         164 :     len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
    4724             : 
    4725             :     /* Write the subxact count and subxact info */
    4726         164 :     BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
    4727         164 :     BufFileWrite(fd, subxact_data.subxacts, len);
    4728             : 
    4729         164 :     BufFileClose(fd);
    4730             : 
    4731             :     /* free the memory allocated for subxact info */
    4732         164 :     cleanup_subxact_info();
    4733             : }
    4734             : 
    4735             : /*
    4736             :  * subxact_info_read
    4737             :  *    Restore information about subxacts of a streamed transaction.
    4738             :  *
    4739             :  * Read information about subxacts into the structure subxact_data that can be
    4740             :  * used later.
    4741             :  */
    4742             : static void
    4743         686 : subxact_info_read(Oid subid, TransactionId xid)
    4744             : {
    4745             :     char        path[MAXPGPATH];
    4746             :     Size        len;
    4747             :     BufFile    *fd;
    4748             :     MemoryContext oldctx;
    4749             : 
    4750             :     Assert(!subxact_data.subxacts);
    4751             :     Assert(subxact_data.nsubxacts == 0);
    4752             :     Assert(subxact_data.nsubxacts_max == 0);
    4753             : 
    4754             :     /*
    4755             :      * If the subxact file doesn't exist that means we don't have any subxact
    4756             :      * info.
    4757             :      */
    4758         686 :     subxact_filename(path, subid, xid);
    4759         686 :     fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
    4760             :                             true);
    4761         686 :     if (fd == NULL)
    4762         528 :         return;
    4763             : 
    4764             :     /* read number of subxact items */
    4765         158 :     BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
    4766             : 
    4767         158 :     len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
    4768             : 
    4769             :     /* we keep the maximum as a power of 2 */
    4770         158 :     subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
    4771             : 
    4772             :     /*
    4773             :      * Allocate subxact information in the logical streaming context. We need
    4774             :      * this information during the complete stream so that we can add the sub
    4775             :      * transaction info to this. On stream stop we will flush this information
    4776             :      * to the subxact file and reset the logical streaming context.
    4777             :      */
    4778         158 :     oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
    4779         158 :     subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
    4780             :                                    sizeof(SubXactInfo));
    4781         158 :     MemoryContextSwitchTo(oldctx);
    4782             : 
    4783         158 :     if (len > 0)
    4784         158 :         BufFileReadExact(fd, subxact_data.subxacts, len);
    4785             : 
    4786         158 :     BufFileClose(fd);
    4787             : }
    4788             : 
    4789             : /*
    4790             :  * subxact_info_add
    4791             :  *    Add information about a subxact (offset in the main file).
    4792             :  */
    4793             : static void
    4794      205024 : subxact_info_add(TransactionId xid)
    4795             : {
    4796      205024 :     SubXactInfo *subxacts = subxact_data.subxacts;
    4797             :     int64       i;
    4798             : 
    4799             :     /* We must have a valid top level stream xid and a stream fd. */
    4800             :     Assert(TransactionIdIsValid(stream_xid));
    4801             :     Assert(stream_fd != NULL);
    4802             : 
    4803             :     /*
    4804             :      * If the XID matches the toplevel transaction, we don't want to add it.
    4805             :      */
    4806      205024 :     if (stream_xid == xid)
    4807      184776 :         return;
    4808             : 
    4809             :     /*
    4810             :      * In most cases we're checking the same subxact as we've already seen in
    4811             :      * the last call, so make sure to ignore it (this change comes later).
    4812             :      */
    4813       20248 :     if (subxact_data.subxact_last == xid)
    4814       20096 :         return;
    4815             : 
    4816             :     /* OK, remember we're processing this XID. */
    4817         152 :     subxact_data.subxact_last = xid;
    4818             : 
    4819             :     /*
    4820             :      * Check if the transaction is already present in the array of subxact. We
    4821             :      * intentionally scan the array from the tail, because we're likely adding
    4822             :      * a change for the most recent subtransactions.
    4823             :      *
    4824             :      * XXX Can we rely on the subxact XIDs arriving in sorted order? That
    4825             :      * would allow us to use binary search here.
    4826             :      */
    4827         190 :     for (i = subxact_data.nsubxacts; i > 0; i--)
    4828             :     {
    4829             :         /* found, so we're done */
    4830         152 :         if (subxacts[i - 1].xid == xid)
    4831         114 :             return;
    4832             :     }
    4833             : 
    4834             :     /* This is a new subxact, so we need to add it to the array. */
    4835          38 :     if (subxact_data.nsubxacts == 0)
    4836             :     {
    4837             :         MemoryContext oldctx;
    4838             : 
    4839          16 :         subxact_data.nsubxacts_max = 128;
    4840             : 
    4841             :         /*
    4842             :          * Allocate this memory for subxacts in per-stream context, see
    4843             :          * subxact_info_read.
    4844             :          */
    4845          16 :         oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
    4846          16 :         subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
    4847          16 :         MemoryContextSwitchTo(oldctx);
    4848             :     }
    4849          22 :     else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
    4850             :     {
    4851          20 :         subxact_data.nsubxacts_max *= 2;
    4852          20 :         subxacts = repalloc(subxacts,
    4853          20 :                             subxact_data.nsubxacts_max * sizeof(SubXactInfo));
    4854             :     }
    4855             : 
    4856          38 :     subxacts[subxact_data.nsubxacts].xid = xid;
    4857             : 
    4858             :     /*
    4859             :      * Get the current offset of the stream file and store it as offset of
    4860             :      * this subxact.
    4861             :      */
    4862          38 :     BufFileTell(stream_fd,
    4863          38 :                 &subxacts[subxact_data.nsubxacts].fileno,
    4864          38 :                 &subxacts[subxact_data.nsubxacts].offset);
    4865             : 
    4866          38 :     subxact_data.nsubxacts++;
    4867          38 :     subxact_data.subxacts = subxacts;
    4868             : }
    4869             : 
    4870             : /* format filename for file containing the info about subxacts */
    4871             : static inline void
    4872        1490 : subxact_filename(char *path, Oid subid, TransactionId xid)
    4873             : {
    4874        1490 :     snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
    4875        1490 : }
    4876             : 
    4877             : /* format filename for file containing serialized changes */
    4878             : static inline void
    4879         874 : changes_filename(char *path, Oid subid, TransactionId xid)
    4880             : {
    4881         874 :     snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
    4882         874 : }
    4883             : 
    4884             : /*
    4885             :  * stream_cleanup_files
    4886             :  *    Cleanup files for a subscription / toplevel transaction.
    4887             :  *
    4888             :  * Remove files with serialized changes and subxact info for a particular
    4889             :  * toplevel transaction. Each subscription has a separate set of files
    4890             :  * for any toplevel transaction.
    4891             :  */
    4892             : void
    4893          62 : stream_cleanup_files(Oid subid, TransactionId xid)
    4894             : {
    4895             :     char        path[MAXPGPATH];
    4896             : 
    4897             :     /* Delete the changes file. */
    4898          62 :     changes_filename(path, subid, xid);
    4899          62 :     BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
    4900             : 
    4901             :     /* Delete the subxact file, if it exists. */
    4902          62 :     subxact_filename(path, subid, xid);
    4903          62 :     BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
    4904          62 : }
    4905             : 
    4906             : /*
    4907             :  * stream_open_file
    4908             :  *    Open a file that we'll use to serialize changes for a toplevel
    4909             :  * transaction.
    4910             :  *
    4911             :  * Open a file for streamed changes from a toplevel transaction identified
    4912             :  * by stream_xid (global variable). If it's the first chunk of streamed
    4913             :  * changes for this transaction, create the buffile, otherwise open the
    4914             :  * previously created file.
    4915             :  */
    4916             : static void
    4917         724 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
    4918             : {
    4919             :     char        path[MAXPGPATH];
    4920             :     MemoryContext oldcxt;
    4921             : 
    4922             :     Assert(OidIsValid(subid));
    4923             :     Assert(TransactionIdIsValid(xid));
    4924             :     Assert(stream_fd == NULL);
    4925             : 
    4926             : 
    4927         724 :     changes_filename(path, subid, xid);
    4928         724 :     elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
    4929             : 
    4930             :     /*
    4931             :      * Create/open the buffiles under the logical streaming context so that we
    4932             :      * have those files until stream stop.
    4933             :      */
    4934         724 :     oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
    4935             : 
    4936             :     /*
    4937             :      * If this is the first streamed segment, create the changes file.
    4938             :      * Otherwise, just open the file for writing, in append mode.
    4939             :      */
    4940         724 :     if (first_segment)
    4941          64 :         stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
    4942             :                                          path);
    4943             :     else
    4944             :     {
    4945             :         /*
    4946             :          * Open the file and seek to the end of the file because we always
    4947             :          * append the changes file.
    4948             :          */
    4949         660 :         stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
    4950             :                                        path, O_RDWR, false);
    4951         660 :         BufFileSeek(stream_fd, 0, 0, SEEK_END);
    4952             :     }
    4953             : 
    4954         724 :     MemoryContextSwitchTo(oldcxt);
    4955         724 : }
    4956             : 
    4957             : /*
    4958             :  * stream_close_file
    4959             :  *    Close the currently open file with streamed changes.
    4960             :  */
    4961             : static void
    4962         784 : stream_close_file(void)
    4963             : {
    4964             :     Assert(stream_fd != NULL);
    4965             : 
    4966         784 :     BufFileClose(stream_fd);
    4967             : 
    4968         784 :     stream_fd = NULL;
    4969         784 : }
    4970             : 
    4971             : /*
    4972             :  * stream_write_change
    4973             :  *    Serialize a change to a file for the current toplevel transaction.
    4974             :  *
    4975             :  * The change is serialized in a simple format, with length (not including
    4976             :  * the length), action code (identifying the message type) and message
    4977             :  * contents (without the subxact TransactionId value).
    4978             :  */
    4979             : static void
    4980      215106 : stream_write_change(char action, StringInfo s)
    4981             : {
    4982             :     int         len;
    4983             : 
    4984             :     Assert(stream_fd != NULL);
    4985             : 
    4986             :     /* total on-disk size, including the action type character */
    4987      215106 :     len = (s->len - s->cursor) + sizeof(char);
    4988             : 
    4989             :     /* first write the size */
    4990      215106 :     BufFileWrite(stream_fd, &len, sizeof(len));
    4991             : 
    4992             :     /* then the action */
    4993      215106 :     BufFileWrite(stream_fd, &action, sizeof(action));
    4994             : 
    4995             :     /* and finally the remaining part of the buffer (after the XID) */
    4996      215106 :     len = (s->len - s->cursor);
    4997             : 
    4998      215106 :     BufFileWrite(stream_fd, &s->data[s->cursor], len);
    4999      215106 : }
    5000             : 
    5001             : /*
    5002             :  * stream_open_and_write_change
    5003             :  *    Serialize a message to a file for the given transaction.
    5004             :  *
    5005             :  * This function is similar to stream_write_change except that it will open the
    5006             :  * target file if not already before writing the message and close the file at
    5007             :  * the end.
    5008             :  */
    5009             : static void
    5010          10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
    5011             : {
    5012             :     Assert(!in_streamed_transaction);
    5013             : 
    5014          10 :     if (!stream_fd)
    5015          10 :         stream_start_internal(xid, false);
    5016             : 
    5017          10 :     stream_write_change(action, s);
    5018          10 :     stream_stop_internal(xid);
    5019          10 : }
    5020             : 
    5021             : /*
    5022             :  * Sets streaming options including replication slot name and origin start
    5023             :  * position. Workers need these options for logical replication.
    5024             :  */
    5025             : void
    5026         770 : set_stream_options(WalRcvStreamOptions *options,
    5027             :                    char *slotname,
    5028             :                    XLogRecPtr *origin_startpos)
    5029             : {
    5030             :     int         server_version;
    5031             : 
    5032         770 :     options->logical = true;
    5033         770 :     options->startpoint = *origin_startpos;
    5034         770 :     options->slotname = slotname;
    5035             : 
    5036         770 :     server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
    5037         770 :     options->proto.logical.proto_version =
    5038         770 :         server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
    5039             :         server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
    5040             :         server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
    5041             :         LOGICALREP_PROTO_VERSION_NUM;
    5042             : 
    5043         770 :     options->proto.logical.publication_names = MySubscription->publications;
    5044         770 :     options->proto.logical.binary = MySubscription->binary;
    5045             : 
    5046             :     /*
    5047             :      * Assign the appropriate option value for streaming option according to
    5048             :      * the 'streaming' mode and the publisher's ability to support that mode.
    5049             :      */
    5050         770 :     if (server_version >= 160000 &&
    5051         770 :         MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
    5052             :     {
    5053         702 :         options->proto.logical.streaming_str = "parallel";
    5054         702 :         MyLogicalRepWorker->parallel_apply = true;
    5055             :     }
    5056          68 :     else if (server_version >= 140000 &&
    5057          68 :              MySubscription->stream != LOGICALREP_STREAM_OFF)
    5058             :     {
    5059          52 :         options->proto.logical.streaming_str = "on";
    5060          52 :         MyLogicalRepWorker->parallel_apply = false;
    5061             :     }
    5062             :     else
    5063             :     {
    5064          16 :         options->proto.logical.streaming_str = NULL;
    5065          16 :         MyLogicalRepWorker->parallel_apply = false;
    5066             :     }
    5067             : 
    5068         770 :     options->proto.logical.twophase = false;
    5069         770 :     options->proto.logical.origin = pstrdup(MySubscription->origin);
    5070         770 : }
    5071             : 
    5072             : /*
    5073             :  * Cleanup the memory for subxacts and reset the related variables.
    5074             :  */
    5075             : static inline void
    5076         750 : cleanup_subxact_info()
    5077             : {
    5078         750 :     if (subxact_data.subxacts)
    5079         174 :         pfree(subxact_data.subxacts);
    5080             : 
    5081         750 :     subxact_data.subxacts = NULL;
    5082         750 :     subxact_data.subxact_last = InvalidTransactionId;
    5083         750 :     subxact_data.nsubxacts = 0;
    5084         750 :     subxact_data.nsubxacts_max = 0;
    5085         750 : }
    5086             : 
/*
 * Common function to run the apply loop with error handling. Disable the
 * subscription, if necessary.
 *
 * Runs LogicalRepApplyLoop() starting at origin_startpos.  If an ERROR is
 * thrown, the replication origin session state is reset first.  Then:
 *  - if MySubscription->disableonerr is set, DisableSubscriptionAndExit()
 *    is called, which ends the process with proc_exit(0) and does not
 *    return;
 *  - otherwise the current transaction is aborted, the error is reported
 *    through pgstat_report_subscription_error(), and the error is
 *    re-thrown to the caller.
 *
 * Note that we don't handle FATAL errors which are probably because
 * of system resource error and are not repeatable.
 */
void
start_apply(XLogRecPtr origin_startpos)
{
    PG_TRY();
    {
        LogicalRepApplyLoop(origin_startpos);
    }
    PG_CATCH();
    {
        /*
         * Reset the origin state to prevent the advancement of origin
         * progress if we fail to apply. Otherwise, this will result in
         * transaction loss as that transaction won't be sent again by the
         * server.
         */
        replorigin_reset(0, (Datum) 0);

        if (MySubscription->disableonerr)
            DisableSubscriptionAndExit();   /* ends with proc_exit(0) */
        else
        {
            /*
             * Report the worker failed while applying changes. Abort the
             * current transaction so that the stats message is sent in an
             * idle state.
             */
            AbortOutOfAnyTransaction();
            pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());

            PG_RE_THROW();
        }
    }
    PG_END_TRY();
}
    5128             : 
    5129             : /*
    5130             :  * Runs the leader apply worker.
    5131             :  *
    5132             :  * It sets up replication origin, streaming options and then starts streaming.
    5133             :  */
    5134             : static void
    5135         472 : run_apply_worker()
    5136             : {
    5137             :     char        originname[NAMEDATALEN];
    5138         472 :     XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
    5139         472 :     char       *slotname = NULL;
    5140             :     WalRcvStreamOptions options;
    5141             :     RepOriginId originid;
    5142             :     TimeLineID  startpointTLI;
    5143             :     char       *err;
    5144             :     bool        must_use_password;
    5145             : 
    5146         472 :     slotname = MySubscription->slotname;
    5147             : 
    5148             :     /*
    5149             :      * This shouldn't happen if the subscription is enabled, but guard against
    5150             :      * DDL bugs or manual catalog changes.  (libpqwalreceiver will crash if
    5151             :      * slot is NULL.)
    5152             :      */
    5153         472 :     if (!slotname)
    5154           0 :         ereport(ERROR,
    5155             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    5156             :                  errmsg("subscription has no replication slot set")));
    5157             : 
    5158             :     /* Setup replication origin tracking. */
    5159         472 :     ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
    5160             :                                        originname, sizeof(originname));
    5161         472 :     StartTransactionCommand();
    5162         472 :     originid = replorigin_by_name(originname, true);
    5163         472 :     if (!OidIsValid(originid))
    5164           0 :         originid = replorigin_create(originname);
    5165         472 :     replorigin_session_setup(originid, 0);
    5166         472 :     replorigin_session_origin = originid;
    5167         472 :     origin_startpos = replorigin_session_get_progress(false);
    5168         472 :     CommitTransactionCommand();
    5169             : 
    5170             :     /* Is the use of a password mandatory? */
    5171         906 :     must_use_password = MySubscription->passwordrequired &&
    5172         434 :         !MySubscription->ownersuperuser;
    5173             : 
    5174         472 :     LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
    5175             :                                             true, must_use_password,
    5176             :                                             MySubscription->name, &err);
    5177             : 
    5178         452 :     if (LogRepWorkerWalRcvConn == NULL)
    5179          48 :         ereport(ERROR,
    5180             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    5181             :                  errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
    5182             :                         MySubscription->name, err)));
    5183             : 
    5184             :     /*
    5185             :      * We don't really use the output identify_system for anything but it does
    5186             :      * some initializations on the upstream so let's still call it.
    5187             :      */
    5188         404 :     (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
    5189             : 
    5190         404 :     set_apply_error_context_origin(originname);
    5191             : 
    5192         404 :     set_stream_options(&options, slotname, &origin_startpos);
    5193             : 
    5194             :     /*
    5195             :      * Even when the two_phase mode is requested by the user, it remains as
    5196             :      * the tri-state PENDING until all tablesyncs have reached READY state.
    5197             :      * Only then, can it become ENABLED.
    5198             :      *
    5199             :      * Note: If the subscription has no tables then leave the state as
    5200             :      * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
    5201             :      * work.
    5202             :      */
    5203         436 :     if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
    5204          32 :         AllTablesyncsReady())
    5205             :     {
    5206             :         /* Start streaming with two_phase enabled */
    5207          18 :         options.proto.logical.twophase = true;
    5208          18 :         walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    5209             : 
    5210          18 :         StartTransactionCommand();
    5211             : 
    5212             :         /*
    5213             :          * Updating pg_subscription might involve TOAST table access, so
    5214             :          * ensure we have a valid snapshot.
    5215             :          */
    5216          18 :         PushActiveSnapshot(GetTransactionSnapshot());
    5217             : 
    5218          18 :         UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
    5219          18 :         MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
    5220          18 :         PopActiveSnapshot();
    5221          18 :         CommitTransactionCommand();
    5222             :     }
    5223             :     else
    5224             :     {
    5225         386 :         walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    5226             :     }
    5227             : 
    5228         404 :     ereport(DEBUG1,
    5229             :             (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
    5230             :                              MySubscription->name,
    5231             :                              MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
    5232             :                              MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
    5233             :                              MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
    5234             :                              "?")));
    5235             : 
    5236             :     /* Run the main loop. */
    5237         404 :     start_apply(origin_startpos);
    5238           0 : }
    5239             : 
/*
 * Common initialization for leader apply worker, parallel apply worker and
 * tablesync worker.
 *
 * Initialize the database connection, in-memory subscription and necessary
 * config options.
 *
 * Steps, in order:
 *  - set session_replication_role to "replica";
 *  - connect to the worker's database as the worker's user;
 *  - force an empty search_path;
 *  - create ApplyContext and load MySubscription into it;
 *  - exit if the subscription was removed or disabled during startup;
 *  - for the leader apply worker, exit (to be restarted) if
 *    retain_dead_tuples is enabled but oldest_nonremovable_xid is not set;
 *  - apply the subscription's synchronous_commit setting;
 *  - register syscache callbacks for subscription and role changes;
 *  - log that the worker has started.
 */
void
InitializeLogRepWorker(void)
{
    MemoryContext oldctx;

    /* Run as replica session replication role. */
    SetConfigOption("session_replication_role", "replica",
                    PGC_SUSET, PGC_S_OVERRIDE);

    /* Connect to our database. */
    BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
                                              MyLogicalRepWorker->userid,
                                              0);

    /*
     * Set always-secure search path, so malicious users can't redirect user
     * code (e.g. pg_index.indexprs).
     */
    SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

    /* Load the subscription into persistent memory context. */
    ApplyContext = AllocSetContextCreate(TopMemoryContext,
                                         "ApplyContext",
                                         ALLOCSET_DEFAULT_SIZES);
    StartTransactionCommand();
    oldctx = MemoryContextSwitchTo(ApplyContext);

    /*
     * A NULL result is treated below as the subscription having been removed
     * during startup.
     */
    MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
    if (!MySubscription)
    {
        ereport(LOG,
                (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
                        MyLogicalRepWorker->subid)));

        /* Ensure we remove no-longer-useful entry for worker's start time */
        if (am_leader_apply_worker())
            ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

        proc_exit(0);
    }

    MySubscriptionValid = true;
    MemoryContextSwitchTo(oldctx);

    /* Don't run for a subscription that was disabled while we were starting. */
    if (!MySubscription->enabled)
    {
        ereport(LOG,
                (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
                        MySubscription->name)));

        apply_worker_exit();
    }

    /*
     * Restart the worker if retain_dead_tuples was enabled during startup.
     *
     * At this point, the replication slot used for conflict detection might
     * not exist yet, or could be dropped soon if the launcher perceives
     * retain_dead_tuples as disabled. To avoid unnecessary tracking of
     * oldest_nonremovable_xid when the slot is absent or at risk of being
     * dropped, a restart is initiated.
     *
     * The oldest_nonremovable_xid should be initialized only when the
     * retain_dead_tuples is enabled before launching the worker. See
     * logicalrep_worker_launch.
     */
    if (am_leader_apply_worker() &&
        MySubscription->retaindeadtuples &&
        !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
    {
        ereport(LOG,
                errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
                       MySubscription->name, "retain_dead_tuples"));

        apply_worker_exit();
    }

    /* Setup synchronous commit according to the user's wishes */
    SetConfigOption("synchronous_commit", MySubscription->synccommit,
                    PGC_BACKEND, PGC_S_OVERRIDE);

    /*
     * Keep us informed about subscription or role changes. Note that the
     * role's superuser privilege can be revoked.
     */
    CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
                                  subscription_change_cb,
                                  (Datum) 0);

    CacheRegisterSyscacheCallback(AUTHOID,
                                  subscription_change_cb,
                                  (Datum) 0);

    /* get_rel_name() needs the transaction that is still open here. */
    if (am_tablesync_worker())
        ereport(LOG,
                (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
                        MySubscription->name,
                        get_rel_name(MyLogicalRepWorker->relid))));
    else
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" has started",
                        MySubscription->name)));

    CommitTransactionCommand();
}
    5352             : 
    5353             : /*
    5354             :  * Reset the origin state.
    5355             :  */
    5356             : static void
    5357         982 : replorigin_reset(int code, Datum arg)
    5358             : {
    5359         982 :     replorigin_session_origin = InvalidRepOriginId;
    5360         982 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    5361         982 :     replorigin_session_origin_timestamp = 0;
    5362         982 : }
    5363             : 
/*
 * Common function to setup the leader apply or tablesync worker.
 *
 * Setup steps:
 *  - attach to the given worker slot;
 *  - install the SIGHUP/SIGTERM handlers;
 *  - initialize the send/receive/reply timestamps;
 *  - load libpqwalreceiver;
 *  - run the common InitializeLogRepWorker() setup;
 *  - register the origin-reset exit callback and the SUBSCRIPTIONRELMAP
 *    syscache callback.
 *
 * worker_slot: slot number handed to logicalrep_worker_attach().
 */
void
SetupApplyOrSyncWorker(int worker_slot)
{
    /* Attach to slot */
    logicalrep_worker_attach(worker_slot);

    Assert(am_tablesync_worker() || am_leader_apply_worker());

    /* Setup signal handling */
    pqsignal(SIGHUP, SignalHandlerForConfigReload);
    pqsignal(SIGTERM, die);
    BackgroundWorkerUnblockSignals();

    /*
     * We don't currently need any ResourceOwner in a walreceiver process, but
     * if we did, we could call CreateAuxProcessResourceOwner here.
     */

    /* Initialise stats to a sanish value */
    MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
        MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

    /* Load the libpq-specific functions */
    load_file("libpqwalreceiver", false);

    InitializeLogRepWorker();

    /*
     * Register a callback to reset the origin state before aborting any
     * pending transaction during shutdown (see ShutdownPostgres()). This will
     * avoid origin advancement for an incomplete transaction which could
     * otherwise lead to its loss as such a transaction won't be sent by the
     * server again.
     *
     * Note that even a LOG or DEBUG statement placed after setting the origin
     * state may process a shutdown signal before committing the current apply
     * operation. So, it is important to register such a callback here.
     */
    before_shmem_exit(replorigin_reset, (Datum) 0);

    /*
     * Log the connection string that will be used to reach the publisher.
     * No connection is made in this function itself.
     */
    elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
         MySubscription->conninfo);

    /*
     * Setup callback for syscache so that we know when something changes in
     * the subscription relation state.
     */
    CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
                                  invalidate_syncing_table_states,
                                  (Datum) 0);
}
    5417             : 
    5418             : /* Logical Replication Apply worker entry point */
    5419             : void
    5420         478 : ApplyWorkerMain(Datum main_arg)
    5421             : {
    5422         478 :     int         worker_slot = DatumGetInt32(main_arg);
    5423             : 
    5424         478 :     InitializingApplyWorker = true;
    5425             : 
    5426         478 :     SetupApplyOrSyncWorker(worker_slot);
    5427             : 
    5428         472 :     InitializingApplyWorker = false;
    5429             : 
    5430         472 :     run_apply_worker();
    5431             : 
    5432           0 :     proc_exit(0);
    5433             : }
    5434             : 
/*
 * After error recovery, disable the subscription in a new transaction
 * and exit cleanly.
 *
 * Meant to be called while an error is pending, e.g. from start_apply()'s
 * PG_CATCH block.  Steps:
 *  - emit and clear the pending error;
 *  - report it via pgstat_report_subscription_error();
 *  - disable the subscription in a fresh transaction;
 *  - log that it was disabled;
 *  - terminate with proc_exit(0).
 *
 * It does not return.
 */
void
DisableSubscriptionAndExit(void)
{
    /*
     * Emit the error message, and recover from the error state to an idle
     * state
     */
    HOLD_INTERRUPTS();

    EmitErrorReport();
    AbortOutOfAnyTransaction();
    FlushErrorState();

    RESUME_INTERRUPTS();

    /* Report the worker failed during either table synchronization or apply */
    pgstat_report_subscription_error(MyLogicalRepWorker->subid,
                                     !am_tablesync_worker());

    /* Disable the subscription */
    StartTransactionCommand();

    /*
     * Updating pg_subscription might involve TOAST table access, so ensure we
     * have a valid snapshot.
     */
    PushActiveSnapshot(GetTransactionSnapshot());

    DisableSubscription(MySubscription->oid);
    PopActiveSnapshot();
    CommitTransactionCommand();

    /* Ensure we remove no-longer-useful entry for worker's start time */
    if (am_leader_apply_worker())
        ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

    /* Notify the subscription has been disabled and exit */
    ereport(LOG,
            errmsg("subscription \"%s\" has been disabled because of an error",
                   MySubscription->name));

    /*
     * Skip the track_commit_timestamp check when disabling the worker due to
     * an error, as verifying commit timestamps is unnecessary in this
     * context.
     */
    if (MySubscription->retaindeadtuples)
        CheckSubDeadTupleRetention(false, true, WARNING);

    proc_exit(0);
}
    5490             : 
    5491             : /*
    5492             :  * Is current process a logical replication worker?
    5493             :  */
    5494             : bool
    5495        4006 : IsLogicalWorker(void)
    5496             : {
    5497        4006 :     return MyLogicalRepWorker != NULL;
    5498             : }
    5499             : 
    5500             : /*
    5501             :  * Is current process a logical replication parallel apply worker?
    5502             :  */
    5503             : bool
    5504        2766 : IsLogicalParallelApplyWorker(void)
    5505             : {
    5506        2766 :     return IsLogicalWorker() && am_parallel_apply_worker();
    5507             : }
    5508             : 
    5509             : /*
    5510             :  * Start skipping changes of the transaction if the given LSN matches the
    5511             :  * LSN specified by subscription's skiplsn.
    5512             :  */
    5513             : static void
    5514        1010 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
    5515             : {
    5516             :     Assert(!is_skipping_changes());
    5517             :     Assert(!in_remote_transaction);
    5518             :     Assert(!in_streamed_transaction);
    5519             : 
    5520             :     /*
    5521             :      * Quick return if it's not requested to skip this transaction. This
    5522             :      * function is called for every remote transaction and we assume that
    5523             :      * skipping the transaction is not used often.
    5524             :      */
    5525        1010 :     if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
    5526             :                MySubscription->skiplsn != finish_lsn))
    5527        1004 :         return;
    5528             : 
    5529             :     /* Start skipping all changes of this transaction */
    5530           6 :     skip_xact_finish_lsn = finish_lsn;
    5531             : 
    5532           6 :     ereport(LOG,
    5533             :             errmsg("logical replication starts skipping transaction at LSN %X/%08X",
    5534             :                    LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
    5535             : }
    5536             : 
    5537             : /*
    5538             :  * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
    5539             :  */
    5540             : static void
    5541          54 : stop_skipping_changes(void)
    5542             : {
    5543          54 :     if (!is_skipping_changes())
    5544          48 :         return;
    5545             : 
    5546           6 :     ereport(LOG,
    5547             :             errmsg("logical replication completed skipping transaction at LSN %X/%08X",
    5548             :                    LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
    5549             : 
    5550             :     /* Stop skipping changes */
    5551           6 :     skip_xact_finish_lsn = InvalidXLogRecPtr;
    5552             : }
    5553             : 
    5554             : /*
    5555             :  * Clear subskiplsn of pg_subscription catalog.
    5556             :  *
    5557             :  * finish_lsn is the transaction's finish LSN that is used to check if the
    5558             :  * subskiplsn matches it. If not matched, we raise a warning when clearing the
    5559             :  * subskiplsn in order to inform users for cases e.g., where the user mistakenly
    5560             :  * specified the wrong subskiplsn.
    5561             :  */
    5562             : static void
    5563        1038 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
    5564             : {
    5565             :     Relation    rel;
    5566             :     Form_pg_subscription subform;
    5567             :     HeapTuple   tup;
    5568        1038 :     XLogRecPtr  myskiplsn = MySubscription->skiplsn;
    5569        1038 :     bool        started_tx = false;
    5570             : 
    5571        1038 :     if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
    5572        1032 :         return;
    5573             : 
    5574           6 :     if (!IsTransactionState())
    5575             :     {
    5576           2 :         StartTransactionCommand();
    5577           2 :         started_tx = true;
    5578             :     }
    5579             : 
    5580             :     /*
    5581             :      * Updating pg_subscription might involve TOAST table access, so ensure we
    5582             :      * have a valid snapshot.
    5583             :      */
    5584           6 :     PushActiveSnapshot(GetTransactionSnapshot());
    5585             : 
    5586             :     /*
    5587             :      * Protect subskiplsn of pg_subscription from being concurrently updated
    5588             :      * while clearing it.
    5589             :      */
    5590           6 :     LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
    5591             :                      AccessShareLock);
    5592             : 
    5593           6 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
    5594             : 
    5595             :     /* Fetch the existing tuple. */
    5596           6 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
    5597             :                               ObjectIdGetDatum(MySubscription->oid));
    5598             : 
    5599           6 :     if (!HeapTupleIsValid(tup))
    5600           0 :         elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
    5601             : 
    5602           6 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
    5603             : 
    5604             :     /*
    5605             :      * Clear the subskiplsn. If the user has already changed subskiplsn before
    5606             :      * clearing it we don't update the catalog and the replication origin
    5607             :      * state won't get advanced. So in the worst case, if the server crashes
    5608             :      * before sending an acknowledgment of the flush position the transaction
    5609             :      * will be sent again and the user needs to set subskiplsn again. We can
    5610             :      * reduce the possibility by logging a replication origin WAL record to
    5611             :      * advance the origin LSN instead but there is no way to advance the
    5612             :      * origin timestamp and it doesn't seem to be worth doing anything about
    5613             :      * it since it's a very rare case.
    5614             :      */
    5615           6 :     if (subform->subskiplsn == myskiplsn)
    5616             :     {
    5617             :         bool        nulls[Natts_pg_subscription];
    5618             :         bool        replaces[Natts_pg_subscription];
    5619             :         Datum       values[Natts_pg_subscription];
    5620             : 
    5621           6 :         memset(values, 0, sizeof(values));
    5622           6 :         memset(nulls, false, sizeof(nulls));
    5623           6 :         memset(replaces, false, sizeof(replaces));
    5624             : 
    5625             :         /* reset subskiplsn */
    5626           6 :         values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
    5627           6 :         replaces[Anum_pg_subscription_subskiplsn - 1] = true;
    5628             : 
    5629           6 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    5630             :                                 replaces);
    5631           6 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    5632             : 
    5633           6 :         if (myskiplsn != finish_lsn)
    5634           0 :             ereport(WARNING,
    5635             :                     errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
    5636             :                     errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
    5637             :                               LSN_FORMAT_ARGS(finish_lsn),
    5638             :                               LSN_FORMAT_ARGS(myskiplsn)));
    5639             :     }
    5640             : 
    5641           6 :     heap_freetuple(tup);
    5642           6 :     table_close(rel, NoLock);
    5643             : 
    5644           6 :     PopActiveSnapshot();
    5645             : 
    5646           6 :     if (started_tx)
    5647           2 :         CommitTransactionCommand();
    5648             : }
    5649             : 
    5650             : /* Error callback to give more context info about the change being applied */
    5651             : void
    5652        1502 : apply_error_callback(void *arg)
    5653             : {
    5654        1502 :     ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
    5655             : 
    5656        1502 :     if (apply_error_callback_arg.command == 0)
    5657         776 :         return;
    5658             : 
    5659             :     Assert(errarg->origin_name);
    5660             : 
    5661         726 :     if (errarg->rel == NULL)
    5662             :     {
    5663         624 :         if (!TransactionIdIsValid(errarg->remote_xid))
    5664           0 :             errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
    5665             :                        errarg->origin_name,
    5666             :                        logicalrep_message_type(errarg->command));
    5667         624 :         else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
    5668         516 :             errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
    5669             :                        errarg->origin_name,
    5670             :                        logicalrep_message_type(errarg->command),
    5671             :                        errarg->remote_xid);
    5672             :         else
    5673         216 :             errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
    5674             :                        errarg->origin_name,
    5675             :                        logicalrep_message_type(errarg->command),
    5676             :                        errarg->remote_xid,
    5677         108 :                        LSN_FORMAT_ARGS(errarg->finish_lsn));
    5678             :     }
    5679             :     else
    5680             :     {
    5681         102 :         if (errarg->remote_attnum < 0)
    5682             :         {
    5683         102 :             if (XLogRecPtrIsInvalid(errarg->finish_lsn))
    5684           4 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
    5685             :                            errarg->origin_name,
    5686             :                            logicalrep_message_type(errarg->command),
    5687           2 :                            errarg->rel->remoterel.nspname,
    5688           2 :                            errarg->rel->remoterel.relname,
    5689             :                            errarg->remote_xid);
    5690             :             else
    5691         200 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
    5692             :                            errarg->origin_name,
    5693             :                            logicalrep_message_type(errarg->command),
    5694         100 :                            errarg->rel->remoterel.nspname,
    5695         100 :                            errarg->rel->remoterel.relname,
    5696             :                            errarg->remote_xid,
    5697         100 :                            LSN_FORMAT_ARGS(errarg->finish_lsn));
    5698             :         }
    5699             :         else
    5700             :         {
    5701           0 :             if (XLogRecPtrIsInvalid(errarg->finish_lsn))
    5702           0 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
    5703             :                            errarg->origin_name,
    5704             :                            logicalrep_message_type(errarg->command),
    5705           0 :                            errarg->rel->remoterel.nspname,
    5706           0 :                            errarg->rel->remoterel.relname,
    5707           0 :                            errarg->rel->remoterel.attnames[errarg->remote_attnum],
    5708             :                            errarg->remote_xid);
    5709             :             else
    5710           0 :                 errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
    5711             :                            errarg->origin_name,
    5712             :                            logicalrep_message_type(errarg->command),
    5713           0 :                            errarg->rel->remoterel.nspname,
    5714           0 :                            errarg->rel->remoterel.relname,
    5715           0 :                            errarg->rel->remoterel.attnames[errarg->remote_attnum],
    5716             :                            errarg->remote_xid,
    5717           0 :                            LSN_FORMAT_ARGS(errarg->finish_lsn));
    5718             :         }
    5719             :     }
    5720             : }
    5721             : 
    5722             : /* Set transaction information of apply error callback */
    5723             : static inline void
    5724        5724 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
    5725             : {
    5726        5724 :     apply_error_callback_arg.remote_xid = xid;
    5727        5724 :     apply_error_callback_arg.finish_lsn = lsn;
    5728        5724 : }
    5729             : 
    5730             : /* Reset all information of apply error callback */
    5731             : static inline void
    5732        2822 : reset_apply_error_context_info(void)
    5733             : {
    5734        2822 :     apply_error_callback_arg.command = 0;
    5735        2822 :     apply_error_callback_arg.rel = NULL;
    5736        2822 :     apply_error_callback_arg.remote_attnum = -1;
    5737        2822 :     set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
    5738        2822 : }
    5739             : 
    5740             : /*
    5741             :  * Request wakeup of the workers for the given subscription OID
    5742             :  * at commit of the current transaction.
    5743             :  *
    5744             :  * This is used to ensure that the workers process assorted changes
    5745             :  * as soon as possible.
    5746             :  */
    5747             : void
    5748         406 : LogicalRepWorkersWakeupAtCommit(Oid subid)
    5749             : {
    5750             :     MemoryContext oldcxt;
    5751             : 
    5752         406 :     oldcxt = MemoryContextSwitchTo(TopTransactionContext);
    5753         406 :     on_commit_wakeup_workers_subids =
    5754         406 :         list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
    5755         406 :     MemoryContextSwitchTo(oldcxt);
    5756         406 : }
    5757             : 
    5758             : /*
    5759             :  * Wake up the workers of any subscriptions that were changed in this xact.
    5760             :  */
    5761             : void
    5762     1061494 : AtEOXact_LogicalRepWorkers(bool isCommit)
    5763             : {
    5764     1061494 :     if (isCommit && on_commit_wakeup_workers_subids != NIL)
    5765             :     {
    5766             :         ListCell   *lc;
    5767             : 
    5768         396 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    5769         792 :         foreach(lc, on_commit_wakeup_workers_subids)
    5770             :         {
    5771         396 :             Oid         subid = lfirst_oid(lc);
    5772             :             List       *workers;
    5773             :             ListCell   *lc2;
    5774             : 
    5775         396 :             workers = logicalrep_workers_find(subid, true, false);
    5776         514 :             foreach(lc2, workers)
    5777             :             {
    5778         118 :                 LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
    5779             : 
    5780         118 :                 logicalrep_worker_wakeup_ptr(worker);
    5781             :             }
    5782             :         }
    5783         396 :         LWLockRelease(LogicalRepWorkerLock);
    5784             :     }
    5785             : 
    5786             :     /* The List storage will be reclaimed automatically in xact cleanup. */
    5787     1061494 :     on_commit_wakeup_workers_subids = NIL;
    5788     1061494 : }
    5789             : 
    5790             : /*
    5791             :  * Allocate the origin name in long-lived context for error context message.
    5792             :  */
    5793             : void
    5794         790 : set_apply_error_context_origin(char *originname)
    5795             : {
    5796         790 :     apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
    5797             :                                                                originname);
    5798         790 : }
    5799             : 
    5800             : /*
    5801             :  * Return the action to be taken for the given transaction. See
    5802             :  * TransApplyAction for information on each of the actions.
    5803             :  *
    5804             :  * *winfo is assigned to the destination parallel worker info when the leader
    5805             :  * apply worker has to pass all the transaction's changes to the parallel
    5806             :  * apply worker.
    5807             :  */
    5808             : static TransApplyAction
    5809      652402 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
    5810             : {
    5811      652402 :     *winfo = NULL;
    5812             : 
    5813      652402 :     if (am_parallel_apply_worker())
    5814             :     {
    5815      137824 :         return TRANS_PARALLEL_APPLY;
    5816             :     }
    5817             : 
    5818             :     /*
    5819             :      * If we are processing this transaction using a parallel apply worker
    5820             :      * then either we send the changes to the parallel worker or if the worker
    5821             :      * is busy then serialize the changes to the file which will later be
    5822             :      * processed by the parallel worker.
    5823             :      */
    5824      514578 :     *winfo = pa_find_worker(xid);
    5825             : 
    5826      514578 :     if (*winfo && (*winfo)->serialize_changes)
    5827             :     {
    5828       10074 :         return TRANS_LEADER_PARTIAL_SERIALIZE;
    5829             :     }
    5830      504504 :     else if (*winfo)
    5831             :     {
    5832      137792 :         return TRANS_LEADER_SEND_TO_PARALLEL;
    5833             :     }
    5834             : 
    5835             :     /*
    5836             :      * If there is no parallel worker involved to process this transaction
    5837             :      * then we either directly apply the change or serialize it to a file
    5838             :      * which will later be applied when the transaction finish message is
    5839             :      * processed.
    5840             :      */
    5841      366712 :     else if (in_streamed_transaction)
    5842             :     {
    5843      206392 :         return TRANS_LEADER_SERIALIZE;
    5844             :     }
    5845             :     else
    5846             :     {
    5847      160320 :         return TRANS_LEADER_APPLY;
    5848             :     }
    5849             : }

Generated by: LCOV version 1.16