Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from the remote logical replication stream.
13 : *
14 : * The main (apply) worker is started by the logical replication worker
15 : * launcher for every enabled subscription in a database. It uses the
16 : * walsender protocol to communicate with the publisher.
17 : *
18 : * This module includes the server-facing code and shares the libpqwalreceiver
19 : * module with walreceiver to provide the libpq-specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option to on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions also
33 : * requires handling aborts of both the toplevel transaction and subtransactions.
34 : * This is achieved by tracking offsets for subtransactions, which are then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in the temporary-file directory by default, and the
38 : * filenames include both the XID of the toplevel transaction and the OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere with each other.
41 : *
42 : * We use BufFiles instead of plain temporary files because (a) the BufFile
43 : * infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) it provides automatic cleanup on error, and (c) it allows the
45 : * files to survive across local transactions so that they can be opened and
46 : * closed at stream start and stop. We decided to use the FileSet
47 : * infrastructure because without it the files are deleted as soon as they
48 : * are closed, and if we instead kept the stream files open across start/stop
49 : * stream, that would consume a lot of memory (more than 8K for each BufFile,
50 : * and there could be multiple such BufFiles because the subscriber could
51 : * receive multiple start/stop streams for different transactions before
52 : * getting the commit). Moreover, without FileSet we would also need to
53 : * invent a new way to pass filenames to the BufFile APIs so that we could
54 : * reopen the desired file across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option to parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction (say, because it
72 : * was prior to the initial consistent point) but might have got some later
73 : * commits. The tablesync worker will then exit without doing anything for the
74 : * prepared transaction skipped by the apply worker, as its sync location will
75 : * already be ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later, when the apply worker does the commit
77 : * prepared, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this and similar prepare confusions, the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * process_syncing_tables_for_apply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING, it starts streaming messages with the
98 : * two_phase option, which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time two_phase was not yet enabled, the
101 : * publisher (replication server) skipped some prepares, but we ensure that
102 : * such prepares are sent along with the commit prepared; see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * We don't allow toggling the two_phase option of a subscription because it can
113 : * lead to an inconsistent replica. Consider: initially it was on and we had
114 : * received some prepare; then we turn it off, and now at commit time the server
115 : * will send the entire transaction data along with the commit. With some more
116 : * analysis, we could allow changing this option from off to on, but it's not
117 : * clear that alone would be useful.
118 : *
119 : * Finally, to avoid the problems mentioned in the previous paragraphs from any
120 : * subsequent (not-READY) tablesyncs (which would need to toggle the two_phase
121 : * option from 'on' to 'off' and then back to 'on' again), there is a restriction
122 : * on ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
123 : * the two_phase tri-state is ENABLED, except when copy_data = false.
124 : *
125 : * We can get a prepare for the same GID more than once in the genuine case
126 : * where we have defined multiple subscriptions for publications on the same
127 : * server and the prepared transaction has operations on tables subscribed to
128 : * those subscriptions. In such cases, if we used the GID sent by the
129 : * publisher, one of the prepares would succeed and the others would fail, in
130 : * which case the server would send them again. That can lead to a deadlock if
131 : * the user has set synchronous_standby_names for all the subscriptions on the
132 : * subscriber. To avoid such deadlocks, we generate a unique GID (consisting of
133 : * the subscription oid and the xid of the prepared transaction) for each
134 : * prepared transaction on the subscriber (see the sketch after this header).
135 : *
136 : * FAILOVER
137 : * ----------------------
138 : * The logical slot on the primary can be synced to the standby by specifying
139 : * failover = true when creating the subscription. Enabling failover allows us
140 : * to smoothly transition to the promoted standby, ensuring that we can
141 : * subscribe to the new primary without losing any data.
142 : *-------------------------------------------------------------------------
143 : */
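
As an aside on the GID scheme described above, here is a minimal sketch of how such a subscriber-local GID could be composed from the subscription OID and the remote XID. The real format is produced by TwoPhaseTransactionGid() (used further down in this file); the exact "pg_gid_%u_%u" layout and the helper name below are illustrative assumptions, not the authoritative implementation.

/* Illustrative sketch only; assumes the usual PostgreSQL headers. */
static void
example_subscriber_gid(Oid subid, TransactionId xid, char *gid, int szgid)
{
	/*
	 * Combining the subscription OID with the remote transaction's XID keeps
	 * GIDs unique even when several subscriptions replay the same remote
	 * prepared transaction, avoiding the deadlock described above.
	 */
	snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}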
144 :
145 : #include "postgres.h"
146 :
147 : #include <sys/stat.h>
148 : #include <unistd.h>
149 :
150 : #include "access/table.h"
151 : #include "access/tableam.h"
152 : #include "access/twophase.h"
153 : #include "access/xact.h"
154 : #include "catalog/indexing.h"
155 : #include "catalog/pg_inherits.h"
156 : #include "catalog/pg_subscription.h"
157 : #include "catalog/pg_subscription_rel.h"
158 : #include "commands/tablecmds.h"
159 : #include "commands/trigger.h"
160 : #include "executor/executor.h"
161 : #include "executor/execPartition.h"
162 : #include "libpq/pqformat.h"
163 : #include "miscadmin.h"
164 : #include "optimizer/optimizer.h"
165 : #include "parser/parse_relation.h"
166 : #include "pgstat.h"
167 : #include "postmaster/bgworker.h"
168 : #include "postmaster/interrupt.h"
169 : #include "postmaster/walwriter.h"
170 : #include "replication/conflict.h"
171 : #include "replication/logicallauncher.h"
172 : #include "replication/logicalproto.h"
173 : #include "replication/logicalrelation.h"
174 : #include "replication/logicalworker.h"
175 : #include "replication/origin.h"
176 : #include "replication/walreceiver.h"
177 : #include "replication/worker_internal.h"
178 : #include "rewrite/rewriteHandler.h"
179 : #include "storage/buffile.h"
180 : #include "storage/ipc.h"
181 : #include "storage/lmgr.h"
182 : #include "tcop/tcopprot.h"
183 : #include "utils/acl.h"
184 : #include "utils/dynahash.h"
185 : #include "utils/guc.h"
186 : #include "utils/inval.h"
187 : #include "utils/lsyscache.h"
188 : #include "utils/memutils.h"
189 : #include "utils/pg_lsn.h"
190 : #include "utils/rel.h"
191 : #include "utils/rls.h"
192 : #include "utils/snapmgr.h"
193 : #include "utils/syscache.h"
194 : #include "utils/usercontext.h"
195 :
196 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
197 :
198 : typedef struct FlushPosition
199 : {
200 : dlist_node node;
201 : XLogRecPtr local_end;
202 : XLogRecPtr remote_end;
203 : } FlushPosition;
204 :
205 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
206 :
207 : typedef struct ApplyExecutionData
208 : {
209 : EState *estate; /* executor state, used to track resources */
210 :
211 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
212 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
213 :
214 : /* These fields are used when the target relation is partitioned: */
215 : ModifyTableState *mtstate; /* dummy ModifyTable state */
216 : PartitionTupleRouting *proute; /* partition routing info */
217 : } ApplyExecutionData;
218 :
219 : /* Struct for saving and restoring apply errcontext information */
220 : typedef struct ApplyErrorCallbackArg
221 : {
222 : LogicalRepMsgType command; /* 0 if invalid */
223 : LogicalRepRelMapEntry *rel;
224 :
225 : /* Remote node information */
226 : int remote_attnum; /* -1 if invalid */
227 : TransactionId remote_xid;
228 : XLogRecPtr finish_lsn;
229 : char *origin_name;
230 : } ApplyErrorCallbackArg;
231 :
232 : /*
233 : * The action to be taken for the changes in the transaction.
234 : *
235 : * TRANS_LEADER_APPLY:
236 : * This action means that we are in the leader apply worker or table sync
237 : * worker. The changes of the transaction are either directly applied or
238 : * are read from temporary files (for streaming transactions) and then
239 : * applied by the worker.
240 : *
241 : * TRANS_LEADER_SERIALIZE:
242 : * This action means that we are in the leader apply worker or table sync
243 : * worker. Changes are written to temporary files and then applied when the
244 : * final commit arrives.
245 : *
246 : * TRANS_LEADER_SEND_TO_PARALLEL:
247 : * This action means that we are in the leader apply worker and need to send
248 : * the changes to the parallel apply worker.
249 : *
250 : * TRANS_LEADER_PARTIAL_SERIALIZE:
251 : * This action means that we are in the leader apply worker and have sent some
252 : * changes directly to the parallel apply worker and the remaining changes are
253 : * serialized to a file, due to timeout while sending data. The parallel apply
254 : * worker will apply these serialized changes when the final commit arrives.
255 : *
256 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
257 : * serializing changes, the leader worker also needs to serialize the
258 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
259 : * finish the transaction when processing the transaction finish command. So
260 : * this new action was introduced to keep the code and logic clear.
261 : *
262 : * TRANS_PARALLEL_APPLY:
263 : * This action means that we are in the parallel apply worker and changes of
264 : * the transaction are applied directly by the worker.
265 : */
266 : typedef enum
267 : {
268 : /* The action for non-streaming transactions. */
269 : TRANS_LEADER_APPLY,
270 :
271 : /* Actions for streaming transactions. */
272 : TRANS_LEADER_SERIALIZE,
273 : TRANS_LEADER_SEND_TO_PARALLEL,
274 : TRANS_LEADER_PARTIAL_SERIALIZE,
275 : TRANS_PARALLEL_APPLY,
276 : } TransApplyAction;
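
To make the mapping above concrete, the following is a minimal sketch of how a transaction could be routed to one of these actions. The authoritative logic is get_transaction_apply_action() further down in this file; pa_find_worker_for() is a hypothetical stand-in for looking up an already-assigned parallel apply worker, and the serialize_changes flag is assumed to mark a worker that has fallen back to partial serialization.

/* Illustrative sketch; not the actual get_transaction_apply_action(). */
static TransApplyAction
example_choose_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
{
	*winfo = NULL;

	/* A parallel apply worker always applies the changes itself. */
	if (am_parallel_apply_worker())
		return TRANS_PARALLEL_APPLY;

	/* Leader: is a parallel apply worker already handling this xact? */
	*winfo = pa_find_worker_for(xid);	/* hypothetical lookup helper */
	if (*winfo)
		return (*winfo)->serialize_changes ?
			TRANS_LEADER_PARTIAL_SERIALIZE : TRANS_LEADER_SEND_TO_PARALLEL;

	/* Streamed but with no parallel worker: spool the changes to a file. */
	if (in_streamed_transaction)
		return TRANS_LEADER_SERIALIZE;

	/* Plain, non-streamed transaction: apply directly. */
	return TRANS_LEADER_APPLY;
}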
277 :
278 : /* errcontext tracker */
279 : static ApplyErrorCallbackArg apply_error_callback_arg =
280 : {
281 : .command = 0,
282 : .rel = NULL,
283 : .remote_attnum = -1,
284 : .remote_xid = InvalidTransactionId,
285 : .finish_lsn = InvalidXLogRecPtr,
286 : .origin_name = NULL,
287 : };
288 :
289 : ErrorContextCallback *apply_error_context_stack = NULL;
290 :
291 : MemoryContext ApplyMessageContext = NULL;
292 : MemoryContext ApplyContext = NULL;
293 :
294 : /* per stream context for streaming transactions */
295 : static MemoryContext LogicalStreamingContext = NULL;
296 :
297 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
298 :
299 : Subscription *MySubscription = NULL;
300 : static bool MySubscriptionValid = false;
301 :
302 : static List *on_commit_wakeup_workers_subids = NIL;
303 :
304 : bool in_remote_transaction = false;
305 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
306 :
307 : /* fields valid only when processing streamed transaction */
308 : static bool in_streamed_transaction = false;
309 :
310 : static TransactionId stream_xid = InvalidTransactionId;
311 :
312 : /*
313 : * The number of changes applied by the parallel apply worker during one
314 : * streaming block.
315 : */
316 : static uint32 parallel_stream_nchanges = 0;
317 :
318 : /* Are we initializing an apply worker? */
319 : bool InitializingApplyWorker = false;
320 :
321 : /*
322 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
323 : * the subscription if the remote transaction's finish LSN matches the
324 : * subskiplsn. Once we start skipping changes, we don't stop until we have
325 : * skipped all changes of the transaction, even if pg_subscription is updated
326 : * and MySubscription->skiplsn gets changed or reset meanwhile. Also, in
327 : * streaming transaction cases (streaming = on), we don't skip receiving and
328 : * spooling the changes, since we decide whether or not to skip applying them
329 : * when starting to apply changes. The subskiplsn is cleared after successfully
330 : * skipping the transaction or applying a non-empty transaction. The latter
331 : * prevents a mistakenly specified subskiplsn from being left behind. Note that
332 : * we cannot skip streaming transactions when using parallel apply workers
333 : * because we cannot get the finish LSN before applying the changes. So, we
334 : * don't start a parallel apply worker when the finish LSN is set by the user.
335 : */
336 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
337 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
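
The decision to skip is just an LSN comparison made once, at transaction start; a simplified sketch follows (the real check lives in maybe_start_skipping_changes(), declared above and defined later in this file).

/* Illustrative sketch of the skip decision made at transaction start. */
static void
example_maybe_start_skipping_changes(XLogRecPtr finish_lsn)
{
	/* Skip only when the remote finish LSN matches subskiplsn exactly. */
	if (XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
		MySubscription->skiplsn != finish_lsn)
		return;

	/* From here on, is_skipping_changes() reports true for this xact. */
	skip_xact_finish_lsn = finish_lsn;
}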
338 :
339 : /* BufFile handle of the current streaming file */
340 : static BufFile *stream_fd = NULL;
341 :
342 : typedef struct SubXactInfo
343 : {
344 : TransactionId xid; /* XID of the subxact */
345 : int fileno; /* file number in the buffile */
346 : off_t offset; /* offset in the file */
347 : } SubXactInfo;
348 :
349 : /* Sub-transaction data for the current streaming transaction */
350 : typedef struct ApplySubXactData
351 : {
352 : uint32 nsubxacts; /* number of sub-transactions */
353 : uint32 nsubxacts_max; /* current capacity of subxacts */
354 : TransactionId subxact_last; /* xid of the last sub-transaction */
355 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
356 : } ApplySubXactData;
357 :
358 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
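
These per-subxact (fileno, offset) pairs are what make subtransaction aborts cheap for serialized streams: on a streamed subxact abort, the spool file is simply truncated back to where that subxact began. A simplified sketch of that idea follows; the real handling is in the stream-abort code later in this file, and truncating through BufFileTruncateFileSet() is an assumption about the mechanism used.

/* Illustrative sketch: discard a subxact's serialized changes on abort. */
static void
example_discard_streamed_subxact(TransactionId subxid)
{
	for (uint32 i = 0; i < subxact_data.nsubxacts; i++)
	{
		if (subxact_data.subxacts[i].xid == subxid)
		{
			/* Truncate the changes file back to the subxact's start. */
			BufFileTruncateFileSet(stream_fd,
								   subxact_data.subxacts[i].fileno,
								   subxact_data.subxacts[i].offset);

			/* The aborted subxact and anything after it are gone. */
			subxact_data.nsubxacts = i;
			break;
		}
	}
}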
359 :
360 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
361 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
362 :
363 : /*
364 : * Information about subtransactions of a given toplevel transaction.
365 : */
366 : static void subxact_info_write(Oid subid, TransactionId xid);
367 : static void subxact_info_read(Oid subid, TransactionId xid);
368 : static void subxact_info_add(TransactionId xid);
369 : static inline void cleanup_subxact_info(void);
370 :
371 : /*
372 : * Serialize and deserialize changes for a toplevel transaction.
373 : */
374 : static void stream_open_file(Oid subid, TransactionId xid,
375 : bool first_segment);
376 : static void stream_write_change(char action, StringInfo s);
377 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
378 : static void stream_close_file(void);
379 :
380 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
381 :
382 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
383 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
384 : ResultRelInfo *relinfo,
385 : TupleTableSlot *remoteslot);
386 : static void apply_handle_update_internal(ApplyExecutionData *edata,
387 : ResultRelInfo *relinfo,
388 : TupleTableSlot *remoteslot,
389 : LogicalRepTupleData *newtup,
390 : Oid localindexoid);
391 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
392 : ResultRelInfo *relinfo,
393 : TupleTableSlot *remoteslot,
394 : Oid localindexoid);
395 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
396 : LogicalRepRelation *remoterel,
397 : Oid localidxoid,
398 : TupleTableSlot *remoteslot,
399 : TupleTableSlot **localslot);
400 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
401 : TupleTableSlot *remoteslot,
402 : LogicalRepTupleData *newtup,
403 : CmdType operation);
404 :
405 : /* Functions for skipping changes */
406 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
407 : static void stop_skipping_changes(void);
408 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
409 :
410 : /* Functions for apply error callback */
411 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
412 : static inline void reset_apply_error_context_info(void);
413 :
414 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
415 : ParallelApplyWorkerInfo **winfo);
416 :
417 : /*
418 : * Form the origin name for the subscription.
419 : *
420 : * This is a common function for tablesync and other workers. Tablesync workers
421 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
422 : *
423 : * Return the name in the supplied buffer.
424 : */
425 : void
426 2462 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
427 : char *originname, Size szoriginname)
428 : {
429 2462 : if (OidIsValid(relid))
430 : {
431 : /* Replication origin name for tablesync workers. */
432 1474 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
433 : }
434 : else
435 : {
436 : /* Replication origin name for non-tablesync workers. */
437 988 : snprintf(originname, szoriginname, "pg_%u", suboid);
438 : }
439 2462 : }
440 :
441 : /*
442 : * Should this worker apply changes for the given relation?
443 : *
444 : * This is mainly needed for the initial relation data sync, as that runs in a
445 : * separate worker process in parallel, and we need some way to skip
446 : * changes coming to the leader apply worker during the sync of a table.
447 : *
448 : * Note we need to do a less-than-or-equal comparison for the SYNCDONE state
449 : * because it might hold the position of the end of the initial slot consistent
450 : * point WAL record + 1 (i.e. the start of the next record), and the next record
451 : * can be the COMMIT of the transaction we are now processing (which is what we
452 : * set remote_final_lsn to in apply_handle_begin).
453 : *
454 : * Note that for streaming transactions that are being applied in the parallel
455 : * apply worker, we disallow applying changes if the target table in the
456 : * subscription is not in the READY state, because we cannot decide whether to
457 : * apply the change as we won't know remote_final_lsn by that time.
458 : *
459 : * We already checked this in pa_can_start() before assigning the
460 : * streaming transaction to the parallel worker, but it also needs to be
461 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
462 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
463 : * while applying this transaction.
464 : */
465 : static bool
466 297178 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
467 : {
468 297178 : switch (MyLogicalRepWorker->type)
469 : {
470 0 : case WORKERTYPE_TABLESYNC:
471 0 : return MyLogicalRepWorker->relid == rel->localreloid;
472 :
473 137714 : case WORKERTYPE_PARALLEL_APPLY:
474 : /* We don't synchronize rel's that are in unknown state. */
475 137714 : if (rel->state != SUBREL_STATE_READY &&
476 0 : rel->state != SUBREL_STATE_UNKNOWN)
477 0 : ereport(ERROR,
478 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
479 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
480 : MySubscription->name),
481 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
482 :
483 137714 : return rel->state == SUBREL_STATE_READY;
484 :
485 159464 : case WORKERTYPE_APPLY:
486 159566 : return (rel->state == SUBREL_STATE_READY ||
487 102 : (rel->state == SUBREL_STATE_SYNCDONE &&
488 0 : rel->statelsn <= remote_final_lsn));
489 :
490 0 : case WORKERTYPE_UNKNOWN:
491 : /* Should never happen. */
492 0 : elog(ERROR, "Unknown worker type");
493 : }
494 :
495 0 : return false; /* dummy for compiler */
496 : }
497 :
498 : /*
499 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
500 : *
501 : * Start a transaction, if this is the first step (else we keep using the
502 : * existing transaction).
503 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
504 : */
505 : static void
506 298084 : begin_replication_step(void)
507 : {
508 298084 : SetCurrentStatementStartTimestamp();
509 :
510 298084 : if (!IsTransactionState())
511 : {
512 1852 : StartTransactionCommand();
513 1852 : maybe_reread_subscription();
514 : }
515 :
516 298080 : PushActiveSnapshot(GetTransactionSnapshot());
517 :
518 298080 : MemoryContextSwitchTo(ApplyMessageContext);
519 298080 : }
520 :
521 : /*
522 : * Finish up one step of a replication transaction.
523 : * Callers of begin_replication_step() must also call this.
524 : *
525 : * We don't close out the transaction here, but we should increment
526 : * the command counter to make the effects of this step visible.
527 : */
528 : static void
529 298024 : end_replication_step(void)
530 : {
531 298024 : PopActiveSnapshot();
532 :
533 298024 : CommandCounterIncrement();
534 298024 : }
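
Taken together, begin_replication_step() and end_replication_step() bracket every applied change. Below is a simplified sketch of the calling pattern used by the apply handlers later in this file (error handling, skip checks, and streamed-transaction redirection omitted); the handler name is hypothetical.

/* Illustrative calling pattern; real handlers do more (see below). */
static void
example_apply_one_change(void)
{
	begin_replication_step();	/* starts a xact if needed, pushes snapshot */

	/* ... locate the target relation and apply the single change ... */

	end_replication_step();		/* pops snapshot, CommandCounterIncrement() */
}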
535 :
536 : /*
537 : * Handle streamed transactions for both the leader apply worker and the
538 : * parallel apply workers.
539 : *
540 : * In the streaming case (receiving a block of the streamed transaction), for
541 : * serialize mode, simply redirect it to a file for the proper toplevel
542 : * transaction, and for parallel mode, the leader apply worker will send the
543 : * changes to parallel apply workers and the parallel apply worker will define
544 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
545 : * messages will be applied by both leader apply worker and parallel apply
546 : * workers).
547 : *
548 : * Returns true for streamed transactions (when the change is either serialized
549 : * to a file or sent to a parallel apply worker), false otherwise (regular mode,
550 : * or the change needs to be processed by the parallel apply worker itself).
551 : *
552 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
553 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
554 : * to a parallel apply worker.
555 : */
556 : static bool
557 649758 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
558 : {
559 : TransactionId current_xid;
560 : ParallelApplyWorkerInfo *winfo;
561 : TransApplyAction apply_action;
562 : StringInfoData original_msg;
563 :
564 649758 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
565 :
566 : /* not in streaming mode */
567 649758 : if (apply_action == TRANS_LEADER_APPLY)
568 160180 : return false;
569 :
570 : Assert(TransactionIdIsValid(stream_xid));
571 :
572 : /*
573 : * The parallel apply worker needs the xid in this message to decide
574 : * whether to define a savepoint, so save the original message that has
575 : * not moved the cursor after the xid. We will serialize this message to a
576 : * file in PARTIAL_SERIALIZE mode.
577 : */
578 489578 : original_msg = *s;
579 :
580 : /*
581 : * We should have received XID of the subxact as the first part of the
582 : * message, so extract it.
583 : */
584 489578 : current_xid = pq_getmsgint(s, 4);
585 :
586 489578 : if (!TransactionIdIsValid(current_xid))
587 0 : ereport(ERROR,
588 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
589 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
590 :
591 489578 : switch (apply_action)
592 : {
593 205024 : case TRANS_LEADER_SERIALIZE:
594 : Assert(stream_fd);
595 :
596 : /* Add the new subxact to the array (unless already there). */
597 205024 : subxact_info_add(current_xid);
598 :
599 : /* Write the change to the current file */
600 205024 : stream_write_change(action, s);
601 205024 : return true;
602 :
603 136772 : case TRANS_LEADER_SEND_TO_PARALLEL:
604 : Assert(winfo);
605 :
606 : /*
607 : * XXX The publisher side doesn't always send relation/type update
608 : * messages after the streaming transaction, so also update the
609 : * relation/type in leader apply worker. See function
610 : * cleanup_rel_sync_cache.
611 : */
612 136772 : if (pa_send_data(winfo, s->len, s->data))
613 136772 : return (action != LOGICAL_REP_MSG_RELATION &&
614 : action != LOGICAL_REP_MSG_TYPE);
615 :
616 : /*
617 : * Switch to serialize mode when we are not able to send the
618 : * change to parallel apply worker.
619 : */
620 0 : pa_switch_to_partial_serialize(winfo, false);
621 :
622 : /* fall through */
623 10012 : case TRANS_LEADER_PARTIAL_SERIALIZE:
624 10012 : stream_write_change(action, &original_msg);
625 :
626 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
627 10012 : return (action != LOGICAL_REP_MSG_RELATION &&
628 : action != LOGICAL_REP_MSG_TYPE);
629 :
630 137770 : case TRANS_PARALLEL_APPLY:
631 137770 : parallel_stream_nchanges += 1;
632 :
633 : /* Define a savepoint for a subxact if needed. */
634 137770 : pa_start_subtrans(current_xid, stream_xid);
635 137770 : return false;
636 :
637 0 : default:
638 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
639 : return false; /* silence compiler warning */
640 : }
641 : }
642 :
643 : /*
644 : * Executor state preparation for evaluation of constraint expressions,
645 : * indexes and triggers for the specified relation.
646 : *
647 : * Note that the caller must open and close any indexes to be updated.
648 : */
649 : static ApplyExecutionData *
650 297018 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
651 : {
652 : ApplyExecutionData *edata;
653 : EState *estate;
654 : RangeTblEntry *rte;
655 297018 : List *perminfos = NIL;
656 : ResultRelInfo *resultRelInfo;
657 :
658 297018 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
659 297018 : edata->targetRel = rel;
660 :
661 297018 : edata->estate = estate = CreateExecutorState();
662 :
663 297018 : rte = makeNode(RangeTblEntry);
664 297018 : rte->rtekind = RTE_RELATION;
665 297018 : rte->relid = RelationGetRelid(rel->localrel);
666 297018 : rte->relkind = rel->localrel->rd_rel->relkind;
667 297018 : rte->rellockmode = AccessShareLock;
668 :
669 297018 : addRTEPermissionInfo(&perminfos, rte);
670 :
671 297018 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
672 : bms_make_singleton(1));
673 :
674 297018 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
675 :
676 : /*
677 : * Use Relation opened by logicalrep_rel_open() instead of opening it
678 : * again.
679 : */
680 297018 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
681 :
682 : /*
683 : * We put the ResultRelInfo in the es_opened_result_relations list, even
684 : * though we don't populate the es_result_relations array. That's a bit
685 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
686 : *
687 : * ExecOpenIndices() is not called here either, each execution path doing
688 : * an apply operation being responsible for that.
689 : */
690 297018 : estate->es_opened_result_relations =
691 297018 : lappend(estate->es_opened_result_relations, resultRelInfo);
692 :
693 297018 : estate->es_output_cid = GetCurrentCommandId(true);
694 :
695 : /* Prepare to catch AFTER triggers. */
696 297018 : AfterTriggerBeginQuery();
697 :
698 : /* other fields of edata remain NULL for now */
699 :
700 297018 : return edata;
701 : }
702 :
703 : /*
704 : * Finish any operations related to the executor state created by
705 : * create_edata_for_relation().
706 : */
707 : static void
708 296976 : finish_edata(ApplyExecutionData *edata)
709 : {
710 296976 : EState *estate = edata->estate;
711 :
712 : /* Handle any queued AFTER triggers. */
713 296976 : AfterTriggerEndQuery(estate);
714 :
715 : /* Shut down tuple routing, if any was done. */
716 296976 : if (edata->proute)
717 148 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
718 :
719 : /*
720 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
721 : * here, but we intentionally don't. It would close the rel we added to
722 : * es_opened_result_relations above, which is wrong because we took no
723 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
724 : * any other relations opened during execution.
725 : */
726 296976 : ExecResetTupleTable(estate->es_tupleTable, false);
727 296976 : FreeExecutorState(estate);
728 296976 : pfree(edata);
729 296976 : }
730 :
731 : /*
732 : * Evaluates default values for columns that we can't map to remote
733 : * relation columns.
734 : *
735 : * This allows us to support tables which have more columns on the downstream
736 : * than on the upstream.
737 : */
738 : static void
739 152514 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
740 : TupleTableSlot *slot)
741 : {
742 152514 : TupleDesc desc = RelationGetDescr(rel->localrel);
743 152514 : int num_phys_attrs = desc->natts;
744 : int i;
745 : int attnum,
746 152514 : num_defaults = 0;
747 : int *defmap;
748 : ExprState **defexprs;
749 : ExprContext *econtext;
750 :
751 152514 : econtext = GetPerTupleExprContext(estate);
752 :
753 : /* We got all the data via replication, no need to evaluate anything. */
754 152514 : if (num_phys_attrs == rel->remoterel.natts)
755 72224 : return;
756 :
757 80290 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
758 80290 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
759 :
760 : Assert(rel->attrmap->maplen == num_phys_attrs);
761 421326 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
762 : {
763 : Expr *defexpr;
764 :
765 341036 : if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
766 18 : continue;
767 :
768 341018 : if (rel->attrmap->attnums[attnum] >= 0)
769 184536 : continue;
770 :
771 156482 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
772 :
773 156482 : if (defexpr != NULL)
774 : {
775 : /* Run the expression through planner */
776 140262 : defexpr = expression_planner(defexpr);
777 :
778 : /* Initialize executable expression in copycontext */
779 140262 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
780 140262 : defmap[num_defaults] = attnum;
781 140262 : num_defaults++;
782 : }
783 : }
784 :
785 220552 : for (i = 0; i < num_defaults; i++)
786 140262 : slot->tts_values[defmap[i]] =
787 140262 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
788 : }
789 :
790 : /*
791 : * Store tuple data into slot.
792 : *
793 : * Incoming data can be either text or binary format.
794 : */
795 : static void
796 297036 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
797 : LogicalRepTupleData *tupleData)
798 : {
799 297036 : int natts = slot->tts_tupleDescriptor->natts;
800 : int i;
801 :
802 297036 : ExecClearTuple(slot);
803 :
804 : /* Call the "in" function for each non-dropped, non-null attribute */
805 : Assert(natts == rel->attrmap->maplen);
806 1317010 : for (i = 0; i < natts; i++)
807 : {
808 1019974 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
809 1019974 : int remoteattnum = rel->attrmap->attnums[i];
810 :
811 1019974 : if (!att->attisdropped && remoteattnum >= 0)
812 606140 : {
813 606140 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
814 :
815 : Assert(remoteattnum < tupleData->ncols);
816 :
817 : /* Set attnum for error callback */
818 606140 : apply_error_callback_arg.remote_attnum = remoteattnum;
819 :
820 606140 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
821 : {
822 : Oid typinput;
823 : Oid typioparam;
824 :
825 284542 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
826 569084 : slot->tts_values[i] =
827 284542 : OidInputFunctionCall(typinput, colvalue->data,
828 : typioparam, att->atttypmod);
829 284542 : slot->tts_isnull[i] = false;
830 : }
831 321598 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
832 : {
833 : Oid typreceive;
834 : Oid typioparam;
835 :
836 : /*
837 : * In some code paths we may be asked to re-parse the same
838 : * tuple data. Reset the StringInfo's cursor so that works.
839 : */
840 220944 : colvalue->cursor = 0;
841 :
842 220944 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
843 441888 : slot->tts_values[i] =
844 220944 : OidReceiveFunctionCall(typreceive, colvalue,
845 : typioparam, att->atttypmod);
846 :
847 : /* Trouble if it didn't eat the whole buffer */
848 220944 : if (colvalue->cursor != colvalue->len)
849 0 : ereport(ERROR,
850 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
851 : errmsg("incorrect binary data format in logical replication column %d",
852 : remoteattnum + 1)));
853 220944 : slot->tts_isnull[i] = false;
854 : }
855 : else
856 : {
857 : /*
858 : * NULL value from remote. (We don't expect to see
859 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
860 : * NULL.)
861 : */
862 100654 : slot->tts_values[i] = (Datum) 0;
863 100654 : slot->tts_isnull[i] = true;
864 : }
865 :
866 : /* Reset attnum for error callback */
867 606140 : apply_error_callback_arg.remote_attnum = -1;
868 : }
869 : else
870 : {
871 : /*
872 : * We assign NULL to dropped attributes and missing values
873 : * (missing values should be later filled using
874 : * slot_fill_defaults).
875 : */
876 413834 : slot->tts_values[i] = (Datum) 0;
877 413834 : slot->tts_isnull[i] = true;
878 : }
879 : }
880 :
881 297036 : ExecStoreVirtualTuple(slot);
882 297036 : }
883 :
884 : /*
885 : * Replace updated columns with data from the LogicalRepTupleData struct.
886 : * This is somewhat similar to heap_modify_tuple but also calls the type
887 : * input functions on the user data.
888 : *
889 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
890 : * columns provided in "tupleData" and leaving others as-is.
891 : *
892 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
893 : * storage for "srcslot". This is OK for current usage, but someday we may
894 : * need to materialize "slot" at the end to make it independent of "srcslot".
895 : */
896 : static void
897 63848 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
898 : LogicalRepRelMapEntry *rel,
899 : LogicalRepTupleData *tupleData)
900 : {
901 63848 : int natts = slot->tts_tupleDescriptor->natts;
902 : int i;
903 :
904 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
905 63848 : ExecClearTuple(slot);
906 :
907 : /*
908 : * Copy all the column data from srcslot, so that we'll have valid values
909 : * for unreplaced columns.
910 : */
911 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
912 63848 : slot_getallattrs(srcslot);
913 63848 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
914 63848 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
915 :
916 : /* Call the "in" function for each replaced attribute */
917 : Assert(natts == rel->attrmap->maplen);
918 318560 : for (i = 0; i < natts; i++)
919 : {
920 254712 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
921 254712 : int remoteattnum = rel->attrmap->attnums[i];
922 :
923 254712 : if (remoteattnum < 0)
924 117038 : continue;
925 :
926 : Assert(remoteattnum < tupleData->ncols);
927 :
928 137674 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
929 : {
930 137668 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
931 :
932 : /* Set attnum for error callback */
933 137668 : apply_error_callback_arg.remote_attnum = remoteattnum;
934 :
935 137668 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
936 : {
937 : Oid typinput;
938 : Oid typioparam;
939 :
940 50860 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
941 101720 : slot->tts_values[i] =
942 50860 : OidInputFunctionCall(typinput, colvalue->data,
943 : typioparam, att->atttypmod);
944 50860 : slot->tts_isnull[i] = false;
945 : }
946 86808 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
947 : {
948 : Oid typreceive;
949 : Oid typioparam;
950 :
951 : /*
952 : * In some code paths we may be asked to re-parse the same
953 : * tuple data. Reset the StringInfo's cursor so that works.
954 : */
955 86712 : colvalue->cursor = 0;
956 :
957 86712 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
958 173424 : slot->tts_values[i] =
959 86712 : OidReceiveFunctionCall(typreceive, colvalue,
960 : typioparam, att->atttypmod);
961 :
962 : /* Trouble if it didn't eat the whole buffer */
963 86712 : if (colvalue->cursor != colvalue->len)
964 0 : ereport(ERROR,
965 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
966 : errmsg("incorrect binary data format in logical replication column %d",
967 : remoteattnum + 1)));
968 86712 : slot->tts_isnull[i] = false;
969 : }
970 : else
971 : {
972 : /* must be LOGICALREP_COLUMN_NULL */
973 96 : slot->tts_values[i] = (Datum) 0;
974 96 : slot->tts_isnull[i] = true;
975 : }
976 :
977 : /* Reset attnum for error callback */
978 137668 : apply_error_callback_arg.remote_attnum = -1;
979 : }
980 : }
981 :
982 : /* And finally, declare that "slot" contains a valid virtual tuple */
983 63848 : ExecStoreVirtualTuple(slot);
984 63848 : }
985 :
986 : /*
987 : * Handle BEGIN message.
988 : */
989 : static void
990 904 : apply_handle_begin(StringInfo s)
991 : {
992 : LogicalRepBeginData begin_data;
993 :
994 : /* There must not be an active streaming transaction. */
995 : Assert(!TransactionIdIsValid(stream_xid));
996 :
997 904 : logicalrep_read_begin(s, &begin_data);
998 904 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
999 :
1000 904 : remote_final_lsn = begin_data.final_lsn;
1001 :
1002 904 : maybe_start_skipping_changes(begin_data.final_lsn);
1003 :
1004 904 : in_remote_transaction = true;
1005 :
1006 904 : pgstat_report_activity(STATE_RUNNING, NULL);
1007 904 : }
1008 :
1009 : /*
1010 : * Handle COMMIT message.
1011 : *
1012 : * TODO, support tracking of multiple origins
1013 : */
1014 : static void
1015 848 : apply_handle_commit(StringInfo s)
1016 : {
1017 : LogicalRepCommitData commit_data;
1018 :
1019 848 : logicalrep_read_commit(s, &commit_data);
1020 :
1021 848 : if (commit_data.commit_lsn != remote_final_lsn)
1022 0 : ereport(ERROR,
1023 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1024 : errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1025 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1026 : LSN_FORMAT_ARGS(remote_final_lsn))));
1027 :
1028 848 : apply_handle_commit_internal(&commit_data);
1029 :
1030 : /* Process any tables that are being synchronized in parallel. */
1031 848 : process_syncing_tables(commit_data.end_lsn);
1032 :
1033 848 : pgstat_report_activity(STATE_IDLE, NULL);
1034 848 : reset_apply_error_context_info();
1035 848 : }
1036 :
1037 : /*
1038 : * Handle BEGIN PREPARE message.
1039 : */
1040 : static void
1041 32 : apply_handle_begin_prepare(StringInfo s)
1042 : {
1043 : LogicalRepPreparedTxnData begin_data;
1044 :
1045 : /* Tablesync should never receive prepare. */
1046 32 : if (am_tablesync_worker())
1047 0 : ereport(ERROR,
1048 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1049 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1050 :
1051 : /* There must not be an active streaming transaction. */
1052 : Assert(!TransactionIdIsValid(stream_xid));
1053 :
1054 32 : logicalrep_read_begin_prepare(s, &begin_data);
1055 32 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1056 :
1057 32 : remote_final_lsn = begin_data.prepare_lsn;
1058 :
1059 32 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1060 :
1061 32 : in_remote_transaction = true;
1062 :
1063 32 : pgstat_report_activity(STATE_RUNNING, NULL);
1064 32 : }
1065 :
1066 : /*
1067 : * Common function to prepare the GID.
1068 : */
1069 : static void
1070 46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1071 : {
1072 : char gid[GIDSIZE];
1073 :
1074 : /*
1075 : * Compute a unique GID for two_phase transactions. We don't use the GID of
1076 : * the prepared transaction sent by the server, as that can lead to a deadlock
1077 : * when we have multiple subscriptions pointing to publications on the same
1078 : * node. See the comments atop worker.c.
1079 : */
1080 46 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1081 : gid, sizeof(gid));
1082 :
1083 : /*
1084 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1085 : * called within the PrepareTransactionBlock below.
1086 : */
1087 46 : if (!IsTransactionBlock())
1088 : {
1089 46 : BeginTransactionBlock();
1090 46 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1091 : }
1092 :
1093 : /*
1094 : * Update origin state so we can restart streaming from correct position
1095 : * in case of crash.
1096 : */
1097 46 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1098 46 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1099 :
1100 46 : PrepareTransactionBlock(gid);
1101 46 : }
1102 :
1103 : /*
1104 : * Handle PREPARE message.
1105 : */
1106 : static void
1107 30 : apply_handle_prepare(StringInfo s)
1108 : {
1109 : LogicalRepPreparedTxnData prepare_data;
1110 :
1111 30 : logicalrep_read_prepare(s, &prepare_data);
1112 :
1113 30 : if (prepare_data.prepare_lsn != remote_final_lsn)
1114 0 : ereport(ERROR,
1115 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1116 : errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1117 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1118 : LSN_FORMAT_ARGS(remote_final_lsn))));
1119 :
1120 : /*
1121 : * Unlike commit, here we always prepare the transaction, even if no
1122 : * change has happened in this transaction or all changes were skipped. It
1123 : * is done this way because at commit prepared time we won't know whether
1124 : * we skipped preparing the transaction for those reasons.
1125 : *
1126 : * XXX We could optimize this so that at commit prepared time we first check
1127 : * whether we have prepared the transaction or not, but that doesn't seem
1128 : * worthwhile because such cases shouldn't be common.
1129 : */
1130 30 : begin_replication_step();
1131 :
1132 30 : apply_handle_prepare_internal(&prepare_data);
1133 :
1134 30 : end_replication_step();
1135 30 : CommitTransactionCommand();
1136 28 : pgstat_report_stat(false);
1137 :
1138 : /*
1139 : * It is okay not to set the local_end LSN for the prepare because we
1140 : * always flush the prepare record. So, we can send the acknowledgment of
1141 : * the remote_end LSN as soon as prepare is finished.
1142 : *
1143 : * XXX For the sake of consistency with commit, we could have set it to the
1144 : * LSN of the prepare, but as of now we don't track that value the way we do
1145 : * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1146 : * it.
1147 : */
1148 28 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1149 :
1150 28 : in_remote_transaction = false;
1151 :
1152 : /* Process any tables that are being synchronized in parallel. */
1153 28 : process_syncing_tables(prepare_data.end_lsn);
1154 :
1155 : /*
1156 : * Since we have already prepared the transaction, if the server crashes
1157 : * before clearing the subskiplsn, it will be left behind but the
1158 : * transaction won't be resent. That's okay because it's a rare case,
1159 : * and the subskiplsn will be cleared when finishing the next transaction.
1160 : */
1161 28 : stop_skipping_changes();
1162 28 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1163 :
1164 28 : pgstat_report_activity(STATE_IDLE, NULL);
1165 28 : reset_apply_error_context_info();
1166 28 : }
1167 :
1168 : /*
1169 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1170 : *
1171 : * Note that we don't need to wait here if the transaction was prepared in a
1172 : * parallel apply worker. In that case, we have already waited for the prepare
1173 : * to finish in apply_handle_stream_prepare() which will ensure all the
1174 : * operations in that transaction have happened in the subscriber, so no
1175 : * concurrent transaction can cause deadlock or transaction dependency issues.
1176 : */
1177 : static void
1178 40 : apply_handle_commit_prepared(StringInfo s)
1179 : {
1180 : LogicalRepCommitPreparedTxnData prepare_data;
1181 : char gid[GIDSIZE];
1182 :
1183 40 : logicalrep_read_commit_prepared(s, &prepare_data);
1184 40 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1185 :
1186 : /* Compute GID for two_phase transactions. */
1187 40 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1188 : gid, sizeof(gid));
1189 :
1190 : /* There is no transaction when COMMIT PREPARED is called */
1191 40 : begin_replication_step();
1192 :
1193 : /*
1194 : * Update origin state so we can restart streaming from correct position
1195 : * in case of crash.
1196 : */
1197 40 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1198 40 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1199 :
1200 40 : FinishPreparedTransaction(gid, true);
1201 40 : end_replication_step();
1202 40 : CommitTransactionCommand();
1203 40 : pgstat_report_stat(false);
1204 :
1205 40 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1206 40 : in_remote_transaction = false;
1207 :
1208 : /* Process any tables that are being synchronized in parallel. */
1209 40 : process_syncing_tables(prepare_data.end_lsn);
1210 :
1211 40 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1212 :
1213 40 : pgstat_report_activity(STATE_IDLE, NULL);
1214 40 : reset_apply_error_context_info();
1215 40 : }
1216 :
1217 : /*
1218 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1219 : *
1220 : * Note that we don't need to wait here if the transaction was prepared in a
1221 : * parallel apply worker. In that case, we have already waited for the prepare
1222 : * to finish in apply_handle_stream_prepare() which will ensure all the
1223 : * operations in that transaction have happened in the subscriber, so no
1224 : * concurrent transaction can cause deadlock or transaction dependency issues.
1225 : */
1226 : static void
1227 10 : apply_handle_rollback_prepared(StringInfo s)
1228 : {
1229 : LogicalRepRollbackPreparedTxnData rollback_data;
1230 : char gid[GIDSIZE];
1231 :
1232 10 : logicalrep_read_rollback_prepared(s, &rollback_data);
1233 10 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1234 :
1235 : /* Compute GID for two_phase transactions. */
1236 10 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1237 : gid, sizeof(gid));
1238 :
1239 : /*
1240 : * It is possible that we haven't received the prepare because it occurred
1241 : * before the walsender reached a consistent point or two_phase was still
1242 : * not enabled by that time, so in such cases we need to skip the rollback
1243 : * prepared.
1244 : */
1245 10 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1246 : rollback_data.prepare_time))
1247 : {
1248 : /*
1249 : * Update origin state so we can restart streaming from correct
1250 : * position in case of crash.
1251 : */
1252 10 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1253 10 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
1254 :
1255 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1256 10 : begin_replication_step();
1257 10 : FinishPreparedTransaction(gid, false);
1258 10 : end_replication_step();
1259 10 : CommitTransactionCommand();
1260 :
1261 10 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1262 : }
1263 :
1264 10 : pgstat_report_stat(false);
1265 :
1266 : /*
1267 : * It is okay not to set the local_end LSN for the rollback of prepared
1268 : * transaction because we always flush the WAL record for it. See
1269 : * apply_handle_prepare.
1270 : */
1271 10 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1272 10 : in_remote_transaction = false;
1273 :
1274 : /* Process any tables that are being synchronized in parallel. */
1275 10 : process_syncing_tables(rollback_data.rollback_end_lsn);
1276 :
1277 10 : pgstat_report_activity(STATE_IDLE, NULL);
1278 10 : reset_apply_error_context_info();
1279 10 : }
1280 :
1281 : /*
1282 : * Handle STREAM PREPARE.
1283 : */
1284 : static void
1285 22 : apply_handle_stream_prepare(StringInfo s)
1286 : {
1287 : LogicalRepPreparedTxnData prepare_data;
1288 : ParallelApplyWorkerInfo *winfo;
1289 : TransApplyAction apply_action;
1290 :
1291 : /* Save the message before it is consumed. */
1292 22 : StringInfoData original_msg = *s;
1293 :
1294 22 : if (in_streamed_transaction)
1295 0 : ereport(ERROR,
1296 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1297 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1298 :
1299 : /* Tablesync should never receive prepare. */
1300 22 : if (am_tablesync_worker())
1301 0 : ereport(ERROR,
1302 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1303 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1304 :
1305 22 : logicalrep_read_stream_prepare(s, &prepare_data);
1306 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1307 :
1308 22 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1309 :
1310 22 : switch (apply_action)
1311 : {
1312 10 : case TRANS_LEADER_APPLY:
1313 :
1314 : /*
1315 : * The transaction has been serialized to file, so replay all the
1316 : * spooled operations.
1317 : */
1318 10 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1319 : prepare_data.xid, prepare_data.prepare_lsn);
1320 :
1321 : /* Mark the transaction as prepared. */
1322 10 : apply_handle_prepare_internal(&prepare_data);
1323 :
1324 10 : CommitTransactionCommand();
1325 :
1326 : /*
1327 : * It is okay not to set the local_end LSN for the prepare because
1328 : * we always flush the prepare record. See apply_handle_prepare.
1329 : */
1330 10 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1331 :
1332 10 : in_remote_transaction = false;
1333 :
1334 : /* Unlink the files with serialized changes and subxact info. */
1335 10 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1336 :
1337 10 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1338 10 : break;
1339 :
1340 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1341 : Assert(winfo);
1342 :
1343 4 : if (pa_send_data(winfo, s->len, s->data))
1344 : {
1345 : /* Finish processing the streaming transaction. */
1346 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1347 4 : break;
1348 : }
1349 :
1350 : /*
1351 : * Switch to serialize mode when we are not able to send the
1352 : * change to parallel apply worker.
1353 : */
1354 0 : pa_switch_to_partial_serialize(winfo, true);
1355 :
1356 : /* fall through */
1357 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1358 : Assert(winfo);
1359 :
1360 2 : stream_open_and_write_change(prepare_data.xid,
1361 : LOGICAL_REP_MSG_STREAM_PREPARE,
1362 : &original_msg);
1363 :
1364 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1365 :
1366 : /* Finish processing the streaming transaction. */
1367 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1368 2 : break;
1369 :
1370 6 : case TRANS_PARALLEL_APPLY:
1371 :
1372 : /*
1373 : * If the parallel apply worker is applying spooled messages then
1374 : * close the file before preparing.
1375 : */
1376 6 : if (stream_fd)
1377 2 : stream_close_file();
1378 :
1379 6 : begin_replication_step();
1380 :
1381 : /* Mark the transaction as prepared. */
1382 6 : apply_handle_prepare_internal(&prepare_data);
1383 :
1384 6 : end_replication_step();
1385 :
1386 6 : CommitTransactionCommand();
1387 :
1388 : /*
1389 : * It is okay not to set the local_end LSN for the prepare because
1390 : * we always flush the prepare record. See apply_handle_prepare.
1391 : */
1392 6 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1393 :
1394 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1395 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1396 :
1397 6 : pa_reset_subtrans();
1398 :
1399 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1400 6 : break;
1401 :
1402 0 : default:
1403 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1404 : break;
1405 : }
1406 :
1407 22 : pgstat_report_stat(false);
1408 :
1409 : /* Process any tables that are being synchronized in parallel. */
1410 22 : process_syncing_tables(prepare_data.end_lsn);
1411 :
1412 : /*
1413 : * Similar to the prepare case, the subskiplsn could be left behind in case
1414 : * of a server crash, but that's okay. See the comments in apply_handle_prepare().
1415 : */
1416 22 : stop_skipping_changes();
1417 22 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1418 :
1419 22 : pgstat_report_activity(STATE_IDLE, NULL);
1420 :
1421 22 : reset_apply_error_context_info();
1422 22 : }
1423 :
1424 : /*
1425 : * Handle ORIGIN message.
1426 : *
1427 : * TODO, support tracking of multiple origins
1428 : */
1429 : static void
1430 16 : apply_handle_origin(StringInfo s)
1431 : {
1432 : /*
1433 : * An ORIGIN message can only come inside a streaming transaction or inside
1434 : * a remote transaction, and before any actual writes.
1435 : */
1436 16 : if (!in_streamed_transaction &&
1437 24 : (!in_remote_transaction ||
1438 12 : (IsTransactionState() && !am_tablesync_worker())))
1439 0 : ereport(ERROR,
1440 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1441 : errmsg_internal("ORIGIN message sent out of order")));
1442 16 : }
1443 :
1444 : /*
1445 : * Initialize fileset (if not already done).
1446 : *
1447 : * Create a new file when first_segment is true, otherwise open the existing
1448 : * file.
1449 : */
1450 : void
1451 726 : stream_start_internal(TransactionId xid, bool first_segment)
1452 : {
1453 726 : begin_replication_step();
1454 :
1455 : /*
1456 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1457 : * used for the entire duration of the worker so create it in a permanent
1458 : * context. We create this on the very first streaming message from any
1459 : * transaction and then use it for this and other streaming transactions.
1460 : * Now, we could create a fileset at the start of the worker as well but
1461 : * then we won't be sure that it will ever be used.
1462 : */
1463 726 : if (!MyLogicalRepWorker->stream_fileset)
1464 : {
1465 : MemoryContext oldctx;
1466 :
1467 28 : oldctx = MemoryContextSwitchTo(ApplyContext);
1468 :
1469 28 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1470 28 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1471 :
1472 28 : MemoryContextSwitchTo(oldctx);
1473 : }
1474 :
1475 : /* Open the spool file for this transaction. */
1476 726 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1477 :
1478 : /* If this is not the first segment, open existing subxact file. */
1479 726 : if (!first_segment)
1480 662 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1481 :
1482 726 : end_replication_step();
1483 726 : }
1484 :
1485 : /*
1486 : * Handle STREAM START message.
1487 : */
1488 : static void
1489 1712 : apply_handle_stream_start(StringInfo s)
1490 : {
1491 : bool first_segment;
1492 : ParallelApplyWorkerInfo *winfo;
1493 : TransApplyAction apply_action;
1494 :
1495 : /* Save the message before it is consumed. */
1496 1712 : StringInfoData original_msg = *s;
1497 :
1498 1712 : if (in_streamed_transaction)
1499 0 : ereport(ERROR,
1500 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1501 : errmsg_internal("duplicate STREAM START message")));
1502 :
1503 : /* There must not be an active streaming transaction. */
1504 : Assert(!TransactionIdIsValid(stream_xid));
1505 :
1506 : /* notify handle methods we're processing a remote transaction */
1507 1712 : in_streamed_transaction = true;
1508 :
1509 : /* extract XID of the top-level transaction */
1510 1712 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1511 :
1512 1712 : if (!TransactionIdIsValid(stream_xid))
1513 0 : ereport(ERROR,
1514 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1515 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1516 :
1517 1712 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1518 :
1519 : /* Try to allocate a worker for the streaming transaction. */
1520 1712 : if (first_segment)
1521 164 : pa_allocate_worker(stream_xid);
1522 :
1523 1712 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1524 :
1525 1712 : switch (apply_action)
1526 : {
1527 686 : case TRANS_LEADER_SERIALIZE:
1528 :
1529 : /*
1530 : * Function stream_start_internal starts a transaction. This
1531 :              * transaction will be committed on the stream stop unless we are a
1532 :              * tablesync worker, in which case it will be committed after
1533 :              * processing all the messages. We need this transaction for
1534 : * handling the BufFile, used for serializing the streaming data
1535 : * and subxact info.
1536 : */
1537 686 : stream_start_internal(stream_xid, first_segment);
1538 686 : break;
1539 :
1540 500 : case TRANS_LEADER_SEND_TO_PARALLEL:
1541 : Assert(winfo);
1542 :
1543 : /*
1544 : * Once we start serializing the changes, the parallel apply
1545 : * worker will wait for the leader to release the stream lock
1546 : * until the end of the transaction. So, we don't need to release
1547 : * the lock or increment the stream count in that case.
1548 : */
1549 500 : if (pa_send_data(winfo, s->len, s->data))
1550 : {
1551 : /*
1552 : * Unlock the shared object lock so that the parallel apply
1553 : * worker can continue to receive changes.
1554 : */
1555 492 : if (!first_segment)
1556 446 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1557 :
1558 : /*
1559 : * Increment the number of streaming blocks waiting to be
1560 :                  * processed by the parallel apply worker.
1561 : */
1562 492 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1563 :
1564 : /* Cache the parallel apply worker for this transaction. */
1565 492 : pa_set_stream_apply_worker(winfo);
1566 492 : break;
1567 : }
1568 :
1569 : /*
1570 : * Switch to serialize mode when we are not able to send the
1571 :              * change to the parallel apply worker.
1572 : */
1573 8 : pa_switch_to_partial_serialize(winfo, !first_segment);
1574 :
1575 : /* fall through */
1576 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1577 : Assert(winfo);
1578 :
1579 : /*
1580 : * Open the spool file unless it was already opened when switching
1581 : * to serialize mode. The transaction started in
1582 : * stream_start_internal will be committed on the stream stop.
1583 : */
1584 30 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1585 22 : stream_start_internal(stream_xid, first_segment);
1586 :
1587 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1588 :
1589 : /* Cache the parallel apply worker for this transaction. */
1590 30 : pa_set_stream_apply_worker(winfo);
1591 30 : break;
1592 :
1593 504 : case TRANS_PARALLEL_APPLY:
1594 504 : if (first_segment)
1595 : {
1596 : /* Hold the lock until the end of the transaction. */
1597 54 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1598 54 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1599 :
1600 : /*
1601 : * Signal the leader apply worker, as it may be waiting for
1602 : * us.
1603 : */
1604 54 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
1605 : }
1606 :
1607 504 : parallel_stream_nchanges = 0;
1608 504 : break;
1609 :
1610 0 : default:
1611 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1612 : break;
1613 : }
1614 :
1615 1712 : pgstat_report_activity(STATE_RUNNING, NULL);
1616 1712 : }
1617 :
1618 : /*
1619 : * Update the information about subxacts and close the file.
1620 : *
1621 :  * This function should only be called after the stream_start_internal
1622 :  * function has been called.
1623 : */
1624 : void
1625 726 : stream_stop_internal(TransactionId xid)
1626 : {
1627 : /*
1628 : * Serialize information about subxacts for the toplevel transaction, then
1629 : * close the stream messages spool file.
1630 : */
1631 726 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1632 726 : stream_close_file();
1633 :
1634 : /* We must be in a valid transaction state */
1635 : Assert(IsTransactionState());
1636 :
1637 : /* Commit the per-stream transaction */
1638 726 : CommitTransactionCommand();
1639 :
1640 : /* Reset per-stream context */
1641 726 : MemoryContextReset(LogicalStreamingContext);
1642 726 : }
1643 :
1644 : /*
1645 : * Handle STREAM STOP message.
1646 : */
1647 : static void
1648 1710 : apply_handle_stream_stop(StringInfo s)
1649 : {
1650 : ParallelApplyWorkerInfo *winfo;
1651 : TransApplyAction apply_action;
1652 :
1653 1710 : if (!in_streamed_transaction)
1654 0 : ereport(ERROR,
1655 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1656 : errmsg_internal("STREAM STOP message without STREAM START")));
1657 :
1658 1710 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1659 :
1660 1710 : switch (apply_action)
1661 : {
1662 686 : case TRANS_LEADER_SERIALIZE:
1663 686 : stream_stop_internal(stream_xid);
1664 686 : break;
1665 :
1666 492 : case TRANS_LEADER_SEND_TO_PARALLEL:
1667 : Assert(winfo);
1668 :
1669 : /*
1670 : * Lock before sending the STREAM_STOP message so that the leader
1671 : * can hold the lock first and the parallel apply worker will wait
1672 : * for leader to release the lock. See Locking Considerations atop
1673 :              * for the leader to release the lock. See Locking Considerations atop
1674 : */
1675 492 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1676 :
1677 492 : if (pa_send_data(winfo, s->len, s->data))
1678 : {
1679 492 : pa_set_stream_apply_worker(NULL);
1680 492 : break;
1681 : }
1682 :
1683 : /*
1684 : * Switch to serialize mode when we are not able to send the
1685 :              * change to the parallel apply worker.
1686 : */
1687 0 : pa_switch_to_partial_serialize(winfo, true);
1688 :
1689 : /* fall through */
1690 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1691 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1692 30 : stream_stop_internal(stream_xid);
1693 30 : pa_set_stream_apply_worker(NULL);
1694 30 : break;
1695 :
1696 502 : case TRANS_PARALLEL_APPLY:
1697 502 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1698 : parallel_stream_nchanges);
1699 :
1700 : /*
1701 :              * By the time the parallel apply worker is processing the changes in
1702 :              * the current streaming block, the leader apply worker may have
1703 :              * sent multiple streaming blocks. This can cause the parallel apply
1704 :              * worker to start waiting even when there are more chunks of the
1705 :              * stream in the queue. So, try to lock only if there is no message left
1706 : * in the queue. See Locking Considerations atop
1707 : * applyparallelworker.c.
1708 : *
1709 : * Note that here we have a race condition where we can start
1710 : * waiting even when there are pending streaming chunks. This can
1711 : * happen if the leader sends another streaming block and acquires
1712 : * the stream lock again after the parallel apply worker checks
1713 : * that there is no pending streaming block and before it actually
1714 : * starts waiting on a lock. We can handle this case by not
1715 : * allowing the leader to increment the stream block count during
1716 :              * the time the parallel apply worker acquires the lock, but it is not
1717 : * clear whether that is worth the complexity.
1718 : *
1719 : * Now, if this missed chunk contains rollback to savepoint, then
1720 : * there is a risk of deadlock which probably shouldn't happen
1721 : * after restart.
1722 : */
1723 502 : pa_decr_and_wait_stream_block();
1724 500 : break;
1725 :
1726 0 : default:
1727 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1728 : break;
1729 : }
1730 :
1731 1708 : in_streamed_transaction = false;
1732 1708 : stream_xid = InvalidTransactionId;
1733 :
1734 : /*
1735 : * The parallel apply worker could be in a transaction in which case we
1736 : * need to report the state as STATE_IDLEINTRANSACTION.
1737 : */
1738 1708 : if (IsTransactionOrTransactionBlock())
1739 500 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1740 : else
1741 1208 : pgstat_report_activity(STATE_IDLE, NULL);
1742 :
1743 1708 : reset_apply_error_context_info();
1744 1708 : }
1745 :
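/*
 * Illustrative sketch only: the decrement-and-wait step used by the
 * TRANS_PARALLEL_APPLY branch above lives in pa_decr_and_wait_stream_block()
 * in applyparallelworker.c.  The hypothetical helper below merely captures
 * the idea described in the comments there: consume one pending streamed
 * chunk, and block on the stream lock (held by the leader while it produces
 * the next chunk) only when nothing else is already queued.  Details such as
 * the lock mode are assumptions, not the authoritative implementation.
 */
static void
sketch_decr_and_wait_stream_block(void)
{
	/* One streamed chunk has been fully applied. */
	if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
	{
		/* Queue is empty: wait until the leader releases the stream lock. */
		pa_lock_stream(MyParallelShared->xid, AccessShareLock);
		pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
	}
}
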
1746 : /*
1747 : * Helper function to handle STREAM ABORT message when the transaction was
1748 : * serialized to file.
1749 : */
1750 : static void
1751 28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1752 : {
1753 : /*
1754 :      * If the two XIDs are the same, it's in fact an abort of the toplevel xact,
1755 :      * so just delete the files with serialized info.
1756 : */
1757 28 : if (xid == subxid)
1758 2 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1759 : else
1760 : {
1761 : /*
1762 : * OK, so it's a subxact. We need to read the subxact file for the
1763 : * toplevel transaction, determine the offset tracked for the subxact,
1764 : * and truncate the file with changes. We also remove the subxacts
1765 : * with higher offsets (or rather higher XIDs).
1766 : *
1767 : * We intentionally scan the array from the tail, because we're likely
1768 : * aborting a change for the most recent subtransactions.
1769 : *
1770 :          * We can't use binary search here as subxact XIDs won't
1771 :          * necessarily arrive in sorted order; consider the case where we have
1772 :          * released the savepoint for multiple subtransactions and then
1773 :          * performed rollback to savepoint for one of the earlier
1774 :          * sub-transactions.
1775 : */
1776 : int64 i;
1777 : int64 subidx;
1778 : BufFile *fd;
1779 26 : bool found = false;
1780 : char path[MAXPGPATH];
1781 :
1782 26 : subidx = -1;
1783 26 : begin_replication_step();
1784 26 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1785 :
1786 30 : for (i = subxact_data.nsubxacts; i > 0; i--)
1787 : {
1788 22 : if (subxact_data.subxacts[i - 1].xid == subxid)
1789 : {
1790 18 : subidx = (i - 1);
1791 18 : found = true;
1792 18 : break;
1793 : }
1794 : }
1795 :
1796 : /*
1797 :          * If it's an empty sub-transaction then we will not find the subxid
1798 :          * here, so just clean up the subxact info and return.
1799 : */
1800 26 : if (!found)
1801 : {
1802 : /* Cleanup the subxact info */
1803 8 : cleanup_subxact_info();
1804 8 : end_replication_step();
1805 8 : CommitTransactionCommand();
1806 8 : return;
1807 : }
1808 :
1809 : /* open the changes file */
1810 18 : changes_filename(path, MyLogicalRepWorker->subid, xid);
1811 18 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
1812 : O_RDWR, false);
1813 :
1814 : /* OK, truncate the file at the right offset */
1815 18 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1816 18 : subxact_data.subxacts[subidx].offset);
1817 18 : BufFileClose(fd);
1818 :
1819 : /* discard the subxacts added later */
1820 18 : subxact_data.nsubxacts = subidx;
1821 :
1822 : /* write the updated subxact list */
1823 18 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1824 :
1825 18 : end_replication_step();
1826 18 : CommitTransactionCommand();
1827 : }
1828 : }
1829 :
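/*
 * A minimal sketch, for illustration, of the per-subtransaction bookkeeping
 * that the truncation in stream_abort_internal() relies on.  The field names
 * and types follow the references to subxact_data.subxacts[] above; the
 * authoritative definition appears earlier in this file and may differ in
 * detail.  Each entry records where in the changes file a subtransaction
 * started, so an abort can truncate everything written after that point.
 */
typedef struct SubXactInfoSketch
{
	TransactionId xid;			/* XID of the subtransaction */
	int			fileno;			/* BufFile segment where its changes begin */
	off_t		offset;			/* offset within that segment */
} SubXactInfoSketch;
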
1830 : /*
1831 : * Handle STREAM ABORT message.
1832 : */
1833 : static void
1834 76 : apply_handle_stream_abort(StringInfo s)
1835 : {
1836 : TransactionId xid;
1837 : TransactionId subxid;
1838 : LogicalRepStreamAbortData abort_data;
1839 : ParallelApplyWorkerInfo *winfo;
1840 : TransApplyAction apply_action;
1841 :
1842 : /* Save the message before it is consumed. */
1843 76 : StringInfoData original_msg = *s;
1844 : bool toplevel_xact;
1845 :
1846 76 : if (in_streamed_transaction)
1847 0 : ereport(ERROR,
1848 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1849 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
1850 :
1851 : /* We receive abort information only when we can apply in parallel. */
1852 76 : logicalrep_read_stream_abort(s, &abort_data,
1853 76 : MyLogicalRepWorker->parallel_apply);
1854 :
1855 76 : xid = abort_data.xid;
1856 76 : subxid = abort_data.subxid;
1857 76 : toplevel_xact = (xid == subxid);
1858 :
1859 76 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1860 :
1861 76 : apply_action = get_transaction_apply_action(xid, &winfo);
1862 :
1863 76 : switch (apply_action)
1864 : {
1865 28 : case TRANS_LEADER_APPLY:
1866 :
1867 : /*
1868 : * We are in the leader apply worker and the transaction has been
1869 : * serialized to file.
1870 : */
1871 28 : stream_abort_internal(xid, subxid);
1872 :
1873 28 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1874 28 : break;
1875 :
1876 20 : case TRANS_LEADER_SEND_TO_PARALLEL:
1877 : Assert(winfo);
1878 :
1879 : /*
1880 : * For the case of aborting the subtransaction, we increment the
1881 : * number of streaming blocks and take the lock again before
1882 : * sending the STREAM_ABORT to ensure that the parallel apply
1883 : * worker will wait on the lock for the next set of changes after
1884 : * processing the STREAM_ABORT message if it is not already
1885 : * waiting for STREAM_STOP message.
1886 : *
1887 : * It is important to perform this locking before sending the
1888 : * STREAM_ABORT message so that the leader can hold the lock first
1889 : * and the parallel apply worker will wait for the leader to
1890 : * release the lock. This is the same as what we do in
1891 : * apply_handle_stream_stop. See Locking Considerations atop
1892 : * applyparallelworker.c.
1893 : */
1894 20 : if (!toplevel_xact)
1895 : {
1896 18 : pa_unlock_stream(xid, AccessExclusiveLock);
1897 18 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1898 18 : pa_lock_stream(xid, AccessExclusiveLock);
1899 : }
1900 :
1901 20 : if (pa_send_data(winfo, s->len, s->data))
1902 : {
1903 : /*
1904 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1905 : * wait here for the parallel apply worker to finish as that
1906 : * is not required to maintain the commit order and won't have
1907 : * the risk of failures due to transaction dependencies and
1908 : * deadlocks. However, it is possible that before the parallel
1909 : * worker finishes and we clear the worker info, the xid
1910 : * wraparound happens on the upstream and a new transaction
1911 : * with the same xid can appear and that can lead to duplicate
1912 : * entries in ParallelApplyTxnHash. Yet another problem could
1913 : * be that we may have serialized the changes in partial
1914 : * serialize mode and the file containing xact changes may
1915 : * already exist, and after xid wraparound trying to create
1916 : * the file for the same xid can lead to an error. To avoid
1917 : * these problems, we decide to wait for the aborts to finish.
1918 : *
1919 : * Note, it is okay to not update the flush location position
1920 : * for aborts as in worst case that means such a transaction
1921 : * won't be sent again after restart.
1922 : */
1923 20 : if (toplevel_xact)
1924 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1925 :
1926 20 : break;
1927 : }
1928 :
1929 : /*
1930 : * Switch to serialize mode when we are not able to send the
1931 :              * change to the parallel apply worker.
1932 : */
1933 0 : pa_switch_to_partial_serialize(winfo, true);
1934 :
1935 : /* fall through */
1936 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1937 : Assert(winfo);
1938 :
1939 : /*
1940 :              * The parallel apply worker might have applied some changes, so write
1941 :              * the STREAM_ABORT message so that it can roll back the
1942 : * subtransaction if needed.
1943 : */
1944 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
1945 : &original_msg);
1946 :
1947 4 : if (toplevel_xact)
1948 : {
1949 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1950 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1951 : }
1952 4 : break;
1953 :
1954 24 : case TRANS_PARALLEL_APPLY:
1955 :
1956 : /*
1957 : * If the parallel apply worker is applying spooled messages then
1958 : * close the file before aborting.
1959 : */
1960 24 : if (toplevel_xact && stream_fd)
1961 2 : stream_close_file();
1962 :
1963 24 : pa_stream_abort(&abort_data);
1964 :
1965 : /*
1966 : * We need to wait after processing rollback to savepoint for the
1967 : * next set of changes.
1968 : *
1969 :              * We have a race condition here due to which we can start waiting
1970 :              * even when there are more chunks of the stream in the queue. See
1971 : * apply_handle_stream_stop.
1972 : */
1973 24 : if (!toplevel_xact)
1974 20 : pa_decr_and_wait_stream_block();
1975 :
1976 24 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1977 24 : break;
1978 :
1979 0 : default:
1980 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1981 : break;
1982 : }
1983 :
1984 76 : reset_apply_error_context_info();
1985 76 : }
1986 :
1987 : /*
1988 :  * Ensure that the passed location is the fileset's end.
1989 : */
1990 : static void
1991 8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1992 : off_t offset)
1993 : {
1994 : char path[MAXPGPATH];
1995 : BufFile *fd;
1996 : int last_fileno;
1997 : off_t last_offset;
1998 :
1999 : Assert(!IsTransactionState());
2000 :
2001 8 : begin_replication_step();
2002 :
2003 8 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2004 :
2005 8 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2006 :
2007 8 : BufFileSeek(fd, 0, 0, SEEK_END);
2008 8 : BufFileTell(fd, &last_fileno, &last_offset);
2009 :
2010 8 : BufFileClose(fd);
2011 :
2012 8 : end_replication_step();
2013 :
2014 8 : if (last_fileno != fileno || last_offset != offset)
2015 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2016 : path);
2017 8 : }
2018 :
2019 : /*
2020 : * Common spoolfile processing.
2021 : */
2022 : void
2023 62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2024 : XLogRecPtr lsn)
2025 : {
2026 : int nchanges;
2027 : char path[MAXPGPATH];
2028 62 : char *buffer = NULL;
2029 : MemoryContext oldcxt;
2030 : ResourceOwner oldowner;
2031 : int fileno;
2032 : off_t offset;
2033 :
2034 62 : if (!am_parallel_apply_worker())
2035 54 : maybe_start_skipping_changes(lsn);
2036 :
2037 : /* Make sure we have an open transaction */
2038 62 : begin_replication_step();
2039 :
2040 : /*
2041 :      * Allocate the file handle and memory required to process all the messages
2042 :      * in TopTransactionContext, so that they do not get reset after each message
2043 :      * is processed.
2044 : */
2045 62 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2046 :
2047 : /* Open the spool file for the committed/prepared transaction */
2048 62 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2049 62 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2050 :
2051 : /*
2052 : * Make sure the file is owned by the toplevel transaction so that the
2053 : * file will not be accidentally closed when aborting a subtransaction.
2054 : */
2055 62 : oldowner = CurrentResourceOwner;
2056 62 : CurrentResourceOwner = TopTransactionResourceOwner;
2057 :
2058 62 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2059 :
2060 62 : CurrentResourceOwner = oldowner;
2061 :
2062 62 : buffer = palloc(BLCKSZ);
2063 :
2064 62 : MemoryContextSwitchTo(oldcxt);
2065 :
2066 62 : remote_final_lsn = lsn;
2067 :
2068 : /*
2069 :      * Make sure the apply_dispatch handler methods are aware we're in a remote
2070 :      * transaction.
2071 : */
2072 62 : in_remote_transaction = true;
2073 62 : pgstat_report_activity(STATE_RUNNING, NULL);
2074 :
2075 62 : end_replication_step();
2076 :
2077 : /*
2078 : * Read the entries one by one and pass them through the same logic as in
2079 : * apply_dispatch.
2080 : */
2081 62 : nchanges = 0;
2082 : while (true)
2083 176938 : {
2084 : StringInfoData s2;
2085 : size_t nbytes;
2086 : int len;
2087 :
2088 177000 : CHECK_FOR_INTERRUPTS();
2089 :
2090 : /* read length of the on-disk record */
2091 177000 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2092 :
2093 : /* have we reached end of the file? */
2094 177000 : if (nbytes == 0)
2095 52 : break;
2096 :
2097 : /* do we have a correct length? */
2098 176948 : if (len <= 0)
2099 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2100 : len, path);
2101 :
2102 : /* make sure we have sufficiently large buffer */
2103 :          /* make sure we have a sufficiently large buffer */
2104 :
2105 : /* and finally read the data into the buffer */
2106 176948 : BufFileReadExact(stream_fd, buffer, len);
2107 :
2108 176948 : BufFileTell(stream_fd, &fileno, &offset);
2109 :
2110 : /* init a stringinfo using the buffer and call apply_dispatch */
2111 176948 : initReadOnlyStringInfo(&s2, buffer, len);
2112 :
2113 : /* Ensure we are reading the data into our memory context. */
2114 176948 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2115 :
2116 176948 : apply_dispatch(&s2);
2117 :
2118 176946 : MemoryContextReset(ApplyMessageContext);
2119 :
2120 176946 : MemoryContextSwitchTo(oldcxt);
2121 :
2122 176946 : nchanges++;
2123 :
2124 : /*
2125 :          * It is possible the file has been closed because we have processed
2126 :          * a transaction end message such as stream_commit, in which case that
2127 :          * must be the last message.
2128 : */
2129 176946 : if (!stream_fd)
2130 : {
2131 8 : ensure_last_message(stream_fileset, xid, fileno, offset);
2132 8 : break;
2133 : }
2134 :
2135 176938 : if (nchanges % 1000 == 0)
2136 166 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2137 : nchanges, path);
2138 : }
2139 :
2140 60 : if (stream_fd)
2141 52 : stream_close_file();
2142 :
2143 60 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2144 : nchanges, path);
2145 :
2146 60 : return;
2147 : }
2148 :
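/*
 * For illustration, a minimal sketch of the on-disk framing that the read
 * loop in apply_spooled_messages() expects: each spooled change is an int
 * length followed by that many bytes of payload, whose first byte is the
 * message type that apply_dispatch() switches on.  The helper name below is
 * hypothetical; the real write path (stream_write_change() and friends,
 * elsewhere in this file) may differ in detail.
 */
static void
sketch_write_spooled_change(BufFile *fd, const char *data, int len)
{
	/* Write the record length first ... */
	BufFileWrite(fd, &len, sizeof(len));

	/* ... then the payload, read back later with BufFileReadExact(). */
	BufFileWrite(fd, data, len);
}
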
2149 : /*
2150 : * Handle STREAM COMMIT message.
2151 : */
2152 : static void
2153 122 : apply_handle_stream_commit(StringInfo s)
2154 : {
2155 : TransactionId xid;
2156 : LogicalRepCommitData commit_data;
2157 : ParallelApplyWorkerInfo *winfo;
2158 : TransApplyAction apply_action;
2159 :
2160 : /* Save the message before it is consumed. */
2161 122 : StringInfoData original_msg = *s;
2162 :
2163 122 : if (in_streamed_transaction)
2164 0 : ereport(ERROR,
2165 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2166 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2167 :
2168 122 : xid = logicalrep_read_stream_commit(s, &commit_data);
2169 122 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2170 :
2171 122 : apply_action = get_transaction_apply_action(xid, &winfo);
2172 :
2173 122 : switch (apply_action)
2174 : {
2175 44 : case TRANS_LEADER_APPLY:
2176 :
2177 : /*
2178 : * The transaction has been serialized to file, so replay all the
2179 : * spooled operations.
2180 : */
2181 44 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2182 : commit_data.commit_lsn);
2183 :
2184 42 : apply_handle_commit_internal(&commit_data);
2185 :
2186 : /* Unlink the files with serialized changes and subxact info. */
2187 42 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2188 :
2189 42 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2190 42 : break;
2191 :
2192 36 : case TRANS_LEADER_SEND_TO_PARALLEL:
2193 : Assert(winfo);
2194 :
2195 36 : if (pa_send_data(winfo, s->len, s->data))
2196 : {
2197 : /* Finish processing the streaming transaction. */
2198 36 : pa_xact_finish(winfo, commit_data.end_lsn);
2199 34 : break;
2200 : }
2201 :
2202 : /*
2203 : * Switch to serialize mode when we are not able to send the
2204 :              * change to the parallel apply worker.
2205 : */
2206 0 : pa_switch_to_partial_serialize(winfo, true);
2207 :
2208 : /* fall through */
2209 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2210 : Assert(winfo);
2211 :
2212 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2213 : &original_msg);
2214 :
2215 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2216 :
2217 : /* Finish processing the streaming transaction. */
2218 4 : pa_xact_finish(winfo, commit_data.end_lsn);
2219 4 : break;
2220 :
2221 38 : case TRANS_PARALLEL_APPLY:
2222 :
2223 : /*
2224 : * If the parallel apply worker is applying spooled messages then
2225 : * close the file before committing.
2226 : */
2227 38 : if (stream_fd)
2228 4 : stream_close_file();
2229 :
2230 38 : apply_handle_commit_internal(&commit_data);
2231 :
2232 38 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2233 :
2234 : /*
2235 : * It is important to set the transaction state as finished before
2236 : * releasing the lock. See pa_wait_for_xact_finish.
2237 : */
2238 38 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2239 38 : pa_unlock_transaction(xid, AccessExclusiveLock);
2240 :
2241 38 : pa_reset_subtrans();
2242 :
2243 38 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2244 38 : break;
2245 :
2246 0 : default:
2247 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2248 : break;
2249 : }
2250 :
2251 : /* Process any tables that are being synchronized in parallel. */
2252 118 : process_syncing_tables(commit_data.end_lsn);
2253 :
2254 118 : pgstat_report_activity(STATE_IDLE, NULL);
2255 :
2256 118 : reset_apply_error_context_info();
2257 118 : }
2258 :
2259 : /*
2260 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2261 : */
2262 : static void
2263 928 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2264 : {
2265 928 : if (is_skipping_changes())
2266 : {
2267 4 : stop_skipping_changes();
2268 :
2269 : /*
2270 : * Start a new transaction to clear the subskiplsn, if not started
2271 : * yet.
2272 : */
2273 4 : if (!IsTransactionState())
2274 2 : StartTransactionCommand();
2275 : }
2276 :
2277 928 : if (IsTransactionState())
2278 : {
2279 : /*
2280 : * The transaction is either non-empty or skipped, so we clear the
2281 : * subskiplsn.
2282 : */
2283 928 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2284 :
2285 : /*
2286 : * Update origin state so we can restart streaming from correct
2287 : * position in case of crash.
2288 : */
2289 928 : replorigin_session_origin_lsn = commit_data->end_lsn;
2290 928 : replorigin_session_origin_timestamp = commit_data->committime;
2291 :
2292 928 : CommitTransactionCommand();
2293 :
2294 928 : if (IsTransactionBlock())
2295 : {
2296 8 : EndTransactionBlock(false);
2297 8 : CommitTransactionCommand();
2298 : }
2299 :
2300 928 : pgstat_report_stat(false);
2301 :
2302 928 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2303 : }
2304 : else
2305 : {
2306 : /* Process any invalidation messages that might have accumulated. */
2307 0 : AcceptInvalidationMessages();
2308 0 : maybe_reread_subscription();
2309 : }
2310 :
2311 928 : in_remote_transaction = false;
2312 928 : }
2313 :
2314 : /*
2315 : * Handle RELATION message.
2316 : *
2317 :  * Note we don't do validation against the local schema here. That validation
2318 :  * is postponed until the first change for the given relation arrives, as we
2319 :  * only care about it when applying changes for it anyway, and we do less
2320 :  * locking this way.
2321 : */
2322 : static void
2323 864 : apply_handle_relation(StringInfo s)
2324 : {
2325 : LogicalRepRelation *rel;
2326 :
2327 864 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2328 70 : return;
2329 :
2330 794 : rel = logicalrep_read_rel(s);
2331 794 : logicalrep_relmap_update(rel);
2332 :
2333 : /* Also reset all entries in the partition map that refer to remoterel. */
2334 794 : logicalrep_partmap_reset_relmap(rel);
2335 : }
2336 :
2337 : /*
2338 : * Handle TYPE message.
2339 : *
2340 : * This implementation pays no attention to TYPE messages; we expect the user
2341 : * to have set things up so that the incoming data is acceptable to the input
2342 : * functions for the locally subscribed tables. Hence, we just read and
2343 : * discard the message.
2344 : */
2345 : static void
2346 36 : apply_handle_type(StringInfo s)
2347 : {
2348 : LogicalRepTyp typ;
2349 :
2350 36 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2351 0 : return;
2352 :
2353 36 : logicalrep_read_typ(s, &typ);
2354 : }
2355 :
2356 : /*
2357 : * Check that we (the subscription owner) have sufficient privileges on the
2358 : * target relation to perform the given operation.
2359 : */
2360 : static void
2361 441556 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2362 : {
2363 : Oid relid;
2364 : AclResult aclresult;
2365 :
2366 441556 : relid = RelationGetRelid(rel);
2367 441556 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2368 441556 : if (aclresult != ACLCHECK_OK)
2369 14 : aclcheck_error(aclresult,
2370 14 : get_relkind_objtype(rel->rd_rel->relkind),
2371 14 : get_rel_name(relid));
2372 :
2373 : /*
2374 : * We lack the infrastructure to honor RLS policies. It might be possible
2375 : * to add such infrastructure here, but tablesync workers lack it, too, so
2376 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2377 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2378 : * replicate subsequent INSERTs, so we forbid all commands the same.
2379 : */
2380 441542 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2381 6 : ereport(ERROR,
2382 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2383 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2384 : GetUserNameFromId(GetUserId(), true),
2385 : RelationGetRelationName(rel))));
2386 441536 : }
2387 :
2388 : /*
2389 : * Handle INSERT message.
2390 : */
2391 :
2392 : static void
2393 372644 : apply_handle_insert(StringInfo s)
2394 : {
2395 : LogicalRepRelMapEntry *rel;
2396 : LogicalRepTupleData newtup;
2397 : LogicalRepRelId relid;
2398 : UserContext ucxt;
2399 : ApplyExecutionData *edata;
2400 : EState *estate;
2401 : TupleTableSlot *remoteslot;
2402 : MemoryContext oldctx;
2403 : bool run_as_owner;
2404 :
2405 : /*
2406 : * Quick return if we are skipping data modification changes or handling
2407 : * streamed transactions.
2408 : */
2409 725286 : if (is_skipping_changes() ||
2410 352642 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2411 220116 : return;
2412 :
2413 152630 : begin_replication_step();
2414 :
2415 152628 : relid = logicalrep_read_insert(s, &newtup);
2416 152628 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2417 152616 : if (!should_apply_changes_for_rel(rel))
2418 : {
2419 : /*
2420 : * The relation can't become interesting in the middle of the
2421 : * transaction so it's safe to unlock it.
2422 : */
2423 102 : logicalrep_rel_close(rel, RowExclusiveLock);
2424 102 : end_replication_step();
2425 102 : return;
2426 : }
2427 :
2428 : /*
2429 : * Make sure that any user-supplied code runs as the table owner, unless
2430 : * the user has opted out of that behavior.
2431 : */
2432 152514 : run_as_owner = MySubscription->runasowner;
2433 152514 : if (!run_as_owner)
2434 152498 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2435 :
2436 : /* Set relation for error callback */
2437 152514 : apply_error_callback_arg.rel = rel;
2438 :
2439 : /* Initialize the executor state. */
2440 152514 : edata = create_edata_for_relation(rel);
2441 152514 : estate = edata->estate;
2442 152514 : remoteslot = ExecInitExtraTupleSlot(estate,
2443 152514 : RelationGetDescr(rel->localrel),
2444 : &TTSOpsVirtual);
2445 :
2446 : /* Process and store remote tuple in the slot */
2447 152514 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2448 152514 : slot_store_data(remoteslot, rel, &newtup);
2449 152514 : slot_fill_defaults(rel, estate, remoteslot);
2450 152514 : MemoryContextSwitchTo(oldctx);
2451 :
2452 : /* For a partitioned table, insert the tuple into a partition. */
2453 152514 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2454 90 : apply_handle_tuple_routing(edata,
2455 : remoteslot, NULL, CMD_INSERT);
2456 : else
2457 : {
2458 152424 : ResultRelInfo *relinfo = edata->targetRelInfo;
2459 :
2460 152424 : ExecOpenIndices(relinfo, false);
2461 152424 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2462 152396 : ExecCloseIndices(relinfo);
2463 : }
2464 :
2465 152484 : finish_edata(edata);
2466 :
2467 : /* Reset relation for error callback */
2468 152484 : apply_error_callback_arg.rel = NULL;
2469 :
2470 152484 : if (!run_as_owner)
2471 152474 : RestoreUserContext(&ucxt);
2472 :
2473 152484 : logicalrep_rel_close(rel, NoLock);
2474 :
2475 152484 : end_replication_step();
2476 : }
2477 :
2478 : /*
2479 : * Workhorse for apply_handle_insert()
2480 : * relinfo is for the relation we're actually inserting into
2481 : * (could be a child partition of edata->targetRelInfo)
2482 : */
2483 : static void
2484 152516 : apply_handle_insert_internal(ApplyExecutionData *edata,
2485 : ResultRelInfo *relinfo,
2486 : TupleTableSlot *remoteslot)
2487 : {
2488 152516 : EState *estate = edata->estate;
2489 :
2490 : /* Caller should have opened indexes already. */
2491 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2492 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2493 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2494 :
2495 : /* Caller will not have done this bit. */
2496 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2497 152516 : InitConflictIndexes(relinfo);
2498 :
2499 : /* Do the insert. */
2500 152516 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2501 152504 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2502 152486 : }
2503 :
2504 : /*
2505 : * Check if the logical replication relation is updatable and throw
2506 : * appropriate error if it isn't.
2507 : */
2508 : static void
2509 144566 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2510 : {
2511 : /*
2512 :      * For partitioned tables, we only need to care whether the target partition
2513 :      * is updatable (i.e. has a PK or RI defined for it).
2514 : */
2515 144566 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2516 60 : return;
2517 :
2518 : /* Updatable, no error. */
2519 144506 : if (rel->updatable)
2520 144506 : return;
2521 :
2522 : /*
2523 :      * We are in error mode, so it's fine that this is somewhat slow. It's better
2524 :      * to give the user a correct error.
2525 : */
2526 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2527 : {
2528 0 : ereport(ERROR,
2529 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2530 : errmsg("publisher did not send replica identity column "
2531 : "expected by the logical replication target relation \"%s.%s\"",
2532 : rel->remoterel.nspname, rel->remoterel.relname)));
2533 : }
2534 :
2535 0 : ereport(ERROR,
2536 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2537 : errmsg("logical replication target relation \"%s.%s\" has "
2538 : "neither REPLICA IDENTITY index nor PRIMARY "
2539 : "KEY and published relation does not have "
2540 : "REPLICA IDENTITY FULL",
2541 : rel->remoterel.nspname, rel->remoterel.relname)));
2542 : }
2543 :
2544 : /*
2545 : * Handle UPDATE message.
2546 : *
2547 : * TODO: FDW support
2548 : */
2549 : static void
2550 132318 : apply_handle_update(StringInfo s)
2551 : {
2552 : LogicalRepRelMapEntry *rel;
2553 : LogicalRepRelId relid;
2554 : UserContext ucxt;
2555 : ApplyExecutionData *edata;
2556 : EState *estate;
2557 : LogicalRepTupleData oldtup;
2558 : LogicalRepTupleData newtup;
2559 : bool has_oldtup;
2560 : TupleTableSlot *remoteslot;
2561 : RTEPermissionInfo *target_perminfo;
2562 : MemoryContext oldctx;
2563 : bool run_as_owner;
2564 :
2565 : /*
2566 : * Quick return if we are skipping data modification changes or handling
2567 : * streamed transactions.
2568 : */
2569 264630 : if (is_skipping_changes() ||
2570 132312 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2571 68446 : return;
2572 :
2573 63872 : begin_replication_step();
2574 :
2575 63870 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2576 : &newtup);
2577 63870 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2578 63870 : if (!should_apply_changes_for_rel(rel))
2579 : {
2580 : /*
2581 : * The relation can't become interesting in the middle of the
2582 : * transaction so it's safe to unlock it.
2583 : */
2584 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2585 0 : end_replication_step();
2586 0 : return;
2587 : }
2588 :
2589 : /* Set relation for error callback */
2590 63870 : apply_error_callback_arg.rel = rel;
2591 :
2592 : /* Check if we can do the update. */
2593 63870 : check_relation_updatable(rel);
2594 :
2595 : /*
2596 : * Make sure that any user-supplied code runs as the table owner, unless
2597 : * the user has opted out of that behavior.
2598 : */
2599 63870 : run_as_owner = MySubscription->runasowner;
2600 63870 : if (!run_as_owner)
2601 63864 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2602 :
2603 : /* Initialize the executor state. */
2604 63868 : edata = create_edata_for_relation(rel);
2605 63868 : estate = edata->estate;
2606 63868 : remoteslot = ExecInitExtraTupleSlot(estate,
2607 63868 : RelationGetDescr(rel->localrel),
2608 : &TTSOpsVirtual);
2609 :
2610 : /*
2611 :      * Populate updatedCols so that per-column triggers can fire, and so the
2612 :      * executor can correctly pass down the indexUnchanged hint. This could
2613 : * include more columns than were actually changed on the publisher
2614 : * because the logical replication protocol doesn't contain that
2615 : * information. But it would for example exclude columns that only exist
2616 : * on the subscriber, since we are not touching those.
2617 : */
2618 63868 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2619 318608 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2620 : {
2621 254740 : Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
2622 254740 : int remoteattnum = rel->attrmap->attnums[i];
2623 :
2624 254740 : if (!att->attisdropped && remoteattnum >= 0)
2625 : {
2626 : Assert(remoteattnum < newtup.ncols);
2627 137706 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2628 137700 : target_perminfo->updatedCols =
2629 137700 : bms_add_member(target_perminfo->updatedCols,
2630 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2631 : }
2632 : }
2633 :
2634 : /* Build the search tuple. */
2635 63868 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2636 63868 : slot_store_data(remoteslot, rel,
2637 63868 : has_oldtup ? &oldtup : &newtup);
2638 63868 : MemoryContextSwitchTo(oldctx);
2639 :
2640 : /* For a partitioned table, apply update to correct partition. */
2641 63868 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2642 26 : apply_handle_tuple_routing(edata,
2643 : remoteslot, &newtup, CMD_UPDATE);
2644 : else
2645 63842 : apply_handle_update_internal(edata, edata->targetRelInfo,
2646 : remoteslot, &newtup, rel->localindexoid);
2647 :
2648 63856 : finish_edata(edata);
2649 :
2650 : /* Reset relation for error callback */
2651 63856 : apply_error_callback_arg.rel = NULL;
2652 :
2653 63856 : if (!run_as_owner)
2654 63852 : RestoreUserContext(&ucxt);
2655 :
2656 63856 : logicalrep_rel_close(rel, NoLock);
2657 :
2658 63856 : end_replication_step();
2659 : }
2660 :
2661 : /*
2662 : * Workhorse for apply_handle_update()
2663 : * relinfo is for the relation we're actually updating in
2664 : * (could be a child partition of edata->targetRelInfo)
2665 : */
2666 : static void
2667 63842 : apply_handle_update_internal(ApplyExecutionData *edata,
2668 : ResultRelInfo *relinfo,
2669 : TupleTableSlot *remoteslot,
2670 : LogicalRepTupleData *newtup,
2671 : Oid localindexoid)
2672 : {
2673 63842 : EState *estate = edata->estate;
2674 63842 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2675 63842 : Relation localrel = relinfo->ri_RelationDesc;
2676 : EPQState epqstate;
2677 63842 : TupleTableSlot *localslot = NULL;
2678 63842 : ConflictTupleInfo conflicttuple = {0};
2679 : bool found;
2680 : MemoryContext oldctx;
2681 :
2682 63842 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2683 63842 : ExecOpenIndices(relinfo, false);
2684 :
2685 63842 : found = FindReplTupleInLocalRel(edata, localrel,
2686 : &relmapentry->remoterel,
2687 : localindexoid,
2688 : remoteslot, &localslot);
2689 :
2690 : /*
2691 : * Tuple found.
2692 : *
2693 : * Note this will fail if there are other conflicting unique indexes.
2694 : */
2695 63834 : if (found)
2696 : {
2697 : /*
2698 : * Report the conflict if the tuple was modified by a different
2699 : * origin.
2700 : */
2701 63826 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2702 4 : &conflicttuple.origin, &conflicttuple.ts) &&
2703 4 : conflicttuple.origin != replorigin_session_origin)
2704 : {
2705 : TupleTableSlot *newslot;
2706 :
2707 : /* Store the new tuple for conflict reporting */
2708 4 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2709 4 : slot_store_data(newslot, relmapentry, newtup);
2710 :
2711 4 : conflicttuple.slot = localslot;
2712 :
2713 4 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2714 : remoteslot, newslot,
2715 4 : list_make1(&conflicttuple));
2716 : }
2717 :
2718 : /* Process and store remote tuple in the slot */
2719 63826 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2720 63826 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2721 63826 : MemoryContextSwitchTo(oldctx);
2722 :
2723 63826 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2724 :
2725 63826 : InitConflictIndexes(relinfo);
2726 :
2727 : /* Do the actual update. */
2728 63826 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2729 63826 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2730 : remoteslot);
2731 : }
2732 : else
2733 : {
2734 8 : TupleTableSlot *newslot = localslot;
2735 :
2736 : /* Store the new tuple for conflict reporting */
2737 8 : slot_store_data(newslot, relmapentry, newtup);
2738 :
2739 : /*
2740 : * The tuple to be updated could not be found. Do nothing except for
2741 : * emitting a log message.
2742 : */
2743 8 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
2744 8 : remoteslot, newslot, list_make1(&conflicttuple));
2745 : }
2746 :
2747 : /* Cleanup. */
2748 63830 : ExecCloseIndices(relinfo);
2749 63830 : EvalPlanQualEnd(&epqstate);
2750 63830 : }
2751 :
2752 : /*
2753 : * Handle DELETE message.
2754 : *
2755 : * TODO: FDW support
2756 : */
2757 : static void
2758 163866 : apply_handle_delete(StringInfo s)
2759 : {
2760 : LogicalRepRelMapEntry *rel;
2761 : LogicalRepTupleData oldtup;
2762 : LogicalRepRelId relid;
2763 : UserContext ucxt;
2764 : ApplyExecutionData *edata;
2765 : EState *estate;
2766 : TupleTableSlot *remoteslot;
2767 : MemoryContext oldctx;
2768 : bool run_as_owner;
2769 :
2770 : /*
2771 : * Quick return if we are skipping data modification changes or handling
2772 : * streamed transactions.
2773 : */
2774 327732 : if (is_skipping_changes() ||
2775 163866 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
2776 83230 : return;
2777 :
2778 80636 : begin_replication_step();
2779 :
2780 80636 : relid = logicalrep_read_delete(s, &oldtup);
2781 80636 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2782 80636 : if (!should_apply_changes_for_rel(rel))
2783 : {
2784 : /*
2785 : * The relation can't become interesting in the middle of the
2786 : * transaction so it's safe to unlock it.
2787 : */
2788 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2789 0 : end_replication_step();
2790 0 : return;
2791 : }
2792 :
2793 : /* Set relation for error callback */
2794 80636 : apply_error_callback_arg.rel = rel;
2795 :
2796 : /* Check if we can do the delete. */
2797 80636 : check_relation_updatable(rel);
2798 :
2799 : /*
2800 : * Make sure that any user-supplied code runs as the table owner, unless
2801 : * the user has opted out of that behavior.
2802 : */
2803 80636 : run_as_owner = MySubscription->runasowner;
2804 80636 : if (!run_as_owner)
2805 80632 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2806 :
2807 : /* Initialize the executor state. */
2808 80636 : edata = create_edata_for_relation(rel);
2809 80636 : estate = edata->estate;
2810 80636 : remoteslot = ExecInitExtraTupleSlot(estate,
2811 80636 : RelationGetDescr(rel->localrel),
2812 : &TTSOpsVirtual);
2813 :
2814 : /* Build the search tuple. */
2815 80636 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2816 80636 : slot_store_data(remoteslot, rel, &oldtup);
2817 80636 : MemoryContextSwitchTo(oldctx);
2818 :
2819 : /* For a partitioned table, apply delete to correct partition. */
2820 80636 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2821 34 : apply_handle_tuple_routing(edata,
2822 : remoteslot, NULL, CMD_DELETE);
2823 : else
2824 : {
2825 80602 : ResultRelInfo *relinfo = edata->targetRelInfo;
2826 :
2827 80602 : ExecOpenIndices(relinfo, false);
2828 80602 : apply_handle_delete_internal(edata, relinfo,
2829 : remoteslot, rel->localindexoid);
2830 80602 : ExecCloseIndices(relinfo);
2831 : }
2832 :
2833 80636 : finish_edata(edata);
2834 :
2835 : /* Reset relation for error callback */
2836 80636 : apply_error_callback_arg.rel = NULL;
2837 :
2838 80636 : if (!run_as_owner)
2839 80632 : RestoreUserContext(&ucxt);
2840 :
2841 80636 : logicalrep_rel_close(rel, NoLock);
2842 :
2843 80636 : end_replication_step();
2844 : }
2845 :
2846 : /*
2847 : * Workhorse for apply_handle_delete()
2848 : * relinfo is for the relation we're actually deleting from
2849 : * (could be a child partition of edata->targetRelInfo)
2850 : */
2851 : static void
2852 80636 : apply_handle_delete_internal(ApplyExecutionData *edata,
2853 : ResultRelInfo *relinfo,
2854 : TupleTableSlot *remoteslot,
2855 : Oid localindexoid)
2856 : {
2857 80636 : EState *estate = edata->estate;
2858 80636 : Relation localrel = relinfo->ri_RelationDesc;
2859 80636 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2860 : EPQState epqstate;
2861 : TupleTableSlot *localslot;
2862 80636 : ConflictTupleInfo conflicttuple = {0};
2863 : bool found;
2864 :
2865 80636 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2866 :
2867 : /* Caller should have opened indexes already. */
2868 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2869 : !localrel->rd_rel->relhasindex ||
2870 : RelationGetIndexList(localrel) == NIL);
2871 :
2872 80636 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2873 : remoteslot, &localslot);
2874 :
2875 : /* If found delete it. */
2876 80636 : if (found)
2877 : {
2878 : /*
2879 : * Report the conflict if the tuple was modified by a different
2880 : * origin.
2881 : */
2882 80618 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2883 6 : &conflicttuple.origin, &conflicttuple.ts) &&
2884 6 : conflicttuple.origin != replorigin_session_origin)
2885 : {
2886 4 : conflicttuple.slot = localslot;
2887 4 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
2888 : remoteslot, NULL,
2889 4 : list_make1(&conflicttuple));
2890 : }
2891 :
2892 80618 : EvalPlanQualSetSlot(&epqstate, localslot);
2893 :
2894 : /* Do the actual delete. */
2895 80618 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
2896 80618 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2897 : }
2898 : else
2899 : {
2900 : /*
2901 : * The tuple to be deleted could not be found. Do nothing except for
2902 : * emitting a log message.
2903 : */
2904 18 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
2905 18 : remoteslot, NULL, list_make1(&conflicttuple));
2906 : }
2907 :
2908 : /* Cleanup. */
2909 80636 : EvalPlanQualEnd(&epqstate);
2910 80636 : }
2911 :
2912 : /*
2913 : * Try to find a tuple received from the publication side (in 'remoteslot') in
2914 :  * the corresponding local relation using either the replica identity index,
2915 :  * the primary key, another usable index, or, if needed, a sequential scan.
2916 : *
2917 : * Local tuple, if found, is returned in '*localslot'.
2918 : */
2919 : static bool
2920 144504 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
2921 : LogicalRepRelation *remoterel,
2922 : Oid localidxoid,
2923 : TupleTableSlot *remoteslot,
2924 : TupleTableSlot **localslot)
2925 : {
2926 144504 : EState *estate = edata->estate;
2927 : bool found;
2928 :
2929 : /*
2930 : * Regardless of the top-level operation, we're performing a read here, so
2931 : * check for SELECT privileges.
2932 : */
2933 144504 : TargetPrivilegesCheck(localrel, ACL_SELECT);
2934 :
2935 144496 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2936 :
2937 : Assert(OidIsValid(localidxoid) ||
2938 : (remoterel->replident == REPLICA_IDENTITY_FULL));
2939 :
2940 144496 : if (OidIsValid(localidxoid))
2941 : {
2942 : #ifdef USE_ASSERT_CHECKING
2943 : Relation idxrel = index_open(localidxoid, AccessShareLock);
2944 :
2945 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2946 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
2947 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
2948 : IsIndexUsableForReplicaIdentityFull(idxrel,
2949 : edata->targetRel->attrmap)));
2950 : index_close(idxrel, AccessShareLock);
2951 : #endif
2952 :
2953 144198 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
2954 : LockTupleExclusive,
2955 : remoteslot, *localslot);
2956 : }
2957 : else
2958 298 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2959 : remoteslot, *localslot);
2960 :
2961 144496 : return found;
2962 : }
2963 :
2964 : /*
2965 : * This handles insert, update, delete on a partitioned table.
2966 : */
2967 : static void
2968 150 : apply_handle_tuple_routing(ApplyExecutionData *edata,
2969 : TupleTableSlot *remoteslot,
2970 : LogicalRepTupleData *newtup,
2971 : CmdType operation)
2972 : {
2973 150 : EState *estate = edata->estate;
2974 150 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2975 150 : ResultRelInfo *relinfo = edata->targetRelInfo;
2976 150 : Relation parentrel = relinfo->ri_RelationDesc;
2977 : ModifyTableState *mtstate;
2978 : PartitionTupleRouting *proute;
2979 : ResultRelInfo *partrelinfo;
2980 : Relation partrel;
2981 : TupleTableSlot *remoteslot_part;
2982 : TupleConversionMap *map;
2983 : MemoryContext oldctx;
2984 150 : LogicalRepRelMapEntry *part_entry = NULL;
2985 150 : AttrMap *attrmap = NULL;
2986 :
2987 : /* ModifyTableState is needed for ExecFindPartition(). */
2988 150 : edata->mtstate = mtstate = makeNode(ModifyTableState);
2989 150 : mtstate->ps.plan = NULL;
2990 150 : mtstate->ps.state = estate;
2991 150 : mtstate->operation = operation;
2992 150 : mtstate->resultRelInfo = relinfo;
2993 :
2994 : /* ... as is PartitionTupleRouting. */
2995 150 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2996 :
2997 : /*
2998 : * Find the partition to which the "search tuple" belongs.
2999 : */
3000 : Assert(remoteslot != NULL);
3001 150 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3002 150 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3003 : remoteslot, estate);
3004 : Assert(partrelinfo != NULL);
3005 150 : partrel = partrelinfo->ri_RelationDesc;
3006 :
3007 : /*
3008 : * Check for supported relkind. We need this since partitions might be of
3009 : * unsupported relkinds; and the set of partitions can change, so checking
3010 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3011 : */
3012 150 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3013 150 : get_namespace_name(RelationGetNamespace(partrel)),
3014 150 : RelationGetRelationName(partrel));
3015 :
3016 : /*
3017 : * To perform any of the operations below, the tuple must match the
3018 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3019 : * slot to store the tuple in any case.
3020 : */
3021 150 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3022 150 : if (remoteslot_part == NULL)
3023 84 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3024 150 : map = ExecGetRootToChildMap(partrelinfo, estate);
3025 150 : if (map != NULL)
3026 : {
3027 66 : attrmap = map->attrMap;
3028 66 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3029 : remoteslot_part);
3030 : }
3031 : else
3032 : {
3033 84 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3034 84 : slot_getallattrs(remoteslot_part);
3035 : }
3036 150 : MemoryContextSwitchTo(oldctx);
3037 :
3038 : /* Check if we can do the update or delete on the leaf partition. */
3039 150 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3040 : {
3041 60 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3042 : attrmap);
3043 60 : check_relation_updatable(part_entry);
3044 : }
3045 :
3046 150 : switch (operation)
3047 : {
3048 90 : case CMD_INSERT:
3049 90 : apply_handle_insert_internal(edata, partrelinfo,
3050 : remoteslot_part);
3051 88 : break;
3052 :
3053 34 : case CMD_DELETE:
3054 34 : apply_handle_delete_internal(edata, partrelinfo,
3055 : remoteslot_part,
3056 : part_entry->localindexoid);
3057 34 : break;
3058 :
3059 26 : case CMD_UPDATE:
3060 :
3061 : /*
3062 : * For UPDATE, depending on whether or not the updated tuple
3063 : * satisfies the partition's constraint, perform a simple UPDATE
3064 : * of the partition or move the updated tuple into a different
3065 : * suitable partition.
3066 : */
3067 : {
3068 : TupleTableSlot *localslot;
3069 : ResultRelInfo *partrelinfo_new;
3070 : Relation partrel_new;
3071 : bool found;
3072 : EPQState epqstate;
3073 26 : ConflictTupleInfo conflicttuple = {0};
3074 :
3075 : /* Get the matching local tuple from the partition. */
3076 26 : found = FindReplTupleInLocalRel(edata, partrel,
3077 : &part_entry->remoterel,
3078 : part_entry->localindexoid,
3079 : remoteslot_part, &localslot);
3080 26 : if (!found)
3081 : {
3082 4 : TupleTableSlot *newslot = localslot;
3083 :
3084 : /* Store the new tuple for conflict reporting */
3085 4 : slot_store_data(newslot, part_entry, newtup);
3086 :
3087 : /*
3088 : * The tuple to be updated could not be found. Do nothing
3089 : * except for emitting a log message.
3090 : */
3091 4 : ReportApplyConflict(estate, partrelinfo, LOG,
3092 : CT_UPDATE_MISSING, remoteslot_part,
3093 4 : newslot, list_make1(&conflicttuple));
3094 :
3095 4 : return;
3096 : }
3097 :
3098 : /*
3099 : * Report the conflict if the tuple was modified by a
3100 : * different origin.
3101 : */
3102 22 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3103 : &conflicttuple.origin,
3104 2 : &conflicttuple.ts) &&
3105 2 : conflicttuple.origin != replorigin_session_origin)
3106 : {
3107 : TupleTableSlot *newslot;
3108 :
3109 : /* Store the new tuple for conflict reporting */
3110 2 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3111 2 : slot_store_data(newslot, part_entry, newtup);
3112 :
3113 2 : conflicttuple.slot = localslot;
3114 :
3115 2 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3116 : remoteslot_part, newslot,
3117 2 : list_make1(&conflicttuple));
3118 : }
3119 :
3120 : /*
3121 : * Apply the update to the local tuple, putting the result in
3122 : * remoteslot_part.
3123 : */
3124 22 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3125 22 : slot_modify_data(remoteslot_part, localslot, part_entry,
3126 : newtup);
3127 22 : MemoryContextSwitchTo(oldctx);
3128 :
3129 22 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3130 :
3131 : /*
3132 : * Does the updated tuple still satisfy the current
3133 : * partition's constraint?
3134 : */
3135 44 : if (!partrel->rd_rel->relispartition ||
3136 22 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3137 : false))
3138 : {
3139 : /*
3140 : * Yes, so simply UPDATE the partition. We don't call
3141 : * apply_handle_update_internal() here (which would
3142 : * normally do this work) in order to avoid repeating the
3143 : * lookup of the local tuple in the partition that was
3144 : * already done above.
3145 : */
3146 20 : InitConflictIndexes(partrelinfo);
3147 :
3148 20 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3149 20 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3150 : ACL_UPDATE);
3151 20 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3152 : localslot, remoteslot_part);
3153 : }
3154 : else
3155 : {
3156 : /* Move the tuple into the new partition. */
3157 :
3158 : /*
3159 : * New partition will be found using tuple routing, which
3160 : * can only occur via the parent table. We might need to
3161 : * convert the tuple to the parent's rowtype. Note that
3162 : * this is the tuple found in the partition, not the
3163 : * original search tuple received by this function.
3164 : */
3165 2 : if (map)
3166 : {
3167 : TupleConversionMap *PartitionToRootMap =
3168 2 : convert_tuples_by_name(RelationGetDescr(partrel),
3169 : RelationGetDescr(parentrel));
3170 :
3171 : remoteslot =
3172 2 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3173 : remoteslot_part, remoteslot);
3174 : }
3175 : else
3176 : {
3177 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3178 0 : slot_getallattrs(remoteslot);
3179 : }
3180 :
3181 : /* Find the new partition. */
3182 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3183 2 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3184 : proute, remoteslot,
3185 : estate);
3186 2 : MemoryContextSwitchTo(oldctx);
3187 : Assert(partrelinfo_new != partrelinfo);
3188 2 : partrel_new = partrelinfo_new->ri_RelationDesc;
3189 :
3190 : /* Check that new partition also has supported relkind. */
3191 2 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3192 2 : get_namespace_name(RelationGetNamespace(partrel_new)),
3193 2 : RelationGetRelationName(partrel_new));
3194 :
3195 : /* DELETE old tuple found in the old partition. */
3196 2 : EvalPlanQualSetSlot(&epqstate, localslot);
3197 2 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3198 2 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3199 :
3200 : /* INSERT new tuple into the new partition. */
3201 :
3202 : /*
3203 : * Convert the replacement tuple to match the destination
3204 : * partition rowtype.
3205 : */
3206 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3207 2 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3208 2 : if (remoteslot_part == NULL)
3209 2 : remoteslot_part = table_slot_create(partrel_new,
3210 : &estate->es_tupleTable);
3211 2 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3212 2 : if (map != NULL)
3213 : {
3214 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3215 : remoteslot,
3216 : remoteslot_part);
3217 : }
3218 : else
3219 : {
3220 2 : remoteslot_part = ExecCopySlot(remoteslot_part,
3221 : remoteslot);
3222 2 : slot_getallattrs(remoteslot);
3223 : }
3224 2 : MemoryContextSwitchTo(oldctx);
3225 2 : apply_handle_insert_internal(edata, partrelinfo_new,
3226 : remoteslot_part);
3227 : }
3228 :
3229 22 : EvalPlanQualEnd(&epqstate);
3230 : }
3231 22 : break;
3232 :
3233 0 : default:
3234 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3235 : break;
3236 : }
3237 : }
3238 :
3239 : /*
3240 : * Handle TRUNCATE message.
3241 : *
3242 : * TODO: FDW support
3243 : */
3244 : static void
3245 38 : apply_handle_truncate(StringInfo s)
3246 : {
3247 38 : bool cascade = false;
3248 38 : bool restart_seqs = false;
3249 38 : List *remote_relids = NIL;
3250 38 : List *remote_rels = NIL;
3251 38 : List *rels = NIL;
3252 38 : List *part_rels = NIL;
3253 38 : List *relids = NIL;
3254 38 : List *relids_logged = NIL;
3255 : ListCell *lc;
3256 38 : LOCKMODE lockmode = AccessExclusiveLock;
3257 :
3258 : /*
3259 : * Quick return if we are skipping data modification changes or handling
3260 : * streamed transactions.
3261 : */
3262 76 : if (is_skipping_changes() ||
3263 38 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
3264 0 : return;
3265 :
3266 38 : begin_replication_step();
3267 :
3268 38 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3269 :
3270 94 : foreach(lc, remote_relids)
3271 : {
3272 56 : LogicalRepRelId relid = lfirst_oid(lc);
3273 : LogicalRepRelMapEntry *rel;
3274 :
3275 56 : rel = logicalrep_rel_open(relid, lockmode);
3276 56 : if (!should_apply_changes_for_rel(rel))
3277 : {
3278 : /*
3279 : * The relation can't become interesting in the middle of the
3280 : * transaction so it's safe to unlock it.
3281 : */
3282 0 : logicalrep_rel_close(rel, lockmode);
3283 0 : continue;
3284 : }
3285 :
3286 56 : remote_rels = lappend(remote_rels, rel);
3287 56 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
3288 56 : rels = lappend(rels, rel->localrel);
3289 56 : relids = lappend_oid(relids, rel->localreloid);
3290 56 : if (RelationIsLogicallyLogged(rel->localrel))
3291 0 : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3292 :
3293 : /*
3294 : * Truncate partitions if we got a message to truncate a partitioned
3295 : * table.
3296 : */
3297 56 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3298 : {
3299 : ListCell *child;
3300 8 : List *children = find_all_inheritors(rel->localreloid,
3301 : lockmode,
3302 : NULL);
3303 :
3304 30 : foreach(child, children)
3305 : {
3306 22 : Oid childrelid = lfirst_oid(child);
3307 : Relation childrel;
3308 :
3309 22 : if (list_member_oid(relids, childrelid))
3310 8 : continue;
3311 :
3312 : /* find_all_inheritors already got lock */
3313 14 : childrel = table_open(childrelid, NoLock);
3314 :
3315 : /*
3316 : * Ignore temp tables of other backends. See similar code in
3317 : * ExecuteTruncate().
3318 : */
3319 14 : if (RELATION_IS_OTHER_TEMP(childrel))
3320 : {
3321 0 : table_close(childrel, lockmode);
3322 0 : continue;
3323 : }
3324 :
3325 14 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
3326 14 : rels = lappend(rels, childrel);
3327 14 : part_rels = lappend(part_rels, childrel);
3328 14 : relids = lappend_oid(relids, childrelid);
3329 : /* Log this relation only if needed for logical decoding */
3330 14 : if (RelationIsLogicallyLogged(childrel))
3331 0 : relids_logged = lappend_oid(relids_logged, childrelid);
3332 : }
3333 : }
3334 : }
3335 :
3336 : /*
3337 : * Even if CASCADE was used on the upstream primary, we explicitly default
3338 : * to replaying changes without further cascading. This might later be made
3339 : * configurable with a user-specified option.
3340 : *
3341 : * MySubscription->runasowner tells us whether we want to execute
3342 : * replication actions as the subscription owner; the last argument to
3343 : * ExecuteTruncateGuts tells it whether we want to switch to the table owner.
3344 : * Those are exactly opposite conditions.
3345 : */
3346 38 : ExecuteTruncateGuts(rels,
3347 : relids,
3348 : relids_logged,
3349 : DROP_RESTRICT,
3350 : restart_seqs,
3351 38 : !MySubscription->runasowner);
3352 94 : foreach(lc, remote_rels)
3353 : {
3354 56 : LogicalRepRelMapEntry *rel = lfirst(lc);
3355 :
3356 56 : logicalrep_rel_close(rel, NoLock);
3357 : }
3358 52 : foreach(lc, part_rels)
3359 : {
3360 14 : Relation rel = lfirst(lc);
3361 :
3362 14 : table_close(rel, NoLock);
3363 : }
3364 :
3365 38 : end_replication_step();
3366 : }
3367 :
3368 :
3369 : /*
3370 : * Logical replication protocol message dispatcher.
3371 : */
3372 : void
3373 675288 : apply_dispatch(StringInfo s)
3374 : {
3375 675288 : LogicalRepMsgType action = pq_getmsgbyte(s);
3376 : LogicalRepMsgType saved_command;
3377 :
3378 : /*
3379 : * Set the current command being applied. Since this function can be
3380 : * called recursively when applying spooled changes, save the current
3381 : * command.
3382 : */
3383 675288 : saved_command = apply_error_callback_arg.command;
3384 675288 : apply_error_callback_arg.command = action;
3385 :
3386 675288 : switch (action)
3387 : {
3388 904 : case LOGICAL_REP_MSG_BEGIN:
3389 904 : apply_handle_begin(s);
3390 904 : break;
3391 :
3392 848 : case LOGICAL_REP_MSG_COMMIT:
3393 848 : apply_handle_commit(s);
3394 848 : break;
3395 :
3396 372644 : case LOGICAL_REP_MSG_INSERT:
3397 372644 : apply_handle_insert(s);
3398 372600 : break;
3399 :
3400 132318 : case LOGICAL_REP_MSG_UPDATE:
3401 132318 : apply_handle_update(s);
3402 132302 : break;
3403 :
3404 163866 : case LOGICAL_REP_MSG_DELETE:
3405 163866 : apply_handle_delete(s);
3406 163866 : break;
3407 :
3408 38 : case LOGICAL_REP_MSG_TRUNCATE:
3409 38 : apply_handle_truncate(s);
3410 38 : break;
3411 :
3412 864 : case LOGICAL_REP_MSG_RELATION:
3413 864 : apply_handle_relation(s);
3414 864 : break;
3415 :
3416 36 : case LOGICAL_REP_MSG_TYPE:
3417 36 : apply_handle_type(s);
3418 36 : break;
3419 :
3420 16 : case LOGICAL_REP_MSG_ORIGIN:
3421 16 : apply_handle_origin(s);
3422 16 : break;
3423 :
3424 0 : case LOGICAL_REP_MSG_MESSAGE:
3425 :
3426 : /*
3427 : * Logical replication does not use generic logical messages yet,
3428 : * although they could be used by other applications that use this
3429 : * output plugin.
3430 : */
3431 0 : break;
3432 :
3433 1712 : case LOGICAL_REP_MSG_STREAM_START:
3434 1712 : apply_handle_stream_start(s);
3435 1712 : break;
3436 :
3437 1710 : case LOGICAL_REP_MSG_STREAM_STOP:
3438 1710 : apply_handle_stream_stop(s);
3439 1708 : break;
3440 :
3441 76 : case LOGICAL_REP_MSG_STREAM_ABORT:
3442 76 : apply_handle_stream_abort(s);
3443 76 : break;
3444 :
3445 122 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3446 122 : apply_handle_stream_commit(s);
3447 118 : break;
3448 :
3449 32 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3450 32 : apply_handle_begin_prepare(s);
3451 32 : break;
3452 :
3453 30 : case LOGICAL_REP_MSG_PREPARE:
3454 30 : apply_handle_prepare(s);
3455 28 : break;
3456 :
3457 40 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3458 40 : apply_handle_commit_prepared(s);
3459 40 : break;
3460 :
3461 10 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3462 10 : apply_handle_rollback_prepared(s);
3463 10 : break;
3464 :
3465 22 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3466 22 : apply_handle_stream_prepare(s);
3467 22 : break;
3468 :
3469 0 : default:
3470 0 : ereport(ERROR,
3471 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3472 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3473 : }
3474 :
3475 : /* Reset the current command */
3476 675220 : apply_error_callback_arg.command = saved_command;
3477 675220 : }
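        :
        : /*
        :  * Editor's illustration (not part of worker.c): apply_dispatch() is also
        :  * reachable with a buffered message, e.g. when replaying changes that were
        :  * spooled to a file. A minimal sketch of how such a caller could wrap raw
        :  * bytes in a read-only StringInfo, mirroring the receive loop below; the
        :  * helper name is hypothetical and the block is not compiled.
        :  */
        : #ifdef NOT_USED
        : static void
        : apply_dispatch_buffered(char *buf, int len)
        : {
        : 	StringInfoData s2;
        :
        : 	/* Wrap the raw bytes without copying; buf must stay valid. */
        : 	initReadOnlyStringInfo(&s2, buf, len);
        :
        : 	/* The first byte is the message type; apply_dispatch() consumes it. */
        : 	apply_dispatch(&s2);
        : }
        : #endif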
3478 :
3479 : /*
3480 : * Figure out which write/flush positions to report to the walsender process.
3481 : *
3482 : * We can't simply report back the last LSN the walsender sent us because the
3483 : * local transaction might not yet be flushed to disk locally. Instead we
3484 : * build a list that associates local with remote LSNs for every commit. When
3485 : * reporting back the flush position to the sender we iterate that list and
3486 : * check which entries on it are already locally flushed. Those we can report
3487 : * as having been flushed.
3488 : *
3489 : * *have_pending_txes is set to true if there are outstanding transactions
3490 : * that still need to be flushed.
3491 : */
3492 : static void
3493 73370 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3494 : bool *have_pending_txes)
3495 : {
3496 : dlist_mutable_iter iter;
3497 73370 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3498 :
3499 73370 : *write = InvalidXLogRecPtr;
3500 73370 : *flush = InvalidXLogRecPtr;
3501 :
3502 74310 : dlist_foreach_modify(iter, &lsn_mapping)
3503 : {
3504 17210 : FlushPosition *pos =
3505 17210 : dlist_container(FlushPosition, node, iter.cur);
3506 :
3507 17210 : *write = pos->remote_end;
3508 :
3509 17210 : if (pos->local_end <= local_flush)
3510 : {
3511 940 : *flush = pos->remote_end;
3512 940 : dlist_delete(iter.cur);
3513 940 : pfree(pos);
3514 : }
3515 : else
3516 : {
3517 : /*
3518 : * We don't want to uselessly iterate over the rest of the list,
3519 : * which could potentially be long. Instead, get the last element
3520 : * and grab the write position from there.
3521 : */
3522 16270 : pos = dlist_tail_element(FlushPosition, node,
3523 : &lsn_mapping);
3524 16270 : *write = pos->remote_end;
3525 16270 : *have_pending_txes = true;
3526 16270 : return;
3527 : }
3528 : }
3529 :
3530 57100 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3531 : }
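        :
        : /*
        :  * Editor's illustration (not part of worker.c): a hypothetical sketch of
        :  * how the two halves of the flush-position tracking fit together. For
        :  * example, if lsn_mapping holds (remote 0/1000, local 0/500) and
        :  * (remote 0/2000, local 0/900) while the local flush pointer is at 0/600,
        :  * get_flush_position() reports flush 0/1000, write 0/2000, and leaves
        :  * have_pending_txes set. This block is not compiled.
        :  */
        : #ifdef NOT_USED
        : static void
        : flush_position_example(XLogRecPtr remote_commit_lsn, XLogRecPtr local_commit_lsn)
        : {
        : 	XLogRecPtr	writepos;
        : 	XLogRecPtr	flushpos;
        : 	bool		have_pending_txes;
        :
        : 	/* Remember the remote/local pair at commit time. */
        : 	store_flush_position(remote_commit_lsn, local_commit_lsn);
        :
        : 	/*
        : 	 * Later, when sending feedback: entries whose local_end is already
        : 	 * flushed locally let us advance the reported remote flush position.
        : 	 */
        : 	get_flush_position(&writepos, &flushpos, &have_pending_txes);
        : }
        : #endif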
3532 :
3533 : /*
3534 : * Store current remote/local lsn pair in the tracking list.
3535 : */
3536 : void
3537 1060 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3538 : {
3539 : FlushPosition *flushpos;
3540 :
3541 : /*
3542 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3543 : * by the leader apply worker.
3544 : */
3545 1060 : if (am_parallel_apply_worker())
3546 38 : return;
3547 :
3548 : /* Need to do this in permanent context */
3549 1022 : MemoryContextSwitchTo(ApplyContext);
3550 :
3551 : /* Track commit lsn */
3552 1022 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3553 1022 : flushpos->local_end = local_lsn;
3554 1022 : flushpos->remote_end = remote_lsn;
3555 :
3556 1022 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3557 1022 : MemoryContextSwitchTo(ApplyMessageContext);
3558 : }
3559 :
3560 :
3561 : /* Update statistics of the worker. */
3562 : static void
3563 372872 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3564 : {
3565 372872 : MyLogicalRepWorker->last_lsn = last_lsn;
3566 372872 : MyLogicalRepWorker->last_send_time = send_time;
3567 372872 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3568 372872 : if (reply)
3569 : {
3570 3294 : MyLogicalRepWorker->reply_lsn = last_lsn;
3571 3294 : MyLogicalRepWorker->reply_time = send_time;
3572 : }
3573 372872 : }
3574 :
3575 : /*
3576 : * Apply main loop.
3577 : */
3578 : static void
3579 730 : LogicalRepApplyLoop(XLogRecPtr last_received)
3580 : {
3581 730 : TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3582 730 : bool ping_sent = false;
3583 : TimeLineID tli;
3584 : ErrorContextCallback errcallback;
3585 :
3586 : /*
3587 : * Init the ApplyMessageContext which we clean up after each replication
3588 : * protocol message.
3589 : */
3590 730 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
3591 : "ApplyMessageContext",
3592 : ALLOCSET_DEFAULT_SIZES);
3593 :
3594 : /*
3595 : * This memory context is used for per-stream data when the streaming mode
3596 : * is enabled. This context is reset on each stream stop.
3597 : */
3598 730 : LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
3599 : "LogicalStreamingContext",
3600 : ALLOCSET_DEFAULT_SIZES);
3601 :
3602 : /* mark as idle, before starting to loop */
3603 730 : pgstat_report_activity(STATE_IDLE, NULL);
3604 :
3605 : /*
3606 : * Push apply error context callback. Fields will be filled while applying
3607 : * a change.
3608 : */
3609 730 : errcallback.callback = apply_error_callback;
3610 730 : errcallback.previous = error_context_stack;
3611 730 : error_context_stack = &errcallback;
3612 730 : apply_error_context_stack = error_context_stack;
3613 :
3614 : /* This outer loop iterates once per wait. */
3615 : for (;;)
3616 69138 : {
3617 69868 : pgsocket fd = PGINVALID_SOCKET;
3618 : int rc;
3619 : int len;
3620 69868 : char *buf = NULL;
3621 69868 : bool endofstream = false;
3622 : long wait_time;
3623 :
3624 69868 : CHECK_FOR_INTERRUPTS();
3625 :
3626 69868 : MemoryContextSwitchTo(ApplyMessageContext);
3627 :
3628 69868 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3629 :
3630 69834 : if (len != 0)
3631 : {
3632 : /* Loop to process all available data (without blocking). */
3633 : for (;;)
3634 : {
3635 440848 : CHECK_FOR_INTERRUPTS();
3636 :
3637 440848 : if (len == 0)
3638 : {
3639 67964 : break;
3640 : }
3641 372884 : else if (len < 0)
3642 : {
3643 12 : ereport(LOG,
3644 : (errmsg("data stream from publisher has ended")));
3645 12 : endofstream = true;
3646 12 : break;
3647 : }
3648 : else
3649 : {
3650 : int c;
3651 : StringInfoData s;
3652 :
3653 372872 : if (ConfigReloadPending)
3654 : {
3655 0 : ConfigReloadPending = false;
3656 0 : ProcessConfigFile(PGC_SIGHUP);
3657 : }
3658 :
3659 : /* Reset timeout. */
3660 372872 : last_recv_timestamp = GetCurrentTimestamp();
3661 372872 : ping_sent = false;
3662 :
3663 : /* Ensure we are reading the data into our memory context. */
3664 372872 : MemoryContextSwitchTo(ApplyMessageContext);
3665 :
3666 372872 : initReadOnlyStringInfo(&s, buf, len);
3667 :
3668 372872 : c = pq_getmsgbyte(&s);
3669 :
3670 372872 : if (c == 'w')
3671 : {
3672 : XLogRecPtr start_lsn;
3673 : XLogRecPtr end_lsn;
3674 : TimestampTz send_time;
3675 :
3676 369578 : start_lsn = pq_getmsgint64(&s);
3677 369578 : end_lsn = pq_getmsgint64(&s);
3678 369578 : send_time = pq_getmsgint64(&s);
3679 :
3680 369578 : if (last_received < start_lsn)
3681 313118 : last_received = start_lsn;
3682 :
3683 369578 : if (last_received < end_lsn)
3684 0 : last_received = end_lsn;
3685 :
3686 369578 : UpdateWorkerStats(last_received, send_time, false);
3687 :
3688 369578 : apply_dispatch(&s);
3689 : }
3690 3294 : else if (c == 'k')
3691 : {
3692 : XLogRecPtr end_lsn;
3693 : TimestampTz timestamp;
3694 : bool reply_requested;
3695 :
3696 3294 : end_lsn = pq_getmsgint64(&s);
3697 3294 : timestamp = pq_getmsgint64(&s);
3698 3294 : reply_requested = pq_getmsgbyte(&s);
3699 :
3700 3294 : if (last_received < end_lsn)
3701 1618 : last_received = end_lsn;
3702 :
3703 3294 : send_feedback(last_received, reply_requested, false);
3704 3294 : UpdateWorkerStats(last_received, timestamp, true);
3705 : }
3706 : /* other message types are purposefully ignored */
3707 :
3708 372808 : MemoryContextReset(ApplyMessageContext);
3709 : }
3710 :
3711 372808 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3712 : }
3713 : }
3714 :
3715 : /* confirm all writes so far */
3716 69770 : send_feedback(last_received, false, false);
3717 :
3718 69770 : if (!in_remote_transaction && !in_streamed_transaction)
3719 : {
3720 : /*
3721 : * If we haven't received any transactions for a while, there might
3722 : * be unconsumed invalidation messages in the queue; consume them
3723 : * now.
3724 : */
3725 5454 : AcceptInvalidationMessages();
3726 5454 : maybe_reread_subscription();
3727 :
3728 : /* Process any table synchronization changes. */
3729 5384 : process_syncing_tables(last_received);
3730 : }
3731 :
3732 : /* Cleanup the memory. */
3733 69326 : MemoryContextReset(ApplyMessageContext);
3734 69326 : MemoryContextSwitchTo(TopMemoryContext);
3735 :
3736 : /* Check if we need to exit the streaming loop. */
3737 69326 : if (endofstream)
3738 12 : break;
3739 :
3740 : /*
3741 : * Wait for more data or latch. If we have unflushed transactions,
3742 : * wake up after WalWriterDelay to see if they've been flushed yet (in
3743 : * which case we should send a feedback message). Otherwise, there's
3744 : * no particular urgency about waking up unless we get data or a
3745 : * signal.
3746 : */
3747 69314 : if (!dlist_is_empty(&lsn_mapping))
3748 15064 : wait_time = WalWriterDelay;
3749 : else
3750 54250 : wait_time = NAPTIME_PER_CYCLE;
3751 :
3752 69314 : rc = WaitLatchOrSocket(MyLatch,
3753 : WL_SOCKET_READABLE | WL_LATCH_SET |
3754 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
3755 : fd, wait_time,
3756 : WAIT_EVENT_LOGICAL_APPLY_MAIN);
3757 :
3758 69314 : if (rc & WL_LATCH_SET)
3759 : {
3760 1010 : ResetLatch(MyLatch);
3761 1010 : CHECK_FOR_INTERRUPTS();
3762 : }
3763 :
3764 69138 : if (ConfigReloadPending)
3765 : {
3766 16 : ConfigReloadPending = false;
3767 16 : ProcessConfigFile(PGC_SIGHUP);
3768 : }
3769 :
3770 69138 : if (rc & WL_TIMEOUT)
3771 : {
3772 : /*
3773 : * We didn't receive anything new. If we haven't heard anything
3774 : * from the server for more than wal_receiver_timeout / 2, ping
3775 : * the server. Also, if it's been longer than
3776 : * wal_receiver_status_interval since the last update we sent,
3777 : * send a status update to the primary anyway, to report any
3778 : * progress in applying WAL.
3779 : */
3780 306 : bool requestReply = false;
3781 :
3782 : /*
3783 : * Check if time since last receive from primary has reached the
3784 : * configured limit.
3785 : */
3786 306 : if (wal_receiver_timeout > 0)
3787 : {
3788 306 : TimestampTz now = GetCurrentTimestamp();
3789 : TimestampTz timeout;
3790 :
3791 306 : timeout =
3792 306 : TimestampTzPlusMilliseconds(last_recv_timestamp,
3793 : wal_receiver_timeout);
3794 :
3795 306 : if (now >= timeout)
3796 0 : ereport(ERROR,
3797 : (errcode(ERRCODE_CONNECTION_FAILURE),
3798 : errmsg("terminating logical replication worker due to timeout")));
3799 :
3800 : /* Check to see if it's time for a ping. */
3801 306 : if (!ping_sent)
3802 : {
3803 306 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3804 : (wal_receiver_timeout / 2));
3805 306 : if (now >= timeout)
3806 : {
3807 0 : requestReply = true;
3808 0 : ping_sent = true;
3809 : }
3810 : }
3811 : }
3812 :
3813 306 : send_feedback(last_received, requestReply, requestReply);
3814 :
3815 : /*
3816 : * Force reporting to ensure long idle periods don't lead to
3817 : * arbitrarily delayed stats. Stats can only be reported outside
3818 : * of (implicit or explicit) transactions. That shouldn't lead to
3819 : * stats being delayed for long, because transactions are either
3820 : * sent as a whole on commit or streamed. Streamed transactions
3821 : * are spilled to disk and applied on commit.
3822 : */
3823 306 : if (!IsTransactionState())
3824 306 : pgstat_report_stat(true);
3825 : }
3826 : }
3827 :
3828 : /* Pop the error context stack */
3829 12 : error_context_stack = errcallback.previous;
3830 12 : apply_error_context_stack = error_context_stack;
3831 :
3832 : /* All done */
3833 12 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
3834 0 : }
3835 :
3836 : /*
3837 : * Send a Standby Status Update message to server.
3838 : *
3839 : * 'recvpos' is the latest LSN we've received data up to; 'force' is set if
3840 : * we must send a response to avoid a timeout.
3841 : */
3842 : static void
3843 73370 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3844 : {
3845 : static StringInfo reply_message = NULL;
3846 : static TimestampTz send_time = 0;
3847 :
3848 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3849 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3850 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3851 :
3852 : XLogRecPtr writepos;
3853 : XLogRecPtr flushpos;
3854 : TimestampTz now;
3855 : bool have_pending_txes;
3856 :
3857 : /*
3858 : * If the user doesn't want status to be reported to the publisher, be
3859 : * sure to exit before doing anything at all.
3860 : */
3861 73370 : if (!force && wal_receiver_status_interval <= 0)
3862 25562 : return;
3863 :
3864 : /* It's legal to not pass a recvpos */
3865 73370 : if (recvpos < last_recvpos)
3866 0 : recvpos = last_recvpos;
3867 :
3868 73370 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
3869 :
3870 : /*
3871 : * There are no outstanding transactions to flush, so we can report the
3872 : * latest received position. This is important for synchronous replication.
3873 : */
3874 73370 : if (!have_pending_txes)
3875 57100 : flushpos = writepos = recvpos;
3876 :
3877 73370 : if (writepos < last_writepos)
3878 0 : writepos = last_writepos;
3879 :
3880 73370 : if (flushpos < last_flushpos)
3881 16202 : flushpos = last_flushpos;
3882 :
3883 73370 : now = GetCurrentTimestamp();
3884 :
3885 : /* If we've already reported everything, we're good. */
3886 73370 : if (!force &&
3887 73136 : writepos == last_writepos &&
3888 26056 : flushpos == last_flushpos &&
3889 25794 : !TimestampDifferenceExceeds(send_time, now,
3890 : wal_receiver_status_interval * 1000))
3891 25562 : return;
3892 47808 : send_time = now;
3893 :
3894 47808 : if (!reply_message)
3895 : {
3896 730 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
3897 :
3898 730 : reply_message = makeStringInfo();
3899 730 : MemoryContextSwitchTo(oldctx);
3900 : }
3901 : else
3902 47078 : resetStringInfo(reply_message);
3903 :
3904 47808 : pq_sendbyte(reply_message, 'r');
3905 47808 : pq_sendint64(reply_message, recvpos); /* write */
3906 47808 : pq_sendint64(reply_message, flushpos); /* flush */
3907 47808 : pq_sendint64(reply_message, writepos); /* apply */
3908 47808 : pq_sendint64(reply_message, now); /* sendTime */
3909 47808 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
3910 :
3911 47808 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3912 : force,
3913 : LSN_FORMAT_ARGS(recvpos),
3914 : LSN_FORMAT_ARGS(writepos),
3915 : LSN_FORMAT_ARGS(flushpos));
3916 :
3917 47808 : walrcv_send(LogRepWorkerWalRcvConn,
3918 : reply_message->data, reply_message->len);
3919 :
3920 47808 : if (recvpos > last_recvpos)
3921 47086 : last_recvpos = recvpos;
3922 47808 : if (writepos > last_writepos)
3923 47086 : last_writepos = writepos;
3924 47808 : if (flushpos > last_flushpos)
3925 46586 : last_flushpos = flushpos;
3926 : }
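        :
        : /*
        :  * Editor's illustration (not part of worker.c): the reply built by
        :  * send_feedback() is a Standby Status Update ('r') message. A hypothetical
        :  * sketch of decoding it in the same field order, using the pqformat
        :  * routines already used elsewhere in this file; this block is not compiled.
        :  */
        : #ifdef NOT_USED
        : static void
        : parse_standby_reply(StringInfo s)
        : {
        : 	XLogRecPtr	write_lsn = pq_getmsgint64(s);	/* "recvpos" above */
        : 	XLogRecPtr	flush_lsn = pq_getmsgint64(s);
        : 	XLogRecPtr	apply_lsn = pq_getmsgint64(s);
        : 	TimestampTz sendtime = pq_getmsgint64(s);
        : 	bool		reply_requested = pq_getmsgbyte(s);
        :
        : 	/* assumes the leading 'r' byte has already been consumed */
        : 	elog(DEBUG2, "got reply: write %X/%X, flush %X/%X, apply %X/%X, reply %d",
        : 		 LSN_FORMAT_ARGS(write_lsn),
        : 		 LSN_FORMAT_ARGS(flush_lsn),
        : 		 LSN_FORMAT_ARGS(apply_lsn),
        : 		 reply_requested);
        : 	(void) sendtime;
        : }
        : #endif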
3927 :
3928 : /*
3929 : * Exit routine for apply workers due to subscription parameter changes.
3930 : */
3931 : static void
3932 74 : apply_worker_exit(void)
3933 : {
3934 74 : if (am_parallel_apply_worker())
3935 : {
3936 : /*
3937 : * Don't stop the parallel apply worker as the leader will detect the
3938 : * subscription parameter change and restart logical replication later
3939 : * anyway. This also prevents the leader from reporting errors when
3940 : * trying to communicate with a stopped parallel apply worker, which
3941 : * would accidentally disable subscriptions if disable_on_error was
3942 : * set.
3943 : */
3944 0 : return;
3945 : }
3946 :
3947 : /*
3948 : * Reset the last-start time for this apply worker so that the launcher
3949 : * will restart it without waiting for wal_retrieve_retry_interval if the
3950 : * subscription is still active, and so that we won't leak that hash table
3951 : * entry if it isn't.
3952 : */
3953 74 : if (am_leader_apply_worker())
3954 74 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
3955 :
3956 74 : proc_exit(0);
3957 : }
3958 :
3959 : /*
3960 : * Reread subscription info if needed.
3961 : *
3962 : * For significant changes, we react by exiting the current process; a new
3963 : * one will be launched afterwards if needed.
3964 : */
3965 : void
3966 7416 : maybe_reread_subscription(void)
3967 : {
3968 : MemoryContext oldctx;
3969 : Subscription *newsub;
3970 7416 : bool started_tx = false;
3971 :
3972 : /* When cache state is valid there is nothing to do here. */
3973 7416 : if (MySubscriptionValid)
3974 7262 : return;
3975 :
3976 : /* This function might be called inside or outside of transaction. */
3977 154 : if (!IsTransactionState())
3978 : {
3979 144 : StartTransactionCommand();
3980 144 : started_tx = true;
3981 : }
3982 :
3983 : /* Ensure allocations in permanent context. */
3984 154 : oldctx = MemoryContextSwitchTo(ApplyContext);
3985 :
3986 154 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
3987 :
3988 : /*
3989 : * Exit if the subscription was removed. This normally should not happen
3990 : * as the worker gets killed during DROP SUBSCRIPTION.
3991 : */
3992 154 : if (!newsub)
3993 : {
3994 0 : ereport(LOG,
3995 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3996 : MySubscription->name)));
3997 :
3998 : /* Ensure we remove no-longer-useful entry for worker's start time */
3999 0 : if (am_leader_apply_worker())
4000 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4001 :
4002 0 : proc_exit(0);
4003 : }
4004 :
4005 : /* Exit if the subscription was disabled. */
4006 154 : if (!newsub->enabled)
4007 : {
4008 18 : ereport(LOG,
4009 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
4010 : MySubscription->name)));
4011 :
4012 18 : apply_worker_exit();
4013 : }
4014 :
4015 : /* !slotname should never happen when enabled is true. */
4016 : Assert(newsub->slotname);
4017 :
4018 : /* two-phase cannot be altered while the worker is running */
4019 : Assert(newsub->twophasestate == MySubscription->twophasestate);
4020 :
4021 : /*
4022 : * Exit if any parameter that affects the remote connection was changed.
4023 : * The launcher will start a new worker but note that the parallel apply
4024 : * worker won't restart if the streaming option's value is changed from
4025 : * 'parallel' to any other value or the server decides not to stream the
4026 : * in-progress transaction.
4027 : */
4028 136 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
4029 132 : strcmp(newsub->name, MySubscription->name) != 0 ||
4030 130 : strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
4031 130 : newsub->binary != MySubscription->binary ||
4032 118 : newsub->stream != MySubscription->stream ||
4033 108 : newsub->passwordrequired != MySubscription->passwordrequired ||
4034 108 : strcmp(newsub->origin, MySubscription->origin) != 0 ||
4035 108 : newsub->owner != MySubscription->owner ||
4036 106 : !equal(newsub->publications, MySubscription->publications))
4037 : {
4038 48 : if (am_parallel_apply_worker())
4039 0 : ereport(LOG,
4040 : (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
4041 : MySubscription->name)));
4042 : else
4043 48 : ereport(LOG,
4044 : (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
4045 : MySubscription->name)));
4046 :
4047 48 : apply_worker_exit();
4048 : }
4049 :
4050 : /*
4051 : * Exit if the subscription owner's superuser privileges have been
4052 : * revoked.
4053 : */
4054 88 : if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
4055 : {
4056 8 : if (am_parallel_apply_worker())
4057 0 : ereport(LOG,
4058 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
4059 : MySubscription->name));
4060 : else
4061 8 : ereport(LOG,
4062 : errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
4063 : MySubscription->name));
4064 :
4065 8 : apply_worker_exit();
4066 : }
4067 :
4068 : /* Check for other changes that should never happen too. */
4069 80 : if (newsub->dbid != MySubscription->dbid)
4070 : {
4071 0 : elog(ERROR, "subscription %u changed unexpectedly",
4072 : MyLogicalRepWorker->subid);
4073 : }
4074 :
4075 : /* Clean old subscription info and switch to new one. */
4076 80 : FreeSubscription(MySubscription);
4077 80 : MySubscription = newsub;
4078 :
4079 80 : MemoryContextSwitchTo(oldctx);
4080 :
4081 : /* Change synchronous commit according to the user's wishes */
4082 80 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
4083 : PGC_BACKEND, PGC_S_OVERRIDE);
4084 :
4085 80 : if (started_tx)
4086 74 : CommitTransactionCommand();
4087 :
4088 80 : MySubscriptionValid = true;
4089 : }
4090 :
4091 : /*
4092 : * Callback from subscription syscache invalidation.
4093 : */
4094 : static void
4095 164 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4096 : {
4097 164 : MySubscriptionValid = false;
4098 164 : }
4099 :
4100 : /*
4101 : * subxact_info_write
4102 : * Store information about subxacts for a toplevel transaction.
4103 : *
4104 : * For each subxact we store the offset of its first change in the main file.
4105 : * The file is always overwritten as a whole.
4106 : *
4107 : * XXX We should only store subxacts that were not aborted yet.
4108 : */
4109 : static void
4110 744 : subxact_info_write(Oid subid, TransactionId xid)
4111 : {
4112 : char path[MAXPGPATH];
4113 : Size len;
4114 : BufFile *fd;
4115 :
4116 : Assert(TransactionIdIsValid(xid));
4117 :
4118 : /* construct the subxact filename */
4119 744 : subxact_filename(path, subid, xid);
4120 :
4121 : /* Delete the subxacts file, if it exists. */
4122 744 : if (subxact_data.nsubxacts == 0)
4123 : {
4124 580 : cleanup_subxact_info();
4125 580 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4126 :
4127 580 : return;
4128 : }
4129 :
4130 : /*
4131 : * Create the subxact file if it has not already been created; otherwise
4132 : * open the existing file.
4133 : */
4134 164 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
4135 : true);
4136 164 : if (fd == NULL)
4137 16 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
4138 :
4139 164 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4140 :
4141 : /* Write the subxact count and subxact info */
4142 164 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4143 164 : BufFileWrite(fd, subxact_data.subxacts, len);
4144 :
4145 164 : BufFileClose(fd);
4146 :
4147 : /* free the memory allocated for subxact info */
4148 164 : cleanup_subxact_info();
4149 : }
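        :
        : /*
        :  * Editor's note (illustration, not part of worker.c): the on-disk layout
        :  * produced by subxact_info_write() and consumed by subxact_info_read() is
        :  * simply the subxact count followed by a dense array of SubXactInfo
        :  * entries, each recording a subxact XID plus the fileno/offset of its
        :  * first change in the main changes file:
        :  *
        :  *     nsubxacts | SubXactInfo[0] | SubXactInfo[1] | ... | SubXactInfo[n-1]
        :  *
        :  * The whole file is overwritten each time it is flushed, and deleted
        :  * outright when there are no subxacts left to remember.
        :  */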
4150 :
4151 : /*
4152 : * subxact_info_read
4153 : * Restore information about subxacts of a streamed transaction.
4154 : *
4155 : * Read information about subxacts into the structure subxact_data that can be
4156 : * used later.
4157 : */
4158 : static void
4159 688 : subxact_info_read(Oid subid, TransactionId xid)
4160 : {
4161 : char path[MAXPGPATH];
4162 : Size len;
4163 : BufFile *fd;
4164 : MemoryContext oldctx;
4165 :
4166 : Assert(!subxact_data.subxacts);
4167 : Assert(subxact_data.nsubxacts == 0);
4168 : Assert(subxact_data.nsubxacts_max == 0);
4169 :
4170 : /*
4171 : * If the subxact file doesn't exist, that means we don't have any subxact
4172 : * info.
4173 : */
4174 688 : subxact_filename(path, subid, xid);
4175 688 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
4176 : true);
4177 688 : if (fd == NULL)
4178 530 : return;
4179 :
4180 : /* read number of subxact items */
4181 158 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4182 :
4183 158 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4184 :
4185 : /* we keep the maximum as a power of 2 */
4186 158 : subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
4187 :
4188 : /*
4189 : * Allocate the subxact information in the logical streaming context. We
4190 : * need it for the entire duration of the stream so that we can keep adding
4191 : * subtransaction info to it. On stream stop we flush this information to
4192 : * the subxact file and reset the logical streaming context.
4193 : */
4194 158 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4195 158 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
4196 : sizeof(SubXactInfo));
4197 158 : MemoryContextSwitchTo(oldctx);
4198 :
4199 158 : if (len > 0)
4200 158 : BufFileReadExact(fd, subxact_data.subxacts, len);
4201 :
4202 158 : BufFileClose(fd);
4203 : }
4204 :
4205 : /*
4206 : * subxact_info_add
4207 : * Add information about a subxact (offset in the main file).
4208 : */
4209 : static void
4210 205024 : subxact_info_add(TransactionId xid)
4211 : {
4212 205024 : SubXactInfo *subxacts = subxact_data.subxacts;
4213 : int64 i;
4214 :
4215 : /* We must have a valid top level stream xid and a stream fd. */
4216 : Assert(TransactionIdIsValid(stream_xid));
4217 : Assert(stream_fd != NULL);
4218 :
4219 : /*
4220 : * If the XID matches the toplevel transaction, we don't want to add it.
4221 : */
4222 205024 : if (stream_xid == xid)
4223 184776 : return;
4224 :
4225 : /*
4226 : * In most cases we're checking the same subxact as we've already seen in
4227 : * the last call, so make sure to ignore it (this change comes later).
4228 : */
4229 20248 : if (subxact_data.subxact_last == xid)
4230 20096 : return;
4231 :
4232 : /* OK, remember we're processing this XID. */
4233 152 : subxact_data.subxact_last = xid;
4234 :
4235 : /*
4236 : * Check if the transaction is already present in the array of subxacts. We
4237 : * intentionally scan the array from the tail, because we're likely adding
4238 : * a change for the most recent subtransactions.
4239 : *
4240 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4241 : * would allow us to use binary search here.
4242 : */
4243 190 : for (i = subxact_data.nsubxacts; i > 0; i--)
4244 : {
4245 : /* found, so we're done */
4246 152 : if (subxacts[i - 1].xid == xid)
4247 114 : return;
4248 : }
4249 :
4250 : /* This is a new subxact, so we need to add it to the array. */
4251 38 : if (subxact_data.nsubxacts == 0)
4252 : {
4253 : MemoryContext oldctx;
4254 :
4255 16 : subxact_data.nsubxacts_max = 128;
4256 :
4257 : /*
4258 : * Allocate this memory for subxacts in per-stream context, see
4259 : * subxact_info_read.
4260 : */
4261 16 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4262 16 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4263 16 : MemoryContextSwitchTo(oldctx);
4264 : }
4265 22 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
4266 : {
4267 20 : subxact_data.nsubxacts_max *= 2;
4268 20 : subxacts = repalloc(subxacts,
4269 20 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4270 : }
4271 :
4272 38 : subxacts[subxact_data.nsubxacts].xid = xid;
4273 :
4274 : /*
4275 : * Get the current offset of the stream file and store it as the offset
4276 : * of this subxact.
4277 : */
4278 38 : BufFileTell(stream_fd,
4279 38 : &subxacts[subxact_data.nsubxacts].fileno,
4280 38 : &subxacts[subxact_data.nsubxacts].offset);
4281 :
4282 38 : subxact_data.nsubxacts++;
4283 38 : subxact_data.subxacts = subxacts;
4284 : }
4285 :
4286 : /* format filename for file containing the info about subxacts */
4287 : static inline void
4288 1494 : subxact_filename(char *path, Oid subid, TransactionId xid)
4289 : {
4290 1494 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4291 1494 : }
4292 :
4293 : /* format filename for file containing serialized changes */
4294 : static inline void
4295 876 : changes_filename(char *path, Oid subid, TransactionId xid)
4296 : {
4297 876 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4298 876 : }
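        :
        : /*
        :  * Editor's example (not part of worker.c): with the formats above, a
        :  * subscription with OID 16394 applying a streamed remote transaction whose
        :  * toplevel XID is 742 would use the FileSet files "16394-742.subxacts" and
        :  * "16394-742.changes". (The OID and XID are made-up illustration values.)
        :  */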
4299 :
4300 : /*
4301 : * stream_cleanup_files
4302 : * Cleanup files for a subscription / toplevel transaction.
4303 : *
4304 : * Remove files with serialized changes and subxact info for a particular
4305 : * toplevel transaction. Each subscription has a separate set of files
4306 : * for any toplevel transaction.
4307 : */
4308 : void
4309 62 : stream_cleanup_files(Oid subid, TransactionId xid)
4310 : {
4311 : char path[MAXPGPATH];
4312 :
4313 : /* Delete the changes file. */
4314 62 : changes_filename(path, subid, xid);
4315 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
4316 :
4317 : /* Delete the subxact file, if it exists. */
4318 62 : subxact_filename(path, subid, xid);
4319 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4320 62 : }
4321 :
4322 : /*
4323 : * stream_open_file
4324 : * Open a file that we'll use to serialize changes for a toplevel
4325 : * transaction.
4326 : *
4327 : * Open a file for streamed changes from a toplevel transaction identified
4328 : * by stream_xid (global variable). If it's the first chunk of streamed
4329 : * changes for this transaction, create the buffile, otherwise open the
4330 : * previously created file.
4331 : */
4332 : static void
4333 726 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4334 : {
4335 : char path[MAXPGPATH];
4336 : MemoryContext oldcxt;
4337 :
4338 : Assert(OidIsValid(subid));
4339 : Assert(TransactionIdIsValid(xid));
4340 : Assert(stream_fd == NULL);
4341 :
4342 :
4343 726 : changes_filename(path, subid, xid);
4344 726 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4345 :
4346 : /*
4347 : * Create/open the BufFiles under the logical streaming context so that
4348 : * those files survive until stream stop.
4349 : */
4350 726 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
4351 :
4352 : /*
4353 : * If this is the first streamed segment, create the changes file.
4354 : * Otherwise, just open the file for writing, in append mode.
4355 : */
4356 726 : if (first_segment)
4357 64 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
4358 : path);
4359 : else
4360 : {
4361 : /*
4362 : * Open the file and seek to its end, because we always append to the
4363 : * changes file.
4364 : */
4365 662 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
4366 : path, O_RDWR, false);
4367 662 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
4368 : }
4369 :
4370 726 : MemoryContextSwitchTo(oldcxt);
4371 726 : }
4372 :
4373 : /*
4374 : * stream_close_file
4375 : * Close the currently open file with streamed changes.
4376 : */
4377 : static void
4378 786 : stream_close_file(void)
4379 : {
4380 : Assert(stream_fd != NULL);
4381 :
4382 786 : BufFileClose(stream_fd);
4383 :
4384 786 : stream_fd = NULL;
4385 786 : }
4386 :
4387 : /*
4388 : * stream_write_change
4389 : * Serialize a change to a file for the current toplevel transaction.
4390 : *
4391 : * The change is serialized in a simple format: the length (not counting the
4392 : * length field itself), an action code (identifying the message type), and
4393 : * the message contents (without the subxact TransactionId value).
4394 : */
4395 : static void
4396 215106 : stream_write_change(char action, StringInfo s)
4397 : {
4398 : int len;
4399 :
4400 : Assert(stream_fd != NULL);
4401 :
4402 : /* total on-disk size, including the action type character */
4403 215106 : len = (s->len - s->cursor) + sizeof(char);
4404 :
4405 : /* first write the size */
4406 215106 : BufFileWrite(stream_fd, &len, sizeof(len));
4407 :
4408 : /* then the action */
4409 215106 : BufFileWrite(stream_fd, &action, sizeof(action));
4410 :
4411 : /* and finally the remaining part of the buffer (after the XID) */
4412 215106 : len = (s->len - s->cursor);
4413 :
4414 215106 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
4415 215106 : }
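        :
        : /*
        :  * Editor's illustration (not part of worker.c): a hypothetical sketch of
        :  * reading back one change in the format written above -- the stored length
        :  * (which includes the action byte but not the length field), the action
        :  * byte, and then the remaining message contents. The real replay path
        :  * lives elsewhere in this file; this block is not compiled.
        :  */
        : #ifdef NOT_USED
        : static void
        : stream_read_one_change(BufFile *fd, StringInfo buf)
        : {
        : 	int			len;
        : 	char		action;
        :
        : 	/* total on-disk size, including the action type character */
        : 	BufFileReadExact(fd, &len, sizeof(len));
        :
        : 	/* the action code written by stream_write_change() */
        : 	BufFileReadExact(fd, &action, sizeof(action));
        :
        : 	/* the remaining payload, i.e. the message without its XID prefix */
        : 	resetStringInfo(buf);
        : 	enlargeStringInfo(buf, len - 1);
        : 	BufFileReadExact(fd, buf->data, len - 1);
        : 	buf->len = len - 1;
        : 	buf->data[buf->len] = '\0';
        : }
        : #endif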
4416 :
4417 : /*
4418 : * stream_open_and_write_change
4419 : * Serialize a message to a file for the given transaction.
4420 : *
4421 : * This function is similar to stream_write_change except that it opens the
4422 : * target file if it is not already open before writing the message, and
4423 : * closes the file at the end.
4424 : */
4425 : static void
4426 10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
4427 : {
4428 : Assert(!in_streamed_transaction);
4429 :
4430 10 : if (!stream_fd)
4431 10 : stream_start_internal(xid, false);
4432 :
4433 10 : stream_write_change(action, s);
4434 10 : stream_stop_internal(xid);
4435 10 : }
4436 :
4437 : /*
4438 : * Sets streaming options including replication slot name and origin start
4439 : * position. Workers need these options for logical replication.
4440 : */
4441 : void
4442 730 : set_stream_options(WalRcvStreamOptions *options,
4443 : char *slotname,
4444 : XLogRecPtr *origin_startpos)
4445 : {
4446 : int server_version;
4447 :
4448 730 : options->logical = true;
4449 730 : options->startpoint = *origin_startpos;
4450 730 : options->slotname = slotname;
4451 :
4452 730 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
4453 730 : options->proto.logical.proto_version =
4454 730 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
4455 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
4456 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
4457 : LOGICALREP_PROTO_VERSION_NUM;
4458 :
4459 730 : options->proto.logical.publication_names = MySubscription->publications;
4460 730 : options->proto.logical.binary = MySubscription->binary;
4461 :
4462 : /*
4463 : * Assign the appropriate value for the streaming option according to the
4464 : * 'streaming' mode and the publisher's ability to support that mode.
4465 : */
4466 730 : if (server_version >= 160000 &&
4467 730 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4468 : {
4469 662 : options->proto.logical.streaming_str = "parallel";
4470 662 : MyLogicalRepWorker->parallel_apply = true;
4471 : }
4472 68 : else if (server_version >= 140000 &&
4473 68 : MySubscription->stream != LOGICALREP_STREAM_OFF)
4474 : {
4475 52 : options->proto.logical.streaming_str = "on";
4476 52 : MyLogicalRepWorker->parallel_apply = false;
4477 : }
4478 : else
4479 : {
4480 16 : options->proto.logical.streaming_str = NULL;
4481 16 : MyLogicalRepWorker->parallel_apply = false;
4482 : }
4483 :
4484 730 : options->proto.logical.twophase = false;
4485 730 : options->proto.logical.origin = pstrdup(MySubscription->origin);
4486 730 : }
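        :
        : /*
        :  * Editor's example (not part of worker.c): with the logic above, the
        :  * protocol version depends only on the publisher's server version (16+
        :  * gets the stream-parallel protocol, 15 the two-phase protocol, 14 the
        :  * streaming protocol, anything older the base version), while the
        :  * streaming option sent depends on both sides: streaming = parallel
        :  * against a 16+ publisher sends "parallel" and enables parallel apply,
        :  * streaming on/parallel against a 14 or 15 publisher falls back to "on",
        :  * and streaming = off or a pre-14 publisher sends no streaming option at
        :  * all, so the worker never does parallel apply.
        :  */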
4487 :
4488 : /*
4489 : * Clean up the memory for subxacts and reset the related variables.
4490 : */
4491 : static inline void
4492 752 : cleanup_subxact_info()
4493 : {
4494 752 : if (subxact_data.subxacts)
4495 174 : pfree(subxact_data.subxacts);
4496 :
4497 752 : subxact_data.subxacts = NULL;
4498 752 : subxact_data.subxact_last = InvalidTransactionId;
4499 752 : subxact_data.nsubxacts = 0;
4500 752 : subxact_data.nsubxacts_max = 0;
4501 752 : }
4502 :
4503 : /*
4504 : * Common function to run the apply loop with error handling. Disable the
4505 : * subscription, if necessary.
4506 : *
4507 : * Note that we don't handle FATAL errors, which are probably caused by
4508 : * system resource exhaustion and are not repeatable.
4509 : */
4510 : void
4511 730 : start_apply(XLogRecPtr origin_startpos)
4512 : {
4513 730 : PG_TRY();
4514 : {
4515 730 : LogicalRepApplyLoop(origin_startpos);
4516 : }
4517 106 : PG_CATCH();
4518 : {
4519 106 : if (MySubscription->disableonerr)
4520 6 : DisableSubscriptionAndExit();
4521 : else
4522 : {
4523 : /*
4524 : * Report the worker failed while applying changes. Abort the
4525 : * Report that the worker failed while applying changes. Abort the
4526 : * idle state.
4527 : */
4528 100 : AbortOutOfAnyTransaction();
4529 100 : pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
4530 :
4531 100 : PG_RE_THROW();
4532 : }
4533 : }
4534 0 : PG_END_TRY();
4535 0 : }
4536 :
4537 : /*
4538 : * Runs the leader apply worker.
4539 : *
4540 : * It sets up replication origin, streaming options and then starts streaming.
4541 : */
4542 : static void
4543 430 : run_apply_worker()
4544 : {
4545 : char originname[NAMEDATALEN];
4546 430 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4547 430 : char *slotname = NULL;
4548 : WalRcvStreamOptions options;
4549 : RepOriginId originid;
4550 : TimeLineID startpointTLI;
4551 : char *err;
4552 : bool must_use_password;
4553 :
4554 430 : slotname = MySubscription->slotname;
4555 :
4556 : /*
4557 : * This shouldn't happen if the subscription is enabled, but guard against
4558 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4559 : * slot is NULL.)
4560 : */
4561 430 : if (!slotname)
4562 0 : ereport(ERROR,
4563 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4564 : errmsg("subscription has no replication slot set")));
4565 :
4566 : /* Setup replication origin tracking. */
4567 430 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
4568 : originname, sizeof(originname));
4569 430 : StartTransactionCommand();
4570 430 : originid = replorigin_by_name(originname, true);
4571 430 : if (!OidIsValid(originid))
4572 0 : originid = replorigin_create(originname);
4573 430 : replorigin_session_setup(originid, 0);
4574 430 : replorigin_session_origin = originid;
4575 430 : origin_startpos = replorigin_session_get_progress(false);
4576 430 : CommitTransactionCommand();
4577 :
4578 : /* Is the use of a password mandatory? */
4579 822 : must_use_password = MySubscription->passwordrequired &&
4580 392 : !MySubscription->ownersuperuser;
4581 :
4582 430 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
4583 : true, must_use_password,
4584 : MySubscription->name, &err);
4585 :
4586 408 : if (LogRepWorkerWalRcvConn == NULL)
4587 40 : ereport(ERROR,
4588 : (errcode(ERRCODE_CONNECTION_FAILURE),
4589 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
4590 : MySubscription->name, err)));
4591 :
4592 : /*
4593 : * We don't really use the output of identify_system for anything, but it
4594 : * does some initialization on the upstream, so let's still call it.
4595 : */
4596 368 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4597 :
4598 368 : set_apply_error_context_origin(originname);
4599 :
4600 368 : set_stream_options(&options, slotname, &origin_startpos);
4601 :
4602 : /*
4603 : * Even when two_phase mode is requested by the user, it remains in the
4604 : * tri-state PENDING until all tablesyncs have reached READY state.
4605 : * Only then can it become ENABLED.
4606 : *
4607 : * Note: If the subscription has no tables then leave the state as
4608 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4609 : * work.
4610 : */
4611 400 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
4612 32 : AllTablesyncsReady())
4613 : {
4614 : /* Start streaming with two_phase enabled */
4615 18 : options.proto.logical.twophase = true;
4616 18 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4617 :
4618 18 : StartTransactionCommand();
4619 18 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4620 18 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
4621 18 : CommitTransactionCommand();
4622 : }
4623 : else
4624 : {
4625 350 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4626 : }
4627 :
4628 368 : ereport(DEBUG1,
4629 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4630 : MySubscription->name,
4631 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4632 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4633 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4634 : "?")));
4635 :
4636 : /* Run the main loop. */
4637 368 : start_apply(origin_startpos);
4638 0 : }
4639 :
4640 : /*
4641 : * Common initialization for leader apply worker, parallel apply worker and
4642 : * tablesync worker.
4643 : *
4644 : * Initialize the database connection, in-memory subscription and necessary
4645 : * config options.
4646 : */
4647 : void
4648 838 : InitializeLogRepWorker(void)
4649 : {
4650 : MemoryContext oldctx;
4651 :
4652 : /* Run as replica session replication role. */
4653 838 : SetConfigOption("session_replication_role", "replica",
4654 : PGC_SUSET, PGC_S_OVERRIDE);
4655 :
4656 : /* Connect to our database. */
4657 838 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
4658 838 : MyLogicalRepWorker->userid,
4659 : 0);
4660 :
4661 : /*
4662 : * Set always-secure search path, so malicious users can't redirect user
4663 : * code (e.g. pg_index.indexprs).
4664 : */
4665 832 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4666 :
4667 : /* Load the subscription into persistent memory context. */
4668 832 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
4669 : "ApplyContext",
4670 : ALLOCSET_DEFAULT_SIZES);
4671 832 : StartTransactionCommand();
4672 832 : oldctx = MemoryContextSwitchTo(ApplyContext);
4673 :
4674 832 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
4675 832 : if (!MySubscription)
4676 : {
4677 0 : ereport(LOG,
4678 : (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4679 : MyLogicalRepWorker->subid)));
4680 :
4681 : /* Ensure we remove no-longer-useful entry for worker's start time */
4682 0 : if (am_leader_apply_worker())
4683 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4684 :
4685 0 : proc_exit(0);
4686 : }
4687 :
4688 832 : MySubscriptionValid = true;
4689 832 : MemoryContextSwitchTo(oldctx);
4690 :
4691 832 : if (!MySubscription->enabled)
4692 : {
4693 0 : ereport(LOG,
4694 : (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4695 : MySubscription->name)));
4696 :
4697 0 : apply_worker_exit();
4698 : }
4699 :
4700 : /* Setup synchronous commit according to the user's wishes */
4701 832 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
4702 : PGC_BACKEND, PGC_S_OVERRIDE);
4703 :
4704 : /*
4705 : * Keep us informed about subscription or role changes. Note that the
4706 : * role's superuser privilege can be revoked.
4707 : */
4708 832 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4709 : subscription_change_cb,
4710 : (Datum) 0);
4711 :
4712 832 : CacheRegisterSyscacheCallback(AUTHOID,
4713 : subscription_change_cb,
4714 : (Datum) 0);
4715 :
4716 832 : if (am_tablesync_worker())
4717 382 : ereport(LOG,
4718 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4719 : MySubscription->name,
4720 : get_rel_name(MyLogicalRepWorker->relid))));
4721 : else
4722 450 : ereport(LOG,
4723 : (errmsg("logical replication apply worker for subscription \"%s\" has started",
4724 : MySubscription->name)));
4725 :
4726 832 : CommitTransactionCommand();
4727 832 : }
4728 :
4729 : /*
4730 : * Reset the origin state.
4731 : */
4732 : static void
4733 874 : replorigin_reset(int code, Datum arg)
4734 : {
4735 874 : replorigin_session_origin = InvalidRepOriginId;
4736 874 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
4737 874 : replorigin_session_origin_timestamp = 0;
4738 874 : }
4739 :
4740 : /* Common function to setup the leader apply or tablesync worker. */
4741 : void
4742 818 : SetupApplyOrSyncWorker(int worker_slot)
4743 : {
4744 : /* Attach to slot */
4745 818 : logicalrep_worker_attach(worker_slot);
4746 :
4747 : Assert(am_tablesync_worker() || am_leader_apply_worker());
4748 :
4749 : /* Setup signal handling */
4750 818 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
4751 818 : pqsignal(SIGTERM, die);
4752 818 : BackgroundWorkerUnblockSignals();
4753 :
4754 : /*
4755 : * We don't currently need any ResourceOwner in a walreceiver process, but
4756 : * if we did, we could call CreateAuxProcessResourceOwner here.
4757 : */
4758 :
4759 : /* Initialise stats to a reasonably sane value */
4760 818 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
4761 818 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
4762 :
4763 : /* Load the libpq-specific functions */
4764 818 : load_file("libpqwalreceiver", false);
4765 :
4766 818 : InitializeLogRepWorker();
4767 :
4768 : /*
4769 : * Register a callback to reset the origin state before aborting any
4770 : * pending transaction during shutdown (see ShutdownPostgres()). This will
4771 : * avoid origin advancement for an incomplete transaction, which could
4772 : * otherwise lead to its loss, as such a transaction won't be sent by the
4773 : * server again.
4774 : *
4775 : * Note that even a LOG or DEBUG statement placed after setting the origin
4776 : * state may process a shutdown signal before committing the current apply
4777 : * operation. So, it is important to register such a callback here.
4778 : */
4779 812 : before_shmem_exit(replorigin_reset, (Datum) 0);
4780 :
4781 : /* Connect to the origin and start the replication. */
4782 812 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4783 : MySubscription->conninfo);
4784 :
4785 : /*
4786 : * Setup callback for syscache so that we know when something changes in
4787 : * the subscription relation state.
4788 : */
4789 812 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4790 : invalidate_syncing_table_states,
4791 : (Datum) 0);
4792 812 : }
4793 :
4794 : /* Logical Replication Apply worker entry point */
4795 : void
4796 436 : ApplyWorkerMain(Datum main_arg)
4797 : {
4798 436 : int worker_slot = DatumGetInt32(main_arg);
4799 :
4800 436 : InitializingApplyWorker = true;
4801 :
4802 436 : SetupApplyOrSyncWorker(worker_slot);
4803 :
4804 430 : InitializingApplyWorker = false;
4805 :
4806 430 : run_apply_worker();
4807 :
4808 0 : proc_exit(0);
4809 : }
4810 :
4811 : /*
4812 : * After error recovery, disable the subscription in a new transaction
4813 : * and exit cleanly.
4814 : */
4815 : void
4816 8 : DisableSubscriptionAndExit(void)
4817 : {
4818 : /*
4819 : * Emit the error message, and recover from the error state to an idle
4820 : * state.
4821 : */
4822 8 : HOLD_INTERRUPTS();
4823 :
4824 8 : EmitErrorReport();
4825 8 : AbortOutOfAnyTransaction();
4826 8 : FlushErrorState();
4827 :
4828 8 : RESUME_INTERRUPTS();
4829 :
4830 : /* Report that the worker failed during either table synchronization or apply */
4831 8 : pgstat_report_subscription_error(MyLogicalRepWorker->subid,
4832 8 : !am_tablesync_worker());
4833 :
4834 : /* Disable the subscription */
4835 8 : StartTransactionCommand();
4836 8 : DisableSubscription(MySubscription->oid);
4837 8 : CommitTransactionCommand();
4838 :
4839 : /* Ensure we remove the no-longer-useful entry for the worker's start time */
4840 8 : if (am_leader_apply_worker())
4841 6 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4842 :
4843 : /* Notify that the subscription has been disabled, and exit */
4844 8 : ereport(LOG,
4845 : errmsg("subscription \"%s\" has been disabled because of an error",
4846 : MySubscription->name));
4847 :
4848 8 : proc_exit(0);
4849 : }
4850 :
4851 : /*
4852 : * Is the current process a logical replication worker?
4853 : */
4854 : bool
4855 3950 : IsLogicalWorker(void)
4856 : {
4857 3950 : return MyLogicalRepWorker != NULL;
4858 : }
4859 :
4860 : /*
4861 : * Is the current process a logical replication parallel apply worker?
4862 : */
4863 : bool
4864 2754 : IsLogicalParallelApplyWorker(void)
4865 : {
4866 2754 : return IsLogicalWorker() && am_parallel_apply_worker();
4867 : }
4868 :
4869 : /*
4870 : * Start skipping changes of the transaction if the given LSN matches the
4871 : * LSN specified by the subscription's skiplsn.
4872 : */
4873 : static void
4874 990 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
4875 : {
4876 : Assert(!is_skipping_changes());
4877 : Assert(!in_remote_transaction);
4878 : Assert(!in_streamed_transaction);
4879 :
4880 : /*
4881 : * Return quickly if we are not requested to skip this transaction. This
4882 : * function is called for every remote transaction, and we assume that
4883 : * skipping transactions is not a common case.
4884 : */
4885 990 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
4886 : MySubscription->skiplsn != finish_lsn))
4887 984 : return;
4888 :
4889 : /* Start skipping all changes of this transaction */
4890 6 : skip_xact_finish_lsn = finish_lsn;
4891 :
4892 6 : ereport(LOG,
4893 : errmsg("logical replication starts skipping transaction at LSN %X/%X",
4894 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
4895 : }
4896 :
4897 : /*
4898 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4899 : */
4900 : static void
4901 54 : stop_skipping_changes(void)
4902 : {
4903 54 : if (!is_skipping_changes())
4904 48 : return;
4905 :
4906 6 : ereport(LOG,
4907 : (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4908 : LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
4909 :
4910 : /* Stop skipping changes */
4911 6 : skip_xact_finish_lsn = InvalidXLogRecPtr;
4912 : }
4913 :
4914 : /*
4915 : * Clear subskiplsn of pg_subscription catalog.
4916 : *
4917 : * finish_lsn is the transaction's finish LSN, used to check whether the
4918 : * subskiplsn matches it. If it does not match, we raise a warning when
4919 : * clearing the subskiplsn, to inform users of cases where, e.g., they
4920 : * mistakenly specified the wrong subskiplsn.
4921 : */
4922 : static void
4923 1028 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
4924 : {
4925 : Relation rel;
4926 : Form_pg_subscription subform;
4927 : HeapTuple tup;
4928 1028 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
4929 1028 : bool started_tx = false;
4930 :
4931 1028 : if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
4932 1022 : return;
4933 :
4934 6 : if (!IsTransactionState())
4935 : {
4936 2 : StartTransactionCommand();
4937 2 : started_tx = true;
4938 : }
4939 :
4940 : /*
4941 : * Protect subskiplsn of pg_subscription from being concurrently updated
4942 : * while clearing it.
4943 : */
4944 6 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4945 : AccessShareLock);
4946 :
4947 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4948 :
4949 : /* Fetch the existing tuple. */
4950 6 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4951 : ObjectIdGetDatum(MySubscription->oid));
4952 :
4953 6 : if (!HeapTupleIsValid(tup))
4954 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4955 :
4956 6 : subform = (Form_pg_subscription) GETSTRUCT(tup);
4957 :
4958 : /*
4959 : * Clear the subskiplsn. If the user has already changed subskiplsn before
4960 : * we clear it, we don't update the catalog and the replication origin
4961 : * state won't get advanced. So in the worst case, if the server crashes
4962 : * before sending an acknowledgment of the flush position, the transaction
4963 : * will be sent again and the user will need to set subskiplsn again. We
4964 : * could reduce the possibility by logging a replication origin WAL record
4965 : * to advance the origin LSN instead, but there is no way to advance the
4966 : * origin timestamp, and it doesn't seem worth doing anything about it
4967 : * since it's a very rare case.
4968 : */
4969 6 : if (subform->subskiplsn == myskiplsn)
4970 : {
4971 : bool nulls[Natts_pg_subscription];
4972 : bool replaces[Natts_pg_subscription];
4973 : Datum values[Natts_pg_subscription];
4974 :
4975 6 : memset(values, 0, sizeof(values));
4976 6 : memset(nulls, false, sizeof(nulls));
4977 6 : memset(replaces, false, sizeof(replaces));
4978 :
4979 : /* reset subskiplsn */
4980 6 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4981 6 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4982 :
4983 6 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4984 : replaces);
4985 6 : CatalogTupleUpdate(rel, &tup->t_self, tup);
4986 :
4987 6 : if (myskiplsn != finish_lsn)
4988 0 : ereport(WARNING,
4989 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4990 : errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4991 : LSN_FORMAT_ARGS(finish_lsn),
4992 : LSN_FORMAT_ARGS(myskiplsn)));
4993 : }
4994 :
4995 6 : heap_freetuple(tup);
4996 6 : table_close(rel, NoLock);
4997 :
4998 6 : if (started_tx)
4999 2 : CommitTransactionCommand();
5000 : }
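/*
 * Editorial sketch (not part of worker.c): how the skip-LSN helpers above fit
 * together over the life of one remote transaction.  In worker.c these calls
 * are actually distributed across the begin/commit and per-change handlers;
 * the wrapper function below and the "apply the change" step are assumptions
 * for illustration only.  maybe_start_skipping_changes(), is_skipping_changes(),
 * stop_skipping_changes() and clear_subscription_skip_lsn() are the real
 * helpers defined above.
 */
static void
skip_lsn_lifecycle_sketch(XLogRecPtr finish_lsn)
{
	/* At BEGIN: arm skipping if finish_lsn equals the subscription's skiplsn. */
	maybe_start_skipping_changes(finish_lsn);

	/* For each change: ignore it while skipping is active. */
	if (!is_skipping_changes())
	{
		/* ... apply the change ... */
	}

	/* At the transaction finish: stop skipping and reset subskiplsn. */
	stop_skipping_changes();
	clear_subscription_skip_lsn(finish_lsn);
}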
5001 :
5002 : /* Error callback to give more context info about the change being applied */
5003 : void
5004 1408 : apply_error_callback(void *arg)
5005 : {
5006 1408 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
5007 : int elevel;
5008 :
5009 1408 : if (apply_error_callback_arg.command == 0)
5010 684 : return;
5011 :
5012 : Assert(errarg->origin_name);
5013 :
5014 724 : elevel = geterrlevel();
5015 :
5016 : /*
5017 : * Reset the origin state to prevent the advancement of origin progress if
5018 : * we fail to apply. Otherwise, this will result in transaction loss as
5019 : * that transaction won't be sent again by the server.
5020 : */
5021 724 : if (elevel >= ERROR)
5022 62 : replorigin_reset(0, (Datum) 0);
5023 :
5024 724 : if (errarg->rel == NULL)
5025 : {
5026 640 : if (!TransactionIdIsValid(errarg->remote_xid))
5027 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
5028 : errarg->origin_name,
5029 : logicalrep_message_type(errarg->command));
5030 640 : else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5031 532 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
5032 : errarg->origin_name,
5033 : logicalrep_message_type(errarg->command),
5034 : errarg->remote_xid);
5035 : else
5036 216 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
5037 : errarg->origin_name,
5038 : logicalrep_message_type(errarg->command),
5039 : errarg->remote_xid,
5040 108 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5041 : }
5042 : else
5043 : {
5044 84 : if (errarg->remote_attnum < 0)
5045 : {
5046 84 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5047 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
5048 : errarg->origin_name,
5049 : logicalrep_message_type(errarg->command),
5050 0 : errarg->rel->remoterel.nspname,
5051 0 : errarg->rel->remoterel.relname,
5052 : errarg->remote_xid);
5053 : else
5054 168 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
5055 : errarg->origin_name,
5056 : logicalrep_message_type(errarg->command),
5057 84 : errarg->rel->remoterel.nspname,
5058 84 : errarg->rel->remoterel.relname,
5059 : errarg->remote_xid,
5060 84 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5061 : }
5062 : else
5063 : {
5064 0 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5065 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
5066 : errarg->origin_name,
5067 : logicalrep_message_type(errarg->command),
5068 0 : errarg->rel->remoterel.nspname,
5069 0 : errarg->rel->remoterel.relname,
5070 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
5071 : errarg->remote_xid);
5072 : else
5073 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
5074 : errarg->origin_name,
5075 : logicalrep_message_type(errarg->command),
5076 0 : errarg->rel->remoterel.nspname,
5077 0 : errarg->rel->remoterel.relname,
5078 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
5079 : errarg->remote_xid,
5080 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5081 : }
5082 : }
5083 : }
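/*
 * Editorial sketch (not part of worker.c): the standard PostgreSQL idiom for
 * installing an error context callback such as apply_error_callback() around
 * a piece of work, so that any error raised while applying a change gets the
 * extra context emitted above.  The wrapper function below and the "apply the
 * remote change" step are assumptions for illustration; ErrorContextCallback
 * and error_context_stack come from utils/elog.h.
 */
static void
apply_error_context_sketch(void)
{
	ErrorContextCallback errcallback;

	/* Push our callback onto the error context stack. */
	errcallback.callback = apply_error_callback;
	errcallback.arg = NULL;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* ... apply the remote change here; errors now get extra context ... */

	/* Pop the callback again once the work is done. */
	error_context_stack = errcallback.previous;
}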
5084 :
5085 : /* Set transaction information of apply error callback */
5086 : static inline void
5087 5768 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
5088 : {
5089 5768 : apply_error_callback_arg.remote_xid = xid;
5090 5768 : apply_error_callback_arg.finish_lsn = lsn;
5091 5768 : }
5092 :
5093 : /* Reset all information of apply error callback */
5094 : static inline void
5095 2850 : reset_apply_error_context_info(void)
5096 : {
5097 2850 : apply_error_callback_arg.command = 0;
5098 2850 : apply_error_callback_arg.rel = NULL;
5099 2850 : apply_error_callback_arg.remote_attnum = -1;
5100 2850 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
5101 2850 : }
5102 :
5103 : /*
5104 : * Request wakeup of the workers for the given subscription OID
5105 : * at commit of the current transaction.
5106 : *
5107 : * This is used to ensure that the workers process assorted changes
5108 : * as soon as possible.
5109 : */
5110 : void
5111 386 : LogicalRepWorkersWakeupAtCommit(Oid subid)
5112 : {
5113 : MemoryContext oldcxt;
5114 :
5115 386 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
5116 386 : on_commit_wakeup_workers_subids =
5117 386 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
5118 386 : MemoryContextSwitchTo(oldcxt);
5119 386 : }
5120 :
5121 : /*
5122 : * Wake up the workers of any subscriptions that were changed in this xact.
5123 : */
5124 : void
5125 852692 : AtEOXact_LogicalRepWorkers(bool isCommit)
5126 : {
5127 852692 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
5128 : {
5129 : ListCell *lc;
5130 :
5131 376 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5132 752 : foreach(lc, on_commit_wakeup_workers_subids)
5133 : {
5134 376 : Oid subid = lfirst_oid(lc);
5135 : List *workers;
5136 : ListCell *lc2;
5137 :
5138 376 : workers = logicalrep_workers_find(subid, true, false);
5139 486 : foreach(lc2, workers)
5140 : {
5141 110 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5142 :
5143 110 : logicalrep_worker_wakeup_ptr(worker);
5144 : }
5145 : }
5146 376 : LWLockRelease(LogicalRepWorkerLock);
5147 : }
5148 :
5149 : /* The List storage will be reclaimed automatically in xact cleanup. */
5150 852692 : on_commit_wakeup_workers_subids = NIL;
5151 852692 : }
5152 :
5153 : /*
5154 : * Allocate the origin name in a long-lived context for the error context message.
5155 : */
5156 : void
5157 750 : set_apply_error_context_origin(char *originname)
5158 : {
5159 750 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
5160 : originname);
5161 750 : }
5162 :
5163 : /*
5164 : * Return the action to be taken for the given transaction. See
5165 : * TransApplyAction for information on each of the actions.
5166 : *
5167 : * *winfo is set to the destination parallel worker's info when the leader
5168 : * apply worker has to pass all the transaction's changes to the parallel
5169 : * apply worker.
5170 : */
5171 : static TransApplyAction
5172 653400 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5173 : {
5174 653400 : *winfo = NULL;
5175 :
5176 653400 : if (am_parallel_apply_worker())
5177 : {
5178 138844 : return TRANS_PARALLEL_APPLY;
5179 : }
5180 :
5181 : /*
5182 : * If we are processing this transaction using a parallel apply worker,
5183 : * then we either send the changes to the parallel worker or, if the worker
5184 : * is busy, serialize the changes to a file that will later be
5185 : * processed by the parallel worker.
5186 : */
5187 514556 : *winfo = pa_find_worker(xid);
5188 :
5189 514556 : if (*winfo && (*winfo)->serialize_changes)
5190 : {
5191 10074 : return TRANS_LEADER_PARTIAL_SERIALIZE;
5192 : }
5193 504482 : else if (*winfo)
5194 : {
5195 137824 : return TRANS_LEADER_SEND_TO_PARALLEL;
5196 : }
5197 :
5198 : /*
5199 : * If no parallel worker is involved in processing this transaction,
5200 : * then we either apply the change directly or serialize it to a file
5201 : * that will later be applied when the transaction finish message is
5202 : * processed.
5203 : */
5204 366658 : else if (in_streamed_transaction)
5205 : {
5206 206396 : return TRANS_LEADER_SERIALIZE;
5207 : }
5208 : else
5209 : {
5210 160262 : return TRANS_LEADER_APPLY;
5211 : }
5212 : }
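/*
 * Editorial sketch (not part of worker.c): how a message handler might
 * dispatch on the action returned by get_transaction_apply_action().  The
 * wrapper function below is an assumption for illustration; the enum values
 * and their meanings are the ones documented in the function above.
 */
static void
transaction_apply_action_sketch(TransactionId xid)
{
	ParallelApplyWorkerInfo *winfo;

	switch (get_transaction_apply_action(xid, &winfo))
	{
		case TRANS_LEADER_APPLY:
		case TRANS_PARALLEL_APPLY:
			/* Apply the change directly in the current process. */
			break;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			/* Ship the change to the parallel apply worker in *winfo. */
			break;

		case TRANS_LEADER_PARTIAL_SERIALIZE:
			/* Write the change to a file; the parallel worker replays it. */
			break;

		case TRANS_LEADER_SERIALIZE:
			/* Write the change to a file; the leader replays it at finish. */
			break;
	}
}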