Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic clean up on error and (c) provides
45 : * a way for these files to survive across local transactions, allowing them to
46 : * be opened and closed at stream start and stop. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * process_syncing_tables_for_apply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it starts streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * Finally, to avoid problems mentioned in previous paragraphs from any
113 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 : * to 'off' and then again back to 'on') there is a restriction for
115 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can receive a prepare with the same GID more than once in genuine cases
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and prepared transaction has operations on tables subscribed to those
121 : * subscriptions. For such cases, if we use the GID sent by publisher one of
122 : * the prepares will be successful and others will fail, in which case the
123 : * server will send them again. Now, this can lead to a deadlock if user has
124 : * set synchronous_standby_names for all the subscriptions on subscriber. To
125 : * avoid such deadlocks, we generate a unique GID (consisting of the
126 : * subscription oid and the xid of the prepared transaction) for each prepare
127 : * transaction on the subscriber.
128 : *
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *-------------------------------------------------------------------------
136 : */
137 :
138 : #include "postgres.h"
139 :
140 : #include <sys/stat.h>
141 : #include <unistd.h>
142 :
143 : #include "access/table.h"
144 : #include "access/tableam.h"
145 : #include "access/twophase.h"
146 : #include "access/xact.h"
147 : #include "catalog/indexing.h"
148 : #include "catalog/pg_inherits.h"
149 : #include "catalog/pg_subscription.h"
150 : #include "catalog/pg_subscription_rel.h"
151 : #include "commands/tablecmds.h"
152 : #include "commands/trigger.h"
153 : #include "executor/executor.h"
154 : #include "executor/execPartition.h"
155 : #include "libpq/pqformat.h"
156 : #include "miscadmin.h"
157 : #include "optimizer/optimizer.h"
158 : #include "parser/parse_relation.h"
159 : #include "pgstat.h"
160 : #include "postmaster/bgworker.h"
161 : #include "postmaster/interrupt.h"
162 : #include "postmaster/walwriter.h"
163 : #include "replication/conflict.h"
164 : #include "replication/logicallauncher.h"
165 : #include "replication/logicalproto.h"
166 : #include "replication/logicalrelation.h"
167 : #include "replication/logicalworker.h"
168 : #include "replication/origin.h"
169 : #include "replication/walreceiver.h"
170 : #include "replication/worker_internal.h"
171 : #include "rewrite/rewriteHandler.h"
172 : #include "storage/buffile.h"
173 : #include "storage/ipc.h"
174 : #include "storage/lmgr.h"
175 : #include "tcop/tcopprot.h"
176 : #include "utils/acl.h"
177 : #include "utils/dynahash.h"
178 : #include "utils/guc.h"
179 : #include "utils/inval.h"
180 : #include "utils/lsyscache.h"
181 : #include "utils/memutils.h"
182 : #include "utils/pg_lsn.h"
183 : #include "utils/rel.h"
184 : #include "utils/rls.h"
185 : #include "utils/snapmgr.h"
186 : #include "utils/syscache.h"
187 : #include "utils/usercontext.h"
188 :
189 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
190 :
191 : typedef struct FlushPosition
192 : {
193 : dlist_node node;
194 : XLogRecPtr local_end;
195 : XLogRecPtr remote_end;
196 : } FlushPosition;
197 :
198 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
199 :
200 : typedef struct ApplyExecutionData
201 : {
202 : EState *estate; /* executor state, used to track resources */
203 :
204 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
205 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
206 :
207 : /* These fields are used when the target relation is partitioned: */
208 : ModifyTableState *mtstate; /* dummy ModifyTable state */
209 : PartitionTupleRouting *proute; /* partition routing info */
210 : } ApplyExecutionData;
211 :
212 : /* Struct for saving and restoring apply errcontext information */
213 : typedef struct ApplyErrorCallbackArg
214 : {
215 : LogicalRepMsgType command; /* 0 if invalid */
216 : LogicalRepRelMapEntry *rel;
217 :
218 : /* Remote node information */
219 : int remote_attnum; /* -1 if invalid */
220 : TransactionId remote_xid;
221 : XLogRecPtr finish_lsn;
222 : char *origin_name;
223 : } ApplyErrorCallbackArg;
224 :
/*
 * How the changes of the transaction currently being received are handled.
 *
 * TRANS_LEADER_APPLY:
 * We are the leader apply worker or a tablesync worker, and this worker
 * applies the changes itself -- either directly or, for streamed
 * transactions, after reading them back from temporary files.
 *
 * TRANS_LEADER_SERIALIZE:
 * We are the leader apply worker or a tablesync worker.  Changes are written
 * to temporary files and applied once the final commit arrives.
 *
 * TRANS_LEADER_SEND_TO_PARALLEL:
 * We are the leader apply worker and forward the changes to a parallel apply
 * worker.
 *
 * TRANS_LEADER_PARTIAL_SERIALIZE:
 * We are the leader apply worker.  Some changes were already sent directly
 * to the parallel apply worker.  Sending then timed out, so the remaining
 * changes are serialized to a file.  The parallel apply worker applies them
 * when the final commit arrives.
 *
 * This cannot simply reuse TRANS_LEADER_SERIALIZE.  Besides the changes
 * themselves, the leader must also serialize the STREAM_XXX messages to the
 * file.  When processing the transaction's finish command, it must also wait
 * for the parallel apply worker to finish the transaction.  A dedicated
 * action keeps that logic clear.
 *
 * TRANS_PARALLEL_APPLY:
 * We are a parallel apply worker and apply the changes directly.
 */
typedef enum
{
	/* Non-streamed transactions. */
	TRANS_LEADER_APPLY,

	/* Streamed transactions. */
	TRANS_LEADER_SERIALIZE,
	TRANS_LEADER_SEND_TO_PARALLEL,
	TRANS_LEADER_PARTIAL_SERIALIZE,
	TRANS_PARALLEL_APPLY,
} TransApplyAction;
270 :
271 : /* errcontext tracker */
272 : static ApplyErrorCallbackArg apply_error_callback_arg =
273 : {
274 : .command = 0,
275 : .rel = NULL,
276 : .remote_attnum = -1,
277 : .remote_xid = InvalidTransactionId,
278 : .finish_lsn = InvalidXLogRecPtr,
279 : .origin_name = NULL,
280 : };
281 :
282 : ErrorContextCallback *apply_error_context_stack = NULL;
283 :
284 : MemoryContext ApplyMessageContext = NULL;
285 : MemoryContext ApplyContext = NULL;
286 :
287 : /* per stream context for streaming transactions */
288 : static MemoryContext LogicalStreamingContext = NULL;
289 :
290 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
291 :
292 : Subscription *MySubscription = NULL;
293 : static bool MySubscriptionValid = false;
294 :
295 : static List *on_commit_wakeup_workers_subids = NIL;
296 :
297 : bool in_remote_transaction = false;
298 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
299 :
300 : /* fields valid only when processing streamed transaction */
301 : static bool in_streamed_transaction = false;
302 :
303 : static TransactionId stream_xid = InvalidTransactionId;
304 :
305 : /*
306 : * The number of changes applied by parallel apply worker during one streaming
307 : * block.
308 : */
309 : static uint32 parallel_stream_nchanges = 0;
310 :
311 : /* Are we initializing an apply worker? */
312 : bool InitializingApplyWorker = false;
313 :
314 : /*
315 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
316 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
317 : * Once we start skipping changes, we don't stop it until we skip all changes of
318 : * the transaction even if pg_subscription is updated and MySubscription->skiplsn
319 : * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
320 : * we don't skip receiving and spooling the changes since we decide whether or not
321 : * to skip applying the changes when starting to apply changes. The subskiplsn is
322 : * cleared after successfully skipping the transaction or applying non-empty
323 : * transaction. The latter prevents the mistakenly specified subskiplsn from
324 : * being left. Note that we cannot skip the streaming transactions when using
325 : * parallel apply workers because we cannot get the finish LSN before applying
326 : * the changes. So, we don't start parallel apply worker when finish LSN is set
327 : * by the user.
328 : */
329 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
330 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
331 :
332 : /* BufFile handle of the current streaming file */
333 : static BufFile *stream_fd = NULL;
334 :
335 : typedef struct SubXactInfo
336 : {
337 : TransactionId xid; /* XID of the subxact */
338 : int fileno; /* file number in the buffile */
339 : off_t offset; /* offset in the file */
340 : } SubXactInfo;
341 :
342 : /* Sub-transaction data for the current streaming transaction */
343 : typedef struct ApplySubXactData
344 : {
345 : uint32 nsubxacts; /* number of sub-transactions */
346 : uint32 nsubxacts_max; /* current capacity of subxacts */
347 : TransactionId subxact_last; /* xid of the last sub-transaction */
348 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
349 : } ApplySubXactData;
350 :
351 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
352 :
353 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
354 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
355 :
356 : /*
357 : * Information about subtransactions of a given toplevel transaction.
358 : */
359 : static void subxact_info_write(Oid subid, TransactionId xid);
360 : static void subxact_info_read(Oid subid, TransactionId xid);
361 : static void subxact_info_add(TransactionId xid);
362 : static inline void cleanup_subxact_info(void);
363 :
364 : /*
365 : * Serialize and deserialize changes for a toplevel transaction.
366 : */
367 : static void stream_open_file(Oid subid, TransactionId xid,
368 : bool first_segment);
369 : static void stream_write_change(char action, StringInfo s);
370 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
371 : static void stream_close_file(void);
372 :
373 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
374 :
375 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
376 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
377 : ResultRelInfo *relinfo,
378 : TupleTableSlot *remoteslot);
379 : static void apply_handle_update_internal(ApplyExecutionData *edata,
380 : ResultRelInfo *relinfo,
381 : TupleTableSlot *remoteslot,
382 : LogicalRepTupleData *newtup,
383 : Oid localindexoid);
384 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
385 : ResultRelInfo *relinfo,
386 : TupleTableSlot *remoteslot,
387 : Oid localindexoid);
388 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
389 : LogicalRepRelation *remoterel,
390 : Oid localidxoid,
391 : TupleTableSlot *remoteslot,
392 : TupleTableSlot **localslot);
393 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
394 : TupleTableSlot *remoteslot,
395 : LogicalRepTupleData *newtup,
396 : CmdType operation);
397 :
398 : /* Functions for skipping changes */
399 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
400 : static void stop_skipping_changes(void);
401 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
402 :
403 : /* Functions for apply error callback */
404 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
405 : static inline void reset_apply_error_context_info(void);
406 :
407 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
408 : ParallelApplyWorkerInfo **winfo);
409 :
410 : static void replorigin_reset(int code, Datum arg);
411 :
412 : /*
413 : * Form the origin name for the subscription.
414 : *
415 : * This is a common function for tablesync and other workers. Tablesync workers
416 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
417 : *
418 : * Return the name in the supplied buffer.
419 : */
420 : void
421 2488 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
422 : char *originname, Size szoriginname)
423 : {
424 2488 : if (OidIsValid(relid))
425 : {
426 : /* Replication origin name for tablesync workers. */
427 1484 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
428 : }
429 : else
430 : {
431 : /* Replication origin name for non-tablesync workers. */
432 1004 : snprintf(originname, szoriginname, "pg_%u", suboid);
433 : }
434 2488 : }
435 :
436 : /*
437 : * Should this worker apply changes for given relation.
438 : *
439 : * This is mainly needed for initial relation data sync as that runs in
440 : * separate worker process running in parallel and we need some way to skip
441 : * changes coming to the leader apply worker during the sync of a table.
442 : *
443 : * Note we need to do smaller or equals comparison for SYNCDONE state because
444 : * it might hold position of end of initial slot consistent point WAL
445 : * record + 1 (ie start of next record) and next record can be COMMIT of
446 : * transaction we are now processing (which is what we set remote_final_lsn
447 : * to in apply_handle_begin).
448 : *
449 : * Note that for streaming transactions that are being applied in the parallel
450 : * apply worker, we disallow applying changes if the target table in the
451 : * subscription is not in the READY state, because we cannot decide whether to
452 : * apply the change as we won't know remote_final_lsn by that time.
453 : *
454 : * We already checked this in pa_can_start() before assigning the
455 : * streaming transaction to the parallel worker, but it also needs to be
456 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
457 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
458 : * while applying this transaction.
459 : */
460 : static bool
461 296266 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
462 : {
463 296266 : switch (MyLogicalRepWorker->type)
464 : {
465 20 : case WORKERTYPE_TABLESYNC:
466 20 : return MyLogicalRepWorker->relid == rel->localreloid;
467 :
468 136786 : case WORKERTYPE_PARALLEL_APPLY:
469 : /* We don't synchronize rel's that are in unknown state. */
470 136786 : if (rel->state != SUBREL_STATE_READY &&
471 0 : rel->state != SUBREL_STATE_UNKNOWN)
472 0 : ereport(ERROR,
473 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
474 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
475 : MySubscription->name),
476 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
477 :
478 136786 : return rel->state == SUBREL_STATE_READY;
479 :
480 159460 : case WORKERTYPE_APPLY:
481 159584 : return (rel->state == SUBREL_STATE_READY ||
482 124 : (rel->state == SUBREL_STATE_SYNCDONE &&
483 6 : rel->statelsn <= remote_final_lsn));
484 :
485 0 : case WORKERTYPE_UNKNOWN:
486 : /* Should never happen. */
487 0 : elog(ERROR, "Unknown worker type");
488 : }
489 :
490 0 : return false; /* dummy for compiler */
491 : }
492 :
493 : /*
494 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
495 : *
496 : * Start a transaction, if this is the first step (else we keep using the
497 : * existing transaction).
498 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
499 : */
500 : static void
501 297176 : begin_replication_step(void)
502 : {
503 297176 : SetCurrentStatementStartTimestamp();
504 :
505 297176 : if (!IsTransactionState())
506 : {
507 1856 : StartTransactionCommand();
508 1856 : maybe_reread_subscription();
509 : }
510 :
511 297170 : PushActiveSnapshot(GetTransactionSnapshot());
512 :
513 297170 : MemoryContextSwitchTo(ApplyMessageContext);
514 297170 : }
515 :
/*
 * Complete a step started by begin_replication_step(); every caller of that
 * function must call this one as well.
 *
 * The transaction itself stays open.  We only pop the active snapshot and
 * advance the command counter, so that subsequent steps see the effects of
 * this one.
 */
static void
end_replication_step(void)
{
	PopActiveSnapshot();
	CommandCounterIncrement();
}
530 :
531 : /*
532 : * Handle streamed transactions for both the leader apply worker and the
533 : * parallel apply workers.
534 : *
535 : * In the streaming case (receiving a block of the streamed transaction), for
536 : * serialize mode, simply redirect it to a file for the proper toplevel
537 : * transaction, and for parallel mode, the leader apply worker will send the
538 : * changes to parallel apply workers and the parallel apply worker will define
539 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
540 : * messages will be applied by both leader apply worker and parallel apply
541 : * workers).
542 : *
543 : * Returns true for streamed transactions (when the change is either serialized
544 : * to file or sent to parallel apply worker), false otherwise (regular mode or
545 : * needs to be processed by parallel apply worker).
546 : *
547 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
548 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
549 : * to a parallel apply worker.
550 : */
551 : static bool
552 648856 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
553 : {
554 : TransactionId current_xid;
555 : ParallelApplyWorkerInfo *winfo;
556 : TransApplyAction apply_action;
557 : StringInfoData original_msg;
558 :
559 648856 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
560 :
561 : /* not in streaming mode */
562 648856 : if (apply_action == TRANS_LEADER_APPLY)
563 160204 : return false;
564 :
565 : Assert(TransactionIdIsValid(stream_xid));
566 :
567 : /*
568 : * The parallel apply worker needs the xid in this message to decide
569 : * whether to define a savepoint, so save the original message that has
570 : * not moved the cursor after the xid. We will serialize this message to a
571 : * file in PARTIAL_SERIALIZE mode.
572 : */
573 488652 : original_msg = *s;
574 :
575 : /*
576 : * We should have received XID of the subxact as the first part of the
577 : * message, so extract it.
578 : */
579 488652 : current_xid = pq_getmsgint(s, 4);
580 :
581 488652 : if (!TransactionIdIsValid(current_xid))
582 0 : ereport(ERROR,
583 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
584 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
585 :
586 488652 : switch (apply_action)
587 : {
588 205026 : case TRANS_LEADER_SERIALIZE:
589 : Assert(stream_fd);
590 :
591 : /* Add the new subxact to the array (unless already there). */
592 205026 : subxact_info_add(current_xid);
593 :
594 : /* Write the change to the current file */
595 205026 : stream_write_change(action, s);
596 205026 : return true;
597 :
598 136772 : case TRANS_LEADER_SEND_TO_PARALLEL:
599 : Assert(winfo);
600 :
601 : /*
602 : * XXX The publisher side doesn't always send relation/type update
603 : * messages after the streaming transaction, so also update the
604 : * relation/type in leader apply worker. See function
605 : * cleanup_rel_sync_cache.
606 : */
607 136772 : if (pa_send_data(winfo, s->len, s->data))
608 136772 : return (action != LOGICAL_REP_MSG_RELATION &&
609 : action != LOGICAL_REP_MSG_TYPE);
610 :
611 : /*
612 : * Switch to serialize mode when we are not able to send the
613 : * change to parallel apply worker.
614 : */
615 0 : pa_switch_to_partial_serialize(winfo, false);
616 :
617 : /* fall through */
618 10012 : case TRANS_LEADER_PARTIAL_SERIALIZE:
619 10012 : stream_write_change(action, &original_msg);
620 :
621 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
622 10012 : return (action != LOGICAL_REP_MSG_RELATION &&
623 : action != LOGICAL_REP_MSG_TYPE);
624 :
625 136842 : case TRANS_PARALLEL_APPLY:
626 136842 : parallel_stream_nchanges += 1;
627 :
628 : /* Define a savepoint for a subxact if needed. */
629 136842 : pa_start_subtrans(current_xid, stream_xid);
630 136842 : return false;
631 :
632 0 : default:
633 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
634 : return false; /* silence compiler warning */
635 : }
636 : }
637 :
638 : /*
639 : * Executor state preparation for evaluation of constraint expressions,
640 : * indexes and triggers for the specified relation.
641 : *
642 : * Note that the caller must open and close any indexes to be updated.
643 : */
644 : static ApplyExecutionData *
645 296090 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
646 : {
647 : ApplyExecutionData *edata;
648 : EState *estate;
649 : RangeTblEntry *rte;
650 296090 : List *perminfos = NIL;
651 : ResultRelInfo *resultRelInfo;
652 :
653 296090 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
654 296090 : edata->targetRel = rel;
655 :
656 296090 : edata->estate = estate = CreateExecutorState();
657 :
658 296090 : rte = makeNode(RangeTblEntry);
659 296090 : rte->rtekind = RTE_RELATION;
660 296090 : rte->relid = RelationGetRelid(rel->localrel);
661 296090 : rte->relkind = rel->localrel->rd_rel->relkind;
662 296090 : rte->rellockmode = AccessShareLock;
663 :
664 296090 : addRTEPermissionInfo(&perminfos, rte);
665 :
666 296090 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
667 : bms_make_singleton(1));
668 :
669 296090 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
670 :
671 : /*
672 : * Use Relation opened by logicalrep_rel_open() instead of opening it
673 : * again.
674 : */
675 296090 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
676 :
677 : /*
678 : * We put the ResultRelInfo in the es_opened_result_relations list, even
679 : * though we don't populate the es_result_relations array. That's a bit
680 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
681 : *
682 : * ExecOpenIndices() is not called here either, each execution path doing
683 : * an apply operation being responsible for that.
684 : */
685 296090 : estate->es_opened_result_relations =
686 296090 : lappend(estate->es_opened_result_relations, resultRelInfo);
687 :
688 296090 : estate->es_output_cid = GetCurrentCommandId(true);
689 :
690 : /* Prepare to catch AFTER triggers. */
691 296090 : AfterTriggerBeginQuery();
692 :
693 : /* other fields of edata remain NULL for now */
694 :
695 296090 : return edata;
696 : }
697 :
698 : /*
699 : * Finish any operations related to the executor state created by
700 : * create_edata_for_relation().
701 : */
702 : static void
703 296048 : finish_edata(ApplyExecutionData *edata)
704 : {
705 296048 : EState *estate = edata->estate;
706 :
707 : /* Handle any queued AFTER triggers. */
708 296048 : AfterTriggerEndQuery(estate);
709 :
710 : /* Shut down tuple routing, if any was done. */
711 296048 : if (edata->proute)
712 148 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
713 :
714 : /*
715 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
716 : * here, but we intentionally don't. It would close the rel we added to
717 : * es_opened_result_relations above, which is wrong because we took no
718 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
719 : * any other relations opened during execution.
720 : */
721 296048 : ExecResetTupleTable(estate->es_tupleTable, false);
722 296048 : FreeExecutorState(estate);
723 296048 : pfree(edata);
724 296048 : }
725 :
726 : /*
727 : * Executes default values for columns for which we can't map to remote
728 : * relation columns.
729 : *
730 : * This allows us to support tables which have more columns on the downstream
731 : * than on the upstream.
732 : */
733 : static void
734 151586 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
735 : TupleTableSlot *slot)
736 : {
737 151586 : TupleDesc desc = RelationGetDescr(rel->localrel);
738 151586 : int num_phys_attrs = desc->natts;
739 : int i;
740 : int attnum,
741 151586 : num_defaults = 0;
742 : int *defmap;
743 : ExprState **defexprs;
744 : ExprContext *econtext;
745 :
746 151586 : econtext = GetPerTupleExprContext(estate);
747 :
748 : /* We got all the data via replication, no need to evaluate anything. */
749 151586 : if (num_phys_attrs == rel->remoterel.natts)
750 71296 : return;
751 :
752 80290 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
753 80290 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
754 :
755 : Assert(rel->attrmap->maplen == num_phys_attrs);
756 421326 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
757 : {
758 : Expr *defexpr;
759 :
760 341036 : if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
761 18 : continue;
762 :
763 341018 : if (rel->attrmap->attnums[attnum] >= 0)
764 184536 : continue;
765 :
766 156482 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
767 :
768 156482 : if (defexpr != NULL)
769 : {
770 : /* Run the expression through planner */
771 140262 : defexpr = expression_planner(defexpr);
772 :
773 : /* Initialize executable expression in copycontext */
774 140262 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
775 140262 : defmap[num_defaults] = attnum;
776 140262 : num_defaults++;
777 : }
778 : }
779 :
780 220552 : for (i = 0; i < num_defaults; i++)
781 140262 : slot->tts_values[defmap[i]] =
782 140262 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
783 : }
784 :
785 : /*
786 : * Store tuple data into slot.
787 : *
788 : * Incoming data can be either text or binary format.
789 : */
790 : static void
791 296108 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
792 : LogicalRepTupleData *tupleData)
793 : {
794 296108 : int natts = slot->tts_tupleDescriptor->natts;
795 : int i;
796 :
797 296108 : ExecClearTuple(slot);
798 :
799 : /* Call the "in" function for each non-dropped, non-null attribute */
800 : Assert(natts == rel->attrmap->maplen);
801 1315154 : for (i = 0; i < natts; i++)
802 : {
803 1019046 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
804 1019046 : int remoteattnum = rel->attrmap->attnums[i];
805 :
806 1019046 : if (!att->attisdropped && remoteattnum >= 0)
807 605212 : {
808 605212 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
809 :
810 : Assert(remoteattnum < tupleData->ncols);
811 :
812 : /* Set attnum for error callback */
813 605212 : apply_error_callback_arg.remote_attnum = remoteattnum;
814 :
815 605212 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
816 : {
817 : Oid typinput;
818 : Oid typioparam;
819 :
820 284542 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
821 569084 : slot->tts_values[i] =
822 284542 : OidInputFunctionCall(typinput, colvalue->data,
823 : typioparam, att->atttypmod);
824 284542 : slot->tts_isnull[i] = false;
825 : }
826 320670 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
827 : {
828 : Oid typreceive;
829 : Oid typioparam;
830 :
831 : /*
832 : * In some code paths we may be asked to re-parse the same
833 : * tuple data. Reset the StringInfo's cursor so that works.
834 : */
835 220016 : colvalue->cursor = 0;
836 :
837 220016 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
838 440032 : slot->tts_values[i] =
839 220016 : OidReceiveFunctionCall(typreceive, colvalue,
840 : typioparam, att->atttypmod);
841 :
842 : /* Trouble if it didn't eat the whole buffer */
843 220016 : if (colvalue->cursor != colvalue->len)
844 0 : ereport(ERROR,
845 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
846 : errmsg("incorrect binary data format in logical replication column %d",
847 : remoteattnum + 1)));
848 220016 : slot->tts_isnull[i] = false;
849 : }
850 : else
851 : {
852 : /*
853 : * NULL value from remote. (We don't expect to see
854 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
855 : * NULL.)
856 : */
857 100654 : slot->tts_values[i] = (Datum) 0;
858 100654 : slot->tts_isnull[i] = true;
859 : }
860 :
861 : /* Reset attnum for error callback */
862 605212 : apply_error_callback_arg.remote_attnum = -1;
863 : }
864 : else
865 : {
866 : /*
867 : * We assign NULL to dropped attributes and missing values
868 : * (missing values should be later filled using
869 : * slot_fill_defaults).
870 : */
871 413834 : slot->tts_values[i] = (Datum) 0;
872 413834 : slot->tts_isnull[i] = true;
873 : }
874 : }
875 :
876 296108 : ExecStoreVirtualTuple(slot);
877 296108 : }
878 :
879 : /*
880 : * Replace updated columns with data from the LogicalRepTupleData struct.
881 : * This is somewhat similar to heap_modify_tuple but also calls the type
882 : * input functions on the user data.
883 : *
884 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
885 : * columns provided in "tupleData" and leaving others as-is.
886 : *
887 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
888 : * storage for "srcslot". This is OK for current usage, but someday we may
889 : * need to materialize "slot" at the end to make it independent of "srcslot".
890 : */
891 : static void
892 63848 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
893 : LogicalRepRelMapEntry *rel,
894 : LogicalRepTupleData *tupleData)
895 : {
896 63848 : int natts = slot->tts_tupleDescriptor->natts;
897 : int i;
898 :
899 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
900 63848 : ExecClearTuple(slot);
901 :
902 : /*
903 : * Copy all the column data from srcslot, so that we'll have valid values
904 : * for unreplaced columns.
905 : */
906 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
907 63848 : slot_getallattrs(srcslot);
908 63848 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
909 63848 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
910 :
911 : /* Call the "in" function for each replaced attribute */
912 : Assert(natts == rel->attrmap->maplen);
913 318560 : for (i = 0; i < natts; i++)
914 : {
915 254712 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
916 254712 : int remoteattnum = rel->attrmap->attnums[i];
917 :
918 254712 : if (remoteattnum < 0)
919 117038 : continue;
920 :
921 : Assert(remoteattnum < tupleData->ncols);
922 :
923 137674 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
924 : {
925 137668 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
926 :
927 : /* Set attnum for error callback */
928 137668 : apply_error_callback_arg.remote_attnum = remoteattnum;
929 :
930 137668 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
931 : {
932 : Oid typinput;
933 : Oid typioparam;
934 :
935 50860 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
936 101720 : slot->tts_values[i] =
937 50860 : OidInputFunctionCall(typinput, colvalue->data,
938 : typioparam, att->atttypmod);
939 50860 : slot->tts_isnull[i] = false;
940 : }
941 86808 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
942 : {
943 : Oid typreceive;
944 : Oid typioparam;
945 :
946 : /*
947 : * In some code paths we may be asked to re-parse the same
948 : * tuple data. Reset the StringInfo's cursor so that works.
949 : */
950 86712 : colvalue->cursor = 0;
951 :
952 86712 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
953 173424 : slot->tts_values[i] =
954 86712 : OidReceiveFunctionCall(typreceive, colvalue,
955 : typioparam, att->atttypmod);
956 :
957 : /* Trouble if it didn't eat the whole buffer */
958 86712 : if (colvalue->cursor != colvalue->len)
959 0 : ereport(ERROR,
960 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
961 : errmsg("incorrect binary data format in logical replication column %d",
962 : remoteattnum + 1)));
963 86712 : slot->tts_isnull[i] = false;
964 : }
965 : else
966 : {
967 : /* must be LOGICALREP_COLUMN_NULL */
968 96 : slot->tts_values[i] = (Datum) 0;
969 96 : slot->tts_isnull[i] = true;
970 : }
971 :
972 : /* Reset attnum for error callback */
973 137668 : apply_error_callback_arg.remote_attnum = -1;
974 : }
975 : }
976 :
977 : /* And finally, declare that "slot" contains a valid virtual tuple */
978 63848 : ExecStoreVirtualTuple(slot);
979 63848 : }
980 :
981 : /*
982 : * Handle BEGIN message.
983 : */
984 : static void
985 908 : apply_handle_begin(StringInfo s)
986 : {
987 : LogicalRepBeginData begin_data;
988 :
989 : /* There must not be an active streaming transaction. */
990 : Assert(!TransactionIdIsValid(stream_xid));
991 :
992 908 : logicalrep_read_begin(s, &begin_data);
993 908 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
994 :
995 908 : remote_final_lsn = begin_data.final_lsn;
996 :
997 908 : maybe_start_skipping_changes(begin_data.final_lsn);
998 :
999 908 : in_remote_transaction = true;
1000 :
1001 908 : pgstat_report_activity(STATE_RUNNING, NULL);
1002 908 : }
1003 :
1004 : /*
1005 : * Handle COMMIT message.
1006 : *
1007 : * TODO, support tracking of multiple origins
1008 : */
1009 : static void
1010 848 : apply_handle_commit(StringInfo s)
1011 : {
1012 : LogicalRepCommitData commit_data;
1013 :
1014 848 : logicalrep_read_commit(s, &commit_data);
1015 :
1016 848 : if (commit_data.commit_lsn != remote_final_lsn)
1017 0 : ereport(ERROR,
1018 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1019 : errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1020 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1021 : LSN_FORMAT_ARGS(remote_final_lsn))));
1022 :
1023 848 : apply_handle_commit_internal(&commit_data);
1024 :
1025 : /* Process any tables that are being synchronized in parallel. */
1026 848 : process_syncing_tables(commit_data.end_lsn);
1027 :
1028 848 : pgstat_report_activity(STATE_IDLE, NULL);
1029 848 : reset_apply_error_context_info();
1030 848 : }
1031 :
1032 : /*
1033 : * Handle BEGIN PREPARE message.
1034 : */
1035 : static void
1036 32 : apply_handle_begin_prepare(StringInfo s)
1037 : {
1038 : LogicalRepPreparedTxnData begin_data;
1039 :
1040 : /* Tablesync should never receive prepare. */
1041 32 : if (am_tablesync_worker())
1042 0 : ereport(ERROR,
1043 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1044 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1045 :
1046 : /* There must not be an active streaming transaction. */
1047 : Assert(!TransactionIdIsValid(stream_xid));
1048 :
1049 32 : logicalrep_read_begin_prepare(s, &begin_data);
1050 32 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1051 :
1052 32 : remote_final_lsn = begin_data.prepare_lsn;
1053 :
1054 32 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1055 :
1056 32 : in_remote_transaction = true;
1057 :
1058 32 : pgstat_report_activity(STATE_RUNNING, NULL);
1059 32 : }
1060 :
1061 : /*
1062 : * Common function to prepare the GID.
1063 : */
1064 : static void
1065 46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1066 : {
1067 : char gid[GIDSIZE];
1068 :
1069 : /*
1070 : * Compute unique GID for two_phase transactions. We don't use GID of
1071 : * prepared transaction sent by server as that can lead to deadlock when
1072 : * we have multiple subscriptions from same node point to publications on
1073 : * the same node. See comments atop worker.c
1074 : */
1075 46 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1076 : gid, sizeof(gid));
1077 :
1078 : /*
1079 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1080 : * called within the PrepareTransactionBlock below.
1081 : */
1082 46 : if (!IsTransactionBlock())
1083 : {
1084 46 : BeginTransactionBlock();
1085 46 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1086 : }
1087 :
1088 : /*
1089 : * Update origin state so we can restart streaming from correct position
1090 : * in case of crash.
1091 : */
1092 46 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1093 46 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1094 :
1095 46 : PrepareTransactionBlock(gid);
1096 46 : }
1097 :
1098 : /*
1099 : * Handle PREPARE message.
1100 : */
1101 : static void
1102 30 : apply_handle_prepare(StringInfo s)
1103 : {
1104 : LogicalRepPreparedTxnData prepare_data;
1105 :
1106 30 : logicalrep_read_prepare(s, &prepare_data);
1107 :
1108 30 : if (prepare_data.prepare_lsn != remote_final_lsn)
1109 0 : ereport(ERROR,
1110 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1111 : errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1112 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1113 : LSN_FORMAT_ARGS(remote_final_lsn))));
1114 :
1115 : /*
1116 : * Unlike commit, here, we always prepare the transaction even though no
1117 : * change has happened in this transaction or all changes are skipped. It
1118 : * is done this way because at commit prepared time, we won't know whether
1119 : * we have skipped preparing a transaction because of those reasons.
1120 : *
1121 : * XXX, We can optimize such that at commit prepared time, we first check
1122 : * whether we have prepared the transaction or not but that doesn't seem
1123 : * worthwhile because such cases shouldn't be common.
1124 : */
1125 30 : begin_replication_step();
1126 :
1127 30 : apply_handle_prepare_internal(&prepare_data);
1128 :
1129 30 : end_replication_step();
1130 30 : CommitTransactionCommand();
1131 28 : pgstat_report_stat(false);
1132 :
1133 : /*
1134 : * It is okay not to set the local_end LSN for the prepare because we
1135 : * always flush the prepare record. So, we can send the acknowledgment of
1136 : * the remote_end LSN as soon as prepare is finished.
1137 : *
1138 : * XXX For the sake of consistency with commit, we could have set it with
1139 : * the LSN of prepare but as of now we don't track that value similar to
1140 : * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
1141 : * it.
1142 : */
1143 28 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1144 :
1145 28 : in_remote_transaction = false;
1146 :
1147 : /* Process any tables that are being synchronized in parallel. */
1148 28 : process_syncing_tables(prepare_data.end_lsn);
1149 :
1150 : /*
1151 : * Since we have already prepared the transaction, in a case where the
1152 : * server crashes before clearing the subskiplsn, it will be left but the
1153 : * transaction won't be resent. But that's okay because it's a rare case
1154 : * and the subskiplsn will be cleared when finishing the next transaction.
1155 : */
1156 28 : stop_skipping_changes();
1157 28 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1158 :
1159 28 : pgstat_report_activity(STATE_IDLE, NULL);
1160 28 : reset_apply_error_context_info();
1161 28 : }
1162 :
1163 : /*
1164 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1165 : *
1166 : * Note that we don't need to wait here if the transaction was prepared in a
1167 : * parallel apply worker. In that case, we have already waited for the prepare
1168 : * to finish in apply_handle_stream_prepare() which will ensure all the
1169 : * operations in that transaction have happened in the subscriber, so no
1170 : * concurrent transaction can cause deadlock or transaction dependency issues.
1171 : */
1172 : static void
1173 40 : apply_handle_commit_prepared(StringInfo s)
1174 : {
1175 : LogicalRepCommitPreparedTxnData prepare_data;
1176 : char gid[GIDSIZE];
1177 :
1178 40 : logicalrep_read_commit_prepared(s, &prepare_data);
1179 40 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1180 :
1181 : /* Compute GID for two_phase transactions. */
1182 40 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1183 : gid, sizeof(gid));
1184 :
1185 : /* There is no transaction when COMMIT PREPARED is called */
1186 40 : begin_replication_step();
1187 :
1188 : /*
1189 : * Update origin state so we can restart streaming from correct position
1190 : * in case of crash.
1191 : */
1192 40 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1193 40 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1194 :
1195 40 : FinishPreparedTransaction(gid, true);
1196 40 : end_replication_step();
1197 40 : CommitTransactionCommand();
1198 40 : pgstat_report_stat(false);
1199 :
1200 40 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1201 40 : in_remote_transaction = false;
1202 :
1203 : /* Process any tables that are being synchronized in parallel. */
1204 40 : process_syncing_tables(prepare_data.end_lsn);
1205 :
1206 40 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1207 :
1208 40 : pgstat_report_activity(STATE_IDLE, NULL);
1209 40 : reset_apply_error_context_info();
1210 40 : }
1211 :
1212 : /*
1213 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1214 : *
1215 : * Note that we don't need to wait here if the transaction was prepared in a
1216 : * parallel apply worker. In that case, we have already waited for the prepare
1217 : * to finish in apply_handle_stream_prepare() which will ensure all the
1218 : * operations in that transaction have happened in the subscriber, so no
1219 : * concurrent transaction can cause deadlock or transaction dependency issues.
1220 : */
1221 : static void
1222 10 : apply_handle_rollback_prepared(StringInfo s)
1223 : {
1224 : LogicalRepRollbackPreparedTxnData rollback_data;
1225 : char gid[GIDSIZE];
1226 :
1227 10 : logicalrep_read_rollback_prepared(s, &rollback_data);
1228 10 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1229 :
1230 : /* Compute GID for two_phase transactions. */
1231 10 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1232 : gid, sizeof(gid));
1233 :
1234 : /*
1235 : * It is possible that we haven't received prepare because it occurred
1236 : * before walsender reached a consistent point or the two_phase was still
1237 : * not enabled by that time, so in such cases, we need to skip rollback
1238 : * prepared.
1239 : */
1240 10 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1241 : rollback_data.prepare_time))
1242 : {
1243 : /*
1244 : * Update origin state so we can restart streaming from correct
1245 : * position in case of crash.
1246 : */
1247 10 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1248 10 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
1249 :
1250 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1251 10 : begin_replication_step();
1252 10 : FinishPreparedTransaction(gid, false);
1253 10 : end_replication_step();
1254 10 : CommitTransactionCommand();
1255 :
1256 10 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1257 : }
1258 :
1259 10 : pgstat_report_stat(false);
1260 :
1261 : /*
1262 : * It is okay not to set the local_end LSN for the rollback of prepared
1263 : * transaction because we always flush the WAL record for it. See
1264 : * apply_handle_prepare.
1265 : */
1266 10 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1267 10 : in_remote_transaction = false;
1268 :
1269 : /* Process any tables that are being synchronized in parallel. */
1270 10 : process_syncing_tables(rollback_data.rollback_end_lsn);
1271 :
1272 10 : pgstat_report_activity(STATE_IDLE, NULL);
1273 10 : reset_apply_error_context_info();
1274 10 : }
1275 :
1276 : /*
1277 : * Handle STREAM PREPARE.
1278 : */
1279 : static void
1280 22 : apply_handle_stream_prepare(StringInfo s)
1281 : {
1282 : LogicalRepPreparedTxnData prepare_data;
1283 : ParallelApplyWorkerInfo *winfo;
1284 : TransApplyAction apply_action;
1285 :
1286 : /* Save the message before it is consumed. */
1287 22 : StringInfoData original_msg = *s;
1288 :
1289 22 : if (in_streamed_transaction)
1290 0 : ereport(ERROR,
1291 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1292 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1293 :
1294 : /* Tablesync should never receive prepare. */
1295 22 : if (am_tablesync_worker())
1296 0 : ereport(ERROR,
1297 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1298 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1299 :
1300 22 : logicalrep_read_stream_prepare(s, &prepare_data);
1301 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1302 :
1303 22 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1304 :
1305 22 : switch (apply_action)
1306 : {
1307 10 : case TRANS_LEADER_APPLY:
1308 :
1309 : /*
1310 : * The transaction has been serialized to file, so replay all the
1311 : * spooled operations.
1312 : */
1313 10 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1314 : prepare_data.xid, prepare_data.prepare_lsn);
1315 :
1316 : /* Mark the transaction as prepared. */
1317 10 : apply_handle_prepare_internal(&prepare_data);
1318 :
1319 10 : CommitTransactionCommand();
1320 :
1321 : /*
1322 : * It is okay not to set the local_end LSN for the prepare because
1323 : * we always flush the prepare record. See apply_handle_prepare.
1324 : */
1325 10 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1326 :
1327 10 : in_remote_transaction = false;
1328 :
1329 : /* Unlink the files with serialized changes and subxact info. */
1330 10 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1331 :
1332 10 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1333 10 : break;
1334 :
1335 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1336 : Assert(winfo);
1337 :
1338 4 : if (pa_send_data(winfo, s->len, s->data))
1339 : {
1340 : /* Finish processing the streaming transaction. */
1341 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1342 4 : break;
1343 : }
1344 :
1345 : /*
1346 : * Switch to serialize mode when we are not able to send the
1347 : * change to parallel apply worker.
1348 : */
1349 0 : pa_switch_to_partial_serialize(winfo, true);
1350 :
1351 : /* fall through */
1352 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1353 : Assert(winfo);
1354 :
1355 2 : stream_open_and_write_change(prepare_data.xid,
1356 : LOGICAL_REP_MSG_STREAM_PREPARE,
1357 : &original_msg);
1358 :
1359 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1360 :
1361 : /* Finish processing the streaming transaction. */
1362 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1363 2 : break;
1364 :
1365 6 : case TRANS_PARALLEL_APPLY:
1366 :
1367 : /*
1368 : * If the parallel apply worker is applying spooled messages then
1369 : * close the file before preparing.
1370 : */
1371 6 : if (stream_fd)
1372 2 : stream_close_file();
1373 :
1374 6 : begin_replication_step();
1375 :
1376 : /* Mark the transaction as prepared. */
1377 6 : apply_handle_prepare_internal(&prepare_data);
1378 :
1379 6 : end_replication_step();
1380 :
1381 6 : CommitTransactionCommand();
1382 :
1383 : /*
1384 : * It is okay not to set the local_end LSN for the prepare because
1385 : * we always flush the prepare record. See apply_handle_prepare.
1386 : */
1387 6 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1388 :
1389 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1390 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1391 :
1392 6 : pa_reset_subtrans();
1393 :
1394 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1395 6 : break;
1396 :
1397 0 : default:
1398 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1399 : break;
1400 : }
1401 :
1402 22 : pgstat_report_stat(false);
1403 :
1404 : /* Process any tables that are being synchronized in parallel. */
1405 22 : process_syncing_tables(prepare_data.end_lsn);
1406 :
1407 : /*
1408 : * Similar to prepare case, the subskiplsn could be left in a case of
1409 : * server crash but it's okay. See the comments in apply_handle_prepare().
1410 : */
1411 22 : stop_skipping_changes();
1412 22 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1413 :
1414 22 : pgstat_report_activity(STATE_IDLE, NULL);
1415 :
1416 22 : reset_apply_error_context_info();
1417 22 : }
1418 :
1419 : /*
1420 : * Handle ORIGIN message.
1421 : *
1422 : * TODO, support tracking of multiple origins
1423 : */
1424 : static void
1425 16 : apply_handle_origin(StringInfo s)
1426 : {
1427 : /*
1428 : * ORIGIN message can only come inside streaming transaction or inside
1429 : * remote transaction and before any actual writes.
1430 : */
1431 16 : if (!in_streamed_transaction &&
1432 24 : (!in_remote_transaction ||
1433 12 : (IsTransactionState() && !am_tablesync_worker())))
1434 0 : ereport(ERROR,
1435 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1436 : errmsg_internal("ORIGIN message sent out of order")));
1437 16 : }
1438 :
1439 : /*
1440 : * Initialize fileset (if not already done).
1441 : *
1442 : * Create a new file when first_segment is true, otherwise open the existing
1443 : * file.
1444 : */
1445 : void
1446 726 : stream_start_internal(TransactionId xid, bool first_segment)
1447 : {
1448 726 : begin_replication_step();
1449 :
1450 : /*
1451 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1452 : * used for the entire duration of the worker so create it in a permanent
1453 : * context. We create this on the very first streaming message from any
1454 : * transaction and then use it for this and other streaming transactions.
1455 : * Now, we could create a fileset at the start of the worker as well but
1456 : * then we won't be sure that it will ever be used.
1457 : */
1458 726 : if (!MyLogicalRepWorker->stream_fileset)
1459 : {
1460 : MemoryContext oldctx;
1461 :
1462 28 : oldctx = MemoryContextSwitchTo(ApplyContext);
1463 :
1464 28 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1465 28 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1466 :
1467 28 : MemoryContextSwitchTo(oldctx);
1468 : }
1469 :
1470 : /* Open the spool file for this transaction. */
1471 726 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1472 :
1473 : /* If this is not the first segment, open existing subxact file. */
1474 726 : if (!first_segment)
1475 662 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1476 :
1477 726 : end_replication_step();
1478 726 : }
1479 :
1480 : /*
1481 : * Handle STREAM START message.
1482 : */
1483 : static void
1484 1674 : apply_handle_stream_start(StringInfo s)
1485 : {
1486 : bool first_segment;
1487 : ParallelApplyWorkerInfo *winfo;
1488 : TransApplyAction apply_action;
1489 :
1490 : /* Save the message before it is consumed. */
1491 1674 : StringInfoData original_msg = *s;
1492 :
1493 1674 : if (in_streamed_transaction)
1494 0 : ereport(ERROR,
1495 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1496 : errmsg_internal("duplicate STREAM START message")));
1497 :
1498 : /* There must not be an active streaming transaction. */
1499 : Assert(!TransactionIdIsValid(stream_xid));
1500 :
1501 : /* notify handle methods we're processing a remote transaction */
1502 1674 : in_streamed_transaction = true;
1503 :
1504 : /* extract XID of the top-level transaction */
1505 1674 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1506 :
1507 1674 : if (!TransactionIdIsValid(stream_xid))
1508 0 : ereport(ERROR,
1509 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1510 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1511 :
1512 1674 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1513 :
1514 : /* Try to allocate a worker for the streaming transaction. */
1515 1674 : if (first_segment)
1516 164 : pa_allocate_worker(stream_xid);
1517 :
1518 1674 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1519 :
1520 1674 : switch (apply_action)
1521 : {
1522 686 : case TRANS_LEADER_SERIALIZE:
1523 :
1524 : /*
1525 : * Function stream_start_internal starts a transaction. This
1526 : * transaction will be committed on the stream stop unless it is a
1527 : * tablesync worker in which case it will be committed after
1528 : * processing all the messages. We need this transaction for
1529 : * handling the BufFile, used for serializing the streaming data
1530 : * and subxact info.
1531 : */
1532 686 : stream_start_internal(stream_xid, first_segment);
1533 686 : break;
1534 :
1535 482 : case TRANS_LEADER_SEND_TO_PARALLEL:
1536 : Assert(winfo);
1537 :
1538 : /*
1539 : * Once we start serializing the changes, the parallel apply
1540 : * worker will wait for the leader to release the stream lock
1541 : * until the end of the transaction. So, we don't need to release
1542 : * the lock or increment the stream count in that case.
1543 : */
1544 482 : if (pa_send_data(winfo, s->len, s->data))
1545 : {
1546 : /*
1547 : * Unlock the shared object lock so that the parallel apply
1548 : * worker can continue to receive changes.
1549 : */
1550 474 : if (!first_segment)
1551 428 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1552 :
1553 : /*
1554 : * Increment the number of streaming blocks waiting to be
1555 : * processed by parallel apply worker.
1556 : */
1557 474 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1558 :
1559 : /* Cache the parallel apply worker for this transaction. */
1560 474 : pa_set_stream_apply_worker(winfo);
1561 474 : break;
1562 : }
1563 :
1564 : /*
1565 : * Switch to serialize mode when we are not able to send the
1566 : * change to parallel apply worker.
1567 : */
1568 8 : pa_switch_to_partial_serialize(winfo, !first_segment);
1569 :
1570 : /* fall through */
1571 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1572 : Assert(winfo);
1573 :
1574 : /*
1575 : * Open the spool file unless it was already opened when switching
1576 : * to serialize mode. The transaction started in
1577 : * stream_start_internal will be committed on the stream stop.
1578 : */
1579 30 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1580 22 : stream_start_internal(stream_xid, first_segment);
1581 :
1582 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1583 :
1584 : /* Cache the parallel apply worker for this transaction. */
1585 30 : pa_set_stream_apply_worker(winfo);
1586 30 : break;
1587 :
1588 484 : case TRANS_PARALLEL_APPLY:
1589 484 : if (first_segment)
1590 : {
1591 : /* Hold the lock until the end of the transaction. */
1592 54 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1593 54 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1594 :
1595 : /*
1596 : * Signal the leader apply worker, as it may be waiting for
1597 : * us.
1598 : */
1599 54 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
1600 : }
1601 :
1602 484 : parallel_stream_nchanges = 0;
1603 484 : break;
1604 :
1605 0 : default:
1606 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1607 : break;
1608 : }
1609 :
1610 1674 : pgstat_report_activity(STATE_RUNNING, NULL);
1611 1674 : }
1612 :
1613 : /*
1614 : * Update the information about subxacts and close the file.
1615 : *
1616 : * This function should be called when the stream_start_internal function has
1617 : * been called.
1618 : */
1619 : void
1620 726 : stream_stop_internal(TransactionId xid)
1621 : {
1622 : /*
1623 : * Serialize information about subxacts for the toplevel transaction, then
1624 : * close the stream messages spool file.
1625 : */
1626 726 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1627 726 : stream_close_file();
1628 :
1629 : /* We must be in a valid transaction state */
1630 : Assert(IsTransactionState());
1631 :
1632 : /* Commit the per-stream transaction */
1633 726 : CommitTransactionCommand();
1634 :
1635 : /* Reset per-stream context */
1636 726 : MemoryContextReset(LogicalStreamingContext);
1637 726 : }
1638 :
1639 : /*
1640 : * Handle STREAM STOP message.
1641 : */
1642 : static void
1643 1672 : apply_handle_stream_stop(StringInfo s)
1644 : {
1645 : ParallelApplyWorkerInfo *winfo;
1646 : TransApplyAction apply_action;
1647 :
1648 1672 : if (!in_streamed_transaction)
1649 0 : ereport(ERROR,
1650 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1651 : errmsg_internal("STREAM STOP message without STREAM START")));
1652 :
1653 1672 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1654 :
1655 1672 : switch (apply_action)
1656 : {
1657 686 : case TRANS_LEADER_SERIALIZE:
1658 686 : stream_stop_internal(stream_xid);
1659 686 : break;
1660 :
1661 474 : case TRANS_LEADER_SEND_TO_PARALLEL:
1662 : Assert(winfo);
1663 :
1664 : /*
1665 : * Lock before sending the STREAM_STOP message so that the leader
1666 : * can hold the lock first and the parallel apply worker will wait
1667 : * for leader to release the lock. See Locking Considerations atop
1668 : * applyparallelworker.c.
1669 : */
1670 474 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1671 :
1672 474 : if (pa_send_data(winfo, s->len, s->data))
1673 : {
1674 474 : pa_set_stream_apply_worker(NULL);
1675 474 : break;
1676 : }
1677 :
1678 : /*
1679 : * Switch to serialize mode when we are not able to send the
1680 : * change to parallel apply worker.
1681 : */
1682 0 : pa_switch_to_partial_serialize(winfo, true);
1683 :
1684 : /* fall through */
1685 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1686 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1687 30 : stream_stop_internal(stream_xid);
1688 30 : pa_set_stream_apply_worker(NULL);
1689 30 : break;
1690 :
1691 482 : case TRANS_PARALLEL_APPLY:
1692 482 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1693 : parallel_stream_nchanges);
1694 :
1695 : /*
1696 : * By the time parallel apply worker is processing the changes in
1697 : * the current streaming block, the leader apply worker may have
1698 : * sent multiple streaming blocks. This can lead to parallel apply
1699 : * worker start waiting even when there are more chunk of streams
1700 : * in the queue. So, try to lock only if there is no message left
1701 : * in the queue. See Locking Considerations atop
1702 : * applyparallelworker.c.
1703 : *
1704 : * Note that here we have a race condition where we can start
1705 : * waiting even when there are pending streaming chunks. This can
1706 : * happen if the leader sends another streaming block and acquires
1707 : * the stream lock again after the parallel apply worker checks
1708 : * that there is no pending streaming block and before it actually
1709 : * starts waiting on a lock. We can handle this case by not
1710 : * allowing the leader to increment the stream block count during
1711 : * the time parallel apply worker acquires the lock but it is not
1712 : * clear whether that is worth the complexity.
1713 : *
1714 : * Now, if this missed chunk contains rollback to savepoint, then
1715 : * there is a risk of deadlock which probably shouldn't happen
1716 : * after restart.
1717 : */
1718 482 : pa_decr_and_wait_stream_block();
1719 478 : break;
1720 :
1721 0 : default:
1722 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1723 : break;
1724 : }
1725 :
1726 1668 : in_streamed_transaction = false;
1727 1668 : stream_xid = InvalidTransactionId;
1728 :
1729 : /*
1730 : * The parallel apply worker could be in a transaction in which case we
1731 : * need to report the state as STATE_IDLEINTRANSACTION.
1732 : */
1733 1668 : if (IsTransactionOrTransactionBlock())
1734 478 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1735 : else
1736 1190 : pgstat_report_activity(STATE_IDLE, NULL);
1737 :
1738 1668 : reset_apply_error_context_info();
1739 1668 : }
1740 :
1741 : /*
1742 : * Helper function to handle STREAM ABORT message when the transaction was
1743 : * serialized to file.
1744 : */
1745 : static void
1746 28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1747 : {
1748 : /*
1749 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1750 : * just delete the files with serialized info.
1751 : */
1752 28 : if (xid == subxid)
1753 2 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1754 : else
1755 : {
1756 : /*
1757 : * OK, so it's a subxact. We need to read the subxact file for the
1758 : * toplevel transaction, determine the offset tracked for the subxact,
1759 : * and truncate the file with changes. We also remove the subxacts
1760 : * with higher offsets (or rather higher XIDs).
1761 : *
1762 : * We intentionally scan the array from the tail, because we're likely
1763 : * aborting a change for the most recent subtransactions.
1764 : *
1765 : * We can't use the binary search here as subxact XIDs won't
1766 : * necessarily arrive in sorted order, consider the case where we have
1767 : * released the savepoint for multiple subtransactions and then
1768 : * performed rollback to savepoint for one of the earlier
1769 : * sub-transaction.
1770 : */
1771 : int64 i;
1772 : int64 subidx;
1773 : BufFile *fd;
1774 26 : bool found = false;
1775 : char path[MAXPGPATH];
1776 :
1777 26 : subidx = -1;
1778 26 : begin_replication_step();
1779 26 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1780 :
1781 30 : for (i = subxact_data.nsubxacts; i > 0; i--)
1782 : {
1783 22 : if (subxact_data.subxacts[i - 1].xid == subxid)
1784 : {
1785 18 : subidx = (i - 1);
1786 18 : found = true;
1787 18 : break;
1788 : }
1789 : }
1790 :
1791 : /*
1792 : * If it's an empty sub-transaction then we will not find the subxid
1793 : * here so just cleanup the subxact info and return.
1794 : */
1795 26 : if (!found)
1796 : {
1797 : /* Cleanup the subxact info */
1798 8 : cleanup_subxact_info();
1799 8 : end_replication_step();
1800 8 : CommitTransactionCommand();
1801 8 : return;
1802 : }
1803 :
1804 : /* open the changes file */
1805 18 : changes_filename(path, MyLogicalRepWorker->subid, xid);
1806 18 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
1807 : O_RDWR, false);
1808 :
1809 : /* OK, truncate the file at the right offset */
1810 18 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1811 18 : subxact_data.subxacts[subidx].offset);
1812 18 : BufFileClose(fd);
1813 :
1814 : /* discard the subxacts added later */
1815 18 : subxact_data.nsubxacts = subidx;
1816 :
1817 : /* write the updated subxact list */
1818 18 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1819 :
1820 18 : end_replication_step();
1821 18 : CommitTransactionCommand();
1822 : }
1823 : }
1824 :
1825 : /*
1826 : * Handle STREAM ABORT message.
1827 : */
1828 : static void
1829 76 : apply_handle_stream_abort(StringInfo s)
1830 : {
1831 : TransactionId xid;
1832 : TransactionId subxid;
1833 : LogicalRepStreamAbortData abort_data;
1834 : ParallelApplyWorkerInfo *winfo;
1835 : TransApplyAction apply_action;
1836 :
1837 : /* Save the message before it is consumed. */
1838 76 : StringInfoData original_msg = *s;
1839 : bool toplevel_xact;
1840 :
1841 76 : if (in_streamed_transaction)
1842 0 : ereport(ERROR,
1843 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1844 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
1845 :
1846 : /* We receive abort information only when we can apply in parallel. */
1847 76 : logicalrep_read_stream_abort(s, &abort_data,
1848 76 : MyLogicalRepWorker->parallel_apply);
1849 :
1850 76 : xid = abort_data.xid;
1851 76 : subxid = abort_data.subxid;
1852 76 : toplevel_xact = (xid == subxid);
1853 :
1854 76 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1855 :
1856 76 : apply_action = get_transaction_apply_action(xid, &winfo);
1857 :
1858 76 : switch (apply_action)
1859 : {
1860 28 : case TRANS_LEADER_APPLY:
1861 :
1862 : /*
1863 : * We are in the leader apply worker and the transaction has been
1864 : * serialized to file.
1865 : */
1866 28 : stream_abort_internal(xid, subxid);
1867 :
1868 28 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1869 28 : break;
1870 :
1871 20 : case TRANS_LEADER_SEND_TO_PARALLEL:
1872 : Assert(winfo);
1873 :
1874 : /*
1875 : * For the case of aborting the subtransaction, we increment the
1876 : * number of streaming blocks and take the lock again before
1877 : * sending the STREAM_ABORT to ensure that the parallel apply
1878 : * worker will wait on the lock for the next set of changes after
1879 : * processing the STREAM_ABORT message if it is not already
1880 : * waiting for STREAM_STOP message.
1881 : *
1882 : * It is important to perform this locking before sending the
1883 : * STREAM_ABORT message so that the leader can hold the lock first
1884 : * and the parallel apply worker will wait for the leader to
1885 : * release the lock. This is the same as what we do in
1886 : * apply_handle_stream_stop. See Locking Considerations atop
1887 : * applyparallelworker.c.
1888 : */
1889 20 : if (!toplevel_xact)
1890 : {
1891 18 : pa_unlock_stream(xid, AccessExclusiveLock);
1892 18 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1893 18 : pa_lock_stream(xid, AccessExclusiveLock);
1894 : }
1895 :
1896 20 : if (pa_send_data(winfo, s->len, s->data))
1897 : {
1898 : /*
1899 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1900 : * wait here for the parallel apply worker to finish as that
1901 : * is not required to maintain the commit order and won't have
1902 : * the risk of failures due to transaction dependencies and
1903 : * deadlocks. However, it is possible that before the parallel
1904 : * worker finishes and we clear the worker info, the xid
1905 : * wraparound happens on the upstream and a new transaction
1906 : * with the same xid can appear and that can lead to duplicate
1907 : * entries in ParallelApplyTxnHash. Yet another problem could
1908 : * be that we may have serialized the changes in partial
1909 : * serialize mode and the file containing xact changes may
1910 : * already exist, and after xid wraparound trying to create
1911 : * the file for the same xid can lead to an error. To avoid
1912 : * these problems, we decide to wait for the aborts to finish.
1913 : *
1914 : * Note, it is okay to not update the flush location position
1915 : * for aborts as in worst case that means such a transaction
1916 : * won't be sent again after restart.
1917 : */
1918 20 : if (toplevel_xact)
1919 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1920 :
1921 20 : break;
1922 : }
1923 :
1924 : /*
1925 : * Switch to serialize mode when we are not able to send the
1926 : * change to parallel apply worker.
1927 : */
1928 0 : pa_switch_to_partial_serialize(winfo, true);
1929 :
1930 : /* fall through */
1931 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1932 : Assert(winfo);
1933 :
1934 : /*
1935 : * Parallel apply worker might have applied some changes, so write
1936 : * the STREAM_ABORT message so that it can rollback the
1937 : * subtransaction if needed.
1938 : */
1939 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
1940 : &original_msg);
1941 :
1942 4 : if (toplevel_xact)
1943 : {
1944 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1945 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1946 : }
1947 4 : break;
1948 :
1949 24 : case TRANS_PARALLEL_APPLY:
1950 :
1951 : /*
1952 : * If the parallel apply worker is applying spooled messages then
1953 : * close the file before aborting.
1954 : */
1955 24 : if (toplevel_xact && stream_fd)
1956 2 : stream_close_file();
1957 :
1958 24 : pa_stream_abort(&abort_data);
1959 :
1960 : /*
1961 : * We need to wait after processing rollback to savepoint for the
1962 : * next set of changes.
1963 : *
1964 : * We have a race condition here due to which we can start waiting
1965 : * here when there are more chunk of streams in the queue. See
1966 : * apply_handle_stream_stop.
1967 : */
1968 24 : if (!toplevel_xact)
1969 20 : pa_decr_and_wait_stream_block();
1970 :
1971 24 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1972 24 : break;
1973 :
1974 0 : default:
1975 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1976 : break;
1977 : }
1978 :
1979 76 : reset_apply_error_context_info();
1980 76 : }
1981 :
1982 : /*
1983 : * Ensure that the passed location is fileset's end.
1984 : */
1985 : static void
1986 8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1987 : off_t offset)
1988 : {
1989 : char path[MAXPGPATH];
1990 : BufFile *fd;
1991 : int last_fileno;
1992 : off_t last_offset;
1993 :
1994 : Assert(!IsTransactionState());
1995 :
1996 8 : begin_replication_step();
1997 :
1998 8 : changes_filename(path, MyLogicalRepWorker->subid, xid);
1999 :
2000 8 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2001 :
2002 8 : BufFileSeek(fd, 0, 0, SEEK_END);
2003 8 : BufFileTell(fd, &last_fileno, &last_offset);
2004 :
2005 8 : BufFileClose(fd);
2006 :
2007 8 : end_replication_step();
2008 :
2009 8 : if (last_fileno != fileno || last_offset != offset)
2010 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2011 : path);
2012 8 : }
2013 :
2014 : /*
2015 : * Common spoolfile processing.
2016 : */
2017 : void
2018 62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2019 : XLogRecPtr lsn)
2020 : {
2021 : int nchanges;
2022 : char path[MAXPGPATH];
2023 62 : char *buffer = NULL;
2024 : MemoryContext oldcxt;
2025 : ResourceOwner oldowner;
2026 : int fileno;
2027 : off_t offset;
2028 :
2029 62 : if (!am_parallel_apply_worker())
2030 54 : maybe_start_skipping_changes(lsn);
2031 :
2032 : /* Make sure we have an open transaction */
2033 62 : begin_replication_step();
2034 :
2035 : /*
2036 : * Allocate file handle and memory required to process all the messages in
2037 : * TopTransactionContext to avoid them getting reset after each message is
2038 : * processed.
2039 : */
2040 62 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2041 :
2042 : /* Open the spool file for the committed/prepared transaction */
2043 62 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2044 62 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2045 :
2046 : /*
2047 : * Make sure the file is owned by the toplevel transaction so that the
2048 : * file will not be accidentally closed when aborting a subtransaction.
2049 : */
2050 62 : oldowner = CurrentResourceOwner;
2051 62 : CurrentResourceOwner = TopTransactionResourceOwner;
2052 :
2053 62 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2054 :
2055 62 : CurrentResourceOwner = oldowner;
2056 :
2057 62 : buffer = palloc(BLCKSZ);
2058 :
2059 62 : MemoryContextSwitchTo(oldcxt);
2060 :
2061 62 : remote_final_lsn = lsn;
2062 :
2063 : /*
2064 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2065 : * transaction.
2066 : */
2067 62 : in_remote_transaction = true;
2068 62 : pgstat_report_activity(STATE_RUNNING, NULL);
2069 :
2070 62 : end_replication_step();
2071 :
2072 : /*
2073 : * Read the entries one by one and pass them through the same logic as in
2074 : * apply_dispatch.
2075 : */
2076 62 : nchanges = 0;
2077 : while (true)
2078 176940 : {
2079 : StringInfoData s2;
2080 : size_t nbytes;
2081 : int len;
2082 :
2083 177002 : CHECK_FOR_INTERRUPTS();
2084 :
2085 : /* read length of the on-disk record */
2086 177002 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2087 :
2088 : /* have we reached end of the file? */
2089 177002 : if (nbytes == 0)
2090 52 : break;
2091 :
2092 : /* do we have a correct length? */
2093 176950 : if (len <= 0)
2094 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2095 : len, path);
2096 :
2097 : /* make sure we have sufficiently large buffer */
2098 176950 : buffer = repalloc(buffer, len);
2099 :
2100 : /* and finally read the data into the buffer */
2101 176950 : BufFileReadExact(stream_fd, buffer, len);
2102 :
2103 176950 : BufFileTell(stream_fd, &fileno, &offset);
2104 :
2105 : /* init a stringinfo using the buffer and call apply_dispatch */
2106 176950 : initReadOnlyStringInfo(&s2, buffer, len);
2107 :
2108 : /* Ensure we are reading the data into our memory context. */
2109 176950 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2110 :
2111 176950 : apply_dispatch(&s2);
2112 :
2113 176948 : MemoryContextReset(ApplyMessageContext);
2114 :
2115 176948 : MemoryContextSwitchTo(oldcxt);
2116 :
2117 176948 : nchanges++;
2118 :
2119 : /*
2120 : * It is possible the file has been closed because we have processed
2121 : * the transaction end message like stream_commit in which case that
2122 : * must be the last message.
2123 : */
2124 176948 : if (!stream_fd)
2125 : {
2126 8 : ensure_last_message(stream_fileset, xid, fileno, offset);
2127 8 : break;
2128 : }
2129 :
2130 176940 : if (nchanges % 1000 == 0)
2131 166 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2132 : nchanges, path);
2133 : }
2134 :
2135 60 : if (stream_fd)
2136 52 : stream_close_file();
2137 :
2138 60 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2139 : nchanges, path);
2140 :
2141 60 : return;
2142 : }
2143 :
2144 : /*
2145 : * Handle STREAM COMMIT message.
2146 : */
2147 : static void
2148 122 : apply_handle_stream_commit(StringInfo s)
2149 : {
2150 : TransactionId xid;
2151 : LogicalRepCommitData commit_data;
2152 : ParallelApplyWorkerInfo *winfo;
2153 : TransApplyAction apply_action;
2154 :
2155 : /* Save the message before it is consumed. */
2156 122 : StringInfoData original_msg = *s;
2157 :
2158 122 : if (in_streamed_transaction)
2159 0 : ereport(ERROR,
2160 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2161 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2162 :
2163 122 : xid = logicalrep_read_stream_commit(s, &commit_data);
2164 122 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2165 :
2166 122 : apply_action = get_transaction_apply_action(xid, &winfo);
2167 :
2168 122 : switch (apply_action)
2169 : {
2170 44 : case TRANS_LEADER_APPLY:
2171 :
2172 : /*
2173 : * The transaction has been serialized to file, so replay all the
2174 : * spooled operations.
2175 : */
2176 44 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2177 : commit_data.commit_lsn);
2178 :
2179 42 : apply_handle_commit_internal(&commit_data);
2180 :
2181 : /* Unlink the files with serialized changes and subxact info. */
2182 42 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2183 :
2184 42 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2185 42 : break;
2186 :
2187 36 : case TRANS_LEADER_SEND_TO_PARALLEL:
2188 : Assert(winfo);
2189 :
2190 36 : if (pa_send_data(winfo, s->len, s->data))
2191 : {
2192 : /* Finish processing the streaming transaction. */
2193 36 : pa_xact_finish(winfo, commit_data.end_lsn);
2194 34 : break;
2195 : }
2196 :
2197 : /*
2198 : * Switch to serialize mode when we are not able to send the
2199 : * change to parallel apply worker.
2200 : */
2201 0 : pa_switch_to_partial_serialize(winfo, true);
2202 :
2203 : /* fall through */
2204 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2205 : Assert(winfo);
2206 :
2207 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2208 : &original_msg);
2209 :
2210 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2211 :
2212 : /* Finish processing the streaming transaction. */
2213 4 : pa_xact_finish(winfo, commit_data.end_lsn);
2214 4 : break;
2215 :
2216 38 : case TRANS_PARALLEL_APPLY:
2217 :
2218 : /*
2219 : * If the parallel apply worker is applying spooled messages then
2220 : * close the file before committing.
2221 : */
2222 38 : if (stream_fd)
2223 4 : stream_close_file();
2224 :
2225 38 : apply_handle_commit_internal(&commit_data);
2226 :
2227 38 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2228 :
2229 : /*
2230 : * It is important to set the transaction state as finished before
2231 : * releasing the lock. See pa_wait_for_xact_finish.
2232 : */
2233 38 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2234 38 : pa_unlock_transaction(xid, AccessExclusiveLock);
2235 :
2236 38 : pa_reset_subtrans();
2237 :
2238 38 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2239 38 : break;
2240 :
2241 0 : default:
2242 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2243 : break;
2244 : }
2245 :
2246 : /* Process any tables that are being synchronized in parallel. */
2247 118 : process_syncing_tables(commit_data.end_lsn);
2248 :
2249 118 : pgstat_report_activity(STATE_IDLE, NULL);
2250 :
2251 118 : reset_apply_error_context_info();
2252 118 : }
2253 :
2254 : /*
2255 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2256 : */
2257 : static void
2258 928 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2259 : {
2260 928 : if (is_skipping_changes())
2261 : {
2262 4 : stop_skipping_changes();
2263 :
2264 : /*
2265 : * Start a new transaction to clear the subskiplsn, if not started
2266 : * yet.
2267 : */
2268 4 : if (!IsTransactionState())
2269 2 : StartTransactionCommand();
2270 : }
2271 :
2272 928 : if (IsTransactionState())
2273 : {
2274 : /*
2275 : * The transaction is either non-empty or skipped, so we clear the
2276 : * subskiplsn.
2277 : */
2278 928 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2279 :
2280 : /*
2281 : * Update origin state so we can restart streaming from correct
2282 : * position in case of crash.
2283 : */
2284 928 : replorigin_session_origin_lsn = commit_data->end_lsn;
2285 928 : replorigin_session_origin_timestamp = commit_data->committime;
2286 :
2287 928 : CommitTransactionCommand();
2288 :
2289 928 : if (IsTransactionBlock())
2290 : {
2291 8 : EndTransactionBlock(false);
2292 8 : CommitTransactionCommand();
2293 : }
2294 :
2295 928 : pgstat_report_stat(false);
2296 :
2297 928 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2298 : }
2299 : else
2300 : {
2301 : /* Process any invalidation messages that might have accumulated. */
2302 0 : AcceptInvalidationMessages();
2303 0 : maybe_reread_subscription();
2304 : }
2305 :
2306 928 : in_remote_transaction = false;
2307 928 : }
2308 :
2309 : /*
2310 : * Handle RELATION message.
2311 : *
2312 : * Note we don't do validation against local schema here. The validation
2313 : * against local schema is postponed until first change for given relation
2314 : * comes as we only care about it when applying changes for it anyway and we
2315 : * do less locking this way.
2316 : */
2317 : static void
2318 870 : apply_handle_relation(StringInfo s)
2319 : {
2320 : LogicalRepRelation *rel;
2321 :
2322 870 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2323 72 : return;
2324 :
2325 798 : rel = logicalrep_read_rel(s);
2326 798 : logicalrep_relmap_update(rel);
2327 :
2328 : /* Also reset all entries in the partition map that refer to remoterel. */
2329 798 : logicalrep_partmap_reset_relmap(rel);
2330 : }
2331 :
2332 : /*
2333 : * Handle TYPE message.
2334 : *
2335 : * This implementation pays no attention to TYPE messages; we expect the user
2336 : * to have set things up so that the incoming data is acceptable to the input
2337 : * functions for the locally subscribed tables. Hence, we just read and
2338 : * discard the message.
2339 : */
2340 : static void
2341 36 : apply_handle_type(StringInfo s)
2342 : {
2343 : LogicalRepTyp typ;
2344 :
2345 36 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2346 0 : return;
2347 :
2348 36 : logicalrep_read_typ(s, &typ);
2349 : }
2350 :
2351 : /*
2352 : * Check that we (the subscription owner) have sufficient privileges on the
2353 : * target relation to perform the given operation.
2354 : */
2355 : static void
2356 440628 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2357 : {
2358 : Oid relid;
2359 : AclResult aclresult;
2360 :
2361 440628 : relid = RelationGetRelid(rel);
2362 440628 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2363 440628 : if (aclresult != ACLCHECK_OK)
2364 14 : aclcheck_error(aclresult,
2365 14 : get_relkind_objtype(rel->rd_rel->relkind),
2366 14 : get_rel_name(relid));
2367 :
2368 : /*
2369 : * We lack the infrastructure to honor RLS policies. It might be possible
2370 : * to add such infrastructure here, but tablesync workers lack it, too, so
2371 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2372 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2373 : * replicate subsequent INSERTs, so we forbid all commands the same.
2374 : */
2375 440614 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2376 6 : ereport(ERROR,
2377 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2378 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2379 : GetUserNameFromId(GetUserId(), true),
2380 : RelationGetRelationName(rel))));
2381 440608 : }
2382 :
2383 : /*
2384 : * Handle INSERT message.
2385 : */
2386 :
2387 : static void
2388 371736 : apply_handle_insert(StringInfo s)
2389 : {
2390 : LogicalRepRelMapEntry *rel;
2391 : LogicalRepTupleData newtup;
2392 : LogicalRepRelId relid;
2393 : UserContext ucxt;
2394 : ApplyExecutionData *edata;
2395 : EState *estate;
2396 : TupleTableSlot *remoteslot;
2397 : MemoryContext oldctx;
2398 : bool run_as_owner;
2399 :
2400 : /*
2401 : * Quick return if we are skipping data modification changes or handling
2402 : * streamed transactions.
2403 : */
2404 723470 : if (is_skipping_changes() ||
2405 351734 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2406 220132 : return;
2407 :
2408 151722 : begin_replication_step();
2409 :
2410 151718 : relid = logicalrep_read_insert(s, &newtup);
2411 151718 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2412 151704 : if (!should_apply_changes_for_rel(rel))
2413 : {
2414 : /*
2415 : * The relation can't become interesting in the middle of the
2416 : * transaction so it's safe to unlock it.
2417 : */
2418 118 : logicalrep_rel_close(rel, RowExclusiveLock);
2419 118 : end_replication_step();
2420 118 : return;
2421 : }
2422 :
2423 : /*
2424 : * Make sure that any user-supplied code runs as the table owner, unless
2425 : * the user has opted out of that behavior.
2426 : */
2427 151586 : run_as_owner = MySubscription->runasowner;
2428 151586 : if (!run_as_owner)
2429 151570 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2430 :
2431 : /* Set relation for error callback */
2432 151586 : apply_error_callback_arg.rel = rel;
2433 :
2434 : /* Initialize the executor state. */
2435 151586 : edata = create_edata_for_relation(rel);
2436 151586 : estate = edata->estate;
2437 151586 : remoteslot = ExecInitExtraTupleSlot(estate,
2438 151586 : RelationGetDescr(rel->localrel),
2439 : &TTSOpsVirtual);
2440 :
2441 : /* Process and store remote tuple in the slot */
2442 151586 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2443 151586 : slot_store_data(remoteslot, rel, &newtup);
2444 151586 : slot_fill_defaults(rel, estate, remoteslot);
2445 151586 : MemoryContextSwitchTo(oldctx);
2446 :
2447 : /* For a partitioned table, insert the tuple into a partition. */
2448 151586 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2449 90 : apply_handle_tuple_routing(edata,
2450 : remoteslot, NULL, CMD_INSERT);
2451 : else
2452 : {
2453 151496 : ResultRelInfo *relinfo = edata->targetRelInfo;
2454 :
2455 151496 : ExecOpenIndices(relinfo, false);
2456 151496 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2457 151468 : ExecCloseIndices(relinfo);
2458 : }
2459 :
2460 151556 : finish_edata(edata);
2461 :
2462 : /* Reset relation for error callback */
2463 151556 : apply_error_callback_arg.rel = NULL;
2464 :
2465 151556 : if (!run_as_owner)
2466 151546 : RestoreUserContext(&ucxt);
2467 :
2468 151556 : logicalrep_rel_close(rel, NoLock);
2469 :
2470 151556 : end_replication_step();
2471 : }
2472 :
2473 : /*
2474 : * Workhorse for apply_handle_insert()
2475 : * relinfo is for the relation we're actually inserting into
2476 : * (could be a child partition of edata->targetRelInfo)
2477 : */
2478 : static void
2479 151588 : apply_handle_insert_internal(ApplyExecutionData *edata,
2480 : ResultRelInfo *relinfo,
2481 : TupleTableSlot *remoteslot)
2482 : {
2483 151588 : EState *estate = edata->estate;
2484 :
2485 : /* Caller should have opened indexes already. */
2486 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2487 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2488 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2489 :
2490 : /* Caller will not have done this bit. */
2491 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2492 151588 : InitConflictIndexes(relinfo);
2493 :
2494 : /* Do the insert. */
2495 151588 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2496 151576 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2497 151558 : }
2498 :
2499 : /*
2500 : * Check if the logical replication relation is updatable and throw
2501 : * appropriate error if it isn't.
2502 : */
2503 : static void
2504 144566 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2505 : {
2506 : /*
2507 : * For partitioned tables, we only need to care if the target partition is
2508 : * updatable (aka has PK or RI defined for it).
2509 : */
2510 144566 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2511 60 : return;
2512 :
2513 : /* Updatable, no error. */
2514 144506 : if (rel->updatable)
2515 144506 : return;
2516 :
2517 : /*
2518 : * We are in error mode so it's fine this is somewhat slow. It's better to
2519 : * give user correct error.
2520 : */
2521 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2522 : {
2523 0 : ereport(ERROR,
2524 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2525 : errmsg("publisher did not send replica identity column "
2526 : "expected by the logical replication target relation \"%s.%s\"",
2527 : rel->remoterel.nspname, rel->remoterel.relname)));
2528 : }
2529 :
2530 0 : ereport(ERROR,
2531 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2532 : errmsg("logical replication target relation \"%s.%s\" has "
2533 : "neither REPLICA IDENTITY index nor PRIMARY "
2534 : "KEY and published relation does not have "
2535 : "REPLICA IDENTITY FULL",
2536 : rel->remoterel.nspname, rel->remoterel.relname)));
2537 : }
2538 :
2539 : /*
2540 : * Handle UPDATE message.
2541 : *
2542 : * TODO: FDW support
2543 : */
2544 : static void
2545 132318 : apply_handle_update(StringInfo s)
2546 : {
2547 : LogicalRepRelMapEntry *rel;
2548 : LogicalRepRelId relid;
2549 : UserContext ucxt;
2550 : ApplyExecutionData *edata;
2551 : EState *estate;
2552 : LogicalRepTupleData oldtup;
2553 : LogicalRepTupleData newtup;
2554 : bool has_oldtup;
2555 : TupleTableSlot *remoteslot;
2556 : RTEPermissionInfo *target_perminfo;
2557 : MemoryContext oldctx;
2558 : bool run_as_owner;
2559 :
2560 : /*
2561 : * Quick return if we are skipping data modification changes or handling
2562 : * streamed transactions.
2563 : */
2564 264630 : if (is_skipping_changes() ||
2565 132312 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2566 68446 : return;
2567 :
2568 63872 : begin_replication_step();
2569 :
2570 63870 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2571 : &newtup);
2572 63870 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2573 63870 : if (!should_apply_changes_for_rel(rel))
2574 : {
2575 : /*
2576 : * The relation can't become interesting in the middle of the
2577 : * transaction so it's safe to unlock it.
2578 : */
2579 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2580 0 : end_replication_step();
2581 0 : return;
2582 : }
2583 :
2584 : /* Set relation for error callback */
2585 63870 : apply_error_callback_arg.rel = rel;
2586 :
2587 : /* Check if we can do the update. */
2588 63870 : check_relation_updatable(rel);
2589 :
2590 : /*
2591 : * Make sure that any user-supplied code runs as the table owner, unless
2592 : * the user has opted out of that behavior.
2593 : */
2594 63870 : run_as_owner = MySubscription->runasowner;
2595 63870 : if (!run_as_owner)
2596 63864 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2597 :
2598 : /* Initialize the executor state. */
2599 63868 : edata = create_edata_for_relation(rel);
2600 63868 : estate = edata->estate;
2601 63868 : remoteslot = ExecInitExtraTupleSlot(estate,
2602 63868 : RelationGetDescr(rel->localrel),
2603 : &TTSOpsVirtual);
2604 :
2605 : /*
2606 : * Populate updatedCols so that per-column triggers can fire, and so
2607 : * executor can correctly pass down indexUnchanged hint. This could
2608 : * include more columns than were actually changed on the publisher
2609 : * because the logical replication protocol doesn't contain that
2610 : * information. But it would for example exclude columns that only exist
2611 : * on the subscriber, since we are not touching those.
2612 : */
2613 63868 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2614 318608 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2615 : {
2616 254740 : Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
2617 254740 : int remoteattnum = rel->attrmap->attnums[i];
2618 :
2619 254740 : if (!att->attisdropped && remoteattnum >= 0)
2620 : {
2621 : Assert(remoteattnum < newtup.ncols);
2622 137706 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2623 137700 : target_perminfo->updatedCols =
2624 137700 : bms_add_member(target_perminfo->updatedCols,
2625 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2626 : }
2627 : }
2628 :
2629 : /* Build the search tuple. */
2630 63868 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2631 63868 : slot_store_data(remoteslot, rel,
2632 63868 : has_oldtup ? &oldtup : &newtup);
2633 63868 : MemoryContextSwitchTo(oldctx);
2634 :
2635 : /* For a partitioned table, apply update to correct partition. */
2636 63868 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2637 26 : apply_handle_tuple_routing(edata,
2638 : remoteslot, &newtup, CMD_UPDATE);
2639 : else
2640 63842 : apply_handle_update_internal(edata, edata->targetRelInfo,
2641 : remoteslot, &newtup, rel->localindexoid);
2642 :
2643 63856 : finish_edata(edata);
2644 :
2645 : /* Reset relation for error callback */
2646 63856 : apply_error_callback_arg.rel = NULL;
2647 :
2648 63856 : if (!run_as_owner)
2649 63852 : RestoreUserContext(&ucxt);
2650 :
2651 63856 : logicalrep_rel_close(rel, NoLock);
2652 :
2653 63856 : end_replication_step();
2654 : }
2655 :
/*
 * Workhorse for apply_handle_update()
 * relinfo is for the relation we're actually updating in
 * (could be a child partition of edata->targetRelInfo)
 *
 * 'remoteslot' holds the search tuple used to locate the local row and
 * 'newtup' the remote new-row data.  'localindexoid' selects the index used
 * for the lookup; FindReplTupleInLocalRel() falls back to a sequential scan
 * when it is invalid.  If the row is found, the remote changes are merged
 * into it and it is updated; otherwise a CT_UPDATE_MISSING conflict is
 * reported at LOG level and nothing is modified.
 *
 * relinfo's indexes are opened and closed by this function itself.
 */
static void
apply_handle_update_internal(ApplyExecutionData *edata,
							 ResultRelInfo *relinfo,
							 TupleTableSlot *remoteslot,
							 LogicalRepTupleData *newtup,
							 Oid localindexoid)
{
	EState	   *estate = edata->estate;
	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
	Relation	localrel = relinfo->ri_RelationDesc;
	EPQState	epqstate;
	TupleTableSlot *localslot = NULL;

	/*
	 * Conflict details; filled in from the local tuple only when one is
	 * found, and passed as-is (zeroed) when reporting a missing tuple.
	 */
	ConflictTupleInfo conflicttuple = {0};
	bool		found;
	MemoryContext oldctx;

	/* EPQ state handed to ExecSimpleRelationUpdate() below. */
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
	ExecOpenIndices(relinfo, false);

	found = FindReplTupleInLocalRel(edata, localrel,
									&relmapentry->remoterel,
									localindexoid,
									remoteslot, &localslot);

	/*
	 * Tuple found.
	 *
	 * Note this will fail if there are other conflicting unique indexes.
	 */
	if (found)
	{
		/*
		 * Report the conflict if the tuple was modified by a different
		 * origin.
		 */
		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
									&conflicttuple.origin, &conflicttuple.ts) &&
			conflicttuple.origin != replorigin_session_origin)
		{
			TupleTableSlot *newslot;

			/* Store the new tuple for conflict reporting */
			newslot = table_slot_create(localrel, &estate->es_tupleTable);
			slot_store_data(newslot, relmapentry, newtup);

			conflicttuple.slot = localslot;

			ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
								remoteslot, newslot,
								list_make1(&conflicttuple));
		}

		/*
		 * Process and store remote tuple in the slot.  remoteslot is reused:
		 * it now becomes the new row version, built from localslot plus the
		 * changed columns in newtup.
		 */
		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
		slot_modify_data(remoteslot, localslot, relmapentry, newtup);
		MemoryContextSwitchTo(oldctx);

		EvalPlanQualSetSlot(&epqstate, remoteslot);

		InitConflictIndexes(relinfo);

		/* Do the actual update. */
		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
		ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
								 remoteslot);
	}
	else
	{
		/*
		 * No local tuple was stored in localslot, so reuse that slot to hold
		 * the remote new tuple for the conflict report.
		 */
		TupleTableSlot *newslot = localslot;

		/* Store the new tuple for conflict reporting */
		slot_store_data(newslot, relmapentry, newtup);

		/*
		 * The tuple to be updated could not be found. Do nothing except for
		 * emitting a log message.
		 */
		ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
							remoteslot, newslot, list_make1(&conflicttuple));
	}

	/* Cleanup. */
	ExecCloseIndices(relinfo);
	EvalPlanQualEnd(&epqstate);
}
2746 :
/*
 * Handle DELETE message.
 *
 * Returns immediately when changes are being skipped or the message belongs
 * to a streamed transaction (handle_streamed_transaction() deals with it).
 * Otherwise it decodes the relation id and old tuple and opens the local
 * relation with RowExclusiveLock.  The delete is then applied, either
 * directly or, for a partitioned table, via apply_handle_tuple_routing().
 * Unless the subscription's runasowner option is set, the work runs as the
 * table owner.
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData oldtup;
	LogicalRepRelId relid;
	UserContext ucxt;
	ApplyExecutionData *edata;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;
	bool		run_as_owner;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
		return;

	begin_replication_step();

	relid = logicalrep_read_delete(s, &oldtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Set relation for error callback */
	apply_error_callback_arg.rel = rel;

	/* Check if we can do the delete. */
	check_relation_updatable(rel);

	/*
	 * Make sure that any user-supplied code runs as the table owner, unless
	 * the user has opted out of that behavior.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

	/* Initialize the executor state. */
	edata = create_edata_for_relation(rel);
	estate = edata->estate;
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_data(remoteslot, rel, &oldtup);
	MemoryContextSwitchTo(oldctx);

	/* For a partitioned table, apply delete to correct partition. */
	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		apply_handle_tuple_routing(edata,
								   remoteslot, NULL, CMD_DELETE);
	else
	{
		ResultRelInfo *relinfo = edata->targetRelInfo;

		/*
		 * apply_handle_delete_internal() expects its caller to open (and
		 * later close) the indexes.
		 */
		ExecOpenIndices(relinfo, false);
		apply_handle_delete_internal(edata, relinfo,
									 remoteslot, rel->localindexoid);
		ExecCloseIndices(relinfo);
	}

	finish_edata(edata);

	/* Reset relation for error callback */
	apply_error_callback_arg.rel = NULL;

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	/* Keep the lock until end of transaction; only drop the relcache ref. */
	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
2840 :
/*
 * Workhorse for apply_handle_delete()
 * relinfo is for the relation we're actually deleting from
 * (could be a child partition of edata->targetRelInfo)
 *
 * Locates the local row matching 'remoteslot' using 'localindexoid'. When
 * that OID is invalid, FindReplTupleInLocalRel() uses a sequential scan
 * instead.  A found row is deleted.  If no row is found, a
 * CT_DELETE_MISSING conflict is reported at LOG level.
 *
 * Unlike apply_handle_update_internal(), this function does not open
 * relinfo's indexes; the caller must have done so (see the Assert below).
 */
static void
apply_handle_delete_internal(ApplyExecutionData *edata,
							 ResultRelInfo *relinfo,
							 TupleTableSlot *remoteslot,
							 Oid localindexoid)
{
	EState	   *estate = edata->estate;
	Relation	localrel = relinfo->ri_RelationDesc;
	LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
	EPQState	epqstate;
	TupleTableSlot *localslot;

	/*
	 * Conflict details; filled in from the local tuple only when one is
	 * found, and passed as-is (zeroed) when reporting a missing tuple.
	 */
	ConflictTupleInfo conflicttuple = {0};
	bool		found;

	/* EPQ state handed to ExecSimpleRelationDelete() below. */
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);

	/* Caller should have opened indexes already. */
	Assert(relinfo->ri_IndexRelationDescs != NULL ||
		   !localrel->rd_rel->relhasindex ||
		   RelationGetIndexList(localrel) == NIL);

	found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
									remoteslot, &localslot);

	/* If found delete it. */
	if (found)
	{
		/*
		 * Report the conflict if the tuple was modified by a different
		 * origin.
		 */
		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
									&conflicttuple.origin, &conflicttuple.ts) &&
			conflicttuple.origin != replorigin_session_origin)
		{
			conflicttuple.slot = localslot;
			ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
								remoteslot, NULL,
								list_make1(&conflicttuple));
		}

		EvalPlanQualSetSlot(&epqstate, localslot);

		/* Do the actual delete. */
		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
		ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
	}
	else
	{
		/*
		 * The tuple to be deleted could not be found. Do nothing except for
		 * emitting a log message.
		 */
		ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
							remoteslot, NULL, list_make1(&conflicttuple));
	}

	/* Cleanup. */
	EvalPlanQualEnd(&epqstate);
}
2906 :
2907 : /*
2908 : * Try to find a tuple received from the publication side (in 'remoteslot') in
2909 : * the corresponding local relation using either replica identity index,
2910 : * primary key, index or if needed, sequential scan.
2911 : *
2912 : * Local tuple, if found, is returned in '*localslot'.
2913 : */
2914 : static bool
2915 144504 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
2916 : LogicalRepRelation *remoterel,
2917 : Oid localidxoid,
2918 : TupleTableSlot *remoteslot,
2919 : TupleTableSlot **localslot)
2920 : {
2921 144504 : EState *estate = edata->estate;
2922 : bool found;
2923 :
2924 : /*
2925 : * Regardless of the top-level operation, we're performing a read here, so
2926 : * check for SELECT privileges.
2927 : */
2928 144504 : TargetPrivilegesCheck(localrel, ACL_SELECT);
2929 :
2930 144496 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2931 :
2932 : Assert(OidIsValid(localidxoid) ||
2933 : (remoterel->replident == REPLICA_IDENTITY_FULL));
2934 :
2935 144496 : if (OidIsValid(localidxoid))
2936 : {
2937 : #ifdef USE_ASSERT_CHECKING
2938 : Relation idxrel = index_open(localidxoid, AccessShareLock);
2939 :
2940 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2941 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
2942 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
2943 : IsIndexUsableForReplicaIdentityFull(idxrel,
2944 : edata->targetRel->attrmap)));
2945 : index_close(idxrel, AccessShareLock);
2946 : #endif
2947 :
2948 144198 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
2949 : LockTupleExclusive,
2950 : remoteslot, *localslot);
2951 : }
2952 : else
2953 298 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2954 : remoteslot, *localslot);
2955 :
2956 144496 : return found;
2957 : }
2958 :
2959 : /*
2960 : * This handles insert, update, delete on a partitioned table.
2961 : */
2962 : static void
2963 150 : apply_handle_tuple_routing(ApplyExecutionData *edata,
2964 : TupleTableSlot *remoteslot,
2965 : LogicalRepTupleData *newtup,
2966 : CmdType operation)
2967 : {
2968 150 : EState *estate = edata->estate;
2969 150 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2970 150 : ResultRelInfo *relinfo = edata->targetRelInfo;
2971 150 : Relation parentrel = relinfo->ri_RelationDesc;
2972 : ModifyTableState *mtstate;
2973 : PartitionTupleRouting *proute;
2974 : ResultRelInfo *partrelinfo;
2975 : Relation partrel;
2976 : TupleTableSlot *remoteslot_part;
2977 : TupleConversionMap *map;
2978 : MemoryContext oldctx;
2979 150 : LogicalRepRelMapEntry *part_entry = NULL;
2980 150 : AttrMap *attrmap = NULL;
2981 :
2982 : /* ModifyTableState is needed for ExecFindPartition(). */
2983 150 : edata->mtstate = mtstate = makeNode(ModifyTableState);
2984 150 : mtstate->ps.plan = NULL;
2985 150 : mtstate->ps.state = estate;
2986 150 : mtstate->operation = operation;
2987 150 : mtstate->resultRelInfo = relinfo;
2988 :
2989 : /* ... as is PartitionTupleRouting. */
2990 150 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2991 :
2992 : /*
2993 : * Find the partition to which the "search tuple" belongs.
2994 : */
2995 : Assert(remoteslot != NULL);
2996 150 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2997 150 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2998 : remoteslot, estate);
2999 : Assert(partrelinfo != NULL);
3000 150 : partrel = partrelinfo->ri_RelationDesc;
3001 :
3002 : /*
3003 : * Check for supported relkind. We need this since partitions might be of
3004 : * unsupported relkinds; and the set of partitions can change, so checking
3005 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3006 : */
3007 150 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3008 150 : get_namespace_name(RelationGetNamespace(partrel)),
3009 150 : RelationGetRelationName(partrel));
3010 :
3011 : /*
3012 : * To perform any of the operations below, the tuple must match the
3013 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3014 : * slot to store the tuple in any case.
3015 : */
3016 150 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3017 150 : if (remoteslot_part == NULL)
3018 84 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3019 150 : map = ExecGetRootToChildMap(partrelinfo, estate);
3020 150 : if (map != NULL)
3021 : {
3022 66 : attrmap = map->attrMap;
3023 66 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3024 : remoteslot_part);
3025 : }
3026 : else
3027 : {
3028 84 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3029 84 : slot_getallattrs(remoteslot_part);
3030 : }
3031 150 : MemoryContextSwitchTo(oldctx);
3032 :
3033 : /* Check if we can do the update or delete on the leaf partition. */
3034 150 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3035 : {
3036 60 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3037 : attrmap);
3038 60 : check_relation_updatable(part_entry);
3039 : }
3040 :
3041 150 : switch (operation)
3042 : {
3043 90 : case CMD_INSERT:
3044 90 : apply_handle_insert_internal(edata, partrelinfo,
3045 : remoteslot_part);
3046 88 : break;
3047 :
3048 34 : case CMD_DELETE:
3049 34 : apply_handle_delete_internal(edata, partrelinfo,
3050 : remoteslot_part,
3051 : part_entry->localindexoid);
3052 34 : break;
3053 :
3054 26 : case CMD_UPDATE:
3055 :
3056 : /*
3057 : * For UPDATE, depending on whether or not the updated tuple
3058 : * satisfies the partition's constraint, perform a simple UPDATE
3059 : * of the partition or move the updated tuple into a different
3060 : * suitable partition.
3061 : */
3062 : {
3063 : TupleTableSlot *localslot;
3064 : ResultRelInfo *partrelinfo_new;
3065 : Relation partrel_new;
3066 : bool found;
3067 : EPQState epqstate;
3068 26 : ConflictTupleInfo conflicttuple = {0};
3069 :
3070 : /* Get the matching local tuple from the partition. */
3071 26 : found = FindReplTupleInLocalRel(edata, partrel,
3072 : &part_entry->remoterel,
3073 : part_entry->localindexoid,
3074 : remoteslot_part, &localslot);
3075 26 : if (!found)
3076 : {
3077 4 : TupleTableSlot *newslot = localslot;
3078 :
3079 : /* Store the new tuple for conflict reporting */
3080 4 : slot_store_data(newslot, part_entry, newtup);
3081 :
3082 : /*
3083 : * The tuple to be updated could not be found. Do nothing
3084 : * except for emitting a log message.
3085 : */
3086 4 : ReportApplyConflict(estate, partrelinfo, LOG,
3087 : CT_UPDATE_MISSING, remoteslot_part,
3088 4 : newslot, list_make1(&conflicttuple));
3089 :
3090 4 : return;
3091 : }
3092 :
3093 : /*
3094 : * Report the conflict if the tuple was modified by a
3095 : * different origin.
3096 : */
3097 22 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3098 : &conflicttuple.origin,
3099 2 : &conflicttuple.ts) &&
3100 2 : conflicttuple.origin != replorigin_session_origin)
3101 : {
3102 : TupleTableSlot *newslot;
3103 :
3104 : /* Store the new tuple for conflict reporting */
3105 2 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3106 2 : slot_store_data(newslot, part_entry, newtup);
3107 :
3108 2 : conflicttuple.slot = localslot;
3109 :
3110 2 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3111 : remoteslot_part, newslot,
3112 2 : list_make1(&conflicttuple));
3113 : }
3114 :
3115 : /*
3116 : * Apply the update to the local tuple, putting the result in
3117 : * remoteslot_part.
3118 : */
3119 22 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3120 22 : slot_modify_data(remoteslot_part, localslot, part_entry,
3121 : newtup);
3122 22 : MemoryContextSwitchTo(oldctx);
3123 :
3124 22 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3125 :
3126 : /*
3127 : * Does the updated tuple still satisfy the current
3128 : * partition's constraint?
3129 : */
3130 44 : if (!partrel->rd_rel->relispartition ||
3131 22 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3132 : false))
3133 : {
3134 : /*
3135 : * Yes, so simply UPDATE the partition. We don't call
3136 : * apply_handle_update_internal() here, which would
3137 : * normally do the following work, to avoid repeating some
3138 : * work already done above to find the local tuple in the
3139 : * partition.
3140 : */
3141 20 : InitConflictIndexes(partrelinfo);
3142 :
3143 20 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3144 20 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3145 : ACL_UPDATE);
3146 20 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3147 : localslot, remoteslot_part);
3148 : }
3149 : else
3150 : {
3151 : /* Move the tuple into the new partition. */
3152 :
3153 : /*
3154 : * New partition will be found using tuple routing, which
3155 : * can only occur via the parent table. We might need to
3156 : * convert the tuple to the parent's rowtype. Note that
3157 : * this is the tuple found in the partition, not the
3158 : * original search tuple received by this function.
3159 : */
3160 2 : if (map)
3161 : {
3162 : TupleConversionMap *PartitionToRootMap =
3163 2 : convert_tuples_by_name(RelationGetDescr(partrel),
3164 : RelationGetDescr(parentrel));
3165 :
3166 : remoteslot =
3167 2 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3168 : remoteslot_part, remoteslot);
3169 : }
3170 : else
3171 : {
3172 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3173 0 : slot_getallattrs(remoteslot);
3174 : }
3175 :
3176 : /* Find the new partition. */
3177 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3178 2 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3179 : proute, remoteslot,
3180 : estate);
3181 2 : MemoryContextSwitchTo(oldctx);
3182 : Assert(partrelinfo_new != partrelinfo);
3183 2 : partrel_new = partrelinfo_new->ri_RelationDesc;
3184 :
3185 : /* Check that new partition also has supported relkind. */
3186 2 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3187 2 : get_namespace_name(RelationGetNamespace(partrel_new)),
3188 2 : RelationGetRelationName(partrel_new));
3189 :
3190 : /* DELETE old tuple found in the old partition. */
3191 2 : EvalPlanQualSetSlot(&epqstate, localslot);
3192 2 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3193 2 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3194 :
3195 : /* INSERT new tuple into the new partition. */
3196 :
3197 : /*
3198 : * Convert the replacement tuple to match the destination
3199 : * partition rowtype.
3200 : */
3201 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3202 2 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3203 2 : if (remoteslot_part == NULL)
3204 2 : remoteslot_part = table_slot_create(partrel_new,
3205 : &estate->es_tupleTable);
3206 2 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3207 2 : if (map != NULL)
3208 : {
3209 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3210 : remoteslot,
3211 : remoteslot_part);
3212 : }
3213 : else
3214 : {
3215 2 : remoteslot_part = ExecCopySlot(remoteslot_part,
3216 : remoteslot);
3217 2 : slot_getallattrs(remoteslot);
3218 : }
3219 2 : MemoryContextSwitchTo(oldctx);
3220 2 : apply_handle_insert_internal(edata, partrelinfo_new,
3221 : remoteslot_part);
3222 : }
3223 :
3224 22 : EvalPlanQualEnd(&epqstate);
3225 : }
3226 22 : break;
3227 :
3228 0 : default:
3229 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3230 : break;
3231 : }
3232 : }
3233 :
/*
 * Handle TRUNCATE message.
 *
 * Returns immediately when changes are being skipped or the message belongs
 * to a streamed transaction.  Otherwise, every remote relation listed in the
 * message is opened locally with AccessExclusiveLock, together with all
 * partitions of any partitioned table among them.  Relations for which
 * should_apply_changes_for_rel() is false are skipped.  TRUNCATE privilege
 * is checked on each relation, and they are truncated in one
 * ExecuteTruncateGuts() call.  The upstream CASCADE option is read but not
 * honored (DROP_RESTRICT is always used); RESTART IDENTITY is passed
 * through.
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
	bool		cascade = false;
	bool		restart_seqs = false;
	List	   *remote_relids = NIL;
	List	   *remote_rels = NIL;	/* LogicalRepRelMapEntry * to close */
	List	   *rels = NIL;			/* every Relation to truncate */
	List	   *part_rels = NIL;	/* partitions we opened ourselves */
	List	   *relids = NIL;
	List	   *relids_logged = NIL;
	ListCell   *lc;
	LOCKMODE	lockmode = AccessExclusiveLock;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
		return;

	begin_replication_step();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, lockmode);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, lockmode);
			continue;
		}

		remote_rels = lappend(remote_rels, rel);
		TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);

		/*
		 * Truncate partitions if we got a message to truncate a partitioned
		 * table.
		 */
		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		{
			ListCell   *child;
			List	   *children = find_all_inheritors(rel->localreloid,
													   lockmode,
													   NULL);

			foreach(child, children)
			{
				Oid			childrelid = lfirst_oid(child);
				Relation	childrel;

				/*
				 * Skip relations already collected (this includes the
				 * partitioned table itself).
				 */
				if (list_member_oid(relids, childrelid))
					continue;

				/* find_all_inheritors already got lock */
				childrel = table_open(childrelid, NoLock);

				/*
				 * Ignore temp tables of other backends. See similar code in
				 * ExecuteTruncate().
				 */
				if (RELATION_IS_OTHER_TEMP(childrel))
				{
					table_close(childrel, lockmode);
					continue;
				}

				TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
				rels = lappend(rels, childrel);
				part_rels = lappend(part_rels, childrel);
				relids = lappend_oid(relids, childrelid);
				/* Log this relation only if needed for logical decoding */
				if (RelationIsLogicallyLogged(childrel))
					relids_logged = lappend_oid(relids_logged, childrelid);
			}
		}
	}

	/*
	 * Even if we used CASCADE on the upstream primary we explicitly default
	 * to replaying changes without further cascading. This might be later
	 * changeable with a user specified option.
	 *
	 * MySubscription->runasowner tells us whether we want to execute
	 * replication actions as the subscription owner; the last argument to
	 * ExecuteTruncateGuts() tells it whether we want to switch to the table
	 * owner.  Those are exactly opposite conditions.
	 */
	ExecuteTruncateGuts(rels,
						relids,
						relids_logged,
						DROP_RESTRICT,
						restart_seqs,
						!MySubscription->runasowner);

	/* Release relcache references; locks are kept until end of xact. */
	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		logicalrep_rel_close(rel, NoLock);
	}
	foreach(lc, part_rels)
	{
		Relation	rel = lfirst(lc);

		table_close(rel, NoLock);
	}

	end_replication_step();
}
3362 :
3363 :
3364 : /*
3365 : * Logical replication protocol message dispatcher.
3366 : */
3367 : void
3368 674314 : apply_dispatch(StringInfo s)
3369 : {
3370 674314 : LogicalRepMsgType action = pq_getmsgbyte(s);
3371 : LogicalRepMsgType saved_command;
3372 :
3373 : /*
3374 : * Set the current command being applied. Since this function can be
3375 : * called recursively when applying spooled changes, save the current
3376 : * command.
3377 : */
3378 674314 : saved_command = apply_error_callback_arg.command;
3379 674314 : apply_error_callback_arg.command = action;
3380 :
3381 674314 : switch (action)
3382 : {
3383 908 : case LOGICAL_REP_MSG_BEGIN:
3384 908 : apply_handle_begin(s);
3385 908 : break;
3386 :
3387 848 : case LOGICAL_REP_MSG_COMMIT:
3388 848 : apply_handle_commit(s);
3389 848 : break;
3390 :
3391 371736 : case LOGICAL_REP_MSG_INSERT:
3392 371736 : apply_handle_insert(s);
3393 371688 : break;
3394 :
3395 132318 : case LOGICAL_REP_MSG_UPDATE:
3396 132318 : apply_handle_update(s);
3397 132302 : break;
3398 :
3399 163866 : case LOGICAL_REP_MSG_DELETE:
3400 163866 : apply_handle_delete(s);
3401 163866 : break;
3402 :
3403 38 : case LOGICAL_REP_MSG_TRUNCATE:
3404 38 : apply_handle_truncate(s);
3405 38 : break;
3406 :
3407 870 : case LOGICAL_REP_MSG_RELATION:
3408 870 : apply_handle_relation(s);
3409 870 : break;
3410 :
3411 36 : case LOGICAL_REP_MSG_TYPE:
3412 36 : apply_handle_type(s);
3413 36 : break;
3414 :
3415 16 : case LOGICAL_REP_MSG_ORIGIN:
3416 16 : apply_handle_origin(s);
3417 16 : break;
3418 :
3419 0 : case LOGICAL_REP_MSG_MESSAGE:
3420 :
3421 : /*
3422 : * Logical replication does not use generic logical messages yet.
3423 : * Although, it could be used by other applications that use this
3424 : * output plugin.
3425 : */
3426 0 : break;
3427 :
3428 1674 : case LOGICAL_REP_MSG_STREAM_START:
3429 1674 : apply_handle_stream_start(s);
3430 1674 : break;
3431 :
3432 1672 : case LOGICAL_REP_MSG_STREAM_STOP:
3433 1672 : apply_handle_stream_stop(s);
3434 1668 : break;
3435 :
3436 76 : case LOGICAL_REP_MSG_STREAM_ABORT:
3437 76 : apply_handle_stream_abort(s);
3438 76 : break;
3439 :
3440 122 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3441 122 : apply_handle_stream_commit(s);
3442 118 : break;
3443 :
3444 32 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3445 32 : apply_handle_begin_prepare(s);
3446 32 : break;
3447 :
3448 30 : case LOGICAL_REP_MSG_PREPARE:
3449 30 : apply_handle_prepare(s);
3450 28 : break;
3451 :
3452 40 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3453 40 : apply_handle_commit_prepared(s);
3454 40 : break;
3455 :
3456 10 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3457 10 : apply_handle_rollback_prepared(s);
3458 10 : break;
3459 :
3460 22 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3461 22 : apply_handle_stream_prepare(s);
3462 22 : break;
3463 :
3464 0 : default:
3465 0 : ereport(ERROR,
3466 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3467 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3468 : }
3469 :
3470 : /* Reset the current command */
3471 674240 : apply_error_callback_arg.command = saved_command;
3472 674240 : }
3473 :
3474 : /*
3475 : * Figure out which write/flush positions to report to the walsender process.
3476 : *
3477 : * We can't simply report back the last LSN the walsender sent us because the
3478 : * local transaction might not yet be flushed to disk locally. Instead we
3479 : * build a list that associates local with remote LSNs for every commit. When
3480 : * reporting back the flush position to the sender we iterate that list and
3481 : * check which entries on it are already locally flushed. Those we can report
3482 : * as having been flushed.
3483 : *
3484 : * The have_pending_txes is true if there are outstanding transactions that
3485 : * need to be flushed.
3486 : */
3487 : static void
3488 130688 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3489 : bool *have_pending_txes)
3490 : {
3491 : dlist_mutable_iter iter;
3492 130688 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3493 :
3494 130688 : *write = InvalidXLogRecPtr;
3495 130688 : *flush = InvalidXLogRecPtr;
3496 :
3497 131640 : dlist_foreach_modify(iter, &lsn_mapping)
3498 : {
3499 24944 : FlushPosition *pos =
3500 24944 : dlist_container(FlushPosition, node, iter.cur);
3501 :
3502 24944 : *write = pos->remote_end;
3503 :
3504 24944 : if (pos->local_end <= local_flush)
3505 : {
3506 952 : *flush = pos->remote_end;
3507 952 : dlist_delete(iter.cur);
3508 952 : pfree(pos);
3509 : }
3510 : else
3511 : {
3512 : /*
3513 : * Don't want to uselessly iterate over the rest of the list which
3514 : * could potentially be long. Instead get the last element and
3515 : * grab the write position from there.
3516 : */
3517 23992 : pos = dlist_tail_element(FlushPosition, node,
3518 : &lsn_mapping);
3519 23992 : *write = pos->remote_end;
3520 23992 : *have_pending_txes = true;
3521 23992 : return;
3522 : }
3523 : }
3524 :
3525 106696 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3526 : }
3527 :
3528 : /*
3529 : * Store current remote/local lsn pair in the tracking list.
3530 : */
3531 : void
3532 1060 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3533 : {
3534 : FlushPosition *flushpos;
3535 :
3536 : /*
3537 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3538 : * by the leader apply worker.
3539 : */
3540 1060 : if (am_parallel_apply_worker())
3541 38 : return;
3542 :
3543 : /* Need to do this in permanent context */
3544 1022 : MemoryContextSwitchTo(ApplyContext);
3545 :
3546 : /* Track commit lsn */
3547 1022 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3548 1022 : flushpos->local_end = local_lsn;
3549 1022 : flushpos->remote_end = remote_lsn;
3550 :
3551 1022 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3552 1022 : MemoryContextSwitchTo(ApplyMessageContext);
3553 : }
3554 :
3555 :
3556 : /* Update statistics of the worker. */
3557 : static void
3558 374344 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3559 : {
3560 374344 : MyLogicalRepWorker->last_lsn = last_lsn;
3561 374344 : MyLogicalRepWorker->last_send_time = send_time;
3562 374344 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3563 374344 : if (reply)
3564 : {
3565 4774 : MyLogicalRepWorker->reply_lsn = last_lsn;
3566 4774 : MyLogicalRepWorker->reply_time = send_time;
3567 : }
3568 374344 : }
3569 :
/*
 * Apply main loop.
 *
 * Receives messages from the publisher over LogRepWorkerWalRcvConn and
 * handles them until the publisher ends the data stream:
 *
 * - 'w' (WAL data) messages advance last_received and are handed to
 *   apply_dispatch();
 * - 'k' (keepalive) messages advance last_received and are answered with
 *   send_feedback(), forced when the publisher requested a reply;
 * - any other message type is ignored.
 *
 * 'last_received' is the initial value of the latest received LSN that is
 * reported back to the publisher; this function only ever moves it forward.
 *
 * Each pass of the outer loop drains all data available without blocking,
 * confirms progress to the publisher, does housekeeping while no remote
 * transaction is in progress, and then sleeps until data arrives, the latch
 * is set, or wait_time elapses.  On timeouts it enforces
 * wal_receiver_timeout (ERROR) and requests a reply from the publisher once
 * half of that timeout has passed without hearing from it.
 *
 * On the normal exit path (end of stream) the apply error context callback
 * is popped and walrcv_endstreaming() is called.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;
	TimeLineID	tli;
	ErrorContextCallback errcallback;

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/*
	 * This memory context is used for per-stream data when the streaming mode
	 * is enabled. This context is reset on each stream stop.
	 */
	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
													"LogicalStreamingContext",
													ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/*
	 * Push apply error context callback. Fields will be filled while applying
	 * a change.
	 */
	errcallback.callback = apply_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;
	apply_error_context_stack = error_context_stack;

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		/*
		 * As handled below: len == 0 means nothing to read right now, len < 0
		 * means the publisher ended the stream, and otherwise buf holds a
		 * message of len bytes.  fd is the socket to wait on later.
		 */
		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					break;
				}
				else if (len < 0)
				{
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					if (ConfigReloadPending)
					{
						ConfigReloadPending = false;
						ProcessConfigFile(PGC_SIGHUP);
					}

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					/* Wrap buf without copying it; the first byte is the type. */
					initReadOnlyStringInfo(&s, buf, len);

					c = pq_getmsgbyte(&s);

					if (c == 'w')
					{
						/*
						 * WAL data: start LSN, end LSN and send time, followed
						 * by the payload that apply_dispatch() consumes.
						 */
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						/* last_received never moves backwards. */
						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);
					}
					else if (c == 'k')
					{
						/* Keepalive: end LSN, timestamp, reply-requested flag. */
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						/* Force a status update if the publisher asked for one. */
						send_feedback(last_received, reply_requested, false);
						UpdateWorkerStats(last_received, timestamp, true);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		if (!in_remote_transaction && !in_streamed_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextReset(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
			break;

		/*
		 * Wait for more data or latch. If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message). Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the primary anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from primary has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errcode(ERRCODE_CONNECTION_FAILURE),
							 errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						/* ping_sent is cleared again when any data arrives. */
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);

			/*
			 * Force reporting to ensure long idle periods don't lead to
			 * arbitrarily delayed stats. Stats can only be reported outside
			 * of (implicit or explicit) transactions. That shouldn't lead to
			 * stats being delayed for long, because transactions are either
			 * sent as a whole on commit or streamed. Streamed transactions
			 * are spilled to disk and applied on commit.
			 */
			if (!IsTransactionState())
				pgstat_report_stat(true);
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;
	apply_error_context_stack = error_context_stack;

	/* All done */
	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
3830 :
3831 : /*
3832 : * Send a Standby Status Update message to server.
3833 : *
3834 : * 'recvpos' is the latest LSN we've received data to, force is set if we need
3835 : * to send a response to avoid timeouts.
3836 : */
3837 : static void
3838 130688 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3839 : {
3840 : static StringInfo reply_message = NULL;
3841 : static TimestampTz send_time = 0;
3842 :
3843 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3844 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3845 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3846 :
3847 : XLogRecPtr writepos;
3848 : XLogRecPtr flushpos;
3849 : TimestampTz now;
3850 : bool have_pending_txes;
3851 :
3852 : /*
3853 : * If the user doesn't want status to be reported to the publisher, be
3854 : * sure to exit before doing anything at all.
3855 : */
3856 130688 : if (!force && wal_receiver_status_interval <= 0)
3857 43560 : return;
3858 :
3859 : /* It's legal to not pass a recvpos */
3860 130688 : if (recvpos < last_recvpos)
3861 0 : recvpos = last_recvpos;
3862 :
3863 130688 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
3864 :
3865 : /*
3866 : * No outstanding transactions to flush, we can report the latest received
3867 : * position. This is important for synchronous replication.
3868 : */
3869 130688 : if (!have_pending_txes)
3870 106696 : flushpos = writepos = recvpos;
3871 :
3872 130688 : if (writepos < last_writepos)
3873 0 : writepos = last_writepos;
3874 :
3875 130688 : if (flushpos < last_flushpos)
3876 23910 : flushpos = last_flushpos;
3877 :
3878 130688 : now = GetCurrentTimestamp();
3879 :
3880 : /* if we've already reported everything we're good */
3881 130688 : if (!force &&
3882 129016 : writepos == last_writepos &&
3883 44062 : flushpos == last_flushpos &&
3884 43804 : !TimestampDifferenceExceeds(send_time, now,
3885 : wal_receiver_status_interval * 1000))
3886 43560 : return;
3887 87128 : send_time = now;
3888 :
3889 87128 : if (!reply_message)
3890 : {
3891 746 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
3892 :
3893 746 : reply_message = makeStringInfo();
3894 746 : MemoryContextSwitchTo(oldctx);
3895 : }
3896 : else
3897 86382 : resetStringInfo(reply_message);
3898 :
3899 87128 : pq_sendbyte(reply_message, 'r');
3900 87128 : pq_sendint64(reply_message, recvpos); /* write */
3901 87128 : pq_sendint64(reply_message, flushpos); /* flush */
3902 87128 : pq_sendint64(reply_message, writepos); /* apply */
3903 87128 : pq_sendint64(reply_message, now); /* sendTime */
3904 87128 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
3905 :
3906 87128 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3907 : force,
3908 : LSN_FORMAT_ARGS(recvpos),
3909 : LSN_FORMAT_ARGS(writepos),
3910 : LSN_FORMAT_ARGS(flushpos));
3911 :
3912 87128 : walrcv_send(LogRepWorkerWalRcvConn,
3913 : reply_message->data, reply_message->len);
3914 :
3915 87128 : if (recvpos > last_recvpos)
3916 84962 : last_recvpos = recvpos;
3917 87128 : if (writepos > last_writepos)
3918 84960 : last_writepos = writepos;
3919 87128 : if (flushpos > last_flushpos)
3920 84444 : last_flushpos = flushpos;
3921 : }
3922 :
3923 : /*
3924 : * Exit routine for apply workers due to subscription parameter changes.
3925 : */
3926 : static void
3927 74 : apply_worker_exit(void)
3928 : {
3929 74 : if (am_parallel_apply_worker())
3930 : {
3931 : /*
3932 : * Don't stop the parallel apply worker as the leader will detect the
3933 : * subscription parameter change and restart logical replication later
3934 : * anyway. This also prevents the leader from reporting errors when
3935 : * trying to communicate with a stopped parallel apply worker, which
3936 : * would accidentally disable subscriptions if disable_on_error was
3937 : * set.
3938 : */
3939 0 : return;
3940 : }
3941 :
3942 : /*
3943 : * Reset the last-start time for this apply worker so that the launcher
3944 : * will restart it without waiting for wal_retrieve_retry_interval if the
3945 : * subscription is still active, and so that we won't leak that hash table
3946 : * entry if it isn't.
3947 : */
3948 74 : if (am_leader_apply_worker())
3949 74 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
3950 :
3951 74 : proc_exit(0);
3952 : }
3953 :
/*
 * Reread subscription info if needed.
 *
 * Does nothing while MySubscriptionValid is set; subscription_change_cb()
 * clears it on subscription syscache invalidation.  Otherwise the
 * subscription is fetched again (inside a transaction started here if none
 * is active) and compared with the cached MySubscription:
 *
 * - subscription removed: log and proc_exit(0);
 * - subscription disabled, a connection-affecting parameter changed, or the
 *   owner lost superuser privileges: log and call apply_worker_exit(), which
 *   returns without exiting for a parallel apply worker;
 * - database OID changed: elog(ERROR), since that should never happen.
 *
 * If execution gets past those checks, several things happen.  The old
 * MySubscription is freed and replaced by the new copy, which is allocated
 * in ApplyContext.  synchronous_commit is re-applied from it, and the cache
 * is marked valid again.
 *
 * For significant changes, we react by exiting the current process; a new
 * one will be launched afterwards if needed.
 */
void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/* The 'true' argument presumably means "missing OK" (NULL is handled). */
	newsub = GetSubscription(MyLogicalRepWorker->subid, true);

	/*
	 * Exit if the subscription was removed. This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
						MySubscription->name)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	/* Exit if the subscription was disabled. */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
						MySubscription->name)));

		apply_worker_exit();
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/* two-phase cannot be altered while the worker is running */
	Assert(newsub->twophasestate == MySubscription->twophasestate);

	/*
	 * Exit if any parameter that affects the remote connection was changed.
	 * The launcher will start a new worker but note that the parallel apply
	 * worker won't restart if the streaming option's value is changed from
	 * 'parallel' to any other value or the server decides not to stream the
	 * in-progress transaction.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
		strcmp(newsub->name, MySubscription->name) != 0 ||
		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
		newsub->binary != MySubscription->binary ||
		newsub->stream != MySubscription->stream ||
		newsub->passwordrequired != MySubscription->passwordrequired ||
		strcmp(newsub->origin, MySubscription->origin) != 0 ||
		newsub->owner != MySubscription->owner ||
		!equal(newsub->publications, MySubscription->publications))
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
							MySubscription->name)));
		else
			ereport(LOG,
					(errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
							MySubscription->name)));

		apply_worker_exit();
	}

	/*
	 * Exit if the subscription owner's superuser privileges have been
	 * revoked.
	 */
	if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));
		else
			ereport(LOG,
					errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));

		apply_worker_exit();
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Only commit a transaction that this function itself started. */
	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}
4085 :
4086 : /*
4087 : * Callback from subscription syscache invalidation.
4088 : */
4089 : static void
4090 160 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4091 : {
4092 160 : MySubscriptionValid = false;
4093 160 : }
4094 :
4095 : /*
4096 : * subxact_info_write
4097 : * Store information about subxacts for a toplevel transaction.
4098 : *
4099 : * For each subxact we store offset of its first change in the main file.
4100 : * The file is always over-written as a whole.
4101 : *
4102 : * XXX We should only store subxacts that were not aborted yet.
4103 : */
4104 : static void
4105 744 : subxact_info_write(Oid subid, TransactionId xid)
4106 : {
4107 : char path[MAXPGPATH];
4108 : Size len;
4109 : BufFile *fd;
4110 :
4111 : Assert(TransactionIdIsValid(xid));
4112 :
4113 : /* construct the subxact filename */
4114 744 : subxact_filename(path, subid, xid);
4115 :
4116 : /* Delete the subxacts file, if exists. */
4117 744 : if (subxact_data.nsubxacts == 0)
4118 : {
4119 580 : cleanup_subxact_info();
4120 580 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4121 :
4122 580 : return;
4123 : }
4124 :
4125 : /*
4126 : * Create the subxact file if it not already created, otherwise open the
4127 : * existing file.
4128 : */
4129 164 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
4130 : true);
4131 164 : if (fd == NULL)
4132 16 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
4133 :
4134 164 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4135 :
4136 : /* Write the subxact count and subxact info */
4137 164 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4138 164 : BufFileWrite(fd, subxact_data.subxacts, len);
4139 :
4140 164 : BufFileClose(fd);
4141 :
4142 : /* free the memory allocated for subxact info */
4143 164 : cleanup_subxact_info();
4144 : }
4145 :
4146 : /*
4147 : * subxact_info_read
4148 : * Restore information about subxacts of a streamed transaction.
4149 : *
4150 : * Read information about subxacts into the structure subxact_data that can be
4151 : * used later.
4152 : */
4153 : static void
4154 688 : subxact_info_read(Oid subid, TransactionId xid)
4155 : {
4156 : char path[MAXPGPATH];
4157 : Size len;
4158 : BufFile *fd;
4159 : MemoryContext oldctx;
4160 :
4161 : Assert(!subxact_data.subxacts);
4162 : Assert(subxact_data.nsubxacts == 0);
4163 : Assert(subxact_data.nsubxacts_max == 0);
4164 :
4165 : /*
4166 : * If the subxact file doesn't exist that means we don't have any subxact
4167 : * info.
4168 : */
4169 688 : subxact_filename(path, subid, xid);
4170 688 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
4171 : true);
4172 688 : if (fd == NULL)
4173 530 : return;
4174 :
4175 : /* read number of subxact items */
4176 158 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4177 :
4178 158 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4179 :
4180 : /* we keep the maximum as a power of 2 */
4181 158 : subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
4182 :
4183 : /*
4184 : * Allocate subxact information in the logical streaming context. We need
4185 : * this information during the complete stream so that we can add the sub
4186 : * transaction info to this. On stream stop we will flush this information
4187 : * to the subxact file and reset the logical streaming context.
4188 : */
4189 158 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4190 158 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
4191 : sizeof(SubXactInfo));
4192 158 : MemoryContextSwitchTo(oldctx);
4193 :
4194 158 : if (len > 0)
4195 158 : BufFileReadExact(fd, subxact_data.subxacts, len);
4196 :
4197 158 : BufFileClose(fd);
4198 : }
4199 :
4200 : /*
4201 : * subxact_info_add
4202 : * Add information about a subxact (offset in the main file).
4203 : */
4204 : static void
4205 205026 : subxact_info_add(TransactionId xid)
4206 : {
4207 205026 : SubXactInfo *subxacts = subxact_data.subxacts;
4208 : int64 i;
4209 :
4210 : /* We must have a valid top level stream xid and a stream fd. */
4211 : Assert(TransactionIdIsValid(stream_xid));
4212 : Assert(stream_fd != NULL);
4213 :
4214 : /*
4215 : * If the XID matches the toplevel transaction, we don't want to add it.
4216 : */
4217 205026 : if (stream_xid == xid)
4218 184778 : return;
4219 :
4220 : /*
4221 : * In most cases we're checking the same subxact as we've already seen in
4222 : * the last call, so make sure to ignore it (this change comes later).
4223 : */
4224 20248 : if (subxact_data.subxact_last == xid)
4225 20096 : return;
4226 :
4227 : /* OK, remember we're processing this XID. */
4228 152 : subxact_data.subxact_last = xid;
4229 :
4230 : /*
4231 : * Check if the transaction is already present in the array of subxact. We
4232 : * intentionally scan the array from the tail, because we're likely adding
4233 : * a change for the most recent subtransactions.
4234 : *
4235 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4236 : * would allow us to use binary search here.
4237 : */
4238 190 : for (i = subxact_data.nsubxacts; i > 0; i--)
4239 : {
4240 : /* found, so we're done */
4241 152 : if (subxacts[i - 1].xid == xid)
4242 114 : return;
4243 : }
4244 :
4245 : /* This is a new subxact, so we need to add it to the array. */
4246 38 : if (subxact_data.nsubxacts == 0)
4247 : {
4248 : MemoryContext oldctx;
4249 :
4250 16 : subxact_data.nsubxacts_max = 128;
4251 :
4252 : /*
4253 : * Allocate this memory for subxacts in per-stream context, see
4254 : * subxact_info_read.
4255 : */
4256 16 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4257 16 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4258 16 : MemoryContextSwitchTo(oldctx);
4259 : }
4260 22 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
4261 : {
4262 20 : subxact_data.nsubxacts_max *= 2;
4263 20 : subxacts = repalloc(subxacts,
4264 20 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4265 : }
4266 :
4267 38 : subxacts[subxact_data.nsubxacts].xid = xid;
4268 :
4269 : /*
4270 : * Get the current offset of the stream file and store it as offset of
4271 : * this subxact.
4272 : */
4273 38 : BufFileTell(stream_fd,
4274 38 : &subxacts[subxact_data.nsubxacts].fileno,
4275 38 : &subxacts[subxact_data.nsubxacts].offset);
4276 :
4277 38 : subxact_data.nsubxacts++;
4278 38 : subxact_data.subxacts = subxacts;
4279 : }
4280 :
4281 : /* format filename for file containing the info about subxacts */
4282 : static inline void
4283 1494 : subxact_filename(char *path, Oid subid, TransactionId xid)
4284 : {
4285 1494 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4286 1494 : }
4287 :
4288 : /* format filename for file containing serialized changes */
4289 : static inline void
4290 876 : changes_filename(char *path, Oid subid, TransactionId xid)
4291 : {
4292 876 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4293 876 : }
4294 :
4295 : /*
4296 : * stream_cleanup_files
4297 : * Cleanup files for a subscription / toplevel transaction.
4298 : *
4299 : * Remove files with serialized changes and subxact info for a particular
4300 : * toplevel transaction. Each subscription has a separate set of files
4301 : * for any toplevel transaction.
4302 : */
4303 : void
4304 62 : stream_cleanup_files(Oid subid, TransactionId xid)
4305 : {
4306 : char path[MAXPGPATH];
4307 :
4308 : /* Delete the changes file. */
4309 62 : changes_filename(path, subid, xid);
4310 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
4311 :
4312 : /* Delete the subxact file, if it exists. */
4313 62 : subxact_filename(path, subid, xid);
4314 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4315 62 : }
4316 :
4317 : /*
4318 : * stream_open_file
4319 : * Open a file that we'll use to serialize changes for a toplevel
4320 : * transaction.
4321 : *
4322 : * Open a file for streamed changes from a toplevel transaction identified
4323 : * by stream_xid (global variable). If it's the first chunk of streamed
4324 : * changes for this transaction, create the buffile, otherwise open the
4325 : * previously created file.
4326 : */
4327 : static void
4328 726 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4329 : {
4330 : char path[MAXPGPATH];
4331 : MemoryContext oldcxt;
4332 :
4333 : Assert(OidIsValid(subid));
4334 : Assert(TransactionIdIsValid(xid));
4335 : Assert(stream_fd == NULL);
4336 :
4337 :
4338 726 : changes_filename(path, subid, xid);
4339 726 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4340 :
4341 : /*
4342 : * Create/open the buffiles under the logical streaming context so that we
4343 : * have those files until stream stop.
4344 : */
4345 726 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
4346 :
4347 : /*
4348 : * If this is the first streamed segment, create the changes file.
4349 : * Otherwise, just open the file for writing, in append mode.
4350 : */
4351 726 : if (first_segment)
4352 64 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
4353 : path);
4354 : else
4355 : {
4356 : /*
4357 : * Open the file and seek to the end of the file because we always
4358 : * append the changes file.
4359 : */
4360 662 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
4361 : path, O_RDWR, false);
4362 662 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
4363 : }
4364 :
4365 726 : MemoryContextSwitchTo(oldcxt);
4366 726 : }
4367 :
4368 : /*
4369 : * stream_close_file
4370 : * Close the currently open file with streamed changes.
4371 : */
4372 : static void
4373 786 : stream_close_file(void)
4374 : {
4375 : Assert(stream_fd != NULL);
4376 :
4377 786 : BufFileClose(stream_fd);
4378 :
4379 786 : stream_fd = NULL;
4380 786 : }
4381 :
4382 : /*
4383 : * stream_write_change
4384 : * Serialize a change to a file for the current toplevel transaction.
4385 : *
4386 : * The change is serialized in a simple format, with length (not including
4387 : * the length), action code (identifying the message type) and message
4388 : * contents (without the subxact TransactionId value).
4389 : */
4390 : static void
4391 215108 : stream_write_change(char action, StringInfo s)
4392 : {
4393 : int len;
4394 :
4395 : Assert(stream_fd != NULL);
4396 :
4397 : /* total on-disk size, including the action type character */
4398 215108 : len = (s->len - s->cursor) + sizeof(char);
4399 :
4400 : /* first write the size */
4401 215108 : BufFileWrite(stream_fd, &len, sizeof(len));
4402 :
4403 : /* then the action */
4404 215108 : BufFileWrite(stream_fd, &action, sizeof(action));
4405 :
4406 : /* and finally the remaining part of the buffer (after the XID) */
4407 215108 : len = (s->len - s->cursor);
4408 :
4409 215108 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
4410 215108 : }
4411 :
4412 : /*
4413 : * stream_open_and_write_change
4414 : * Serialize a message to a file for the given transaction.
4415 : *
4416 : * This function is similar to stream_write_change except that it will open the
4417 : * target file if not already before writing the message and close the file at
4418 : * the end.
4419 : */
4420 : static void
4421 10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
4422 : {
4423 : Assert(!in_streamed_transaction);
4424 :
4425 10 : if (!stream_fd)
4426 10 : stream_start_internal(xid, false);
4427 :
4428 10 : stream_write_change(action, s);
4429 10 : stream_stop_internal(xid);
4430 10 : }
4431 :
4432 : /*
4433 : * Sets streaming options including replication slot name and origin start
4434 : * position. Workers need these options for logical replication.
4435 : */
4436 : void
4437 746 : set_stream_options(WalRcvStreamOptions *options,
4438 : char *slotname,
4439 : XLogRecPtr *origin_startpos)
4440 : {
4441 : int server_version;
4442 :
4443 746 : options->logical = true;
4444 746 : options->startpoint = *origin_startpos;
4445 746 : options->slotname = slotname;
4446 :
4447 746 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
4448 746 : options->proto.logical.proto_version =
4449 746 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
4450 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
4451 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
4452 : LOGICALREP_PROTO_VERSION_NUM;
4453 :
4454 746 : options->proto.logical.publication_names = MySubscription->publications;
4455 746 : options->proto.logical.binary = MySubscription->binary;
4456 :
4457 : /*
4458 : * Assign the appropriate option value for streaming option according to
4459 : * the 'streaming' mode and the publisher's ability to support that mode.
4460 : */
4461 746 : if (server_version >= 160000 &&
4462 746 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4463 : {
4464 678 : options->proto.logical.streaming_str = "parallel";
4465 678 : MyLogicalRepWorker->parallel_apply = true;
4466 : }
4467 68 : else if (server_version >= 140000 &&
4468 68 : MySubscription->stream != LOGICALREP_STREAM_OFF)
4469 : {
4470 52 : options->proto.logical.streaming_str = "on";
4471 52 : MyLogicalRepWorker->parallel_apply = false;
4472 : }
4473 : else
4474 : {
4475 16 : options->proto.logical.streaming_str = NULL;
4476 16 : MyLogicalRepWorker->parallel_apply = false;
4477 : }
4478 :
4479 746 : options->proto.logical.twophase = false;
4480 746 : options->proto.logical.origin = pstrdup(MySubscription->origin);
4481 746 : }
4482 :
4483 : /*
4484 : * Cleanup the memory for subxacts and reset the related variables.
4485 : */
4486 : static inline void
4487 752 : cleanup_subxact_info()
4488 : {
4489 752 : if (subxact_data.subxacts)
4490 174 : pfree(subxact_data.subxacts);
4491 :
4492 752 : subxact_data.subxacts = NULL;
4493 752 : subxact_data.subxact_last = InvalidTransactionId;
4494 752 : subxact_data.nsubxacts = 0;
4495 752 : subxact_data.nsubxacts_max = 0;
4496 752 : }
4497 :
/*
 * Common function to run the apply loop with error handling. Disable the
 * subscription, if necessary.
 *
 * 'origin_startpos' is passed to LogicalRepApplyLoop() as its initial
 * last_received position.
 *
 * If the apply loop raises an error, the replication origin state is reset
 * first.  What happens next depends on the subscription:
 * - with disable-on-error set (MySubscription->disableonerr),
 *   DisableSubscriptionAndExit() is called;
 * - otherwise any transaction is aborted, the error is reported to the
 *   subscription statistics, and the error is re-thrown.
 *
 * Note that we don't handle FATAL errors which are probably because
 * of system resource error and are not repeatable.
 */
void
start_apply(XLogRecPtr origin_startpos)
{
	PG_TRY();
	{
		LogicalRepApplyLoop(origin_startpos);
	}
	PG_CATCH();
	{
		/*
		 * Reset the origin state to prevent the advancement of origin
		 * progress if we fail to apply. Otherwise, this will result in
		 * transaction loss as that transaction won't be sent again by the
		 * server.
		 */
		replorigin_reset(0, (Datum) 0);

		if (MySubscription->disableonerr)
			DisableSubscriptionAndExit();
		else
		{
			/*
			 * Report the worker failed while applying changes. Abort the
			 * current transaction so that the stats message is sent in an
			 * idle state.
			 *
			 * NOTE(review): the second argument appears to tell whether the
			 * error came from an apply (not table sync) worker; confirm
			 * against pgstat_report_subscription_error().
			 */
			AbortOutOfAnyTransaction();
			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());

			PG_RE_THROW();
		}
	}
	PG_END_TRY();
}
4539 :
4540 : /*
4541 : * Runs the leader apply worker.
4542 : *
4543 : * It sets up replication origin, streaming options and then starts streaming.
4544 : */
4545 : static void
4546 444 : run_apply_worker()
4547 : {
4548 : char originname[NAMEDATALEN];
4549 444 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4550 444 : char *slotname = NULL;
4551 : WalRcvStreamOptions options;
4552 : RepOriginId originid;
4553 : TimeLineID startpointTLI;
4554 : char *err;
4555 : bool must_use_password;
4556 :
4557 444 : slotname = MySubscription->slotname;
4558 :
4559 : /*
4560 : * This shouldn't happen if the subscription is enabled, but guard against
4561 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4562 : * slot is NULL.)
4563 : */
4564 444 : if (!slotname)
4565 0 : ereport(ERROR,
4566 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4567 : errmsg("subscription has no replication slot set")));
4568 :
4569 : /* Setup replication origin tracking. */
4570 444 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
4571 : originname, sizeof(originname));
4572 444 : StartTransactionCommand();
4573 444 : originid = replorigin_by_name(originname, true);
4574 444 : if (!OidIsValid(originid))
4575 0 : originid = replorigin_create(originname);
4576 444 : replorigin_session_setup(originid, 0);
4577 444 : replorigin_session_origin = originid;
4578 444 : origin_startpos = replorigin_session_get_progress(false);
4579 444 : CommitTransactionCommand();
4580 :
4581 : /* Is the use of a password mandatory? */
4582 852 : must_use_password = MySubscription->passwordrequired &&
4583 408 : !MySubscription->ownersuperuser;
4584 :
4585 444 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
4586 : true, must_use_password,
4587 : MySubscription->name, &err);
4588 :
4589 428 : if (LogRepWorkerWalRcvConn == NULL)
4590 46 : ereport(ERROR,
4591 : (errcode(ERRCODE_CONNECTION_FAILURE),
4592 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
4593 : MySubscription->name, err)));
4594 :
4595 : /*
4596 : * We don't really use the output identify_system for anything but it does
4597 : * some initializations on the upstream so let's still call it.
4598 : */
4599 382 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4600 :
4601 382 : set_apply_error_context_origin(originname);
4602 :
4603 382 : set_stream_options(&options, slotname, &origin_startpos);
4604 :
4605 : /*
4606 : * Even when the two_phase mode is requested by the user, it remains as
4607 : * the tri-state PENDING until all tablesyncs have reached READY state.
4608 : * Only then, can it become ENABLED.
4609 : *
4610 : * Note: If the subscription has no tables then leave the state as
4611 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4612 : * work.
4613 : */
4614 412 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
4615 30 : AllTablesyncsReady())
4616 : {
4617 : /* Start streaming with two_phase enabled */
4618 16 : options.proto.logical.twophase = true;
4619 16 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4620 :
4621 16 : StartTransactionCommand();
4622 :
4623 : /*
4624 : * Updating pg_subscription might involve TOAST table access, so
4625 : * ensure we have a valid snapshot.
4626 : */
4627 16 : PushActiveSnapshot(GetTransactionSnapshot());
4628 :
4629 16 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4630 16 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
4631 16 : PopActiveSnapshot();
4632 16 : CommitTransactionCommand();
4633 : }
4634 : else
4635 : {
4636 366 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4637 : }
4638 :
4639 382 : ereport(DEBUG1,
4640 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4641 : MySubscription->name,
4642 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4643 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4644 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4645 : "?")));
4646 :
4647 : /* Run the main loop. */
4648 382 : start_apply(origin_startpos);
4649 0 : }
4650 :
/*
 * Common initialization for leader apply worker, parallel apply worker and
 * tablesync worker.
 *
 * Initialize the database connection, in-memory subscription and necessary
 * config options.
 *
 * MySubscription is loaded into ApplyContext, a child of TopMemoryContext, so
 * it outlives the startup transaction. If the subscription has been removed
 * the process exits via proc_exit(0). If it has been disabled,
 * apply_worker_exit() is called. Otherwise syscache callbacks for
 * subscription and role changes are registered and a "has started" LOG
 * message is emitted.
 */
void
InitializeLogRepWorker(void)
{
	MemoryContext oldctx;

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/*
	 * A NULL result is treated below as the subscription having been removed
	 * during startup.
	 */
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
						MySubscription->name)));

		apply_worker_exit();
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/*
	 * Keep us informed about subscription or role changes. Note that the
	 * role's superuser privilege can be revoked.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	CacheRegisterSyscacheCallback(AUTHOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name,
						get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	/* End the transaction started above for loading the subscription. */
	CommitTransactionCommand();
}
4739 :
4740 : /*
4741 : * Reset the origin state.
4742 : */
4743 : static void
4744 944 : replorigin_reset(int code, Datum arg)
4745 : {
4746 944 : replorigin_session_origin = InvalidRepOriginId;
4747 944 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
4748 944 : replorigin_session_origin_timestamp = 0;
4749 944 : }
4750 :
/*
 * Common function to setup the leader apply or tablesync worker.
 *
 * worker_slot identifies the shared LogicalRepWorker slot this process
 * attaches to. Installs signal handlers, loads libpqwalreceiver, performs the
 * common initialization (InitializeLogRepWorker), registers the origin-reset
 * exit callback and the SUBSCRIPTIONRELMAP syscache callback.
 */
void
SetupApplyOrSyncWorker(int worker_slot)
{
	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	Assert(am_tablesync_worker() || am_leader_apply_worker());

	/* Setup signal handling */
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * We don't currently need any ResourceOwner in a walreceiver process, but
	 * if we did, we could call CreateAuxProcessResourceOwner here.
	 */

	/* Initialise stats to a sanish value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	InitializeLogRepWorker();

	/*
	 * Register a callback to reset the origin state before aborting any
	 * pending transaction during shutdown (see ShutdownPostgres()). This will
	 * avoid origin advancement for an incomplete transaction which could
	 * otherwise lead to its loss as such a transaction won't be sent by the
	 * server again.
	 *
	 * Note that even a LOG or DEBUG statement placed after setting the origin
	 * state may process a shutdown signal before committing the current apply
	 * operation. So, it is important to register such a callback here.
	 */
	before_shmem_exit(replorigin_reset, (Datum) 0);

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);
}
4804 :
4805 : /* Logical Replication Apply worker entry point */
4806 : void
4807 450 : ApplyWorkerMain(Datum main_arg)
4808 : {
4809 450 : int worker_slot = DatumGetInt32(main_arg);
4810 :
4811 450 : InitializingApplyWorker = true;
4812 :
4813 450 : SetupApplyOrSyncWorker(worker_slot);
4814 :
4815 444 : InitializingApplyWorker = false;
4816 :
4817 444 : run_apply_worker();
4818 :
4819 0 : proc_exit(0);
4820 : }
4821 :
/*
 * After error recovery, disable the subscription in a new transaction
 * and exit cleanly.
 *
 * Called while an error is pending (e.g. from start_apply's PG_CATCH block).
 * The pending error is emitted, the transaction is aborted and the error
 * state is flushed. The failure is then reported to the subscription
 * statistics, the subscription is disabled in a fresh transaction, and the
 * process ends with proc_exit(0). This function does not return.
 */
void
DisableSubscriptionAndExit(void)
{
	/*
	 * Emit the error message, and recover from the error state to an idle
	 * state
	 */
	HOLD_INTERRUPTS();

	EmitErrorReport();
	AbortOutOfAnyTransaction();
	FlushErrorState();

	RESUME_INTERRUPTS();

	/* Report the worker failed during either table synchronization or apply */
	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
									 !am_tablesync_worker());

	/* Disable the subscription */
	StartTransactionCommand();

	/*
	 * Updating pg_subscription might involve TOAST table access, so ensure we
	 * have a valid snapshot.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());

	DisableSubscription(MySubscription->oid);
	PopActiveSnapshot();
	CommitTransactionCommand();

	/* Ensure we remove no-longer-useful entry for worker's start time */
	if (am_leader_apply_worker())
		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

	/* Notify the subscription has been disabled and exit */
	ereport(LOG,
			errmsg("subscription \"%s\" has been disabled because of an error",
				   MySubscription->name));

	proc_exit(0);
}
4869 :
4870 : /*
4871 : * Is current process a logical replication worker?
4872 : */
4873 : bool
4874 3988 : IsLogicalWorker(void)
4875 : {
4876 3988 : return MyLogicalRepWorker != NULL;
4877 : }
4878 :
/*
 * Is current process a logical replication parallel apply worker?
 *
 * A process that is not a logical replication worker at all is never a
 * parallel apply worker, so am_parallel_apply_worker() is consulted only
 * after IsLogicalWorker() succeeds.
 */
bool
IsLogicalParallelApplyWorker(void)
{
	if (!IsLogicalWorker())
		return false;

	return am_parallel_apply_worker();
}
4887 :
4888 : /*
4889 : * Start skipping changes of the transaction if the given LSN matches the
4890 : * LSN specified by subscription's skiplsn.
4891 : */
4892 : static void
4893 994 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
4894 : {
4895 : Assert(!is_skipping_changes());
4896 : Assert(!in_remote_transaction);
4897 : Assert(!in_streamed_transaction);
4898 :
4899 : /*
4900 : * Quick return if it's not requested to skip this transaction. This
4901 : * function is called for every remote transaction and we assume that
4902 : * skipping the transaction is not used often.
4903 : */
4904 994 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
4905 : MySubscription->skiplsn != finish_lsn))
4906 988 : return;
4907 :
4908 : /* Start skipping all changes of this transaction */
4909 6 : skip_xact_finish_lsn = finish_lsn;
4910 :
4911 6 : ereport(LOG,
4912 : errmsg("logical replication starts skipping transaction at LSN %X/%X",
4913 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
4914 : }
4915 :
4916 : /*
4917 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4918 : */
4919 : static void
4920 54 : stop_skipping_changes(void)
4921 : {
4922 54 : if (!is_skipping_changes())
4923 48 : return;
4924 :
4925 6 : ereport(LOG,
4926 : (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4927 : LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
4928 :
4929 : /* Stop skipping changes */
4930 6 : skip_xact_finish_lsn = InvalidXLogRecPtr;
4931 : }
4932 :
4933 : /*
4934 : * Clear subskiplsn of pg_subscription catalog.
4935 : *
4936 : * finish_lsn is the transaction's finish LSN that is used to check if the
4937 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
4938 : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
4939 : * specified the wrong subskiplsn.
4940 : */
4941 : static void
4942 1028 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
4943 : {
4944 : Relation rel;
4945 : Form_pg_subscription subform;
4946 : HeapTuple tup;
4947 1028 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
4948 1028 : bool started_tx = false;
4949 :
4950 1028 : if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
4951 1022 : return;
4952 :
4953 6 : if (!IsTransactionState())
4954 : {
4955 2 : StartTransactionCommand();
4956 2 : started_tx = true;
4957 : }
4958 :
4959 : /*
4960 : * Updating pg_subscription might involve TOAST table access, so ensure we
4961 : * have a valid snapshot.
4962 : */
4963 6 : PushActiveSnapshot(GetTransactionSnapshot());
4964 :
4965 : /*
4966 : * Protect subskiplsn of pg_subscription from being concurrently updated
4967 : * while clearing it.
4968 : */
4969 6 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4970 : AccessShareLock);
4971 :
4972 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4973 :
4974 : /* Fetch the existing tuple. */
4975 6 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4976 : ObjectIdGetDatum(MySubscription->oid));
4977 :
4978 6 : if (!HeapTupleIsValid(tup))
4979 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4980 :
4981 6 : subform = (Form_pg_subscription) GETSTRUCT(tup);
4982 :
4983 : /*
4984 : * Clear the subskiplsn. If the user has already changed subskiplsn before
4985 : * clearing it we don't update the catalog and the replication origin
4986 : * state won't get advanced. So in the worst case, if the server crashes
4987 : * before sending an acknowledgment of the flush position the transaction
4988 : * will be sent again and the user needs to set subskiplsn again. We can
4989 : * reduce the possibility by logging a replication origin WAL record to
4990 : * advance the origin LSN instead but there is no way to advance the
4991 : * origin timestamp and it doesn't seem to be worth doing anything about
4992 : * it since it's a very rare case.
4993 : */
4994 6 : if (subform->subskiplsn == myskiplsn)
4995 : {
4996 : bool nulls[Natts_pg_subscription];
4997 : bool replaces[Natts_pg_subscription];
4998 : Datum values[Natts_pg_subscription];
4999 :
5000 6 : memset(values, 0, sizeof(values));
5001 6 : memset(nulls, false, sizeof(nulls));
5002 6 : memset(replaces, false, sizeof(replaces));
5003 :
5004 : /* reset subskiplsn */
5005 6 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
5006 6 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
5007 :
5008 6 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
5009 : replaces);
5010 6 : CatalogTupleUpdate(rel, &tup->t_self, tup);
5011 :
5012 6 : if (myskiplsn != finish_lsn)
5013 0 : ereport(WARNING,
5014 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
5015 : errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
5016 : LSN_FORMAT_ARGS(finish_lsn),
5017 : LSN_FORMAT_ARGS(myskiplsn)));
5018 : }
5019 :
5020 6 : heap_freetuple(tup);
5021 6 : table_close(rel, NoLock);
5022 :
5023 6 : PopActiveSnapshot();
5024 :
5025 6 : if (started_tx)
5026 2 : CommitTransactionCommand();
5027 : }
5028 :
/*
 * Error callback to give more context info about the change being applied.
 *
 * The context is read from the file-level apply_error_callback_arg; the
 * 'arg' parameter is not used. Nothing is added while no message is being
 * processed (command == 0). Otherwise the errcontext always names the
 * replication origin and the message type. It then adds, as available: the
 * remote transaction id, the target relation (remote namespace and name),
 * the remote column, and the transaction's finish LSN.
 */
void
apply_error_callback(void *arg)
{
	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;

	/* Not currently processing any remote message. */
	if (apply_error_callback_arg.command == 0)
		return;

	Assert(errarg->origin_name);

	if (errarg->rel == NULL)
	{
		/* No target relation: describe only the message and transaction. */
		if (!TransactionIdIsValid(errarg->remote_xid))
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command));
		else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command),
					   errarg->remote_xid);
		else
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command),
					   errarg->remote_xid,
					   LSN_FORMAT_ARGS(errarg->finish_lsn));
	}
	else
	{
		/* A negative remote_attnum means no specific column is involved. */
		if (errarg->remote_attnum < 0)
		{
			if (XLogRecPtrIsInvalid(errarg->finish_lsn))
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->remote_xid);
			else
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->remote_xid,
						   LSN_FORMAT_ARGS(errarg->finish_lsn));
		}
		else
		{
			/* Column-level context: remote_attnum indexes remoterel.attnames. */
			if (XLogRecPtrIsInvalid(errarg->finish_lsn))
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->rel->remoterel.attnames[errarg->remote_attnum],
						   errarg->remote_xid);
			else
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->rel->remoterel.attnames[errarg->remote_attnum],
						   errarg->remote_xid,
						   LSN_FORMAT_ARGS(errarg->finish_lsn));
		}
	}
}
5100 :
5101 : /* Set transaction information of apply error callback */
5102 : static inline void
5103 5694 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
5104 : {
5105 5694 : apply_error_callback_arg.remote_xid = xid;
5106 5694 : apply_error_callback_arg.finish_lsn = lsn;
5107 5694 : }
5108 :
5109 : /* Reset all information of apply error callback */
5110 : static inline void
5111 2810 : reset_apply_error_context_info(void)
5112 : {
5113 2810 : apply_error_callback_arg.command = 0;
5114 2810 : apply_error_callback_arg.rel = NULL;
5115 2810 : apply_error_callback_arg.remote_attnum = -1;
5116 2810 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
5117 2810 : }
5118 :
5119 : /*
5120 : * Request wakeup of the workers for the given subscription OID
5121 : * at commit of the current transaction.
5122 : *
5123 : * This is used to ensure that the workers process assorted changes
5124 : * as soon as possible.
5125 : */
5126 : void
5127 386 : LogicalRepWorkersWakeupAtCommit(Oid subid)
5128 : {
5129 : MemoryContext oldcxt;
5130 :
5131 386 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
5132 386 : on_commit_wakeup_workers_subids =
5133 386 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
5134 386 : MemoryContextSwitchTo(oldcxt);
5135 386 : }
5136 :
5137 : /*
5138 : * Wake up the workers of any subscriptions that were changed in this xact.
5139 : */
5140 : void
5141 1042496 : AtEOXact_LogicalRepWorkers(bool isCommit)
5142 : {
5143 1042496 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
5144 : {
5145 : ListCell *lc;
5146 :
5147 376 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5148 752 : foreach(lc, on_commit_wakeup_workers_subids)
5149 : {
5150 376 : Oid subid = lfirst_oid(lc);
5151 : List *workers;
5152 : ListCell *lc2;
5153 :
5154 376 : workers = logicalrep_workers_find(subid, true, false);
5155 486 : foreach(lc2, workers)
5156 : {
5157 110 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5158 :
5159 110 : logicalrep_worker_wakeup_ptr(worker);
5160 : }
5161 : }
5162 376 : LWLockRelease(LogicalRepWorkerLock);
5163 : }
5164 :
5165 : /* The List storage will be reclaimed automatically in xact cleanup. */
5166 1042496 : on_commit_wakeup_workers_subids = NIL;
5167 1042496 : }
5168 :
5169 : /*
5170 : * Allocate the origin name in long-lived context for error context message.
5171 : */
5172 : void
5173 766 : set_apply_error_context_origin(char *originname)
5174 : {
5175 766 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
5176 : originname);
5177 766 : }
5178 :
5179 : /*
5180 : * Return the action to be taken for the given transaction. See
5181 : * TransApplyAction for information on each of the actions.
5182 : *
5183 : * *winfo is assigned to the destination parallel worker info when the leader
5184 : * apply worker has to pass all the transaction's changes to the parallel
5185 : * apply worker.
5186 : */
5187 : static TransApplyAction
5188 652422 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5189 : {
5190 652422 : *winfo = NULL;
5191 :
5192 652422 : if (am_parallel_apply_worker())
5193 : {
5194 137876 : return TRANS_PARALLEL_APPLY;
5195 : }
5196 :
5197 : /*
5198 : * If we are processing this transaction using a parallel apply worker
5199 : * then either we send the changes to the parallel worker or if the worker
5200 : * is busy then serialize the changes to the file which will later be
5201 : * processed by the parallel worker.
5202 : */
5203 514546 : *winfo = pa_find_worker(xid);
5204 :
5205 514546 : if (*winfo && (*winfo)->serialize_changes)
5206 : {
5207 10074 : return TRANS_LEADER_PARTIAL_SERIALIZE;
5208 : }
5209 504472 : else if (*winfo)
5210 : {
5211 137788 : return TRANS_LEADER_SEND_TO_PARALLEL;
5212 : }
5213 :
5214 : /*
5215 : * If there is no parallel worker involved to process this transaction
5216 : * then we either directly apply the change or serialize it to a file
5217 : * which will later be applied when the transaction finish message is
5218 : * processed.
5219 : */
5220 366684 : else if (in_streamed_transaction)
5221 : {
5222 206398 : return TRANS_LEADER_SERIALIZE;
5223 : }
5224 : else
5225 : {
5226 160286 : return TRANS_LEADER_APPLY;
5227 : }
5228 : }
|