LCOV - code coverage report
Current view: top level - src/backend/replication/logical - worker.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 638 708 90.1 %
Date: 2020-06-01 00:06:26 Functions: 31 33 93.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * worker.c
       3             :  *     PostgreSQL logical replication worker (apply)
       4             :  *
       5             :  * Copyright (c) 2016-2020, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/worker.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains the worker which applies logical changes as they come
      12             :  *    from remote logical replication stream.
      13             :  *
      14             :  *    The main worker (apply) is started by logical replication worker
      15             :  *    launcher for every enabled subscription in a database. It uses
      16             :  *    walsender protocol to communicate with publisher.
      17             :  *
      18             :  *    This module includes server facing code and shares libpqwalreceiver
      19             :  *    module with walreceiver for providing the libpq specific functionality.
      20             :  *
      21             :  *-------------------------------------------------------------------------
      22             :  */
      23             : 
      24             : #include "postgres.h"
      25             : 
      26             : #include "access/table.h"
      27             : #include "access/tableam.h"
      28             : #include "access/xact.h"
      29             : #include "access/xlog_internal.h"
      30             : #include "catalog/catalog.h"
      31             : #include "catalog/namespace.h"
      32             : #include "catalog/partition.h"
      33             : #include "catalog/pg_inherits.h"
      34             : #include "catalog/pg_subscription.h"
      35             : #include "catalog/pg_subscription_rel.h"
      36             : #include "commands/tablecmds.h"
      37             : #include "commands/trigger.h"
      38             : #include "executor/executor.h"
      39             : #include "executor/execPartition.h"
      40             : #include "executor/nodeModifyTable.h"
      41             : #include "funcapi.h"
      42             : #include "libpq/pqformat.h"
      43             : #include "libpq/pqsignal.h"
      44             : #include "mb/pg_wchar.h"
      45             : #include "miscadmin.h"
      46             : #include "nodes/makefuncs.h"
      47             : #include "optimizer/optimizer.h"
      48             : #include "parser/analyze.h"
      49             : #include "parser/parse_relation.h"
      50             : #include "pgstat.h"
      51             : #include "postmaster/bgworker.h"
      52             : #include "postmaster/interrupt.h"
      53             : #include "postmaster/postmaster.h"
      54             : #include "postmaster/walwriter.h"
      55             : #include "replication/decode.h"
      56             : #include "replication/logical.h"
      57             : #include "replication/logicalproto.h"
      58             : #include "replication/logicalrelation.h"
      59             : #include "replication/logicalworker.h"
      60             : #include "replication/origin.h"
      61             : #include "replication/reorderbuffer.h"
      62             : #include "replication/snapbuild.h"
      63             : #include "replication/walreceiver.h"
      64             : #include "replication/worker_internal.h"
      65             : #include "rewrite/rewriteHandler.h"
      66             : #include "storage/bufmgr.h"
      67             : #include "storage/ipc.h"
      68             : #include "storage/lmgr.h"
      69             : #include "storage/proc.h"
      70             : #include "storage/procarray.h"
      71             : #include "tcop/tcopprot.h"
      72             : #include "utils/builtins.h"
      73             : #include "utils/catcache.h"
      74             : #include "utils/datum.h"
      75             : #include "utils/fmgroids.h"
      76             : #include "utils/guc.h"
      77             : #include "utils/inval.h"
      78             : #include "utils/lsyscache.h"
      79             : #include "utils/memutils.h"
      80             : #include "utils/rel.h"
      81             : #include "utils/syscache.h"
      82             : #include "utils/timeout.h"
      83             : 
      84             : #define NAPTIME_PER_CYCLE 1000  /* max sleep time between cycles (1s) */
      85             : 
      86             : typedef struct FlushPosition
      87             : {
      88             :     dlist_node  node;
      89             :     XLogRecPtr  local_end;
      90             :     XLogRecPtr  remote_end;
      91             : } FlushPosition;
      92             : 
      93             : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
      94             : 
      95             : typedef struct SlotErrCallbackArg
      96             : {
      97             :     LogicalRepRelMapEntry *rel;
      98             :     int         local_attnum;
      99             :     int         remote_attnum;
     100             : } SlotErrCallbackArg;
     101             : 
     102             : static MemoryContext ApplyMessageContext = NULL;
     103             : MemoryContext ApplyContext = NULL;
     104             : 
     105             : WalReceiverConn *wrconn = NULL;
     106             : 
     107             : Subscription *MySubscription = NULL;
     108             : bool        MySubscriptionValid = false;
     109             : 
     110             : bool        in_remote_transaction = false;
     111             : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
     112             : 
     113             : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
     114             : 
     115             : static void store_flush_position(XLogRecPtr remote_lsn);
     116             : 
     117             : static void maybe_reread_subscription(void);
     118             : 
     119             : static void apply_handle_insert_internal(ResultRelInfo *relinfo,
     120             :                                          EState *estate, TupleTableSlot *remoteslot);
     121             : static void apply_handle_update_internal(ResultRelInfo *relinfo,
     122             :                                          EState *estate, TupleTableSlot *remoteslot,
     123             :                                          LogicalRepTupleData *newtup,
     124             :                                          LogicalRepRelMapEntry *relmapentry);
     125             : static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
     126             :                                          TupleTableSlot *remoteslot,
     127             :                                          LogicalRepRelation *remoterel);
     128             : static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
     129             :                                     LogicalRepRelation *remoterel,
     130             :                                     TupleTableSlot *remoteslot,
     131             :                                     TupleTableSlot **localslot);
     132             : static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
     133             :                                        EState *estate,
     134             :                                        TupleTableSlot *remoteslot,
     135             :                                        LogicalRepTupleData *newtup,
     136             :                                        LogicalRepRelMapEntry *relmapentry,
     137             :                                        CmdType operation);
     138             : 
     139             : /*
     140             :  * Should this worker apply changes for given relation.
     141             :  *
     142             :  * This is mainly needed for initial relation data sync as that runs in
     143             :  * separate worker process running in parallel and we need some way to skip
     144             :  * changes coming to the main apply worker during the sync of a table.
     145             :  *
     146             :  * Note we need to do smaller or equals comparison for SYNCDONE state because
     147             :  * it might hold position of end of initial slot consistent point WAL
     148             :  * record + 1 (ie start of next record) and next record can be COMMIT of
     149             :  * transaction we are now processing (which is what we set remote_final_lsn
     150             :  * to in apply_handle_begin).
     151             :  */
     152             : static bool
     153        1612 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
     154             : {
     155        1612 :     if (am_tablesync_worker())
     156           0 :         return MyLogicalRepWorker->relid == rel->localreloid;
     157             :     else
     158        1636 :         return (rel->state == SUBREL_STATE_READY ||
     159          24 :                 (rel->state == SUBREL_STATE_SYNCDONE &&
     160           0 :                  rel->statelsn <= remote_final_lsn));
     161             : }
     162             : 
     163             : /*
     164             :  * Make sure that we started local transaction.
     165             :  *
     166             :  * Also switches to ApplyMessageContext as necessary.
     167             :  */
     168             : static bool
     169        1598 : ensure_transaction(void)
     170             : {
     171        1598 :     if (IsTransactionState())
     172             :     {
     173        1252 :         SetCurrentStatementStartTimestamp();
     174             : 
     175        1252 :         if (CurrentMemoryContext != ApplyMessageContext)
     176           0 :             MemoryContextSwitchTo(ApplyMessageContext);
     177             : 
     178        1252 :         return false;
     179             :     }
     180             : 
     181         346 :     SetCurrentStatementStartTimestamp();
     182         346 :     StartTransactionCommand();
     183             : 
     184         346 :     maybe_reread_subscription();
     185             : 
     186         346 :     MemoryContextSwitchTo(ApplyMessageContext);
     187         346 :     return true;
     188             : }
     189             : 
     190             : 
     191             : /*
     192             :  * Executor state preparation for evaluation of constraint expressions,
     193             :  * indexes and triggers.
     194             :  *
     195             :  * This is based on similar code in copy.c
     196             :  */
     197             : static EState *
     198        1548 : create_estate_for_relation(LogicalRepRelMapEntry *rel)
     199             : {
     200             :     EState     *estate;
     201             :     ResultRelInfo *resultRelInfo;
     202             :     RangeTblEntry *rte;
     203             : 
     204        1548 :     estate = CreateExecutorState();
     205             : 
     206        1548 :     rte = makeNode(RangeTblEntry);
     207        1548 :     rte->rtekind = RTE_RELATION;
     208        1548 :     rte->relid = RelationGetRelid(rel->localrel);
     209        1548 :     rte->relkind = rel->localrel->rd_rel->relkind;
     210        1548 :     rte->rellockmode = AccessShareLock;
     211        1548 :     ExecInitRangeTable(estate, list_make1(rte));
     212             : 
     213        1548 :     resultRelInfo = makeNode(ResultRelInfo);
     214        1548 :     InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
     215             : 
     216        1548 :     estate->es_result_relations = resultRelInfo;
     217        1548 :     estate->es_num_result_relations = 1;
     218        1548 :     estate->es_result_relation_info = resultRelInfo;
     219             : 
     220        1548 :     estate->es_output_cid = GetCurrentCommandId(true);
     221             : 
     222             :     /* Prepare to catch AFTER triggers. */
     223        1548 :     AfterTriggerBeginQuery();
     224             : 
     225        1548 :     return estate;
     226             : }
     227             : 
     228             : /*
     229             :  * Executes default values for columns for which we can't map to remote
     230             :  * relation columns.
     231             :  *
     232             :  * This allows us to support tables which have more columns on the downstream
     233             :  * than on the upstream.
     234             :  */
     235             : static void
     236         834 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
     237             :                    TupleTableSlot *slot)
     238             : {
     239         834 :     TupleDesc   desc = RelationGetDescr(rel->localrel);
     240         834 :     int         num_phys_attrs = desc->natts;
     241             :     int         i;
     242             :     int         attnum,
     243         834 :                 num_defaults = 0;
     244             :     int        *defmap;
     245             :     ExprState **defexprs;
     246             :     ExprContext *econtext;
     247             : 
     248         834 :     econtext = GetPerTupleExprContext(estate);
     249             : 
     250             :     /* We got all the data via replication, no need to evaluate anything. */
     251         834 :     if (num_phys_attrs == rel->remoterel.natts)
     252         736 :         return;
     253             : 
     254          98 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
     255          98 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
     256             : 
     257             :     Assert(rel->attrmap->maplen == num_phys_attrs);
     258         392 :     for (attnum = 0; attnum < num_phys_attrs; attnum++)
     259             :     {
     260             :         Expr       *defexpr;
     261             : 
     262         294 :         if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
     263           4 :             continue;
     264             : 
     265         290 :         if (rel->attrmap->attnums[attnum] >= 0)
     266         192 :             continue;
     267             : 
     268          98 :         defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
     269             : 
     270          98 :         if (defexpr != NULL)
     271             :         {
     272             :             /* Run the expression through planner */
     273          98 :             defexpr = expression_planner(defexpr);
     274             : 
     275             :             /* Initialize executable expression in copycontext */
     276          98 :             defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
     277          98 :             defmap[num_defaults] = attnum;
     278          98 :             num_defaults++;
     279             :         }
     280             : 
     281             :     }
     282             : 
     283         196 :     for (i = 0; i < num_defaults; i++)
     284          98 :         slot->tts_values[defmap[i]] =
     285          98 :             ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
     286             : }
     287             : 
     288             : /*
     289             :  * Error callback to give more context info about type conversion failure.
     290             :  */
     291             : static void
     292           0 : slot_store_error_callback(void *arg)
     293             : {
     294           0 :     SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
     295             :     LogicalRepRelMapEntry *rel;
     296             :     char       *remotetypname;
     297             :     Oid         remotetypoid,
     298             :                 localtypoid;
     299             : 
     300             :     /* Nothing to do if remote attribute number is not set */
     301           0 :     if (errarg->remote_attnum < 0)
     302           0 :         return;
     303             : 
     304           0 :     rel = errarg->rel;
     305           0 :     remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
     306             : 
     307             :     /* Fetch remote type name from the LogicalRepTypMap cache */
     308           0 :     remotetypname = logicalrep_typmap_gettypname(remotetypoid);
     309             : 
     310             :     /* Fetch local type OID from the local sys cache */
     311           0 :     localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
     312             : 
     313           0 :     errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
     314             :                "remote type %s, local type %s",
     315             :                rel->remoterel.nspname, rel->remoterel.relname,
     316           0 :                rel->remoterel.attnames[errarg->remote_attnum],
     317             :                remotetypname,
     318             :                format_type_be(localtypoid));
     319             : }
     320             : 
     321             : /*
     322             :  * Store data in C string form into slot.
     323             :  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
     324             :  * use better.
     325             :  */
     326             : static void
     327        1548 : slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
     328             :                     char **values)
     329             : {
     330        1548 :     int         natts = slot->tts_tupleDescriptor->natts;
     331             :     int         i;
     332             :     SlotErrCallbackArg errarg;
     333             :     ErrorContextCallback errcallback;
     334             : 
     335        1548 :     ExecClearTuple(slot);
     336             : 
     337             :     /* Push callback + info on the error context stack */
     338        1548 :     errarg.rel = rel;
     339        1548 :     errarg.local_attnum = -1;
     340        1548 :     errarg.remote_attnum = -1;
     341        1548 :     errcallback.callback = slot_store_error_callback;
     342        1548 :     errcallback.arg = (void *) &errarg;
     343        1548 :     errcallback.previous = error_context_stack;
     344        1548 :     error_context_stack = &errcallback;
     345             : 
     346             :     /* Call the "in" function for each non-dropped attribute */
     347             :     Assert(natts == rel->attrmap->maplen);
     348        4122 :     for (i = 0; i < natts; i++)
     349             :     {
     350        2574 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     351        2574 :         int         remoteattnum = rel->attrmap->attnums[i];
     352             : 
     353        2574 :         if (!att->attisdropped && remoteattnum >= 0 &&
     354        2330 :             values[remoteattnum] != NULL)
     355        1852 :         {
     356             :             Oid         typinput;
     357             :             Oid         typioparam;
     358             : 
     359        1852 :             errarg.local_attnum = i;
     360        1852 :             errarg.remote_attnum = remoteattnum;
     361             : 
     362        1852 :             getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     363        3704 :             slot->tts_values[i] =
     364        1852 :                 OidInputFunctionCall(typinput, values[remoteattnum],
     365             :                                      typioparam, att->atttypmod);
     366        1852 :             slot->tts_isnull[i] = false;
     367             : 
     368        1852 :             errarg.local_attnum = -1;
     369        1852 :             errarg.remote_attnum = -1;
     370             :         }
     371             :         else
     372             :         {
     373             :             /*
     374             :              * We assign NULL to dropped attributes, NULL values, and missing
     375             :              * values (missing values should be later filled using
     376             :              * slot_fill_defaults).
     377             :              */
     378         722 :             slot->tts_values[i] = (Datum) 0;
     379         722 :             slot->tts_isnull[i] = true;
     380             :         }
     381             :     }
     382             : 
     383             :     /* Pop the error context stack */
     384        1548 :     error_context_stack = errcallback.previous;
     385             : 
     386        1548 :     ExecStoreVirtualTuple(slot);
     387        1548 : }
     388             : 
     389             : /*
     390             :  * Replace selected columns with user data provided as C strings.
     391             :  * This is somewhat similar to heap_modify_tuple but also calls the type
     392             :  * input functions on the user data.
     393             :  * "slot" is filled with a copy of the tuple in "srcslot", with
     394             :  * columns selected by the "replaces" array replaced with data values
     395             :  * from "values".
     396             :  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
     397             :  * storage for "srcslot".  This is OK for current usage, but someday we may
     398             :  * need to materialize "slot" at the end to make it independent of "srcslot".
     399             :  */
     400             : static void
     401         262 : slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
     402             :                      LogicalRepRelMapEntry *rel,
     403             :                      char **values, bool *replaces)
     404             : {
     405         262 :     int         natts = slot->tts_tupleDescriptor->natts;
     406             :     int         i;
     407             :     SlotErrCallbackArg errarg;
     408             :     ErrorContextCallback errcallback;
     409             : 
     410             :     /* We'll fill "slot" with a virtual tuple, so we must start with ... */
     411         262 :     ExecClearTuple(slot);
     412             : 
     413             :     /*
     414             :      * Copy all the column data from srcslot, so that we'll have valid values
     415             :      * for unreplaced columns.
     416             :      */
     417             :     Assert(natts == srcslot->tts_tupleDescriptor->natts);
     418         262 :     slot_getallattrs(srcslot);
     419         262 :     memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
     420         262 :     memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
     421             : 
     422             :     /* For error reporting, push callback + info on the error context stack */
     423         262 :     errarg.rel = rel;
     424         262 :     errarg.local_attnum = -1;
     425         262 :     errarg.remote_attnum = -1;
     426         262 :     errcallback.callback = slot_store_error_callback;
     427         262 :     errcallback.arg = (void *) &errarg;
     428         262 :     errcallback.previous = error_context_stack;
     429         262 :     error_context_stack = &errcallback;
     430             : 
     431             :     /* Call the "in" function for each replaced attribute */
     432             :     Assert(natts == rel->attrmap->maplen);
     433         798 :     for (i = 0; i < natts; i++)
     434             :     {
     435         536 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     436         536 :         int         remoteattnum = rel->attrmap->attnums[i];
     437             : 
     438         536 :         if (remoteattnum < 0)
     439          72 :             continue;
     440             : 
     441         464 :         if (!replaces[remoteattnum])
     442           2 :             continue;
     443             : 
     444         462 :         if (values[remoteattnum] != NULL)
     445             :         {
     446             :             Oid         typinput;
     447             :             Oid         typioparam;
     448             : 
     449         388 :             errarg.local_attnum = i;
     450         388 :             errarg.remote_attnum = remoteattnum;
     451             : 
     452         388 :             getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     453         776 :             slot->tts_values[i] =
     454         388 :                 OidInputFunctionCall(typinput, values[remoteattnum],
     455             :                                      typioparam, att->atttypmod);
     456         388 :             slot->tts_isnull[i] = false;
     457             : 
     458         388 :             errarg.local_attnum = -1;
     459         388 :             errarg.remote_attnum = -1;
     460             :         }
     461             :         else
     462             :         {
     463          74 :             slot->tts_values[i] = (Datum) 0;
     464          74 :             slot->tts_isnull[i] = true;
     465             :         }
     466             :     }
     467             : 
     468             :     /* Pop the error context stack */
     469         262 :     error_context_stack = errcallback.previous;
     470             : 
     471             :     /* And finally, declare that "slot" contains a valid virtual tuple */
     472         262 :     ExecStoreVirtualTuple(slot);
     473         262 : }
     474             : 
     475             : /*
     476             :  * Handle BEGIN message.
     477             :  */
     478             : static void
     479         462 : apply_handle_begin(StringInfo s)
     480             : {
     481             :     LogicalRepBeginData begin_data;
     482             : 
     483         462 :     logicalrep_read_begin(s, &begin_data);
     484             : 
     485         462 :     remote_final_lsn = begin_data.final_lsn;
     486             : 
     487         462 :     in_remote_transaction = true;
     488             : 
     489         462 :     pgstat_report_activity(STATE_RUNNING, NULL);
     490         462 : }
     491             : 
     492             : /*
     493             :  * Handle COMMIT message.
     494             :  *
     495             :  * TODO, support tracking of multiple origins
     496             :  */
     497             : static void
     498         460 : apply_handle_commit(StringInfo s)
     499             : {
     500             :     LogicalRepCommitData commit_data;
     501             : 
     502         460 :     logicalrep_read_commit(s, &commit_data);
     503             : 
     504             :     Assert(commit_data.commit_lsn == remote_final_lsn);
     505             : 
     506             :     /* The synchronization worker runs in single transaction. */
     507         460 :     if (IsTransactionState() && !am_tablesync_worker())
     508             :     {
     509             :         /*
     510             :          * Update origin state so we can restart streaming from correct
     511             :          * position in case of crash.
     512             :          */
     513         344 :         replorigin_session_origin_lsn = commit_data.end_lsn;
     514         344 :         replorigin_session_origin_timestamp = commit_data.committime;
     515             : 
     516         344 :         CommitTransactionCommand();
     517         344 :         pgstat_report_stat(false);
     518             : 
     519         344 :         store_flush_position(commit_data.end_lsn);
     520             :     }
     521             :     else
     522             :     {
     523             :         /* Process any invalidation messages that might have accumulated. */
     524         116 :         AcceptInvalidationMessages();
     525         116 :         maybe_reread_subscription();
     526             :     }
     527             : 
     528         460 :     in_remote_transaction = false;
     529             : 
     530             :     /* Process any tables that are being synchronized in parallel. */
     531         460 :     process_syncing_tables(commit_data.end_lsn);
     532             : 
     533         460 :     pgstat_report_activity(STATE_IDLE, NULL);
     534         460 : }
     535             : 
     536             : /*
     537             :  * Handle ORIGIN message.
     538             :  *
     539             :  * TODO, support tracking of multiple origins
     540             :  */
     541             : static void
     542           0 : apply_handle_origin(StringInfo s)
     543             : {
     544             :     /*
     545             :      * ORIGIN message can only come inside remote transaction and before any
     546             :      * actual writes.
     547             :      */
     548           0 :     if (!in_remote_transaction ||
     549           0 :         (IsTransactionState() && !am_tablesync_worker()))
     550           0 :         ereport(ERROR,
     551             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     552             :                  errmsg("ORIGIN message sent out of order")));
     553           0 : }
     554             : 
     555             : /*
     556             :  * Handle RELATION message.
     557             :  *
     558             :  * Note we don't do validation against local schema here. The validation
     559             :  * against local schema is postponed until first change for given relation
     560             :  * comes as we only care about it when applying changes for it anyway and we
     561             :  * do less locking this way.
     562             :  */
     563             : static void
     564         170 : apply_handle_relation(StringInfo s)
     565             : {
     566             :     LogicalRepRelation *rel;
     567             : 
     568         170 :     rel = logicalrep_read_rel(s);
     569         170 :     logicalrep_relmap_update(rel);
     570         170 : }
     571             : 
     572             : /*
     573             :  * Handle TYPE message.
     574             :  *
     575             :  * Note we don't do local mapping here, that's done when the type is
     576             :  * actually used.
     577             :  */
     578             : static void
     579          32 : apply_handle_type(StringInfo s)
     580             : {
     581             :     LogicalRepTyp typ;
     582             : 
     583          32 :     logicalrep_read_typ(s, &typ);
     584          32 :     logicalrep_typmap_update(&typ);
     585          32 : }
     586             : 
     587             : /*
     588             :  * Get replica identity index or if it is not defined a primary key.
     589             :  *
     590             :  * If neither is defined, returns InvalidOid
     591             :  */
     592             : static Oid
     593         716 : GetRelationIdentityOrPK(Relation rel)
     594             : {
     595             :     Oid         idxoid;
     596             : 
     597         716 :     idxoid = RelationGetReplicaIndex(rel);
     598             : 
     599         716 :     if (!OidIsValid(idxoid))
     600         246 :         idxoid = RelationGetPrimaryKeyIndex(rel);
     601             : 
     602         716 :     return idxoid;
     603             : }
     604             : 
     605             : /*
     606             :  * Handle INSERT message.
     607             :  */
     608             : 
     609             : static void
     610         860 : apply_handle_insert(StringInfo s)
     611             : {
     612             :     LogicalRepRelMapEntry *rel;
     613             :     LogicalRepTupleData newtup;
     614             :     LogicalRepRelId relid;
     615             :     EState     *estate;
     616             :     TupleTableSlot *remoteslot;
     617             :     MemoryContext oldctx;
     618             : 
     619         860 :     ensure_transaction();
     620             : 
     621         860 :     relid = logicalrep_read_insert(s, &newtup);
     622         860 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
     623         858 :     if (!should_apply_changes_for_rel(rel))
     624             :     {
     625             :         /*
     626             :          * The relation can't become interesting in the middle of the
     627             :          * transaction so it's safe to unlock it.
     628             :          */
     629          24 :         logicalrep_rel_close(rel, RowExclusiveLock);
     630          24 :         return;
     631             :     }
     632             : 
     633             :     /* Initialize the executor state. */
     634         834 :     estate = create_estate_for_relation(rel);
     635         834 :     remoteslot = ExecInitExtraTupleSlot(estate,
     636         834 :                                         RelationGetDescr(rel->localrel),
     637             :                                         &TTSOpsVirtual);
     638             : 
     639             :     /* Input functions may need an active snapshot, so get one */
     640         834 :     PushActiveSnapshot(GetTransactionSnapshot());
     641             : 
     642             :     /* Process and store remote tuple in the slot */
     643         834 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     644         834 :     slot_store_cstrings(remoteslot, rel, newtup.values);
     645         834 :     slot_fill_defaults(rel, estate, remoteslot);
     646         834 :     MemoryContextSwitchTo(oldctx);
     647             : 
     648             :     /* For a partitioned table, insert the tuple into a partition. */
     649         834 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     650          36 :         apply_handle_tuple_routing(estate->es_result_relation_info, estate,
     651             :                                    remoteslot, NULL, rel, CMD_INSERT);
     652             :     else
     653         798 :         apply_handle_insert_internal(estate->es_result_relation_info, estate,
     654             :                                      remoteslot);
     655             : 
     656         834 :     PopActiveSnapshot();
     657             : 
     658             :     /* Handle queued AFTER triggers. */
     659         834 :     AfterTriggerEndQuery(estate);
     660             : 
     661         834 :     ExecResetTupleTable(estate->es_tupleTable, false);
     662         834 :     FreeExecutorState(estate);
     663             : 
     664         834 :     logicalrep_rel_close(rel, NoLock);
     665             : 
     666         834 :     CommandCounterIncrement();
     667             : }
     668             : 
     669             : /* Workhorse for apply_handle_insert() */
     670             : static void
     671         836 : apply_handle_insert_internal(ResultRelInfo *relinfo,
     672             :                              EState *estate, TupleTableSlot *remoteslot)
     673             : {
     674         836 :     ExecOpenIndices(relinfo, false);
     675             : 
     676             :     /* Do the insert. */
     677         836 :     ExecSimpleRelationInsert(estate, remoteslot);
     678             : 
     679             :     /* Cleanup. */
     680         836 :     ExecCloseIndices(relinfo);
     681         836 : }
     682             : 
     683             : /*
     684             :  * Check if the logical replication relation is updatable and throw
     685             :  * appropriate error if it isn't.
     686             :  */
     687             : static void
     688         714 : check_relation_updatable(LogicalRepRelMapEntry *rel)
     689             : {
     690             :     /* Updatable, no error. */
     691         714 :     if (rel->updatable)
     692         714 :         return;
     693             : 
     694             :     /*
     695             :      * We are in error mode so it's fine this is somewhat slow. It's better to
     696             :      * give user correct error.
     697             :      */
     698           0 :     if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
     699             :     {
     700           0 :         ereport(ERROR,
     701             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     702             :                  errmsg("publisher did not send replica identity column "
     703             :                         "expected by the logical replication target relation \"%s.%s\"",
     704             :                         rel->remoterel.nspname, rel->remoterel.relname)));
     705             :     }
     706             : 
     707           0 :     ereport(ERROR,
     708             :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     709             :              errmsg("logical replication target relation \"%s.%s\" has "
     710             :                     "neither REPLICA IDENTITY index nor PRIMARY "
     711             :                     "KEY and published relation does not have "
     712             :                     "REPLICA IDENTITY FULL",
     713             :                     rel->remoterel.nspname, rel->remoterel.relname)));
     714             : }
     715             : 
     716             : /*
     717             :  * Handle UPDATE message.
     718             :  *
     719             :  * TODO: FDW support
     720             :  */
     721             : static void
     722         262 : apply_handle_update(StringInfo s)
     723             : {
     724             :     LogicalRepRelMapEntry *rel;
     725             :     LogicalRepRelId relid;
     726             :     EState     *estate;
     727             :     LogicalRepTupleData oldtup;
     728             :     LogicalRepTupleData newtup;
     729             :     bool        has_oldtup;
     730             :     TupleTableSlot *remoteslot;
     731             :     RangeTblEntry *target_rte;
     732             :     MemoryContext oldctx;
     733             : 
     734         262 :     ensure_transaction();
     735             : 
     736         262 :     relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
     737             :                                    &newtup);
     738         262 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
     739         262 :     if (!should_apply_changes_for_rel(rel))
     740             :     {
     741             :         /*
     742             :          * The relation can't become interesting in the middle of the
     743             :          * transaction so it's safe to unlock it.
     744             :          */
     745           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
     746           0 :         return;
     747             :     }
     748             : 
     749             :     /* Check if we can do the update. */
     750         262 :     check_relation_updatable(rel);
     751             : 
     752             :     /* Initialize the executor state. */
     753         262 :     estate = create_estate_for_relation(rel);
     754         262 :     remoteslot = ExecInitExtraTupleSlot(estate,
     755         262 :                                         RelationGetDescr(rel->localrel),
     756             :                                         &TTSOpsVirtual);
     757             : 
     758             :     /*
     759             :      * Populate updatedCols so that per-column triggers can fire.  This could
     760             :      * include more columns than were actually changed on the publisher
     761             :      * because the logical replication protocol doesn't contain that
     762             :      * information.  But it would for example exclude columns that only exist
     763             :      * on the subscriber, since we are not touching those.
     764             :      */
     765         262 :     target_rte = list_nth(estate->es_range_table, 0);
     766         798 :     for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
     767             :     {
     768         536 :         if (newtup.changed[i])
     769         462 :             target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
     770             :                                                      i + 1 - FirstLowInvalidHeapAttributeNumber);
     771             :     }
     772             : 
     773         262 :     fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel));
     774             : 
     775         262 :     PushActiveSnapshot(GetTransactionSnapshot());
     776             : 
     777             :     /* Build the search tuple. */
     778         262 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     779         262 :     slot_store_cstrings(remoteslot, rel,
     780         262 :                         has_oldtup ? oldtup.values : newtup.values);
     781         262 :     MemoryContextSwitchTo(oldctx);
     782             : 
     783             :     /* For a partitioned table, apply update to correct partition. */
     784         262 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     785          10 :         apply_handle_tuple_routing(estate->es_result_relation_info, estate,
     786             :                                    remoteslot, &newtup, rel, CMD_UPDATE);
     787             :     else
     788         252 :         apply_handle_update_internal(estate->es_result_relation_info, estate,
     789             :                                      remoteslot, &newtup, rel);
     790             : 
     791         262 :     PopActiveSnapshot();
     792             : 
     793             :     /* Handle queued AFTER triggers. */
     794         262 :     AfterTriggerEndQuery(estate);
     795             : 
     796         262 :     ExecResetTupleTable(estate->es_tupleTable, false);
     797         262 :     FreeExecutorState(estate);
     798             : 
     799         262 :     logicalrep_rel_close(rel, NoLock);
     800             : 
     801         262 :     CommandCounterIncrement();
     802             : }
     803             : 
     804             : /* Workhorse for apply_handle_update() */
     805             : static void
     806         252 : apply_handle_update_internal(ResultRelInfo *relinfo,
     807             :                              EState *estate, TupleTableSlot *remoteslot,
     808             :                              LogicalRepTupleData *newtup,
     809             :                              LogicalRepRelMapEntry *relmapentry)
     810             : {
     811         252 :     Relation    localrel = relinfo->ri_RelationDesc;
     812             :     EPQState    epqstate;
     813             :     TupleTableSlot *localslot;
     814             :     bool        found;
     815             :     MemoryContext oldctx;
     816             : 
     817         252 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
     818         252 :     ExecOpenIndices(relinfo, false);
     819             : 
     820         252 :     found = FindReplTupleInLocalRel(estate, localrel,
     821             :                                     &relmapentry->remoterel,
     822             :                                     remoteslot, &localslot);
     823         252 :     ExecClearTuple(remoteslot);
     824             : 
     825             :     /*
     826             :      * Tuple found.
     827             :      *
     828             :      * Note this will fail if there are other conflicting unique indexes.
     829             :      */
     830         252 :     if (found)
     831             :     {
     832             :         /* Process and store remote tuple in the slot */
     833         252 :         oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     834         252 :         slot_modify_cstrings(remoteslot, localslot, relmapentry,
     835         252 :                              newtup->values, newtup->changed);
     836         252 :         MemoryContextSwitchTo(oldctx);
     837             : 
     838         252 :         EvalPlanQualSetSlot(&epqstate, remoteslot);
     839             : 
     840             :         /* Do the actual update. */
     841         252 :         ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
     842             :     }
     843             :     else
     844             :     {
     845             :         /*
     846             :          * The tuple to be updated could not be found.
     847             :          *
     848             :          * TODO what to do here, change the log level to LOG perhaps?
     849             :          */
     850           0 :         elog(DEBUG1,
     851             :              "logical replication did not find row for update "
     852             :              "in replication target relation \"%s\"",
     853             :              RelationGetRelationName(localrel));
     854             :     }
     855             : 
     856             :     /* Cleanup. */
     857         252 :     ExecCloseIndices(relinfo);
     858         252 :     EvalPlanQualEnd(&epqstate);
     859         252 : }
     860             : 
     861             : /*
     862             :  * Handle DELETE message.
     863             :  *
     864             :  * TODO: FDW support
     865             :  */
     866             : static void
     867         452 : apply_handle_delete(StringInfo s)
     868             : {
     869             :     LogicalRepRelMapEntry *rel;
     870             :     LogicalRepTupleData oldtup;
     871             :     LogicalRepRelId relid;
     872             :     EState     *estate;
     873             :     TupleTableSlot *remoteslot;
     874             :     MemoryContext oldctx;
     875             : 
     876         452 :     ensure_transaction();
     877             : 
     878         452 :     relid = logicalrep_read_delete(s, &oldtup);
     879         452 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
     880         452 :     if (!should_apply_changes_for_rel(rel))
     881             :     {
     882             :         /*
     883             :          * The relation can't become interesting in the middle of the
     884             :          * transaction so it's safe to unlock it.
     885             :          */
     886           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
     887           0 :         return;
     888             :     }
     889             : 
     890             :     /* Check if we can do the delete. */
     891         452 :     check_relation_updatable(rel);
     892             : 
     893             :     /* Initialize the executor state. */
     894         452 :     estate = create_estate_for_relation(rel);
     895         452 :     remoteslot = ExecInitExtraTupleSlot(estate,
     896         452 :                                         RelationGetDescr(rel->localrel),
     897             :                                         &TTSOpsVirtual);
     898             : 
     899         452 :     PushActiveSnapshot(GetTransactionSnapshot());
     900             : 
     901             :     /* Build the search tuple. */
     902         452 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     903         452 :     slot_store_cstrings(remoteslot, rel, oldtup.values);
     904         452 :     MemoryContextSwitchTo(oldctx);
     905             : 
     906             :     /* For a partitioned table, apply delete to correct partition. */
     907         452 :     if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     908          24 :         apply_handle_tuple_routing(estate->es_result_relation_info, estate,
     909             :                                    remoteslot, NULL, rel, CMD_DELETE);
     910             :     else
     911         428 :         apply_handle_delete_internal(estate->es_result_relation_info, estate,
     912             :                                      remoteslot, &rel->remoterel);
     913             : 
     914         452 :     PopActiveSnapshot();
     915             : 
     916             :     /* Handle queued AFTER triggers. */
     917         452 :     AfterTriggerEndQuery(estate);
     918             : 
     919         452 :     ExecResetTupleTable(estate->es_tupleTable, false);
     920         452 :     FreeExecutorState(estate);
     921             : 
     922         452 :     logicalrep_rel_close(rel, NoLock);
     923             : 
     924         452 :     CommandCounterIncrement();
     925             : }
     926             : 
     927             : /* Workhorse for apply_handle_delete() */
     928             : static void
     929         454 : apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
     930             :                              TupleTableSlot *remoteslot,
     931             :                              LogicalRepRelation *remoterel)
     932             : {
     933         454 :     Relation    localrel = relinfo->ri_RelationDesc;
     934             :     EPQState    epqstate;
     935             :     TupleTableSlot *localslot;
     936             :     bool        found;
     937             : 
     938         454 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
     939         454 :     ExecOpenIndices(relinfo, false);
     940             : 
     941         454 :     found = FindReplTupleInLocalRel(estate, localrel, remoterel,
     942             :                                     remoteslot, &localslot);
     943             : 
     944             :     /* If found delete it. */
     945         454 :     if (found)
     946             :     {
     947         454 :         EvalPlanQualSetSlot(&epqstate, localslot);
     948             : 
     949             :         /* Do the actual delete. */
     950         454 :         ExecSimpleRelationDelete(estate, &epqstate, localslot);
     951             :     }
     952             :     else
     953             :     {
     954             :         /* The tuple to be deleted could not be found. */
     955           0 :         elog(DEBUG1,
     956             :              "logical replication could not find row for delete "
     957             :              "in replication target relation \"%s\"",
     958             :              RelationGetRelationName(localrel));
     959             :     }
     960             : 
     961             :     /* Cleanup. */
     962         454 :     ExecCloseIndices(relinfo);
     963         454 :     EvalPlanQualEnd(&epqstate);
     964         454 : }
     965             : 
     966             : /*
     967             :  * Try to find a tuple received from the publication side (in 'remoteslot') in
     968             :  * the corresponding local relation using either replica identity index,
     969             :  * primary key or if needed, sequential scan.
     970             :  *
     971             :  * Local tuple, if found, is returned in '*localslot'.
     972             :  */
     973             : static bool
     974         716 : FindReplTupleInLocalRel(EState *estate, Relation localrel,
     975             :                         LogicalRepRelation *remoterel,
     976             :                         TupleTableSlot *remoteslot,
     977             :                         TupleTableSlot **localslot)
     978             : {
     979             :     Oid         idxoid;
     980             :     bool        found;
     981             : 
     982         716 :     *localslot = table_slot_create(localrel, &estate->es_tupleTable);
     983             : 
     984         716 :     idxoid = GetRelationIdentityOrPK(localrel);
     985             :     Assert(OidIsValid(idxoid) ||
     986             :            (remoterel->replident == REPLICA_IDENTITY_FULL));
     987             : 
     988         716 :     if (OidIsValid(idxoid))
     989         472 :         found = RelationFindReplTupleByIndex(localrel, idxoid,
     990             :                                              LockTupleExclusive,
     991             :                                              remoteslot, *localslot);
     992             :     else
     993         244 :         found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
     994             :                                          remoteslot, *localslot);
     995             : 
     996         716 :     return found;
     997             : }
     998             : 
     999             : /*
    1000             :  * This handles insert, update, delete on a partitioned table.
    1001             :  */
    1002             : static void
    1003          70 : apply_handle_tuple_routing(ResultRelInfo *relinfo,
    1004             :                            EState *estate,
    1005             :                            TupleTableSlot *remoteslot,
    1006             :                            LogicalRepTupleData *newtup,
    1007             :                            LogicalRepRelMapEntry *relmapentry,
    1008             :                            CmdType operation)
    1009             : {
    1010          70 :     Relation    parentrel = relinfo->ri_RelationDesc;
    1011          70 :     ModifyTableState *mtstate = NULL;
    1012          70 :     PartitionTupleRouting *proute = NULL;
    1013             :     ResultRelInfo *partrelinfo;
    1014             :     Relation    partrel;
    1015             :     TupleTableSlot *remoteslot_part;
    1016             :     PartitionRoutingInfo *partinfo;
    1017             :     TupleConversionMap *map;
    1018             :     MemoryContext oldctx;
    1019             : 
    1020             :     /* ModifyTableState is needed for ExecFindPartition(). */
    1021          70 :     mtstate = makeNode(ModifyTableState);
    1022          70 :     mtstate->ps.plan = NULL;
    1023          70 :     mtstate->ps.state = estate;
    1024          70 :     mtstate->operation = operation;
    1025          70 :     mtstate->resultRelInfo = relinfo;
    1026          70 :     proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
    1027             : 
    1028             :     /*
    1029             :      * Find the partition to which the "search tuple" belongs.
    1030             :      */
    1031             :     Assert(remoteslot != NULL);
    1032          70 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1033          70 :     partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
    1034             :                                     remoteslot, estate);
    1035             :     Assert(partrelinfo != NULL);
    1036          70 :     partrel = partrelinfo->ri_RelationDesc;
    1037             : 
    1038             :     /*
    1039             :      * To perform any of the operations below, the tuple must match the
    1040             :      * partition's rowtype. Convert if needed or just copy, using a dedicated
    1041             :      * slot to store the tuple in any case.
    1042             :      */
    1043          70 :     partinfo = partrelinfo->ri_PartitionInfo;
    1044          70 :     remoteslot_part = partinfo->pi_PartitionTupleSlot;
    1045          70 :     if (remoteslot_part == NULL)
    1046          22 :         remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
    1047          70 :     map = partinfo->pi_RootToPartitionMap;
    1048          70 :     if (map != NULL)
    1049          48 :         remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
    1050             :                                                 remoteslot_part);
    1051             :     else
    1052             :     {
    1053          22 :         remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
    1054          22 :         slot_getallattrs(remoteslot_part);
    1055             :     }
    1056          70 :     MemoryContextSwitchTo(oldctx);
    1057             : 
    1058          70 :     estate->es_result_relation_info = partrelinfo;
    1059          70 :     switch (operation)
    1060             :     {
    1061          36 :         case CMD_INSERT:
    1062          36 :             apply_handle_insert_internal(partrelinfo, estate,
    1063             :                                          remoteslot_part);
    1064          36 :             break;
    1065             : 
    1066          24 :         case CMD_DELETE:
    1067          24 :             apply_handle_delete_internal(partrelinfo, estate,
    1068             :                                          remoteslot_part,
    1069             :                                          &relmapentry->remoterel);
    1070          24 :             break;
    1071             : 
    1072          10 :         case CMD_UPDATE:
    1073             : 
    1074             :             /*
    1075             :              * For UPDATE, depending on whether or not the updated tuple
    1076             :              * satisfies the partition's constraint, perform a simple UPDATE
    1077             :              * of the partition or move the updated tuple into a different
    1078             :              * suitable partition.
    1079             :              */
    1080          10 :             {
    1081          10 :                 AttrMap    *attrmap = map ? map->attrMap : NULL;
    1082             :                 LogicalRepRelMapEntry *part_entry;
    1083             :                 TupleTableSlot *localslot;
    1084             :                 ResultRelInfo *partrelinfo_new;
    1085             :                 bool        found;
    1086             : 
    1087          10 :                 part_entry = logicalrep_partition_open(relmapentry, partrel,
    1088             :                                                        attrmap);
    1089             : 
    1090             :                 /* Get the matching local tuple from the partition. */
    1091          10 :                 found = FindReplTupleInLocalRel(estate, partrel,
    1092             :                                                 &part_entry->remoterel,
    1093             :                                                 remoteslot_part, &localslot);
    1094             : 
    1095          10 :                 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1096          10 :                 if (found)
    1097             :                 {
    1098             :                     /* Apply the update.  */
    1099          10 :                     slot_modify_cstrings(remoteslot_part, localslot,
    1100             :                                          part_entry,
    1101          10 :                                          newtup->values, newtup->changed);
    1102          10 :                     MemoryContextSwitchTo(oldctx);
    1103             :                 }
    1104             :                 else
    1105             :                 {
    1106             :                     /*
    1107             :                      * The tuple to be updated could not be found.
    1108             :                      *
    1109             :                      * TODO what to do here, change the log level to LOG
    1110             :                      * perhaps?
    1111             :                      */
    1112           0 :                     elog(DEBUG1,
    1113             :                          "logical replication did not find row for update "
    1114             :                          "in replication target relation \"%s\"",
    1115             :                          RelationGetRelationName(partrel));
    1116             :                 }
    1117             : 
    1118             :                 /*
    1119             :                  * Does the updated tuple still satisfy the current
    1120             :                  * partition's constraint?
    1121             :                  */
    1122          20 :                 if (partrelinfo->ri_PartitionCheck == NULL ||
    1123          10 :                     ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
    1124             :                                        false))
    1125           8 :                 {
    1126             :                     /*
    1127             :                      * Yes, so simply UPDATE the partition.  We don't call
    1128             :                      * apply_handle_update_internal() here, which would
    1129             :                      * normally do the following work, to avoid repeating some
    1130             :                      * work already done above to find the local tuple in the
    1131             :                      * partition.
    1132             :                      */
    1133             :                     EPQState    epqstate;
    1134             : 
    1135           8 :                     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
    1136           8 :                     ExecOpenIndices(partrelinfo, false);
    1137             : 
    1138           8 :                     EvalPlanQualSetSlot(&epqstate, remoteslot_part);
    1139           8 :                     ExecSimpleRelationUpdate(estate, &epqstate, localslot,
    1140             :                                              remoteslot_part);
    1141           8 :                     ExecCloseIndices(partrelinfo);
    1142           8 :                     EvalPlanQualEnd(&epqstate);
    1143             :                 }
    1144             :                 else
    1145             :                 {
    1146             :                     /* Move the tuple into the new partition. */
    1147             : 
    1148             :                     /*
    1149             :                      * New partition will be found using tuple routing, which
    1150             :                      * can only occur via the parent table.  We might need to
    1151             :                      * convert the tuple to the parent's rowtype.  Note that
    1152             :                      * this is the tuple found in the partition, not the
    1153             :                      * original search tuple received by this function.
    1154             :                      */
    1155           2 :                     if (map)
    1156             :                     {
    1157             :                         TupleConversionMap *PartitionToRootMap =
    1158           2 :                         convert_tuples_by_name(RelationGetDescr(partrel),
    1159             :                                                RelationGetDescr(parentrel));
    1160             : 
    1161             :                         remoteslot =
    1162           2 :                             execute_attr_map_slot(PartitionToRootMap->attrMap,
    1163             :                                                   remoteslot_part, remoteslot);
    1164             :                     }
    1165             :                     else
    1166             :                     {
    1167           0 :                         remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
    1168           0 :                         slot_getallattrs(remoteslot);
    1169             :                     }
    1170             : 
    1171             : 
    1172             :                     /* Find the new partition. */
    1173           2 :                     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1174           2 :                     partrelinfo_new = ExecFindPartition(mtstate, relinfo,
    1175             :                                                         proute, remoteslot,
    1176             :                                                         estate);
    1177           2 :                     MemoryContextSwitchTo(oldctx);
    1178             :                     Assert(partrelinfo_new != partrelinfo);
    1179             : 
    1180             :                     /* DELETE old tuple found in the old partition. */
    1181           2 :                     estate->es_result_relation_info = partrelinfo;
    1182           2 :                     apply_handle_delete_internal(partrelinfo, estate,
    1183             :                                                  localslot,
    1184             :                                                  &relmapentry->remoterel);
    1185             : 
    1186             :                     /* INSERT new tuple into the new partition. */
    1187             : 
    1188             :                     /*
    1189             :                      * Convert the replacement tuple to match the destination
    1190             :                      * partition rowtype.
    1191             :                      */
    1192           2 :                     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    1193           2 :                     partrel = partrelinfo_new->ri_RelationDesc;
    1194           2 :                     partinfo = partrelinfo_new->ri_PartitionInfo;
    1195           2 :                     remoteslot_part = partinfo->pi_PartitionTupleSlot;
    1196           2 :                     if (remoteslot_part == NULL)
    1197           2 :                         remoteslot_part = table_slot_create(partrel,
    1198             :                                                             &estate->es_tupleTable);
    1199           2 :                     map = partinfo->pi_RootToPartitionMap;
    1200           2 :                     if (map != NULL)
    1201             :                     {
    1202           0 :                         remoteslot_part = execute_attr_map_slot(map->attrMap,
    1203             :                                                                 remoteslot,
    1204             :                                                                 remoteslot_part);
    1205             :                     }
    1206             :                     else
    1207             :                     {
    1208           2 :                         remoteslot_part = ExecCopySlot(remoteslot_part,
    1209             :                                                        remoteslot);
    1210           2 :                         slot_getallattrs(remoteslot);
    1211             :                     }
    1212           2 :                     MemoryContextSwitchTo(oldctx);
    1213           2 :                     estate->es_result_relation_info = partrelinfo_new;
    1214           2 :                     apply_handle_insert_internal(partrelinfo_new, estate,
    1215             :                                                  remoteslot_part);
    1216             :                 }
    1217             :             }
    1218          10 :             break;
    1219             : 
    1220           0 :         default:
    1221           0 :             elog(ERROR, "unrecognized CmdType: %d", (int) operation);
    1222             :             break;
    1223             :     }
    1224             : 
    1225          70 :     ExecCleanupTupleRouting(mtstate, proute);
    1226          70 : }
    1227             : 
    1228             : /*
    1229             :  * Handle TRUNCATE message.
    1230             :  *
    1231             :  * TODO: FDW support
    1232             :  */
    1233             : static void
    1234          24 : apply_handle_truncate(StringInfo s)
    1235             : {
    1236          24 :     bool        cascade = false;
    1237          24 :     bool        restart_seqs = false;
    1238          24 :     List       *remote_relids = NIL;
    1239          24 :     List       *remote_rels = NIL;
    1240          24 :     List       *rels = NIL;
    1241          24 :     List       *part_rels = NIL;
    1242          24 :     List       *relids = NIL;
    1243          24 :     List       *relids_logged = NIL;
    1244             :     ListCell   *lc;
    1245             : 
    1246          24 :     ensure_transaction();
    1247             : 
    1248          24 :     remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
    1249             : 
    1250          64 :     foreach(lc, remote_relids)
    1251             :     {
    1252          40 :         LogicalRepRelId relid = lfirst_oid(lc);
    1253             :         LogicalRepRelMapEntry *rel;
    1254             : 
    1255          40 :         rel = logicalrep_rel_open(relid, RowExclusiveLock);
    1256          40 :         if (!should_apply_changes_for_rel(rel))
    1257             :         {
    1258             :             /*
    1259             :              * The relation can't become interesting in the middle of the
    1260             :              * transaction so it's safe to unlock it.
    1261             :              */
    1262           0 :             logicalrep_rel_close(rel, RowExclusiveLock);
    1263           0 :             continue;
    1264             :         }
    1265             : 
    1266          40 :         remote_rels = lappend(remote_rels, rel);
    1267          40 :         rels = lappend(rels, rel->localrel);
    1268          40 :         relids = lappend_oid(relids, rel->localreloid);
    1269          40 :         if (RelationIsLogicallyLogged(rel->localrel))
    1270          40 :             relids_logged = lappend_oid(relids_logged, rel->localreloid);
    1271             : 
    1272             :         /*
    1273             :          * Truncate partitions if we got a message to truncate a partitioned
    1274             :          * table.
    1275             :          */
    1276          40 :         if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    1277             :         {
    1278             :             ListCell   *child;
    1279           8 :             List       *children = find_all_inheritors(rel->localreloid,
    1280             :                                                        RowExclusiveLock,
    1281             :                                                        NULL);
    1282             : 
    1283          30 :             foreach(child, children)
    1284             :             {
    1285          22 :                 Oid         childrelid = lfirst_oid(child);
    1286             :                 Relation    childrel;
    1287             : 
    1288          22 :                 if (list_member_oid(relids, childrelid))
    1289           8 :                     continue;
    1290             : 
    1291             :                 /* find_all_inheritors already got lock */
    1292          14 :                 childrel = table_open(childrelid, NoLock);
    1293             : 
    1294             :                 /*
    1295             :                  * Ignore temp tables of other backends.  See similar code in
    1296             :                  * ExecuteTruncate().
    1297             :                  */
    1298          14 :                 if (RELATION_IS_OTHER_TEMP(childrel))
    1299             :                 {
    1300           0 :                     table_close(childrel, RowExclusiveLock);
    1301           0 :                     continue;
    1302             :                 }
    1303             : 
    1304          14 :                 rels = lappend(rels, childrel);
    1305          14 :                 part_rels = lappend(part_rels, childrel);
    1306          14 :                 relids = lappend_oid(relids, childrelid);
    1307             :                 /* Log this relation only if needed for logical decoding */
    1308          14 :                 if (RelationIsLogicallyLogged(childrel))
    1309          14 :                     relids_logged = lappend_oid(relids_logged, childrelid);
    1310             :             }
    1311             :         }
    1312             :     }
    1313             : 
    1314             :     /*
    1315             :      * Even if we used CASCADE on the upstream master we explicitly default to
    1316             :      * replaying changes without further cascading. This might be later
    1317             :      * changeable with a user specified option.
    1318             :      */
    1319          24 :     ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
    1320             : 
    1321          64 :     foreach(lc, remote_rels)
    1322             :     {
    1323          40 :         LogicalRepRelMapEntry *rel = lfirst(lc);
    1324             : 
    1325          40 :         logicalrep_rel_close(rel, NoLock);
    1326             :     }
    1327          38 :     foreach(lc, part_rels)
    1328             :     {
    1329          14 :         Relation    rel = lfirst(lc);
    1330             : 
    1331          14 :         table_close(rel, NoLock);
    1332             :     }
    1333             : 
    1334          24 :     CommandCounterIncrement();
    1335          24 : }
    1336             : 
    1337             : 
    1338             : /*
    1339             :  * Logical replication protocol message dispatcher.
    1340             :  */
    1341             : static void
    1342        2722 : apply_dispatch(StringInfo s)
    1343             : {
    1344        2722 :     char        action = pq_getmsgbyte(s);
    1345             : 
    1346        2722 :     switch (action)
    1347             :     {
    1348             :             /* BEGIN */
    1349         462 :         case 'B':
    1350         462 :             apply_handle_begin(s);
    1351         462 :             break;
    1352             :             /* COMMIT */
    1353         460 :         case 'C':
    1354         460 :             apply_handle_commit(s);
    1355         460 :             break;
    1356             :             /* INSERT */
    1357         860 :         case 'I':
    1358         860 :             apply_handle_insert(s);
    1359         858 :             break;
    1360             :             /* UPDATE */
    1361         262 :         case 'U':
    1362         262 :             apply_handle_update(s);
    1363         262 :             break;
    1364             :             /* DELETE */
    1365         452 :         case 'D':
    1366         452 :             apply_handle_delete(s);
    1367         452 :             break;
    1368             :             /* TRUNCATE */
    1369          24 :         case 'T':
    1370          24 :             apply_handle_truncate(s);
    1371          24 :             break;
    1372             :             /* RELATION */
    1373         170 :         case 'R':
    1374         170 :             apply_handle_relation(s);
    1375         170 :             break;
    1376             :             /* TYPE */
    1377          32 :         case 'Y':
    1378          32 :             apply_handle_type(s);
    1379          32 :             break;
    1380             :             /* ORIGIN */
    1381           0 :         case 'O':
    1382           0 :             apply_handle_origin(s);
    1383           0 :             break;
    1384           0 :         default:
    1385           0 :             ereport(ERROR,
    1386             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1387             :                      errmsg("invalid logical replication message type \"%c\"", action)));
    1388             :     }
    1389        2720 : }
    1390             : 
    1391             : /*
    1392             :  * Figure out which write/flush positions to report to the walsender process.
    1393             :  *
    1394             :  * We can't simply report back the last LSN the walsender sent us because the
    1395             :  * local transaction might not yet be flushed to disk locally. Instead we
    1396             :  * build a list that associates local with remote LSNs for every commit. When
    1397             :  * reporting back the flush position to the sender we iterate that list and
    1398             :  * check which entries on it are already locally flushed. Those we can report
    1399             :  * as having been flushed.
    1400             :  *
    1401             :  * The have_pending_txes is true if there are outstanding transactions that
    1402             :  * need to be flushed.
    1403             :  */
    1404             : static void
    1405       13200 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
    1406             :                    bool *have_pending_txes)
    1407             : {
    1408             :     dlist_mutable_iter iter;
    1409       13200 :     XLogRecPtr  local_flush = GetFlushRecPtr();
    1410             : 
    1411       13200 :     *write = InvalidXLogRecPtr;
    1412       13200 :     *flush = InvalidXLogRecPtr;
    1413             : 
    1414       13530 :     dlist_foreach_modify(iter, &lsn_mapping)
    1415             :     {
    1416       12374 :         FlushPosition *pos =
    1417       12374 :         dlist_container(FlushPosition, node, iter.cur);
    1418             : 
    1419       12374 :         *write = pos->remote_end;
    1420             : 
    1421       12374 :         if (pos->local_end <= local_flush)
    1422             :         {
    1423         330 :             *flush = pos->remote_end;
    1424         330 :             dlist_delete(iter.cur);
    1425         330 :             pfree(pos);
    1426             :         }
    1427             :         else
    1428             :         {
    1429             :             /*
    1430             :              * Don't want to uselessly iterate over the rest of the list which
    1431             :              * could potentially be long. Instead get the last element and
    1432             :              * grab the write position from there.
    1433             :              */
    1434       12044 :             pos = dlist_tail_element(FlushPosition, node,
    1435             :                                      &lsn_mapping);
    1436       12044 :             *write = pos->remote_end;
    1437       12044 :             *have_pending_txes = true;
    1438       12044 :             return;
    1439             :         }
    1440             :     }
    1441             : 
    1442        1156 :     *have_pending_txes = !dlist_is_empty(&lsn_mapping);
    1443             : }
    1444             : 
    1445             : /*
    1446             :  * Store current remote/local lsn pair in the tracking list.
    1447             :  */
    1448             : static void
    1449         344 : store_flush_position(XLogRecPtr remote_lsn)
    1450             : {
    1451             :     FlushPosition *flushpos;
    1452             : 
    1453             :     /* Need to do this in permanent context */
    1454         344 :     MemoryContextSwitchTo(ApplyContext);
    1455             : 
    1456             :     /* Track commit lsn  */
    1457         344 :     flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
    1458         344 :     flushpos->local_end = XactLastCommitEnd;
    1459         344 :     flushpos->remote_end = remote_lsn;
    1460             : 
    1461         344 :     dlist_push_tail(&lsn_mapping, &flushpos->node);
    1462         344 :     MemoryContextSwitchTo(ApplyMessageContext);
    1463         344 : }
    1464             : 
    1465             : 
    1466             : /* Update statistics of the worker. */
    1467             : static void
    1468        9020 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
    1469             : {
    1470        9020 :     MyLogicalRepWorker->last_lsn = last_lsn;
    1471        9020 :     MyLogicalRepWorker->last_send_time = send_time;
    1472        9020 :     MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
    1473        9020 :     if (reply)
    1474             :     {
    1475        6298 :         MyLogicalRepWorker->reply_lsn = last_lsn;
    1476        6298 :         MyLogicalRepWorker->reply_time = send_time;
    1477             :     }
    1478        9020 : }
    1479             : 
    1480             : /*
    1481             :  * Apply main loop.
    1482             :  */
    1483             : static void
    1484          50 : LogicalRepApplyLoop(XLogRecPtr last_received)
    1485             : {
    1486          50 :     TimestampTz last_recv_timestamp = GetCurrentTimestamp();
    1487             : 
    1488             :     /*
    1489             :      * Init the ApplyMessageContext which we clean up after each replication
    1490             :      * protocol message.
    1491             :      */
    1492          50 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
    1493             :                                                 "ApplyMessageContext",
    1494             :                                                 ALLOCSET_DEFAULT_SIZES);
    1495             : 
    1496             :     /* mark as idle, before starting to loop */
    1497          50 :     pgstat_report_activity(STATE_IDLE, NULL);
    1498             : 
    1499             :     for (;;)
    1500        6792 :     {
    1501        6842 :         pgsocket    fd = PGINVALID_SOCKET;
    1502             :         int         rc;
    1503             :         int         len;
    1504        6842 :         char       *buf = NULL;
    1505        6842 :         bool        endofstream = false;
    1506        6842 :         bool        ping_sent = false;
    1507             :         long        wait_time;
    1508             : 
    1509        6842 :         CHECK_FOR_INTERRUPTS();
    1510             : 
    1511        6842 :         MemoryContextSwitchTo(ApplyMessageContext);
    1512             : 
    1513        6842 :         len = walrcv_receive(wrconn, &buf, &fd);
    1514             : 
    1515        6832 :         if (len != 0)
    1516             :         {
    1517             :             /* Process the data */
    1518             :             for (;;)
    1519             :             {
    1520       15602 :                 CHECK_FOR_INTERRUPTS();
    1521             : 
    1522       15602 :                 if (len == 0)
    1523             :                 {
    1524        6578 :                     break;
    1525             :                 }
    1526        9024 :                 else if (len < 0)
    1527             :                 {
    1528           4 :                     ereport(LOG,
    1529             :                             (errmsg("data stream from publisher has ended")));
    1530           4 :                     endofstream = true;
    1531           4 :                     break;
    1532             :                 }
    1533             :                 else
    1534             :                 {
    1535             :                     int         c;
    1536             :                     StringInfoData s;
    1537             : 
    1538             :                     /* Reset timeout. */
    1539        9020 :                     last_recv_timestamp = GetCurrentTimestamp();
    1540        9020 :                     ping_sent = false;
    1541             : 
    1542             :                     /* Ensure we are reading the data into our memory context. */
    1543        9020 :                     MemoryContextSwitchTo(ApplyMessageContext);
    1544             : 
    1545        9020 :                     s.data = buf;
    1546        9020 :                     s.len = len;
    1547        9020 :                     s.cursor = 0;
    1548        9020 :                     s.maxlen = -1;
    1549             : 
    1550        9020 :                     c = pq_getmsgbyte(&s);
    1551             : 
    1552        9020 :                     if (c == 'w')
    1553             :                     {
    1554             :                         XLogRecPtr  start_lsn;
    1555             :                         XLogRecPtr  end_lsn;
    1556             :                         TimestampTz send_time;
    1557             : 
    1558        2722 :                         start_lsn = pq_getmsgint64(&s);
    1559        2722 :                         end_lsn = pq_getmsgint64(&s);
    1560        2722 :                         send_time = pq_getmsgint64(&s);
    1561             : 
    1562        2722 :                         if (last_received < start_lsn)
    1563        1758 :                             last_received = start_lsn;
    1564             : 
    1565        2722 :                         if (last_received < end_lsn)
    1566           0 :                             last_received = end_lsn;
    1567             : 
    1568        2722 :                         UpdateWorkerStats(last_received, send_time, false);
    1569             : 
    1570        2722 :                         apply_dispatch(&s);
    1571             :                     }
    1572        6298 :                     else if (c == 'k')
    1573             :                     {
    1574             :                         XLogRecPtr  end_lsn;
    1575             :                         TimestampTz timestamp;
    1576             :                         bool        reply_requested;
    1577             : 
    1578        6298 :                         end_lsn = pq_getmsgint64(&s);
    1579        6298 :                         timestamp = pq_getmsgint64(&s);
    1580        6298 :                         reply_requested = pq_getmsgbyte(&s);
    1581             : 
    1582        6298 :                         if (last_received < end_lsn)
    1583         196 :                             last_received = end_lsn;
    1584             : 
    1585        6298 :                         send_feedback(last_received, reply_requested, false);
    1586        6298 :                         UpdateWorkerStats(last_received, timestamp, true);
    1587             :                     }
    1588             :                     /* other message types are purposefully ignored */
    1589             : 
    1590        9018 :                     MemoryContextReset(ApplyMessageContext);
    1591             :                 }
    1592             : 
    1593        9018 :                 len = walrcv_receive(wrconn, &buf, &fd);
    1594             :             }
    1595             :         }
    1596             : 
    1597             :         /* confirm all writes so far */
    1598        6830 :         send_feedback(last_received, false, false);
    1599             : 
    1600        6830 :         if (!in_remote_transaction)
    1601             :         {
    1602             :             /*
    1603             :              * If we didn't get any transactions for a while there might be
    1604             :              * unconsumed invalidation messages in the queue, consume them
    1605             :              * now.
    1606             :              */
    1607        6566 :             AcceptInvalidationMessages();
    1608        6566 :             maybe_reread_subscription();
    1609             : 
    1610             :             /* Process any table synchronization changes. */
    1611        6560 :             process_syncing_tables(last_received);
    1612             :         }
    1613             : 
    1614             :         /* Cleanup the memory. */
    1615        6824 :         MemoryContextResetAndDeleteChildren(ApplyMessageContext);
    1616        6824 :         MemoryContextSwitchTo(TopMemoryContext);
    1617             : 
    1618             :         /* Check if we need to exit the streaming loop. */
    1619        6824 :         if (endofstream)
    1620             :         {
    1621             :             TimeLineID  tli;
    1622             : 
    1623           4 :             walrcv_endstreaming(wrconn, &tli);
    1624           0 :             break;
    1625             :         }
    1626             : 
    1627             :         /*
    1628             :          * Wait for more data or latch.  If we have unflushed transactions,
    1629             :          * wake up after WalWriterDelay to see if they've been flushed yet (in
    1630             :          * which case we should send a feedback message).  Otherwise, there's
    1631             :          * no particular urgency about waking up unless we get data or a
    1632             :          * signal.
    1633             :          */
    1634        6820 :         if (!dlist_is_empty(&lsn_mapping))
    1635        6072 :             wait_time = WalWriterDelay;
    1636             :         else
    1637         748 :             wait_time = NAPTIME_PER_CYCLE;
    1638             : 
    1639        6820 :         rc = WaitLatchOrSocket(MyLatch,
    1640             :                                WL_SOCKET_READABLE | WL_LATCH_SET |
    1641             :                                WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1642             :                                fd, wait_time,
    1643             :                                WAIT_EVENT_LOGICAL_APPLY_MAIN);
    1644             : 
    1645        6820 :         if (rc & WL_LATCH_SET)
    1646             :         {
    1647         148 :             ResetLatch(MyLatch);
    1648         148 :             CHECK_FOR_INTERRUPTS();
    1649             :         }
    1650             : 
    1651        6792 :         if (ConfigReloadPending)
    1652             :         {
    1653           0 :             ConfigReloadPending = false;
    1654           0 :             ProcessConfigFile(PGC_SIGHUP);
    1655             :         }
    1656             : 
    1657        6792 :         if (rc & WL_TIMEOUT)
    1658             :         {
    1659             :             /*
    1660             :              * We didn't receive anything new. If we haven't heard anything
    1661             :              * from the server for more than wal_receiver_timeout / 2, ping
    1662             :              * the server. Also, if it's been longer than
    1663             :              * wal_receiver_status_interval since the last update we sent,
    1664             :              * send a status update to the master anyway, to report any
    1665             :              * progress in applying WAL.
    1666             :              */
    1667          72 :             bool        requestReply = false;
    1668             : 
    1669             :             /*
    1670             :              * Check if time since last receive from standby has reached the
    1671             :              * configured limit.
    1672             :              */
    1673          72 :             if (wal_receiver_timeout > 0)
    1674             :             {
    1675          72 :                 TimestampTz now = GetCurrentTimestamp();
    1676             :                 TimestampTz timeout;
    1677             : 
    1678          72 :                 timeout =
    1679          72 :                     TimestampTzPlusMilliseconds(last_recv_timestamp,
    1680             :                                                 wal_receiver_timeout);
    1681             : 
    1682          72 :                 if (now >= timeout)
    1683           0 :                     ereport(ERROR,
    1684             :                             (errmsg("terminating logical replication worker due to timeout")));
    1685             : 
    1686             :                 /*
    1687             :                  * We didn't receive anything new, for half of receiver
    1688             :                  * replication timeout. Ping the server.
    1689             :                  */
    1690          72 :                 if (!ping_sent)
    1691             :                 {
    1692          72 :                     timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
    1693             :                                                           (wal_receiver_timeout / 2));
    1694          72 :                     if (now >= timeout)
    1695             :                     {
    1696           0 :                         requestReply = true;
    1697           0 :                         ping_sent = true;
    1698             :                     }
    1699             :                 }
    1700             :             }
    1701             : 
    1702          72 :             send_feedback(last_received, requestReply, requestReply);
    1703             :         }
    1704             :     }
    1705           0 : }
    1706             : 
    1707             : /*
    1708             :  * Send a Standby Status Update message to server.
    1709             :  *
    1710             :  * 'recvpos' is the latest LSN we've received data to, force is set if we need
    1711             :  * to send a response to avoid timeouts.
    1712             :  */
    1713             : static void
    1714       13200 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
    1715             : {
    1716             :     static StringInfo reply_message = NULL;
    1717             :     static TimestampTz send_time = 0;
    1718             : 
    1719             :     static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
    1720             :     static XLogRecPtr last_writepos = InvalidXLogRecPtr;
    1721             :     static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
    1722             : 
    1723             :     XLogRecPtr  writepos;
    1724             :     XLogRecPtr  flushpos;
    1725             :     TimestampTz now;
    1726             :     bool        have_pending_txes;
    1727             : 
    1728             :     /*
    1729             :      * If the user doesn't want status to be reported to the publisher, be
    1730             :      * sure to exit before doing anything at all.
    1731             :      */
    1732       13200 :     if (!force && wal_receiver_status_interval <= 0)
    1733        6804 :         return;
    1734             : 
    1735             :     /* It's legal to not pass a recvpos */
    1736       13200 :     if (recvpos < last_recvpos)
    1737           0 :         recvpos = last_recvpos;
    1738             : 
    1739       13200 :     get_flush_position(&writepos, &flushpos, &have_pending_txes);
    1740             : 
    1741             :     /*
    1742             :      * No outstanding transactions to flush, we can report the latest received
    1743             :      * position. This is important for synchronous replication.
    1744             :      */
    1745       13200 :     if (!have_pending_txes)
    1746        1156 :         flushpos = writepos = recvpos;
    1747             : 
    1748       13200 :     if (writepos < last_writepos)
    1749           0 :         writepos = last_writepos;
    1750             : 
    1751       13200 :     if (flushpos < last_flushpos)
    1752       12002 :         flushpos = last_flushpos;
    1753             : 
    1754       13200 :     now = GetCurrentTimestamp();
    1755             : 
    1756             :     /* if we've already reported everything we're good */
    1757       13200 :     if (!force &&
    1758        7556 :         writepos == last_writepos &&
    1759        6894 :         flushpos == last_flushpos &&
    1760        6844 :         !TimestampDifferenceExceeds(send_time, now,
    1761             :                                     wal_receiver_status_interval * 1000))
    1762        6804 :         return;
    1763        6396 :     send_time = now;
    1764             : 
    1765        6396 :     if (!reply_message)
    1766             :     {
    1767          50 :         MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
    1768             : 
    1769          50 :         reply_message = makeStringInfo();
    1770          50 :         MemoryContextSwitchTo(oldctx);
    1771             :     }
    1772             :     else
    1773        6346 :         resetStringInfo(reply_message);
    1774             : 
    1775        6396 :     pq_sendbyte(reply_message, 'r');
    1776        6396 :     pq_sendint64(reply_message, recvpos);   /* write */
    1777        6396 :     pq_sendint64(reply_message, flushpos);  /* flush */
    1778        6396 :     pq_sendint64(reply_message, writepos);  /* apply */
    1779        6396 :     pq_sendint64(reply_message, now);   /* sendTime */
    1780        6396 :     pq_sendbyte(reply_message, requestReply);   /* replyRequested */
    1781             : 
    1782        6396 :     elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
    1783             :          force,
    1784             :          (uint32) (recvpos >> 32), (uint32) recvpos,
    1785             :          (uint32) (writepos >> 32), (uint32) writepos,
    1786             :          (uint32) (flushpos >> 32), (uint32) flushpos
    1787             :         );
    1788             : 
    1789        6396 :     walrcv_send(wrconn, reply_message->data, reply_message->len);
    1790             : 
    1791        6396 :     if (recvpos > last_recvpos)
    1792         662 :         last_recvpos = recvpos;
    1793        6396 :     if (writepos > last_writepos)
    1794         662 :         last_writepos = writepos;
    1795        6396 :     if (flushpos > last_flushpos)
    1796         438 :         last_flushpos = flushpos;
    1797             : }
    1798             : 
    1799             : /*
    1800             :  * Reread subscription info if needed. Most changes will be exit.
    1801             :  */
    1802             : static void
    1803        7028 : maybe_reread_subscription(void)
    1804             : {
    1805             :     MemoryContext oldctx;
    1806             :     Subscription *newsub;
    1807        7028 :     bool        started_tx = false;
    1808             : 
    1809             :     /* When cache state is valid there is nothing to do here. */
    1810        7028 :     if (MySubscriptionValid)
    1811        7014 :         return;
    1812             : 
    1813             :     /* This function might be called inside or outside of transaction. */
    1814          14 :     if (!IsTransactionState())
    1815             :     {
    1816          14 :         StartTransactionCommand();
    1817          14 :         started_tx = true;
    1818             :     }
    1819             : 
    1820             :     /* Ensure allocations in permanent context. */
    1821          14 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    1822             : 
    1823          14 :     newsub = GetSubscription(MyLogicalRepWorker->subid, true);
    1824             : 
    1825             :     /*
    1826             :      * Exit if the subscription was removed. This normally should not happen
    1827             :      * as the worker gets killed during DROP SUBSCRIPTION.
    1828             :      */
    1829          14 :     if (!newsub)
    1830             :     {
    1831           0 :         ereport(LOG,
    1832             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1833             :                         "stop because the subscription was removed",
    1834             :                         MySubscription->name)));
    1835             : 
    1836           0 :         proc_exit(0);
    1837             :     }
    1838             : 
    1839             :     /*
    1840             :      * Exit if the subscription was disabled. This normally should not happen
    1841             :      * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
    1842             :      */
    1843          14 :     if (!newsub->enabled)
    1844             :     {
    1845           0 :         ereport(LOG,
    1846             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1847             :                         "stop because the subscription was disabled",
    1848             :                         MySubscription->name)));
    1849             : 
    1850           0 :         proc_exit(0);
    1851             :     }
    1852             : 
    1853             :     /*
    1854             :      * Exit if connection string was changed. The launcher will start new
    1855             :      * worker.
    1856             :      */
    1857          14 :     if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
    1858             :     {
    1859           2 :         ereport(LOG,
    1860             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1861             :                         "restart because the connection information was changed",
    1862             :                         MySubscription->name)));
    1863             : 
    1864           2 :         proc_exit(0);
    1865             :     }
    1866             : 
    1867             :     /*
    1868             :      * Exit if subscription name was changed (it's used for
    1869             :      * fallback_application_name). The launcher will start new worker.
    1870             :      */
    1871          12 :     if (strcmp(newsub->name, MySubscription->name) != 0)
    1872             :     {
    1873           2 :         ereport(LOG,
    1874             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1875             :                         "restart because subscription was renamed",
    1876             :                         MySubscription->name)));
    1877             : 
    1878           2 :         proc_exit(0);
    1879             :     }
    1880             : 
    1881             :     /* !slotname should never happen when enabled is true. */
    1882             :     Assert(newsub->slotname);
    1883             : 
    1884             :     /*
    1885             :      * We need to make new connection to new slot if slot name has changed so
    1886             :      * exit here as well if that's the case.
    1887             :      */
    1888          10 :     if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
    1889             :     {
    1890           0 :         ereport(LOG,
    1891             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1892             :                         "restart because the replication slot name was changed",
    1893             :                         MySubscription->name)));
    1894             : 
    1895           0 :         proc_exit(0);
    1896             :     }
    1897             : 
    1898             :     /*
    1899             :      * Exit if publication list was changed. The launcher will start new
    1900             :      * worker.
    1901             :      */
    1902          10 :     if (!equal(newsub->publications, MySubscription->publications))
    1903             :     {
    1904           2 :         ereport(LOG,
    1905             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1906             :                         "restart because subscription's publications were changed",
    1907             :                         MySubscription->name)));
    1908             : 
    1909           2 :         proc_exit(0);
    1910             :     }
    1911             : 
    1912             :     /* Check for other changes that should never happen too. */
    1913           8 :     if (newsub->dbid != MySubscription->dbid)
    1914             :     {
    1915           0 :         elog(ERROR, "subscription %u changed unexpectedly",
    1916             :              MyLogicalRepWorker->subid);
    1917             :     }
    1918             : 
    1919             :     /* Clean old subscription info and switch to new one. */
    1920           8 :     FreeSubscription(MySubscription);
    1921           8 :     MySubscription = newsub;
    1922             : 
    1923           8 :     MemoryContextSwitchTo(oldctx);
    1924             : 
    1925             :     /* Change synchronous commit according to the user's wishes */
    1926           8 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    1927             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    1928             : 
    1929           8 :     if (started_tx)
    1930           8 :         CommitTransactionCommand();
    1931             : 
    1932           8 :     MySubscriptionValid = true;
    1933             : }
    1934             : 
    1935             : /*
    1936             :  * Callback from subscription syscache invalidation.
    1937             :  */
    1938             : static void
    1939          16 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
    1940             : {
    1941          16 :     MySubscriptionValid = false;
    1942          16 : }
    1943             : 
    1944             : /* Logical Replication Apply worker entry point */
    1945             : void
    1946         162 : ApplyWorkerMain(Datum main_arg)
    1947             : {
    1948         162 :     int         worker_slot = DatumGetInt32(main_arg);
    1949             :     MemoryContext oldctx;
    1950             :     char        originname[NAMEDATALEN];
    1951             :     XLogRecPtr  origin_startpos;
    1952             :     char       *myslotname;
    1953             :     WalRcvStreamOptions options;
    1954             : 
    1955             :     /* Attach to slot */
    1956         162 :     logicalrep_worker_attach(worker_slot);
    1957             : 
    1958             :     /* Setup signal handling */
    1959         162 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1960         162 :     pqsignal(SIGTERM, die);
    1961         162 :     BackgroundWorkerUnblockSignals();
    1962             : 
    1963             :     /*
    1964             :      * We don't currently need any ResourceOwner in a walreceiver process, but
    1965             :      * if we did, we could call CreateAuxProcessResourceOwner here.
    1966             :      */
    1967             : 
    1968             :     /* Initialise stats to a sanish value */
    1969         324 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
    1970         162 :         MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
    1971             : 
    1972             :     /* Load the libpq-specific functions */
    1973         162 :     load_file("libpqwalreceiver", false);
    1974             : 
    1975             :     /* Run as replica session replication role. */
    1976         162 :     SetConfigOption("session_replication_role", "replica",
    1977             :                     PGC_SUSET, PGC_S_OVERRIDE);
    1978             : 
    1979             :     /* Connect to our database. */
    1980         162 :     BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
    1981         162 :                                               MyLogicalRepWorker->userid,
    1982             :                                               0);
    1983             : 
    1984             :     /* Load the subscription into persistent memory context. */
    1985         162 :     ApplyContext = AllocSetContextCreate(TopMemoryContext,
    1986             :                                          "ApplyContext",
    1987             :                                          ALLOCSET_DEFAULT_SIZES);
    1988         162 :     StartTransactionCommand();
    1989         162 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    1990             : 
    1991         162 :     MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
    1992         162 :     if (!MySubscription)
    1993             :     {
    1994           0 :         ereport(LOG,
    1995             :                 (errmsg("logical replication apply worker for subscription %u will not "
    1996             :                         "start because the subscription was removed during startup",
    1997             :                         MyLogicalRepWorker->subid)));
    1998           0 :         proc_exit(0);
    1999             :     }
    2000             : 
    2001         162 :     MySubscriptionValid = true;
    2002         162 :     MemoryContextSwitchTo(oldctx);
    2003             : 
    2004         162 :     if (!MySubscription->enabled)
    2005             :     {
    2006           0 :         ereport(LOG,
    2007             :                 (errmsg("logical replication apply worker for subscription \"%s\" will not "
    2008             :                         "start because the subscription was disabled during startup",
    2009             :                         MySubscription->name)));
    2010             : 
    2011           0 :         proc_exit(0);
    2012             :     }
    2013             : 
    2014             :     /* Setup synchronous commit according to the user's wishes */
    2015         162 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    2016             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    2017             : 
    2018             :     /* Keep us informed about subscription changes. */
    2019         162 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
    2020             :                                   subscription_change_cb,
    2021             :                                   (Datum) 0);
    2022             : 
    2023         162 :     if (am_tablesync_worker())
    2024         102 :         ereport(LOG,
    2025             :                 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
    2026             :                         MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
    2027             :     else
    2028          60 :         ereport(LOG,
    2029             :                 (errmsg("logical replication apply worker for subscription \"%s\" has started",
    2030             :                         MySubscription->name)));
    2031             : 
    2032         162 :     CommitTransactionCommand();
    2033             : 
    2034             :     /* Connect to the origin and start the replication. */
    2035         162 :     elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
    2036             :          MySubscription->conninfo);
    2037             : 
    2038         162 :     if (am_tablesync_worker())
    2039             :     {
    2040             :         char       *syncslotname;
    2041             : 
    2042             :         /* This is table synchronization worker, call initial sync. */
    2043         102 :         syncslotname = LogicalRepSyncTableStart(&origin_startpos);
    2044             : 
    2045             :         /* The slot name needs to be allocated in permanent memory context. */
    2046           0 :         oldctx = MemoryContextSwitchTo(ApplyContext);
    2047           0 :         myslotname = pstrdup(syncslotname);
    2048           0 :         MemoryContextSwitchTo(oldctx);
    2049             : 
    2050           0 :         pfree(syncslotname);
    2051             :     }
    2052             :     else
    2053             :     {
    2054             :         /* This is main apply worker */
    2055             :         RepOriginId originid;
    2056             :         TimeLineID  startpointTLI;
    2057             :         char       *err;
    2058             : 
    2059          60 :         myslotname = MySubscription->slotname;
    2060             : 
    2061             :         /*
    2062             :          * This shouldn't happen if the subscription is enabled, but guard
    2063             :          * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
    2064             :          * crash if slot is NULL.)
    2065             :          */
    2066          60 :         if (!myslotname)
    2067           0 :             ereport(ERROR,
    2068             :                     (errmsg("subscription has no replication slot set")));
    2069             : 
    2070             :         /* Setup replication origin tracking. */
    2071          60 :         StartTransactionCommand();
    2072          60 :         snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
    2073          60 :         originid = replorigin_by_name(originname, true);
    2074          60 :         if (!OidIsValid(originid))
    2075           0 :             originid = replorigin_create(originname);
    2076          60 :         replorigin_session_setup(originid);
    2077          60 :         replorigin_session_origin = originid;
    2078          60 :         origin_startpos = replorigin_session_get_progress(false);
    2079          60 :         CommitTransactionCommand();
    2080             : 
    2081          60 :         wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
    2082             :                                 &err);
    2083          60 :         if (wrconn == NULL)
    2084          10 :             ereport(ERROR,
    2085             :                     (errmsg("could not connect to the publisher: %s", err)));
    2086             : 
    2087             :         /*
    2088             :          * We don't really use the output identify_system for anything but it
    2089             :          * does some initializations on the upstream so let's still call it.
    2090             :          */
    2091          50 :         (void) walrcv_identify_system(wrconn, &startpointTLI);
    2092             : 
    2093             :     }
    2094             : 
    2095             :     /*
    2096             :      * Setup callback for syscache so that we know when something changes in
    2097             :      * the subscription relation state.
    2098             :      */
    2099          50 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
    2100             :                                   invalidate_syncing_table_states,
    2101             :                                   (Datum) 0);
    2102             : 
    2103             :     /* Build logical replication streaming options. */
    2104          50 :     options.logical = true;
    2105          50 :     options.startpoint = origin_startpos;
    2106          50 :     options.slotname = myslotname;
    2107          50 :     options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
    2108          50 :     options.proto.logical.publication_names = MySubscription->publications;
    2109             : 
    2110             :     /* Start normal logical streaming replication. */
    2111          50 :     walrcv_startstreaming(wrconn, &options);
    2112             : 
    2113             :     /* Run the main loop. */
    2114          50 :     LogicalRepApplyLoop(origin_startpos);
    2115             : 
    2116           0 :     proc_exit(0);
    2117             : }
    2118             : 
    2119             : /*
    2120             :  * Is current process a logical replication worker?
    2121             :  */
    2122             : bool
    2123         408 : IsLogicalWorker(void)
    2124             : {
    2125         408 :     return MyLogicalRepWorker != NULL;
    2126             : }

Generated by: LCOV version 1.13