LCOV - code coverage report
Current view: top level - src/backend/replication/logical - worker.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 1074 1142 94.0 %
Date: 2021-12-09 04:09:06 Functions: 61 61 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * worker.c
       3             :  *     PostgreSQL logical replication worker (apply)
       4             :  *
       5             :  * Copyright (c) 2016-2021, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/worker.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains the worker which applies logical changes as they come
      12             :  *    from remote logical replication stream.
      13             :  *
      14             :  *    The main worker (apply) is started by logical replication worker
      15             :  *    launcher for every enabled subscription in a database. It uses
      16             :  *    walsender protocol to communicate with publisher.
      17             :  *
      18             :  *    This module includes server facing code and shares libpqwalreceiver
      19             :  *    module with walreceiver for providing the libpq specific functionality.
      20             :  *
      21             :  *
      22             :  * STREAMED TRANSACTIONS
      23             :  * ---------------------
      24             :  * Streamed transactions (large transactions exceeding a memory limit on the
      25             :  * upstream) are not applied immediately, but instead, the data is written
      26             :  * to temporary files and then applied at once when the final commit arrives.
      27             :  *
      28             :  * Unlike the regular (non-streamed) case, handling streamed transactions has
      29             :  * to handle aborts of both the toplevel transaction and subtransactions. This
      30             :  * is achieved by tracking offsets for subtransactions, which is then used
      31             :  * to truncate the file with serialized changes.
      32             :  *
      33             :  * The files are placed in the tmp file directory by default, and the
      34             :  * filenames include both the XID of the toplevel transaction and the OID of
      35             :  * the subscription. This is necessary so that different workers processing
      36             :  * a remote transaction with the same XID don't interfere.
      37             :  *
      38             :  * We use BufFiles instead of using normal temporary files because (a) the
      39             :  * BufFile infrastructure supports temporary files that exceed the OS file size
      40             :  * limit, (b) provides a way for automatic clean up on error and (c) allows
      41             :  * these files to survive across local transactions, so that they can be
      42             :  * opened and closed at stream start and stop. We decided to use FileSet
      43             :  * infrastructure as without that it deletes the files on the closure of the
      44             :  * file and if we decide to keep stream files open across the start/stop stream
      45             :  * then it will consume a lot of memory (more than 8K for each BufFile and
      46             :  * there could be multiple such BufFiles as the subscriber could receive
      47             :  * multiple start/stop streams for different transactions before getting the
      48             :  * commit). Moreover, if we don't use FileSet then we also need to invent
      49             :  * a new way to pass filenames to BufFile APIs so that we are allowed to open
      50             :  * the file we desired across multiple stream-open calls for the same
      51             :  * transaction.
      52             :  *
      53             :  * TWO_PHASE TRANSACTIONS
      54             :  * ----------------------
      55             :  * Two-phase transactions are replayed at prepare and then committed or
      56             :  * rolled back at commit prepared and rollback prepared respectively. It is
      57             :  * possible to have a prepared transaction that arrives at the apply worker
      58             :  * when the tablesync is busy doing the initial copy. In this case, the apply
      59             :  * worker skips all the prepared operations [e.g. inserts] while the tablesync
      60             :  * is still busy (see the condition of should_apply_changes_for_rel). The
      61             :  * tablesync worker might not get such a prepared transaction because say it
      62             :  * was prior to the initial consistent point but might have got some later
      63             :  * commits. Now, the tablesync worker will exit without doing anything for the
      64             :  * prepared transaction skipped by the apply worker as the sync location for it
      65             :  * will be already ahead of the apply worker's current location. This would lead
      66             :  * to an "empty prepare", because later when the apply worker does the commit
      67             :  * prepare, there is nothing in it (the inserts were skipped earlier).
      68             :  *
      69             :  * To avoid this, and similar prepare confusions the subscription's two_phase
      70             :  * commit is enabled only after the initial sync is over. The two_phase option
      71             :  * has been implemented as a tri-state with values DISABLED, PENDING, and
      72             :  * ENABLED.
      73             :  *
      74             :  * Even if the user specifies they want a subscription with two_phase = on,
      75             :  * internally it will start with a tri-state of PENDING which only becomes
      76             :  * ENABLED after all tablesync initializations are completed - i.e. when all
      77             :  * tablesync workers have reached their READY state. In other words, the value
      78             :  * PENDING is only a temporary state for subscription start-up.
      79             :  *
      80             :  * Until the two_phase is properly available (ENABLED) the subscription will
      81             :  * behave as if two_phase = off. When the apply worker detects that all
      82             :  * tablesyncs have become READY (while the tri-state was PENDING) it will
      83             :  * restart the apply worker process. This happens in
      84             :  * process_syncing_tables_for_apply.
      85             :  *
      86             :  * When the (re-started) apply worker finds that all tablesyncs are READY for a
      87             :  * two_phase tri-state of PENDING it starts streaming messages with the
      88             :  * two_phase option which in turn enables the decoding of two-phase commits at
      89             :  * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
      90             :  * Now, it is possible that during the time we have not enabled two_phase, the
      91             :  * publisher (replication server) would have skipped some prepares but we
      92             :  * ensure that such prepares are sent along with commit prepare, see
      93             :  * ReorderBufferFinishPrepared.
      94             :  *
      95             :  * If the subscription has no tables then a two_phase tri-state PENDING is
      96             :  * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
      97             :  * PUBLICATION which might otherwise be disallowed (see below).
      98             :  *
      99             :  * If ever a user needs to be aware of the tri-state value, they can fetch it
     100             :  * from the pg_subscription catalog (see column subtwophasestate).
     101             :  *
     102             :  * We don't allow toggling the two_phase option of a subscription because it
     103             :  * can lead to an inconsistent replica. Consider, initially, it was on and we
     104             :  * have received some prepare; then we turn it off, and now at commit time the
     105             :  * server will send the entire transaction data along with the commit. With
     106             :  * some more analysis, we can allow changing this option from off to on but
     107             :  * we are not sure if that alone would be useful.
     108             :  *
     109             :  * Finally, to avoid problems mentioned in previous paragraphs from any
     110             :  * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
     111             :  * to 'off' and then again back to 'on') there is a restriction for
     112             :  * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
     113             :  * the two_phase tri-state is ENABLED, except when copy_data = false.
     114             :  *
     115             :  * We can get prepare of the same GID more than once for the genuine cases
     116             :  * where we have defined multiple subscriptions for publications on the same
     117             :  * server and a prepared transaction has operations on tables subscribed to
     118             :  * those subscriptions. For such cases, if we use the GID sent by the
     119             :  * publisher, one of the prepares will succeed and the others will fail, in
     120             :  * which case the server will send them again. Now, this can lead to a
     121             :  * deadlock if the user has set synchronous_standby_names for all the
     122             :  * subscriptions on the subscriber. To avoid such deadlocks, we generate a
     123             :  * unique GID (consisting of the subscription oid and the xid of the prepared
     124             :  * transaction) for each prepared transaction on the subscriber.
     125             :  *-------------------------------------------------------------------------
     126             :  */
     127             : 
     128             : #include "postgres.h"
     129             : 
     130             : #include <sys/stat.h>
     131             : #include <unistd.h>
     132             : 
     133             : #include "access/table.h"
     134             : #include "access/tableam.h"
     135             : #include "access/twophase.h"
     136             : #include "access/xact.h"
     137             : #include "access/xlog_internal.h"
     138             : #include "catalog/catalog.h"
     139             : #include "catalog/namespace.h"
     140             : #include "catalog/partition.h"
     141             : #include "catalog/pg_inherits.h"
     142             : #include "catalog/pg_subscription.h"
     143             : #include "catalog/pg_subscription_rel.h"
     144             : #include "catalog/pg_tablespace.h"
     145             : #include "commands/tablecmds.h"
     146             : #include "commands/tablespace.h"
     147             : #include "commands/trigger.h"
     148             : #include "executor/executor.h"
     149             : #include "executor/execPartition.h"
     150             : #include "executor/nodeModifyTable.h"
     151             : #include "funcapi.h"
     152             : #include "libpq/pqformat.h"
     153             : #include "libpq/pqsignal.h"
     154             : #include "mb/pg_wchar.h"
     155             : #include "miscadmin.h"
     156             : #include "nodes/makefuncs.h"
     157             : #include "optimizer/optimizer.h"
     158             : #include "pgstat.h"
     159             : #include "postmaster/bgworker.h"
     160             : #include "postmaster/interrupt.h"
     161             : #include "postmaster/postmaster.h"
     162             : #include "postmaster/walwriter.h"
     163             : #include "replication/decode.h"
     164             : #include "replication/logical.h"
     165             : #include "replication/logicalproto.h"
     166             : #include "replication/logicalrelation.h"
     167             : #include "replication/logicalworker.h"
     168             : #include "replication/origin.h"
     169             : #include "replication/reorderbuffer.h"
     170             : #include "replication/snapbuild.h"
     171             : #include "replication/walreceiver.h"
     172             : #include "replication/worker_internal.h"
     173             : #include "rewrite/rewriteHandler.h"
     174             : #include "storage/buffile.h"
     175             : #include "storage/bufmgr.h"
     176             : #include "storage/fd.h"
     177             : #include "storage/ipc.h"
     178             : #include "storage/lmgr.h"
     179             : #include "storage/proc.h"
     180             : #include "storage/procarray.h"
     181             : #include "tcop/tcopprot.h"
     182             : #include "utils/builtins.h"
     183             : #include "utils/catcache.h"
     184             : #include "utils/dynahash.h"
     185             : #include "utils/datum.h"
     186             : #include "utils/fmgroids.h"
     187             : #include "utils/guc.h"
     188             : #include "utils/inval.h"
     189             : #include "utils/lsyscache.h"
     190             : #include "utils/memutils.h"
     191             : #include "utils/rel.h"
     192             : #include "utils/syscache.h"
     193             : #include "utils/timeout.h"
     194             : 
#define NAPTIME_PER_CYCLE 1000  /* max sleep time between cycles, in ms (1s) */

/*
 * Pairs a local WAL end position with a remote end LSN.
 *
 * NOTE(review): the dlist_node suggests entries are linked into lsn_mapping
 * below, presumably to track which remote positions are safely flushed
 * locally (see store_flush_position()) -- confirm against the code that
 * consumes the list.
 */
typedef struct FlushPosition
{
    dlist_node  node;
    XLogRecPtr  local_end;
    XLogRecPtr  remote_end;
} FlushPosition;

/* List head for FlushPosition entries (statically initialized empty). */
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);

/*
 * Per-change executor state: built by create_edata_for_relation() and torn
 * down by finish_edata().
 */
typedef struct ApplyExecutionData
{
    EState     *estate;         /* executor state, used to track resources */

    LogicalRepRelMapEntry *targetRel;   /* replication target rel */
    ResultRelInfo *targetRelInfo;   /* ResultRelInfo for same */

    /*
     * These fields are used when the target relation is partitioned; they
     * stay NULL otherwise (create_edata_for_relation() palloc0's the struct).
     */
    ModifyTableState *mtstate;  /* dummy ModifyTable state */
    PartitionTupleRouting *proute;  /* partition routing info */
} ApplyExecutionData;
     217             : 
/*
 * Struct for saving and restoring apply errcontext information.
 *
 * A single static instance (apply_error_callback_arg, below) holds the
 * context of the change currently being applied.  NOTE(review): presumably
 * read by apply_error_callback() and maintained via
 * set_apply_error_context_xact() / reset_apply_error_context_info() (both
 * prototyped below) -- confirm against their definitions.
 */
typedef struct ApplyErrorCallbackArg
{
    LogicalRepMsgType command;  /* 0 if invalid */
    LogicalRepRelMapEntry *rel; /* initially NULL */

    /* Remote node information */
    int         remote_attnum;  /* -1 if invalid */
    TransactionId remote_xid;   /* initially InvalidTransactionId */
    TimestampTz ts;             /* commit, rollback, or prepare timestamp */
} ApplyErrorCallbackArg;

/* Starts out with every field in its "not set" state. */
static ApplyErrorCallbackArg apply_error_callback_arg =
{
    .command = 0,
    .rel = NULL,
    .remote_attnum = -1,
    .remote_xid = InvalidTransactionId,
    .ts = 0,
};
     238             : 
/*
 * Memory context that begin_replication_step() switches into for each
 * replication step.  NOTE(review): presumably reset per message -- confirm.
 */
static MemoryContext ApplyMessageContext = NULL;
/* NOTE(review): presumably the worker-lifetime context -- confirm. */
MemoryContext ApplyContext = NULL;

/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;

/* Walsender-protocol connection to the publisher. */
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

/*
 * The subscription this worker serves.  NOTE(review): MySubscriptionValid
 * presumably tracks whether the cached copy is current; see
 * maybe_reread_subscription().
 */
Subscription *MySubscription = NULL;
bool        MySubscriptionValid = false;

/* NOTE(review): presumably true while a remote transaction is being applied. */
bool        in_remote_transaction = false;
/*
 * Final LSN of the remote transaction being applied (set in
 * apply_handle_begin); compared against a SYNCDONE relation's statelsn in
 * should_apply_changes_for_rel().
 */
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

/*
 * fields valid only when processing streamed transaction
 *
 * While in_streamed_transaction is set, handle_streamed_transaction() writes
 * incoming changes to stream_fd instead of applying them.
 */
static bool in_streamed_transaction = false;

/* XID of the toplevel transaction currently being streamed. */
static TransactionId stream_xid = InvalidTransactionId;

/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;

/* Where a subtransaction's changes start in the streaming changes file. */
typedef struct SubXactInfo
{
    TransactionId xid;          /* XID of the subxact */
    int         fileno;         /* file number in the buffile */
    off_t       offset;         /* offset in the file */
} SubXactInfo;

/*
 * Sub-transaction data for the current streaming transaction.
 *
 * The recorded offsets are what allow the changes file to be truncated when
 * a subtransaction aborts (see STREAMED TRANSACTIONS in the file header).
 */
typedef struct ApplySubXactData
{
    uint32      nsubxacts;      /* number of sub-transactions */
    uint32      nsubxacts_max;  /* current capacity of subxacts */
    TransactionId subxact_last; /* xid of the last sub-transaction */
    SubXactInfo *subxacts;      /* sub-xact offset in changes file */
} ApplySubXactData;

/* Empty: no subxacts, no capacity, no array allocated yet. */
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
     278             : 
/*
 * Build the on-disk names of a toplevel transaction's subxact-info and
 * changes files; both are keyed by subscription OID and toplevel XID.
 * NOTE(review): presumably written into the caller-supplied path buffer.
 */
static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
static inline void changes_filename(char *path, Oid subid, TransactionId xid);

/*
 * Information about subtransactions of a given toplevel transaction.
 */
static void subxact_info_write(Oid subid, TransactionId xid);
static void subxact_info_read(Oid subid, TransactionId xid);
static void subxact_info_add(TransactionId xid);
static inline void cleanup_subxact_info(void);

/*
 * Serialize and deserialize changes for a toplevel transaction.
 */
static void stream_cleanup_files(Oid subid, TransactionId xid);
static void stream_open_file(Oid subid, TransactionId xid, bool first);
static void stream_write_change(char action, StringInfo s);
static void stream_close_file(void);

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* prototype needed because of stream_commit */
static void apply_dispatch(StringInfo s);

/* Shared implementations behind the individual apply_handle_* handlers. */
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
                                         ResultRelInfo *relinfo,
                                         TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ApplyExecutionData *edata,
                                         ResultRelInfo *relinfo,
                                         TupleTableSlot *remoteslot,
                                         LogicalRepTupleData *newtup);
static void apply_handle_delete_internal(ApplyExecutionData *edata,
                                         ResultRelInfo *relinfo,
                                         TupleTableSlot *remoteslot);
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
                                    LogicalRepRelation *remoterel,
                                    TupleTableSlot *remoteslot,
                                    TupleTableSlot **localslot);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
                                       TupleTableSlot *remoteslot,
                                       LogicalRepTupleData *newtup,
                                       CmdType operation);

/*
 * Compute GID for two_phase transactions (unique per subscription; see
 * TWO_PHASE TRANSACTIONS in the file header).
 */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);

/* Common streaming function to apply all the spooled messages */
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);

/* Functions for apply error callback */
static void apply_error_callback(void *arg);
static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
static inline void reset_apply_error_context_info(void);
     337             : 
     338             : /*
     339             :  * Should this worker apply changes for given relation.
     340             :  *
     341             :  * This is mainly needed for initial relation data sync as that runs in
     342             :  * separate worker process running in parallel and we need some way to skip
     343             :  * changes coming to the main apply worker during the sync of a table.
     344             :  *
     345             :  * Note we need to do smaller or equals comparison for SYNCDONE state because
     346             :  * it might hold position of end of initial slot consistent point WAL
     347             :  * record + 1 (ie start of next record) and next record can be COMMIT of
     348             :  * transaction we are now processing (which is what we set remote_final_lsn
     349             :  * to in apply_handle_begin).
     350             :  */
     351             : static bool
     352      216302 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
     353             : {
     354      216302 :     if (am_tablesync_worker())
     355           0 :         return MyLogicalRepWorker->relid == rel->localreloid;
     356             :     else
     357      216368 :         return (rel->state == SUBREL_STATE_READY ||
     358          66 :                 (rel->state == SUBREL_STATE_SYNCDONE &&
     359           0 :                  rel->statelsn <= remote_final_lsn));
     360             : }
     361             : 
     362             : /*
     363             :  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
     364             :  *
     365             :  * Start a transaction, if this is the first step (else we keep using the
     366             :  * existing transaction).
     367             :  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
     368             :  */
     369             : static void
     370      217074 : begin_replication_step(void)
     371             : {
     372      217074 :     SetCurrentStatementStartTimestamp();
     373             : 
     374      217074 :     if (!IsTransactionState())
     375             :     {
     376        1228 :         StartTransactionCommand();
     377        1228 :         maybe_reread_subscription();
     378             :     }
     379             : 
     380      217070 :     PushActiveSnapshot(GetTransactionSnapshot());
     381             : 
     382      217070 :     MemoryContextSwitchTo(ApplyMessageContext);
     383      217070 : }
     384             : 
     385             : /*
     386             :  * Finish up one step of a replication transaction.
     387             :  * Callers of begin_replication_step() must also call this.
     388             :  *
     389             :  * We don't close out the transaction here, but we should increment
     390             :  * the command counter to make the effects of this step visible.
     391             :  */
     392             : static void
     393      217058 : end_replication_step(void)
     394             : {
     395      217058 :     PopActiveSnapshot();
     396             : 
     397      217058 :     CommandCounterIncrement();
     398      217058 : }
     399             : 
     400             : /*
     401             :  * Handle streamed transactions.
     402             :  *
     403             :  * If in streaming mode (receiving a block of streamed transaction), we
     404             :  * simply redirect it to a file for the proper toplevel transaction.
     405             :  *
     406             :  * Returns true for streamed transactions, false otherwise (regular mode).
     407             :  */
     408             : static bool
     409      459234 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
     410             : {
     411             :     TransactionId xid;
     412             : 
     413             :     /* not in streaming mode */
     414      459234 :     if (!in_streamed_transaction)
     415      216662 :         return false;
     416             : 
     417             :     Assert(stream_fd != NULL);
     418             :     Assert(TransactionIdIsValid(stream_xid));
     419             : 
     420             :     /*
     421             :      * We should have received XID of the subxact as the first part of the
     422             :      * message, so extract it.
     423             :      */
     424      242572 :     xid = pq_getmsgint(s, 4);
     425             : 
     426      242572 :     if (!TransactionIdIsValid(xid))
     427           0 :         ereport(ERROR,
     428             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     429             :                  errmsg_internal("invalid transaction ID in streamed replication transaction")));
     430             : 
     431             :     /* Add the new subxact to the array (unless already there). */
     432      242572 :     subxact_info_add(xid);
     433             : 
     434             :     /* write the change to the current file */
     435      242572 :     stream_write_change(action, s);
     436             : 
     437      242572 :     return true;
     438             : }
     439             : 
     440             : /*
     441             :  * Executor state preparation for evaluation of constraint expressions,
     442             :  * indexes and triggers for the specified relation.
     443             :  *
     444             :  * Note that the caller must open and close any indexes to be updated.
     445             :  */
     446             : static ApplyExecutionData *
     447      216190 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
     448             : {
     449             :     ApplyExecutionData *edata;
     450             :     EState     *estate;
     451             :     RangeTblEntry *rte;
     452             :     ResultRelInfo *resultRelInfo;
     453             : 
     454      216190 :     edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
     455      216190 :     edata->targetRel = rel;
     456             : 
     457      216190 :     edata->estate = estate = CreateExecutorState();
     458             : 
     459      216190 :     rte = makeNode(RangeTblEntry);
     460      216190 :     rte->rtekind = RTE_RELATION;
     461      216190 :     rte->relid = RelationGetRelid(rel->localrel);
     462      216190 :     rte->relkind = rel->localrel->rd_rel->relkind;
     463      216190 :     rte->rellockmode = AccessShareLock;
     464      216190 :     ExecInitRangeTable(estate, list_make1(rte));
     465             : 
     466      216190 :     edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
     467             : 
     468             :     /*
     469             :      * Use Relation opened by logicalrep_rel_open() instead of opening it
     470             :      * again.
     471             :      */
     472      216190 :     InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
     473             : 
     474             :     /*
     475             :      * We put the ResultRelInfo in the es_opened_result_relations list, even
     476             :      * though we don't populate the es_result_relations array.  That's a bit
     477             :      * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
     478             :      *
     479             :      * ExecOpenIndices() is not called here either, each execution path doing
     480             :      * an apply operation being responsible for that.
     481             :      */
     482      216190 :     estate->es_opened_result_relations =
     483      216190 :         lappend(estate->es_opened_result_relations, resultRelInfo);
     484             : 
     485      216190 :     estate->es_output_cid = GetCurrentCommandId(true);
     486             : 
     487             :     /* Prepare to catch AFTER triggers. */
     488      216190 :     AfterTriggerBeginQuery();
     489             : 
     490             :     /* other fields of edata remain NULL for now */
     491             : 
     492      216190 :     return edata;
     493             : }
     494             : 
     495             : /*
     496             :  * Finish any operations related to the executor state created by
     497             :  * create_edata_for_relation().
     498             :  */
     499             : static void
     500      216188 : finish_edata(ApplyExecutionData *edata)
     501             : {
     502      216188 :     EState     *estate = edata->estate;
     503             : 
     504             :     /* Handle any queued AFTER triggers. */
     505      216188 :     AfterTriggerEndQuery(estate);
     506             : 
     507             :     /* Shut down tuple routing, if any was done. */
     508      216188 :     if (edata->proute)
     509          86 :         ExecCleanupTupleRouting(edata->mtstate, edata->proute);
     510             : 
     511             :     /*
     512             :      * Cleanup.  It might seem that we should call ExecCloseResultRelations()
     513             :      * here, but we intentionally don't.  It would close the rel we added to
     514             :      * es_opened_result_relations above, which is wrong because we took no
     515             :      * corresponding refcount.  We rely on ExecCleanupTupleRouting() to close
     516             :      * any other relations opened during execution.
     517             :      */
     518      216188 :     ExecResetTupleTable(estate->es_tupleTable, false);
     519      216188 :     FreeExecutorState(estate);
     520      216188 :     pfree(edata);
     521      216188 : }
     522             : 
     523             : /*
     524             :  * Executes default values for columns for which we can't map to remote
     525             :  * relation columns.
     526             :  *
     527             :  * This allows us to support tables which have more columns on the downstream
     528             :  * than on the upstream.
     529             :  */
     530             : static void
     531      102946 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
     532             :                    TupleTableSlot *slot)
     533             : {
     534      102946 :     TupleDesc   desc = RelationGetDescr(rel->localrel);
     535      102946 :     int         num_phys_attrs = desc->natts;
     536             :     int         i;
     537             :     int         attnum,
     538      102946 :                 num_defaults = 0;
     539             :     int        *defmap;
     540             :     ExprState **defexprs;
     541             :     ExprContext *econtext;
     542             : 
     543      102946 :     econtext = GetPerTupleExprContext(estate);
     544             : 
     545             :     /* We got all the data via replication, no need to evaluate anything. */
     546      102946 :     if (num_phys_attrs == rel->remoterel.natts)
     547       10872 :         return;
     548             : 
     549       92074 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
     550       92074 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
     551             : 
     552             :     Assert(rel->attrmap->maplen == num_phys_attrs);
     553      487248 :     for (attnum = 0; attnum < num_phys_attrs; attnum++)
     554             :     {
     555             :         Expr       *defexpr;
     556             : 
     557      395174 :         if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
     558           4 :             continue;
     559             : 
     560      395170 :         if (rel->attrmap->attnums[attnum] >= 0)
     561      209136 :             continue;
     562             : 
     563      186034 :         defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
     564             : 
     565      186034 :         if (defexpr != NULL)
     566             :         {
     567             :             /* Run the expression through planner */
     568      150066 :             defexpr = expression_planner(defexpr);
     569             : 
     570             :             /* Initialize executable expression in copycontext */
     571      150066 :             defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
     572      150066 :             defmap[num_defaults] = attnum;
     573      150066 :             num_defaults++;
     574             :         }
     575             : 
     576             :     }
     577             : 
     578      242140 :     for (i = 0; i < num_defaults; i++)
     579      150066 :         slot->tts_values[defmap[i]] =
     580      150066 :             ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
     581             : }
     582             : 
     583             : /*
     584             :  * Store tuple data into slot.
     585             :  *
     586             :  * Incoming data can be either text or binary format.
     587             :  */
     588             : static void
     589      216190 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
     590             :                 LogicalRepTupleData *tupleData)
     591             : {
     592      216190 :     int         natts = slot->tts_tupleDescriptor->natts;
     593             :     int         i;
     594             : 
     595      216190 :     ExecClearTuple(slot);
     596             : 
     597             :     /* Call the "in" function for each non-dropped, non-null attribute */
     598             :     Assert(natts == rel->attrmap->maplen);
     599     1103984 :     for (i = 0; i < natts; i++)
     600             :     {
     601      887794 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     602      887794 :         int         remoteattnum = rel->attrmap->attnums[i];
     603             : 
     604      887794 :         if (!att->attisdropped && remoteattnum >= 0)
     605      506614 :         {
     606      506614 :             StringInfo  colvalue = &tupleData->colvalues[remoteattnum];
     607             : 
     608             :             Assert(remoteattnum < tupleData->ncols);
     609             : 
     610             :             /* Set attnum for error callback */
     611      506614 :             apply_error_callback_arg.remote_attnum = remoteattnum;
     612             : 
     613      506614 :             if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
     614             :             {
     615             :                 Oid         typinput;
     616             :                 Oid         typioparam;
     617             : 
     618      364382 :                 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     619      728764 :                 slot->tts_values[i] =
     620      364382 :                     OidInputFunctionCall(typinput, colvalue->data,
     621             :                                          typioparam, att->atttypmod);
     622      364382 :                 slot->tts_isnull[i] = false;
     623             :             }
     624      142232 :             else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
     625             :             {
     626             :                 Oid         typreceive;
     627             :                 Oid         typioparam;
     628             : 
     629             :                 /*
     630             :                  * In some code paths we may be asked to re-parse the same
     631             :                  * tuple data.  Reset the StringInfo's cursor so that works.
     632             :                  */
     633       66738 :                 colvalue->cursor = 0;
     634             : 
     635       66738 :                 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
     636      133476 :                 slot->tts_values[i] =
     637       66738 :                     OidReceiveFunctionCall(typreceive, colvalue,
     638             :                                            typioparam, att->atttypmod);
     639             : 
     640             :                 /* Trouble if it didn't eat the whole buffer */
     641       66738 :                 if (colvalue->cursor != colvalue->len)
     642           0 :                     ereport(ERROR,
     643             :                             (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
     644             :                              errmsg("incorrect binary data format in logical replication column %d",
     645             :                                     remoteattnum + 1)));
     646       66738 :                 slot->tts_isnull[i] = false;
     647             :             }
     648             :             else
     649             :             {
     650             :                 /*
     651             :                  * NULL value from remote.  (We don't expect to see
     652             :                  * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
     653             :                  * NULL.)
     654             :                  */
     655       75494 :                 slot->tts_values[i] = (Datum) 0;
     656       75494 :                 slot->tts_isnull[i] = true;
     657             :             }
     658             : 
     659             :             /* Reset attnum for error callback */
     660      506614 :             apply_error_callback_arg.remote_attnum = -1;
     661             :         }
     662             :         else
     663             :         {
     664             :             /*
     665             :              * We assign NULL to dropped attributes and missing values
     666             :              * (missing values should be later filled using
     667             :              * slot_fill_defaults).
     668             :              */
     669      381180 :             slot->tts_values[i] = (Datum) 0;
     670      381180 :             slot->tts_isnull[i] = true;
     671             :         }
     672             :     }
     673             : 
     674      216190 :     ExecStoreVirtualTuple(slot);
     675      216190 : }
     676             : 
     677             : /*
     678             :  * Replace updated columns with data from the LogicalRepTupleData struct.
     679             :  * This is somewhat similar to heap_modify_tuple but also calls the type
     680             :  * input functions on the user data.
     681             :  *
     682             :  * "slot" is filled with a copy of the tuple in "srcslot", replacing
     683             :  * columns provided in "tupleData" and leaving others as-is.
     684             :  *
     685             :  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
     686             :  * storage for "srcslot".  This is OK for current usage, but someday we may
     687             :  * need to materialize "slot" at the end to make it independent of "srcslot".
     688             :  */
     689             : static void
     690       57782 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
     691             :                  LogicalRepRelMapEntry *rel,
     692             :                  LogicalRepTupleData *tupleData)
     693             : {
     694       57782 :     int         natts = slot->tts_tupleDescriptor->natts;
     695             :     int         i;
     696             : 
     697             :     /* We'll fill "slot" with a virtual tuple, so we must start with ... */
     698       57782 :     ExecClearTuple(slot);
     699             : 
     700             :     /*
     701             :      * Copy all the column data from srcslot, so that we'll have valid values
     702             :      * for unreplaced columns.
     703             :      */
     704             :     Assert(natts == srcslot->tts_tupleDescriptor->natts);
     705       57782 :     slot_getallattrs(srcslot);
     706       57782 :     memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
     707       57782 :     memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
     708             : 
     709             :     /* Call the "in" function for each replaced attribute */
     710             :     Assert(natts == rel->attrmap->maplen);
     711      288390 :     for (i = 0; i < natts; i++)
     712             :     {
     713      230608 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     714      230608 :         int         remoteattnum = rel->attrmap->attnums[i];
     715             : 
     716      230608 :         if (remoteattnum < 0)
     717      105082 :             continue;
     718             : 
     719             :         Assert(remoteattnum < tupleData->ncols);
     720             : 
     721      125526 :         if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
     722             :         {
     723      125520 :             StringInfo  colvalue = &tupleData->colvalues[remoteattnum];
     724             : 
     725             :             /* Set attnum for error callback */
     726      125520 :             apply_error_callback_arg.remote_attnum = remoteattnum;
     727             : 
     728      125520 :             if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
     729             :             {
     730             :                 Oid         typinput;
     731             :                 Oid         typioparam;
     732             : 
     733       82074 :                 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     734      164148 :                 slot->tts_values[i] =
     735       82074 :                     OidInputFunctionCall(typinput, colvalue->data,
     736             :                                          typioparam, att->atttypmod);
     737       82074 :                 slot->tts_isnull[i] = false;
     738             :             }
     739       43446 :             else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
     740             :             {
     741             :                 Oid         typreceive;
     742             :                 Oid         typioparam;
     743             : 
     744             :                 /*
     745             :                  * In some code paths we may be asked to re-parse the same
     746             :                  * tuple data.  Reset the StringInfo's cursor so that works.
     747             :                  */
     748       43362 :                 colvalue->cursor = 0;
     749             : 
     750       43362 :                 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
     751       86724 :                 slot->tts_values[i] =
     752       43362 :                     OidReceiveFunctionCall(typreceive, colvalue,
     753             :                                            typioparam, att->atttypmod);
     754             : 
     755             :                 /* Trouble if it didn't eat the whole buffer */
     756       43362 :                 if (colvalue->cursor != colvalue->len)
     757           0 :                     ereport(ERROR,
     758             :                             (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
     759             :                              errmsg("incorrect binary data format in logical replication column %d",
     760             :                                     remoteattnum + 1)));
     761       43362 :                 slot->tts_isnull[i] = false;
     762             :             }
     763             :             else
     764             :             {
     765             :                 /* must be LOGICALREP_COLUMN_NULL */
     766          84 :                 slot->tts_values[i] = (Datum) 0;
     767          84 :                 slot->tts_isnull[i] = true;
     768             :             }
     769             : 
     770             :             /* Reset attnum for error callback */
     771      125520 :             apply_error_callback_arg.remote_attnum = -1;
     772             :         }
     773             :     }
     774             : 
     775             :     /* And finally, declare that "slot" contains a valid virtual tuple */
     776       57782 :     ExecStoreVirtualTuple(slot);
     777       57782 : }
     778             : 
     779             : /*
     780             :  * Handle BEGIN message.
     781             :  */
     782             : static void
     783         670 : apply_handle_begin(StringInfo s)
     784             : {
     785             :     LogicalRepBeginData begin_data;
     786             : 
     787         670 :     logicalrep_read_begin(s, &begin_data);
     788         670 :     set_apply_error_context_xact(begin_data.xid, begin_data.committime);
     789             : 
     790         670 :     remote_final_lsn = begin_data.final_lsn;
     791             : 
     792         670 :     in_remote_transaction = true;
     793             : 
     794         670 :     pgstat_report_activity(STATE_RUNNING, NULL);
     795         670 : }
     796             : 
     797             : /*
     798             :  * Handle COMMIT message.
     799             :  *
     800             :  * TODO, support tracking of multiple origins
     801             :  */
     802             : static void
     803         650 : apply_handle_commit(StringInfo s)
     804             : {
     805             :     LogicalRepCommitData commit_data;
     806             : 
     807         650 :     logicalrep_read_commit(s, &commit_data);
     808             : 
     809         650 :     if (commit_data.commit_lsn != remote_final_lsn)
     810           0 :         ereport(ERROR,
     811             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     812             :                  errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
     813             :                                  LSN_FORMAT_ARGS(commit_data.commit_lsn),
     814             :                                  LSN_FORMAT_ARGS(remote_final_lsn))));
     815             : 
     816         650 :     apply_handle_commit_internal(&commit_data);
     817             : 
     818             :     /* Process any tables that are being synchronized in parallel. */
     819         650 :     process_syncing_tables(commit_data.end_lsn);
     820             : 
     821         650 :     pgstat_report_activity(STATE_IDLE, NULL);
     822         650 :     reset_apply_error_context_info();
     823         650 : }
     824             : 
     825             : /*
     826             :  * Handle BEGIN PREPARE message.
     827             :  */
     828             : static void
     829          24 : apply_handle_begin_prepare(StringInfo s)
     830             : {
     831             :     LogicalRepPreparedTxnData begin_data;
     832             : 
     833             :     /* Tablesync should never receive prepare. */
     834          24 :     if (am_tablesync_worker())
     835           0 :         ereport(ERROR,
     836             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     837             :                  errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
     838             : 
     839          24 :     logicalrep_read_begin_prepare(s, &begin_data);
     840          24 :     set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
     841             : 
     842          24 :     remote_final_lsn = begin_data.prepare_lsn;
     843             : 
     844          24 :     in_remote_transaction = true;
     845             : 
     846          24 :     pgstat_report_activity(STATE_RUNNING, NULL);
     847          24 : }
     848             : 
     849             : /*
     850             :  * Common function to prepare the GID.
     851             :  */
     852             : static void
     853          36 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
     854             : {
     855             :     char        gid[GIDSIZE];
     856             : 
     857             :     /*
     858             :      * Compute unique GID for two_phase transactions. We don't use GID of
     859             :      * prepared transaction sent by server as that can lead to deadlock when
     860             :      * we have multiple subscriptions from same node point to publications on
     861             :      * the same node. See comments atop worker.c
     862             :      */
     863          36 :     TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
     864             :                            gid, sizeof(gid));
     865             : 
     866             :     /*
     867             :      * BeginTransactionBlock is necessary to balance the EndTransactionBlock
     868             :      * called within the PrepareTransactionBlock below.
     869             :      */
     870          36 :     BeginTransactionBlock();
     871          36 :     CommitTransactionCommand(); /* Completes the preceding Begin command. */
     872             : 
     873             :     /*
     874             :      * Update origin state so we can restart streaming from correct position
     875             :      * in case of crash.
     876             :      */
     877          36 :     replorigin_session_origin_lsn = prepare_data->end_lsn;
     878          36 :     replorigin_session_origin_timestamp = prepare_data->prepare_time;
     879             : 
     880          36 :     PrepareTransactionBlock(gid);
     881          36 : }
     882             : 
     883             : /*
     884             :  * Handle PREPARE message.
     885             :  */
     886             : static void
     887          24 : apply_handle_prepare(StringInfo s)
     888             : {
     889             :     LogicalRepPreparedTxnData prepare_data;
     890             : 
     891          24 :     logicalrep_read_prepare(s, &prepare_data);
     892             : 
     893          24 :     if (prepare_data.prepare_lsn != remote_final_lsn)
     894           0 :         ereport(ERROR,
     895             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     896             :                  errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
     897             :                                  LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
     898             :                                  LSN_FORMAT_ARGS(remote_final_lsn))));
     899             : 
     900             :     /*
     901             :      * Unlike commit, here, we always prepare the transaction even though no
     902             :      * change has happened in this transaction. It is done this way because at
     903             :      * commit prepared time, we won't know whether we have skipped preparing a
     904             :      * transaction because of no change.
     905             :      *
     906             :      * XXX, We can optimize such that at commit prepared time, we first check
     907             :      * whether we have prepared the transaction or not but that doesn't seem
     908             :      * worthwhile because such cases shouldn't be common.
     909             :      */
     910          24 :     begin_replication_step();
     911             : 
     912          24 :     apply_handle_prepare_internal(&prepare_data);
     913             : 
     914          24 :     end_replication_step();
     915          24 :     CommitTransactionCommand();
     916          24 :     pgstat_report_stat(false);
     917             : 
     918          24 :     store_flush_position(prepare_data.end_lsn);
     919             : 
     920          24 :     in_remote_transaction = false;
     921             : 
     922             :     /* Process any tables that are being synchronized in parallel. */
     923          24 :     process_syncing_tables(prepare_data.end_lsn);
     924             : 
     925          24 :     pgstat_report_activity(STATE_IDLE, NULL);
     926          24 :     reset_apply_error_context_info();
     927          24 : }
     928             : 
     929             : /*
     930             :  * Handle a COMMIT PREPARED of a previously PREPARED transaction.
     931             :  */
     932             : static void
     933          30 : apply_handle_commit_prepared(StringInfo s)
     934             : {
     935             :     LogicalRepCommitPreparedTxnData prepare_data;
     936             :     char        gid[GIDSIZE];
     937             : 
     938          30 :     logicalrep_read_commit_prepared(s, &prepare_data);
     939          30 :     set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
     940             : 
     941             :     /* Compute GID for two_phase transactions. */
     942          30 :     TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
     943             :                            gid, sizeof(gid));
     944             : 
     945             :     /* There is no transaction when COMMIT PREPARED is called */
     946          30 :     begin_replication_step();
     947             : 
     948             :     /*
     949             :      * Update origin state so we can restart streaming from correct position
     950             :      * in case of crash.
     951             :      */
     952          30 :     replorigin_session_origin_lsn = prepare_data.end_lsn;
     953          30 :     replorigin_session_origin_timestamp = prepare_data.commit_time;
     954             : 
     955          30 :     FinishPreparedTransaction(gid, true);
     956          30 :     end_replication_step();
     957          30 :     CommitTransactionCommand();
     958          30 :     pgstat_report_stat(false);
     959             : 
     960          30 :     store_flush_position(prepare_data.end_lsn);
     961          30 :     in_remote_transaction = false;
     962             : 
     963             :     /* Process any tables that are being synchronized in parallel. */
     964          30 :     process_syncing_tables(prepare_data.end_lsn);
     965             : 
     966          30 :     pgstat_report_activity(STATE_IDLE, NULL);
     967          30 :     reset_apply_error_context_info();
     968          30 : }
     969             : 
     970             : /*
     971             :  * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
     972             :  */
     973             : static void
     974           8 : apply_handle_rollback_prepared(StringInfo s)
     975             : {
     976             :     LogicalRepRollbackPreparedTxnData rollback_data;
     977             :     char        gid[GIDSIZE];
     978             : 
     979           8 :     logicalrep_read_rollback_prepared(s, &rollback_data);
     980           8 :     set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
     981             : 
     982             :     /* Compute GID for two_phase transactions. */
     983           8 :     TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
     984             :                            gid, sizeof(gid));
     985             : 
     986             :     /*
     987             :      * It is possible that we haven't received prepare because it occurred
     988             :      * before walsender reached a consistent point or the two_phase was still
     989             :      * not enabled by that time, so in such cases, we need to skip rollback
     990             :      * prepared.
     991             :      */
     992           8 :     if (LookupGXact(gid, rollback_data.prepare_end_lsn,
     993             :                     rollback_data.prepare_time))
     994             :     {
     995             :         /*
     996             :          * Update origin state so we can restart streaming from correct
     997             :          * position in case of crash.
     998             :          */
     999           8 :         replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
    1000           8 :         replorigin_session_origin_timestamp = rollback_data.rollback_time;
    1001             : 
    1002             :         /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
    1003           8 :         begin_replication_step();
    1004           8 :         FinishPreparedTransaction(gid, false);
    1005           8 :         end_replication_step();
    1006           8 :         CommitTransactionCommand();
    1007             :     }
    1008             : 
    1009           8 :     pgstat_report_stat(false);
    1010             : 
    1011           8 :     store_flush_position(rollback_data.rollback_end_lsn);
    1012           8 :     in_remote_transaction = false;
    1013             : 
    1014             :     /* Process any tables that are being synchronized in parallel. */
    1015           8 :     process_syncing_tables(rollback_data.rollback_end_lsn);
    1016             : 
    1017           8 :     pgstat_report_activity(STATE_IDLE, NULL);
    1018           8 :     reset_apply_error_context_info();
    1019           8 : }
    1020             : 
    1021             : /*
    1022             :  * Handle STREAM PREPARE.
    1023             :  *
    1024             :  * Logic is in two parts:
    1025             :  * 1. Replay all the spooled operations
    1026             :  * 2. Mark the transaction as prepared
    1027             :  */
    1028             : static void
    1029          12 : apply_handle_stream_prepare(StringInfo s)
    1030             : {
    1031             :     LogicalRepPreparedTxnData prepare_data;
    1032             : 
    1033          12 :     if (in_streamed_transaction)
    1034           0 :         ereport(ERROR,
    1035             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1036             :                  errmsg_internal("STREAM PREPARE message without STREAM STOP")));
    1037             : 
    1038             :     /* Tablesync should never receive prepare. */
    1039          12 :     if (am_tablesync_worker())
    1040           0 :         ereport(ERROR,
    1041             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1042             :                  errmsg_internal("tablesync worker received a STREAM PREPARE message")));
    1043             : 
    1044          12 :     logicalrep_read_stream_prepare(s, &prepare_data);
    1045          12 :     set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
    1046             : 
    1047          12 :     elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
    1048             : 
    1049             :     /* Replay all the spooled operations. */
    1050          12 :     apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
    1051             : 
    1052             :     /* Mark the transaction as prepared. */
    1053          12 :     apply_handle_prepare_internal(&prepare_data);
    1054             : 
    1055          12 :     CommitTransactionCommand();
    1056             : 
    1057          12 :     pgstat_report_stat(false);
    1058             : 
    1059          12 :     store_flush_position(prepare_data.end_lsn);
    1060             : 
    1061          12 :     in_remote_transaction = false;
    1062             : 
    1063             :     /* unlink the files with serialized changes and subxact info. */
    1064          12 :     stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
    1065             : 
    1066             :     /* Process any tables that are being synchronized in parallel. */
    1067          12 :     process_syncing_tables(prepare_data.end_lsn);
    1068             : 
    1069          12 :     pgstat_report_activity(STATE_IDLE, NULL);
    1070             : 
    1071          12 :     reset_apply_error_context_info();
    1072          12 : }
    1073             : 
    1074             : /*
    1075             :  * Handle ORIGIN message.
    1076             :  *
    1077             :  * TODO, support tracking of multiple origins
    1078             :  */
    1079             : static void
    1080          22 : apply_handle_origin(StringInfo s)
    1081             : {
    1082             :     /*
    1083             :      * ORIGIN message can only come inside streaming transaction or inside
    1084             :      * remote transaction and before any actual writes.
    1085             :      */
    1086          22 :     if (!in_streamed_transaction &&
    1087          36 :         (!in_remote_transaction ||
    1088          18 :          (IsTransactionState() && !am_tablesync_worker())))
    1089           0 :         ereport(ERROR,
    1090             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1091             :                  errmsg_internal("ORIGIN message sent out of order")));
    1092          22 : }
    1093             : 
    1094             : /*
    1095             :  * Handle STREAM START message.
    1096             :  */
    1097             : static void
    1098         652 : apply_handle_stream_start(StringInfo s)
    1099             : {
    1100             :     bool        first_segment;
    1101             : 
    1102         652 :     if (in_streamed_transaction)
    1103           0 :         ereport(ERROR,
    1104             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1105             :                  errmsg_internal("duplicate STREAM START message")));
    1106             : 
    1107             :     /*
    1108             :      * Start a transaction on stream start, this transaction will be committed
    1109             :      * on the stream stop unless it is a tablesync worker in which case it
    1110             :      * will be committed after processing all the messages. We need the
    1111             :      * transaction for handling the buffile, used for serializing the
    1112             :      * streaming data and subxact info.
    1113             :      */
    1114         652 :     begin_replication_step();
    1115             : 
    1116             :     /* notify handle methods we're processing a remote transaction */
    1117         652 :     in_streamed_transaction = true;
    1118             : 
    1119             :     /* extract XID of the top-level transaction */
    1120         652 :     stream_xid = logicalrep_read_stream_start(s, &first_segment);
    1121             : 
    1122         652 :     if (!TransactionIdIsValid(stream_xid))
    1123           0 :         ereport(ERROR,
    1124             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1125             :                  errmsg_internal("invalid transaction ID in streamed replication transaction")));
    1126             : 
    1127         652 :     set_apply_error_context_xact(stream_xid, 0);
    1128             : 
    1129             :     /*
    1130             :      * Initialize the worker's stream_fileset if we haven't yet. This will be
    1131             :      * used for the entire duration of the worker so create it in a permanent
    1132             :      * context. We create this on the very first streaming message from any
    1133             :      * transaction and then use it for this and other streaming transactions.
    1134             :      * Now, we could create a fileset at the start of the worker as well but
    1135             :      * then we won't be sure that it will ever be used.
    1136             :      */
    1137         652 :     if (MyLogicalRepWorker->stream_fileset == NULL)
    1138             :     {
    1139             :         MemoryContext oldctx;
    1140             : 
    1141          18 :         oldctx = MemoryContextSwitchTo(ApplyContext);
    1142             : 
    1143          18 :         MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
    1144          18 :         FileSetInit(MyLogicalRepWorker->stream_fileset);
    1145             : 
    1146          18 :         MemoryContextSwitchTo(oldctx);
    1147             :     }
    1148             : 
    1149             :     /* open the spool file for this transaction */
    1150         652 :     stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
    1151             : 
    1152             :     /* if this is not the first segment, open existing subxact file */
    1153         652 :     if (!first_segment)
    1154         612 :         subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
    1155             : 
    1156         652 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1157             : 
    1158         652 :     end_replication_step();
    1159         652 : }
    1160             : 
    1161             : /*
    1162             :  * Handle STREAM STOP message.
    1163             :  */
    1164             : static void
    1165         652 : apply_handle_stream_stop(StringInfo s)
    1166             : {
    1167         652 :     if (!in_streamed_transaction)
    1168           0 :         ereport(ERROR,
    1169             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1170             :                  errmsg_internal("STREAM STOP message without STREAM START")));
    1171             : 
    1172             :     /*
    1173             :      * Close the file with serialized changes, and serialize information about
    1174             :      * subxacts for the toplevel transaction.
    1175             :      */
    1176         652 :     subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
    1177         652 :     stream_close_file();
    1178             : 
    1179             :     /* We must be in a valid transaction state */
    1180             :     Assert(IsTransactionState());
    1181             : 
    1182             :     /* Commit the per-stream transaction */
    1183         652 :     CommitTransactionCommand();
    1184             : 
    1185         652 :     in_streamed_transaction = false;
    1186             : 
    1187             :     /* Reset per-stream context */
    1188         652 :     MemoryContextReset(LogicalStreamingContext);
    1189             : 
    1190         652 :     pgstat_report_activity(STATE_IDLE, NULL);
    1191         652 :     reset_apply_error_context_info();
    1192         652 : }
    1193             : 
    1194             : /*
    1195             :  * Handle STREAM abort message.
    1196             :  */
    1197             : static void
    1198          22 : apply_handle_stream_abort(StringInfo s)
    1199             : {
    1200             :     TransactionId xid;
    1201             :     TransactionId subxid;
    1202             : 
    1203          22 :     if (in_streamed_transaction)
    1204           0 :         ereport(ERROR,
    1205             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1206             :                  errmsg_internal("STREAM ABORT message without STREAM STOP")));
    1207             : 
    1208          22 :     logicalrep_read_stream_abort(s, &xid, &subxid);
    1209             : 
    1210             :     /*
    1211             :      * If the two XIDs are the same, it's in fact abort of toplevel xact, so
    1212             :      * just delete the files with serialized info.
    1213             :      */
    1214          22 :     if (xid == subxid)
    1215             :     {
    1216           0 :         set_apply_error_context_xact(xid, 0);
    1217           0 :         stream_cleanup_files(MyLogicalRepWorker->subid, xid);
    1218             :     }
    1219             :     else
    1220             :     {
    1221             :         /*
    1222             :          * OK, so it's a subxact. We need to read the subxact file for the
    1223             :          * toplevel transaction, determine the offset tracked for the subxact,
    1224             :          * and truncate the file with changes. We also remove the subxacts
    1225             :          * with higher offsets (or rather higher XIDs).
    1226             :          *
    1227             :          * We intentionally scan the array from the tail, because we're likely
    1228             :          * aborting a change for the most recent subtransactions.
    1229             :          *
    1230             :          * We can't use the binary search here as subxact XIDs won't
    1231             :          * necessarily arrive in sorted order, consider the case where we have
    1232             :          * released the savepoint for multiple subtransactions and then
    1233             :          * performed rollback to savepoint for one of the earlier
    1234             :          * sub-transaction.
    1235             :          */
    1236             :         int64       i;
    1237             :         int64       subidx;
    1238             :         BufFile    *fd;
    1239          22 :         bool        found = false;
    1240             :         char        path[MAXPGPATH];
    1241             : 
    1242          22 :         set_apply_error_context_xact(subxid, 0);
    1243             : 
    1244          22 :         subidx = -1;
    1245          22 :         begin_replication_step();
    1246          22 :         subxact_info_read(MyLogicalRepWorker->subid, xid);
    1247             : 
    1248          26 :         for (i = subxact_data.nsubxacts; i > 0; i--)
    1249             :         {
    1250          18 :             if (subxact_data.subxacts[i - 1].xid == subxid)
    1251             :             {
    1252          14 :                 subidx = (i - 1);
    1253          14 :                 found = true;
    1254          14 :                 break;
    1255             :             }
    1256             :         }
    1257             : 
    1258             :         /*
    1259             :          * If it's an empty sub-transaction then we will not find the subxid
    1260             :          * here so just cleanup the subxact info and return.
    1261             :          */
    1262          22 :         if (!found)
    1263             :         {
    1264             :             /* Cleanup the subxact info */
    1265           8 :             cleanup_subxact_info();
    1266           8 :             end_replication_step();
    1267           8 :             CommitTransactionCommand();
    1268           8 :             reset_apply_error_context_info();
    1269           8 :             return;
    1270             :         }
    1271             : 
    1272             :         /* open the changes file */
    1273          14 :         changes_filename(path, MyLogicalRepWorker->subid, xid);
    1274          14 :         fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
    1275             :                                 O_RDWR, false);
    1276             : 
    1277             :         /* OK, truncate the file at the right offset */
    1278          14 :         BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
    1279          14 :                                subxact_data.subxacts[subidx].offset);
    1280          14 :         BufFileClose(fd);
    1281             : 
    1282             :         /* discard the subxacts added later */
    1283          14 :         subxact_data.nsubxacts = subidx;
    1284             : 
    1285             :         /* write the updated subxact list */
    1286          14 :         subxact_info_write(MyLogicalRepWorker->subid, xid);
    1287             : 
    1288          14 :         end_replication_step();
    1289          14 :         CommitTransactionCommand();
    1290             :     }
    1291             : 
    1292          14 :     reset_apply_error_context_info();
    1293             : }
    1294             : 
    1295             : /*
    1296             :  * Common spoolfile processing.
    1297             :  */
    1298             : static void
    1299          38 : apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
    1300             : {
    1301             :     StringInfoData s2;
    1302             :     int         nchanges;
    1303             :     char        path[MAXPGPATH];
    1304          38 :     char       *buffer = NULL;
    1305             :     MemoryContext oldcxt;
    1306             :     BufFile    *fd;
    1307             : 
    1308             :     /* Make sure we have an open transaction */
    1309          38 :     begin_replication_step();
    1310             : 
    1311             :     /*
    1312             :      * Allocate file handle and memory required to process all the messages in
    1313             :      * TopTransactionContext to avoid them getting reset after each message is
    1314             :      * processed.
    1315             :      */
    1316          38 :     oldcxt = MemoryContextSwitchTo(TopTransactionContext);
    1317             : 
    1318             :     /* Open the spool file for the committed/prepared transaction */
    1319          38 :     changes_filename(path, MyLogicalRepWorker->subid, xid);
    1320          38 :     elog(DEBUG1, "replaying changes from file \"%s\"", path);
    1321             : 
    1322          38 :     fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
    1323             :                             false);
    1324             : 
    1325          38 :     buffer = palloc(BLCKSZ);
    1326          38 :     initStringInfo(&s2);
    1327             : 
    1328          38 :     MemoryContextSwitchTo(oldcxt);
    1329             : 
    1330          38 :     remote_final_lsn = lsn;
    1331             : 
    1332             :     /*
    1333             :      * Make sure the handle apply_dispatch methods are aware we're in a remote
    1334             :      * transaction.
    1335             :      */
    1336          38 :     in_remote_transaction = true;
    1337          38 :     pgstat_report_activity(STATE_RUNNING, NULL);
    1338             : 
    1339          38 :     end_replication_step();
    1340             : 
    1341             :     /*
    1342             :      * Read the entries one by one and pass them through the same logic as in
    1343             :      * apply_dispatch.
    1344             :      */
    1345          38 :     nchanges = 0;
    1346             :     while (true)
    1347      214490 :     {
    1348             :         int         nbytes;
    1349             :         int         len;
    1350             : 
    1351      214528 :         CHECK_FOR_INTERRUPTS();
    1352             : 
    1353             :         /* read length of the on-disk record */
    1354      214528 :         nbytes = BufFileRead(fd, &len, sizeof(len));
    1355             : 
    1356             :         /* have we reached end of the file? */
    1357      214528 :         if (nbytes == 0)
    1358          38 :             break;
    1359             : 
    1360             :         /* do we have a correct length? */
    1361      214490 :         if (nbytes != sizeof(len))
    1362           0 :             ereport(ERROR,
    1363             :                     (errcode_for_file_access(),
    1364             :                      errmsg("could not read from streaming transaction's changes file \"%s\": %m",
    1365             :                             path)));
    1366             : 
    1367      214490 :         if (len <= 0)
    1368           0 :             elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
    1369             :                  len, path);
    1370             : 
    1371             :         /* make sure we have sufficiently large buffer */
    1372      214490 :         buffer = repalloc(buffer, len);
    1373             : 
    1374             :         /* and finally read the data into the buffer */
    1375      214490 :         if (BufFileRead(fd, buffer, len) != len)
    1376           0 :             ereport(ERROR,
    1377             :                     (errcode_for_file_access(),
    1378             :                      errmsg("could not read from streaming transaction's changes file \"%s\": %m",
    1379             :                             path)));
    1380             : 
    1381             :         /* copy the buffer to the stringinfo and call apply_dispatch */
    1382      214490 :         resetStringInfo(&s2);
    1383      214490 :         appendBinaryStringInfo(&s2, buffer, len);
    1384             : 
    1385             :         /* Ensure we are reading the data into our memory context. */
    1386      214490 :         oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
    1387             : 
    1388      214490 :         apply_dispatch(&s2);
    1389             : 
    1390      214490 :         MemoryContextReset(ApplyMessageContext);
    1391             : 
    1392      214490 :         MemoryContextSwitchTo(oldcxt);
    1393             : 
    1394      214490 :         nchanges++;
    1395             : 
    1396      214490 :         if (nchanges % 1000 == 0)
    1397         202 :             elog(DEBUG1, "replayed %d changes from file \"%s\"",
    1398             :                  nchanges, path);
    1399             :     }
    1400             : 
    1401          38 :     BufFileClose(fd);
    1402             : 
    1403          38 :     pfree(buffer);
    1404          38 :     pfree(s2.data);
    1405             : 
    1406          38 :     elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
    1407             :          nchanges, path);
    1408             : 
    1409          38 :     return;
    1410             : }
    1411             : 
    1412             : /*
    1413             :  * Handle STREAM COMMIT message.
    1414             :  */
    1415             : static void
    1416          26 : apply_handle_stream_commit(StringInfo s)
    1417             : {
    1418             :     TransactionId xid;
    1419             :     LogicalRepCommitData commit_data;
    1420             : 
    1421          26 :     if (in_streamed_transaction)
    1422           0 :         ereport(ERROR,
    1423             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1424             :                  errmsg_internal("STREAM COMMIT message without STREAM STOP")));
    1425             : 
    1426          26 :     xid = logicalrep_read_stream_commit(s, &commit_data);
    1427          26 :     set_apply_error_context_xact(xid, commit_data.committime);
    1428             : 
    1429          26 :     elog(DEBUG1, "received commit for streamed transaction %u", xid);
    1430             : 
    1431          26 :     apply_spooled_messages(xid, commit_data.commit_lsn);
    1432             : 
    1433          26 :     apply_handle_commit_internal(&commit_data);
    1434             : 
    1435             :     /* unlink the files with serialized changes and subxact info */
    1436          26 :     stream_cleanup_files(MyLogicalRepWorker->subid, xid);
    1437             : 
    1438             :     /* Process any tables that are being synchronized in parallel. */
    1439          26 :     process_syncing_tables(commit_data.end_lsn);
    1440             : 
    1441          26 :     pgstat_report_activity(STATE_IDLE, NULL);
    1442             : 
    1443          26 :     reset_apply_error_context_info();
    1444          26 : }
    1445             : 
    1446             : /*
    1447             :  * Helper function for apply_handle_commit and apply_handle_stream_commit.
    1448             :  */
    1449             : static void
    1450         676 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
    1451             : {
    1452         676 :     if (IsTransactionState())
    1453             :     {
    1454             :         /*
    1455             :          * Update origin state so we can restart streaming from correct
    1456             :          * position in case of crash.
    1457             :          */
    1458         464 :         replorigin_session_origin_lsn = commit_data->end_lsn;
    1459         464 :         replorigin_session_origin_timestamp = commit_data->committime;
    1460             : 
    1461         464 :         CommitTransactionCommand();
    1462         464 :         pgstat_report_stat(false);
    1463             : 
    1464         464 :         store_flush_position(commit_data->end_lsn);
    1465             :     }
    1466             :     else
    1467             :     {
    1468             :         /* Process any invalidation messages that might have accumulated. */
    1469         212 :         AcceptInvalidationMessages();
    1470         212 :         maybe_reread_subscription();
    1471             :     }
    1472             : 
    1473         676 :     in_remote_transaction = false;
    1474         676 : }
    1475             : 
    1476             : /*
    1477             :  * Handle RELATION message.
    1478             :  *
    1479             :  * Note we don't do validation against local schema here. The validation
    1480             :  * against local schema is postponed until first change for given relation
    1481             :  * comes as we only care about it when applying changes for it anyway and we
    1482             :  * do less locking this way.
    1483             :  */
    1484             : static void
    1485         386 : apply_handle_relation(StringInfo s)
    1486             : {
    1487             :     LogicalRepRelation *rel;
    1488             : 
    1489         386 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
    1490          56 :         return;
    1491             : 
    1492         330 :     rel = logicalrep_read_rel(s);
    1493         330 :     logicalrep_relmap_update(rel);
    1494             : }
    1495             : 
    1496             : /*
    1497             :  * Handle TYPE message.
    1498             :  *
    1499             :  * This implementation pays no attention to TYPE messages; we expect the user
    1500             :  * to have set things up so that the incoming data is acceptable to the input
    1501             :  * functions for the locally subscribed tables.  Hence, we just read and
    1502             :  * discard the message.
    1503             :  */
    1504             : static void
    1505          32 : apply_handle_type(StringInfo s)
    1506             : {
    1507             :     LogicalRepTyp typ;
    1508             : 
    1509          32 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
    1510           0 :         return;
    1511             : 
    1512          32 :     logicalrep_read_typ(s, &typ);
    1513             : }
    1514             : 
    1515             : /*
    1516             :  * Get replica identity index or if it is not defined a primary key.
    1517             :  *
    1518             :  * If neither is defined, returns InvalidOid
    1519             :  */
    1520             : static Oid
    1521      113246 : GetRelationIdentityOrPK(Relation rel)
    1522             : {
    1523             :     Oid         idxoid;
    1524             : 
    1525      113246 :     idxoid = RelationGetReplicaIndex(rel);
    1526             : 
    1527      113246 :     if (!OidIsValid(idxoid))
    1528         250 :         idxoid = RelationGetPrimaryKeyIndex(rel);
    1529             : 
    1530      113246 :     return idxoid;
    1531             : }
    1532             : 
    1533             : /*
    1534             :  * Handle INSERT message.
    1535             :  */
    1536             : 
    1537             : static void
    1538      224918 : apply_handle_insert(StringInfo s)
    1539             : {
    1540             :     LogicalRepRelMapEntry *rel;
    1541             :     LogicalRepTupleData newtup;
    1542             :     LogicalRepRelId relid;
    1543             :     ApplyExecutionData *edata;
    1544             :     EState     *estate;
    1545             :     TupleTableSlot *remoteslot;
    1546             :     MemoryContext oldctx;
    1547             : 
    1548      224918 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
    1549      121958 :         return;
    1550             : 
    1551      103026 :     begin_replication_step();
    1552             : 
    1553      103022 :     relid = logicalrep_read_insert(s, &newtup);
    1554      103022 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    1555      103012 :     if (!should_apply_changes_for_rel(rel))
    1556             :     {
    1557             :         /*
    1558             :          * The relation can't become interesting in the middle of the
    1559             :          * transaction so it's safe to unlock it.
    1560             :          */
    1561          66 :         logicalrep_rel_close(rel, RowExclusiveLock);
    1562          66 :         end_replication_step();
    1563          66 :         return;
    1564             :     }
    1565             : 
    1566             :     /* Set relation for error callback */
    1567      102946 :     apply_error_callback_arg.rel = rel;
    1568             : 
    1569             :     /* Initialize the executor state. */
    1570      102946 :     edata = create_edata_for_relation(rel);
    1571      102946 :     estate = edata->estate;
    1572      102946 :     remoteslot = ExecInitExtraTupleSlot(estate,
    1573      102946 :                                         RelationGetDescr(rel->localrel),
    1574             :                                         &TTSOpsVirtual);
    1575             : 
    1576             :     /* Process and store remote tuple in the slot */
    1577      102946 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1578      102946 :     slot_store_data(remoteslot, rel, &newtup);
    1579      102946 :     slot_fill_defaults(rel, estate, remoteslot);
    1580      102946 :     MemoryContextSwitchTo(oldctx);
    1581             : 
    1582             :     /* For a partitioned table, insert the tuple into a partition. */
    1583      102946 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    1584          44 :         apply_handle_tuple_routing(edata,
    1585             :                                    remoteslot, NULL, CMD_INSERT);
    1586             :     else
    1587      102902 :         apply_handle_insert_internal(edata, edata->targetRelInfo,
    1588             :                                      remoteslot);
    1589             : 
    1590      102944 :     finish_edata(edata);
    1591             : 
    1592             :     /* Reset relation for error callback */
    1593      102944 :     apply_error_callback_arg.rel = NULL;
    1594             : 
    1595      102944 :     logicalrep_rel_close(rel, NoLock);
    1596             : 
    1597      102944 :     end_replication_step();
    1598             : }
    1599             : 
    1600             : /*
    1601             :  * Workhorse for apply_handle_insert()
    1602             :  * relinfo is for the relation we're actually inserting into
    1603             :  * (could be a child partition of edata->targetRelInfo)
    1604             :  */
    1605             : static void
    1606      102948 : apply_handle_insert_internal(ApplyExecutionData *edata,
    1607             :                              ResultRelInfo *relinfo,
    1608             :                              TupleTableSlot *remoteslot)
    1609             : {
    1610      102948 :     EState     *estate = edata->estate;
    1611             : 
    1612             :     /* We must open indexes here. */
    1613      102948 :     ExecOpenIndices(relinfo, false);
    1614             : 
    1615             :     /* Do the insert. */
    1616      102948 :     ExecSimpleRelationInsert(relinfo, estate, remoteslot);
    1617             : 
    1618             :     /* Cleanup. */
    1619      102946 :     ExecCloseIndices(relinfo);
    1620      102946 : }
    1621             : 
    1622             : /*
    1623             :  * Check if the logical replication relation is updatable and throw
    1624             :  * appropriate error if it isn't.
    1625             :  */
    1626             : static void
    1627      113244 : check_relation_updatable(LogicalRepRelMapEntry *rel)
    1628             : {
    1629             :     /* Updatable, no error. */
    1630      113244 :     if (rel->updatable)
    1631      113244 :         return;
    1632             : 
    1633             :     /*
    1634             :      * We are in error mode so it's fine this is somewhat slow. It's better to
    1635             :      * give user correct error.
    1636             :      */
    1637           0 :     if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
    1638             :     {
    1639           0 :         ereport(ERROR,
    1640             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1641             :                  errmsg("publisher did not send replica identity column "
    1642             :                         "expected by the logical replication target relation \"%s.%s\"",
    1643             :                         rel->remoterel.nspname, rel->remoterel.relname)));
    1644             :     }
    1645             : 
    1646           0 :     ereport(ERROR,
    1647             :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1648             :              errmsg("logical replication target relation \"%s.%s\" has "
    1649             :                     "neither REPLICA IDENTITY index nor PRIMARY "
    1650             :                     "KEY and published relation does not have "
    1651             :                     "REPLICA IDENTITY FULL",
    1652             :                     rel->remoterel.nspname, rel->remoterel.relname)));
    1653             : }
    1654             : 
    1655             : /*
    1656             :  * Handle UPDATE message.
    1657             :  *
    1658             :  * TODO: FDW support
    1659             :  */
    1660             : static void
    1661      120292 : apply_handle_update(StringInfo s)
    1662             : {
    1663             :     LogicalRepRelMapEntry *rel;
    1664             :     LogicalRepRelId relid;
    1665             :     ApplyExecutionData *edata;
    1666             :     EState     *estate;
    1667             :     LogicalRepTupleData oldtup;
    1668             :     LogicalRepTupleData newtup;
    1669             :     bool        has_oldtup;
    1670             :     TupleTableSlot *remoteslot;
    1671             :     RangeTblEntry *target_rte;
    1672             :     MemoryContext oldctx;
    1673             : 
    1674      120292 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
    1675       62504 :         return;
    1676             : 
    1677       57788 :     begin_replication_step();
    1678             : 
    1679       57788 :     relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
    1680             :                                    &newtup);
    1681       57788 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    1682       57788 :     if (!should_apply_changes_for_rel(rel))
    1683             :     {
    1684             :         /*
    1685             :          * The relation can't become interesting in the middle of the
    1686             :          * transaction so it's safe to unlock it.
    1687             :          */
    1688           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
    1689           0 :         end_replication_step();
    1690           0 :         return;
    1691             :     }
    1692             : 
    1693             :     /* Set relation for error callback */
    1694       57788 :     apply_error_callback_arg.rel = rel;
    1695             : 
    1696             :     /* Check if we can do the update. */
    1697       57788 :     check_relation_updatable(rel);
    1698             : 
    1699             :     /* Initialize the executor state. */
    1700       57788 :     edata = create_edata_for_relation(rel);
    1701       57788 :     estate = edata->estate;
    1702       57788 :     remoteslot = ExecInitExtraTupleSlot(estate,
    1703       57788 :                                         RelationGetDescr(rel->localrel),
    1704             :                                         &TTSOpsVirtual);
    1705             : 
    1706             :     /*
    1707             :      * Populate updatedCols so that per-column triggers can fire, and so
    1708             :      * executor can correctly pass down indexUnchanged hint.  This could
    1709             :      * include more columns than were actually changed on the publisher
    1710             :      * because the logical replication protocol doesn't contain that
    1711             :      * information.  But it would for example exclude columns that only exist
    1712             :      * on the subscriber, since we are not touching those.
    1713             :      */
    1714       57788 :     target_rte = list_nth(estate->es_range_table, 0);
    1715      288412 :     for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
    1716             :     {
    1717      230624 :         Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
    1718      230624 :         int         remoteattnum = rel->attrmap->attnums[i];
    1719             : 
    1720      230624 :         if (!att->attisdropped && remoteattnum >= 0)
    1721             :         {
    1722             :             Assert(remoteattnum < newtup.ncols);
    1723      125540 :             if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
    1724      125534 :                 target_rte->updatedCols =
    1725      125534 :                     bms_add_member(target_rte->updatedCols,
    1726             :                                    i + 1 - FirstLowInvalidHeapAttributeNumber);
    1727             :         }
    1728             :     }
    1729             : 
    1730             :     /* Also populate extraUpdatedCols, in case we have generated columns */
    1731       57788 :     fill_extraUpdatedCols(target_rte, rel->localrel);
    1732             : 
    1733             :     /* Build the search tuple. */
    1734       57788 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1735       57788 :     slot_store_data(remoteslot, rel,
    1736       57788 :                     has_oldtup ? &oldtup : &newtup);
    1737       57788 :     MemoryContextSwitchTo(oldctx);
    1738             : 
    1739             :     /* For a partitioned table, apply update to correct partition. */
    1740       57788 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    1741          14 :         apply_handle_tuple_routing(edata,
    1742             :                                    remoteslot, &newtup, CMD_UPDATE);
    1743             :     else
    1744       57774 :         apply_handle_update_internal(edata, edata->targetRelInfo,
    1745             :                                      remoteslot, &newtup);
    1746             : 
    1747       57788 :     finish_edata(edata);
    1748             : 
    1749             :     /* Reset relation for error callback */
    1750       57788 :     apply_error_callback_arg.rel = NULL;
    1751             : 
    1752       57788 :     logicalrep_rel_close(rel, NoLock);
    1753             : 
    1754       57788 :     end_replication_step();
    1755             : }
    1756             : 
    1757             : /*
    1758             :  * Workhorse for apply_handle_update()
    1759             :  * relinfo is for the relation we're actually updating in
    1760             :  * (could be a child partition of edata->targetRelInfo)
    1761             :  */
    1762             : static void
    1763       57774 : apply_handle_update_internal(ApplyExecutionData *edata,
    1764             :                              ResultRelInfo *relinfo,
    1765             :                              TupleTableSlot *remoteslot,
    1766             :                              LogicalRepTupleData *newtup)
    1767             : {
    1768       57774 :     EState     *estate = edata->estate;
    1769       57774 :     LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    1770       57774 :     Relation    localrel = relinfo->ri_RelationDesc;
    1771             :     EPQState    epqstate;
    1772             :     TupleTableSlot *localslot;
    1773             :     bool        found;
    1774             :     MemoryContext oldctx;
    1775             : 
    1776       57774 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
    1777       57774 :     ExecOpenIndices(relinfo, false);
    1778             : 
    1779       57774 :     found = FindReplTupleInLocalRel(estate, localrel,
    1780             :                                     &relmapentry->remoterel,
    1781             :                                     remoteslot, &localslot);
    1782       57774 :     ExecClearTuple(remoteslot);
    1783             : 
    1784             :     /*
    1785             :      * Tuple found.
    1786             :      *
    1787             :      * Note this will fail if there are other conflicting unique indexes.
    1788             :      */
    1789       57774 :     if (found)
    1790             :     {
    1791             :         /* Process and store remote tuple in the slot */
    1792       57772 :         oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1793       57772 :         slot_modify_data(remoteslot, localslot, relmapentry, newtup);
    1794       57772 :         MemoryContextSwitchTo(oldctx);
    1795             : 
    1796       57772 :         EvalPlanQualSetSlot(&epqstate, remoteslot);
    1797             : 
    1798             :         /* Do the actual update. */
    1799       57772 :         ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
    1800             :                                  remoteslot);
    1801             :     }
    1802             :     else
    1803             :     {
    1804             :         /*
    1805             :          * The tuple to be updated could not be found.  Do nothing except for
    1806             :          * emitting a log message.
    1807             :          *
    1808             :          * XXX should this be promoted to ereport(LOG) perhaps?
    1809             :          */
    1810           2 :         elog(DEBUG1,
    1811             :              "logical replication did not find row to be updated "
    1812             :              "in replication target relation \"%s\"",
    1813             :              RelationGetRelationName(localrel));
    1814             :     }
    1815             : 
    1816             :     /* Cleanup. */
    1817       57774 :     ExecCloseIndices(relinfo);
    1818       57774 :     EvalPlanQualEnd(&epqstate);
    1819       57774 : }
    1820             : 
    1821             : /*
    1822             :  * Handle DELETE message.
    1823             :  *
    1824             :  * TODO: FDW support
    1825             :  */
    1826             : static void
    1827      113576 : apply_handle_delete(StringInfo s)
    1828             : {
    1829             :     LogicalRepRelMapEntry *rel;
    1830             :     LogicalRepTupleData oldtup;
    1831             :     LogicalRepRelId relid;
    1832             :     ApplyExecutionData *edata;
    1833             :     EState     *estate;
    1834             :     TupleTableSlot *remoteslot;
    1835             :     MemoryContext oldctx;
    1836             : 
    1837      113576 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
    1838       58120 :         return;
    1839             : 
    1840       55456 :     begin_replication_step();
    1841             : 
    1842       55456 :     relid = logicalrep_read_delete(s, &oldtup);
    1843       55456 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
    1844       55456 :     if (!should_apply_changes_for_rel(rel))
    1845             :     {
    1846             :         /*
    1847             :          * The relation can't become interesting in the middle of the
    1848             :          * transaction so it's safe to unlock it.
    1849             :          */
    1850           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
    1851           0 :         end_replication_step();
    1852           0 :         return;
    1853             :     }
    1854             : 
    1855             :     /* Set relation for error callback */
    1856       55456 :     apply_error_callback_arg.rel = rel;
    1857             : 
    1858             :     /* Check if we can do the delete. */
    1859       55456 :     check_relation_updatable(rel);
    1860             : 
    1861             :     /* Initialize the executor state. */
    1862       55456 :     edata = create_edata_for_relation(rel);
    1863       55456 :     estate = edata->estate;
    1864       55456 :     remoteslot = ExecInitExtraTupleSlot(estate,
    1865       55456 :                                         RelationGetDescr(rel->localrel),
    1866             :                                         &TTSOpsVirtual);
    1867             : 
    1868             :     /* Build the search tuple. */
    1869       55456 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1870       55456 :     slot_store_data(remoteslot, rel, &oldtup);
    1871       55456 :     MemoryContextSwitchTo(oldctx);
    1872             : 
    1873             :     /* For a partitioned table, apply delete to correct partition. */
    1874       55456 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    1875          28 :         apply_handle_tuple_routing(edata,
    1876             :                                    remoteslot, NULL, CMD_DELETE);
    1877             :     else
    1878       55428 :         apply_handle_delete_internal(edata, edata->targetRelInfo,
    1879             :                                      remoteslot);
    1880             : 
    1881       55456 :     finish_edata(edata);
    1882             : 
    1883             :     /* Reset relation for error callback */
    1884       55456 :     apply_error_callback_arg.rel = NULL;
    1885             : 
    1886       55456 :     logicalrep_rel_close(rel, NoLock);
    1887             : 
    1888       55456 :     end_replication_step();
    1889             : }
    1890             : 
    1891             : /*
    1892             :  * Workhorse for apply_handle_delete()
    1893             :  * relinfo is for the relation we're actually deleting from
    1894             :  * (could be a child partition of edata->targetRelInfo)
    1895             :  */
    1896             : static void
    1897       55458 : apply_handle_delete_internal(ApplyExecutionData *edata,
    1898             :                              ResultRelInfo *relinfo,
    1899             :                              TupleTableSlot *remoteslot)
    1900             : {
    1901       55458 :     EState     *estate = edata->estate;
    1902       55458 :     Relation    localrel = relinfo->ri_RelationDesc;
    1903       55458 :     LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
    1904             :     EPQState    epqstate;
    1905             :     TupleTableSlot *localslot;
    1906             :     bool        found;
    1907             : 
    1908       55458 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
    1909       55458 :     ExecOpenIndices(relinfo, false);
    1910             : 
    1911       55458 :     found = FindReplTupleInLocalRel(estate, localrel, remoterel,
    1912             :                                     remoteslot, &localslot);
    1913             : 
    1914             :     /* If found delete it. */
    1915       55458 :     if (found)
    1916             :     {
    1917       55448 :         EvalPlanQualSetSlot(&epqstate, localslot);
    1918             : 
    1919             :         /* Do the actual delete. */
    1920       55448 :         ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
    1921             :     }
    1922             :     else
    1923             :     {
    1924             :         /*
    1925             :          * The tuple to be deleted could not be found.  Do nothing except for
    1926             :          * emitting a log message.
    1927             :          *
    1928             :          * XXX should this be promoted to ereport(LOG) perhaps?
    1929             :          */
    1930          10 :         elog(DEBUG1,
    1931             :              "logical replication did not find row to be deleted "
    1932             :              "in replication target relation \"%s\"",
    1933             :              RelationGetRelationName(localrel));
    1934             :     }
    1935             : 
    1936             :     /* Cleanup. */
    1937       55458 :     ExecCloseIndices(relinfo);
    1938       55458 :     EvalPlanQualEnd(&epqstate);
    1939       55458 : }
    1940             : 
    1941             : /*
    1942             :  * Try to find a tuple received from the publication side (in 'remoteslot') in
    1943             :  * the corresponding local relation using either replica identity index,
    1944             :  * primary key or if needed, sequential scan.
    1945             :  *
    1946             :  * Local tuple, if found, is returned in '*localslot'.
    1947             :  */
    1948             : static bool
    1949      113246 : FindReplTupleInLocalRel(EState *estate, Relation localrel,
    1950             :                         LogicalRepRelation *remoterel,
    1951             :                         TupleTableSlot *remoteslot,
    1952             :                         TupleTableSlot **localslot)
    1953             : {
    1954             :     Oid         idxoid;
    1955             :     bool        found;
    1956             : 
    1957      113246 :     *localslot = table_slot_create(localrel, &estate->es_tupleTable);
    1958             : 
    1959      113246 :     idxoid = GetRelationIdentityOrPK(localrel);
    1960             :     Assert(OidIsValid(idxoid) ||
    1961             :            (remoterel->replident == REPLICA_IDENTITY_FULL));
    1962             : 
    1963      113246 :     if (OidIsValid(idxoid))
    1964      113002 :         found = RelationFindReplTupleByIndex(localrel, idxoid,
    1965             :                                              LockTupleExclusive,
    1966             :                                              remoteslot, *localslot);
    1967             :     else
    1968         244 :         found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
    1969             :                                          remoteslot, *localslot);
    1970             : 
    1971      113246 :     return found;
    1972             : }
    1973             : 
    1974             : /*
    1975             :  * This handles insert, update, delete on a partitioned table.
    1976             :  */
    1977             : static void
    1978          86 : apply_handle_tuple_routing(ApplyExecutionData *edata,
    1979             :                            TupleTableSlot *remoteslot,
    1980             :                            LogicalRepTupleData *newtup,
    1981             :                            CmdType operation)
    1982             : {
    1983          86 :     EState     *estate = edata->estate;
    1984          86 :     LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    1985          86 :     ResultRelInfo *relinfo = edata->targetRelInfo;
    1986          86 :     Relation    parentrel = relinfo->ri_RelationDesc;
    1987             :     ModifyTableState *mtstate;
    1988             :     PartitionTupleRouting *proute;
    1989             :     ResultRelInfo *partrelinfo;
    1990             :     Relation    partrel;
    1991             :     TupleTableSlot *remoteslot_part;
    1992             :     TupleConversionMap *map;
    1993             :     MemoryContext oldctx;
    1994             : 
    1995             :     /* ModifyTableState is needed for ExecFindPartition(). */
    1996          86 :     edata->mtstate = mtstate = makeNode(ModifyTableState);
    1997          86 :     mtstate->ps.plan = NULL;
    1998          86 :     mtstate->ps.state = estate;
    1999          86 :     mtstate->operation = operation;
    2000          86 :     mtstate->resultRelInfo = relinfo;
    2001             : 
    2002             :     /* ... as is PartitionTupleRouting. */
    2003          86 :     edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
    2004             : 
    2005             :     /*
    2006             :      * Find the partition to which the "search tuple" belongs.
    2007             :      */
    2008             :     Assert(remoteslot != NULL);
    2009          86 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2010          86 :     partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
    2011             :                                     remoteslot, estate);
    2012             :     Assert(partrelinfo != NULL);
    2013          86 :     partrel = partrelinfo->ri_RelationDesc;
    2014             : 
    2015             :     /*
    2016             :      * To perform any of the operations below, the tuple must match the
    2017             :      * partition's rowtype. Convert if needed or just copy, using a dedicated
    2018             :      * slot to store the tuple in any case.
    2019             :      */
    2020          86 :     remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
    2021          86 :     if (remoteslot_part == NULL)
    2022          28 :         remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
    2023          86 :     map = partrelinfo->ri_RootToPartitionMap;
    2024          86 :     if (map != NULL)
    2025          58 :         remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
    2026             :                                                 remoteslot_part);
    2027             :     else
    2028             :     {
    2029          28 :         remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
    2030          28 :         slot_getallattrs(remoteslot_part);
    2031             :     }
    2032          86 :     MemoryContextSwitchTo(oldctx);
    2033             : 
    2034          86 :     switch (operation)
    2035             :     {
    2036          44 :         case CMD_INSERT:
    2037          44 :             apply_handle_insert_internal(edata, partrelinfo,
    2038             :                                          remoteslot_part);
    2039          44 :             break;
    2040             : 
    2041          28 :         case CMD_DELETE:
    2042          28 :             apply_handle_delete_internal(edata, partrelinfo,
    2043             :                                          remoteslot_part);
    2044          28 :             break;
    2045             : 
    2046          14 :         case CMD_UPDATE:
    2047             : 
    2048             :             /*
    2049             :              * For UPDATE, depending on whether or not the updated tuple
    2050             :              * satisfies the partition's constraint, perform a simple UPDATE
    2051             :              * of the partition or move the updated tuple into a different
    2052             :              * suitable partition.
    2053             :              */
    2054             :             {
    2055          14 :                 AttrMap    *attrmap = map ? map->attrMap : NULL;
    2056             :                 LogicalRepRelMapEntry *part_entry;
    2057             :                 TupleTableSlot *localslot;
    2058             :                 ResultRelInfo *partrelinfo_new;
    2059             :                 bool        found;
    2060             : 
    2061          14 :                 part_entry = logicalrep_partition_open(relmapentry, partrel,
    2062             :                                                        attrmap);
    2063             : 
    2064             :                 /* Get the matching local tuple from the partition. */
    2065          14 :                 found = FindReplTupleInLocalRel(estate, partrel,
    2066             :                                                 &part_entry->remoterel,
    2067             :                                                 remoteslot_part, &localslot);
    2068          14 :                 if (!found)
    2069             :                 {
    2070             :                     /*
    2071             :                      * The tuple to be updated could not be found.  Do nothing
    2072             :                      * except for emitting a log message.
    2073             :                      *
    2074             :                      * XXX should this be promoted to ereport(LOG) perhaps?
    2075             :                      */
    2076           4 :                     elog(DEBUG1,
    2077             :                          "logical replication did not find row to be updated "
    2078             :                          "in replication target relation's partition \"%s\"",
    2079             :                          RelationGetRelationName(partrel));
    2080           4 :                     return;
    2081             :                 }
    2082             : 
    2083             :                 /*
    2084             :                  * Apply the update to the local tuple, putting the result in
    2085             :                  * remoteslot_part.
    2086             :                  */
    2087          10 :                 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2088          10 :                 slot_modify_data(remoteslot_part, localslot, part_entry,
    2089             :                                  newtup);
    2090          10 :                 MemoryContextSwitchTo(oldctx);
    2091             : 
    2092             :                 /*
    2093             :                  * Does the updated tuple still satisfy the current
    2094             :                  * partition's constraint?
    2095             :                  */
    2096          20 :                 if (!partrel->rd_rel->relispartition ||
    2097          10 :                     ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
    2098             :                                        false))
    2099           8 :                 {
    2100             :                     /*
    2101             :                      * Yes, so simply UPDATE the partition.  We don't call
    2102             :                      * apply_handle_update_internal() here, which would
    2103             :                      * normally do the following work, to avoid repeating some
    2104             :                      * work already done above to find the local tuple in the
    2105             :                      * partition.
    2106             :                      */
    2107             :                     EPQState    epqstate;
    2108             : 
    2109           8 :                     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
    2110           8 :                     ExecOpenIndices(partrelinfo, false);
    2111             : 
    2112           8 :                     EvalPlanQualSetSlot(&epqstate, remoteslot_part);
    2113           8 :                     ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
    2114             :                                              localslot, remoteslot_part);
    2115           8 :                     ExecCloseIndices(partrelinfo);
    2116           8 :                     EvalPlanQualEnd(&epqstate);
    2117             :                 }
    2118             :                 else
    2119             :                 {
    2120             :                     /* Move the tuple into the new partition. */
    2121             : 
    2122             :                     /*
    2123             :                      * New partition will be found using tuple routing, which
    2124             :                      * can only occur via the parent table.  We might need to
    2125             :                      * convert the tuple to the parent's rowtype.  Note that
    2126             :                      * this is the tuple found in the partition, not the
    2127             :                      * original search tuple received by this function.
    2128             :                      */
    2129           2 :                     if (map)
    2130             :                     {
    2131             :                         TupleConversionMap *PartitionToRootMap =
    2132           2 :                         convert_tuples_by_name(RelationGetDescr(partrel),
    2133             :                                                RelationGetDescr(parentrel));
    2134             : 
    2135             :                         remoteslot =
    2136           2 :                             execute_attr_map_slot(PartitionToRootMap->attrMap,
    2137             :                                                   remoteslot_part, remoteslot);
    2138             :                     }
    2139             :                     else
    2140             :                     {
    2141           0 :                         remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
    2142           0 :                         slot_getallattrs(remoteslot);
    2143             :                     }
    2144             : 
    2145             : 
    2146             :                     /* Find the new partition. */
    2147           2 :                     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2148           2 :                     partrelinfo_new = ExecFindPartition(mtstate, relinfo,
    2149             :                                                         proute, remoteslot,
    2150             :                                                         estate);
    2151           2 :                     MemoryContextSwitchTo(oldctx);
    2152             :                     Assert(partrelinfo_new != partrelinfo);
    2153             : 
    2154             :                     /* DELETE old tuple found in the old partition. */
    2155           2 :                     apply_handle_delete_internal(edata, partrelinfo,
    2156             :                                                  localslot);
    2157             : 
    2158             :                     /* INSERT new tuple into the new partition. */
    2159             : 
    2160             :                     /*
    2161             :                      * Convert the replacement tuple to match the destination
    2162             :                      * partition rowtype.
    2163             :                      */
    2164           2 :                     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    2165           2 :                     partrel = partrelinfo_new->ri_RelationDesc;
    2166           2 :                     remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
    2167           2 :                     if (remoteslot_part == NULL)
    2168           2 :                         remoteslot_part = table_slot_create(partrel,
    2169             :                                                             &estate->es_tupleTable);
    2170           2 :                     map = partrelinfo_new->ri_RootToPartitionMap;
    2171           2 :                     if (map != NULL)
    2172             :                     {
    2173           0 :                         remoteslot_part = execute_attr_map_slot(map->attrMap,
    2174             :                                                                 remoteslot,
    2175             :                                                                 remoteslot_part);
    2176             :                     }
    2177             :                     else
    2178             :                     {
    2179           2 :                         remoteslot_part = ExecCopySlot(remoteslot_part,
    2180             :                                                        remoteslot);
    2181           2 :                         slot_getallattrs(remoteslot);
    2182             :                     }
    2183           2 :                     MemoryContextSwitchTo(oldctx);
    2184           2 :                     apply_handle_insert_internal(edata, partrelinfo_new,
    2185             :                                                  remoteslot_part);
    2186             :                 }
    2187             :             }
    2188          10 :             break;
    2189             : 
    2190           0 :         default:
    2191           0 :             elog(ERROR, "unrecognized CmdType: %d", (int) operation);
    2192             :             break;
    2193             :     }
    2194             : }
    2195             : 
    2196             : /*
    2197             :  * Handle TRUNCATE message.
    2198             :  *
    2199             :  * TODO: FDW support
    2200             :  */
    2201             : static void
    2202          30 : apply_handle_truncate(StringInfo s)
    2203             : {
    2204          30 :     bool        cascade = false;
    2205          30 :     bool        restart_seqs = false;
    2206          30 :     List       *remote_relids = NIL;
    2207          30 :     List       *remote_rels = NIL;
    2208          30 :     List       *rels = NIL;
    2209          30 :     List       *part_rels = NIL;
    2210          30 :     List       *relids = NIL;
    2211          30 :     List       *relids_logged = NIL;
    2212             :     ListCell   *lc;
    2213          30 :     LOCKMODE    lockmode = AccessExclusiveLock;
    2214             : 
    2215          30 :     if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
    2216           0 :         return;
    2217             : 
    2218          30 :     begin_replication_step();
    2219             : 
    2220          30 :     remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
    2221             : 
    2222          76 :     foreach(lc, remote_relids)
    2223             :     {
    2224          46 :         LogicalRepRelId relid = lfirst_oid(lc);
    2225             :         LogicalRepRelMapEntry *rel;
    2226             : 
    2227          46 :         rel = logicalrep_rel_open(relid, lockmode);
    2228          46 :         if (!should_apply_changes_for_rel(rel))
    2229             :         {
    2230             :             /*
    2231             :              * The relation can't become interesting in the middle of the
    2232             :              * transaction so it's safe to unlock it.
    2233             :              */
    2234           0 :             logicalrep_rel_close(rel, lockmode);
    2235           0 :             continue;
    2236             :         }
    2237             : 
    2238          46 :         remote_rels = lappend(remote_rels, rel);
    2239          46 :         rels = lappend(rels, rel->localrel);
    2240          46 :         relids = lappend_oid(relids, rel->localreloid);
    2241          46 :         if (RelationIsLogicallyLogged(rel->localrel))
    2242          46 :             relids_logged = lappend_oid(relids_logged, rel->localreloid);
    2243             : 
    2244             :         /*
    2245             :          * Truncate partitions if we got a message to truncate a partitioned
    2246             :          * table.
    2247             :          */
    2248          46 :         if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    2249             :         {
    2250             :             ListCell   *child;
    2251           8 :             List       *children = find_all_inheritors(rel->localreloid,
    2252             :                                                        lockmode,
    2253             :                                                        NULL);
    2254             : 
    2255          30 :             foreach(child, children)
    2256             :             {
    2257          22 :                 Oid         childrelid = lfirst_oid(child);
    2258             :                 Relation    childrel;
    2259             : 
    2260          22 :                 if (list_member_oid(relids, childrelid))
    2261           8 :                     continue;
    2262             : 
    2263             :                 /* find_all_inheritors already got lock */
    2264          14 :                 childrel = table_open(childrelid, NoLock);
    2265             : 
    2266             :                 /*
    2267             :                  * Ignore temp tables of other backends.  See similar code in
    2268             :                  * ExecuteTruncate().
    2269             :                  */
    2270          14 :                 if (RELATION_IS_OTHER_TEMP(childrel))
    2271             :                 {
    2272           0 :                     table_close(childrel, lockmode);
    2273           0 :                     continue;
    2274             :                 }
    2275             : 
    2276          14 :                 rels = lappend(rels, childrel);
    2277          14 :                 part_rels = lappend(part_rels, childrel);
    2278          14 :                 relids = lappend_oid(relids, childrelid);
    2279             :                 /* Log this relation only if needed for logical decoding */
    2280          14 :                 if (RelationIsLogicallyLogged(childrel))
    2281          14 :                     relids_logged = lappend_oid(relids_logged, childrelid);
    2282             :             }
    2283             :         }
    2284             :     }
    2285             : 
    2286             :     /*
    2287             :      * Even if we used CASCADE on the upstream primary we explicitly default
    2288             :      * to replaying changes without further cascading. This might be later
    2289             :      * changeable with a user specified option.
    2290             :      */
    2291          30 :     ExecuteTruncateGuts(rels,
    2292             :                         relids,
    2293             :                         relids_logged,
    2294             :                         DROP_RESTRICT,
    2295             :                         restart_seqs);
    2296          76 :     foreach(lc, remote_rels)
    2297             :     {
    2298          46 :         LogicalRepRelMapEntry *rel = lfirst(lc);
    2299             : 
    2300          46 :         logicalrep_rel_close(rel, NoLock);
    2301             :     }
    2302          44 :     foreach(lc, part_rels)
    2303             :     {
    2304          14 :         Relation    rel = lfirst(lc);
    2305             : 
    2306          14 :         table_close(rel, NoLock);
    2307             :     }
    2308             : 
    2309          30 :     end_replication_step();
    2310             : }
    2311             : 
    2312             : 
    2313             : /*
    2314             :  * Logical replication protocol message dispatcher.
    2315             :  */
    2316             : static void
    2317      462026 : apply_dispatch(StringInfo s)
    2318             : {
    2319      462026 :     LogicalRepMsgType action = pq_getmsgbyte(s);
    2320             :     LogicalRepMsgType saved_command;
    2321             : 
    2322             :     /*
    2323             :      * Set the current command being applied. Since this function can be
    2324             :      * called recusively when applying spooled changes, save the current
    2325             :      * command.
    2326             :      */
    2327      462026 :     saved_command = apply_error_callback_arg.command;
    2328      462026 :     apply_error_callback_arg.command = action;
    2329             : 
    2330      462026 :     switch (action)
    2331             :     {
    2332         670 :         case LOGICAL_REP_MSG_BEGIN:
    2333         670 :             apply_handle_begin(s);
    2334         670 :             break;
    2335             : 
    2336         650 :         case LOGICAL_REP_MSG_COMMIT:
    2337         650 :             apply_handle_commit(s);
    2338         650 :             break;
    2339             : 
    2340      224918 :         case LOGICAL_REP_MSG_INSERT:
    2341      224918 :             apply_handle_insert(s);
    2342      224902 :             break;
    2343             : 
    2344      120292 :         case LOGICAL_REP_MSG_UPDATE:
    2345      120292 :             apply_handle_update(s);
    2346      120292 :             break;
    2347             : 
    2348      113576 :         case LOGICAL_REP_MSG_DELETE:
    2349      113576 :             apply_handle_delete(s);
    2350      113576 :             break;
    2351             : 
    2352          30 :         case LOGICAL_REP_MSG_TRUNCATE:
    2353          30 :             apply_handle_truncate(s);
    2354          30 :             break;
    2355             : 
    2356         386 :         case LOGICAL_REP_MSG_RELATION:
    2357         386 :             apply_handle_relation(s);
    2358         386 :             break;
    2359             : 
    2360          32 :         case LOGICAL_REP_MSG_TYPE:
    2361          32 :             apply_handle_type(s);
    2362          32 :             break;
    2363             : 
    2364          22 :         case LOGICAL_REP_MSG_ORIGIN:
    2365          22 :             apply_handle_origin(s);
    2366          22 :             break;
    2367             : 
    2368           0 :         case LOGICAL_REP_MSG_MESSAGE:
    2369             : 
    2370             :             /*
    2371             :              * Logical replication does not use generic logical messages yet.
    2372             :              * Although, it could be used by other applications that use this
    2373             :              * output plugin.
    2374             :              */
    2375           0 :             break;
    2376             : 
    2377         652 :         case LOGICAL_REP_MSG_STREAM_START:
    2378         652 :             apply_handle_stream_start(s);
    2379         652 :             break;
    2380             : 
    2381         652 :         case LOGICAL_REP_MSG_STREAM_STOP:
    2382         652 :             apply_handle_stream_stop(s);
    2383         652 :             break;
    2384             : 
    2385          22 :         case LOGICAL_REP_MSG_STREAM_ABORT:
    2386          22 :             apply_handle_stream_abort(s);
    2387          22 :             break;
    2388             : 
    2389          26 :         case LOGICAL_REP_MSG_STREAM_COMMIT:
    2390          26 :             apply_handle_stream_commit(s);
    2391          26 :             break;
    2392             : 
    2393          24 :         case LOGICAL_REP_MSG_BEGIN_PREPARE:
    2394          24 :             apply_handle_begin_prepare(s);
    2395          24 :             break;
    2396             : 
    2397          24 :         case LOGICAL_REP_MSG_PREPARE:
    2398          24 :             apply_handle_prepare(s);
    2399          24 :             break;
    2400             : 
    2401          30 :         case LOGICAL_REP_MSG_COMMIT_PREPARED:
    2402          30 :             apply_handle_commit_prepared(s);
    2403          30 :             break;
    2404             : 
    2405           8 :         case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
    2406           8 :             apply_handle_rollback_prepared(s);
    2407           8 :             break;
    2408             : 
    2409          12 :         case LOGICAL_REP_MSG_STREAM_PREPARE:
    2410          12 :             apply_handle_stream_prepare(s);
    2411          12 :             break;
    2412             : 
    2413           0 :         default:
    2414           0 :             ereport(ERROR,
    2415             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    2416             :                      errmsg("invalid logical replication message type \"%c\"", action)));
    2417             :     }
    2418             : 
    2419             :     /* Reset the current command */
    2420      462010 :     apply_error_callback_arg.command = saved_command;
    2421      462010 : }
    2422             : 
    2423             : /*
    2424             :  * Figure out which write/flush positions to report to the walsender process.
    2425             :  *
    2426             :  * We can't simply report back the last LSN the walsender sent us because the
    2427             :  * local transaction might not yet be flushed to disk locally. Instead we
    2428             :  * build a list that associates local with remote LSNs for every commit. When
    2429             :  * reporting back the flush position to the sender we iterate that list and
    2430             :  * check which entries on it are already locally flushed. Those we can report
    2431             :  * as having been flushed.
    2432             :  *
    2433             :  * The have_pending_txes is true if there are outstanding transactions that
    2434             :  * need to be flushed.
    2435             :  */
    2436             : static void
    2437       63990 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
    2438             :                    bool *have_pending_txes)
    2439             : {
    2440             :     dlist_mutable_iter iter;
    2441       63990 :     XLogRecPtr  local_flush = GetFlushRecPtr(NULL);
    2442             : 
    2443       63990 :     *write = InvalidXLogRecPtr;
    2444       63990 :     *flush = InvalidXLogRecPtr;
    2445             : 
    2446       64482 :     dlist_foreach_modify(iter, &lsn_mapping)
    2447             :     {
    2448       32418 :         FlushPosition *pos =
    2449       32418 :         dlist_container(FlushPosition, node, iter.cur);
    2450             : 
    2451       32418 :         *write = pos->remote_end;
    2452             : 
    2453       32418 :         if (pos->local_end <= local_flush)
    2454             :         {
    2455         492 :             *flush = pos->remote_end;
    2456         492 :             dlist_delete(iter.cur);
    2457         492 :             pfree(pos);
    2458             :         }
    2459             :         else
    2460             :         {
    2461             :             /*
    2462             :              * Don't want to uselessly iterate over the rest of the list which
    2463             :              * could potentially be long. Instead get the last element and
    2464             :              * grab the write position from there.
    2465             :              */
    2466       31926 :             pos = dlist_tail_element(FlushPosition, node,
    2467             :                                      &lsn_mapping);
    2468       31926 :             *write = pos->remote_end;
    2469       31926 :             *have_pending_txes = true;
    2470       31926 :             return;
    2471             :         }
    2472             :     }
    2473             : 
    2474       32064 :     *have_pending_txes = !dlist_is_empty(&lsn_mapping);
    2475             : }
    2476             : 
    2477             : /*
    2478             :  * Store current remote/local lsn pair in the tracking list.
    2479             :  */
    2480             : static void
    2481         538 : store_flush_position(XLogRecPtr remote_lsn)
    2482             : {
    2483             :     FlushPosition *flushpos;
    2484             : 
    2485             :     /* Need to do this in permanent context */
    2486         538 :     MemoryContextSwitchTo(ApplyContext);
    2487             : 
    2488             :     /* Track commit lsn  */
    2489         538 :     flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
    2490         538 :     flushpos->local_end = XactLastCommitEnd;
    2491         538 :     flushpos->remote_end = remote_lsn;
    2492             : 
    2493         538 :     dlist_push_tail(&lsn_mapping, &flushpos->node);
    2494         538 :     MemoryContextSwitchTo(ApplyMessageContext);
    2495         538 : }
    2496             : 
    2497             : 
    2498             : /* Update statistics of the worker. */
    2499             : static void
    2500      257006 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
    2501             : {
    2502      257006 :     MyLogicalRepWorker->last_lsn = last_lsn;
    2503      257006 :     MyLogicalRepWorker->last_send_time = send_time;
    2504      257006 :     MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
    2505      257006 :     if (reply)
    2506             :     {
    2507        9470 :         MyLogicalRepWorker->reply_lsn = last_lsn;
    2508        9470 :         MyLogicalRepWorker->reply_time = send_time;
    2509             :     }
    2510      257006 : }
    2511             : 
/*
 * Apply main loop.
 *
 * Repeatedly reads messages from the publisher connection
 * (LogRepWorkerWalRcvConn), applies replicated changes and sends status
 * feedback.  It leaves the loop when walrcv_receive() returns a negative
 * length, which is logged as "data stream from publisher has ended".
 *
 * last_received is the LSN up to which data has been received.  It is
 * advanced from the LSNs carried by incoming 'w' and 'k' messages and passed
 * to send_feedback() and process_syncing_tables().
 *
 * In the streaming replication protocol, 'w' is an XLogData message (here its
 * payload is one logical replication protocol message, handed to
 * apply_dispatch()) and 'k' is a primary keepalive.  All other message types
 * are ignored.
 *
 * ApplyMessageContext and LogicalStreamingContext are created here as
 * children of ApplyContext.  ApplyMessageContext is reset after every message
 * and again at the end of every wait cycle.
 *
 * Raises an ERROR ("terminating logical replication worker due to timeout")
 * if nothing has been received for wal_receiver_timeout milliseconds, when
 * that setting is positive.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;
	TimeLineID	tli;
	ErrorContextCallback errcallback;

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/*
	 * This memory context is used for per-stream data when the streaming mode
	 * is enabled. This context is reset on each stream stop.
	 */
	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
													"LogicalStreamingContext",
													ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/*
	 * Push apply error context callback. Fields will be filled during
	 * applying a change.
	 */
	errcallback.callback = apply_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		/* walrcv_receive() also reports the socket to wait on, via fd. */
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					/* Nothing more available right now; go wait. */
					break;
				}
				else if (len < 0)
				{
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					/* Wrap the received buffer for the pq_getmsg* readers. */
					s.data = buf;
					s.len = len;
					s.cursor = 0;
					s.maxlen = -1;

					c = pq_getmsgbyte(&s);

					if (c == 'w')
					{
						/* XLogData: start LSN, end LSN, send time, payload. */
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						/* last_received only ever moves forward. */
						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);
					}
					else if (c == 'k')
					{
						/* Keepalive: end LSN, send time, reply-requested flag. */
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);
						UpdateWorkerStats(last_received, timestamp, true);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		if (!in_remote_transaction && !in_streamed_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
			break;

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the primary anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from primary has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errcode(ERRCODE_CONNECTION_FAILURE),
							 errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						/* Ask for a reply at most once until data arrives. */
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	/* All done */
	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
    2756             : 
    2757             : /*
    2758             :  * Send a Standby Status Update message to server.
    2759             :  *
    2760             :  * 'recvpos' is the latest LSN we've received data to, force is set if we need
    2761             :  * to send a response to avoid timeouts.
    2762             :  */
    2763             : static void
    2764       63990 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
    2765             : {
    2766             :     static StringInfo reply_message = NULL;
    2767             :     static TimestampTz send_time = 0;
    2768             : 
    2769             :     static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
    2770             :     static XLogRecPtr last_writepos = InvalidXLogRecPtr;
    2771             :     static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
    2772             : 
    2773             :     XLogRecPtr  writepos;
    2774             :     XLogRecPtr  flushpos;
    2775             :     TimestampTz now;
    2776             :     bool        have_pending_txes;
    2777             : 
    2778             :     /*
    2779             :      * If the user doesn't want status to be reported to the publisher, be
    2780             :      * sure to exit before doing anything at all.
    2781             :      */
    2782       63990 :     if (!force && wal_receiver_status_interval <= 0)
    2783       27198 :         return;
    2784             : 
    2785             :     /* It's legal to not pass a recvpos */
    2786       63990 :     if (recvpos < last_recvpos)
    2787           0 :         recvpos = last_recvpos;
    2788             : 
    2789       63990 :     get_flush_position(&writepos, &flushpos, &have_pending_txes);
    2790             : 
    2791             :     /*
    2792             :      * No outstanding transactions to flush, we can report the latest received
    2793             :      * position. This is important for synchronous replication.
    2794             :      */
    2795       63990 :     if (!have_pending_txes)
    2796       32064 :         flushpos = writepos = recvpos;
    2797             : 
    2798       63990 :     if (writepos < last_writepos)
    2799           0 :         writepos = last_writepos;
    2800             : 
    2801       63990 :     if (flushpos < last_flushpos)
    2802       31880 :         flushpos = last_flushpos;
    2803             : 
    2804       63990 :     now = GetCurrentTimestamp();
    2805             : 
    2806             :     /* if we've already reported everything we're good */
    2807       63990 :     if (!force &&
    2808       55782 :         writepos == last_writepos &&
    2809       27334 :         flushpos == last_flushpos &&
    2810       27290 :         !TimestampDifferenceExceeds(send_time, now,
    2811             :                                     wal_receiver_status_interval * 1000))
    2812       27198 :         return;
    2813       36792 :     send_time = now;
    2814             : 
    2815       36792 :     if (!reply_message)
    2816             :     {
    2817         296 :         MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
    2818             : 
    2819         296 :         reply_message = makeStringInfo();
    2820         296 :         MemoryContextSwitchTo(oldctx);
    2821             :     }
    2822             :     else
    2823       36496 :         resetStringInfo(reply_message);
    2824             : 
    2825       36792 :     pq_sendbyte(reply_message, 'r');
    2826       36792 :     pq_sendint64(reply_message, recvpos);   /* write */
    2827       36792 :     pq_sendint64(reply_message, flushpos);  /* flush */
    2828       36792 :     pq_sendint64(reply_message, writepos);  /* apply */
    2829       36792 :     pq_sendint64(reply_message, now);   /* sendTime */
    2830       36792 :     pq_sendbyte(reply_message, requestReply);   /* replyRequested */
    2831             : 
    2832       36792 :     elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
    2833             :          force,
    2834             :          LSN_FORMAT_ARGS(recvpos),
    2835             :          LSN_FORMAT_ARGS(writepos),
    2836             :          LSN_FORMAT_ARGS(flushpos));
    2837             : 
    2838       36792 :     walrcv_send(LogRepWorkerWalRcvConn,
    2839             :                 reply_message->data, reply_message->len);
    2840             : 
    2841       36792 :     if (recvpos > last_recvpos)
    2842       28448 :         last_recvpos = recvpos;
    2843       36792 :     if (writepos > last_writepos)
    2844       28450 :         last_writepos = writepos;
    2845       36792 :     if (flushpos > last_flushpos)
    2846       28136 :         last_flushpos = flushpos;
    2847             : }
    2848             : 
    2849             : /*
    2850             :  * Reread subscription info if needed. Most changes will be exit.
    2851             :  */
    2852             : static void
    2853       12038 : maybe_reread_subscription(void)
    2854             : {
    2855             :     MemoryContext oldctx;
    2856             :     Subscription *newsub;
    2857       12038 :     bool        started_tx = false;
    2858             : 
    2859             :     /* When cache state is valid there is nothing to do here. */
    2860       12038 :     if (MySubscriptionValid)
    2861       11982 :         return;
    2862             : 
    2863             :     /* This function might be called inside or outside of transaction. */
    2864          56 :     if (!IsTransactionState())
    2865             :     {
    2866          50 :         StartTransactionCommand();
    2867          50 :         started_tx = true;
    2868             :     }
    2869             : 
    2870             :     /* Ensure allocations in permanent context. */
    2871          56 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    2872             : 
    2873          56 :     newsub = GetSubscription(MyLogicalRepWorker->subid, true);
    2874             : 
    2875             :     /*
    2876             :      * Exit if the subscription was removed. This normally should not happen
    2877             :      * as the worker gets killed during DROP SUBSCRIPTION.
    2878             :      */
    2879          56 :     if (!newsub)
    2880             :     {
    2881           0 :         ereport(LOG,
    2882             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    2883             :                         "stop because the subscription was removed",
    2884             :                         MySubscription->name)));
    2885             : 
    2886           0 :         proc_exit(0);
    2887             :     }
    2888             : 
    2889             :     /*
    2890             :      * Exit if the subscription was disabled. This normally should not happen
    2891             :      * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
    2892             :      */
    2893          56 :     if (!newsub->enabled)
    2894             :     {
    2895           2 :         ereport(LOG,
    2896             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    2897             :                         "stop because the subscription was disabled",
    2898             :                         MySubscription->name)));
    2899             : 
    2900           2 :         proc_exit(0);
    2901             :     }
    2902             : 
    2903             :     /* !slotname should never happen when enabled is true. */
    2904             :     Assert(newsub->slotname);
    2905             : 
    2906             :     /* two-phase should not be altered */
    2907             :     Assert(newsub->twophasestate == MySubscription->twophasestate);
    2908             : 
    2909             :     /*
    2910             :      * Exit if any parameter that affects the remote connection was changed.
    2911             :      * The launcher will start a new worker.
    2912             :      */
    2913          54 :     if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
    2914          52 :         strcmp(newsub->name, MySubscription->name) != 0 ||
    2915          50 :         strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
    2916          50 :         newsub->binary != MySubscription->binary ||
    2917          44 :         newsub->stream != MySubscription->stream ||
    2918          40 :         !equal(newsub->publications, MySubscription->publications))
    2919             :     {
    2920          20 :         ereport(LOG,
    2921             :                 (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
    2922             :                         MySubscription->name)));
    2923             : 
    2924          20 :         proc_exit(0);
    2925             :     }
    2926             : 
    2927             :     /* Check for other changes that should never happen too. */
    2928          34 :     if (newsub->dbid != MySubscription->dbid)
    2929             :     {
    2930           0 :         elog(ERROR, "subscription %u changed unexpectedly",
    2931             :              MyLogicalRepWorker->subid);
    2932             :     }
    2933             : 
    2934             :     /* Clean old subscription info and switch to new one. */
    2935          34 :     FreeSubscription(MySubscription);
    2936          34 :     MySubscription = newsub;
    2937             : 
    2938          34 :     MemoryContextSwitchTo(oldctx);
    2939             : 
    2940             :     /* Change synchronous commit according to the user's wishes */
    2941          34 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    2942             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    2943             : 
    2944          34 :     if (started_tx)
    2945          32 :         CommitTransactionCommand();
    2946             : 
    2947          34 :     MySubscriptionValid = true;
    2948             : }
    2949             : 
    2950             : /*
    2951             :  * Callback from subscription syscache invalidation.
    2952             :  */
    2953             : static void
    2954          58 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
    2955             : {
    2956          58 :     MySubscriptionValid = false;
    2957          58 : }
    2958             : 
    2959             : /*
    2960             :  * subxact_info_write
    2961             :  *    Store information about subxacts for a toplevel transaction.
    2962             :  *
    2963             :  * For each subxact we store offset of it's first change in the main file.
    2964             :  * The file is always over-written as a whole.
    2965             :  *
    2966             :  * XXX We should only store subxacts that were not aborted yet.
    2967             :  */
    2968             : static void
    2969         666 : subxact_info_write(Oid subid, TransactionId xid)
    2970             : {
    2971             :     char        path[MAXPGPATH];
    2972             :     Size        len;
    2973             :     BufFile    *fd;
    2974             : 
    2975             :     Assert(TransactionIdIsValid(xid));
    2976             : 
    2977             :     /* construct the subxact filename */
    2978         666 :     subxact_filename(path, subid, xid);
    2979             : 
    2980             :     /* Delete the subxacts file, if exists. */
    2981         666 :     if (subxact_data.nsubxacts == 0)
    2982             :     {
    2983         542 :         cleanup_subxact_info();
    2984         542 :         BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
    2985             : 
    2986         542 :         return;
    2987             :     }
    2988             : 
    2989             :     /*
    2990             :      * Create the subxact file if it not already created, otherwise open the
    2991             :      * existing file.
    2992             :      */
    2993         124 :     fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
    2994             :                             true);
    2995         124 :     if (fd == NULL)
    2996          16 :         fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
    2997             : 
    2998         124 :     len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
    2999             : 
    3000             :     /* Write the subxact count and subxact info */
    3001         124 :     BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
    3002         124 :     BufFileWrite(fd, subxact_data.subxacts, len);
    3003             : 
    3004         124 :     BufFileClose(fd);
    3005             : 
    3006             :     /* free the memory allocated for subxact info */
    3007         124 :     cleanup_subxact_info();
    3008             : }
    3009             : 
    3010             : /*
    3011             :  * subxact_info_read
    3012             :  *    Restore information about subxacts of a streamed transaction.
    3013             :  *
    3014             :  * Read information about subxacts into the structure subxact_data that can be
    3015             :  * used later.
    3016             :  */
    3017             : static void
    3018         634 : subxact_info_read(Oid subid, TransactionId xid)
    3019             : {
    3020             :     char        path[MAXPGPATH];
    3021             :     Size        len;
    3022             :     BufFile    *fd;
    3023             :     MemoryContext oldctx;
    3024             : 
    3025             :     Assert(!subxact_data.subxacts);
    3026             :     Assert(subxact_data.nsubxacts == 0);
    3027             :     Assert(subxact_data.nsubxacts_max == 0);
    3028             : 
    3029             :     /*
    3030             :      * If the subxact file doesn't exist that means we don't have any subxact
    3031             :      * info.
    3032             :      */
    3033         634 :     subxact_filename(path, subid, xid);
    3034         634 :     fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
    3035             :                             true);
    3036         634 :     if (fd == NULL)
    3037         518 :         return;
    3038             : 
    3039             :     /* read number of subxact items */
    3040         116 :     if (BufFileRead(fd, &subxact_data.nsubxacts,
    3041             :                     sizeof(subxact_data.nsubxacts)) !=
    3042             :         sizeof(subxact_data.nsubxacts))
    3043           0 :         ereport(ERROR,
    3044             :                 (errcode_for_file_access(),
    3045             :                  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
    3046             :                         path)));
    3047             : 
    3048         116 :     len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
    3049             : 
    3050             :     /* we keep the maximum as a power of 2 */
    3051         116 :     subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
    3052             : 
    3053             :     /*
    3054             :      * Allocate subxact information in the logical streaming context. We need
    3055             :      * this information during the complete stream so that we can add the sub
    3056             :      * transaction info to this. On stream stop we will flush this information
    3057             :      * to the subxact file and reset the logical streaming context.
    3058             :      */
    3059         116 :     oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
    3060         116 :     subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
    3061             :                                    sizeof(SubXactInfo));
    3062         116 :     MemoryContextSwitchTo(oldctx);
    3063             : 
    3064         116 :     if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
    3065           0 :         ereport(ERROR,
    3066             :                 (errcode_for_file_access(),
    3067             :                  errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
    3068             :                         path)));
    3069             : 
    3070         116 :     BufFileClose(fd);
    3071             : }
    3072             : 
    3073             : /*
    3074             :  * subxact_info_add
    3075             :  *    Add information about a subxact (offset in the main file).
    3076             :  */
    3077             : static void
    3078      242572 : subxact_info_add(TransactionId xid)
    3079             : {
    3080      242572 :     SubXactInfo *subxacts = subxact_data.subxacts;
    3081             :     int64       i;
    3082             : 
    3083             :     /* We must have a valid top level stream xid and a stream fd. */
    3084             :     Assert(TransactionIdIsValid(stream_xid));
    3085             :     Assert(stream_fd != NULL);
    3086             : 
    3087             :     /*
    3088             :      * If the XID matches the toplevel transaction, we don't want to add it.
    3089             :      */
    3090      242572 :     if (stream_xid == xid)
    3091      198808 :         return;
    3092             : 
    3093             :     /*
    3094             :      * In most cases we're checking the same subxact as we've already seen in
    3095             :      * the last call, so make sure to ignore it (this change comes later).
    3096             :      */
    3097       43764 :     if (subxact_data.subxact_last == xid)
    3098       43626 :         return;
    3099             : 
    3100             :     /* OK, remember we're processing this XID. */
    3101         138 :     subxact_data.subxact_last = xid;
    3102             : 
    3103             :     /*
    3104             :      * Check if the transaction is already present in the array of subxact. We
    3105             :      * intentionally scan the array from the tail, because we're likely adding
    3106             :      * a change for the most recent subtransactions.
    3107             :      *
    3108             :      * XXX Can we rely on the subxact XIDs arriving in sorted order? That
    3109             :      * would allow us to use binary search here.
    3110             :      */
    3111         176 :     for (i = subxact_data.nsubxacts; i > 0; i--)
    3112             :     {
    3113             :         /* found, so we're done */
    3114         138 :         if (subxacts[i - 1].xid == xid)
    3115         100 :             return;
    3116             :     }
    3117             : 
    3118             :     /* This is a new subxact, so we need to add it to the array. */
    3119          38 :     if (subxact_data.nsubxacts == 0)
    3120             :     {
    3121             :         MemoryContext oldctx;
    3122             : 
    3123          16 :         subxact_data.nsubxacts_max = 128;
    3124             : 
    3125             :         /*
    3126             :          * Allocate this memory for subxacts in per-stream context, see
    3127             :          * subxact_info_read.
    3128             :          */
    3129          16 :         oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
    3130          16 :         subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
    3131          16 :         MemoryContextSwitchTo(oldctx);
    3132             :     }
    3133          22 :     else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
    3134             :     {
    3135          20 :         subxact_data.nsubxacts_max *= 2;
    3136          20 :         subxacts = repalloc(subxacts,
    3137          20 :                             subxact_data.nsubxacts_max * sizeof(SubXactInfo));
    3138             :     }
    3139             : 
    3140          38 :     subxacts[subxact_data.nsubxacts].xid = xid;
    3141             : 
    3142             :     /*
    3143             :      * Get the current offset of the stream file and store it as offset of
    3144             :      * this subxact.
    3145             :      */
    3146          38 :     BufFileTell(stream_fd,
    3147          38 :                 &subxacts[subxact_data.nsubxacts].fileno,
    3148          38 :                 &subxacts[subxact_data.nsubxacts].offset);
    3149             : 
    3150          38 :     subxact_data.nsubxacts++;
    3151          38 :     subxact_data.subxacts = subxacts;
    3152             : }
    3153             : 
    3154             : /* format filename for file containing the info about subxacts */
    3155             : static inline void
    3156        1338 : subxact_filename(char *path, Oid subid, TransactionId xid)
    3157             : {
    3158        1338 :     snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
    3159        1338 : }
    3160             : 
    3161             : /* format filename for file containing serialized changes */
    3162             : static inline void
    3163         742 : changes_filename(char *path, Oid subid, TransactionId xid)
    3164             : {
    3165         742 :     snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
    3166         742 : }
    3167             : 
    3168             : /*
    3169             :  * stream_cleanup_files
    3170             :  *    Cleanup files for a subscription / toplevel transaction.
    3171             :  *
    3172             :  * Remove files with serialized changes and subxact info for a particular
    3173             :  * toplevel transaction. Each subscription has a separate set of files
    3174             :  * for any toplevel transaction.
    3175             :  */
    3176             : static void
    3177          38 : stream_cleanup_files(Oid subid, TransactionId xid)
    3178             : {
    3179             :     char        path[MAXPGPATH];
    3180             : 
    3181             :     /* Delete the changes file. */
    3182          38 :     changes_filename(path, subid, xid);
    3183          38 :     BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
    3184             : 
    3185             :     /* Delete the subxact file, if it exists. */
    3186          38 :     subxact_filename(path, subid, xid);
    3187          38 :     BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
    3188          38 : }
    3189             : 
    3190             : /*
    3191             :  * stream_open_file
    3192             :  *    Open a file that we'll use to serialize changes for a toplevel
    3193             :  * transaction.
    3194             :  *
    3195             :  * Open a file for streamed changes from a toplevel transaction identified
    3196             :  * by stream_xid (global variable). If it's the first chunk of streamed
    3197             :  * changes for this transaction, create the buffile, otherwise open the
    3198             :  * previously created file.
    3199             :  *
    3200             :  * This can only be called at the beginning of a "streaming" block, i.e.
    3201             :  * between stream_start/stream_stop messages from the upstream.
    3202             :  */
    3203             : static void
    3204         652 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
    3205             : {
    3206             :     char        path[MAXPGPATH];
    3207             :     MemoryContext oldcxt;
    3208             : 
    3209             :     Assert(in_streamed_transaction);
    3210             :     Assert(OidIsValid(subid));
    3211             :     Assert(TransactionIdIsValid(xid));
    3212             :     Assert(stream_fd == NULL);
    3213             : 
    3214             : 
    3215         652 :     changes_filename(path, subid, xid);
    3216         652 :     elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
    3217             : 
    3218             :     /*
    3219             :      * Create/open the buffiles under the logical streaming context so that we
    3220             :      * have those files until stream stop.
    3221             :      */
    3222         652 :     oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
    3223             : 
    3224             :     /*
    3225             :      * If this is the first streamed segment, create the changes file.
    3226             :      * Otherwise, just open the file for writing, in append mode.
    3227             :      */
    3228         652 :     if (first_segment)
    3229          40 :         stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
    3230             :                                          path);
    3231             :     else
    3232             :     {
    3233             :         /*
    3234             :          * Open the file and seek to the end of the file because we always
    3235             :          * append the changes file.
    3236             :          */
    3237         612 :         stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
    3238             :                                        path, O_RDWR, false);
    3239         612 :         BufFileSeek(stream_fd, 0, 0, SEEK_END);
    3240             :     }
    3241             : 
    3242         652 :     MemoryContextSwitchTo(oldcxt);
    3243         652 : }
    3244             : 
    3245             : /*
    3246             :  * stream_close_file
    3247             :  *    Close the currently open file with streamed changes.
    3248             :  *
    3249             :  * This can only be called at the end of a streaming block, i.e. at stream_stop
    3250             :  * message from the upstream.
    3251             :  */
    3252             : static void
    3253         652 : stream_close_file(void)
    3254             : {
    3255             :     Assert(in_streamed_transaction);
    3256             :     Assert(TransactionIdIsValid(stream_xid));
    3257             :     Assert(stream_fd != NULL);
    3258             : 
    3259         652 :     BufFileClose(stream_fd);
    3260             : 
    3261         652 :     stream_xid = InvalidTransactionId;
    3262         652 :     stream_fd = NULL;
    3263         652 : }
    3264             : 
    3265             : /*
    3266             :  * stream_write_change
    3267             :  *    Serialize a change to a file for the current toplevel transaction.
    3268             :  *
    3269             :  * The change is serialized in a simple format, with length (not including
    3270             :  * the length), action code (identifying the message type) and message
    3271             :  * contents (without the subxact TransactionId value).
    3272             :  */
    3273             : static void
    3274      242572 : stream_write_change(char action, StringInfo s)
    3275             : {
    3276             :     int         len;
    3277             : 
    3278             :     Assert(in_streamed_transaction);
    3279             :     Assert(TransactionIdIsValid(stream_xid));
    3280             :     Assert(stream_fd != NULL);
    3281             : 
    3282             :     /* total on-disk size, including the action type character */
    3283      242572 :     len = (s->len - s->cursor) + sizeof(char);
    3284             : 
    3285             :     /* first write the size */
    3286      242572 :     BufFileWrite(stream_fd, &len, sizeof(len));
    3287             : 
    3288             :     /* then the action */
    3289      242572 :     BufFileWrite(stream_fd, &action, sizeof(action));
    3290             : 
    3291             :     /* and finally the remaining part of the buffer (after the XID) */
    3292      242572 :     len = (s->len - s->cursor);
    3293             : 
    3294      242572 :     BufFileWrite(stream_fd, &s->data[s->cursor], len);
    3295      242572 : }
    3296             : 
    3297             : /*
    3298             :  * Cleanup the memory for subxacts and reset the related variables.
    3299             :  */
    3300             : static inline void
    3301         674 : cleanup_subxact_info()
    3302             : {
    3303         674 :     if (subxact_data.subxacts)
    3304         132 :         pfree(subxact_data.subxacts);
    3305             : 
    3306         674 :     subxact_data.subxacts = NULL;
    3307         674 :     subxact_data.subxact_last = InvalidTransactionId;
    3308         674 :     subxact_data.nsubxacts = 0;
    3309         674 :     subxact_data.nsubxacts_max = 0;
    3310         674 : }
    3311             : 
    3312             : /*
    3313             :  * Form the prepared transaction GID for two_phase transactions.
    3314             :  *
    3315             :  * Return the GID in the supplied buffer.
    3316             :  */
    3317             : static void
    3318          74 : TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
    3319             : {
    3320             :     Assert(subid != InvalidRepOriginId);
    3321             : 
    3322          74 :     if (!TransactionIdIsValid(xid))
    3323           0 :         ereport(ERROR,
    3324             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    3325             :                  errmsg_internal("invalid two-phase transaction ID")));
    3326             : 
    3327          74 :     snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
    3328          74 : }
    3329             : 
    3330             : /* Logical Replication Apply worker entry point */
    3331             : void
    3332         316 : ApplyWorkerMain(Datum main_arg)
    3333             : {
    3334         316 :     int         worker_slot = DatumGetInt32(main_arg);
    3335         316 :     MemoryContext cctx = CurrentMemoryContext;
    3336             :     MemoryContext oldctx;
    3337             :     char        originname[NAMEDATALEN];
    3338             :     XLogRecPtr  origin_startpos;
    3339             :     char       *myslotname;
    3340             :     WalRcvStreamOptions options;
    3341             :     int         server_version;
    3342             : 
    3343             :     /* Attach to slot */
    3344         316 :     logicalrep_worker_attach(worker_slot);
    3345             : 
    3346             :     /* Setup signal handling */
    3347         316 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    3348         316 :     pqsignal(SIGTERM, die);
    3349         316 :     BackgroundWorkerUnblockSignals();
    3350             : 
    3351             :     /*
    3352             :      * We don't currently need any ResourceOwner in a walreceiver process, but
    3353             :      * if we did, we could call CreateAuxProcessResourceOwner here.
    3354             :      */
    3355             : 
    3356             :     /* Initialise stats to a sanish value */
    3357         316 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
    3358         316 :         MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
    3359             : 
    3360             :     /* Load the libpq-specific functions */
    3361         316 :     load_file("libpqwalreceiver", false);
    3362             : 
    3363             :     /* Run as replica session replication role. */
    3364         316 :     SetConfigOption("session_replication_role", "replica",
    3365             :                     PGC_SUSET, PGC_S_OVERRIDE);
    3366             : 
    3367             :     /* Connect to our database. */
    3368         316 :     BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
    3369         316 :                                               MyLogicalRepWorker->userid,
    3370             :                                               0);
    3371             : 
    3372             :     /*
    3373             :      * Set always-secure search path, so malicious users can't redirect user
    3374             :      * code (e.g. pg_index.indexprs).
    3375             :      */
    3376         312 :     SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
    3377             : 
    3378             :     /* Load the subscription into persistent memory context. */
    3379         312 :     ApplyContext = AllocSetContextCreate(TopMemoryContext,
    3380             :                                          "ApplyContext",
    3381             :                                          ALLOCSET_DEFAULT_SIZES);
    3382         312 :     StartTransactionCommand();
    3383         312 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    3384             : 
    3385         312 :     MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
    3386         312 :     if (!MySubscription)
    3387             :     {
    3388           0 :         ereport(LOG,
    3389             :                 (errmsg("logical replication apply worker for subscription %u will not "
    3390             :                         "start because the subscription was removed during startup",
    3391             :                         MyLogicalRepWorker->subid)));
    3392           0 :         proc_exit(0);
    3393             :     }
    3394             : 
    3395         312 :     MySubscriptionValid = true;
    3396         312 :     MemoryContextSwitchTo(oldctx);
    3397             : 
    3398         312 :     if (!MySubscription->enabled)
    3399             :     {
    3400           0 :         ereport(LOG,
    3401             :                 (errmsg("logical replication apply worker for subscription \"%s\" will not "
    3402             :                         "start because the subscription was disabled during startup",
    3403             :                         MySubscription->name)));
    3404             : 
    3405           0 :         proc_exit(0);
    3406             :     }
    3407             : 
    3408             :     /* Setup synchronous commit according to the user's wishes */
    3409         312 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    3410             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    3411             : 
    3412             :     /* Keep us informed about subscription changes. */
    3413         312 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
    3414             :                                   subscription_change_cb,
    3415             :                                   (Datum) 0);
    3416             : 
    3417         312 :     if (am_tablesync_worker())
    3418         178 :         ereport(LOG,
    3419             :                 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
    3420             :                         MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
    3421             :     else
    3422         134 :         ereport(LOG,
    3423             :                 (errmsg("logical replication apply worker for subscription \"%s\" has started",
    3424             :                         MySubscription->name)));
    3425             : 
    3426         312 :     CommitTransactionCommand();
    3427             : 
    3428             :     /* Connect to the origin and start the replication. */
    3429         312 :     elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
    3430             :          MySubscription->conninfo);
    3431             : 
    3432         312 :     if (am_tablesync_worker())
    3433             :     {
    3434             :         char       *syncslotname;
    3435             : 
    3436         178 :         PG_TRY();
    3437             :         {
    3438             :             /* This is table synchronization worker, call initial sync. */
    3439         178 :             syncslotname = LogicalRepSyncTableStart(&origin_startpos);
    3440             :         }
    3441          10 :         PG_CATCH();
    3442             :         {
    3443          10 :             MemoryContext ecxt = MemoryContextSwitchTo(cctx);
    3444          10 :             ErrorData  *errdata = CopyErrorData();
    3445             : 
    3446             :             /*
    3447             :              * Report the table sync error. There is no corresponding message
    3448             :              * type for table synchronization.
    3449             :              */
    3450          10 :             pgstat_report_subworker_error(MyLogicalRepWorker->subid,
    3451          10 :                                           MyLogicalRepWorker->relid,
    3452          10 :                                           MyLogicalRepWorker->relid,
    3453             :                                           0,    /* message type */
    3454             :                                           InvalidTransactionId,
    3455          10 :                                           errdata->message);
    3456          10 :             MemoryContextSwitchTo(ecxt);
    3457          10 :             PG_RE_THROW();
    3458             :         }
    3459         166 :         PG_END_TRY();
    3460             : 
    3461             :         /* allocate slot name in long-lived context */
    3462         166 :         myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
    3463             : 
    3464         166 :         pfree(syncslotname);
    3465             :     }
    3466             :     else
    3467             :     {
    3468             :         /* This is main apply worker */
    3469             :         RepOriginId originid;
    3470             :         TimeLineID  startpointTLI;
    3471             :         char       *err;
    3472             : 
    3473         134 :         myslotname = MySubscription->slotname;
    3474             : 
    3475             :         /*
    3476             :          * This shouldn't happen if the subscription is enabled, but guard
    3477             :          * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
    3478             :          * crash if slot is NULL.)
    3479             :          */
    3480         134 :         if (!myslotname)
    3481           0 :             ereport(ERROR,
    3482             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3483             :                      errmsg("subscription has no replication slot set")));
    3484             : 
    3485             :         /* Setup replication origin tracking. */
    3486         134 :         StartTransactionCommand();
    3487         134 :         snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
    3488         134 :         originid = replorigin_by_name(originname, true);
    3489         134 :         if (!OidIsValid(originid))
    3490           0 :             originid = replorigin_create(originname);
    3491         134 :         replorigin_session_setup(originid);
    3492         134 :         replorigin_session_origin = originid;
    3493         134 :         origin_startpos = replorigin_session_get_progress(false);
    3494         134 :         CommitTransactionCommand();
    3495             : 
    3496         134 :         LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
    3497             :                                                 MySubscription->name, &err);
    3498         134 :         if (LogRepWorkerWalRcvConn == NULL)
    3499           4 :             ereport(ERROR,
    3500             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    3501             :                      errmsg("could not connect to the publisher: %s", err)));
    3502             : 
    3503             :         /*
    3504             :          * We don't really use the output identify_system for anything but it
    3505             :          * does some initializations on the upstream so let's still call it.
    3506             :          */
    3507         130 :         (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
    3508             :     }
    3509             : 
    3510             :     /*
    3511             :      * Setup callback for syscache so that we know when something changes in
    3512             :      * the subscription relation state.
    3513             :      */
    3514         296 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
    3515             :                                   invalidate_syncing_table_states,
    3516             :                                   (Datum) 0);
    3517             : 
    3518             :     /* Build logical replication streaming options. */
    3519         296 :     options.logical = true;
    3520         296 :     options.startpoint = origin_startpos;
    3521         296 :     options.slotname = myslotname;
    3522             : 
    3523         296 :     server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
    3524         296 :     options.proto.logical.proto_version =
    3525         296 :         server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
    3526             :         server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
    3527             :         LOGICALREP_PROTO_VERSION_NUM;
    3528             : 
    3529         296 :     options.proto.logical.publication_names = MySubscription->publications;
    3530         296 :     options.proto.logical.binary = MySubscription->binary;
    3531         296 :     options.proto.logical.streaming = MySubscription->stream;
    3532         296 :     options.proto.logical.twophase = false;
    3533             : 
    3534         296 :     if (!am_tablesync_worker())
    3535             :     {
    3536             :         /*
    3537             :          * Even when the two_phase mode is requested by the user, it remains
    3538             :          * as the tri-state PENDING until all tablesyncs have reached READY
    3539             :          * state. Only then, can it become ENABLED.
    3540             :          *
    3541             :          * Note: If the subscription has no tables then leave the state as
    3542             :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
    3543             :          * work.
    3544             :          */
    3545         130 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
    3546          12 :             AllTablesyncsReady())
    3547             :         {
    3548             :             /* Start streaming with two_phase enabled */
    3549           4 :             options.proto.logical.twophase = true;
    3550           4 :             walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    3551             : 
    3552           4 :             StartTransactionCommand();
    3553           4 :             UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
    3554           4 :             MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
    3555           4 :             CommitTransactionCommand();
    3556             :         }
    3557             :         else
    3558             :         {
    3559         126 :             walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    3560             :         }
    3561             : 
    3562         130 :         ereport(DEBUG1,
    3563             :                 (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
    3564             :                         MySubscription->name,
    3565             :                         MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
    3566             :                         MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
    3567             :                         MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
    3568             :                         "?")));
    3569             :     }
    3570             :     else
    3571             :     {
    3572             :         /* Start normal logical streaming replication. */
    3573         166 :         walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    3574             :     }
    3575             : 
    3576             :     /* Run the main loop. */
    3577         296 :     PG_TRY();
    3578             :     {
    3579         296 :         LogicalRepApplyLoop(origin_startpos);
    3580             :     }
    3581          38 :     PG_CATCH();
    3582             :     {
    3583             :         /* report the apply error */
    3584          38 :         if (apply_error_callback_arg.command != 0)
    3585             :         {
    3586          12 :             MemoryContext ecxt = MemoryContextSwitchTo(cctx);
    3587          12 :             ErrorData  *errdata = CopyErrorData();
    3588             : 
    3589          10 :             pgstat_report_subworker_error(MyLogicalRepWorker->subid,
    3590          12 :                                           MyLogicalRepWorker->relid,
    3591          12 :                                           apply_error_callback_arg.rel != NULL
    3592           2 :                                           ? apply_error_callback_arg.rel->localreloid
    3593             :                                           : InvalidOid,
    3594             :                                           apply_error_callback_arg.command,
    3595             :                                           apply_error_callback_arg.remote_xid,
    3596          12 :                                           errdata->message);
    3597          12 :             MemoryContextSwitchTo(ecxt);
    3598             :         }
    3599             : 
    3600          38 :         PG_RE_THROW();
    3601             :     }
    3602           0 :     PG_END_TRY();
    3603             : 
    3604           0 :     proc_exit(0);
    3605             : }
    3606             : 
    3607             : /*
    3608             :  * Is current process a logical replication worker?
    3609             :  */
    3610             : bool
    3611         576 : IsLogicalWorker(void)
    3612             : {
    3613         576 :     return MyLogicalRepWorker != NULL;
    3614             : }
    3615             : 
    3616             : /* Error callback to give more context info about the change being applied */
    3617             : static void
    3618         312 : apply_error_callback(void *arg)
    3619             : {
    3620             :     StringInfoData buf;
    3621         312 :     ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
    3622             : 
    3623         312 :     if (apply_error_callback_arg.command == 0)
    3624         280 :         return;
    3625             : 
    3626          32 :     initStringInfo(&buf);
    3627          32 :     appendStringInfo(&buf, _("processing remote data during \"%s\""),
    3628             :                      logicalrep_message_type(errarg->command));
    3629             : 
    3630             :     /* append relation information */
    3631          32 :     if (errarg->rel)
    3632             :     {
    3633          18 :         appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
    3634          18 :                          errarg->rel->remoterel.nspname,
    3635          18 :                          errarg->rel->remoterel.relname);
    3636          18 :         if (errarg->remote_attnum >= 0)
    3637           0 :             appendStringInfo(&buf, _(" column \"%s\""),
    3638           0 :                              errarg->rel->remoterel.attnames[errarg->remote_attnum]);
    3639             :     }
    3640             : 
    3641             :     /* append transaction information */
    3642          32 :     if (TransactionIdIsNormal(errarg->remote_xid))
    3643             :     {
    3644          32 :         appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
    3645          32 :         if (errarg->ts != 0)
    3646          32 :             appendStringInfo(&buf, _(" at %s"),
    3647             :                              timestamptz_to_str(errarg->ts));
    3648             :     }
    3649             : 
    3650          32 :     errcontext("%s", buf.data);
    3651          32 :     pfree(buf.data);
    3652             : }
    3653             : 
    3654             : /* Set transaction information of apply error callback */
    3655             : static inline void
    3656        2868 : set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
    3657             : {
    3658        2868 :     apply_error_callback_arg.remote_xid = xid;
    3659        2868 :     apply_error_callback_arg.ts = ts;
    3660        2868 : }
    3661             : 
    3662             : /* Reset all information of apply error callback */
    3663             : static inline void
    3664        1424 : reset_apply_error_context_info(void)
    3665             : {
    3666        1424 :     apply_error_callback_arg.command = 0;
    3667        1424 :     apply_error_callback_arg.rel = NULL;
    3668        1424 :     apply_error_callback_arg.remote_attnum = -1;
    3669        1424 :     set_apply_error_context_xact(InvalidTransactionId, 0);
    3670        1424 : }

Generated by: LCOV version 1.14