Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic clean up on error and (c) provides
45 : * a way for these files to survive across local transactions and to be opened
46 : * and closed at stream start and stop. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * process_syncing_tables_for_apply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it starts streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * Finally, to avoid problems mentioned in previous paragraphs from any
113 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 : * to 'off' and then again back to 'on') there is a restriction for
115 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can get prepare of the same GID more than once for the genuine cases
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and prepared transaction has operations on tables subscribed to those
121 : * subscriptions. For such cases, if we use the GID sent by publisher one of
122 : * the prepares will be successful and others will fail, in which case the
123 : * server will send them again. Now, this can lead to a deadlock if user has
124 : * set synchronous_standby_names for all the subscriptions on subscriber. To
125 : * avoid such deadlocks, we generate a unique GID (consisting of the
126 : * subscription oid and the xid of the prepared transaction) for each prepare
127 : * transaction on the subscriber.
128 : *
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *
136 : * RETAIN DEAD TUPLES
137 : * ----------------------
138 : * Each apply worker that has enabled the retain_dead_tuples option maintains a
139 : * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 : * prevent dead rows from being removed prematurely when the apply worker still
141 : * needs them to detect update_deleted conflicts. Additionally, this helps to
142 : * retain the required commit_ts module information, which further helps to
143 : * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 : * otherwise, vacuum freeze could remove the required information.
145 : *
146 : * The logical replication launcher manages an internal replication slot named
147 : * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 : * transaction ID from all apply workers to determine the appropriate xmin for
149 : * the slot, thereby retaining necessary tuples.
150 : *
151 : * The non-removable transaction ID in the apply worker is advanced to the
152 : * oldest running transaction ID once all concurrent transactions on the
153 : * publisher have been applied and flushed locally. The process involves:
154 : *
155 : * - RDT_GET_CANDIDATE_XID:
156 : * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 : * candidate xid.
158 : *
159 : * - RDT_REQUEST_PUBLISHER_STATUS:
160 : * Send a message to the walsender requesting the publisher status, which
161 : * includes the latest WAL write position and information about transactions
162 : * that are in the commit phase.
163 : *
164 : * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 : * Wait for the status from the walsender. After receiving the first status,
166 : * do not proceed if there are concurrent remote transactions that are still
167 : * in the commit phase. These transactions might have been assigned an
168 : * earlier commit timestamp but have not yet written the commit WAL record.
169 : * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 : * until all these transactions have completed.
171 : *
172 : * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 : * Advance the non-removable transaction ID if the current flush location has
174 : * reached or surpassed the last received WAL position.
175 : *
176 : * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 : * This phase is required only when max_retention_duration is defined. We
178 : * enter this phase if the wait time in either the
179 : * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 : * configured max_retention_duration. In this phase,
181 : * pg_subscription.subretentionactive is updated to false within a new
182 : * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 : *
184 : * The overall state progression is: GET_CANDIDATE_XID ->
185 : * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
186 : * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
187 : * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
188 : *
189 : * Retaining the dead tuples for this period is sufficient for ensuring
190 : * eventual consistency using last-update-wins strategy, as dead tuples are
191 : * useful for detecting conflicts only during the application of concurrent
192 : * transactions from remote nodes. After applying and flushing all remote
193 : * transactions that occurred concurrently with the tuple DELETE, any
194 : * subsequent UPDATE from a remote node should have a later timestamp. In such
195 : * cases, it is acceptable to detect an update_missing scenario and convert the
196 : * UPDATE to an INSERT when applying it. But, for concurrent remote
197 : * transactions with earlier timestamps than the DELETE, detecting
198 : * update_deleted is necessary, as the UPDATEs in remote transactions should be
199 : * ignored if their timestamp is earlier than that of the dead tuples.
200 : *
201 : * Note that advancing the non-removable transaction ID is not supported if the
202 : * publisher is also a physical standby. This is because the logical walsender
203 : * on the standby can only get the WAL replay position but there may be more
204 : * WALs that are being replicated from the primary and those WALs could have
205 : * earlier commit timestamp.
206 : *
207 : * Similarly, when the publisher has subscribed to another publisher,
208 : * information necessary for conflict detection cannot be retained for
209 : * changes from origins other than the publisher. This is because publisher
210 : * lacks the information on concurrent transactions of other publishers to
211 : * which it subscribes. As the information on concurrent transactions is
212 : * unavailable beyond subscriber's immediate publishers, the non-removable
213 : * transaction ID might be advanced prematurely before changes from other
214 : * origins have been fully applied.
215 : *
216 : * XXX Retaining information for changes from other origins might be possible
217 : * by requesting the subscription on that origin to enable retain_dead_tuples
218 : * and fetching the conflict detection slot.xmin along with the publisher's
219 : * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
220 : * wait for the remote slot's xmin to reach the oldest active transaction ID,
221 : * ensuring that all transactions from other origins have been applied on the
222 : * publisher, thereby getting the latest WAL position that includes all
223 : * concurrent changes. However, this approach may impact performance, so it
224 : * might not be worth the effort.
225 : *
226 : * XXX It seems feasible to get the latest commit's WAL location from the
227 : * publisher and wait till that is applied. However, we can't do that
228 : * because commit timestamps can regress as a commit with a later LSN is not
229 : * guaranteed to have a later timestamp than those with earlier LSNs. Having
230 : * said that, even if that is possible, it won't improve performance much as
231 : * the apply always lags and moves slowly compared with the transactions
232 : * on the publisher.
233 : *-------------------------------------------------------------------------
234 : */
235 :
236 : #include "postgres.h"
237 :
238 : #include <sys/stat.h>
239 : #include <unistd.h>
240 :
241 : #include "access/commit_ts.h"
242 : #include "access/table.h"
243 : #include "access/tableam.h"
244 : #include "access/twophase.h"
245 : #include "access/xact.h"
246 : #include "catalog/indexing.h"
247 : #include "catalog/pg_inherits.h"
248 : #include "catalog/pg_subscription.h"
249 : #include "catalog/pg_subscription_rel.h"
250 : #include "commands/subscriptioncmds.h"
251 : #include "commands/tablecmds.h"
252 : #include "commands/trigger.h"
253 : #include "executor/executor.h"
254 : #include "executor/execPartition.h"
255 : #include "libpq/pqformat.h"
256 : #include "miscadmin.h"
257 : #include "optimizer/optimizer.h"
258 : #include "parser/parse_relation.h"
259 : #include "pgstat.h"
260 : #include "postmaster/bgworker.h"
261 : #include "postmaster/interrupt.h"
262 : #include "postmaster/walwriter.h"
263 : #include "replication/conflict.h"
264 : #include "replication/logicallauncher.h"
265 : #include "replication/logicalproto.h"
266 : #include "replication/logicalrelation.h"
267 : #include "replication/logicalworker.h"
268 : #include "replication/origin.h"
269 : #include "replication/slot.h"
270 : #include "replication/walreceiver.h"
271 : #include "replication/worker_internal.h"
272 : #include "rewrite/rewriteHandler.h"
273 : #include "storage/buffile.h"
274 : #include "storage/ipc.h"
275 : #include "storage/lmgr.h"
276 : #include "storage/procarray.h"
277 : #include "tcop/tcopprot.h"
278 : #include "utils/acl.h"
279 : #include "utils/guc.h"
280 : #include "utils/inval.h"
281 : #include "utils/lsyscache.h"
282 : #include "utils/memutils.h"
283 : #include "utils/pg_lsn.h"
284 : #include "utils/rel.h"
285 : #include "utils/rls.h"
286 : #include "utils/snapmgr.h"
287 : #include "utils/syscache.h"
288 : #include "utils/usercontext.h"
289 :
290 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
291 :
292 : typedef struct FlushPosition
293 : {
294 : dlist_node node;
295 : XLogRecPtr local_end;
296 : XLogRecPtr remote_end;
297 : } FlushPosition;
298 :
299 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
300 :
301 : typedef struct ApplyExecutionData
302 : {
303 : EState *estate; /* executor state, used to track resources */
304 :
305 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
306 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
307 :
308 : /* These fields are used when the target relation is partitioned: */
309 : ModifyTableState *mtstate; /* dummy ModifyTable state */
310 : PartitionTupleRouting *proute; /* partition routing info */
311 : } ApplyExecutionData;
312 :
313 : /* Struct for saving and restoring apply errcontext information */
314 : typedef struct ApplyErrorCallbackArg
315 : {
316 : LogicalRepMsgType command; /* 0 if invalid */
317 : LogicalRepRelMapEntry *rel;
318 :
319 : /* Remote node information */
320 : int remote_attnum; /* -1 if invalid */
321 : TransactionId remote_xid;
322 : XLogRecPtr finish_lsn;
323 : char *origin_name;
324 : } ApplyErrorCallbackArg;
325 :
326 : /*
327 : * The action to be taken for the changes in the transaction.
328 : *
329 : * TRANS_LEADER_APPLY:
330 : * This action means that we are in the leader apply worker or table sync
331 : * worker. The changes of the transaction are either directly applied or
332 : * are read from temporary files (for streaming transactions) and then
333 : * applied by the worker.
334 : *
335 : * TRANS_LEADER_SERIALIZE:
336 : * This action means that we are in the leader apply worker or table sync
337 : * worker. Changes are written to temporary files and then applied when the
338 : * final commit arrives.
339 : *
340 : * TRANS_LEADER_SEND_TO_PARALLEL:
341 : * This action means that we are in the leader apply worker and need to send
342 : * the changes to the parallel apply worker.
343 : *
344 : * TRANS_LEADER_PARTIAL_SERIALIZE:
345 : * This action means that we are in the leader apply worker and have sent some
346 : * changes directly to the parallel apply worker and the remaining changes are
347 : * serialized to a file, due to timeout while sending data. The parallel apply
348 : * worker will apply these serialized changes when the final commit arrives.
349 : *
350 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
351 : * serializing changes, the leader worker also needs to serialize the
352 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
353 : * finish the transaction when processing the transaction finish command. So
354 : * this new action was introduced to keep the code and logic clear.
355 : *
356 : * TRANS_PARALLEL_APPLY:
357 : * This action means that we are in the parallel apply worker and changes of
358 : * the transaction are applied directly by the worker.
359 : */
360 : typedef enum
361 : {
362 : /* The action for non-streaming transactions. */
363 : TRANS_LEADER_APPLY,
364 :
365 : /* Actions for streaming transactions. */
366 : TRANS_LEADER_SERIALIZE,
367 : TRANS_LEADER_SEND_TO_PARALLEL,
368 : TRANS_LEADER_PARTIAL_SERIALIZE,
369 : TRANS_PARALLEL_APPLY,
370 : } TransApplyAction;
371 :
372 : /*
373 : * The phases involved in advancing the non-removable transaction ID.
374 : *
375 : * See comments atop worker.c for details of the transition between these
376 : * phases.
377 : */
378 : typedef enum
379 : {
380 : RDT_GET_CANDIDATE_XID,
381 : RDT_REQUEST_PUBLISHER_STATUS,
382 : RDT_WAIT_FOR_PUBLISHER_STATUS,
383 : RDT_WAIT_FOR_LOCAL_FLUSH,
384 : RDT_STOP_CONFLICT_INFO_RETENTION
385 : } RetainDeadTuplesPhase;
386 :
387 : /*
388 : * Critical information for managing phase transitions within the
389 : * RetainDeadTuplesPhase.
390 : */
391 : typedef struct RetainDeadTuplesData
392 : {
393 : RetainDeadTuplesPhase phase; /* current phase */
394 : XLogRecPtr remote_lsn; /* WAL write position on the publisher */
395 :
396 : /*
397 : * Oldest transaction ID that was in the commit phase on the publisher.
398 : * Use FullTransactionId to prevent issues with transaction ID wraparound,
399 : * where a new remote_oldestxid could falsely appear to originate from the
400 : * past and block advancement.
401 : */
402 : FullTransactionId remote_oldestxid;
403 :
404 : /*
405 : * Next transaction ID to be assigned on the publisher. Use
406 : * FullTransactionId for consistency and to allow straightforward
407 : * comparisons with remote_oldestxid.
408 : */
409 : FullTransactionId remote_nextxid;
410 :
411 : TimestampTz reply_time; /* when the publisher responds with status */
412 :
413 : /*
414 : * Publisher transaction ID that must be awaited to complete before
415 : * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
416 : * FullTransactionId for the same reason as remote_nextxid.
417 : */
418 : FullTransactionId remote_wait_for;
419 :
420 : TransactionId candidate_xid; /* candidate for the non-removable
421 : * transaction ID */
422 : TimestampTz flushpos_update_time; /* when the remote flush position was
423 : * updated in final phase
424 : * (RDT_WAIT_FOR_LOCAL_FLUSH) */
425 :
426 : long table_sync_wait_time; /* time spent waiting for table sync
427 : * to finish */
428 :
429 : /*
430 : * The following fields are used to determine the timing for the next
431 : * round of transaction ID advancement.
432 : */
433 : TimestampTz last_recv_time; /* when the last message was received */
434 : TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
435 : int xid_advance_interval; /* how much time (ms) to wait before
436 : * attempting to advance the
437 : * non-removable transaction ID */
438 : } RetainDeadTuplesData;
439 :
440 : /*
441 : * The minimum (100ms) and maximum (3 minutes) intervals for advancing
442 : * non-removable transaction IDs. The maximum interval is a bit arbitrary but
443 : * is sufficient to not cause any undue network traffic.
444 : */
445 : #define MIN_XID_ADVANCE_INTERVAL 100
446 : #define MAX_XID_ADVANCE_INTERVAL 180000
447 :
448 : /* errcontext tracker */
449 : static ApplyErrorCallbackArg apply_error_callback_arg =
450 : {
451 : .command = 0,
452 : .rel = NULL,
453 : .remote_attnum = -1,
454 : .remote_xid = InvalidTransactionId,
455 : .finish_lsn = InvalidXLogRecPtr,
456 : .origin_name = NULL,
457 : };
458 :
459 : ErrorContextCallback *apply_error_context_stack = NULL;
460 :
461 : MemoryContext ApplyMessageContext = NULL;
462 : MemoryContext ApplyContext = NULL;
463 :
464 : /* per stream context for streaming transactions */
465 : static MemoryContext LogicalStreamingContext = NULL;
466 :
467 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
468 :
469 : Subscription *MySubscription = NULL;
470 : static bool MySubscriptionValid = false;
471 :
472 : static List *on_commit_wakeup_workers_subids = NIL;
473 :
474 : bool in_remote_transaction = false;
475 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
476 :
477 : /* fields valid only when processing streamed transaction */
478 : static bool in_streamed_transaction = false;
479 :
480 : static TransactionId stream_xid = InvalidTransactionId;
481 :
482 : /*
483 : * The number of changes applied by parallel apply worker during one streaming
484 : * block.
485 : */
486 : static uint32 parallel_stream_nchanges = 0;
487 :
488 : /* Are we initializing an apply worker? */
489 : bool InitializingApplyWorker = false;
490 :
491 : /*
492 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
493 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
494 : * Once we start skipping changes, we don't stop it until we skip all changes of
495 : * the transaction even if pg_subscription is updated and MySubscription->skiplsn
496 : * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
497 : * we don't skip receiving and spooling the changes since we decide whether or not
498 : * to skip applying the changes when starting to apply changes. The subskiplsn is
499 : * cleared after successfully skipping the transaction or applying non-empty
500 : * transaction. The latter prevents the mistakenly specified subskiplsn from
501 : * being left. Note that we cannot skip the streaming transactions when using
502 : * parallel apply workers because we cannot get the finish LSN before applying
503 : * the changes. So, we don't start parallel apply worker when finish LSN is set
504 : * by the user.
505 : */
506 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
507 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
508 :
509 : /* BufFile handle of the current streaming file */
510 : static BufFile *stream_fd = NULL;
511 :
512 : /*
513 : * The remote WAL position that has been applied and flushed locally. We record
514 : * and use this information both while sending feedback to the server and
515 : * advancing oldest_nonremovable_xid.
516 : */
517 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
518 :
519 : typedef struct SubXactInfo
520 : {
521 : TransactionId xid; /* XID of the subxact */
522 : int fileno; /* file number in the buffile */
523 : off_t offset; /* offset in the file */
524 : } SubXactInfo;
525 :
526 : /* Sub-transaction data for the current streaming transaction */
527 : typedef struct ApplySubXactData
528 : {
529 : uint32 nsubxacts; /* number of sub-transactions */
530 : uint32 nsubxacts_max; /* current capacity of subxacts */
531 : TransactionId subxact_last; /* xid of the last sub-transaction */
532 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
533 : } ApplySubXactData;
534 :
535 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
536 :
537 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
538 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
539 :
540 : /*
541 : * Information about subtransactions of a given toplevel transaction.
542 : */
543 : static void subxact_info_write(Oid subid, TransactionId xid);
544 : static void subxact_info_read(Oid subid, TransactionId xid);
545 : static void subxact_info_add(TransactionId xid);
546 : static inline void cleanup_subxact_info(void);
547 :
548 : /*
549 : * Serialize and deserialize changes for a toplevel transaction.
550 : */
551 : static void stream_open_file(Oid subid, TransactionId xid,
552 : bool first_segment);
553 : static void stream_write_change(char action, StringInfo s);
554 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
555 : static void stream_close_file(void);
556 :
557 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
558 :
559 : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
560 : bool status_received);
561 : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
562 : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
563 : bool status_received);
564 : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
565 : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
566 : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
567 : bool status_received);
568 : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
569 : static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
570 : static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
571 : static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
572 : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
573 : bool new_xid_found);
574 :
575 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
576 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
577 : ResultRelInfo *relinfo,
578 : TupleTableSlot *remoteslot);
579 : static void apply_handle_update_internal(ApplyExecutionData *edata,
580 : ResultRelInfo *relinfo,
581 : TupleTableSlot *remoteslot,
582 : LogicalRepTupleData *newtup,
583 : Oid localindexoid);
584 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
585 : ResultRelInfo *relinfo,
586 : TupleTableSlot *remoteslot,
587 : Oid localindexoid);
588 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
589 : LogicalRepRelation *remoterel,
590 : Oid localidxoid,
591 : TupleTableSlot *remoteslot,
592 : TupleTableSlot **localslot);
593 : static bool FindDeletedTupleInLocalRel(Relation localrel,
594 : Oid localidxoid,
595 : TupleTableSlot *remoteslot,
596 : TransactionId *delete_xid,
597 : RepOriginId *delete_origin,
598 : TimestampTz *delete_time);
599 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
600 : TupleTableSlot *remoteslot,
601 : LogicalRepTupleData *newtup,
602 : CmdType operation);
603 :
604 : /* Functions for skipping changes */
605 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
606 : static void stop_skipping_changes(void);
607 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
608 :
609 : /* Functions for apply error callback */
610 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
611 : static inline void reset_apply_error_context_info(void);
612 :
613 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
614 : ParallelApplyWorkerInfo **winfo);
615 :
616 : static void replorigin_reset(int code, Datum arg);
617 :
618 : /*
619 : * Form the origin name for the subscription.
620 : *
621 : * This is a common function for tablesync and other workers. Tablesync workers
622 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
623 : *
624 : * Return the name in the supplied buffer.
625 : */
626 : void
627 2634 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
628 : char *originname, Size szoriginname)
629 : {
630 2634 : if (OidIsValid(relid))
631 : {
632 : /* Replication origin name for tablesync workers. */
633 1490 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
634 : }
635 : else
636 : {
637 : /* Replication origin name for non-tablesync workers. */
638 1144 : snprintf(originname, szoriginname, "pg_%u", suboid);
639 : }
640 2634 : }
641 :
642 : /*
643 : * Should this worker apply changes for given relation.
644 : *
645 : * This is mainly needed for initial relation data sync as that runs in
646 : * separate worker process running in parallel and we need some way to skip
647 : * changes coming to the leader apply worker during the sync of a table.
648 : *
649 : * Note we need to do smaller or equals comparison for SYNCDONE state because
650 : * it might hold position of end of initial slot consistent point WAL
651 : * record + 1 (ie start of next record) and next record can be COMMIT of
652 : * transaction we are now processing (which is what we set remote_final_lsn
653 : * to in apply_handle_begin).
654 : *
655 : * Note that for streaming transactions that are being applied in the parallel
656 : * apply worker, we disallow applying changes if the target table in the
657 : * subscription is not in the READY state, because we cannot decide whether to
658 : * apply the change as we won't know remote_final_lsn by that time.
659 : *
660 : * We already checked this in pa_can_start() before assigning the
661 : * streaming transaction to the parallel worker, but it also needs to be
662 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
663 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
664 : * while applying this transaction.
665 : */
666 : static bool
667 296800 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
668 : {
669 296800 : switch (MyLogicalRepWorker->type)
670 : {
671 20 : case WORKERTYPE_TABLESYNC:
672 20 : return MyLogicalRepWorker->relid == rel->localreloid;
673 :
674 137254 : case WORKERTYPE_PARALLEL_APPLY:
675 : /* We don't synchronize rel's that are in unknown state. */
676 137254 : if (rel->state != SUBREL_STATE_READY &&
677 0 : rel->state != SUBREL_STATE_UNKNOWN)
678 0 : ereport(ERROR,
679 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
680 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
681 : MySubscription->name),
682 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
683 :
684 137254 : return rel->state == SUBREL_STATE_READY;
685 :
686 159526 : case WORKERTYPE_APPLY:
687 159694 : return (rel->state == SUBREL_STATE_READY ||
688 168 : (rel->state == SUBREL_STATE_SYNCDONE &&
689 50 : rel->statelsn <= remote_final_lsn));
690 :
691 0 : case WORKERTYPE_UNKNOWN:
692 : /* Should never happen. */
693 0 : elog(ERROR, "Unknown worker type");
694 : }
695 :
696 0 : return false; /* dummy for compiler */
697 : }
698 :
699 : /*
700 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
701 : *
702 : * Start a transaction, if this is the first step (else we keep using the
703 : * existing transaction).
704 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
705 : */
706 : static void
707 297708 : begin_replication_step(void)
708 : {
709 297708 : SetCurrentStatementStartTimestamp();
710 :
711 297708 : if (!IsTransactionState())
712 : {
713 1918 : StartTransactionCommand();
714 1918 : maybe_reread_subscription();
715 : }
716 :
717 297702 : PushActiveSnapshot(GetTransactionSnapshot());
718 :
719 297702 : MemoryContextSwitchTo(ApplyMessageContext);
720 297702 : }
721 :
/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 */
static void
end_replication_step(void)
{
	/* Pop the snapshot pushed by begin_replication_step(). */
	PopActiveSnapshot();

	/* Make this step's changes visible to subsequent steps. */
	CommandCounterIncrement();
}
736 :
/*
 * Handle streamed transactions for both the leader apply worker and the
 * parallel apply workers.
 *
 * In the streaming case (receiving a block of the streamed transaction), for
 * serialize mode, simply redirect it to a file for the proper toplevel
 * transaction, and for parallel mode, the leader apply worker will send the
 * changes to parallel apply workers and the parallel apply worker will define
 * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
 * messages will be applied by both leader apply worker and parallel apply
 * workers).
 *
 * Returns true for streamed transactions (when the change is either serialized
 * to file or sent to parallel apply worker), false otherwise (regular,
 * non-streamed mode, or when running inside a parallel apply worker, which
 * must then apply the change itself).
 *
 * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
 * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
 * to a parallel apply worker, so that the leader also applies it.
 *
 * Side effect: for every streamed action, the subxact XID at the start of
 * "s" is consumed (the cursor of "s" is advanced past it).
 */
static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
	TransactionId current_xid;
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;
	StringInfoData original_msg;

	apply_action = get_transaction_apply_action(stream_xid, &winfo);

	/* not in streaming mode */
	if (apply_action == TRANS_LEADER_APPLY)
		return false;

	Assert(TransactionIdIsValid(stream_xid));

	/*
	 * The parallel apply worker needs the xid in this message to decide
	 * whether to define a savepoint, so save the original message that has
	 * not moved the cursor after the xid. We will serialize this message to a
	 * file in PARTIAL_SERIALIZE mode.
	 */
	original_msg = *s;

	/*
	 * We should have received XID of the subxact as the first part of the
	 * message, so extract it.
	 */
	current_xid = pq_getmsgint(s, 4);

	if (!TransactionIdIsValid(current_xid))
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("invalid transaction ID in streamed replication transaction")));

	switch (apply_action)
	{
		case TRANS_LEADER_SERIALIZE:
			Assert(stream_fd);

			/* Add the new subxact to the array (unless already there). */
			subxact_info_add(current_xid);

			/*
			 * Write the change to the current file.  Unlike the
			 * PARTIAL_SERIALIZE case below, this passes "s", whose cursor has
			 * already moved past the xid.
			 */
			stream_write_change(action, s);
			return true;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			/*
			 * XXX The publisher side doesn't always send relation/type update
			 * messages after the streaming transaction, so also update the
			 * relation/type in leader apply worker. See function
			 * cleanup_rel_sync_cache.
			 *
			 * Note that s->data/s->len describe the whole message from its
			 * start, so the xid is included in what is sent.
			 */
			if (pa_send_data(winfo, s->len, s->data))
				return (action != LOGICAL_REP_MSG_RELATION &&
						action != LOGICAL_REP_MSG_TYPE);

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, false);

			/* fall through */
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			/* Serialize the saved message, xid included. */
			stream_write_change(action, &original_msg);

			/* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
			return (action != LOGICAL_REP_MSG_RELATION &&
					action != LOGICAL_REP_MSG_TYPE);

		case TRANS_PARALLEL_APPLY:
			parallel_stream_nchanges += 1;

			/* Define a savepoint for a subxact if needed. */
			pa_start_subtrans(current_xid, stream_xid);

			/* The caller applies the change in this parallel worker. */
			return false;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			return false;		/* silence compiler warning */
	}
}
843 :
/*
 * Executor state preparation for evaluation of constraint expressions,
 * indexes and triggers for the specified relation.
 *
 * Builds a fresh EState whose range table holds a single RTE for
 * rel->localrel (with an AccessShareLock lock mode and permission info), and
 * a ResultRelInfo for that relation registered in
 * es_opened_result_relations.  AFTER-trigger collection is started here and
 * must be ended by finish_edata().
 *
 * Note that the caller must open and close any indexes to be updated.
 *
 * Returns a palloc'd ApplyExecutionData; release it with finish_edata().
 */
static ApplyExecutionData *
create_edata_for_relation(LogicalRepRelMapEntry *rel)
{
	ApplyExecutionData *edata;
	EState	   *estate;
	RangeTblEntry *rte;
	List	   *perminfos = NIL;
	ResultRelInfo *resultRelInfo;

	edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
	edata->targetRel = rel;

	edata->estate = estate = CreateExecutorState();

	/* Single-entry range table describing the target relation. */
	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel->localrel);
	rte->relkind = rel->localrel->rd_rel->relkind;
	rte->rellockmode = AccessShareLock;

	addRTEPermissionInfo(&perminfos, rte);

	/* The singleton bitmapset marks RT index 1 as unpruned. */
	ExecInitRangeTable(estate, list_make1(rte), perminfos,
					   bms_make_singleton(1));

	edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);

	/*
	 * Use Relation opened by logicalrep_rel_open() instead of opening it
	 * again.
	 */
	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

	/*
	 * We put the ResultRelInfo in the es_opened_result_relations list, even
	 * though we don't populate the es_result_relations array. That's a bit
	 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
	 *
	 * ExecOpenIndices() is not called here either, each execution path doing
	 * an apply operation being responsible for that.
	 */
	estate->es_opened_result_relations =
		lappend(estate->es_opened_result_relations, resultRelInfo);

	estate->es_output_cid = GetCurrentCommandId(true);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	/* other fields of edata remain NULL for now */

	return edata;
}
903 :
/*
 * Finish any operations related to the executor state created by
 * create_edata_for_relation().
 *
 * Fires any queued AFTER triggers, shuts down tuple routing if it was used,
 * releases the tuple table and the EState, and frees "edata" itself.  The
 * caller must not use "edata" afterwards.
 */
static void
finish_edata(ApplyExecutionData *edata)
{
	EState	   *estate = edata->estate;

	/* Handle any queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	/* Shut down tuple routing, if any was done. */
	if (edata->proute)
		ExecCleanupTupleRouting(edata->mtstate, edata->proute);

	/*
	 * Cleanup. It might seem that we should call ExecCloseResultRelations()
	 * here, but we intentionally don't. It would close the rel that
	 * create_edata_for_relation() added to es_opened_result_relations, which
	 * is wrong because we took no corresponding refcount. We rely on
	 * ExecCleanupTupleRouting() to close any other relations opened during
	 * execution.
	 */
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);
	pfree(edata);
}
931 :
932 : /*
933 : * Executes default values for columns for which we can't map to remote
934 : * relation columns.
935 : *
936 : * This allows us to support tables which have more columns on the downstream
937 : * than on the upstream.
938 : */
939 : static void
940 152104 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
941 : TupleTableSlot *slot)
942 : {
943 152104 : TupleDesc desc = RelationGetDescr(rel->localrel);
944 152104 : int num_phys_attrs = desc->natts;
945 : int i;
946 : int attnum,
947 152104 : num_defaults = 0;
948 : int *defmap;
949 : ExprState **defexprs;
950 : ExprContext *econtext;
951 :
952 152104 : econtext = GetPerTupleExprContext(estate);
953 :
954 : /* We got all the data via replication, no need to evaluate anything. */
955 152104 : if (num_phys_attrs == rel->remoterel.natts)
956 71814 : return;
957 :
958 80290 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
959 80290 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
960 :
961 : Assert(rel->attrmap->maplen == num_phys_attrs);
962 421326 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
963 : {
964 : Expr *defexpr;
965 :
966 341036 : if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
967 18 : continue;
968 :
969 341018 : if (rel->attrmap->attnums[attnum] >= 0)
970 184536 : continue;
971 :
972 156482 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
973 :
974 156482 : if (defexpr != NULL)
975 : {
976 : /* Run the expression through planner */
977 140262 : defexpr = expression_planner(defexpr);
978 :
979 : /* Initialize executable expression in copycontext */
980 140262 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
981 140262 : defmap[num_defaults] = attnum;
982 140262 : num_defaults++;
983 : }
984 : }
985 :
986 220552 : for (i = 0; i < num_defaults; i++)
987 140262 : slot->tts_values[defmap[i]] =
988 140262 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
989 : }
990 :
991 : /*
992 : * Store tuple data into slot.
993 : *
994 : * Incoming data can be either text or binary format.
995 : */
996 : static void
997 296646 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
998 : LogicalRepTupleData *tupleData)
999 : {
1000 296646 : int natts = slot->tts_tupleDescriptor->natts;
1001 : int i;
1002 :
1003 296646 : ExecClearTuple(slot);
1004 :
1005 : /* Call the "in" function for each non-dropped, non-null attribute */
1006 : Assert(natts == rel->attrmap->maplen);
1007 1316344 : for (i = 0; i < natts; i++)
1008 : {
1009 1019698 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1010 1019698 : int remoteattnum = rel->attrmap->attnums[i];
1011 :
1012 1019698 : if (!att->attisdropped && remoteattnum >= 0)
1013 605864 : {
1014 605864 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1015 :
1016 : Assert(remoteattnum < tupleData->ncols);
1017 :
1018 : /* Set attnum for error callback */
1019 605864 : apply_error_callback_arg.remote_attnum = remoteattnum;
1020 :
1021 605864 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1022 : {
1023 : Oid typinput;
1024 : Oid typioparam;
1025 :
1026 284724 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1027 569448 : slot->tts_values[i] =
1028 284724 : OidInputFunctionCall(typinput, colvalue->data,
1029 : typioparam, att->atttypmod);
1030 284724 : slot->tts_isnull[i] = false;
1031 : }
1032 321140 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1033 : {
1034 : Oid typreceive;
1035 : Oid typioparam;
1036 :
1037 : /*
1038 : * In some code paths we may be asked to re-parse the same
1039 : * tuple data. Reset the StringInfo's cursor so that works.
1040 : */
1041 220484 : colvalue->cursor = 0;
1042 :
1043 220484 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1044 440968 : slot->tts_values[i] =
1045 220484 : OidReceiveFunctionCall(typreceive, colvalue,
1046 : typioparam, att->atttypmod);
1047 :
1048 : /* Trouble if it didn't eat the whole buffer */
1049 220484 : if (colvalue->cursor != colvalue->len)
1050 0 : ereport(ERROR,
1051 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1052 : errmsg("incorrect binary data format in logical replication column %d",
1053 : remoteattnum + 1)));
1054 220484 : slot->tts_isnull[i] = false;
1055 : }
1056 : else
1057 : {
1058 : /*
1059 : * NULL value from remote. (We don't expect to see
1060 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1061 : * NULL.)
1062 : */
1063 100656 : slot->tts_values[i] = (Datum) 0;
1064 100656 : slot->tts_isnull[i] = true;
1065 : }
1066 :
1067 : /* Reset attnum for error callback */
1068 605864 : apply_error_callback_arg.remote_attnum = -1;
1069 : }
1070 : else
1071 : {
1072 : /*
1073 : * We assign NULL to dropped attributes and missing values
1074 : * (missing values should be later filled using
1075 : * slot_fill_defaults).
1076 : */
1077 413834 : slot->tts_values[i] = (Datum) 0;
1078 413834 : slot->tts_isnull[i] = true;
1079 : }
1080 : }
1081 :
1082 296646 : ExecStoreVirtualTuple(slot);
1083 296646 : }
1084 :
1085 : /*
1086 : * Replace updated columns with data from the LogicalRepTupleData struct.
1087 : * This is somewhat similar to heap_modify_tuple but also calls the type
1088 : * input functions on the user data.
1089 : *
1090 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
1091 : * columns provided in "tupleData" and leaving others as-is.
1092 : *
1093 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1094 : * storage for "srcslot". This is OK for current usage, but someday we may
1095 : * need to materialize "slot" at the end to make it independent of "srcslot".
1096 : */
1097 : static void
1098 63848 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
1099 : LogicalRepRelMapEntry *rel,
1100 : LogicalRepTupleData *tupleData)
1101 : {
1102 63848 : int natts = slot->tts_tupleDescriptor->natts;
1103 : int i;
1104 :
1105 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1106 63848 : ExecClearTuple(slot);
1107 :
1108 : /*
1109 : * Copy all the column data from srcslot, so that we'll have valid values
1110 : * for unreplaced columns.
1111 : */
1112 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
1113 63848 : slot_getallattrs(srcslot);
1114 63848 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1115 63848 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1116 :
1117 : /* Call the "in" function for each replaced attribute */
1118 : Assert(natts == rel->attrmap->maplen);
1119 318560 : for (i = 0; i < natts; i++)
1120 : {
1121 254712 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1122 254712 : int remoteattnum = rel->attrmap->attnums[i];
1123 :
1124 254712 : if (remoteattnum < 0)
1125 117038 : continue;
1126 :
1127 : Assert(remoteattnum < tupleData->ncols);
1128 :
1129 137674 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1130 : {
1131 137668 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1132 :
1133 : /* Set attnum for error callback */
1134 137668 : apply_error_callback_arg.remote_attnum = remoteattnum;
1135 :
1136 137668 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1137 : {
1138 : Oid typinput;
1139 : Oid typioparam;
1140 :
1141 50860 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1142 101720 : slot->tts_values[i] =
1143 50860 : OidInputFunctionCall(typinput, colvalue->data,
1144 : typioparam, att->atttypmod);
1145 50860 : slot->tts_isnull[i] = false;
1146 : }
1147 86808 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1148 : {
1149 : Oid typreceive;
1150 : Oid typioparam;
1151 :
1152 : /*
1153 : * In some code paths we may be asked to re-parse the same
1154 : * tuple data. Reset the StringInfo's cursor so that works.
1155 : */
1156 86712 : colvalue->cursor = 0;
1157 :
1158 86712 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1159 173424 : slot->tts_values[i] =
1160 86712 : OidReceiveFunctionCall(typreceive, colvalue,
1161 : typioparam, att->atttypmod);
1162 :
1163 : /* Trouble if it didn't eat the whole buffer */
1164 86712 : if (colvalue->cursor != colvalue->len)
1165 0 : ereport(ERROR,
1166 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1167 : errmsg("incorrect binary data format in logical replication column %d",
1168 : remoteattnum + 1)));
1169 86712 : slot->tts_isnull[i] = false;
1170 : }
1171 : else
1172 : {
1173 : /* must be LOGICALREP_COLUMN_NULL */
1174 96 : slot->tts_values[i] = (Datum) 0;
1175 96 : slot->tts_isnull[i] = true;
1176 : }
1177 :
1178 : /* Reset attnum for error callback */
1179 137668 : apply_error_callback_arg.remote_attnum = -1;
1180 : }
1181 : }
1182 :
1183 : /* And finally, declare that "slot" contains a valid virtual tuple */
1184 63848 : ExecStoreVirtualTuple(slot);
1185 63848 : }
1186 :
/*
 * Handle BEGIN message.
 *
 * Reads the BEGIN data and sets up error context for the transaction.  It
 * then records begin_data.final_lsn in remote_final_lsn, which is used by
 * should_apply_changes_for_rel() and checked against the COMMIT message in
 * apply_handle_commit().  Finally it marks a remote transaction as in
 * progress.
 */
static void
apply_handle_begin(StringInfo s)
{
	LogicalRepBeginData begin_data;

	/* There must not be an active streaming transaction. */
	Assert(!TransactionIdIsValid(stream_xid));

	logicalrep_read_begin(s, &begin_data);
	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);

	remote_final_lsn = begin_data.final_lsn;

	/* Decide, based on the final LSN, whether this transaction is skipped. */
	maybe_start_skipping_changes(begin_data.final_lsn);

	in_remote_transaction = true;

	pgstat_report_activity(STATE_RUNNING, NULL);
}
1209 :
/*
 * Handle COMMIT message.
 *
 * Raises a protocol-violation ERROR if the commit LSN does not match the
 * final LSN announced by the preceding BEGIN (remote_final_lsn).  Otherwise
 * it commits via apply_handle_commit_internal() and processes syncing tables
 * up to the commit's end LSN.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	/* COMMIT must finish the transaction that BEGIN announced. */
	if (commit_data.commit_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
								 LSN_FORMAT_ARGS(remote_final_lsn))));

	apply_handle_commit_internal(&commit_data);

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1237 :
/*
 * Handle BEGIN PREPARE message.
 *
 * The two-phase counterpart of apply_handle_begin().  The prepare LSN plays
 * the role of the final LSN: it is stored in remote_final_lsn and later
 * checked by apply_handle_prepare().  Receiving this message in a tablesync
 * worker is a protocol violation.
 */
static void
apply_handle_begin_prepare(StringInfo s)
{
	LogicalRepPreparedTxnData begin_data;

	/* Tablesync should never receive prepare. */
	if (am_tablesync_worker())
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));

	/* There must not be an active streaming transaction. */
	Assert(!TransactionIdIsValid(stream_xid));

	logicalrep_read_begin_prepare(s, &begin_data);
	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);

	remote_final_lsn = begin_data.prepare_lsn;

	/* Decide, based on the prepare LSN, whether this transaction is skipped. */
	maybe_start_skipping_changes(begin_data.prepare_lsn);

	in_remote_transaction = true;

	pgstat_report_activity(STATE_RUNNING, NULL);
}
1266 :
/*
 * Common function to prepare the GID.
 *
 * Computes the local GID for the remote transaction, wraps the current
 * transaction in a transaction block if it is not already in one, records
 * the origin position (end LSN and prepare time) and then issues
 * PrepareTransactionBlock().  The caller must still call
 * CommitTransactionCommand() to complete the PREPARE.
 */
static void
apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
{
	char		gid[GIDSIZE];

	/*
	 * Compute unique GID for two_phase transactions. We don't use the GID of
	 * the prepared transaction sent by the server, as that can lead to
	 * deadlock when multiple subscriptions on this node point to publications
	 * on the same remote node. See comments atop worker.c
	 */
	TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
						   gid, sizeof(gid));

	/*
	 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
	 * called within the PrepareTransactionBlock below.
	 */
	if (!IsTransactionBlock())
	{
		BeginTransactionBlock();
		CommitTransactionCommand(); /* Completes the preceding Begin command. */
	}

	/*
	 * Update origin state so we can restart streaming from correct position
	 * in case of crash.
	 */
	replorigin_session_origin_lsn = prepare_data->end_lsn;
	replorigin_session_origin_timestamp = prepare_data->prepare_time;

	PrepareTransactionBlock(gid);
}
1303 :
/*
 * Handle PREPARE message.
 *
 * Raises a protocol-violation ERROR if the prepare LSN does not match the
 * one announced by BEGIN PREPARE.  Otherwise it prepares the local
 * transaction (even if empty or fully skipped), acknowledges the remote end
 * LSN, processes syncing tables, and stops and clears any skip-LSN handling.
 */
static void
apply_handle_prepare(StringInfo s)
{
	LogicalRepPreparedTxnData prepare_data;

	logicalrep_read_prepare(s, &prepare_data);

	/* PREPARE must finish the transaction that BEGIN PREPARE announced. */
	if (prepare_data.prepare_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
								 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
								 LSN_FORMAT_ARGS(remote_final_lsn))));

	/*
	 * Unlike commit, here, we always prepare the transaction even though no
	 * change has happened in this transaction or all changes are skipped. It
	 * is done this way because at commit prepared time, we won't know whether
	 * we have skipped preparing a transaction because of those reasons.
	 *
	 * XXX, We can optimize such that at commit prepared time, we first check
	 * whether we have prepared the transaction or not but that doesn't seem
	 * worthwhile because such cases shouldn't be common.
	 */
	begin_replication_step();

	apply_handle_prepare_internal(&prepare_data);

	end_replication_step();
	CommitTransactionCommand();
	pgstat_report_stat(false);

	/*
	 * It is okay not to set the local_end LSN for the prepare because we
	 * always flush the prepare record. So, we can send the acknowledgment of
	 * the remote_end LSN as soon as prepare is finished.
	 *
	 * XXX For the sake of consistency with commit, we could have set it with
	 * the LSN of prepare but as of now we don't track that value similar to
	 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
	 * it.
	 */
	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);

	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(prepare_data.end_lsn);

	/*
	 * Since we have already prepared the transaction, in a case where the
	 * server crashes before clearing the subskiplsn, it will be left but the
	 * transaction won't be resent. But that's okay because it's a rare case
	 * and the subskiplsn will be cleared when finishing the next transaction.
	 */
	stop_skipping_changes();
	clear_subscription_skip_lsn(prepare_data.prepare_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1368 :
/*
 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
 *
 * Finishes (commits) the locally prepared transaction identified by the GID
 * derived from the subscription OID and remote xid.  It then records the
 * flush position (remote end LSN paired with XactLastCommitEnd), processes
 * syncing tables and clears any skip LSN.
 *
 * Note that we don't need to wait here if the transaction was prepared in a
 * parallel apply worker. In that case, we have already waited for the prepare
 * to finish in apply_handle_stream_prepare() which will ensure all the
 * operations in that transaction have happened in the subscriber, so no
 * concurrent transaction can cause deadlock or transaction dependency issues.
 */
static void
apply_handle_commit_prepared(StringInfo s)
{
	LogicalRepCommitPreparedTxnData prepare_data;
	char		gid[GIDSIZE];

	logicalrep_read_commit_prepared(s, &prepare_data);
	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);

	/* Compute GID for two_phase transactions. */
	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
						   gid, sizeof(gid));

	/*
	 * There is no transaction when COMMIT PREPARED is called, so
	 * begin_replication_step() starts one.
	 */
	begin_replication_step();

	/*
	 * Update origin state so we can restart streaming from correct position
	 * in case of crash.
	 */
	replorigin_session_origin_lsn = prepare_data.end_lsn;
	replorigin_session_origin_timestamp = prepare_data.commit_time;

	FinishPreparedTransaction(gid, true);
	end_replication_step();
	CommitTransactionCommand();
	pgstat_report_stat(false);

	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(prepare_data.end_lsn);

	clear_subscription_skip_lsn(prepare_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1417 :
/*
 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
 *
 * If LookupGXact() finds a locally prepared transaction matching the GID,
 * prepare end LSN and prepare time, it is rolled back and any skip LSN is
 * cleared.  Otherwise nothing is rolled back.  In both cases the remote
 * rollback end LSN is acknowledged and syncing tables are processed.
 *
 * Note that we don't need to wait here if the transaction was prepared in a
 * parallel apply worker. In that case, we have already waited for the prepare
 * to finish in apply_handle_stream_prepare() which will ensure all the
 * operations in that transaction have happened in the subscriber, so no
 * concurrent transaction can cause deadlock or transaction dependency issues.
 */
static void
apply_handle_rollback_prepared(StringInfo s)
{
	LogicalRepRollbackPreparedTxnData rollback_data;
	char		gid[GIDSIZE];

	logicalrep_read_rollback_prepared(s, &rollback_data);
	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);

	/* Compute GID for two_phase transactions. */
	TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
						   gid, sizeof(gid));

	/*
	 * It is possible that we haven't received prepare because it occurred
	 * before walsender reached a consistent point or the two_phase was still
	 * not enabled by that time, so in such cases, we need to skip rollback
	 * prepared.
	 */
	if (LookupGXact(gid, rollback_data.prepare_end_lsn,
					rollback_data.prepare_time))
	{
		/*
		 * Update origin state so we can restart streaming from correct
		 * position in case of crash.
		 */
		replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
		replorigin_session_origin_timestamp = rollback_data.rollback_time;

		/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
		begin_replication_step();
		FinishPreparedTransaction(gid, false);
		end_replication_step();
		CommitTransactionCommand();

		clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
	}

	pgstat_report_stat(false);

	/*
	 * It is okay not to set the local_end LSN for the rollback of prepared
	 * transaction because we always flush the WAL record for it. See
	 * apply_handle_prepare.
	 */
	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(rollback_data.rollback_end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1481 :
1482 : /*
1483 : * Handle STREAM PREPARE.
1484 : */
1485 : static void
1486 22 : apply_handle_stream_prepare(StringInfo s)
1487 : {
1488 : LogicalRepPreparedTxnData prepare_data;
1489 : ParallelApplyWorkerInfo *winfo;
1490 : TransApplyAction apply_action;
1491 :
1492 : /* Save the message before it is consumed. */
1493 22 : StringInfoData original_msg = *s;
1494 :
1495 22 : if (in_streamed_transaction)
1496 0 : ereport(ERROR,
1497 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1498 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1499 :
1500 : /* Tablesync should never receive prepare. */
1501 22 : if (am_tablesync_worker())
1502 0 : ereport(ERROR,
1503 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1504 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1505 :
1506 22 : logicalrep_read_stream_prepare(s, &prepare_data);
1507 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1508 :
1509 22 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1510 :
1511 22 : switch (apply_action)
1512 : {
1513 10 : case TRANS_LEADER_APPLY:
1514 :
1515 : /*
1516 : * The transaction has been serialized to file, so replay all the
1517 : * spooled operations.
1518 : */
1519 10 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1520 : prepare_data.xid, prepare_data.prepare_lsn);
1521 :
1522 : /* Mark the transaction as prepared. */
1523 10 : apply_handle_prepare_internal(&prepare_data);
1524 :
1525 10 : CommitTransactionCommand();
1526 :
1527 : /*
1528 : * It is okay not to set the local_end LSN for the prepare because
1529 : * we always flush the prepare record. See apply_handle_prepare.
1530 : */
1531 10 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1532 :
1533 10 : in_remote_transaction = false;
1534 :
1535 : /* Unlink the files with serialized changes and subxact info. */
1536 10 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1537 :
1538 10 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1539 10 : break;
1540 :
1541 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1542 : Assert(winfo);
1543 :
1544 4 : if (pa_send_data(winfo, s->len, s->data))
1545 : {
1546 : /* Finish processing the streaming transaction. */
1547 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1548 4 : break;
1549 : }
1550 :
1551 : /*
1552 : * Switch to serialize mode when we are not able to send the
1553 : * change to parallel apply worker.
1554 : */
1555 0 : pa_switch_to_partial_serialize(winfo, true);
1556 :
1557 : /* fall through */
1558 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1559 : Assert(winfo);
1560 :
1561 2 : stream_open_and_write_change(prepare_data.xid,
1562 : LOGICAL_REP_MSG_STREAM_PREPARE,
1563 : &original_msg);
1564 :
1565 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1566 :
1567 : /* Finish processing the streaming transaction. */
1568 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1569 2 : break;
1570 :
1571 6 : case TRANS_PARALLEL_APPLY:
1572 :
1573 : /*
1574 : * If the parallel apply worker is applying spooled messages then
1575 : * close the file before preparing.
1576 : */
1577 6 : if (stream_fd)
1578 2 : stream_close_file();
1579 :
1580 6 : begin_replication_step();
1581 :
1582 : /* Mark the transaction as prepared. */
1583 6 : apply_handle_prepare_internal(&prepare_data);
1584 :
1585 6 : end_replication_step();
1586 :
1587 6 : CommitTransactionCommand();
1588 :
1589 : /*
1590 : * It is okay not to set the local_end LSN for the prepare because
1591 : * we always flush the prepare record. See apply_handle_prepare.
1592 : */
1593 6 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1594 :
1595 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1596 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1597 :
1598 6 : pa_reset_subtrans();
1599 :
1600 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1601 6 : break;
1602 :
1603 0 : default:
1604 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1605 : break;
1606 : }
1607 :
1608 22 : pgstat_report_stat(false);
1609 :
1610 : /* Process any tables that are being synchronized in parallel. */
1611 22 : process_syncing_tables(prepare_data.end_lsn);
1612 :
1613 : /*
1614 : * Similar to prepare case, the subskiplsn could be left in a case of
1615 : * server crash but it's okay. See the comments in apply_handle_prepare().
1616 : */
1617 22 : stop_skipping_changes();
1618 22 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1619 :
1620 22 : pgstat_report_activity(STATE_IDLE, NULL);
1621 :
1622 22 : reset_apply_error_context_info();
1623 22 : }
1624 :
1625 : /*
1626 : * Handle ORIGIN message.
1627 : *
1628 : * TODO, support tracking of multiple origins
1629 : */
1630 : static void
1631 16 : apply_handle_origin(StringInfo s)
1632 : {
1633 : /*
1634 : * ORIGIN message can only come inside streaming transaction or inside
1635 : * remote transaction and before any actual writes.
1636 : */
1637 16 : if (!in_streamed_transaction &&
1638 24 : (!in_remote_transaction ||
1639 12 : (IsTransactionState() && !am_tablesync_worker())))
1640 0 : ereport(ERROR,
1641 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1642 : errmsg_internal("ORIGIN message sent out of order")));
1643 16 : }
1644 :
1645 : /*
1646 : * Initialize fileset (if not already done).
1647 : *
1648 : * Create a new file when first_segment is true, otherwise open the existing
1649 : * file.
1650 : */
1651 : void
1652 724 : stream_start_internal(TransactionId xid, bool first_segment)
1653 : {
1654 724 : begin_replication_step();
1655 :
1656 : /*
1657 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1658 : * used for the entire duration of the worker so create it in a permanent
1659 : * context. We create this on the very first streaming message from any
1660 : * transaction and then use it for this and other streaming transactions.
1661 : * Now, we could create a fileset at the start of the worker as well but
1662 : * then we won't be sure that it will ever be used.
1663 : */
1664 724 : if (!MyLogicalRepWorker->stream_fileset)
1665 : {
1666 : MemoryContext oldctx;
1667 :
1668 28 : oldctx = MemoryContextSwitchTo(ApplyContext);
1669 :
1670 28 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1671 28 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1672 :
1673 28 : MemoryContextSwitchTo(oldctx);
1674 : }
1675 :
1676 : /* Open the spool file for this transaction. */
1677 724 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1678 :
1679 : /* If this is not the first segment, open existing subxact file. */
1680 724 : if (!first_segment)
1681 660 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1682 :
1683 724 : end_replication_step();
1684 724 : }
1685 :
1686 : /*
1687 : * Handle STREAM START message.
1688 : */
1689 : static void
1690 1672 : apply_handle_stream_start(StringInfo s)
1691 : {
1692 : bool first_segment;
1693 : ParallelApplyWorkerInfo *winfo;
1694 : TransApplyAction apply_action;
1695 :
1696 : /* Save the message before it is consumed. */
1697 1672 : StringInfoData original_msg = *s;
1698 :
1699 1672 : if (in_streamed_transaction)
1700 0 : ereport(ERROR,
1701 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1702 : errmsg_internal("duplicate STREAM START message")));
1703 :
1704 : /* There must not be an active streaming transaction. */
1705 : Assert(!TransactionIdIsValid(stream_xid));
1706 :
1707 : /* notify handle methods we're processing a remote transaction */
1708 1672 : in_streamed_transaction = true;
1709 :
1710 : /* extract XID of the top-level transaction */
1711 1672 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1712 :
1713 1672 : if (!TransactionIdIsValid(stream_xid))
1714 0 : ereport(ERROR,
1715 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1716 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1717 :
1718 1672 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1719 :
1720 : /* Try to allocate a worker for the streaming transaction. */
1721 1672 : if (first_segment)
1722 164 : pa_allocate_worker(stream_xid);
1723 :
1724 1672 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1725 :
1726 1672 : switch (apply_action)
1727 : {
1728 684 : case TRANS_LEADER_SERIALIZE:
1729 :
1730 : /*
1731 : * Function stream_start_internal starts a transaction. This
1732 : * transaction will be committed on the stream stop unless it is a
1733 : * tablesync worker in which case it will be committed after
1734 : * processing all the messages. We need this transaction for
1735 : * handling the BufFile, used for serializing the streaming data
1736 : * and subxact info.
1737 : */
1738 684 : stream_start_internal(stream_xid, first_segment);
1739 684 : break;
1740 :
1741 482 : case TRANS_LEADER_SEND_TO_PARALLEL:
1742 : Assert(winfo);
1743 :
1744 : /*
1745 : * Once we start serializing the changes, the parallel apply
1746 : * worker will wait for the leader to release the stream lock
1747 : * until the end of the transaction. So, we don't need to release
1748 : * the lock or increment the stream count in that case.
1749 : */
1750 482 : if (pa_send_data(winfo, s->len, s->data))
1751 : {
1752 : /*
1753 : * Unlock the shared object lock so that the parallel apply
1754 : * worker can continue to receive changes.
1755 : */
1756 474 : if (!first_segment)
1757 428 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1758 :
1759 : /*
1760 : * Increment the number of streaming blocks waiting to be
1761 : * processed by parallel apply worker.
1762 : */
1763 474 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1764 :
1765 : /* Cache the parallel apply worker for this transaction. */
1766 474 : pa_set_stream_apply_worker(winfo);
1767 474 : break;
1768 : }
1769 :
1770 : /*
1771 : * Switch to serialize mode when we are not able to send the
1772 : * change to parallel apply worker.
1773 : */
1774 8 : pa_switch_to_partial_serialize(winfo, !first_segment);
1775 :
1776 : /* fall through */
1777 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1778 : Assert(winfo);
1779 :
1780 : /*
1781 : * Open the spool file unless it was already opened when switching
1782 : * to serialize mode. The transaction started in
1783 : * stream_start_internal will be committed on the stream stop.
1784 : */
1785 30 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1786 22 : stream_start_internal(stream_xid, first_segment);
1787 :
1788 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1789 :
1790 : /* Cache the parallel apply worker for this transaction. */
1791 30 : pa_set_stream_apply_worker(winfo);
1792 30 : break;
1793 :
1794 484 : case TRANS_PARALLEL_APPLY:
1795 484 : if (first_segment)
1796 : {
1797 : /* Hold the lock until the end of the transaction. */
1798 54 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1799 54 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1800 :
1801 : /*
1802 : * Signal the leader apply worker, as it may be waiting for
1803 : * us.
1804 : */
1805 54 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
1806 : }
1807 :
1808 484 : parallel_stream_nchanges = 0;
1809 484 : break;
1810 :
1811 0 : default:
1812 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1813 : break;
1814 : }
1815 :
1816 1672 : pgstat_report_activity(STATE_RUNNING, NULL);
1817 1672 : }
1818 :
1819 : /*
1820 : * Update the information about subxacts and close the file.
1821 : *
1822 : * This function should be called when the stream_start_internal function has
1823 : * been called.
1824 : */
1825 : void
1826 724 : stream_stop_internal(TransactionId xid)
1827 : {
1828 : /*
1829 : * Serialize information about subxacts for the toplevel transaction, then
1830 : * close the stream messages spool file.
1831 : */
1832 724 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1833 724 : stream_close_file();
1834 :
1835 : /* We must be in a valid transaction state */
1836 : Assert(IsTransactionState());
1837 :
1838 : /* Commit the per-stream transaction */
1839 724 : CommitTransactionCommand();
1840 :
1841 : /* Reset per-stream context */
1842 724 : MemoryContextReset(LogicalStreamingContext);
1843 724 : }
1844 :
1845 : /*
1846 : * Handle STREAM STOP message.
1847 : */
1848 : static void
1849 1670 : apply_handle_stream_stop(StringInfo s)
1850 : {
1851 : ParallelApplyWorkerInfo *winfo;
1852 : TransApplyAction apply_action;
1853 :
1854 1670 : if (!in_streamed_transaction)
1855 0 : ereport(ERROR,
1856 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1857 : errmsg_internal("STREAM STOP message without STREAM START")));
1858 :
1859 1670 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1860 :
1861 1670 : switch (apply_action)
1862 : {
1863 684 : case TRANS_LEADER_SERIALIZE:
1864 684 : stream_stop_internal(stream_xid);
1865 684 : break;
1866 :
1867 474 : case TRANS_LEADER_SEND_TO_PARALLEL:
1868 : Assert(winfo);
1869 :
1870 : /*
1871 : * Lock before sending the STREAM_STOP message so that the leader
1872 : * can hold the lock first and the parallel apply worker will wait
1873 : * for leader to release the lock. See Locking Considerations atop
1874 : * applyparallelworker.c.
1875 : */
1876 474 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1877 :
1878 474 : if (pa_send_data(winfo, s->len, s->data))
1879 : {
1880 474 : pa_set_stream_apply_worker(NULL);
1881 474 : break;
1882 : }
1883 :
1884 : /*
1885 : * Switch to serialize mode when we are not able to send the
1886 : * change to parallel apply worker.
1887 : */
1888 0 : pa_switch_to_partial_serialize(winfo, true);
1889 :
1890 : /* fall through */
1891 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1892 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1893 30 : stream_stop_internal(stream_xid);
1894 30 : pa_set_stream_apply_worker(NULL);
1895 30 : break;
1896 :
1897 482 : case TRANS_PARALLEL_APPLY:
1898 482 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1899 : parallel_stream_nchanges);
1900 :
1901 : /*
1902 : * By the time parallel apply worker is processing the changes in
1903 : * the current streaming block, the leader apply worker may have
1904 : * sent multiple streaming blocks. This can lead to parallel apply
1905 : * worker start waiting even when there are more chunk of streams
1906 : * in the queue. So, try to lock only if there is no message left
1907 : * in the queue. See Locking Considerations atop
1908 : * applyparallelworker.c.
1909 : *
1910 : * Note that here we have a race condition where we can start
1911 : * waiting even when there are pending streaming chunks. This can
1912 : * happen if the leader sends another streaming block and acquires
1913 : * the stream lock again after the parallel apply worker checks
1914 : * that there is no pending streaming block and before it actually
1915 : * starts waiting on a lock. We can handle this case by not
1916 : * allowing the leader to increment the stream block count during
1917 : * the time parallel apply worker acquires the lock but it is not
1918 : * clear whether that is worth the complexity.
1919 : *
1920 : * Now, if this missed chunk contains rollback to savepoint, then
1921 : * there is a risk of deadlock which probably shouldn't happen
1922 : * after restart.
1923 : */
1924 482 : pa_decr_and_wait_stream_block();
1925 478 : break;
1926 :
1927 0 : default:
1928 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1929 : break;
1930 : }
1931 :
1932 1666 : in_streamed_transaction = false;
1933 1666 : stream_xid = InvalidTransactionId;
1934 :
1935 : /*
1936 : * The parallel apply worker could be in a transaction in which case we
1937 : * need to report the state as STATE_IDLEINTRANSACTION.
1938 : */
1939 1666 : if (IsTransactionOrTransactionBlock())
1940 478 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1941 : else
1942 1188 : pgstat_report_activity(STATE_IDLE, NULL);
1943 :
1944 1666 : reset_apply_error_context_info();
1945 1666 : }
1946 :
1947 : /*
1948 : * Helper function to handle STREAM ABORT message when the transaction was
1949 : * serialized to file.
1950 : */
1951 : static void
1952 28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1953 : {
1954 : /*
1955 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1956 : * just delete the files with serialized info.
1957 : */
1958 28 : if (xid == subxid)
1959 2 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1960 : else
1961 : {
1962 : /*
1963 : * OK, so it's a subxact. We need to read the subxact file for the
1964 : * toplevel transaction, determine the offset tracked for the subxact,
1965 : * and truncate the file with changes. We also remove the subxacts
1966 : * with higher offsets (or rather higher XIDs).
1967 : *
1968 : * We intentionally scan the array from the tail, because we're likely
1969 : * aborting a change for the most recent subtransactions.
1970 : *
1971 : * We can't use the binary search here as subxact XIDs won't
1972 : * necessarily arrive in sorted order, consider the case where we have
1973 : * released the savepoint for multiple subtransactions and then
1974 : * performed rollback to savepoint for one of the earlier
1975 : * sub-transaction.
1976 : */
1977 : int64 i;
1978 : int64 subidx;
1979 : BufFile *fd;
1980 26 : bool found = false;
1981 : char path[MAXPGPATH];
1982 :
1983 26 : subidx = -1;
1984 26 : begin_replication_step();
1985 26 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1986 :
1987 30 : for (i = subxact_data.nsubxacts; i > 0; i--)
1988 : {
1989 22 : if (subxact_data.subxacts[i - 1].xid == subxid)
1990 : {
1991 18 : subidx = (i - 1);
1992 18 : found = true;
1993 18 : break;
1994 : }
1995 : }
1996 :
1997 : /*
1998 : * If it's an empty sub-transaction then we will not find the subxid
1999 : * here so just cleanup the subxact info and return.
2000 : */
2001 26 : if (!found)
2002 : {
2003 : /* Cleanup the subxact info */
2004 8 : cleanup_subxact_info();
2005 8 : end_replication_step();
2006 8 : CommitTransactionCommand();
2007 8 : return;
2008 : }
2009 :
2010 : /* open the changes file */
2011 18 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2012 18 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
2013 : O_RDWR, false);
2014 :
2015 : /* OK, truncate the file at the right offset */
2016 18 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
2017 18 : subxact_data.subxacts[subidx].offset);
2018 18 : BufFileClose(fd);
2019 :
2020 : /* discard the subxacts added later */
2021 18 : subxact_data.nsubxacts = subidx;
2022 :
2023 : /* write the updated subxact list */
2024 18 : subxact_info_write(MyLogicalRepWorker->subid, xid);
2025 :
2026 18 : end_replication_step();
2027 18 : CommitTransactionCommand();
2028 : }
2029 : }
2030 :
2031 : /*
2032 : * Handle STREAM ABORT message.
2033 : */
2034 : static void
2035 76 : apply_handle_stream_abort(StringInfo s)
2036 : {
2037 : TransactionId xid;
2038 : TransactionId subxid;
2039 : LogicalRepStreamAbortData abort_data;
2040 : ParallelApplyWorkerInfo *winfo;
2041 : TransApplyAction apply_action;
2042 :
2043 : /* Save the message before it is consumed. */
2044 76 : StringInfoData original_msg = *s;
2045 : bool toplevel_xact;
2046 :
2047 76 : if (in_streamed_transaction)
2048 0 : ereport(ERROR,
2049 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2050 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
2051 :
2052 : /* We receive abort information only when we can apply in parallel. */
2053 76 : logicalrep_read_stream_abort(s, &abort_data,
2054 76 : MyLogicalRepWorker->parallel_apply);
2055 :
2056 76 : xid = abort_data.xid;
2057 76 : subxid = abort_data.subxid;
2058 76 : toplevel_xact = (xid == subxid);
2059 :
2060 76 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2061 :
2062 76 : apply_action = get_transaction_apply_action(xid, &winfo);
2063 :
2064 76 : switch (apply_action)
2065 : {
2066 28 : case TRANS_LEADER_APPLY:
2067 :
2068 : /*
2069 : * We are in the leader apply worker and the transaction has been
2070 : * serialized to file.
2071 : */
2072 28 : stream_abort_internal(xid, subxid);
2073 :
2074 28 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2075 28 : break;
2076 :
2077 20 : case TRANS_LEADER_SEND_TO_PARALLEL:
2078 : Assert(winfo);
2079 :
2080 : /*
2081 : * For the case of aborting the subtransaction, we increment the
2082 : * number of streaming blocks and take the lock again before
2083 : * sending the STREAM_ABORT to ensure that the parallel apply
2084 : * worker will wait on the lock for the next set of changes after
2085 : * processing the STREAM_ABORT message if it is not already
2086 : * waiting for STREAM_STOP message.
2087 : *
2088 : * It is important to perform this locking before sending the
2089 : * STREAM_ABORT message so that the leader can hold the lock first
2090 : * and the parallel apply worker will wait for the leader to
2091 : * release the lock. This is the same as what we do in
2092 : * apply_handle_stream_stop. See Locking Considerations atop
2093 : * applyparallelworker.c.
2094 : */
2095 20 : if (!toplevel_xact)
2096 : {
2097 18 : pa_unlock_stream(xid, AccessExclusiveLock);
2098 18 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
2099 18 : pa_lock_stream(xid, AccessExclusiveLock);
2100 : }
2101 :
2102 20 : if (pa_send_data(winfo, s->len, s->data))
2103 : {
2104 : /*
2105 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2106 : * wait here for the parallel apply worker to finish as that
2107 : * is not required to maintain the commit order and won't have
2108 : * the risk of failures due to transaction dependencies and
2109 : * deadlocks. However, it is possible that before the parallel
2110 : * worker finishes and we clear the worker info, the xid
2111 : * wraparound happens on the upstream and a new transaction
2112 : * with the same xid can appear and that can lead to duplicate
2113 : * entries in ParallelApplyTxnHash. Yet another problem could
2114 : * be that we may have serialized the changes in partial
2115 : * serialize mode and the file containing xact changes may
2116 : * already exist, and after xid wraparound trying to create
2117 : * the file for the same xid can lead to an error. To avoid
2118 : * these problems, we decide to wait for the aborts to finish.
2119 : *
2120 : * Note, it is okay to not update the flush location position
2121 : * for aborts as in worst case that means such a transaction
2122 : * won't be sent again after restart.
2123 : */
2124 20 : if (toplevel_xact)
2125 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2126 :
2127 20 : break;
2128 : }
2129 :
2130 : /*
2131 : * Switch to serialize mode when we are not able to send the
2132 : * change to parallel apply worker.
2133 : */
2134 0 : pa_switch_to_partial_serialize(winfo, true);
2135 :
2136 : /* fall through */
2137 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2138 : Assert(winfo);
2139 :
2140 : /*
2141 : * Parallel apply worker might have applied some changes, so write
2142 : * the STREAM_ABORT message so that it can rollback the
2143 : * subtransaction if needed.
2144 : */
2145 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
2146 : &original_msg);
2147 :
2148 4 : if (toplevel_xact)
2149 : {
2150 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2151 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2152 : }
2153 4 : break;
2154 :
2155 24 : case TRANS_PARALLEL_APPLY:
2156 :
2157 : /*
2158 : * If the parallel apply worker is applying spooled messages then
2159 : * close the file before aborting.
2160 : */
2161 24 : if (toplevel_xact && stream_fd)
2162 2 : stream_close_file();
2163 :
2164 24 : pa_stream_abort(&abort_data);
2165 :
2166 : /*
2167 : * We need to wait after processing rollback to savepoint for the
2168 : * next set of changes.
2169 : *
2170 : * We have a race condition here due to which we can start waiting
2171 : * here when there are more chunk of streams in the queue. See
2172 : * apply_handle_stream_stop.
2173 : */
2174 24 : if (!toplevel_xact)
2175 20 : pa_decr_and_wait_stream_block();
2176 :
2177 24 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2178 24 : break;
2179 :
2180 0 : default:
2181 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2182 : break;
2183 : }
2184 :
2185 76 : reset_apply_error_context_info();
2186 76 : }
2187 :
2188 : /*
2189 : * Ensure that the passed location is fileset's end.
2190 : */
2191 : static void
2192 8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2193 : off_t offset)
2194 : {
2195 : char path[MAXPGPATH];
2196 : BufFile *fd;
2197 : int last_fileno;
2198 : off_t last_offset;
2199 :
2200 : Assert(!IsTransactionState());
2201 :
2202 8 : begin_replication_step();
2203 :
2204 8 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2205 :
2206 8 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2207 :
2208 8 : BufFileSeek(fd, 0, 0, SEEK_END);
2209 8 : BufFileTell(fd, &last_fileno, &last_offset);
2210 :
2211 8 : BufFileClose(fd);
2212 :
2213 8 : end_replication_step();
2214 :
2215 8 : if (last_fileno != fileno || last_offset != offset)
2216 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2217 : path);
2218 8 : }
2219 :
2220 : /*
2221 : * Common spoolfile processing.
2222 : */
2223 : void
2224 62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2225 : XLogRecPtr lsn)
2226 : {
2227 : int nchanges;
2228 : char path[MAXPGPATH];
2229 62 : char *buffer = NULL;
2230 : MemoryContext oldcxt;
2231 : ResourceOwner oldowner;
2232 : int fileno;
2233 : off_t offset;
2234 :
2235 62 : if (!am_parallel_apply_worker())
2236 54 : maybe_start_skipping_changes(lsn);
2237 :
2238 : /* Make sure we have an open transaction */
2239 62 : begin_replication_step();
2240 :
2241 : /*
2242 : * Allocate file handle and memory required to process all the messages in
2243 : * TopTransactionContext to avoid them getting reset after each message is
2244 : * processed.
2245 : */
2246 62 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2247 :
2248 : /* Open the spool file for the committed/prepared transaction */
2249 62 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2250 62 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2251 :
2252 : /*
2253 : * Make sure the file is owned by the toplevel transaction so that the
2254 : * file will not be accidentally closed when aborting a subtransaction.
2255 : */
2256 62 : oldowner = CurrentResourceOwner;
2257 62 : CurrentResourceOwner = TopTransactionResourceOwner;
2258 :
2259 62 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2260 :
2261 62 : CurrentResourceOwner = oldowner;
2262 :
2263 62 : buffer = palloc(BLCKSZ);
2264 :
2265 62 : MemoryContextSwitchTo(oldcxt);
2266 :
2267 62 : remote_final_lsn = lsn;
2268 :
2269 : /*
2270 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2271 : * transaction.
2272 : */
2273 62 : in_remote_transaction = true;
2274 62 : pgstat_report_activity(STATE_RUNNING, NULL);
2275 :
2276 62 : end_replication_step();
2277 :
2278 : /*
2279 : * Read the entries one by one and pass them through the same logic as in
2280 : * apply_dispatch.
2281 : */
2282 62 : nchanges = 0;
2283 : while (true)
2284 176938 : {
2285 : StringInfoData s2;
2286 : size_t nbytes;
2287 : int len;
2288 :
2289 177000 : CHECK_FOR_INTERRUPTS();
2290 :
2291 : /* read length of the on-disk record */
2292 177000 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2293 :
2294 : /* have we reached end of the file? */
2295 177000 : if (nbytes == 0)
2296 52 : break;
2297 :
2298 : /* do we have a correct length? */
2299 176948 : if (len <= 0)
2300 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2301 : len, path);
2302 :
2303 : /* make sure we have sufficiently large buffer */
2304 176948 : buffer = repalloc(buffer, len);
2305 :
2306 : /* and finally read the data into the buffer */
2307 176948 : BufFileReadExact(stream_fd, buffer, len);
2308 :
2309 176948 : BufFileTell(stream_fd, &fileno, &offset);
2310 :
2311 : /* init a stringinfo using the buffer and call apply_dispatch */
2312 176948 : initReadOnlyStringInfo(&s2, buffer, len);
2313 :
2314 : /* Ensure we are reading the data into our memory context. */
2315 176948 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2316 :
2317 176948 : apply_dispatch(&s2);
2318 :
2319 176946 : MemoryContextReset(ApplyMessageContext);
2320 :
2321 176946 : MemoryContextSwitchTo(oldcxt);
2322 :
2323 176946 : nchanges++;
2324 :
2325 : /*
2326 : * It is possible the file has been closed because we have processed
2327 : * the transaction end message like stream_commit in which case that
2328 : * must be the last message.
2329 : */
2330 176946 : if (!stream_fd)
2331 : {
2332 8 : ensure_last_message(stream_fileset, xid, fileno, offset);
2333 8 : break;
2334 : }
2335 :
2336 176938 : if (nchanges % 1000 == 0)
2337 166 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2338 : nchanges, path);
2339 : }
2340 :
2341 60 : if (stream_fd)
2342 52 : stream_close_file();
2343 :
2344 60 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2345 : nchanges, path);
2346 :
2347 60 : return;
2348 : }
2349 :
2350 : /*
2351 : * Handle STREAM COMMIT message.
2352 : */
2353 : static void
2354 122 : apply_handle_stream_commit(StringInfo s)
2355 : {
2356 : TransactionId xid;
2357 : LogicalRepCommitData commit_data;
2358 : ParallelApplyWorkerInfo *winfo;
2359 : TransApplyAction apply_action;
2360 :
2361 : /* Save the message before it is consumed. */
2362 122 : StringInfoData original_msg = *s;
2363 :
2364 122 : if (in_streamed_transaction)
2365 0 : ereport(ERROR,
2366 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2367 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2368 :
2369 122 : xid = logicalrep_read_stream_commit(s, &commit_data);
2370 122 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2371 :
2372 122 : apply_action = get_transaction_apply_action(xid, &winfo);
2373 :
2374 122 : switch (apply_action)
2375 : {
2376 44 : case TRANS_LEADER_APPLY:
2377 :
2378 : /*
2379 : * The transaction has been serialized to file, so replay all the
2380 : * spooled operations.
2381 : */
2382 44 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2383 : commit_data.commit_lsn);
2384 :
2385 42 : apply_handle_commit_internal(&commit_data);
2386 :
2387 : /* Unlink the files with serialized changes and subxact info. */
2388 42 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2389 :
2390 42 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2391 42 : break;
2392 :
2393 36 : case TRANS_LEADER_SEND_TO_PARALLEL:
2394 : Assert(winfo);
2395 :
2396 36 : if (pa_send_data(winfo, s->len, s->data))
2397 : {
2398 : /* Finish processing the streaming transaction. */
2399 36 : pa_xact_finish(winfo, commit_data.end_lsn);
2400 34 : break;
2401 : }
2402 :
2403 : /*
2404 : * Switch to serialize mode when we are not able to send the
2405 : * change to parallel apply worker.
2406 : */
2407 0 : pa_switch_to_partial_serialize(winfo, true);
2408 :
2409 : /* fall through */
2410 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2411 : Assert(winfo);
2412 :
2413 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2414 : &original_msg);
2415 :
2416 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2417 :
2418 : /* Finish processing the streaming transaction. */
2419 4 : pa_xact_finish(winfo, commit_data.end_lsn);
2420 4 : break;
2421 :
2422 38 : case TRANS_PARALLEL_APPLY:
2423 :
2424 : /*
2425 : * If the parallel apply worker is applying spooled messages then
2426 : * close the file before committing.
2427 : */
2428 38 : if (stream_fd)
2429 4 : stream_close_file();
2430 :
2431 38 : apply_handle_commit_internal(&commit_data);
2432 :
2433 38 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2434 :
2435 : /*
2436 : * It is important to set the transaction state as finished before
2437 : * releasing the lock. See pa_wait_for_xact_finish.
2438 : */
2439 38 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2440 38 : pa_unlock_transaction(xid, AccessExclusiveLock);
2441 :
2442 38 : pa_reset_subtrans();
2443 :
2444 38 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2445 38 : break;
2446 :
2447 0 : default:
2448 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2449 : break;
2450 : }
2451 :
2452 : /* Process any tables that are being synchronized in parallel. */
2453 118 : process_syncing_tables(commit_data.end_lsn);
2454 :
2455 118 : pgstat_report_activity(STATE_IDLE, NULL);
2456 :
2457 118 : reset_apply_error_context_info();
2458 118 : }
2459 :
2460 : /*
2461 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2462 : */
2463 : static void
2464 946 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2465 : {
2466 946 : if (is_skipping_changes())
2467 : {
2468 4 : stop_skipping_changes();
2469 :
2470 : /*
2471 : * Start a new transaction to clear the subskiplsn, if not started
2472 : * yet.
2473 : */
2474 4 : if (!IsTransactionState())
2475 2 : StartTransactionCommand();
2476 : }
2477 :
2478 946 : if (IsTransactionState())
2479 : {
2480 : /*
2481 : * The transaction is either non-empty or skipped, so we clear the
2482 : * subskiplsn.
2483 : */
2484 946 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2485 :
2486 : /*
2487 : * Update origin state so we can restart streaming from correct
2488 : * position in case of crash.
2489 : */
2490 946 : replorigin_session_origin_lsn = commit_data->end_lsn;
2491 946 : replorigin_session_origin_timestamp = commit_data->committime;
2492 :
2493 946 : CommitTransactionCommand();
2494 :
2495 946 : if (IsTransactionBlock())
2496 : {
2497 8 : EndTransactionBlock(false);
2498 8 : CommitTransactionCommand();
2499 : }
2500 :
2501 946 : pgstat_report_stat(false);
2502 :
2503 946 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2504 : }
2505 : else
2506 : {
2507 : /* Process any invalidation messages that might have accumulated. */
2508 0 : AcceptInvalidationMessages();
2509 0 : maybe_reread_subscription();
2510 : }
2511 :
2512 946 : in_remote_transaction = false;
2513 946 : }
2514 :
2515 : /*
2516 : * Handle RELATION message.
2517 : *
2518 : * Note we don't do validation against local schema here. The validation
2519 : * against local schema is postponed until first change for given relation
2520 : * comes as we only care about it when applying changes for it anyway and we
2521 : * do less locking this way.
2522 : */
2523 : static void
2524 936 : apply_handle_relation(StringInfo s)
2525 : {
2526 : LogicalRepRelation *rel;
2527 :
2528 936 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2529 70 : return;
2530 :
2531 866 : rel = logicalrep_read_rel(s);
2532 866 : logicalrep_relmap_update(rel);
2533 :
2534 : /* Also reset all entries in the partition map that refer to remoterel. */
2535 866 : logicalrep_partmap_reset_relmap(rel);
2536 : }
2537 :
2538 : /*
2539 : * Handle TYPE message.
2540 : *
2541 : * This implementation pays no attention to TYPE messages; we expect the user
2542 : * to have set things up so that the incoming data is acceptable to the input
2543 : * functions for the locally subscribed tables. Hence, we just read and
2544 : * discard the message.
2545 : */
2546 : static void
2547 36 : apply_handle_type(StringInfo s)
2548 : {
2549 : LogicalRepTyp typ;
2550 :
2551 36 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2552 0 : return;
2553 :
2554 36 : logicalrep_read_typ(s, &typ);
2555 : }
2556 :
2557 : /*
2558 : * Check that we (the subscription owner) have sufficient privileges on the
2559 : * target relation to perform the given operation.
2560 : */
2561 : static void
2562 441168 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2563 : {
2564 : Oid relid;
2565 : AclResult aclresult;
2566 :
2567 441168 : relid = RelationGetRelid(rel);
2568 441168 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2569 441168 : if (aclresult != ACLCHECK_OK)
2570 16 : aclcheck_error(aclresult,
2571 16 : get_relkind_objtype(rel->rd_rel->relkind),
2572 16 : get_rel_name(relid));
2573 :
2574 : /*
2575 : * We lack the infrastructure to honor RLS policies. It might be possible
2576 : * to add such infrastructure here, but tablesync workers lack it, too, so
2577 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2578 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2579 : * replicate subsequent INSERTs, so we forbid all commands the same.
2580 : */
2581 441152 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2582 6 : ereport(ERROR,
2583 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2584 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2585 : GetUserNameFromId(GetUserId(), true),
2586 : RelationGetRelationName(rel))));
2587 441146 : }
2588 :
/*
 * Handle INSERT message.
 *
 * The message is ignored in three cases:
 *   - changes are being skipped;
 *   - it was consumed as part of a streamed transaction;
 *   - should_apply_changes_for_rel() rejects the target relation.
 *
 * Otherwise the remote tuple is decoded into a virtual slot shaped like the
 * local relation, and slot_fill_defaults() is called on it.  The tuple is
 * then inserted either directly (via apply_handle_insert_internal()) or,
 * for a partitioned table, through apply_handle_tuple_routing().  All work
 * happens between begin_replication_step() and end_replication_step().
 */
static void
apply_handle_insert(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData newtup;
	LogicalRepRelId relid;
	UserContext ucxt;
	ApplyExecutionData *edata;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;
	bool		run_as_owner;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
		return;

	begin_replication_step();

	relid = logicalrep_read_insert(s, &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/*
	 * Make sure that any user-supplied code runs as the table owner, unless
	 * the user has opted out of that behavior.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

	/* Set relation for error callback */
	apply_error_callback_arg.rel = rel;

	/* Initialize the executor state. */
	edata = create_edata_for_relation(rel);
	estate = edata->estate;
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);

	/*
	 * Process and store remote tuple in the slot.  The work is done in the
	 * executor's per-tuple memory context.
	 */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_data(remoteslot, rel, &newtup);
	slot_fill_defaults(rel, estate, remoteslot);
	MemoryContextSwitchTo(oldctx);

	/* For a partitioned table, insert the tuple into a partition. */
	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		apply_handle_tuple_routing(edata,
								   remoteslot, NULL, CMD_INSERT);
	else
	{
		ResultRelInfo *relinfo = edata->targetRelInfo;

		/* apply_handle_insert_internal() expects the indexes to be open. */
		ExecOpenIndices(relinfo, false);
		apply_handle_insert_internal(edata, relinfo, remoteslot);
		ExecCloseIndices(relinfo);
	}

	finish_edata(edata);

	/* Reset relation for error callback */
	apply_error_callback_arg.rel = NULL;

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	/*
	 * Close with NoLock.  Unlike the early-exit path above, the lock taken
	 * at open time is not released here.
	 */
	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
2678 :
2679 : /*
2680 : * Workhorse for apply_handle_insert()
2681 : * relinfo is for the relation we're actually inserting into
2682 : * (could be a child partition of edata->targetRelInfo)
2683 : */
2684 : static void
2685 152106 : apply_handle_insert_internal(ApplyExecutionData *edata,
2686 : ResultRelInfo *relinfo,
2687 : TupleTableSlot *remoteslot)
2688 : {
2689 152106 : EState *estate = edata->estate;
2690 :
2691 : /* Caller should have opened indexes already. */
2692 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2693 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2694 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2695 :
2696 : /* Caller will not have done this bit. */
2697 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2698 152106 : InitConflictIndexes(relinfo);
2699 :
2700 : /* Do the insert. */
2701 152106 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2702 152094 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2703 152032 : }
2704 :
2705 : /*
2706 : * Check if the logical replication relation is updatable and throw
2707 : * appropriate error if it isn't.
2708 : */
2709 : static void
2710 144580 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2711 : {
2712 : /*
2713 : * For partitioned tables, we only need to care if the target partition is
2714 : * updatable (aka has PK or RI defined for it).
2715 : */
2716 144580 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2717 60 : return;
2718 :
2719 : /* Updatable, no error. */
2720 144520 : if (rel->updatable)
2721 144520 : return;
2722 :
2723 : /*
2724 : * We are in error mode so it's fine this is somewhat slow. It's better to
2725 : * give user correct error.
2726 : */
2727 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2728 : {
2729 0 : ereport(ERROR,
2730 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2731 : errmsg("publisher did not send replica identity column "
2732 : "expected by the logical replication target relation \"%s.%s\"",
2733 : rel->remoterel.nspname, rel->remoterel.relname)));
2734 : }
2735 :
2736 0 : ereport(ERROR,
2737 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2738 : errmsg("logical replication target relation \"%s.%s\" has "
2739 : "neither REPLICA IDENTITY index nor PRIMARY "
2740 : "KEY and published relation does not have "
2741 : "REPLICA IDENTITY FULL",
2742 : rel->remoterel.nspname, rel->remoterel.relname)));
2743 : }
2744 :
/*
 * Handle UPDATE message.
 *
 * The message is ignored in three cases:
 *   - changes are being skipped;
 *   - it was consumed as part of a streamed transaction;
 *   - should_apply_changes_for_rel() rejects the target relation.
 *
 * Otherwise the search tuple used to locate the local row is built from the
 * old tuple if the publisher sent one, and from the new tuple otherwise.
 * The change is then applied via apply_handle_update_internal(), or via
 * apply_handle_tuple_routing() for a partitioned table.
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepRelId relid;
	UserContext ucxt;
	ApplyExecutionData *edata;
	EState	   *estate;
	LogicalRepTupleData oldtup;
	LogicalRepTupleData newtup;
	bool		has_oldtup;
	TupleTableSlot *remoteslot;
	RTEPermissionInfo *target_perminfo;
	MemoryContext oldctx;
	bool		run_as_owner;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
		return;

	begin_replication_step();

	/* oldtup is only meaningful when has_oldtup is set. */
	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
								   &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Set relation for error callback */
	apply_error_callback_arg.rel = rel;

	/* Check if we can do the update. */
	check_relation_updatable(rel);

	/*
	 * Make sure that any user-supplied code runs as the table owner, unless
	 * the user has opted out of that behavior.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

	/* Initialize the executor state. */
	edata = create_edata_for_relation(rel);
	estate = edata->estate;
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);

	/*
	 * Populate updatedCols so that per-column triggers can fire, and so
	 * executor can correctly pass down indexUnchanged hint. This could
	 * include more columns than were actually changed on the publisher
	 * because the logical replication protocol doesn't contain that
	 * information. But it would for example exclude columns that only exist
	 * on the subscriber, since we are not touching those.
	 */
	target_perminfo = list_nth(estate->es_rteperminfos, 0);
	for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap->attnums[i];

		/*
		 * Skip dropped columns and local columns with no remote counterpart
		 * (negative map entry).
		 */
		if (!att->attisdropped && remoteattnum >= 0)
		{
			Assert(remoteattnum < newtup.ncols);
			/*
			 * i is 0-based; updatedCols stores attnums offset by
			 * FirstLowInvalidHeapAttributeNumber.
			 */
			if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
				target_perminfo->updatedCols =
					bms_add_member(target_perminfo->updatedCols,
								   i + 1 - FirstLowInvalidHeapAttributeNumber);
		}
	}

	/*
	 * Build the search tuple from the old tuple if the publisher sent one,
	 * otherwise from the new tuple.
	 */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_data(remoteslot, rel,
					has_oldtup ? &oldtup : &newtup);
	MemoryContextSwitchTo(oldctx);

	/* For a partitioned table, apply update to correct partition. */
	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		apply_handle_tuple_routing(edata,
								   remoteslot, &newtup, CMD_UPDATE);
	else
		apply_handle_update_internal(edata, edata->targetRelInfo,
									 remoteslot, &newtup, rel->localindexoid);

	finish_edata(edata);

	/* Reset relation for error callback */
	apply_error_callback_arg.rel = NULL;

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	/* Unlike the early-exit path, keep the lock taken at open time. */
	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
2861 :
/*
 * Workhorse for apply_handle_update()
 * relinfo is for the relation we're actually updating in
 * (could be a child partition of edata->targetRelInfo)
 *
 * Locates the local row matching 'remoteslot' (the search tuple), using
 * 'localindexoid' when valid and a sequential scan otherwise.
 *
 * If the row is found, it is overwritten with the values from 'newtup'.
 * A CT_UPDATE_ORIGIN_DIFFERS conflict is logged first if the row was last
 * changed by a different origin.
 *
 * If the row is not found, a CT_UPDATE_DELETED or CT_UPDATE_MISSING
 * conflict is logged and nothing is changed.
 *
 * Unlike the insert/delete workhorses, this function opens and closes the
 * relation's indexes itself.
 */
static void
apply_handle_update_internal(ApplyExecutionData *edata,
							 ResultRelInfo *relinfo,
							 TupleTableSlot *remoteslot,
							 LogicalRepTupleData *newtup,
							 Oid localindexoid)
{
	EState	   *estate = edata->estate;
	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
	Relation	localrel = relinfo->ri_RelationDesc;
	EPQState	epqstate;
	TupleTableSlot *localslot = NULL;
	ConflictTupleInfo conflicttuple = {0};
	bool		found;
	MemoryContext oldctx;

	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
	ExecOpenIndices(relinfo, false);

	/* localslot is set by FindReplTupleInLocalRel() whether or not found. */
	found = FindReplTupleInLocalRel(edata, localrel,
									&relmapentry->remoterel,
									localindexoid,
									remoteslot, &localslot);

	/*
	 * Tuple found.
	 *
	 * Note this will fail if there are other conflicting unique indexes.
	 */
	if (found)
	{
		/*
		 * Report the conflict if the tuple was modified by a different
		 * origin.
		 */
		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
									&conflicttuple.origin, &conflicttuple.ts) &&
			conflicttuple.origin != replorigin_session_origin)
		{
			TupleTableSlot *newslot;

			/* Store the new tuple for conflict reporting */
			newslot = table_slot_create(localrel, &estate->es_tupleTable);
			slot_store_data(newslot, relmapentry, newtup);

			conflicttuple.slot = localslot;

			ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
								remoteslot, newslot,
								list_make1(&conflicttuple));
		}

		/*
		 * Process and store remote tuple in the slot.  remoteslot is reused:
		 * it no longer holds the search tuple but is rebuilt from localslot
		 * and newtup.
		 */
		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
		slot_modify_data(remoteslot, localslot, relmapentry, newtup);
		MemoryContextSwitchTo(oldctx);

		EvalPlanQualSetSlot(&epqstate, remoteslot);

		InitConflictIndexes(relinfo);

		/* Do the actual update. */
		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
		ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
								 remoteslot);
	}
	else
	{
		ConflictType type;

		/* The unused local slot is recycled to hold the new tuple. */
		TupleTableSlot *newslot = localslot;

		/*
		 * Detecting whether the tuple was recently deleted or never existed
		 * is crucial to avoid misleading the user during conflict handling.
		 */
		if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
									   &conflicttuple.xmin,
									   &conflicttuple.origin,
									   &conflicttuple.ts) &&
			conflicttuple.origin != replorigin_session_origin)
			type = CT_UPDATE_DELETED;
		else
			type = CT_UPDATE_MISSING;

		/* Store the new tuple for conflict reporting */
		slot_store_data(newslot, relmapentry, newtup);

		/*
		 * The tuple to be updated could not be found or was deleted. Do
		 * nothing except for emitting a log message.
		 */
		ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
							list_make1(&conflicttuple));
	}

	/* Cleanup. */
	ExecCloseIndices(relinfo);
	EvalPlanQualEnd(&epqstate);
}
2966 :
/*
 * Handle DELETE message.
 *
 * The message is ignored in three cases:
 *   - changes are being skipped;
 *   - it was consumed as part of a streamed transaction;
 *   - should_apply_changes_for_rel() rejects the target relation.
 *
 * Otherwise the old tuple sent by the publisher becomes the search tuple,
 * and the matching local row is deleted.  That is done via
 * apply_handle_delete_internal(), or via apply_handle_tuple_routing() for a
 * partitioned table.
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData oldtup;
	LogicalRepRelId relid;
	UserContext ucxt;
	ApplyExecutionData *edata;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;
	bool		run_as_owner;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
		return;

	begin_replication_step();

	relid = logicalrep_read_delete(s, &oldtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Set relation for error callback */
	apply_error_callback_arg.rel = rel;

	/* Check if we can do the delete. */
	check_relation_updatable(rel);

	/*
	 * Make sure that any user-supplied code runs as the table owner, unless
	 * the user has opted out of that behavior.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

	/* Initialize the executor state. */
	edata = create_edata_for_relation(rel);
	estate = edata->estate;
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_data(remoteslot, rel, &oldtup);
	MemoryContextSwitchTo(oldctx);

	/* For a partitioned table, apply delete to correct partition. */
	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		apply_handle_tuple_routing(edata,
								   remoteslot, NULL, CMD_DELETE);
	else
	{
		ResultRelInfo *relinfo = edata->targetRelInfo;

		/* apply_handle_delete_internal() expects the indexes to be open. */
		ExecOpenIndices(relinfo, false);
		apply_handle_delete_internal(edata, relinfo,
									 remoteslot, rel->localindexoid);
		ExecCloseIndices(relinfo);
	}

	finish_edata(edata);

	/* Reset relation for error callback */
	apply_error_callback_arg.rel = NULL;

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	/* Unlike the early-exit path, keep the lock taken at open time. */
	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
3060 :
/*
 * Workhorse for apply_handle_delete()
 * relinfo is for the relation we're actually deleting from
 * (could be a child partition of edata->targetRelInfo)
 *
 * Locates the local row matching 'remoteslot' (the search tuple), using
 * 'localindexoid' when valid and a sequential scan otherwise.
 *
 * If the row is found, it is deleted.  A CT_DELETE_ORIGIN_DIFFERS conflict
 * is logged first if the row was last changed by a different origin.
 *
 * If the row is not found, a CT_DELETE_MISSING conflict is logged and
 * nothing is changed.
 *
 * The caller is expected to have opened the relation's indexes and is
 * responsible for closing them.
 */
static void
apply_handle_delete_internal(ApplyExecutionData *edata,
							 ResultRelInfo *relinfo,
							 TupleTableSlot *remoteslot,
							 Oid localindexoid)
{
	EState	   *estate = edata->estate;
	Relation	localrel = relinfo->ri_RelationDesc;
	LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
	EPQState	epqstate;
	TupleTableSlot *localslot;
	ConflictTupleInfo conflicttuple = {0};
	bool		found;

	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);

	/* Caller should have opened indexes already. */
	Assert(relinfo->ri_IndexRelationDescs != NULL ||
		   !localrel->rd_rel->relhasindex ||
		   RelationGetIndexList(localrel) == NIL);

	found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
									remoteslot, &localslot);

	/* If found delete it. */
	if (found)
	{
		/*
		 * Report the conflict if the tuple was modified by a different
		 * origin.
		 */
		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
									&conflicttuple.origin, &conflicttuple.ts) &&
			conflicttuple.origin != replorigin_session_origin)
		{
			conflicttuple.slot = localslot;
			ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
								remoteslot, NULL,
								list_make1(&conflicttuple));
		}

		EvalPlanQualSetSlot(&epqstate, localslot);

		/* Do the actual delete. */
		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
		ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
	}
	else
	{
		/*
		 * The tuple to be deleted could not be found. Do nothing except for
		 * emitting a log message.
		 */
		ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
							remoteslot, NULL, list_make1(&conflicttuple));
	}

	/* Cleanup. */
	EvalPlanQualEnd(&epqstate);
}
3126 :
3127 : /*
3128 : * Try to find a tuple received from the publication side (in 'remoteslot') in
3129 : * the corresponding local relation using either replica identity index,
3130 : * primary key, index or if needed, sequential scan.
3131 : *
3132 : * Local tuple, if found, is returned in '*localslot'.
3133 : */
3134 : static bool
3135 144518 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
3136 : LogicalRepRelation *remoterel,
3137 : Oid localidxoid,
3138 : TupleTableSlot *remoteslot,
3139 : TupleTableSlot **localslot)
3140 : {
3141 144518 : EState *estate = edata->estate;
3142 : bool found;
3143 :
3144 : /*
3145 : * Regardless of the top-level operation, we're performing a read here, so
3146 : * check for SELECT privileges.
3147 : */
3148 144518 : TargetPrivilegesCheck(localrel, ACL_SELECT);
3149 :
3150 144508 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3151 :
3152 : Assert(OidIsValid(localidxoid) ||
3153 : (remoterel->replident == REPLICA_IDENTITY_FULL));
3154 :
3155 144508 : if (OidIsValid(localidxoid))
3156 : {
3157 : #ifdef USE_ASSERT_CHECKING
3158 : Relation idxrel = index_open(localidxoid, AccessShareLock);
3159 :
3160 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3161 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3162 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
3163 : IsIndexUsableForReplicaIdentityFull(idxrel,
3164 : edata->targetRel->attrmap)));
3165 : index_close(idxrel, AccessShareLock);
3166 : #endif
3167 :
3168 144206 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
3169 : LockTupleExclusive,
3170 : remoteslot, *localslot);
3171 : }
3172 : else
3173 302 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
3174 : remoteslot, *localslot);
3175 :
3176 144508 : return found;
3177 : }
3178 :
3179 : /*
3180 : * Determine whether the index can reliably locate the deleted tuple in the
3181 : * local relation.
3182 : *
3183 : * An index may exclude deleted tuples if it was re-indexed or re-created during
3184 : * change application. Therefore, an index is considered usable only if the
3185 : * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3186 : * index tuple's xmin. This ensures that any tuples deleted prior to the index
3187 : * creation or re-indexing are not relevant for conflict detection in the
3188 : * current apply worker.
3189 : *
3190 : * Note that indexes may also be excluded if they were modified by other DDL
3191 : * operations, such as ALTER INDEX. However, this is acceptable, as the
3192 : * likelihood of such DDL changes coinciding with the need to scan dead
3193 : * tuples for the update_deleted is low.
3194 : */
3195 : static bool
3196 2 : IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
3197 : TransactionId conflict_detection_xmin)
3198 : {
3199 : HeapTuple index_tuple;
3200 : TransactionId index_xmin;
3201 :
3202 2 : index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3203 :
3204 2 : if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3205 0 : elog(ERROR, "cache lookup failed for index %u", localindexoid);
3206 :
3207 : /*
3208 : * No need to check for a frozen transaction ID, as
3209 : * TransactionIdPrecedes() manages it internally, treating it as falling
3210 : * behind the conflict_detection_xmin.
3211 : */
3212 2 : index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3213 :
3214 2 : ReleaseSysCache(index_tuple);
3215 :
3216 2 : return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3217 : }
3218 :
/*
 * Attempts to locate a deleted tuple in the local relation that matches the
 * values of the tuple received from the publication side (in 'remoteslot').
 * The search is performed using either the replica identity index, primary
 * key, other available index, or a sequential scan if necessary.
 *
 * Returns true if the deleted tuple is found. If found, the transaction ID,
 * origin, and commit timestamp of the deletion are stored in '*delete_xid',
 * '*delete_origin', and '*delete_time' respectively.
 *
 * Always returns false in three cases:
 *   - the subscription does not retain dead tuples;
 *   - track_commit_timestamp is off;
 *   - the leader apply worker's oldest_nonremovable_xid is invalid.
 *
 * Raises an error in a non-leader worker if the leader apply worker cannot
 * be found.
 */
static bool
FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
						   TupleTableSlot *remoteslot,
						   TransactionId *delete_xid, RepOriginId *delete_origin,
						   TimestampTz *delete_time)
{
	TransactionId oldestxmin;

	/*
	 * Return false if either dead tuples are not retained or commit timestamp
	 * data is not available.
	 */
	if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
		return false;

	/*
	 * For conflict detection, we use the leader worker's
	 * oldest_nonremovable_xid value instead of invoking
	 * GetOldestNonRemovableTransactionId() or using the conflict detection
	 * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
	 * identify tuples that were recently deleted. These deleted tuples are no
	 * longer visible to concurrent transactions. However, if a remote update
	 * matches such a tuple, we log an update_deleted conflict.
	 *
	 * While GetOldestNonRemovableTransactionId() and slot.xmin may return
	 * transaction IDs older than oldest_nonremovable_xid, for our current
	 * purpose, it is acceptable to treat tuples deleted by transactions prior
	 * to oldest_nonremovable_xid as update_missing conflicts.
	 */
	if (am_leader_apply_worker())
	{
		/*
		 * The leader reads its own value without relmutex.  Presumably this
		 * is safe because only the leader updates it (see the note in the
		 * else branch).
		 */
		oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid;
	}
	else
	{
		LogicalRepWorker *leader;

		/*
		 * Obtain the information from the leader apply worker as only the
		 * leader manages oldest_nonremovable_xid (see
		 * maybe_advance_nonremovable_xid() for details).
		 */
		/*
		 * LogicalRepWorkerLock is held across the lookup and the read.  The
		 * leader's relmutex spinlock is taken, in that order, only to read
		 * the field.
		 */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		leader = logicalrep_worker_find(MyLogicalRepWorker->subid,
										InvalidOid, false);
		if (!leader)
		{
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("could not detect conflict as the leader apply worker has exited")));
		}

		SpinLockAcquire(&leader->relmutex);
		oldestxmin = leader->oldest_nonremovable_xid;
		SpinLockRelease(&leader->relmutex);
		LWLockRelease(LogicalRepWorkerLock);
	}

	/*
	 * Return false if the leader apply worker has stopped retaining
	 * information for detecting conflicts. This implies that update_deleted
	 * can no longer be reliably detected.
	 */
	if (!TransactionIdIsValid(oldestxmin))
		return false;

	/*
	 * Use the index only if it predates oldestxmin (see
	 * IsIndexUsableForFindingDeletedTuple()); otherwise fall back to a
	 * sequential scan.
	 */
	if (OidIsValid(localidxoid) &&
		IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
		return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
												   remoteslot, oldestxmin,
												   delete_xid, delete_origin,
												   delete_time);
	else
		return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
											   oldestxmin, delete_xid,
											   delete_origin, delete_time);
}
3306 :
3307 : /*
3308 : * This handles insert, update, delete on a partitioned table.
3309 : */
3310 : static void
3311 192 : apply_handle_tuple_routing(ApplyExecutionData *edata,
3312 : TupleTableSlot *remoteslot,
3313 : LogicalRepTupleData *newtup,
3314 : CmdType operation)
3315 : {
3316 192 : EState *estate = edata->estate;
3317 192 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3318 192 : ResultRelInfo *relinfo = edata->targetRelInfo;
3319 192 : Relation parentrel = relinfo->ri_RelationDesc;
3320 : ModifyTableState *mtstate;
3321 : PartitionTupleRouting *proute;
3322 : ResultRelInfo *partrelinfo;
3323 : Relation partrel;
3324 : TupleTableSlot *remoteslot_part;
3325 : TupleConversionMap *map;
3326 : MemoryContext oldctx;
3327 192 : LogicalRepRelMapEntry *part_entry = NULL;
3328 192 : AttrMap *attrmap = NULL;
3329 :
3330 : /* ModifyTableState is needed for ExecFindPartition(). */
3331 192 : edata->mtstate = mtstate = makeNode(ModifyTableState);
3332 192 : mtstate->ps.plan = NULL;
3333 192 : mtstate->ps.state = estate;
3334 192 : mtstate->operation = operation;
3335 192 : mtstate->resultRelInfo = relinfo;
3336 :
3337 : /* ... as is PartitionTupleRouting. */
3338 192 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3339 :
3340 : /*
3341 : * Find the partition to which the "search tuple" belongs.
3342 : */
3343 : Assert(remoteslot != NULL);
3344 192 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3345 192 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3346 : remoteslot, estate);
3347 : Assert(partrelinfo != NULL);
3348 192 : partrel = partrelinfo->ri_RelationDesc;
3349 :
3350 : /*
3351 : * Check for supported relkind. We need this since partitions might be of
3352 : * unsupported relkinds; and the set of partitions can change, so checking
3353 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3354 : */
3355 192 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3356 192 : get_namespace_name(RelationGetNamespace(partrel)),
3357 192 : RelationGetRelationName(partrel));
3358 :
3359 : /*
3360 : * To perform any of the operations below, the tuple must match the
3361 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3362 : * slot to store the tuple in any case.
3363 : */
3364 192 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3365 192 : if (remoteslot_part == NULL)
3366 126 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3367 192 : map = ExecGetRootToChildMap(partrelinfo, estate);
3368 192 : if (map != NULL)
3369 : {
3370 66 : attrmap = map->attrMap;
3371 66 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3372 : remoteslot_part);
3373 : }
3374 : else
3375 : {
3376 126 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3377 126 : slot_getallattrs(remoteslot_part);
3378 : }
3379 192 : MemoryContextSwitchTo(oldctx);
3380 :
3381 : /* Check if we can do the update or delete on the leaf partition. */
3382 192 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3383 : {
3384 60 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3385 : attrmap);
3386 60 : check_relation_updatable(part_entry);
3387 : }
3388 :
3389 192 : switch (operation)
3390 : {
3391 132 : case CMD_INSERT:
3392 132 : apply_handle_insert_internal(edata, partrelinfo,
3393 : remoteslot_part);
3394 88 : break;
3395 :
3396 34 : case CMD_DELETE:
3397 34 : apply_handle_delete_internal(edata, partrelinfo,
3398 : remoteslot_part,
3399 : part_entry->localindexoid);
3400 34 : break;
3401 :
3402 26 : case CMD_UPDATE:
3403 :
3404 : /*
3405 : * For UPDATE, depending on whether or not the updated tuple
3406 : * satisfies the partition's constraint, perform a simple UPDATE
3407 : * of the partition or move the updated tuple into a different
3408 : * suitable partition.
3409 : */
3410 : {
3411 : TupleTableSlot *localslot;
3412 : ResultRelInfo *partrelinfo_new;
3413 : Relation partrel_new;
3414 : bool found;
3415 : EPQState epqstate;
3416 26 : ConflictTupleInfo conflicttuple = {0};
3417 :
3418 : /* Get the matching local tuple from the partition. */
3419 26 : found = FindReplTupleInLocalRel(edata, partrel,
3420 : &part_entry->remoterel,
3421 : part_entry->localindexoid,
3422 : remoteslot_part, &localslot);
3423 26 : if (!found)
3424 : {
3425 : ConflictType type;
3426 4 : TupleTableSlot *newslot = localslot;
3427 :
3428 : /*
3429 : * Detecting whether the tuple was recently deleted or
3430 : * never existed is crucial to avoid misleading the user
3431 : * during conflict handling.
3432 : */
3433 4 : if (FindDeletedTupleInLocalRel(partrel,
3434 : part_entry->localindexoid,
3435 : remoteslot_part,
3436 : &conflicttuple.xmin,
3437 : &conflicttuple.origin,
3438 0 : &conflicttuple.ts) &&
3439 0 : conflicttuple.origin != replorigin_session_origin)
3440 0 : type = CT_UPDATE_DELETED;
3441 : else
3442 4 : type = CT_UPDATE_MISSING;
3443 :
3444 : /* Store the new tuple for conflict reporting */
3445 4 : slot_store_data(newslot, part_entry, newtup);
3446 :
3447 : /*
3448 : * The tuple to be updated could not be found or was
3449 : * deleted. Do nothing except for emitting a log message.
3450 : */
3451 4 : ReportApplyConflict(estate, partrelinfo, LOG,
3452 : type, remoteslot_part, newslot,
3453 4 : list_make1(&conflicttuple));
3454 :
3455 4 : return;
3456 : }
3457 :
3458 : /*
3459 : * Report the conflict if the tuple was modified by a
3460 : * different origin.
3461 : */
3462 22 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3463 : &conflicttuple.origin,
3464 2 : &conflicttuple.ts) &&
3465 2 : conflicttuple.origin != replorigin_session_origin)
3466 : {
3467 : TupleTableSlot *newslot;
3468 :
3469 : /* Store the new tuple for conflict reporting */
3470 2 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3471 2 : slot_store_data(newslot, part_entry, newtup);
3472 :
3473 2 : conflicttuple.slot = localslot;
3474 :
3475 2 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3476 : remoteslot_part, newslot,
3477 2 : list_make1(&conflicttuple));
3478 : }
3479 :
3480 : /*
3481 : * Apply the update to the local tuple, putting the result in
3482 : * remoteslot_part.
3483 : */
3484 22 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3485 22 : slot_modify_data(remoteslot_part, localslot, part_entry,
3486 : newtup);
3487 22 : MemoryContextSwitchTo(oldctx);
3488 :
3489 22 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3490 :
3491 : /*
3492 : * Does the updated tuple still satisfy the current
3493 : * partition's constraint?
3494 : */
3495 44 : if (!partrel->rd_rel->relispartition ||
3496 22 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3497 : false))
3498 : {
3499 : /*
3500 : * Yes, so simply UPDATE the partition. We don't call
3501 : * apply_handle_update_internal() here, which would
3502 : * normally do the following work, to avoid repeating some
3503 : * work already done above to find the local tuple in the
3504 : * partition.
3505 : */
3506 20 : InitConflictIndexes(partrelinfo);
3507 :
3508 20 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3509 20 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3510 : ACL_UPDATE);
3511 20 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3512 : localslot, remoteslot_part);
3513 : }
3514 : else
3515 : {
3516 : /* Move the tuple into the new partition. */
3517 :
3518 : /*
3519 : * New partition will be found using tuple routing, which
3520 : * can only occur via the parent table. We might need to
3521 : * convert the tuple to the parent's rowtype. Note that
3522 : * this is the tuple found in the partition, not the
3523 : * original search tuple received by this function.
3524 : */
3525 2 : if (map)
3526 : {
3527 : TupleConversionMap *PartitionToRootMap =
3528 2 : convert_tuples_by_name(RelationGetDescr(partrel),
3529 : RelationGetDescr(parentrel));
3530 :
3531 : remoteslot =
3532 2 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3533 : remoteslot_part, remoteslot);
3534 : }
3535 : else
3536 : {
3537 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3538 0 : slot_getallattrs(remoteslot);
3539 : }
3540 :
3541 : /* Find the new partition. */
3542 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3543 2 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3544 : proute, remoteslot,
3545 : estate);
3546 2 : MemoryContextSwitchTo(oldctx);
3547 : Assert(partrelinfo_new != partrelinfo);
3548 2 : partrel_new = partrelinfo_new->ri_RelationDesc;
3549 :
3550 : /* Check that new partition also has supported relkind. */
3551 2 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3552 2 : get_namespace_name(RelationGetNamespace(partrel_new)),
3553 2 : RelationGetRelationName(partrel_new));
3554 :
3555 : /* DELETE old tuple found in the old partition. */
3556 2 : EvalPlanQualSetSlot(&epqstate, localslot);
3557 2 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3558 2 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3559 :
3560 : /* INSERT new tuple into the new partition. */
3561 :
3562 : /*
3563 : * Convert the replacement tuple to match the destination
3564 : * partition rowtype.
3565 : */
3566 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3567 2 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3568 2 : if (remoteslot_part == NULL)
3569 2 : remoteslot_part = table_slot_create(partrel_new,
3570 : &estate->es_tupleTable);
3571 2 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3572 2 : if (map != NULL)
3573 : {
3574 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3575 : remoteslot,
3576 : remoteslot_part);
3577 : }
3578 : else
3579 : {
3580 2 : remoteslot_part = ExecCopySlot(remoteslot_part,
3581 : remoteslot);
3582 2 : slot_getallattrs(remoteslot);
3583 : }
3584 2 : MemoryContextSwitchTo(oldctx);
3585 2 : apply_handle_insert_internal(edata, partrelinfo_new,
3586 : remoteslot_part);
3587 : }
3588 :
3589 22 : EvalPlanQualEnd(&epqstate);
3590 : }
3591 22 : break;
3592 :
3593 0 : default:
3594 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3595 : break;
3596 : }
3597 : }
3598 :
3599 : /*
3600 : * Handle TRUNCATE message.
3601 : *
3602 : * TODO: FDW support
3603 : */
3604 : static void
3605 40 : apply_handle_truncate(StringInfo s)
3606 : {
3607 40 : bool cascade = false;
3608 40 : bool restart_seqs = false;
3609 40 : List *remote_relids = NIL;
3610 40 : List *remote_rels = NIL;
3611 40 : List *rels = NIL;
3612 40 : List *part_rels = NIL;
3613 40 : List *relids = NIL;
3614 40 : List *relids_logged = NIL;
3615 : ListCell *lc;
3616 40 : LOCKMODE lockmode = AccessExclusiveLock;
3617 :
3618 : /*
3619 : * Quick return if we are skipping data modification changes or handling
3620 : * streamed transactions.
3621 : */
3622 80 : if (is_skipping_changes() ||
3623 40 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
3624 0 : return;
3625 :
3626 40 : begin_replication_step();
3627 :
3628 40 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3629 :
3630 98 : foreach(lc, remote_relids)
3631 : {
3632 58 : LogicalRepRelId relid = lfirst_oid(lc);
3633 : LogicalRepRelMapEntry *rel;
3634 :
3635 58 : rel = logicalrep_rel_open(relid, lockmode);
3636 58 : if (!should_apply_changes_for_rel(rel))
3637 : {
3638 : /*
3639 : * The relation can't become interesting in the middle of the
3640 : * transaction so it's safe to unlock it.
3641 : */
3642 0 : logicalrep_rel_close(rel, lockmode);
3643 0 : continue;
3644 : }
3645 :
3646 58 : remote_rels = lappend(remote_rels, rel);
3647 58 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
3648 58 : rels = lappend(rels, rel->localrel);
3649 58 : relids = lappend_oid(relids, rel->localreloid);
3650 58 : if (RelationIsLogicallyLogged(rel->localrel))
3651 2 : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3652 :
3653 : /*
3654 : * Truncate partitions if we got a message to truncate a partitioned
3655 : * table.
3656 : */
3657 58 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3658 : {
3659 : ListCell *child;
3660 8 : List *children = find_all_inheritors(rel->localreloid,
3661 : lockmode,
3662 : NULL);
3663 :
3664 30 : foreach(child, children)
3665 : {
3666 22 : Oid childrelid = lfirst_oid(child);
3667 : Relation childrel;
3668 :
3669 22 : if (list_member_oid(relids, childrelid))
3670 8 : continue;
3671 :
3672 : /* find_all_inheritors already got lock */
3673 14 : childrel = table_open(childrelid, NoLock);
3674 :
3675 : /*
3676 : * Ignore temp tables of other backends. See similar code in
3677 : * ExecuteTruncate().
3678 : */
3679 14 : if (RELATION_IS_OTHER_TEMP(childrel))
3680 : {
3681 0 : table_close(childrel, lockmode);
3682 0 : continue;
3683 : }
3684 :
3685 14 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
3686 14 : rels = lappend(rels, childrel);
3687 14 : part_rels = lappend(part_rels, childrel);
3688 14 : relids = lappend_oid(relids, childrelid);
3689 : /* Log this relation only if needed for logical decoding */
3690 14 : if (RelationIsLogicallyLogged(childrel))
3691 0 : relids_logged = lappend_oid(relids_logged, childrelid);
3692 : }
3693 : }
3694 : }
3695 :
3696 : /*
3697 : * Even if we used CASCADE on the upstream primary we explicitly default
3698 : * to replaying changes without further cascading. This might be later
3699 : * changeable with a user specified option.
3700 : *
3701 : * MySubscription->runasowner tells us whether we want to execute
3702 : * replication actions as the subscription owner; the last argument to
3703 : * TruncateGuts tells it whether we want to switch to the table owner.
3704 : * Those are exactly opposite conditions.
3705 : */
3706 40 : ExecuteTruncateGuts(rels,
3707 : relids,
3708 : relids_logged,
3709 : DROP_RESTRICT,
3710 : restart_seqs,
3711 40 : !MySubscription->runasowner);
3712 98 : foreach(lc, remote_rels)
3713 : {
3714 58 : LogicalRepRelMapEntry *rel = lfirst(lc);
3715 :
3716 58 : logicalrep_rel_close(rel, NoLock);
3717 : }
3718 54 : foreach(lc, part_rels)
3719 : {
3720 14 : Relation rel = lfirst(lc);
3721 :
3722 14 : table_close(rel, NoLock);
3723 : }
3724 :
3725 40 : end_replication_step();
3726 : }
3727 :
3728 :
3729 : /*
3730 : * Logical replication protocol message dispatcher.
3731 : */
3732 : void
3733 674992 : apply_dispatch(StringInfo s)
3734 : {
3735 674992 : LogicalRepMsgType action = pq_getmsgbyte(s);
3736 : LogicalRepMsgType saved_command;
3737 :
3738 : /*
3739 : * Set the current command being applied. Since this function can be
3740 : * called recursively when applying spooled changes, save the current
3741 : * command.
3742 : */
3743 674992 : saved_command = apply_error_callback_arg.command;
3744 674992 : apply_error_callback_arg.command = action;
3745 :
3746 674992 : switch (action)
3747 : {
3748 972 : case LOGICAL_REP_MSG_BEGIN:
3749 972 : apply_handle_begin(s);
3750 972 : break;
3751 :
3752 866 : case LOGICAL_REP_MSG_COMMIT:
3753 866 : apply_handle_commit(s);
3754 866 : break;
3755 :
3756 372254 : case LOGICAL_REP_MSG_INSERT:
3757 372254 : apply_handle_insert(s);
3758 372162 : break;
3759 :
3760 132326 : case LOGICAL_REP_MSG_UPDATE:
3761 132326 : apply_handle_update(s);
3762 132308 : break;
3763 :
3764 163872 : case LOGICAL_REP_MSG_DELETE:
3765 163872 : apply_handle_delete(s);
3766 163872 : break;
3767 :
3768 40 : case LOGICAL_REP_MSG_TRUNCATE:
3769 40 : apply_handle_truncate(s);
3770 40 : break;
3771 :
3772 936 : case LOGICAL_REP_MSG_RELATION:
3773 936 : apply_handle_relation(s);
3774 936 : break;
3775 :
3776 36 : case LOGICAL_REP_MSG_TYPE:
3777 36 : apply_handle_type(s);
3778 36 : break;
3779 :
3780 16 : case LOGICAL_REP_MSG_ORIGIN:
3781 16 : apply_handle_origin(s);
3782 16 : break;
3783 :
3784 0 : case LOGICAL_REP_MSG_MESSAGE:
3785 :
3786 : /*
3787 : * Logical replication does not use generic logical messages yet.
3788 : * Although, it could be used by other applications that use this
3789 : * output plugin.
3790 : */
3791 0 : break;
3792 :
3793 1672 : case LOGICAL_REP_MSG_STREAM_START:
3794 1672 : apply_handle_stream_start(s);
3795 1672 : break;
3796 :
3797 1670 : case LOGICAL_REP_MSG_STREAM_STOP:
3798 1670 : apply_handle_stream_stop(s);
3799 1666 : break;
3800 :
3801 76 : case LOGICAL_REP_MSG_STREAM_ABORT:
3802 76 : apply_handle_stream_abort(s);
3803 76 : break;
3804 :
3805 122 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3806 122 : apply_handle_stream_commit(s);
3807 118 : break;
3808 :
3809 32 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3810 32 : apply_handle_begin_prepare(s);
3811 32 : break;
3812 :
3813 30 : case LOGICAL_REP_MSG_PREPARE:
3814 30 : apply_handle_prepare(s);
3815 28 : break;
3816 :
3817 40 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3818 40 : apply_handle_commit_prepared(s);
3819 40 : break;
3820 :
3821 10 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3822 10 : apply_handle_rollback_prepared(s);
3823 10 : break;
3824 :
3825 22 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3826 22 : apply_handle_stream_prepare(s);
3827 22 : break;
3828 :
3829 0 : default:
3830 0 : ereport(ERROR,
3831 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3832 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3833 : }
3834 :
3835 : /* Reset the current command */
3836 674872 : apply_error_callback_arg.command = saved_command;
3837 674872 : }
3838 :
3839 : /*
3840 : * Figure out which write/flush positions to report to the walsender process.
3841 : *
3842 : * We can't simply report back the last LSN the walsender sent us because the
3843 : * local transaction might not yet be flushed to disk locally. Instead we
3844 : * build a list that associates local with remote LSNs for every commit. When
3845 : * reporting back the flush position to the sender we iterate that list and
3846 : * check which entries on it are already locally flushed. Those we can report
3847 : * as having been flushed.
3848 : *
3849 : * The have_pending_txes is true if there are outstanding transactions that
3850 : * need to be flushed.
3851 : */
3852 : static void
3853 145876 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3854 : bool *have_pending_txes)
3855 : {
3856 : dlist_mutable_iter iter;
3857 145876 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3858 :
3859 145876 : *write = InvalidXLogRecPtr;
3860 145876 : *flush = InvalidXLogRecPtr;
3861 :
3862 146872 : dlist_foreach_modify(iter, &lsn_mapping)
3863 : {
3864 16270 : FlushPosition *pos =
3865 16270 : dlist_container(FlushPosition, node, iter.cur);
3866 :
3867 16270 : *write = pos->remote_end;
3868 :
3869 16270 : if (pos->local_end <= local_flush)
3870 : {
3871 996 : *flush = pos->remote_end;
3872 996 : dlist_delete(iter.cur);
3873 996 : pfree(pos);
3874 : }
3875 : else
3876 : {
3877 : /*
3878 : * Don't want to uselessly iterate over the rest of the list which
3879 : * could potentially be long. Instead get the last element and
3880 : * grab the write position from there.
3881 : */
3882 15274 : pos = dlist_tail_element(FlushPosition, node,
3883 : &lsn_mapping);
3884 15274 : *write = pos->remote_end;
3885 15274 : *have_pending_txes = true;
3886 15274 : return;
3887 : }
3888 : }
3889 :
3890 130602 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3891 : }
3892 :
3893 : /*
3894 : * Store current remote/local lsn pair in the tracking list.
3895 : */
3896 : void
3897 1078 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3898 : {
3899 : FlushPosition *flushpos;
3900 :
3901 : /*
3902 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3903 : * by the leader apply worker.
3904 : */
3905 1078 : if (am_parallel_apply_worker())
3906 38 : return;
3907 :
3908 : /* Need to do this in permanent context */
3909 1040 : MemoryContextSwitchTo(ApplyContext);
3910 :
3911 : /* Track commit lsn */
3912 1040 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3913 1040 : flushpos->local_end = local_lsn;
3914 1040 : flushpos->remote_end = remote_lsn;
3915 :
3916 1040 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3917 1040 : MemoryContextSwitchTo(ApplyMessageContext);
3918 : }
3919 :
3920 :
3921 : /* Update statistics of the worker. */
3922 : static void
3923 387970 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3924 : {
3925 387970 : MyLogicalRepWorker->last_lsn = last_lsn;
3926 387970 : MyLogicalRepWorker->last_send_time = send_time;
3927 387970 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3928 387970 : if (reply)
3929 : {
3930 3250 : MyLogicalRepWorker->reply_lsn = last_lsn;
3931 3250 : MyLogicalRepWorker->reply_time = send_time;
3932 : }
3933 387970 : }
3934 :
/*
 * Apply main loop.
 *
 * Each iteration of the outer loop does the following:
 *	- drains every message currently available from the publisher connection
 *	  without blocking, dispatching WAL data to apply_dispatch() and handling
 *	  keepalive and primary status update messages;
 *	- sends feedback and does housekeeping (invalidations, subscription
 *	  re-read, table sync) when not inside a remote transaction;
 *	- sleeps on the latch/socket until more work arrives or a timeout fires.
 *
 * The loop ends only when walrcv_receive() reports end of stream (negative
 * length), or when an error is raised.
 *
 * last_received: latest remote LSN known to have been received.  It is
 * advanced as WAL data and keepalive messages arrive and is reported back to
 * the publisher via send_feedback().
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;
	TimeLineID	tli;
	ErrorContextCallback errcallback;
	RetainDeadTuplesData rdt_data = {0};

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/*
	 * This memory context is used for per-stream data when the streaming mode
	 * is enabled. This context is reset on each stream stop.
	 */
	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
													"LogicalStreamingContext",
													ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/*
	 * Push apply error context callback. Fields will be filled while applying
	 * a change.
	 */
	errcallback.callback = apply_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;
	apply_error_context_stack = error_context_stack;

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		/*
		 * len > 0: a message is in buf; len == 0: nothing available right
		 * now; len < 0: the publisher ended the stream (handled below).
		 */
		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					break;
				}
				else if (len < 0)
				{
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					if (ConfigReloadPending)
					{
						ConfigReloadPending = false;
						ProcessConfigFile(PGC_SIGHUP);
					}

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					rdt_data.last_recv_time = last_recv_timestamp;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					initReadOnlyStringInfo(&s, buf, len);

					/* The first byte identifies the kind of message. */
					c = pq_getmsgbyte(&s);

					if (c == PqReplMsg_WALData)
					{
						/*
						 * WAL data: start LSN, end LSN and send time, followed
						 * by the logical replication message that
						 * apply_dispatch() consumes.
						 */
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						/* last_received only ever moves forward. */
						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);

						maybe_advance_nonremovable_xid(&rdt_data, false);
					}
					else if (c == PqReplMsg_Keepalive)
					{
						/*
						 * Keepalive: publisher's end LSN, its timestamp, and
						 * whether it asks us to reply right away.
						 */
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);

						maybe_advance_nonremovable_xid(&rdt_data, false);

						UpdateWorkerStats(last_received, timestamp, true);
					}
					else if (c == PqReplMsg_PrimaryStatusUpdate)
					{
						/*
						 * Publisher status: its WAL position, oldest and next
						 * full transaction IDs, and reply time.  These feed the
						 * non-removable-xid advancement, which is told (via
						 * status_received = true) that fresh status arrived.
						 */
						rdt_data.remote_lsn = pq_getmsgint64(&s);
						rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.reply_time = pq_getmsgint64(&s);

						/*
						 * This should never happen, see
						 * ProcessStandbyPSRequestMessage. But if it happens
						 * due to a bug, we don't want to proceed as it can
						 * incorrectly advance oldest_nonremovable_xid.
						 */
						if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
							elog(ERROR, "cannot get the latest WAL position from the publisher");

						maybe_advance_nonremovable_xid(&rdt_data, true);

						UpdateWorkerStats(last_received, rdt_data.reply_time, false);
					}
					/* other message types are purposefully ignored */

					/* Free everything allocated while handling this message. */
					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		/* Reset the timestamp if no message was received */
		rdt_data.last_recv_time = 0;

		maybe_advance_nonremovable_xid(&rdt_data, false);

		if (!in_remote_transaction && !in_streamed_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextReset(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
			break;

		/*
		 * Wait for more data or latch. If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message). Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		/*
		 * Ensure to wake up when it's possible to advance the non-removable
		 * transaction ID, or when the retention duration may have exceeded
		 * max_retention_duration.
		 */
		if (MySubscription->retentionactive)
		{
			if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
				rdt_data.xid_advance_interval)
				wait_time = Min(wait_time, rdt_data.xid_advance_interval);
			else if (MySubscription->maxretention > 0)
				wait_time = Min(wait_time, MySubscription->maxretention);
		}

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the primary anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from primary has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errcode(ERRCODE_CONNECTION_FAILURE),
							 errmsg("terminating logical replication worker due to timeout")));

				/*
				 * Check to see if it's time for a ping.  Only one ping is
				 * sent per silence period; ping_sent is cleared whenever a
				 * message arrives.
				 */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			/* When pinging, force the message out and ask for a reply. */
			send_feedback(last_received, requestReply, requestReply);

			maybe_advance_nonremovable_xid(&rdt_data, false);

			/*
			 * Force reporting to ensure long idle periods don't lead to
			 * arbitrarily delayed stats. Stats can only be reported outside
			 * of (implicit or explicit) transactions. That shouldn't lead to
			 * stats being delayed for long, because transactions are either
			 * sent as a whole on commit or streamed. Streamed transactions
			 * are spilled to disk and applied on commit.
			 */
			if (!IsTransactionState())
				pgstat_report_stat(true);
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;
	apply_error_context_stack = error_context_stack;

	/* All done */
	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
4244 :
4245 : /*
4246 : * Send a Standby Status Update message to server.
4247 : *
4248 : * 'recvpos' is the latest LSN we've received data to, force is set if we need
4249 : * to send a response to avoid timeouts.
4250 : */
4251 : static void
4252 145850 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
4253 : {
4254 : static StringInfo reply_message = NULL;
4255 : static TimestampTz send_time = 0;
4256 :
4257 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
4258 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
4259 :
4260 : XLogRecPtr writepos;
4261 : XLogRecPtr flushpos;
4262 : TimestampTz now;
4263 : bool have_pending_txes;
4264 :
4265 : /*
4266 : * If the user doesn't want status to be reported to the publisher, be
4267 : * sure to exit before doing anything at all.
4268 : */
4269 145850 : if (!force && wal_receiver_status_interval <= 0)
4270 42924 : return;
4271 :
4272 : /* It's legal to not pass a recvpos */
4273 145850 : if (recvpos < last_recvpos)
4274 0 : recvpos = last_recvpos;
4275 :
4276 145850 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4277 :
4278 : /*
4279 : * No outstanding transactions to flush, we can report the latest received
4280 : * position. This is important for synchronous replication.
4281 : */
4282 145850 : if (!have_pending_txes)
4283 130588 : flushpos = writepos = recvpos;
4284 :
4285 145850 : if (writepos < last_writepos)
4286 0 : writepos = last_writepos;
4287 :
4288 145850 : if (flushpos < last_flushpos)
4289 15158 : flushpos = last_flushpos;
4290 :
4291 145850 : now = GetCurrentTimestamp();
4292 :
4293 : /* if we've already reported everything we're good */
4294 145850 : if (!force &&
4295 145844 : writepos == last_writepos &&
4296 43500 : flushpos == last_flushpos &&
4297 43182 : !TimestampDifferenceExceeds(send_time, now,
4298 : wal_receiver_status_interval * 1000))
4299 42924 : return;
4300 102926 : send_time = now;
4301 :
4302 102926 : if (!reply_message)
4303 : {
4304 816 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4305 :
4306 816 : reply_message = makeStringInfo();
4307 816 : MemoryContextSwitchTo(oldctx);
4308 : }
4309 : else
4310 102110 : resetStringInfo(reply_message);
4311 :
4312 102926 : pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
4313 102926 : pq_sendint64(reply_message, recvpos); /* write */
4314 102926 : pq_sendint64(reply_message, flushpos); /* flush */
4315 102926 : pq_sendint64(reply_message, writepos); /* apply */
4316 102926 : pq_sendint64(reply_message, now); /* sendTime */
4317 102926 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
4318 :
4319 102926 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4320 : force,
4321 : LSN_FORMAT_ARGS(recvpos),
4322 : LSN_FORMAT_ARGS(writepos),
4323 : LSN_FORMAT_ARGS(flushpos));
4324 :
4325 102926 : walrcv_send(LogRepWorkerWalRcvConn,
4326 : reply_message->data, reply_message->len);
4327 :
4328 102926 : if (recvpos > last_recvpos)
4329 102350 : last_recvpos = recvpos;
4330 102926 : if (writepos > last_writepos)
4331 102348 : last_writepos = writepos;
4332 102926 : if (flushpos > last_flushpos)
4333 101904 : last_flushpos = flushpos;
4334 : }
4335 :
4336 : /*
4337 : * Attempt to advance the non-removable transaction ID.
4338 : *
4339 : * See comments atop worker.c for details.
4340 : */
4341 : static void
4342 530456 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
4343 : bool status_received)
4344 : {
4345 530456 : if (!can_advance_nonremovable_xid(rdt_data))
4346 509968 : return;
4347 :
4348 20488 : process_rdt_phase_transition(rdt_data, status_received);
4349 : }
4350 :
4351 : /*
4352 : * Preliminary check to determine if advancing the non-removable transaction ID
4353 : * is allowed.
4354 : */
4355 : static bool
4356 530456 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
4357 : {
4358 : /*
4359 : * It is sufficient to manage non-removable transaction ID for a
4360 : * subscription by the main apply worker to detect update_deleted reliably
4361 : * even for table sync or parallel apply workers.
4362 : */
4363 530456 : if (!am_leader_apply_worker())
4364 558 : return false;
4365 :
4366 : /* No need to advance if retaining dead tuples is not required */
4367 529898 : if (!MySubscription->retaindeadtuples)
4368 509384 : return false;
4369 :
4370 : /* No need to advance if we have already stopped retaining */
4371 20514 : if (!MySubscription->retentionactive)
4372 26 : return false;
4373 :
4374 20488 : return true;
4375 : }
4376 :
4377 : /*
4378 : * Process phase transitions during the non-removable transaction ID
4379 : * advancement. See comments atop worker.c for details of the transition.
4380 : */
4381 : static void
4382 35594 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
4383 : bool status_received)
4384 : {
4385 35594 : switch (rdt_data->phase)
4386 : {
4387 364 : case RDT_GET_CANDIDATE_XID:
4388 364 : get_candidate_xid(rdt_data);
4389 364 : break;
4390 14950 : case RDT_REQUEST_PUBLISHER_STATUS:
4391 14950 : request_publisher_status(rdt_data);
4392 14950 : break;
4393 20034 : case RDT_WAIT_FOR_PUBLISHER_STATUS:
4394 20034 : wait_for_publisher_status(rdt_data, status_received);
4395 20034 : break;
4396 244 : case RDT_WAIT_FOR_LOCAL_FLUSH:
4397 244 : wait_for_local_flush(rdt_data);
4398 244 : break;
4399 2 : case RDT_STOP_CONFLICT_INFO_RETENTION:
4400 2 : stop_conflict_info_retention(rdt_data);
4401 2 : break;
4402 : }
4403 35594 : }
4404 :
/*
 * Workhorse for the RDT_GET_CANDIDATE_XID phase.
 *
 * Picks the oldest transaction ID still active in the current database as
 * the candidate for the next oldest_nonremovable_xid. If it differs from
 * MyLogicalRepWorker->oldest_nonremovable_xid, the candidate is stored in
 * rdt_data->candidate_xid, the phase becomes RDT_REQUEST_PUBLISHER_STATUS and
 * that phase is processed immediately. The whole computation runs at most
 * once per rdt_data->xid_advance_interval (milliseconds), measured from
 * rdt_data->candidate_xid_time.
 */
static void
get_candidate_xid(RetainDeadTuplesData *rdt_data)
{
	TransactionId oldest_running_xid;
	TimestampTz now;

	/*
	 * Use last_recv_time when applying changes in the loop to avoid
	 * unnecessary system time retrieval. If last_recv_time is not available,
	 * obtain the current timestamp.
	 */
	now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();

	/*
	 * Compute the candidate_xid and request the publisher status at most once
	 * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
	 * details on how this value is dynamically adjusted. This is to avoid
	 * using CPU and network resources without making much progress.
	 */
	if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
									rdt_data->xid_advance_interval))
		return;

	/*
	 * Immediately update the timer, even if the function returns later
	 * without setting candidate_xid due to inactivity on the subscriber. This
	 * avoids frequent calls to GetOldestActiveTransactionId.
	 */
	rdt_data->candidate_xid_time = now;

	/*
	 * Consider transactions in the current database, as only dead tuples from
	 * this database are required for conflict detection.
	 */
	oldest_running_xid = GetOldestActiveTransactionId(false, false);

	/*
	 * The oldest active transaction ID (oldest_running_xid) can't be behind
	 * any previously computed value.
	 */
	Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
										 oldest_running_xid));

	/* Return if the oldest_nonremovable_xid cannot be advanced */
	if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
							oldest_running_xid))
	{
		/* No progress: let adjust_xid_advance_interval() lengthen the wait. */
		adjust_xid_advance_interval(rdt_data, false);
		return;
	}

	/* Progress was made: reset the wait to its minimum. */
	adjust_xid_advance_interval(rdt_data, true);

	rdt_data->candidate_xid = oldest_running_xid;
	rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4467 :
4468 : /*
4469 : * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4470 : */
4471 : static void
4472 14950 : request_publisher_status(RetainDeadTuplesData *rdt_data)
4473 : {
4474 : static StringInfo request_message = NULL;
4475 :
4476 14950 : if (!request_message)
4477 : {
4478 24 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4479 :
4480 24 : request_message = makeStringInfo();
4481 24 : MemoryContextSwitchTo(oldctx);
4482 : }
4483 : else
4484 14926 : resetStringInfo(request_message);
4485 :
4486 : /*
4487 : * Send the current time to update the remote walsender's latest reply
4488 : * message received time.
4489 : */
4490 14950 : pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
4491 14950 : pq_sendint64(request_message, GetCurrentTimestamp());
4492 :
4493 14950 : elog(DEBUG2, "sending publisher status request message");
4494 :
4495 : /* Send a request for the publisher status */
4496 14950 : walrcv_send(LogRepWorkerWalRcvConn,
4497 : request_message->data, request_message->len);
4498 :
4499 14950 : rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
4500 :
4501 : /*
4502 : * Skip calling maybe_advance_nonremovable_xid() since further transition
4503 : * is possible only once we receive the publisher status message.
4504 : */
4505 14950 : }
4506 :
/*
 * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
 *
 * status_received is true when a publisher status reply has just arrived;
 * otherwise there is nothing to do yet. On the first reply after the
 * per-cycle fields were reset, remote_wait_for is pinned to the publisher's
 * remote_nextxid. The phase then advances as follows:
 * - to RDT_WAIT_FOR_LOCAL_FLUSH once remote_oldestxid has caught up with
 *   remote_wait_for;
 * - otherwise back to RDT_REQUEST_PUBLISHER_STATUS to poll again.
 * Either way, the new phase is processed immediately. The function may also
 * hand off to RDT_STOP_CONFLICT_INFO_RETENTION via
 * should_stop_conflict_info_retention().
 */
static void
wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
						  bool status_received)
{
	/*
	 * Return if we have requested but not yet received the publisher status.
	 */
	if (!status_received)
		return;

	/*
	 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
	 * retaining conflict information for this worker.
	 */
	if (should_stop_conflict_info_retention(rdt_data))
		return;

	/*
	 * First status reply of this cycle: remember the publisher's next xid so
	 * that we wait for every transaction that was active at that point.
	 */
	if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
		rdt_data->remote_wait_for = rdt_data->remote_nextxid;

	/*
	 * Check if all remote concurrent transactions that were active at the
	 * first status request have now completed. If completed, proceed to the
	 * next phase; otherwise, continue checking the publisher status until
	 * these transactions finish.
	 *
	 * It's possible that transactions in the commit phase during the last
	 * cycle have now finished committing, but remote_oldestxid remains older
	 * than remote_wait_for. This can happen if some old transaction entered
	 * the commit phase when we requested status in this cycle. We do not
	 * handle this case explicitly as it's rare and the benefit doesn't
	 * justify the required complexity. Tracking would require either caching
	 * all xids at the publisher or sending them to subscribers. The condition
	 * will resolve naturally once the remaining transactions are finished.
	 *
	 * Directly advancing the non-removable transaction ID is possible if
	 * there are no activities on the publisher since the last advancement
	 * cycle. However, it requires maintaining two fields, last_remote_nextxid
	 * and last_remote_lsn, within the structure for comparison with the
	 * current cycle's values. Considering the minimal cost of continuing in
	 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
	 * advance the transaction ID here.
	 */
	if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
										  rdt_data->remote_oldestxid))
		rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
	else
		rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4562 :
/*
 * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
 *
 * Once every remote change up to rdt_data->remote_lsn has been applied and
 * flushed locally, this function:
 * - publishes rdt_data->candidate_xid as
 *   MyLogicalRepWorker->oldest_nonremovable_xid;
 * - wakes the launcher;
 * - resets the per-cycle fields;
 * - processes the next phase immediately.
 *
 * It returns early, staying in this phase, in two cases:
 * - table synchronization is still in progress (the elapsed wait is
 *   recorded in rdt_data->table_sync_wait_time);
 * - the local flush position has not reached remote_lsn yet.
 * It also returns if should_stop_conflict_info_retention() hands off to
 * RDT_STOP_CONFLICT_INFO_RETENTION.
 *
 * Raises ERROR if the publisher's reply_time precedes candidate_xid_time,
 * i.e. the publisher's clock appears to be behind the subscriber's.
 */
static void
wait_for_local_flush(RetainDeadTuplesData *rdt_data)
{
	Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
		   TransactionIdIsValid(rdt_data->candidate_xid));

	/*
	 * We expect the publisher and subscriber clocks to be in sync using time
	 * sync service like NTP. Otherwise, we will advance this worker's
	 * oldest_nonremovable_xid prematurely, leading to the removal of rows
	 * required to detect update_deleted reliably. This check primarily
	 * addresses scenarios where the publisher's clock falls behind; if the
	 * publisher's clock is ahead, subsequent transactions will naturally bear
	 * later commit timestamps, conforming to the design outlined atop
	 * worker.c.
	 *
	 * XXX Consider waiting for the publisher's clock to catch up with the
	 * subscriber's before proceeding to the next phase.
	 */
	if (TimestampDifferenceExceeds(rdt_data->reply_time,
								   rdt_data->candidate_xid_time, 0))
		ereport(ERROR,
				errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
				errdetail_internal("The clock on the publisher is behind that of the subscriber."));

	/*
	 * Do not attempt to advance the non-removable transaction ID when table
	 * sync is in progress. During this time, changes from a single
	 * transaction may be applied by multiple table sync workers corresponding
	 * to the target tables. So, it's necessary for all table sync workers to
	 * apply and flush the corresponding changes before advancing the
	 * transaction ID, otherwise, dead tuples that are still needed for
	 * conflict detection in table sync workers could be removed prematurely.
	 * However, confirming the apply and flush progress across all table sync
	 * workers is complex and not worth the effort, so we simply return if not
	 * all tables are in the READY state.
	 *
	 * Advancing the transaction ID is necessary even when no tables are
	 * currently subscribed, to avoid retaining dead tuples unnecessarily.
	 * While it might seem safe to skip all phases and directly assign
	 * candidate_xid to oldest_nonremovable_xid during the
	 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
	 * concurrently add tables to the subscription, the apply worker may not
	 * process invalidations in time. Consequently,
	 * HasSubscriptionRelationsCached() might miss the new tables, leading to
	 * premature advancement of oldest_nonremovable_xid.
	 *
	 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
	 * invalidations are guaranteed to be processed before applying changes
	 * from newly added tables while waiting for the local flush to reach
	 * remote_lsn.
	 *
	 * Additionally, even if we check for subscription tables during
	 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
	 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
	 * subscription tables at this stage to prevent unnecessary tuple
	 * retention.
	 */
	if (HasSubscriptionRelationsCached() && !AllTablesyncsReady())
	{
		TimestampTz now;

		now = rdt_data->last_recv_time
			? rdt_data->last_recv_time : GetCurrentTimestamp();

		/*
		 * Record the time spent waiting for table sync, it is needed for the
		 * timeout check in should_stop_conflict_info_retention().
		 */
		rdt_data->table_sync_wait_time =
			TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);

		return;
	}

	/*
	 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
	 * retaining conflict information for this worker.
	 */
	if (should_stop_conflict_info_retention(rdt_data))
		return;

	/*
	 * Update and check the remote flush position if we are applying changes
	 * in a loop. This is done at most once per WalWriterDelay to avoid
	 * performing costly operations in get_flush_position() too frequently
	 * during change application.
	 *
	 * last_flushpos is the file-level flush position that send_feedback()
	 * also advances.
	 */
	if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
		TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
								   rdt_data->last_recv_time, WalWriterDelay))
	{
		XLogRecPtr	writepos;
		XLogRecPtr	flushpos;
		bool		have_pending_txes;

		/* Fetch the latest remote flush position */
		get_flush_position(&writepos, &flushpos, &have_pending_txes);

		/* Only ever move last_flushpos forward. */
		if (flushpos > last_flushpos)
			last_flushpos = flushpos;

		rdt_data->flushpos_update_time = rdt_data->last_recv_time;
	}

	/* Return to wait for the changes to be applied */
	if (last_flushpos < rdt_data->remote_lsn)
		return;

	/*
	 * Reaching here means the remote WAL position has been received, and all
	 * transactions up to that position on the publisher have been applied and
	 * flushed locally. So, we can advance the non-removable transaction ID.
	 *
	 * oldest_nonremovable_xid is shared with the launcher, hence the
	 * spinlock.
	 */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
		 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
		 rdt_data->candidate_xid);

	/* Notify launcher to update the xmin of the conflict slot */
	ApplyLauncherWakeup();

	reset_retention_data_fields(rdt_data);

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4696 :
4697 : /*
4698 : * Check whether conflict information retention should be stopped due to
4699 : * exceeding the maximum wait time (max_retention_duration).
4700 : *
4701 : * If retention should be stopped, transition to the
4702 : * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
4703 : * false.
4704 : *
4705 : * Note: Retention won't be resumed automatically. The user must manually
4706 : * disable retain_dead_tuples and re-enable it after confirming that the
4707 : * replication slot maintained by the launcher has been dropped.
4708 : */
4709 : static bool
4710 15152 : should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4711 : {
4712 : TimestampTz now;
4713 :
4714 : Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4715 : Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
4716 : rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
4717 :
4718 15152 : if (!MySubscription->maxretention)
4719 15150 : return false;
4720 :
4721 : /*
4722 : * Use last_recv_time when applying changes in the loop to avoid
4723 : * unnecessary system time retrieval. If last_recv_time is not available,
4724 : * obtain the current timestamp.
4725 : */
4726 2 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4727 :
4728 : /*
4729 : * Return early if the wait time has not exceeded the configured maximum
4730 : * (max_retention_duration). Time spent waiting for table synchronization
4731 : * is excluded from this calculation, as it occurs infrequently.
4732 : */
4733 2 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4734 2 : MySubscription->maxretention +
4735 2 : rdt_data->table_sync_wait_time))
4736 0 : return false;
4737 :
4738 2 : rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
4739 :
4740 : /* process the next phase */
4741 2 : process_rdt_phase_transition(rdt_data, false);
4742 :
4743 2 : return true;
4744 : }
4745 :
/*
 * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
 *
 * This function:
 * - sets pg_subscription.subretentionactive to false in its own
 *   transaction;
 * - clears MyLogicalRepWorker->oldest_nonremovable_xid;
 * - logs that retention exceeded max_retention_duration;
 * - wakes the launcher so it can update the conflict slot;
 * - resets the per-cycle fields.
 *
 * If a transaction is already in progress, it returns without doing
 * anything. The phase then stays RDT_STOP_CONFLICT_INFO_RETENTION, so a
 * later dispatch will retry.
 */
static void
stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
	/*
	 * Do not update the catalog during an active transaction. The transaction
	 * may be started during change application, leading to a possible
	 * rollback of catalog updates if the application fails subsequently.
	 */
	if (IsTransactionState())
		return;

	StartTransactionCommand();

	/*
	 * Updating pg_subscription might involve TOAST table access, so ensure we
	 * have a valid snapshot.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());

	/* Set pg_subscription.subretentionactive to false */
	UpdateDeadTupleRetentionStatus(MySubscription->oid, false);

	PopActiveSnapshot();
	CommitTransactionCommand();

	/* Publish "no non-removable xid" under the worker's spinlock. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	ereport(LOG,
			errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
				   MySubscription->name),
			errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.",
					  MySubscription->maxretention));

	/* Notify launcher to update the conflict slot */
	ApplyLauncherWakeup();

	reset_retention_data_fields(rdt_data);
}
4789 :
4790 : /*
4791 : * Reset all data fields of RetainDeadTuplesData except those used to
4792 : * determine the timing for the next round of transaction ID advancement. We
4793 : * can even use flushpos_update_time in the next round to decide whether to get
4794 : * the latest flush position.
4795 : */
4796 : static void
4797 78 : reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
4798 : {
4799 78 : rdt_data->phase = RDT_GET_CANDIDATE_XID;
4800 78 : rdt_data->remote_lsn = InvalidXLogRecPtr;
4801 78 : rdt_data->remote_oldestxid = InvalidFullTransactionId;
4802 78 : rdt_data->remote_nextxid = InvalidFullTransactionId;
4803 78 : rdt_data->reply_time = 0;
4804 78 : rdt_data->remote_wait_for = InvalidFullTransactionId;
4805 78 : rdt_data->candidate_xid = InvalidTransactionId;
4806 78 : rdt_data->table_sync_wait_time = 0;
4807 78 : }
4808 :
4809 : /*
4810 : * Adjust the interval for advancing non-removable transaction IDs.
4811 : *
4812 : * If there is no activity on the node, we progressively double the interval
4813 : * used to advance non-removable transaction ID. This helps conserve CPU
4814 : * and network resources when there's little benefit to frequent updates.
4815 : *
4816 : * The interval is capped by the lowest of the following:
4817 : * - wal_receiver_status_interval (if set),
4818 : * - a default maximum of 3 minutes,
4819 : * - max_retention_duration.
4820 : *
4821 : * This ensures the interval never exceeds the retention boundary, even if
4822 : * other limits are higher. Once activity resumes on the node, the interval
4823 : * is reset to lesser of 100ms and max_retention_duration, allowing timely
4824 : * advancement of non-removable transaction ID.
4825 : *
4826 : * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4827 : * consider the other interval or a separate GUC if the need arises.
4828 : */
4829 : static void
4830 364 : adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
4831 : {
4832 364 : if (!new_xid_found && rdt_data->xid_advance_interval)
4833 0 : {
4834 0 : int max_interval = wal_receiver_status_interval
4835 0 : ? wal_receiver_status_interval * 1000
4836 0 : : MAX_XID_ADVANCE_INTERVAL;
4837 :
4838 : /*
4839 : * No new transaction ID has been assigned since the last check, so
4840 : * double the interval, but not beyond the maximum allowable value.
4841 : */
4842 0 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4843 : max_interval);
4844 : }
4845 : else
4846 : {
4847 : /*
4848 : * A new transaction ID was found or the interval is not yet
4849 : * initialized, so set the interval to the minimum value.
4850 : */
4851 364 : rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4852 : }
4853 :
4854 : /* Ensure the wait time remains within the maximum limit */
4855 364 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
4856 : MySubscription->maxretention);
4857 364 : }
4858 :
4859 : /*
4860 : * Exit routine for apply workers due to subscription parameter changes.
4861 : */
4862 : static void
4863 90 : apply_worker_exit(void)
4864 : {
4865 90 : if (am_parallel_apply_worker())
4866 : {
4867 : /*
4868 : * Don't stop the parallel apply worker as the leader will detect the
4869 : * subscription parameter change and restart logical replication later
4870 : * anyway. This also prevents the leader from reporting errors when
4871 : * trying to communicate with a stopped parallel apply worker, which
4872 : * would accidentally disable subscriptions if disable_on_error was
4873 : * set.
4874 : */
4875 0 : return;
4876 : }
4877 :
4878 : /*
4879 : * Reset the last-start time for this apply worker so that the launcher
4880 : * will restart it without waiting for wal_retrieve_retry_interval if the
4881 : * subscription is still active, and so that we won't leak that hash table
4882 : * entry if it isn't.
4883 : */
4884 90 : if (am_leader_apply_worker())
4885 90 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4886 :
4887 90 : proc_exit(0);
4888 : }
4889 :
/*
 * Reread subscription info if needed.
 *
 * Does nothing while MySubscriptionValid is set (it is cleared by
 * subscription_change_cb()). Otherwise the catalog entry is fetched again,
 * starting a transaction if none is active.
 *
 * For significant changes, we react by exiting the current process; a new
 * one will be launched afterwards if needed. Such changes are:
 * - the subscription was removed or disabled;
 * - a connection-affecting parameter changed;
 * - the owner lost superuser.
 * Note that apply_worker_exit() returns without exiting in a parallel apply
 * worker, so such a worker continues past those checks.
 *
 * Otherwise MySubscription is replaced by the freshly read copy (allocated
 * in ApplyContext) and synchronous_commit is re-applied from it.
 */
void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	newsub = GetSubscription(MyLogicalRepWorker->subid, true);

	/*
	 * Exit if the subscription was removed. This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
						MySubscription->name)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	/* Exit if the subscription was disabled. */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
						MySubscription->name)));

		apply_worker_exit();
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/* two-phase cannot be altered while the worker is running */
	Assert(newsub->twophasestate == MySubscription->twophasestate);

	/*
	 * Exit if any parameter that affects the remote connection was changed.
	 * The launcher will start a new worker but note that the parallel apply
	 * worker won't restart if the streaming option's value is changed from
	 * 'parallel' to any other value or the server decides not to stream the
	 * in-progress transaction.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
		strcmp(newsub->name, MySubscription->name) != 0 ||
		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
		newsub->binary != MySubscription->binary ||
		newsub->stream != MySubscription->stream ||
		newsub->passwordrequired != MySubscription->passwordrequired ||
		strcmp(newsub->origin, MySubscription->origin) != 0 ||
		newsub->owner != MySubscription->owner ||
		!equal(newsub->publications, MySubscription->publications))
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
							MySubscription->name)));
		else
			ereport(LOG,
					(errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
							MySubscription->name)));

		apply_worker_exit();
	}

	/*
	 * Exit if the subscription owner's superuser privileges have been
	 * revoked.
	 */
	if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));
		else
			ereport(LOG,
					errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));

		apply_worker_exit();
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Only commit a transaction this function started itself. */
	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}
5021 :
/*
 * Callback from subscription syscache invalidation.
 *
 * Only marks the cached MySubscription as stale by clearing
 * MySubscriptionValid; the actual re-read happens lazily in
 * maybe_reread_subscription(). arg, cacheid and hashvalue are ignored, so
 * any subscription invalidation triggers a re-read.
 */
static void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	MySubscriptionValid = false;
}
5030 :
5031 : /*
5032 : * subxact_info_write
5033 : * Store information about subxacts for a toplevel transaction.
5034 : *
5035 : * For each subxact we store offset of its first change in the main file.
5036 : * The file is always over-written as a whole.
5037 : *
5038 : * XXX We should only store subxacts that were not aborted yet.
5039 : */
5040 : static void
5041 742 : subxact_info_write(Oid subid, TransactionId xid)
5042 : {
5043 : char path[MAXPGPATH];
5044 : Size len;
5045 : BufFile *fd;
5046 :
5047 : Assert(TransactionIdIsValid(xid));
5048 :
5049 : /* construct the subxact filename */
5050 742 : subxact_filename(path, subid, xid);
5051 :
5052 : /* Delete the subxacts file, if exists. */
5053 742 : if (subxact_data.nsubxacts == 0)
5054 : {
5055 578 : cleanup_subxact_info();
5056 578 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5057 :
5058 578 : return;
5059 : }
5060 :
5061 : /*
5062 : * Create the subxact file if it not already created, otherwise open the
5063 : * existing file.
5064 : */
5065 164 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
5066 : true);
5067 164 : if (fd == NULL)
5068 16 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
5069 :
5070 164 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5071 :
5072 : /* Write the subxact count and subxact info */
5073 164 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5074 164 : BufFileWrite(fd, subxact_data.subxacts, len);
5075 :
5076 164 : BufFileClose(fd);
5077 :
5078 : /* free the memory allocated for subxact info */
5079 164 : cleanup_subxact_info();
5080 : }
5081 :
5082 : /*
5083 : * subxact_info_read
5084 : * Restore information about subxacts of a streamed transaction.
5085 : *
5086 : * Read information about subxacts into the structure subxact_data that can be
5087 : * used later.
5088 : */
5089 : static void
5090 686 : subxact_info_read(Oid subid, TransactionId xid)
5091 : {
5092 : char path[MAXPGPATH];
5093 : Size len;
5094 : BufFile *fd;
5095 : MemoryContext oldctx;
5096 :
5097 : Assert(!subxact_data.subxacts);
5098 : Assert(subxact_data.nsubxacts == 0);
5099 : Assert(subxact_data.nsubxacts_max == 0);
5100 :
5101 : /*
5102 : * If the subxact file doesn't exist that means we don't have any subxact
5103 : * info.
5104 : */
5105 686 : subxact_filename(path, subid, xid);
5106 686 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
5107 : true);
5108 686 : if (fd == NULL)
5109 528 : return;
5110 :
5111 : /* read number of subxact items */
5112 158 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5113 :
5114 158 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5115 :
5116 : /* we keep the maximum as a power of 2 */
5117 158 : subxact_data.nsubxacts_max = 1 << pg_ceil_log2_32(subxact_data.nsubxacts);
5118 :
5119 : /*
5120 : * Allocate subxact information in the logical streaming context. We need
5121 : * this information during the complete stream so that we can add the sub
5122 : * transaction info to this. On stream stop we will flush this information
5123 : * to the subxact file and reset the logical streaming context.
5124 : */
5125 158 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5126 158 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
5127 : sizeof(SubXactInfo));
5128 158 : MemoryContextSwitchTo(oldctx);
5129 :
5130 158 : if (len > 0)
5131 158 : BufFileReadExact(fd, subxact_data.subxacts, len);
5132 :
5133 158 : BufFileClose(fd);
5134 : }
5135 :
5136 : /*
5137 : * subxact_info_add
5138 : * Add information about a subxact (offset in the main file).
5139 : */
5140 : static void
5141 205024 : subxact_info_add(TransactionId xid)
5142 : {
5143 205024 : SubXactInfo *subxacts = subxact_data.subxacts;
5144 : int64 i;
5145 :
5146 : /* We must have a valid top level stream xid and a stream fd. */
5147 : Assert(TransactionIdIsValid(stream_xid));
5148 : Assert(stream_fd != NULL);
5149 :
5150 : /*
5151 : * If the XID matches the toplevel transaction, we don't want to add it.
5152 : */
5153 205024 : if (stream_xid == xid)
5154 184776 : return;
5155 :
5156 : /*
5157 : * In most cases we're checking the same subxact as we've already seen in
5158 : * the last call, so make sure to ignore it (this change comes later).
5159 : */
5160 20248 : if (subxact_data.subxact_last == xid)
5161 20096 : return;
5162 :
5163 : /* OK, remember we're processing this XID. */
5164 152 : subxact_data.subxact_last = xid;
5165 :
5166 : /*
5167 : * Check if the transaction is already present in the array of subxact. We
5168 : * intentionally scan the array from the tail, because we're likely adding
5169 : * a change for the most recent subtransactions.
5170 : *
5171 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5172 : * would allow us to use binary search here.
5173 : */
5174 190 : for (i = subxact_data.nsubxacts; i > 0; i--)
5175 : {
5176 : /* found, so we're done */
5177 152 : if (subxacts[i - 1].xid == xid)
5178 114 : return;
5179 : }
5180 :
5181 : /* This is a new subxact, so we need to add it to the array. */
5182 38 : if (subxact_data.nsubxacts == 0)
5183 : {
5184 : MemoryContext oldctx;
5185 :
5186 16 : subxact_data.nsubxacts_max = 128;
5187 :
5188 : /*
5189 : * Allocate this memory for subxacts in per-stream context, see
5190 : * subxact_info_read.
5191 : */
5192 16 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5193 16 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5194 16 : MemoryContextSwitchTo(oldctx);
5195 : }
5196 22 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
5197 : {
5198 20 : subxact_data.nsubxacts_max *= 2;
5199 20 : subxacts = repalloc(subxacts,
5200 20 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5201 : }
5202 :
5203 38 : subxacts[subxact_data.nsubxacts].xid = xid;
5204 :
5205 : /*
5206 : * Get the current offset of the stream file and store it as offset of
5207 : * this subxact.
5208 : */
5209 38 : BufFileTell(stream_fd,
5210 38 : &subxacts[subxact_data.nsubxacts].fileno,
5211 38 : &subxacts[subxact_data.nsubxacts].offset);
5212 :
5213 38 : subxact_data.nsubxacts++;
5214 38 : subxact_data.subxacts = subxacts;
5215 : }
5216 :
5217 : /* format filename for file containing the info about subxacts */
5218 : static inline void
5219 1490 : subxact_filename(char *path, Oid subid, TransactionId xid)
5220 : {
5221 1490 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5222 1490 : }
5223 :
5224 : /* format filename for file containing serialized changes */
5225 : static inline void
5226 874 : changes_filename(char *path, Oid subid, TransactionId xid)
5227 : {
5228 874 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5229 874 : }
5230 :
5231 : /*
5232 : * stream_cleanup_files
5233 : * Cleanup files for a subscription / toplevel transaction.
5234 : *
5235 : * Remove files with serialized changes and subxact info for a particular
5236 : * toplevel transaction. Each subscription has a separate set of files
5237 : * for any toplevel transaction.
5238 : */
5239 : void
5240 62 : stream_cleanup_files(Oid subid, TransactionId xid)
5241 : {
5242 : char path[MAXPGPATH];
5243 :
5244 : /* Delete the changes file. */
5245 62 : changes_filename(path, subid, xid);
5246 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
5247 :
5248 : /* Delete the subxact file, if it exists. */
5249 62 : subxact_filename(path, subid, xid);
5250 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5251 62 : }
5252 :
5253 : /*
5254 : * stream_open_file
5255 : * Open a file that we'll use to serialize changes for a toplevel
5256 : * transaction.
5257 : *
5258 : * Open a file for streamed changes from a toplevel transaction identified
5259 : * by stream_xid (global variable). If it's the first chunk of streamed
5260 : * changes for this transaction, create the buffile, otherwise open the
5261 : * previously created file.
5262 : */
5263 : static void
5264 724 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
5265 : {
5266 : char path[MAXPGPATH];
5267 : MemoryContext oldcxt;
5268 :
5269 : Assert(OidIsValid(subid));
5270 : Assert(TransactionIdIsValid(xid));
5271 : Assert(stream_fd == NULL);
5272 :
5273 :
5274 724 : changes_filename(path, subid, xid);
5275 724 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5276 :
5277 : /*
5278 : * Create/open the buffiles under the logical streaming context so that we
5279 : * have those files until stream stop.
5280 : */
5281 724 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
5282 :
5283 : /*
5284 : * If this is the first streamed segment, create the changes file.
5285 : * Otherwise, just open the file for writing, in append mode.
5286 : */
5287 724 : if (first_segment)
5288 64 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
5289 : path);
5290 : else
5291 : {
5292 : /*
5293 : * Open the file and seek to the end of the file because we always
5294 : * append the changes file.
5295 : */
5296 660 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
5297 : path, O_RDWR, false);
5298 660 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
5299 : }
5300 :
5301 724 : MemoryContextSwitchTo(oldcxt);
5302 724 : }
5303 :
5304 : /*
5305 : * stream_close_file
5306 : * Close the currently open file with streamed changes.
5307 : */
5308 : static void
5309 784 : stream_close_file(void)
5310 : {
5311 : Assert(stream_fd != NULL);
5312 :
5313 784 : BufFileClose(stream_fd);
5314 :
5315 784 : stream_fd = NULL;
5316 784 : }
5317 :
5318 : /*
5319 : * stream_write_change
5320 : * Serialize a change to a file for the current toplevel transaction.
5321 : *
5322 : * The change is serialized in a simple format, with length (not including
5323 : * the length), action code (identifying the message type) and message
5324 : * contents (without the subxact TransactionId value).
5325 : */
5326 : static void
5327 215106 : stream_write_change(char action, StringInfo s)
5328 : {
5329 : int len;
5330 :
5331 : Assert(stream_fd != NULL);
5332 :
5333 : /* total on-disk size, including the action type character */
5334 215106 : len = (s->len - s->cursor) + sizeof(char);
5335 :
5336 : /* first write the size */
5337 215106 : BufFileWrite(stream_fd, &len, sizeof(len));
5338 :
5339 : /* then the action */
5340 215106 : BufFileWrite(stream_fd, &action, sizeof(action));
5341 :
5342 : /* and finally the remaining part of the buffer (after the XID) */
5343 215106 : len = (s->len - s->cursor);
5344 :
5345 215106 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
5346 215106 : }
5347 :
5348 : /*
5349 : * stream_open_and_write_change
5350 : * Serialize a message to a file for the given transaction.
5351 : *
5352 : * This function is similar to stream_write_change except that it will open the
5353 : * target file if not already before writing the message and close the file at
5354 : * the end.
5355 : */
5356 : static void
5357 10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5358 : {
5359 : Assert(!in_streamed_transaction);
5360 :
5361 10 : if (!stream_fd)
5362 10 : stream_start_internal(xid, false);
5363 :
5364 10 : stream_write_change(action, s);
5365 10 : stream_stop_internal(xid);
5366 10 : }
5367 :
5368 : /*
5369 : * Sets streaming options including replication slot name and origin start
5370 : * position. Workers need these options for logical replication.
5371 : */
5372 : void
5373 816 : set_stream_options(WalRcvStreamOptions *options,
5374 : char *slotname,
5375 : XLogRecPtr *origin_startpos)
5376 : {
5377 : int server_version;
5378 :
5379 816 : options->logical = true;
5380 816 : options->startpoint = *origin_startpos;
5381 816 : options->slotname = slotname;
5382 :
5383 816 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
5384 816 : options->proto.logical.proto_version =
5385 816 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
5386 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
5387 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
5388 : LOGICALREP_PROTO_VERSION_NUM;
5389 :
5390 816 : options->proto.logical.publication_names = MySubscription->publications;
5391 816 : options->proto.logical.binary = MySubscription->binary;
5392 :
5393 : /*
5394 : * Assign the appropriate option value for streaming option according to
5395 : * the 'streaming' mode and the publisher's ability to support that mode.
5396 : */
5397 816 : if (server_version >= 160000 &&
5398 816 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5399 : {
5400 748 : options->proto.logical.streaming_str = "parallel";
5401 748 : MyLogicalRepWorker->parallel_apply = true;
5402 : }
5403 68 : else if (server_version >= 140000 &&
5404 68 : MySubscription->stream != LOGICALREP_STREAM_OFF)
5405 : {
5406 52 : options->proto.logical.streaming_str = "on";
5407 52 : MyLogicalRepWorker->parallel_apply = false;
5408 : }
5409 : else
5410 : {
5411 16 : options->proto.logical.streaming_str = NULL;
5412 16 : MyLogicalRepWorker->parallel_apply = false;
5413 : }
5414 :
5415 816 : options->proto.logical.twophase = false;
5416 816 : options->proto.logical.origin = pstrdup(MySubscription->origin);
5417 816 : }
5418 :
5419 : /*
5420 : * Cleanup the memory for subxacts and reset the related variables.
5421 : */
5422 : static inline void
5423 750 : cleanup_subxact_info()
5424 : {
5425 750 : if (subxact_data.subxacts)
5426 174 : pfree(subxact_data.subxacts);
5427 :
5428 750 : subxact_data.subxacts = NULL;
5429 750 : subxact_data.subxact_last = InvalidTransactionId;
5430 750 : subxact_data.nsubxacts = 0;
5431 750 : subxact_data.nsubxacts_max = 0;
5432 750 : }
5433 :
/*
 * Common function to run the apply loop with error handling. Disable the
 * subscription, if necessary.
 *
 * origin_startpos is passed unchanged to LogicalRepApplyLoop().
 *
 * If the apply loop raises an error, the session's replication origin state
 * is reset first. Then:
 *	 - if MySubscription->disableonerr is set, DisableSubscriptionAndExit()
 *	   is called, which disables the subscription and exits the process;
 *	 - otherwise, any open transaction is aborted, the failure is reported
 *	   via pgstat_report_subscription_error(), and the error is re-thrown.
 *
 * Note that we don't handle FATAL errors which are probably because
 * of system resource error and are not repeatable.
 */
void
start_apply(XLogRecPtr origin_startpos)
{
	PG_TRY();
	{
		LogicalRepApplyLoop(origin_startpos);
	}
	PG_CATCH();
	{
		/*
		 * Reset the origin state to prevent the advancement of origin
		 * progress if we fail to apply. Otherwise, this will result in
		 * transaction loss as that transaction won't be sent again by the
		 * server.
		 */
		replorigin_reset(0, (Datum) 0);

		if (MySubscription->disableonerr)
			DisableSubscriptionAndExit();
		else
		{
			/*
			 * Report the worker failed while applying changes. Abort the
			 * current transaction so that the stats message is sent in an
			 * idle state.
			 */
			AbortOutOfAnyTransaction();
			/* second argument: true unless this is a tablesync worker */
			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());

			/* propagate the original error to the outer handler */
			PG_RE_THROW();
		}
	}
	PG_END_TRY();
}
5475 :
5476 : /*
5477 : * Runs the leader apply worker.
5478 : *
5479 : * It sets up replication origin, streaming options and then starts streaming.
5480 : */
5481 : static void
5482 546 : run_apply_worker()
5483 : {
5484 : char originname[NAMEDATALEN];
5485 546 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
5486 546 : char *slotname = NULL;
5487 : WalRcvStreamOptions options;
5488 : RepOriginId originid;
5489 : TimeLineID startpointTLI;
5490 : char *err;
5491 : bool must_use_password;
5492 :
5493 546 : slotname = MySubscription->slotname;
5494 :
5495 : /*
5496 : * This shouldn't happen if the subscription is enabled, but guard against
5497 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5498 : * slot is NULL.)
5499 : */
5500 546 : if (!slotname)
5501 0 : ereport(ERROR,
5502 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5503 : errmsg("subscription has no replication slot set")));
5504 :
5505 : /* Setup replication origin tracking. */
5506 546 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
5507 : originname, sizeof(originname));
5508 546 : StartTransactionCommand();
5509 546 : originid = replorigin_by_name(originname, true);
5510 546 : if (!OidIsValid(originid))
5511 0 : originid = replorigin_create(originname);
5512 546 : replorigin_session_setup(originid, 0);
5513 546 : replorigin_session_origin = originid;
5514 546 : origin_startpos = replorigin_session_get_progress(false);
5515 546 : CommitTransactionCommand();
5516 :
5517 : /* Is the use of a password mandatory? */
5518 1052 : must_use_password = MySubscription->passwordrequired &&
5519 506 : !MySubscription->ownersuperuser;
5520 :
5521 546 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
5522 : true, must_use_password,
5523 : MySubscription->name, &err);
5524 :
5525 514 : if (LogRepWorkerWalRcvConn == NULL)
5526 64 : ereport(ERROR,
5527 : (errcode(ERRCODE_CONNECTION_FAILURE),
5528 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5529 : MySubscription->name, err)));
5530 :
5531 : /*
5532 : * We don't really use the output identify_system for anything but it does
5533 : * some initializations on the upstream so let's still call it.
5534 : */
5535 450 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
5536 :
5537 450 : set_apply_error_context_origin(originname);
5538 :
5539 450 : set_stream_options(&options, slotname, &origin_startpos);
5540 :
5541 : /*
5542 : * Even when the two_phase mode is requested by the user, it remains as
5543 : * the tri-state PENDING until all tablesyncs have reached READY state.
5544 : * Only then, can it become ENABLED.
5545 : *
5546 : * Note: If the subscription has no tables then leave the state as
5547 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5548 : * work.
5549 : */
5550 478 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
5551 28 : AllTablesyncsReady())
5552 : {
5553 : /* Start streaming with two_phase enabled */
5554 14 : options.proto.logical.twophase = true;
5555 14 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5556 :
5557 14 : StartTransactionCommand();
5558 :
5559 : /*
5560 : * Updating pg_subscription might involve TOAST table access, so
5561 : * ensure we have a valid snapshot.
5562 : */
5563 14 : PushActiveSnapshot(GetTransactionSnapshot());
5564 :
5565 14 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
5566 14 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
5567 14 : PopActiveSnapshot();
5568 14 : CommitTransactionCommand();
5569 : }
5570 : else
5571 : {
5572 436 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5573 : }
5574 :
5575 450 : ereport(DEBUG1,
5576 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5577 : MySubscription->name,
5578 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
5579 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5580 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5581 : "?")));
5582 :
5583 : /* Run the main loop. */
5584 450 : start_apply(origin_startpos);
5585 0 : }
5586 :
/*
 * Common initialization for leader apply worker, parallel apply worker and
 * tablesync worker.
 *
 * Initialize the database connection, in-memory subscription and necessary
 * config options.
 *
 * On success, MySubscription is loaded into ApplyContext and marked valid,
 * and syscache callbacks for subscription and role changes are registered.
 * The process exits instead of returning if the subscription was dropped or
 * disabled during startup, or (leader apply worker only) if
 * retain_dead_tuples became active before oldest_nonremovable_xid was set.
 */
void
InitializeLogRepWorker(void)
{
	MemoryContext oldctx;

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	/* allocate MySubscription in ApplyContext so it outlives the transaction */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/*
	 * Lock the subscription to prevent it from being concurrently dropped,
	 * then re-verify its existence. After the initialization, the worker will
	 * be terminated gracefully if the subscription is dropped.
	 */
	LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
					 AccessShareLock);
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
						MySubscription->name)));

		apply_worker_exit();
	}

	/*
	 * Restart the worker if retain_dead_tuples was enabled during startup.
	 *
	 * At this point, the replication slot used for conflict detection might
	 * not exist yet, or could be dropped soon if the launcher perceives
	 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
	 * oldest_nonremovable_xid when the slot is absent or at risk of being
	 * dropped, a restart is initiated.
	 *
	 * The oldest_nonremovable_xid should be initialized only when the
	 * subscription's retention is active before launching the worker. See
	 * logicalrep_worker_launch.
	 */
	if (am_leader_apply_worker() &&
		MySubscription->retaindeadtuples &&
		MySubscription->retentionactive &&
		!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
	{
		ereport(LOG,
				errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
					   MySubscription->name, "retain_dead_tuples"));

		apply_worker_exit();
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/*
	 * Keep us informed about subscription or role changes. Note that the
	 * role's superuser privilege can be revoked.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	CacheRegisterSyscacheCallback(AUTHOID,
								  subscription_change_cb,
								  (Datum) 0);

	/*
	 * The transaction started above is still open here; presumably it is
	 * needed for the catalog lookup done by get_rel_name() -- TODO confirm.
	 */
	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name,
						get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();
}
5707 :
5708 : /*
5709 : * Reset the origin state.
5710 : */
5711 : static void
5712 1092 : replorigin_reset(int code, Datum arg)
5713 : {
5714 1092 : replorigin_session_origin = InvalidRepOriginId;
5715 1092 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
5716 1092 : replorigin_session_origin_timestamp = 0;
5717 1092 : }
5718 :
5719 : /* Common function to setup the leader apply or tablesync worker. */
5720 : void
5721 1064 : SetupApplyOrSyncWorker(int worker_slot)
5722 : {
5723 : /* Attach to slot */
5724 1064 : logicalrep_worker_attach(worker_slot);
5725 :
5726 : Assert(am_tablesync_worker() || am_leader_apply_worker());
5727 :
5728 : /* Setup signal handling */
5729 1064 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
5730 1064 : pqsignal(SIGTERM, die);
5731 1064 : BackgroundWorkerUnblockSignals();
5732 :
5733 : /*
5734 : * We don't currently need any ResourceOwner in a walreceiver process, but
5735 : * if we did, we could call CreateAuxProcessResourceOwner here.
5736 : */
5737 :
5738 : /* Initialise stats to a sanish value */
5739 1064 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
5740 1064 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
5741 :
5742 : /* Load the libpq-specific functions */
5743 1064 : load_file("libpqwalreceiver", false);
5744 :
5745 1064 : InitializeLogRepWorker();
5746 :
5747 : /*
5748 : * Register a callback to reset the origin state before aborting any
5749 : * pending transaction during shutdown (see ShutdownPostgres()). This will
5750 : * avoid origin advancement for an in-complete transaction which could
5751 : * otherwise lead to its loss as such a transaction won't be sent by the
5752 : * server again.
5753 : *
5754 : * Note that even a LOG or DEBUG statement placed after setting the origin
5755 : * state may process a shutdown signal before committing the current apply
5756 : * operation. So, it is important to register such a callback here.
5757 : */
5758 934 : before_shmem_exit(replorigin_reset, (Datum) 0);
5759 :
5760 : /* Connect to the origin and start the replication. */
5761 934 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5762 : MySubscription->conninfo);
5763 :
5764 : /*
5765 : * Setup callback for syscache so that we know when something changes in
5766 : * the subscription relation state.
5767 : */
5768 934 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5769 : invalidate_syncing_table_states,
5770 : (Datum) 0);
5771 934 : }
5772 :
5773 : /* Logical Replication Apply worker entry point */
5774 : void
5775 672 : ApplyWorkerMain(Datum main_arg)
5776 : {
5777 672 : int worker_slot = DatumGetInt32(main_arg);
5778 :
5779 672 : InitializingApplyWorker = true;
5780 :
5781 672 : SetupApplyOrSyncWorker(worker_slot);
5782 :
5783 546 : InitializingApplyWorker = false;
5784 :
5785 546 : run_apply_worker();
5786 :
5787 0 : proc_exit(0);
5788 : }
5789 :
/*
 * After error recovery, disable the subscription in a new transaction
 * and exit cleanly.
 *
 * Intended to be called from an error handler (start_apply's PG_CATCH block
 * is one caller). It reports and clears the pending error, records the
 * failure with pgstat_report_subscription_error(), and disables the
 * subscription in its own transaction. It then runs the dead-tuple
 * retention check and ends with proc_exit(0); it does not return.
 */
void
DisableSubscriptionAndExit(void)
{
	/*
	 * Emit the error message, and recover from the error state to an idle
	 * state
	 */
	/* interrupts are held off while the pending error is reported/cleared */
	HOLD_INTERRUPTS();

	EmitErrorReport();
	AbortOutOfAnyTransaction();
	FlushErrorState();

	RESUME_INTERRUPTS();

	/* Report the worker failed during either table synchronization or apply */
	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
									 !am_tablesync_worker());

	/* Disable the subscription */
	StartTransactionCommand();

	/*
	 * Updating pg_subscription might involve TOAST table access, so ensure we
	 * have a valid snapshot.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());

	DisableSubscription(MySubscription->oid);
	PopActiveSnapshot();
	CommitTransactionCommand();

	/* Ensure we remove no-longer-useful entry for worker's start time */
	if (am_leader_apply_worker())
		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

	/* Notify the subscription has been disabled and exit */
	ereport(LOG,
			errmsg("subscription \"%s\" has been disabled because of an error",
				   MySubscription->name));

	/*
	 * Skip the track_commit_timestamp check when disabling the worker due to
	 * an error, as verifying commit timestamps is unnecessary in this
	 * context.
	 */
	CheckSubDeadTupleRetention(false, true, WARNING,
							   MySubscription->retaindeadtuples,
							   MySubscription->retentionactive, false);

	proc_exit(0);
}
5846 :
5847 : /*
5848 : * Is current process a logical replication worker?
5849 : */
5850 : bool
5851 4020 : IsLogicalWorker(void)
5852 : {
5853 4020 : return MyLogicalRepWorker != NULL;
5854 : }
5855 :
/*
 * Is current process a logical replication parallel apply worker?
 *
 * am_parallel_apply_worker() is consulted only once the process is known to
 * be a logical replication worker.
 */
bool
IsLogicalParallelApplyWorker(void)
{
	if (!IsLogicalWorker())
		return false;

	return am_parallel_apply_worker();
}
5864 :
5865 : /*
5866 : * Start skipping changes of the transaction if the given LSN matches the
5867 : * LSN specified by subscription's skiplsn.
5868 : */
5869 : static void
5870 1058 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
5871 : {
5872 : Assert(!is_skipping_changes());
5873 : Assert(!in_remote_transaction);
5874 : Assert(!in_streamed_transaction);
5875 :
5876 : /*
5877 : * Quick return if it's not requested to skip this transaction. This
5878 : * function is called for every remote transaction and we assume that
5879 : * skipping the transaction is not used often.
5880 : */
5881 1058 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
5882 : MySubscription->skiplsn != finish_lsn))
5883 1052 : return;
5884 :
5885 : /* Start skipping all changes of this transaction */
5886 6 : skip_xact_finish_lsn = finish_lsn;
5887 :
5888 6 : ereport(LOG,
5889 : errmsg("logical replication starts skipping transaction at LSN %X/%08X",
5890 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
5891 : }
5892 :
5893 : /*
5894 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
5895 : */
5896 : static void
5897 54 : stop_skipping_changes(void)
5898 : {
5899 54 : if (!is_skipping_changes())
5900 48 : return;
5901 :
5902 6 : ereport(LOG,
5903 : errmsg("logical replication completed skipping transaction at LSN %X/%08X",
5904 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
5905 :
5906 : /* Stop skipping changes */
5907 6 : skip_xact_finish_lsn = InvalidXLogRecPtr;
5908 : }
5909 :
5910 : /*
5911 : * Clear subskiplsn of pg_subscription catalog.
5912 : *
5913 : * finish_lsn is the transaction's finish LSN that is used to check if the
5914 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
5915 : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
5916 : * specified the wrong subskiplsn.
5917 : */
5918 : static void
5919 1046 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
5920 : {
5921 : Relation rel;
5922 : Form_pg_subscription subform;
5923 : HeapTuple tup;
5924 1046 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
5925 1046 : bool started_tx = false;
5926 :
5927 1046 : if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
5928 1040 : return;
5929 :
5930 6 : if (!IsTransactionState())
5931 : {
5932 2 : StartTransactionCommand();
5933 2 : started_tx = true;
5934 : }
5935 :
5936 : /*
5937 : * Updating pg_subscription might involve TOAST table access, so ensure we
5938 : * have a valid snapshot.
5939 : */
5940 6 : PushActiveSnapshot(GetTransactionSnapshot());
5941 :
5942 : /*
5943 : * Protect subskiplsn of pg_subscription from being concurrently updated
5944 : * while clearing it.
5945 : */
5946 6 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
5947 : AccessShareLock);
5948 :
5949 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
5950 :
5951 : /* Fetch the existing tuple. */
5952 6 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
5953 : ObjectIdGetDatum(MySubscription->oid));
5954 :
5955 6 : if (!HeapTupleIsValid(tup))
5956 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
5957 :
5958 6 : subform = (Form_pg_subscription) GETSTRUCT(tup);
5959 :
5960 : /*
5961 : * Clear the subskiplsn. If the user has already changed subskiplsn before
5962 : * clearing it we don't update the catalog and the replication origin
5963 : * state won't get advanced. So in the worst case, if the server crashes
5964 : * before sending an acknowledgment of the flush position the transaction
5965 : * will be sent again and the user needs to set subskiplsn again. We can
5966 : * reduce the possibility by logging a replication origin WAL record to
5967 : * advance the origin LSN instead but there is no way to advance the
5968 : * origin timestamp and it doesn't seem to be worth doing anything about
5969 : * it since it's a very rare case.
5970 : */
5971 6 : if (subform->subskiplsn == myskiplsn)
5972 : {
5973 : bool nulls[Natts_pg_subscription];
5974 : bool replaces[Natts_pg_subscription];
5975 : Datum values[Natts_pg_subscription];
5976 :
5977 6 : memset(values, 0, sizeof(values));
5978 6 : memset(nulls, false, sizeof(nulls));
5979 6 : memset(replaces, false, sizeof(replaces));
5980 :
5981 : /* reset subskiplsn */
5982 6 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
5983 6 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
5984 :
5985 6 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
5986 : replaces);
5987 6 : CatalogTupleUpdate(rel, &tup->t_self, tup);
5988 :
5989 6 : if (myskiplsn != finish_lsn)
5990 0 : ereport(WARNING,
5991 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
5992 : errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
5993 : LSN_FORMAT_ARGS(finish_lsn),
5994 : LSN_FORMAT_ARGS(myskiplsn)));
5995 : }
5996 :
5997 6 : heap_freetuple(tup);
5998 6 : table_close(rel, NoLock);
5999 :
6000 6 : PopActiveSnapshot();
6001 :
6002 6 : if (started_tx)
6003 2 : CommitTransactionCommand();
6004 : }
6005 :
6006 : /* Error callback to give more context info about the change being applied */
6007 : void
6008 12706 : apply_error_callback(void *arg)
6009 : {
6010 12706 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
6011 :
6012 12706 : if (apply_error_callback_arg.command == 0)
6013 11936 : return;
6014 :
6015 : Assert(errarg->origin_name);
6016 :
6017 770 : if (errarg->rel == NULL)
6018 : {
6019 626 : if (!TransactionIdIsValid(errarg->remote_xid))
6020 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6021 : errarg->origin_name,
6022 : logicalrep_message_type(errarg->command));
6023 626 : else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6024 514 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6025 : errarg->origin_name,
6026 : logicalrep_message_type(errarg->command),
6027 : errarg->remote_xid);
6028 : else
6029 224 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6030 : errarg->origin_name,
6031 : logicalrep_message_type(errarg->command),
6032 : errarg->remote_xid,
6033 112 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6034 : }
6035 : else
6036 : {
6037 144 : if (errarg->remote_attnum < 0)
6038 : {
6039 144 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6040 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6041 : errarg->origin_name,
6042 : logicalrep_message_type(errarg->command),
6043 0 : errarg->rel->remoterel.nspname,
6044 0 : errarg->rel->remoterel.relname,
6045 : errarg->remote_xid);
6046 : else
6047 288 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6048 : errarg->origin_name,
6049 : logicalrep_message_type(errarg->command),
6050 144 : errarg->rel->remoterel.nspname,
6051 144 : errarg->rel->remoterel.relname,
6052 : errarg->remote_xid,
6053 144 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6054 : }
6055 : else
6056 : {
6057 0 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6058 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6059 : errarg->origin_name,
6060 : logicalrep_message_type(errarg->command),
6061 0 : errarg->rel->remoterel.nspname,
6062 0 : errarg->rel->remoterel.relname,
6063 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6064 : errarg->remote_xid);
6065 : else
6066 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6067 : errarg->origin_name,
6068 : logicalrep_message_type(errarg->command),
6069 0 : errarg->rel->remoterel.nspname,
6070 0 : errarg->rel->remoterel.relname,
6071 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6072 : errarg->remote_xid,
6073 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6074 : }
6075 : }
6076 : }
6077 :
6078 : /* Set transaction information of apply error callback */
6079 : static inline void
6080 5772 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
6081 : {
6082 5772 : apply_error_callback_arg.remote_xid = xid;
6083 5772 : apply_error_callback_arg.finish_lsn = lsn;
6084 5772 : }
6085 :
6086 : /* Reset all information of apply error callback */
6087 : static inline void
6088 2826 : reset_apply_error_context_info(void)
6089 : {
6090 2826 : apply_error_callback_arg.command = 0;
6091 2826 : apply_error_callback_arg.rel = NULL;
6092 2826 : apply_error_callback_arg.remote_attnum = -1;
6093 2826 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
6094 2826 : }
6095 :
6096 : /*
6097 : * Request wakeup of the workers for the given subscription OID
6098 : * at commit of the current transaction.
6099 : *
6100 : * This is used to ensure that the workers process assorted changes
6101 : * as soon as possible.
6102 : */
6103 : void
6104 438 : LogicalRepWorkersWakeupAtCommit(Oid subid)
6105 : {
6106 : MemoryContext oldcxt;
6107 :
6108 438 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
6109 438 : on_commit_wakeup_workers_subids =
6110 438 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
6111 438 : MemoryContextSwitchTo(oldcxt);
6112 438 : }
6113 :
6114 : /*
6115 : * Wake up the workers of any subscriptions that were changed in this xact.
6116 : */
6117 : void
6118 1021200 : AtEOXact_LogicalRepWorkers(bool isCommit)
6119 : {
6120 1021200 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
6121 : {
6122 : ListCell *lc;
6123 :
6124 428 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6125 856 : foreach(lc, on_commit_wakeup_workers_subids)
6126 : {
6127 428 : Oid subid = lfirst_oid(lc);
6128 : List *workers;
6129 : ListCell *lc2;
6130 :
6131 428 : workers = logicalrep_workers_find(subid, true, false);
6132 560 : foreach(lc2, workers)
6133 : {
6134 132 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6135 :
6136 132 : logicalrep_worker_wakeup_ptr(worker);
6137 : }
6138 : }
6139 428 : LWLockRelease(LogicalRepWorkerLock);
6140 : }
6141 :
6142 : /* The List storage will be reclaimed automatically in xact cleanup. */
6143 1021200 : on_commit_wakeup_workers_subids = NIL;
6144 1021200 : }
6145 :
6146 : /*
6147 : * Allocate the origin name in long-lived context for error context message.
6148 : */
6149 : void
6150 836 : set_apply_error_context_origin(char *originname)
6151 : {
6152 836 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
6153 : originname);
6154 836 : }
6155 :
6156 : /*
6157 : * Return the action to be taken for the given transaction. See
6158 : * TransApplyAction for information on each of the actions.
6159 : *
6160 : * *winfo is assigned to the destination parallel worker info when the leader
6161 : * apply worker has to pass all the transaction's changes to the parallel
6162 : * apply worker.
6163 : */
6164 : static TransApplyAction
6165 653018 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
6166 : {
6167 653018 : *winfo = NULL;
6168 :
6169 653018 : if (am_parallel_apply_worker())
6170 : {
6171 138344 : return TRANS_PARALLEL_APPLY;
6172 : }
6173 :
6174 : /*
6175 : * If we are processing this transaction using a parallel apply worker
6176 : * then either we send the changes to the parallel worker or if the worker
6177 : * is busy then serialize the changes to the file which will later be
6178 : * processed by the parallel worker.
6179 : */
6180 514674 : *winfo = pa_find_worker(xid);
6181 :
6182 514674 : if (*winfo && (*winfo)->serialize_changes)
6183 : {
6184 10074 : return TRANS_LEADER_PARTIAL_SERIALIZE;
6185 : }
6186 504600 : else if (*winfo)
6187 : {
6188 137788 : return TRANS_LEADER_SEND_TO_PARALLEL;
6189 : }
6190 :
6191 : /*
6192 : * If there is no parallel worker involved to process this transaction
6193 : * then we either directly apply the change or serialize it to a file
6194 : * which will later be applied when the transaction finish message is
6195 : * processed.
6196 : */
6197 366812 : else if (in_streamed_transaction)
6198 : {
6199 206392 : return TRANS_LEADER_SERIALIZE;
6200 : }
6201 : else
6202 : {
6203 160420 : return TRANS_LEADER_APPLY;
6204 : }
6205 : }
|