LCOV - code coverage report
Current view: top level - src/backend/replication/logical - worker.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 515 582 88.5 %
Date: 2019-11-13 23:06:49 Functions: 26 29 89.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * worker.c
       3             :  *     PostgreSQL logical replication worker (apply)
       4             :  *
       5             :  * Copyright (c) 2016-2019, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/worker.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains the worker which applies logical changes as they come
      12             :  *    from remote logical replication stream.
      13             :  *
      14             :  *    The main worker (apply) is started by logical replication worker
      15             :  *    launcher for every enabled subscription in a database. It uses
      16             :  *    walsender protocol to communicate with publisher.
      17             :  *
      18             :  *    This module includes server facing code and shares libpqwalreceiver
      19             :  *    module with walreceiver for providing the libpq specific functionality.
      20             :  *
      21             :  *-------------------------------------------------------------------------
      22             :  */
      23             : 
      24             : #include "postgres.h"
      25             : 
      26             : #include "access/table.h"
      27             : #include "access/tableam.h"
      28             : #include "access/xact.h"
      29             : #include "access/xlog_internal.h"
      30             : #include "catalog/catalog.h"
      31             : #include "catalog/namespace.h"
      32             : #include "catalog/pg_subscription.h"
      33             : #include "catalog/pg_subscription_rel.h"
      34             : #include "commands/tablecmds.h"
      35             : #include "commands/trigger.h"
      36             : #include "executor/executor.h"
      37             : #include "executor/nodeModifyTable.h"
      38             : #include "funcapi.h"
      39             : #include "libpq/pqformat.h"
      40             : #include "libpq/pqsignal.h"
      41             : #include "mb/pg_wchar.h"
      42             : #include "miscadmin.h"
      43             : #include "nodes/makefuncs.h"
      44             : #include "optimizer/optimizer.h"
      45             : #include "parser/parse_relation.h"
      46             : #include "pgstat.h"
      47             : #include "postmaster/bgworker.h"
      48             : #include "postmaster/postmaster.h"
      49             : #include "postmaster/walwriter.h"
      50             : #include "replication/decode.h"
      51             : #include "replication/logical.h"
      52             : #include "replication/logicalproto.h"
      53             : #include "replication/logicalrelation.h"
      54             : #include "replication/logicalworker.h"
      55             : #include "replication/origin.h"
      56             : #include "replication/reorderbuffer.h"
      57             : #include "replication/snapbuild.h"
      58             : #include "replication/walreceiver.h"
      59             : #include "replication/worker_internal.h"
      60             : #include "rewrite/rewriteHandler.h"
      61             : #include "storage/bufmgr.h"
      62             : #include "storage/ipc.h"
      63             : #include "storage/lmgr.h"
      64             : #include "storage/proc.h"
      65             : #include "storage/procarray.h"
      66             : #include "tcop/tcopprot.h"
      67             : #include "utils/builtins.h"
      68             : #include "utils/catcache.h"
      69             : #include "utils/datum.h"
      70             : #include "utils/fmgroids.h"
      71             : #include "utils/guc.h"
      72             : #include "utils/inval.h"
      73             : #include "utils/lsyscache.h"
      74             : #include "utils/memutils.h"
      75             : #include "utils/rel.h"
      76             : #include "utils/syscache.h"
      77             : #include "utils/timeout.h"
      78             : 
      79             : #define NAPTIME_PER_CYCLE 1000  /* max sleep time between cycles (1s) */
      80             : 
      81             : typedef struct FlushPosition
      82             : {
      83             :     dlist_node  node;
      84             :     XLogRecPtr  local_end;
      85             :     XLogRecPtr  remote_end;
      86             : } FlushPosition;
      87             : 
      88             : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
      89             : 
      90             : typedef struct SlotErrCallbackArg
      91             : {
      92             :     LogicalRepRelMapEntry *rel;
      93             :     int         local_attnum;
      94             :     int         remote_attnum;
      95             : } SlotErrCallbackArg;
      96             : 
      97             : static MemoryContext ApplyMessageContext = NULL;
      98             : MemoryContext ApplyContext = NULL;
      99             : 
     100             : WalReceiverConn *wrconn = NULL;
     101             : 
     102             : Subscription *MySubscription = NULL;
     103             : bool        MySubscriptionValid = false;
     104             : 
     105             : bool        in_remote_transaction = false;
     106             : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
     107             : 
     108             : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
     109             : 
     110             : static void store_flush_position(XLogRecPtr remote_lsn);
     111             : 
     112             : static void maybe_reread_subscription(void);
     113             : 
     114             : /* Flags set by signal handlers */
     115             : static volatile sig_atomic_t got_SIGHUP = false;
     116             : 
     117             : /*
     118             :  * Should this worker apply changes for given relation.
     119             :  *
     120             :  * This is mainly needed for initial relation data sync as that runs in
     121             :  * separate worker process running in parallel and we need some way to skip
     122             :  * changes coming to the main apply worker during the sync of a table.
     123             :  *
     124             :  * Note we need to do smaller or equals comparison for SYNCDONE state because
     125             :  * it might hold position of end of initial slot consistent point WAL
     126             :  * record + 1 (ie start of next record) and next record can be COMMIT of
     127             :  * transaction we are now processing (which is what we set remote_final_lsn
     128             :  * to in apply_handle_begin).
     129             :  */
     130             : static bool
     131        1336 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
     132             : {
     133        1336 :     if (am_tablesync_worker())
     134           0 :         return MyLogicalRepWorker->relid == rel->localreloid;
     135             :     else
     136        2672 :         return (rel->state == SUBREL_STATE_READY ||
     137          24 :                 (rel->state == SUBREL_STATE_SYNCDONE &&
     138           0 :                  rel->statelsn <= remote_final_lsn));
     139             : }
     140             : 
     141             : /*
     142             :  * Make sure that we started local transaction.
     143             :  *
     144             :  * Also switches to ApplyMessageContext as necessary.
     145             :  */
     146             : static bool
     147        1336 : ensure_transaction(void)
     148             : {
     149        1336 :     if (IsTransactionState())
     150             :     {
     151        1124 :         SetCurrentStatementStartTimestamp();
     152             : 
     153        1124 :         if (CurrentMemoryContext != ApplyMessageContext)
     154           0 :             MemoryContextSwitchTo(ApplyMessageContext);
     155             : 
     156        1124 :         return false;
     157             :     }
     158             : 
     159         212 :     SetCurrentStatementStartTimestamp();
     160         212 :     StartTransactionCommand();
     161             : 
     162         212 :     maybe_reread_subscription();
     163             : 
     164         212 :     MemoryContextSwitchTo(ApplyMessageContext);
     165         212 :     return true;
     166             : }
     167             : 
     168             : 
     169             : /*
     170             :  * Executor state preparation for evaluation of constraint expressions,
     171             :  * indexes and triggers.
     172             :  *
     173             :  * This is based on similar code in copy.c
     174             :  */
     175             : static EState *
     176        1300 : create_estate_for_relation(LogicalRepRelMapEntry *rel)
     177             : {
     178             :     EState     *estate;
     179             :     ResultRelInfo *resultRelInfo;
     180             :     RangeTblEntry *rte;
     181             : 
     182        1300 :     estate = CreateExecutorState();
     183             : 
     184        1300 :     rte = makeNode(RangeTblEntry);
     185        1300 :     rte->rtekind = RTE_RELATION;
     186        1300 :     rte->relid = RelationGetRelid(rel->localrel);
     187        1300 :     rte->relkind = rel->localrel->rd_rel->relkind;
     188        1300 :     rte->rellockmode = AccessShareLock;
     189        1300 :     ExecInitRangeTable(estate, list_make1(rte));
     190             : 
     191        1300 :     resultRelInfo = makeNode(ResultRelInfo);
     192        1300 :     InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
     193             : 
     194        1300 :     estate->es_result_relations = resultRelInfo;
     195        1300 :     estate->es_num_result_relations = 1;
     196        1300 :     estate->es_result_relation_info = resultRelInfo;
     197             : 
     198        1300 :     estate->es_output_cid = GetCurrentCommandId(true);
     199             : 
     200             :     /* Prepare to catch AFTER triggers. */
     201        1300 :     AfterTriggerBeginQuery();
     202             : 
     203        1300 :     return estate;
     204             : }
     205             : 
     206             : /*
     207             :  * Executes default values for columns for which we can't map to remote
     208             :  * relation columns.
     209             :  *
     210             :  * This allows us to support tables which have more columns on the downstream
     211             :  * than on the upstream.
     212             :  */
     213             : static void
     214         704 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
     215             :                    TupleTableSlot *slot)
     216             : {
     217         704 :     TupleDesc   desc = RelationGetDescr(rel->localrel);
     218         704 :     int         num_phys_attrs = desc->natts;
     219             :     int         i;
     220             :     int         attnum,
     221         704 :                 num_defaults = 0;
     222             :     int        *defmap;
     223             :     ExprState **defexprs;
     224             :     ExprContext *econtext;
     225             : 
     226         704 :     econtext = GetPerTupleExprContext(estate);
     227             : 
     228             :     /* We got all the data via replication, no need to evaluate anything. */
     229         704 :     if (num_phys_attrs == rel->remoterel.natts)
     230         694 :         return;
     231             : 
     232          10 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
     233          10 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
     234             : 
     235          38 :     for (attnum = 0; attnum < num_phys_attrs; attnum++)
     236             :     {
     237             :         Expr       *defexpr;
     238             : 
     239          28 :         if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
     240           4 :             continue;
     241             : 
     242          24 :         if (rel->attrmap[attnum] >= 0)
     243          14 :             continue;
     244             : 
     245          10 :         defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
     246             : 
     247          10 :         if (defexpr != NULL)
     248             :         {
     249             :             /* Run the expression through planner */
     250           8 :             defexpr = expression_planner(defexpr);
     251             : 
     252             :             /* Initialize executable expression in copycontext */
     253           8 :             defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
     254           8 :             defmap[num_defaults] = attnum;
     255           8 :             num_defaults++;
     256             :         }
     257             : 
     258             :     }
     259             : 
     260          18 :     for (i = 0; i < num_defaults; i++)
     261          16 :         slot->tts_values[defmap[i]] =
     262           8 :             ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
     263             : }
     264             : 
     265             : /*
     266             :  * Error callback to give more context info about type conversion failure.
     267             :  */
     268             : static void
     269           0 : slot_store_error_callback(void *arg)
     270             : {
     271           0 :     SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
     272             :     LogicalRepRelMapEntry *rel;
     273             :     char       *remotetypname;
     274             :     Oid         remotetypoid,
     275             :                 localtypoid;
     276             : 
     277             :     /* Nothing to do if remote attribute number is not set */
     278           0 :     if (errarg->remote_attnum < 0)
     279           0 :         return;
     280             : 
     281           0 :     rel = errarg->rel;
     282           0 :     remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
     283             : 
     284             :     /* Fetch remote type name from the LogicalRepTypMap cache */
     285           0 :     remotetypname = logicalrep_typmap_gettypname(remotetypoid);
     286             : 
     287             :     /* Fetch local type OID from the local sys cache */
     288           0 :     localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
     289             : 
     290           0 :     errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
     291             :                "remote type %s, local type %s",
     292             :                rel->remoterel.nspname, rel->remoterel.relname,
     293           0 :                rel->remoterel.attnames[errarg->remote_attnum],
     294             :                remotetypname,
     295             :                format_type_be(localtypoid));
     296             : }
     297             : 
     298             : /*
     299             :  * Store data in C string form into slot.
     300             :  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
     301             :  * use better.
     302             :  */
     303             : static void
     304        1300 : slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
     305             :                     char **values)
     306             : {
     307        1300 :     int         natts = slot->tts_tupleDescriptor->natts;
     308             :     int         i;
     309             :     SlotErrCallbackArg errarg;
     310             :     ErrorContextCallback errcallback;
     311             : 
     312        1300 :     ExecClearTuple(slot);
     313             : 
     314             :     /* Push callback + info on the error context stack */
     315        1300 :     errarg.rel = rel;
     316        1300 :     errarg.local_attnum = -1;
     317        1300 :     errarg.remote_attnum = -1;
     318        1300 :     errcallback.callback = slot_store_error_callback;
     319        1300 :     errcallback.arg = (void *) &errarg;
     320        1300 :     errcallback.previous = error_context_stack;
     321        1300 :     error_context_stack = &errcallback;
     322             : 
     323             :     /* Call the "in" function for each non-dropped attribute */
     324        3204 :     for (i = 0; i < natts; i++)
     325             :     {
     326        1904 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     327        1904 :         int         remoteattnum = rel->attrmap[i];
     328             : 
     329        3768 :         if (!att->attisdropped && remoteattnum >= 0 &&
     330        1864 :             values[remoteattnum] != NULL)
     331        1578 :         {
     332             :             Oid         typinput;
     333             :             Oid         typioparam;
     334             : 
     335        1578 :             errarg.local_attnum = i;
     336        1578 :             errarg.remote_attnum = remoteattnum;
     337             : 
     338        1578 :             getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     339        3156 :             slot->tts_values[i] =
     340        1578 :                 OidInputFunctionCall(typinput, values[remoteattnum],
     341             :                                      typioparam, att->atttypmod);
     342        1578 :             slot->tts_isnull[i] = false;
     343             : 
     344        1578 :             errarg.local_attnum = -1;
     345        1578 :             errarg.remote_attnum = -1;
     346             :         }
     347             :         else
     348             :         {
     349             :             /*
     350             :              * We assign NULL to dropped attributes, NULL values, and missing
     351             :              * values (missing values should be later filled using
     352             :              * slot_fill_defaults).
     353             :              */
     354         326 :             slot->tts_values[i] = (Datum) 0;
     355         326 :             slot->tts_isnull[i] = true;
     356             :         }
     357             :     }
     358             : 
     359             :     /* Pop the error context stack */
     360        1300 :     error_context_stack = errcallback.previous;
     361             : 
     362        1300 :     ExecStoreVirtualTuple(slot);
     363        1300 : }
     364             : 
     365             : /*
     366             :  * Modify slot with user data provided as C strings.
     367             :  * This is somewhat similar to heap_modify_tuple but also calls the type
     368             :  * input function on the user data as the input is the text representation
     369             :  * of the types.
     370             :  */
     371             : static void
     372         214 : slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
     373             :                      char **values, bool *replaces)
     374             : {
     375         214 :     int         natts = slot->tts_tupleDescriptor->natts;
     376             :     int         i;
     377             :     SlotErrCallbackArg errarg;
     378             :     ErrorContextCallback errcallback;
     379             : 
     380         214 :     slot_getallattrs(slot);
     381         214 :     ExecClearTuple(slot);
     382             : 
     383             :     /* Push callback + info on the error context stack */
     384         214 :     errarg.rel = rel;
     385         214 :     errarg.local_attnum = -1;
     386         214 :     errarg.remote_attnum = -1;
     387         214 :     errcallback.callback = slot_store_error_callback;
     388         214 :     errcallback.arg = (void *) &errarg;
     389         214 :     errcallback.previous = error_context_stack;
     390         214 :     error_context_stack = &errcallback;
     391             : 
     392             :     /* Call the "in" function for each replaced attribute */
     393         600 :     for (i = 0; i < natts; i++)
     394             :     {
     395         386 :         Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
     396         386 :         int         remoteattnum = rel->attrmap[i];
     397             : 
     398         386 :         if (remoteattnum < 0)
     399          26 :             continue;
     400             : 
     401         360 :         if (!replaces[remoteattnum])
     402           0 :             continue;
     403             : 
     404         360 :         if (values[remoteattnum] != NULL)
     405             :         {
     406             :             Oid         typinput;
     407             :             Oid         typioparam;
     408             : 
     409         316 :             errarg.local_attnum = i;
     410         316 :             errarg.remote_attnum = remoteattnum;
     411             : 
     412         316 :             getTypeInputInfo(att->atttypid, &typinput, &typioparam);
     413         632 :             slot->tts_values[i] =
     414         316 :                 OidInputFunctionCall(typinput, values[remoteattnum],
     415             :                                      typioparam, att->atttypmod);
     416         316 :             slot->tts_isnull[i] = false;
     417             : 
     418         316 :             errarg.local_attnum = -1;
     419         316 :             errarg.remote_attnum = -1;
     420             :         }
     421             :         else
     422             :         {
     423          44 :             slot->tts_values[i] = (Datum) 0;
     424          44 :             slot->tts_isnull[i] = true;
     425             :         }
     426             :     }
     427             : 
     428             :     /* Pop the error context stack */
     429         214 :     error_context_stack = errcallback.previous;
     430             : 
     431         214 :     ExecStoreVirtualTuple(slot);
     432         214 : }
     433             : 
     434             : /*
     435             :  * Handle BEGIN message.
     436             :  */
     437             : static void
     438         274 : apply_handle_begin(StringInfo s)
     439             : {
     440             :     LogicalRepBeginData begin_data;
     441             : 
     442         274 :     logicalrep_read_begin(s, &begin_data);
     443             : 
     444         274 :     remote_final_lsn = begin_data.final_lsn;
     445             : 
     446         274 :     in_remote_transaction = true;
     447             : 
     448         274 :     pgstat_report_activity(STATE_RUNNING, NULL);
     449         274 : }
     450             : 
     451             : /*
     452             :  * Handle COMMIT message.
     453             :  *
     454             :  * TODO, support tracking of multiple origins
     455             :  */
     456             : static void
     457         272 : apply_handle_commit(StringInfo s)
     458             : {
     459             :     LogicalRepCommitData commit_data;
     460             : 
     461         272 :     logicalrep_read_commit(s, &commit_data);
     462             : 
     463             :     Assert(commit_data.commit_lsn == remote_final_lsn);
     464             : 
     465             :     /* The synchronization worker runs in single transaction. */
     466         272 :     if (IsTransactionState() && !am_tablesync_worker())
     467             :     {
     468             :         /*
     469             :          * Update origin state so we can restart streaming from correct
     470             :          * position in case of crash.
     471             :          */
     472         210 :         replorigin_session_origin_lsn = commit_data.end_lsn;
     473         210 :         replorigin_session_origin_timestamp = commit_data.committime;
     474             : 
     475         210 :         CommitTransactionCommand();
     476         210 :         pgstat_report_stat(false);
     477             : 
     478         210 :         store_flush_position(commit_data.end_lsn);
     479             :     }
     480             :     else
     481             :     {
     482             :         /* Process any invalidation messages that might have accumulated. */
     483          62 :         AcceptInvalidationMessages();
     484          62 :         maybe_reread_subscription();
     485             :     }
     486             : 
     487         272 :     in_remote_transaction = false;
     488             : 
     489             :     /* Process any tables that are being synchronized in parallel. */
     490         272 :     process_syncing_tables(commit_data.end_lsn);
     491             : 
     492         272 :     pgstat_report_activity(STATE_IDLE, NULL);
     493         272 : }
     494             : 
     495             : /*
     496             :  * Handle ORIGIN message.
     497             :  *
     498             :  * TODO, support tracking of multiple origins
     499             :  */
     500             : static void
     501           0 : apply_handle_origin(StringInfo s)
     502             : {
     503             :     /*
     504             :      * ORIGIN message can only come inside remote transaction and before any
     505             :      * actual writes.
     506             :      */
     507           0 :     if (!in_remote_transaction ||
     508           0 :         (IsTransactionState() && !am_tablesync_worker()))
     509           0 :         ereport(ERROR,
     510             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     511             :                  errmsg("ORIGIN message sent out of order")));
     512           0 : }
     513             : 
     514             : /*
     515             :  * Handle RELATION message.
     516             :  *
     517             :  * Note we don't do validation against local schema here. The validation
     518             :  * against local schema is postponed until first change for given relation
     519             :  * comes as we only care about it when applying changes for it anyway and we
     520             :  * do less locking this way.
     521             :  */
     522             : static void
     523          90 : apply_handle_relation(StringInfo s)
     524             : {
     525             :     LogicalRepRelation *rel;
     526             : 
     527          90 :     rel = logicalrep_read_rel(s);
     528          90 :     logicalrep_relmap_update(rel);
     529          90 : }
     530             : 
     531             : /*
     532             :  * Handle TYPE message.
     533             :  *
     534             :  * Note we don't do local mapping here, that's done when the type is
     535             :  * actually used.
     536             :  */
     537             : static void
     538          32 : apply_handle_type(StringInfo s)
     539             : {
     540             :     LogicalRepTyp typ;
     541             : 
     542          32 :     logicalrep_read_typ(s, &typ);
     543          32 :     logicalrep_typmap_update(&typ);
     544          32 : }
     545             : 
     546             : /*
     547             :  * Get replica identity index or if it is not defined a primary key.
     548             :  *
     549             :  * If neither is defined, returns InvalidOid
     550             :  */
     551             : static Oid
     552         596 : GetRelationIdentityOrPK(Relation rel)
     553             : {
     554             :     Oid         idxoid;
     555             : 
     556         596 :     idxoid = RelationGetReplicaIndex(rel);
     557             : 
     558         596 :     if (!OidIsValid(idxoid))
     559         244 :         idxoid = RelationGetPrimaryKeyIndex(rel);
     560             : 
     561         596 :     return idxoid;
     562             : }
     563             : 
     564             : /*
     565             :  * Handle INSERT message.
     566             :  */
     567             : static void
     568         730 : apply_handle_insert(StringInfo s)
     569             : {
     570             :     LogicalRepRelMapEntry *rel;
     571             :     LogicalRepTupleData newtup;
     572             :     LogicalRepRelId relid;
     573             :     EState     *estate;
     574             :     TupleTableSlot *remoteslot;
     575             :     MemoryContext oldctx;
     576             : 
     577         730 :     ensure_transaction();
     578             : 
     579         730 :     relid = logicalrep_read_insert(s, &newtup);
     580         730 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
     581         728 :     if (!should_apply_changes_for_rel(rel))
     582             :     {
     583             :         /*
     584             :          * The relation can't become interesting in the middle of the
     585             :          * transaction so it's safe to unlock it.
     586             :          */
     587          24 :         logicalrep_rel_close(rel, RowExclusiveLock);
     588          24 :         return;
     589             :     }
     590             : 
     591             :     /* Initialize the executor state. */
     592         704 :     estate = create_estate_for_relation(rel);
     593         704 :     remoteslot = ExecInitExtraTupleSlot(estate,
     594         704 :                                         RelationGetDescr(rel->localrel),
     595             :                                         &TTSOpsVirtual);
     596             : 
     597             :     /* Input functions may need an active snapshot, so get one */
     598         704 :     PushActiveSnapshot(GetTransactionSnapshot());
     599             : 
     600             :     /* Process and store remote tuple in the slot */
     601         704 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     602         704 :     slot_store_cstrings(remoteslot, rel, newtup.values);
     603         704 :     slot_fill_defaults(rel, estate, remoteslot);
     604         704 :     MemoryContextSwitchTo(oldctx);
     605             : 
     606         704 :     ExecOpenIndices(estate->es_result_relation_info, false);
     607             : 
     608             :     /* Do the insert. */
     609         704 :     ExecSimpleRelationInsert(estate, remoteslot);
     610             : 
     611             :     /* Cleanup. */
     612         704 :     ExecCloseIndices(estate->es_result_relation_info);
     613         704 :     PopActiveSnapshot();
     614             : 
     615             :     /* Handle queued AFTER triggers. */
     616         704 :     AfterTriggerEndQuery(estate);
     617             : 
     618         704 :     ExecResetTupleTable(estate->es_tupleTable, false);
     619         704 :     FreeExecutorState(estate);
     620             : 
     621         704 :     logicalrep_rel_close(rel, NoLock);
     622             : 
     623         704 :     CommandCounterIncrement();
     624             : }
     625             : 
     626             : /*
     627             :  * Verify that UPDATE/DELETE can be applied to the mapped relation: return
     628             :  * quietly if rel->updatable is set, otherwise raise an ERROR saying why not.
     629             :  */
     630             : static void
     631         596 : check_relation_updatable(LogicalRepRelMapEntry *rel)
     632             : {
     633             :     /* Updatable, no error. */
     634         596 :     if (rel->updatable)
     635         596 :         return;
     636             : 
     637             :     /*
     638             :      * We are already on the error path, so the extra lookup below is
     639             :      * affordable; it lets us report the cause that actually applies.
     640             :      */
     641           0 :     if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
     642             :     {
     643           0 :         ereport(ERROR,
     644             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     645             :                  errmsg("publisher did not send replica identity column "
     646             :                         "expected by the logical replication target relation \"%s.%s\"",
     647             :                         rel->remoterel.nspname, rel->remoterel.relname)));
     648             :     }
     649             : 
     650           0 :     ereport(ERROR,
     651             :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     652             :              errmsg("logical replication target relation \"%s.%s\" has "
     653             :                     "neither REPLICA IDENTITY index nor PRIMARY "
     654             :                     "KEY and published relation does not have "
     655             :                     "REPLICA IDENTITY FULL",
     656             :                     rel->remoterel.nspname, rel->remoterel.relname)));
     657             : }
     658             : 
     659             : /*
     660             :  * Handle UPDATE message: locate the local row matching the remote tuple read
     661             :  * from "s" and update it; a row that cannot be found is only logged at DEBUG1.
     662             :  * TODO: FDW support
     663             :  */
     664             : static void
     665         214 : apply_handle_update(StringInfo s)
     666             : {
     667             :     LogicalRepRelMapEntry *rel;
     668             :     LogicalRepRelId relid;
     669             :     Oid         idxoid;
     670             :     EState     *estate;
     671             :     EPQState    epqstate;
     672             :     LogicalRepTupleData oldtup;
     673             :     LogicalRepTupleData newtup;
     674             :     bool        has_oldtup;
     675             :     TupleTableSlot *localslot;
     676             :     TupleTableSlot *remoteslot;
     677             :     bool        found;
     678             :     MemoryContext oldctx;
     679             : 
     680         214 :     ensure_transaction();
     681             : 
     682         214 :     relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
     683             :                                    &newtup);
     684         214 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
     685         214 :     if (!should_apply_changes_for_rel(rel))
     686             :     {
     687             :         /*
     688             :          * The relation can't become interesting in the middle of the
     689             :          * transaction so it's safe to unlock it.
     690             :          */
     691           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
     692           0 :         return;
     693             :     }
     694             : 
     695             :     /* Raises ERROR unless the relation is marked updatable. */
     696         214 :     check_relation_updatable(rel);
     697             : 
     698             :     /* Initialize the executor state. */
     699         214 :     estate = create_estate_for_relation(rel);
     700         214 :     remoteslot = ExecInitExtraTupleSlot(estate,
     701         214 :                                         RelationGetDescr(rel->localrel),
     702             :                                         &TTSOpsVirtual);
     703         214 :     localslot = table_slot_create(rel->localrel,
     704             :                                   &estate->es_tupleTable);
     705         214 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
     706             : 
     707         214 :     PushActiveSnapshot(GetTransactionSnapshot());
     708         214 :     ExecOpenIndices(estate->es_result_relation_info, false);
     709             : 
     710             :     /* Build the search tuple from the old tuple if one was sent, else the new. */
     711         214 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     712         214 :     slot_store_cstrings(remoteslot, rel,
     713         214 :                         has_oldtup ? oldtup.values : newtup.values);
     714         214 :     MemoryContextSwitchTo(oldctx);
     715             : 
     716             :     /*
     717             :      * Try to find tuple using either replica identity index, primary key or
     718             :      * if needed, sequential scan.
     719             :      */
     720         214 :     idxoid = GetRelationIdentityOrPK(rel->localrel);
     721             :     Assert(OidIsValid(idxoid) ||
     722             :            (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
     723             : 
     724         214 :     if (OidIsValid(idxoid))
     725         170 :         found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
     726             :                                              LockTupleExclusive,
     727             :                                              remoteslot, localslot);
     728             :     else
     729          44 :         found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
     730             :                                          remoteslot, localslot);
     731             : 
     732         214 :     ExecClearTuple(remoteslot);
     733             : 
     734             :     /*
     735             :      * Tuple found.
     736             :      *
     737             :      * Note this will fail if there are other conflicting unique indexes.
     738             :      */
     739         214 :     if (found)
     740             :     {
     741             :         /* Start from the local row and let slot_modify_cstrings apply newtup */
     742         214 :         oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     743         214 :         ExecCopySlot(remoteslot, localslot);
     744         214 :         slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
     745         214 :         MemoryContextSwitchTo(oldctx);
     746             : 
     747         214 :         EvalPlanQualSetSlot(&epqstate, remoteslot);
     748             : 
     749             :         /* Do the actual update. */
     750         214 :         ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
     751             :     }
     752             :     else
     753             :     {
     754             :         /*
     755             :          * The tuple to be updated could not be found.
     756             :          *
     757             :          * TODO what to do here, change the log level to LOG perhaps?
     758             :          */
     759           0 :         elog(DEBUG1,
     760             :              "logical replication did not find row for update "
     761             :              "in replication target relation \"%s\"",
     762             :              RelationGetRelationName(rel->localrel));
     763             :     }
     764             : 
     765             :     /* Cleanup, mirroring the ExecOpenIndices/PushActiveSnapshot calls above. */
     766         214 :     ExecCloseIndices(estate->es_result_relation_info);
     767         214 :     PopActiveSnapshot();
     768             : 
     769             :     /* Handle queued AFTER triggers. */
     770         214 :     AfterTriggerEndQuery(estate);
     771             : 
     772         214 :     EvalPlanQualEnd(&epqstate);
     773         214 :     ExecResetTupleTable(estate->es_tupleTable, false);
     774         214 :     FreeExecutorState(estate);
     775             : 
     776         214 :     logicalrep_rel_close(rel, NoLock);
     777             : 
     778         214 :     CommandCounterIncrement();
     779             : }
     780             : 
     781             : /*
     782             :  * Handle DELETE message: locate the local row matching the old tuple read
     783             :  * from "s" and delete it; a row that cannot be found is only logged at DEBUG1.
     784             :  * TODO: FDW support
     785             :  */
     786             : static void
     787         382 : apply_handle_delete(StringInfo s)
     788             : {
     789             :     LogicalRepRelMapEntry *rel;
     790             :     LogicalRepTupleData oldtup;
     791             :     LogicalRepRelId relid;
     792             :     Oid         idxoid;
     793             :     EState     *estate;
     794             :     EPQState    epqstate;
     795             :     TupleTableSlot *remoteslot;
     796             :     TupleTableSlot *localslot;
     797             :     bool        found;
     798             :     MemoryContext oldctx;
     799             : 
     800         382 :     ensure_transaction();
     801             : 
     802         382 :     relid = logicalrep_read_delete(s, &oldtup);
     803         382 :     rel = logicalrep_rel_open(relid, RowExclusiveLock);
     804         382 :     if (!should_apply_changes_for_rel(rel))
     805             :     {
     806             :         /*
     807             :          * The relation can't become interesting in the middle of the
     808             :          * transaction so it's safe to unlock it.
     809             :          */
     810           0 :         logicalrep_rel_close(rel, RowExclusiveLock);
     811           0 :         return;
     812             :     }
     813             : 
     814             :     /* Raises ERROR unless the relation is marked updatable. */
     815         382 :     check_relation_updatable(rel);
     816             : 
     817             :     /* Initialize the executor state. */
     818         382 :     estate = create_estate_for_relation(rel);
     819         382 :     remoteslot = ExecInitExtraTupleSlot(estate,
     820         382 :                                         RelationGetDescr(rel->localrel),
     821             :                                         &TTSOpsVirtual);
     822         382 :     localslot = table_slot_create(rel->localrel,
     823             :                                   &estate->es_tupleTable);
     824         382 :     EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
     825             : 
     826         382 :     PushActiveSnapshot(GetTransactionSnapshot());
     827         382 :     ExecOpenIndices(estate->es_result_relation_info, false);
     828             : 
     829             :     /* Build the search tuple from the old tuple read from the message. */
     830         382 :     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     831         382 :     slot_store_cstrings(remoteslot, rel, oldtup.values);
     832         382 :     MemoryContextSwitchTo(oldctx);
     833             : 
     834             :     /*
     835             :      * Try to find tuple using either replica identity index, primary key or
     836             :      * if needed, sequential scan.
     837             :      */
     838         382 :     idxoid = GetRelationIdentityOrPK(rel->localrel);
     839             :     Assert(OidIsValid(idxoid) ||
     840             :            (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
     841             : 
     842         382 :     if (OidIsValid(idxoid))
     843         182 :         found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
     844             :                                              LockTupleExclusive,
     845             :                                              remoteslot, localslot);
     846             :     else
     847         200 :         found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
     848             :                                          remoteslot, localslot);
     849             :     /* If the row was found, delete it. */
     850         382 :     if (found)
     851             :     {
     852         382 :         EvalPlanQualSetSlot(&epqstate, localslot);
     853             : 
     854             :         /* Do the actual delete. */
     855         382 :         ExecSimpleRelationDelete(estate, &epqstate, localslot);
     856             :     }
     857             :     else
     858             :     {
     859             :         /* The tuple to be deleted could not be found; only log it. */
     860           0 :         elog(DEBUG1,
     861             :              "logical replication could not find row for delete "
     862             :              "in replication target relation \"%s\"",
     863             :              RelationGetRelationName(rel->localrel));
     864             :     }
     865             : 
     866             :     /* Cleanup, mirroring the ExecOpenIndices/PushActiveSnapshot calls above. */
     867         382 :     ExecCloseIndices(estate->es_result_relation_info);
     868         382 :     PopActiveSnapshot();
     869             : 
     870             :     /* Handle queued AFTER triggers. */
     871         382 :     AfterTriggerEndQuery(estate);
     872             : 
     873         382 :     EvalPlanQualEnd(&epqstate);
     874         382 :     ExecResetTupleTable(estate->es_tupleTable, false);
     875         382 :     FreeExecutorState(estate);
     876             : 
     877         382 :     logicalrep_rel_close(rel, NoLock);
     878             : 
     879         382 :     CommandCounterIncrement();
     880             : }
     881             : 
     882             : /*
     883             :  * Handle TRUNCATE message: truncate, in one ExecuteTruncateGuts call, every
     884             :  * local relation mapped from the received list that we should apply changes to.
     885             :  * TODO: FDW support
     886             :  */
     887             : static void
     888          10 : apply_handle_truncate(StringInfo s)
     889             : {
     890          10 :     bool        cascade = false;
     891          10 :     bool        restart_seqs = false;
     892          10 :     List       *remote_relids = NIL;
     893          10 :     List       *remote_rels = NIL;
     894          10 :     List       *rels = NIL;
     895          10 :     List       *relids = NIL;
     896          10 :     List       *relids_logged = NIL;
     897             :     ListCell   *lc;
     898             : 
     899          10 :     ensure_transaction();
     900             : 
     901          10 :     remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
     902             :     /* Open each listed relation; skip those we should not apply changes for. */
     903          22 :     foreach(lc, remote_relids)
     904             :     {
     905          12 :         LogicalRepRelId relid = lfirst_oid(lc);
     906             :         LogicalRepRelMapEntry *rel;
     907             : 
     908          12 :         rel = logicalrep_rel_open(relid, RowExclusiveLock);
     909          12 :         if (!should_apply_changes_for_rel(rel))
     910             :         {
     911             :             /*
     912             :              * The relation can't become interesting in the middle of the
     913             :              * transaction so it's safe to unlock it.
     914             :              */
     915           0 :             logicalrep_rel_close(rel, RowExclusiveLock);
     916           0 :             continue;
     917             :         }
     918             : 
     919          12 :         remote_rels = lappend(remote_rels, rel);
     920          12 :         rels = lappend(rels, rel->localrel);
     921          12 :         relids = lappend_oid(relids, rel->localreloid);
     922          12 :         if (RelationIsLogicallyLogged(rel->localrel))
     923          12 :             relids_logged = lappend_oid(relids_logged, rel->localreloid);
     924             :     }
     925             : 
     926             :     /*
     927             :      * Even if CASCADE was used on the upstream master, we deliberately replay
     928             :      * with DROP_RESTRICT and ignore the received "cascade" flag. This might
     929             :      * later become changeable with a user specified option.
     930             :      */
     931          10 :     ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
     932             : 
     933          22 :     foreach(lc, remote_rels)
     934             :     {
     935          12 :         LogicalRepRelMapEntry *rel = lfirst(lc);
     936             : 
     937          12 :         logicalrep_rel_close(rel, NoLock);
     938             :     }
     939             : 
     940          10 :     CommandCounterIncrement();
     941          10 : }
     942             : 
     943             : 
     944             : /*
     945             :  * Logical replication protocol message dispatcher: routes on the type byte.
     946             :  */
     947             : static void
     948        2004 : apply_dispatch(StringInfo s)
     949             : {
     950        2004 :     char        action = pq_getmsgbyte(s); /* leading message type byte */
     951             : 
     952        2004 :     switch (action)
     953             :     {
     954             :             /* BEGIN */
     955             :         case 'B':
     956         274 :             apply_handle_begin(s);
     957         274 :             break;
     958             :             /* COMMIT */
     959             :         case 'C':
     960         272 :             apply_handle_commit(s);
     961         272 :             break;
     962             :             /* INSERT */
     963             :         case 'I':
     964         730 :             apply_handle_insert(s);
     965         728 :             break;
     966             :             /* UPDATE */
     967             :         case 'U':
     968         214 :             apply_handle_update(s);
     969         214 :             break;
     970             :             /* DELETE */
     971             :         case 'D':
     972         382 :             apply_handle_delete(s);
     973         382 :             break;
     974             :             /* TRUNCATE */
     975             :         case 'T':
     976          10 :             apply_handle_truncate(s);
     977          10 :             break;
     978             :             /* RELATION */
     979             :         case 'R':
     980          90 :             apply_handle_relation(s);
     981          90 :             break;
     982             :             /* TYPE */
     983             :         case 'Y':
     984          32 :             apply_handle_type(s);
     985          32 :             break;
     986             :             /* ORIGIN */
     987             :         case 'O':
     988           0 :             apply_handle_origin(s);
     989           0 :             break;
     990             :         default:                /* unknown type byte: protocol violation */
     991           0 :             ereport(ERROR,
     992             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
     993             :                      errmsg("invalid logical replication message type \"%c\"", action)));
     994             :     }
     995        2002 : }
     996             : 
     997             : /*
     998             :  * Figure out which write/flush positions to report to the walsender process.
     999             :  *
    1000             :  * We can't simply report back the last LSN the walsender sent us because the
    1001             :  * local transaction might not yet be flushed to disk locally. Instead we
    1002             :  * build a list that associates local with remote LSNs for every commit. When
    1003             :  * reporting back the flush position to the sender we iterate that list and
    1004             :  * check which entries on it are already locally flushed. Those we can report
    1005             :  * as having been flushed; they are removed from the list and freed here.
    1006             :  *
    1007             :  * *write and *flush start out as InvalidXLogRecPtr; *have_pending_txes is set
    1008             :  * to true if there are outstanding transactions that need to be flushed.
    1009             :  */
    1010             : static void
    1011        8948 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
    1012             :                    bool *have_pending_txes)
    1013             : {
    1014             :     dlist_mutable_iter iter;
    1015        8948 :     XLogRecPtr  local_flush = GetFlushRecPtr();
    1016             : 
    1017        8948 :     *write = InvalidXLogRecPtr;
    1018        8948 :     *flush = InvalidXLogRecPtr;
    1019             : 
    1020        9146 :     dlist_foreach_modify(iter, &lsn_mapping)
    1021             :     {
    1022        8448 :         FlushPosition *pos =
    1023        8448 :         dlist_container(FlushPosition, node, iter.cur);
    1024             : 
    1025        8448 :         *write = pos->remote_end;
    1026             : 
    1027        8448 :         if (pos->local_end <= local_flush)
    1028             :         {
    1029         198 :             *flush = pos->remote_end;
    1030         198 :             dlist_delete(iter.cur);
    1031         198 :             pfree(pos);
    1032             :         }
    1033             :         else
    1034             :         {
    1035             :             /*
    1036             :              * Don't want to uselessly iterate over the rest of the list which
    1037             :              * could potentially be long. Instead get the last element and
    1038             :              * grab the write position from there.
    1039             :              */
    1040        8250 :             pos = dlist_tail_element(FlushPosition, node,
    1041             :                                      &lsn_mapping);
    1042        8250 :             *write = pos->remote_end;
    1043        8250 :             *have_pending_txes = true;
    1044       16500 :             return;
    1045             :         }
    1046             :     }
    1047             :     /* Reached only when every entry was flushed and removed, or none existed. */
    1048         698 :     *have_pending_txes = !dlist_is_empty(&lsn_mapping);
    1049             : }
    1050             : 
    1051             : /*
    1052             :  * Append the (XactLastCommitEnd, remote_lsn) pair to the lsn_mapping list.
    1053             :  */
    1054             : static void
    1055         210 : store_flush_position(XLogRecPtr remote_lsn)
    1056             : {
    1057             :     FlushPosition *flushpos;
    1058             : 
    1059             :     /* Allocate in ApplyContext: the entry is kept on lsn_mapping after return */
    1060         210 :     MemoryContextSwitchTo(ApplyContext);
    1061             : 
    1062             :     /* Track commit lsn */
    1063         210 :     flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
    1064         210 :     flushpos->local_end = XactLastCommitEnd;
    1065         210 :     flushpos->remote_end = remote_lsn;
    1066             : 
    1067         210 :     dlist_push_tail(&lsn_mapping, &flushpos->node);
    1068         210 :     MemoryContextSwitchTo(ApplyMessageContext); /* not the entry context */
    1069         210 : }
    1070             : 
    1071             : 
    1072             : /* Update worker stats; reply_lsn/reply_time are set only when "reply". */
    1073             : static void
    1074        6200 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
    1075             : {
    1076        6200 :     MyLogicalRepWorker->last_lsn = last_lsn;
    1077        6200 :     MyLogicalRepWorker->last_send_time = send_time;
    1078        6200 :     MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
    1079        6200 :     if (reply)
    1080             :     {
    1081        4196 :         MyLogicalRepWorker->reply_lsn = last_lsn;
    1082        4196 :         MyLogicalRepWorker->reply_time = send_time;
    1083             :     }
    1084        6200 : }
    1085             : 
    1086             : /*
    1087             :  * Apply main loop.
    1088             :  */
    1089             : static void
    1090          44 : LogicalRepApplyLoop(XLogRecPtr last_received)
    1091             : {
    1092          44 :     TimestampTz last_recv_timestamp = GetCurrentTimestamp();
    1093             : 
    1094             :     /*
    1095             :      * Init the ApplyMessageContext which we clean up after each replication
    1096             :      * protocol message.
    1097             :      */
    1098          44 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
    1099             :                                                 "ApplyMessageContext",
    1100             :                                                 ALLOCSET_DEFAULT_SIZES);
    1101             : 
    1102             :     /* mark as idle, before starting to loop */
    1103          44 :     pgstat_report_activity(STATE_IDLE, NULL);
    1104             : 
    1105             :     for (;;)
    1106        4676 :     {
    1107        4720 :         pgsocket    fd = PGINVALID_SOCKET;
    1108             :         int         rc;
    1109             :         int         len;
    1110        4720 :         char       *buf = NULL;
    1111        4720 :         bool        endofstream = false;
    1112        4720 :         bool        ping_sent = false;
    1113             :         long        wait_time;
    1114             : 
    1115        4720 :         CHECK_FOR_INTERRUPTS();
    1116             : 
    1117        4720 :         MemoryContextSwitchTo(ApplyMessageContext);
    1118             : 
    1119        4720 :         len = walrcv_receive(wrconn, &buf, &fd);
    1120             : 
    1121        4714 :         if (len != 0)
    1122             :         {
    1123             :             /* Process the data */
    1124             :             for (;;)
    1125             :             {
    1126       16924 :                 CHECK_FOR_INTERRUPTS();
    1127             : 
    1128       10726 :                 if (len == 0)
    1129             :                 {
    1130        4522 :                     break;
    1131             :                 }
    1132        6204 :                 else if (len < 0)
    1133             :                 {
    1134           4 :                     ereport(LOG,
    1135             :                             (errmsg("data stream from publisher has ended")));
    1136           4 :                     endofstream = true;
    1137           4 :                     break;
    1138             :                 }
    1139             :                 else
    1140             :                 {
    1141             :                     int         c;
    1142             :                     StringInfoData s;
    1143             : 
    1144             :                     /* Reset timeout. */
    1145        6200 :                     last_recv_timestamp = GetCurrentTimestamp();
    1146        6200 :                     ping_sent = false;
    1147             : 
    1148             :                     /* Ensure we are reading the data into our memory context. */
    1149        6200 :                     MemoryContextSwitchTo(ApplyMessageContext);
    1150             : 
    1151        6200 :                     s.data = buf;
    1152        6200 :                     s.len = len;
    1153        6200 :                     s.cursor = 0;
    1154        6200 :                     s.maxlen = -1;
    1155             : 
    1156        6200 :                     c = pq_getmsgbyte(&s);
    1157             : 
    1158        6200 :                     if (c == 'w')
    1159             :                     {
    1160             :                         XLogRecPtr  start_lsn;
    1161             :                         XLogRecPtr  end_lsn;
    1162             :                         TimestampTz send_time;
    1163             : 
    1164        2004 :                         start_lsn = pq_getmsgint64(&s);
    1165        2004 :                         end_lsn = pq_getmsgint64(&s);
    1166        2004 :                         send_time = pq_getmsgint64(&s);
    1167             : 
    1168        2004 :                         if (last_received < start_lsn)
    1169        1418 :                             last_received = start_lsn;
    1170             : 
    1171        2004 :                         if (last_received < end_lsn)
    1172           0 :                             last_received = end_lsn;
    1173             : 
    1174        2004 :                         UpdateWorkerStats(last_received, send_time, false);
    1175             : 
    1176        2004 :                         apply_dispatch(&s);
    1177             :                     }
    1178        4196 :                     else if (c == 'k')
    1179             :                     {
    1180             :                         XLogRecPtr  end_lsn;
    1181             :                         TimestampTz timestamp;
    1182             :                         bool        reply_requested;
    1183             : 
    1184        4196 :                         end_lsn = pq_getmsgint64(&s);
    1185        4196 :                         timestamp = pq_getmsgint64(&s);
    1186        4196 :                         reply_requested = pq_getmsgbyte(&s);
    1187             : 
    1188        4196 :                         if (last_received < end_lsn)
    1189         140 :                             last_received = end_lsn;
    1190             : 
    1191        4196 :                         send_feedback(last_received, reply_requested, false);
    1192        4196 :                         UpdateWorkerStats(last_received, timestamp, true);
    1193             :                     }
    1194             :                     /* other message types are purposefully ignored */
    1195             : 
    1196        6198 :                     MemoryContextReset(ApplyMessageContext);
    1197             :                 }
    1198             : 
    1199        6198 :                 len = walrcv_receive(wrconn, &buf, &fd);
    1200             :             }
    1201             :         }
    1202             : 
    1203             :         /* confirm all writes so far */
    1204        4712 :         send_feedback(last_received, false, false);
    1205             : 
    1206        4712 :         if (!in_remote_transaction)
    1207             :         {
    1208             :             /*
    1209             :              * If we didn't get any transactions for a while there might be
    1210             :              * unconsumed invalidation messages in the queue, consume them
    1211             :              * now.
    1212             :              */
    1213        4602 :             AcceptInvalidationMessages();
    1214        4602 :             maybe_reread_subscription();
    1215             : 
    1216             :             /* Process any table synchronization changes. */
    1217        4596 :             process_syncing_tables(last_received);
    1218             :         }
    1219             : 
    1220             :         /* Cleanup the memory. */
    1221        4706 :         MemoryContextResetAndDeleteChildren(ApplyMessageContext);
    1222        4706 :         MemoryContextSwitchTo(TopMemoryContext);
    1223             : 
    1224             :         /* Check if we need to exit the streaming loop. */
    1225        4706 :         if (endofstream)
    1226             :         {
    1227             :             TimeLineID  tli;
    1228             : 
    1229           4 :             walrcv_endstreaming(wrconn, &tli);
    1230           0 :             break;
    1231             :         }
    1232             : 
    1233             :         /*
    1234             :          * Wait for more data or latch.  If we have unflushed transactions,
    1235             :          * wake up after WalWriterDelay to see if they've been flushed yet (in
    1236             :          * which case we should send a feedback message).  Otherwise, there's
    1237             :          * no particular urgency about waking up unless we get data or a
    1238             :          * signal.
    1239             :          */
    1240        4702 :         if (!dlist_is_empty(&lsn_mapping))
    1241        4226 :             wait_time = WalWriterDelay;
    1242             :         else
    1243         476 :             wait_time = NAPTIME_PER_CYCLE;
    1244             : 
    1245        4702 :         rc = WaitLatchOrSocket(MyLatch,
    1246             :                                WL_SOCKET_READABLE | WL_LATCH_SET |
    1247             :                                WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1248             :                                fd, wait_time,
    1249             :                                WAIT_EVENT_LOGICAL_APPLY_MAIN);
    1250             : 
    1251        4702 :         if (rc & WL_LATCH_SET)
    1252             :         {
    1253         124 :             ResetLatch(MyLatch);
    1254         124 :             CHECK_FOR_INTERRUPTS();
    1255             :         }
    1256             : 
    1257        4676 :         if (got_SIGHUP)
    1258             :         {
    1259           0 :             got_SIGHUP = false;
    1260           0 :             ProcessConfigFile(PGC_SIGHUP);
    1261             :         }
    1262             : 
    1263        4676 :         if (rc & WL_TIMEOUT)
    1264             :         {
    1265             :             /*
    1266             :              * We didn't receive anything new. If we haven't heard anything
    1267             :              * from the server for more than wal_receiver_timeout / 2, ping
    1268             :              * the server. Also, if it's been longer than
    1269             :              * wal_receiver_status_interval since the last update we sent,
    1270             :              * send a status update to the master anyway, to report any
    1271             :              * progress in applying WAL.
    1272             :              */
    1273          40 :             bool        requestReply = false;
    1274             : 
    1275             :             /*
    1276             :              * Check if time since last receive from the publisher has reached
    1277             :              * the configured limit.
    1278             :              */
    1279          40 :             if (wal_receiver_timeout > 0)
    1280             :             {
    1281          40 :                 TimestampTz now = GetCurrentTimestamp();
    1282             :                 TimestampTz timeout;
    1283             : 
    1284          40 :                 timeout =
    1285          40 :                     TimestampTzPlusMilliseconds(last_recv_timestamp,
    1286             :                                                 wal_receiver_timeout);
    1287             : 
    1288          40 :                 if (now >= timeout)
    1289           0 :                     ereport(ERROR,
    1290             :                             (errmsg("terminating logical replication worker due to timeout")));
    1291             : 
    1292             :                 /*
    1293             :                  * We didn't receive anything new, for half of receiver
    1294             :                  * replication timeout. Ping the server.
    1295             :                  */
    1296          40 :                 if (!ping_sent)
    1297             :                 {
    1298          40 :                     timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
    1299             :                                                           (wal_receiver_timeout / 2));
    1300          40 :                     if (now >= timeout)
    1301             :                     {
    1302           0 :                         requestReply = true;
    1303           0 :                         ping_sent = true;
    1304             :                     }
    1305             :                 }
    1306             :             }
    1307             : 
    1308          40 :             send_feedback(last_received, requestReply, requestReply);
    1309             :         }
    1310             :     }
    1311           0 : }
    1312             : 
    1313             : /*
    1314             :  * Send a Standby Status Update message to server.
    1315             :  *
    1316             :  * 'recvpos' is the latest LSN we've received data to; 'force' is set if we
    1317             :  * need to reply to avoid timeouts; 'requestReply' sets the replyRequested byte.
    1318             :  */
    1319             : static void
    1320        8948 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
    1321             : {
    1322             :     static StringInfo reply_message = NULL;	/* allocated once, then reused */
    1323             :     static TimestampTz send_time = 0;	/* set each time a message is built */
    1324             : 
    1325             :     static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
    1326             :     static XLogRecPtr last_writepos = InvalidXLogRecPtr;
    1327             :     static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
    1328             : 
    1329             :     XLogRecPtr  writepos;
    1330             :     XLogRecPtr  flushpos;
    1331             :     TimestampTz now;
    1332             :     bool        have_pending_txes;
    1333             : 
    1334             :     /*
    1335             :      * If the user doesn't want status to be reported to the publisher, be
    1336             :      * sure to exit before doing anything at all.
    1337             :      */
    1338        8948 :     if (!force && wal_receiver_status_interval <= 0)
    1339        4470 :         return;
    1340             : 
    1341             :     /* It's legal to not pass a recvpos; never go below the last reported one */
    1342        8948 :     if (recvpos < last_recvpos)
    1343           0 :         recvpos = last_recvpos;
    1344             : 
    1345        8948 :     get_flush_position(&writepos, &flushpos, &have_pending_txes);
    1346             : 
    1347             :     /*
    1348             :      * No outstanding transactions to flush, we can report the latest received
    1349             :      * position. This is important for synchronous replication.
    1350             :      */
    1351        8948 :     if (!have_pending_txes)
    1352         698 :         flushpos = writepos = recvpos;
    1353             : 
    1354        8948 :     if (writepos < last_writepos)
    1355           0 :         writepos = last_writepos;
    1356             : 
    1357        8948 :     if (flushpos < last_flushpos)
    1358        8222 :         flushpos = last_flushpos;
    1359             : 
    1360        8948 :     now = GetCurrentTimestamp();
    1361             : 
    1362             :     /* unless forced: nothing new to report and status interval not yet elapsed */
    1363       13884 :     if (!force &&
    1364        9458 :         writepos == last_writepos &&
    1365        9028 :         flushpos == last_flushpos &&
    1366        4506 :         !TimestampDifferenceExceeds(send_time, now,
    1367             :                                     wal_receiver_status_interval * 1000))
    1368        4470 :         return;
    1369        4478 :     send_time = now;
    1370             : 
    1371        4478 :     if (!reply_message)
    1372             :     {
    1373          44 :         MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
    1374             : 
    1375          44 :         reply_message = makeStringInfo();
    1376          44 :         MemoryContextSwitchTo(oldctx);
    1377             :     }
    1378             :     else
    1379        4434 :         resetStringInfo(reply_message);
    1380             : 
    1381        4478 :     pq_sendbyte(reply_message, 'r');
    1382        4478 :     pq_sendint64(reply_message, recvpos);   /* write */
    1383        4478 :     pq_sendint64(reply_message, flushpos);  /* flush */
    1384        4478 :     pq_sendint64(reply_message, writepos);  /* apply */
    1385        4478 :     pq_sendint64(reply_message, now);   /* sendTime */
    1386        4478 :     pq_sendbyte(reply_message, requestReply);   /* replyRequested */
    1387             : 
    1388        4478 :     elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
    1389             :          force,
    1390             :          (uint32) (recvpos >> 32), (uint32) recvpos,
    1391             :          (uint32) (writepos >> 32), (uint32) writepos,
    1392             :          (uint32) (flushpos >> 32), (uint32) flushpos
    1393             :         );
    1394             : 
    1395        4478 :     walrcv_send(wrconn, reply_message->data, reply_message->len);
    1396             : 
    1397        4478 :     if (recvpos > last_recvpos)
    1398         414 :         last_recvpos = recvpos;
    1399        4478 :     if (writepos > last_writepos)
    1400         414 :         last_writepos = writepos;
    1401        4478 :     if (flushpos > last_flushpos)
    1402         274 :         last_flushpos = flushpos;
    1403             : }
    1404             : 
    1405             : /*
    1406             :  * Reread subscription info if needed. Most changes cause the worker to exit.
    1407             :  */
    1408             : static void
    1409        4876 : maybe_reread_subscription(void)
    1410             : {
    1411             :     MemoryContext oldctx;
    1412             :     Subscription *newsub;
    1413        4876 :     bool        started_tx = false;
    1414             : 
    1415             :     /* When cache state is valid there is nothing to do here. */
    1416        4876 :     if (MySubscriptionValid)
    1417        4862 :         return;
    1418             : 
    1419             :     /* This function might be called inside or outside of transaction. */
    1420          14 :     if (!IsTransactionState())
    1421             :     {
    1422          14 :         StartTransactionCommand();
    1423          14 :         started_tx = true;
    1424             :     }
    1425             : 
    1426             :     /* Ensure allocations in permanent context. */
    1427          14 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    1428             : 
    1429          14 :     newsub = GetSubscription(MyLogicalRepWorker->subid, true);
    1430             : 
    1431             :     /*
    1432             :      * Exit if the subscription was removed. This normally should not happen
    1433             :      * as the worker gets killed during DROP SUBSCRIPTION.
    1434             :      */
    1435          14 :     if (!newsub)
    1436             :     {
    1437           0 :         ereport(LOG,
    1438             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1439             :                         "stop because the subscription was removed",
    1440             :                         MySubscription->name)));
    1441             : 
    1442           0 :         proc_exit(0);
    1443             :     }
    1444             : 
    1445             :     /*
    1446             :      * Exit if the subscription was disabled. This normally should not happen
    1447             :      * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
    1448             :      */
    1449          14 :     if (!newsub->enabled)
    1450             :     {
    1451           0 :         ereport(LOG,
    1452             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1453             :                         "stop because the subscription was disabled",
    1454             :                         MySubscription->name)));
    1455             : 
    1456           0 :         proc_exit(0);
    1457             :     }
    1458             : 
    1459             :     /*
    1460             :      * Exit if connection string was changed. The launcher will start new
    1461             :      * worker.
    1462             :      */
    1463          14 :     if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
    1464             :     {
    1465           2 :         ereport(LOG,
    1466             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1467             :                         "restart because the connection information was changed",
    1468             :                         MySubscription->name)));
    1469             : 
    1470           2 :         proc_exit(0);
    1471             :     }
    1472             : 
    1473             :     /*
    1474             :      * Exit if subscription name was changed (it's used for
    1475             :      * fallback_application_name). The launcher will start new worker.
    1476             :      */
    1477          12 :     if (strcmp(newsub->name, MySubscription->name) != 0)
    1478             :     {
    1479           2 :         ereport(LOG,
    1480             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1481             :                         "restart because subscription was renamed",
    1482             :                         MySubscription->name)));
    1483             : 
    1484           2 :         proc_exit(0);
    1485             :     }
    1486             : 
    1487             :     /* !slotname should never happen when enabled is true. */
    1488             :     Assert(newsub->slotname);
    1489             : 
    1490             :     /*
    1491             :      * We need to make new connection to new slot if slot name has changed so
    1492             :      * exit here as well if that's the case.
    1493             :      */
    1494          10 :     if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
    1495             :     {
    1496           0 :         ereport(LOG,
    1497             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1498             :                         "restart because the replication slot name was changed",
    1499             :                         MySubscription->name)));
    1500             : 
    1501           0 :         proc_exit(0);
    1502             :     }
    1503             : 
    1504             :     /*
    1505             :      * Exit if publication list was changed. The launcher will start new
    1506             :      * worker.
    1507             :      */
    1508          10 :     if (!equal(newsub->publications, MySubscription->publications))
    1509             :     {
    1510           2 :         ereport(LOG,
    1511             :                 (errmsg("logical replication apply worker for subscription \"%s\" will "
    1512             :                         "restart because subscription's publications were changed",
    1513             :                         MySubscription->name)));
    1514             : 
    1515           2 :         proc_exit(0);
    1516             :     }
    1517             : 
    1518             :     /* Also check for other changes that should never happen. */
    1519           8 :     if (newsub->dbid != MySubscription->dbid)
    1520             :     {
    1521           0 :         elog(ERROR, "subscription %u changed unexpectedly",
    1522             :              MyLogicalRepWorker->subid);
    1523             :     }
    1524             : 
    1525             :     /* Clean old subscription info and switch to new one. */
    1526           8 :     FreeSubscription(MySubscription);
    1527           8 :     MySubscription = newsub;
    1528             : 
    1529           8 :     MemoryContextSwitchTo(oldctx);
    1530             : 
    1531             :     /* Change synchronous commit according to the user's wishes */
    1532           8 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    1533             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    1534             : 
    1535           8 :     if (started_tx)
    1536           8 :         CommitTransactionCommand();
    1537             : 
    1538           8 :     MySubscriptionValid = true;
    1539             : }
    1540             : 
    1541             : /*
    1542             :  * Callback from subscription syscache invalidation; marks MySubscription stale.
    1543             :  */
    1544             : static void
    1545          16 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
    1546             : {
    1547          16 :     MySubscriptionValid = false;
    1548          16 : }
    1549             : 
    1550             : /* SIGHUP: set flag to reload configuration at next convenient time */
    1551             : static void
    1552           0 : logicalrep_worker_sighup(SIGNAL_ARGS)
    1553             : {
    1554           0 :     int         save_errno = errno;	/* restored before returning */
    1555             : 
    1556           0 :     got_SIGHUP = true;
    1557             : 
    1558             :     /* Wake up anything waiting on the process latch */
    1559           0 :     SetLatch(MyLatch);
    1560             : 
    1561           0 :     errno = save_errno;
    1562           0 : }
    1563             : 
    1564             : /* Logical Replication Apply worker entry point; main_arg is the worker slot number */
    1565             : void
    1566         122 : ApplyWorkerMain(Datum main_arg)
    1567             : {
    1568         122 :     int         worker_slot = DatumGetInt32(main_arg);
    1569             :     MemoryContext oldctx;
    1570             :     char        originname[NAMEDATALEN];
    1571             :     XLogRecPtr  origin_startpos;
    1572             :     char       *myslotname;
    1573             :     WalRcvStreamOptions options;
    1574             : 
    1575             :     /* Attach to slot */
    1576         122 :     logicalrep_worker_attach(worker_slot);
    1577             : 
    1578             :     /* Setup signal handling */
    1579         122 :     pqsignal(SIGHUP, logicalrep_worker_sighup);
    1580         122 :     pqsignal(SIGTERM, die);
    1581         122 :     BackgroundWorkerUnblockSignals();
    1582             : 
    1583             :     /*
    1584             :      * We don't currently need any ResourceOwner in a logical replication worker,
    1585             :      * but if we did, we could call CreateAuxProcessResourceOwner here.
    1586             :      */
    1587             : 
    1588             :     /* Initialise stats timestamps to a sane value (the current time) */
    1589         244 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
    1590         122 :         MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
    1591             : 
    1592             :     /* Load the libpq-specific functions */
    1593         122 :     load_file("libpqwalreceiver", false);
    1594             : 
    1595             :     /* Run as replica session replication role. */
    1596         122 :     SetConfigOption("session_replication_role", "replica",
    1597             :                     PGC_SUSET, PGC_S_OVERRIDE);
    1598             : 
    1599             :     /* Connect to our database. */
    1600         122 :     BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
    1601         122 :                                               MyLogicalRepWorker->userid,
    1602             :                                               0);
    1603             : 
    1604             :     /* Load the subscription into persistent memory context. */
    1605         122 :     ApplyContext = AllocSetContextCreate(TopMemoryContext,
    1606             :                                          "ApplyContext",
    1607             :                                          ALLOCSET_DEFAULT_SIZES);
    1608         122 :     StartTransactionCommand();
    1609         122 :     oldctx = MemoryContextSwitchTo(ApplyContext);
    1610             : 
    1611         122 :     MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
    1612         122 :     if (!MySubscription)
    1613             :     {
    1614           0 :         ereport(LOG,
    1615             :                 (errmsg("logical replication apply worker for subscription %u will not "
    1616             :                         "start because the subscription was removed during startup",
    1617             :                         MyLogicalRepWorker->subid)));
    1618           0 :         proc_exit(0);
    1619             :     }
    1620             : 
    1621         122 :     MySubscriptionValid = true;
    1622         122 :     MemoryContextSwitchTo(oldctx);
    1623             : 
    1624         122 :     if (!MySubscription->enabled)
    1625             :     {
    1626           0 :         ereport(LOG,
    1627             :                 (errmsg("logical replication apply worker for subscription \"%s\" will not "
    1628             :                         "start because the subscription was disabled during startup",
    1629             :                         MySubscription->name)));
    1630             : 
    1631           0 :         proc_exit(0);
    1632             :     }
    1633             : 
    1634             :     /* Setup synchronous commit according to the user's wishes */
    1635         122 :     SetConfigOption("synchronous_commit", MySubscription->synccommit,
    1636             :                     PGC_BACKEND, PGC_S_OVERRIDE);
    1637             : 
    1638             :     /* Keep us informed about subscription changes. */
    1639         122 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
    1640             :                                   subscription_change_cb,
    1641             :                                   (Datum) 0);
    1642             : 
    1643         122 :     if (am_tablesync_worker())
    1644          74 :         ereport(LOG,
    1645             :                 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
    1646             :                         MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
    1647             :     else
    1648          48 :         ereport(LOG,
    1649             :                 (errmsg("logical replication apply worker for subscription \"%s\" has started",
    1650             :                         MySubscription->name)));
    1651             : 
    1652         122 :     CommitTransactionCommand();
    1653             : 
    1654             :     /* Connect to the origin and start the replication. */
    1655         122 :     elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
    1656             :          MySubscription->conninfo);
    1657             : 
    1658         122 :     if (am_tablesync_worker())
    1659             :     {
    1660             :         char       *syncslotname;
    1661             : 
    1662             :         /* This is table synchronization worker, call initial sync. */
    1663          74 :         syncslotname = LogicalRepSyncTableStart(&origin_startpos);
    1664             : 
    1665             :         /* The slot name needs to be allocated in permanent memory context. */
    1666           0 :         oldctx = MemoryContextSwitchTo(ApplyContext);
    1667           0 :         myslotname = pstrdup(syncslotname);
    1668           0 :         MemoryContextSwitchTo(oldctx);
    1669             : 
    1670           0 :         pfree(syncslotname);
    1671             :     }
    1672             :     else
    1673             :     {
    1674             :         /* This is main apply worker */
    1675             :         RepOriginId originid;
    1676             :         TimeLineID  startpointTLI;
    1677             :         char       *err;
    1678             : 
    1679          48 :         myslotname = MySubscription->slotname;
    1680             : 
    1681             :         /*
    1682             :          * This shouldn't happen if the subscription is enabled, but guard
    1683             :          * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
    1684             :          * crash if slot is NULL.)
    1685             :          */
    1686          48 :         if (!myslotname)
    1687           0 :             ereport(ERROR,
    1688             :                     (errmsg("subscription has no replication slot set")));
    1689             : 
    1690             :         /* Setup replication origin tracking. */
    1691          48 :         StartTransactionCommand();
    1692          48 :         snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
    1693          48 :         originid = replorigin_by_name(originname, true);
    1694          48 :         if (!OidIsValid(originid))
    1695           0 :             originid = replorigin_create(originname);
    1696          48 :         replorigin_session_setup(originid);
    1697          48 :         replorigin_session_origin = originid;
    1698          48 :         origin_startpos = replorigin_session_get_progress(false);
    1699          48 :         CommitTransactionCommand();
    1700             : 
    1701          48 :         wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
    1702             :                                 &err);
    1703          48 :         if (wrconn == NULL)
    1704           4 :             ereport(ERROR,
    1705             :                     (errmsg("could not connect to the publisher: %s", err)));
    1706             : 
    1707             :         /*
    1708             :          * We don't really use the output of identify_system for anything, but
    1709             :          * it does some initializations on the upstream so let's still call it.
    1710             :          */
    1711          44 :         (void) walrcv_identify_system(wrconn, &startpointTLI);
    1712             : 
    1713             :     }
    1714             : 
    1715             :     /*
    1716             :      * Setup callback for syscache so that we know when something changes in
    1717             :      * the subscription relation state.
    1718             :      */
    1719          44 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
    1720             :                                   invalidate_syncing_table_states,
    1721             :                                   (Datum) 0);
    1722             : 
    1723             :     /* Build logical replication streaming options. */
    1724          44 :     options.logical = true;
    1725          44 :     options.startpoint = origin_startpos;
    1726          44 :     options.slotname = myslotname;
    1727          44 :     options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
    1728          44 :     options.proto.logical.publication_names = MySubscription->publications;
    1729             : 
    1730             :     /* Start normal logical streaming replication. */
    1731          44 :     walrcv_startstreaming(wrconn, &options);
    1732             : 
    1733             :     /* Run the main loop. */
    1734          44 :     LogicalRepApplyLoop(origin_startpos);
    1735             : 
    1736           0 :     proc_exit(0);
    1737             : }
    1738             : 
    1739             : /*
    1740             :  * Is the current process a logical replication worker (MyLogicalRepWorker set)?
    1741             :  */
    1742             : bool
    1743         366 : IsLogicalWorker(void)
    1744             : {
    1745         366 :     return MyLogicalRepWorker != NULL;
    1746             : }

Generated by: LCOV version 1.13