Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic clean up on the error and (c) lets
45 : * these files survive across local transactions, so that they can be opened and
46 : * closed at each stream start and stop. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * process_syncing_tables_for_apply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it starts streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * Finally, to avoid problems mentioned in previous paragraphs from any
113 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 : * to 'off' and then again back to 'on') there is a restriction for
115 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can get prepare of the same GID more than once for the genuine cases
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and prepared transaction has operations on tables subscribed to those
121 : * subscriptions. For such cases, if we use the GID sent by publisher one of
122 : * the prepares will be successful and others will fail, in which case the
123 : * server will send them again. Now, this can lead to a deadlock if user has
124 : * set synchronous_standby_names for all the subscriptions on subscriber. To
125 : * avoid such deadlocks, we generate a unique GID (consisting of the
126 : * subscription oid and the xid of the prepared transaction) for each prepare
127 : * transaction on the subscriber.
128 : *
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *
136 : * RETAIN DEAD TUPLES
137 : * ----------------------
138 : * Each apply worker that enabled retain_dead_tuples option maintains a
139 : * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 : * prevent dead rows from being removed prematurely when the apply worker still
141 : * needs them to detect update_deleted conflicts. Additionally, this helps to
142 : * retain the required commit_ts module information, which further helps to
143 : * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 : * otherwise, vacuum freeze could remove the required information.
145 : *
146 : * The logical replication launcher manages an internal replication slot named
147 : * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 : * transaction ID from all apply workers to determine the appropriate xmin for
149 : * the slot, thereby retaining necessary tuples.
150 : *
151 : * The non-removable transaction ID in the apply worker is advanced to the
152 : * oldest running transaction ID once all concurrent transactions on the
153 : * publisher have been applied and flushed locally. The process involves:
154 : *
155 : * - RDT_GET_CANDIDATE_XID:
156 : * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 : * candidate xid.
158 : *
159 : * - RDT_REQUEST_PUBLISHER_STATUS:
160 : * Send a message to the walsender requesting the publisher status, which
161 : * includes the latest WAL write position and information about transactions
162 : * that are in the commit phase.
163 : *
164 : * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 : * Wait for the status from the walsender. After receiving the first status,
166 : * do not proceed if there are concurrent remote transactions that are still
167 : * in the commit phase. These transactions might have been assigned an
168 : * earlier commit timestamp but have not yet written the commit WAL record.
169 : * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 : * until all these transactions have completed.
171 : *
172 : * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 : * Advance the non-removable transaction ID if the current flush location has
174 : * reached or surpassed the last received WAL position.
175 : *
176 : * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 : * This phase is required only when max_retention_duration is defined. We
178 : * enter this phase if the wait time in either the
179 : * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 : * configured max_retention_duration. In this phase,
181 : * pg_subscription.subretentionactive is updated to false within a new
182 : * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 : *
184 : * - RDT_RESUME_CONFLICT_INFO_RETENTION:
185 : * This phase is required only when max_retention_duration is defined. We
186 : * enter this phase if the retention was previously stopped, and the time
187 : * required to advance the non-removable transaction ID in the
188 : * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
189 : * (or if max_retention_duration is set to 0). During this phase,
190 : * pg_subscription.subretentionactive is updated to true within a new
191 : * transaction, and the worker will be restarted.
192 : *
193 : * The overall state progression is: GET_CANDIDATE_XID ->
194 : * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
195 : * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
196 : * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
197 : *
198 : * Retaining the dead tuples for this period is sufficient for ensuring
199 : * eventual consistency using last-update-wins strategy, as dead tuples are
200 : * useful for detecting conflicts only during the application of concurrent
201 : * transactions from remote nodes. After applying and flushing all remote
202 : * transactions that occurred concurrently with the tuple DELETE, any
203 : * subsequent UPDATE from a remote node should have a later timestamp. In such
204 : * cases, it is acceptable to detect an update_missing scenario and convert the
205 : * UPDATE to an INSERT when applying it. But, for concurrent remote
206 : * transactions with earlier timestamps than the DELETE, detecting
207 : * update_deleted is necessary, as the UPDATEs in remote transactions should be
208 : * ignored if their timestamp is earlier than that of the dead tuples.
209 : *
210 : * Note that advancing the non-removable transaction ID is not supported if the
211 : * publisher is also a physical standby. This is because the logical walsender
212 : * on the standby can only get the WAL replay position but there may be more
213 : * WALs that are being replicated from the primary and those WALs could have
214 : * earlier commit timestamp.
215 : *
216 : * Similarly, when the publisher has subscribed to another publisher,
217 : * information necessary for conflict detection cannot be retained for
218 : * changes from origins other than the publisher. This is because publisher
219 : * lacks the information on concurrent transactions of other publishers to
220 : * which it subscribes. As the information on concurrent transactions is
221 : * unavailable beyond subscriber's immediate publishers, the non-removable
222 : * transaction ID might be advanced prematurely before changes from other
223 : * origins have been fully applied.
224 : *
225 : * XXX Retaining information for changes from other origins might be possible
226 : * by requesting the subscription on that origin to enable retain_dead_tuples
227 : * and fetching the conflict detection slot.xmin along with the publisher's
228 : * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
229 : * wait for the remote slot's xmin to reach the oldest active transaction ID,
230 : * ensuring that all transactions from other origins have been applied on the
231 : * publisher, thereby getting the latest WAL position that includes all
232 : * concurrent changes. However, this approach may impact performance, so it
233 : * might not be worth the effort.
234 : *
235 : * XXX It seems feasible to get the latest commit's WAL location from the
236 : * publisher and wait till that is applied. However, we can't do that
237 : * because commit timestamps can regress as a commit with a later LSN is not
238 : * guaranteed to have a later timestamp than those with earlier LSNs. Having
239 : * said that, even if that is possible, it won't improve performance much as
240 : * the apply always lags and moves slowly as compared with the transactions
241 : * on the publisher.
242 : *-------------------------------------------------------------------------
243 : */
244 :
245 : #include "postgres.h"
246 :
247 : #include <sys/stat.h>
248 : #include <unistd.h>
249 :
250 : #include "access/commit_ts.h"
251 : #include "access/table.h"
252 : #include "access/tableam.h"
253 : #include "access/twophase.h"
254 : #include "access/xact.h"
255 : #include "catalog/indexing.h"
256 : #include "catalog/pg_inherits.h"
257 : #include "catalog/pg_subscription.h"
258 : #include "catalog/pg_subscription_rel.h"
259 : #include "commands/subscriptioncmds.h"
260 : #include "commands/tablecmds.h"
261 : #include "commands/trigger.h"
262 : #include "executor/executor.h"
263 : #include "executor/execPartition.h"
264 : #include "libpq/pqformat.h"
265 : #include "miscadmin.h"
266 : #include "optimizer/optimizer.h"
267 : #include "parser/parse_relation.h"
268 : #include "pgstat.h"
269 : #include "postmaster/bgworker.h"
270 : #include "postmaster/interrupt.h"
271 : #include "postmaster/walwriter.h"
272 : #include "replication/conflict.h"
273 : #include "replication/logicallauncher.h"
274 : #include "replication/logicalproto.h"
275 : #include "replication/logicalrelation.h"
276 : #include "replication/logicalworker.h"
277 : #include "replication/origin.h"
278 : #include "replication/slot.h"
279 : #include "replication/walreceiver.h"
280 : #include "replication/worker_internal.h"
281 : #include "rewrite/rewriteHandler.h"
282 : #include "storage/buffile.h"
283 : #include "storage/ipc.h"
284 : #include "storage/lmgr.h"
285 : #include "storage/procarray.h"
286 : #include "tcop/tcopprot.h"
287 : #include "utils/acl.h"
288 : #include "utils/guc.h"
289 : #include "utils/inval.h"
290 : #include "utils/lsyscache.h"
291 : #include "utils/memutils.h"
292 : #include "utils/pg_lsn.h"
293 : #include "utils/rel.h"
294 : #include "utils/rls.h"
295 : #include "utils/snapmgr.h"
296 : #include "utils/syscache.h"
297 : #include "utils/usercontext.h"
298 :
299 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
300 :
/*
 * One element of the lsn_mapping list that is declared right after this
 * struct.
 *
 * NOTE(review): presumably each entry pairs the local WAL position reached
 * when a remote transaction was applied with that transaction's end position
 * on the publisher, so that the worker can work out which remote positions
 * are already flushed locally -- confirm against the code that fills and
 * consumes lsn_mapping.
 */
301 : typedef struct FlushPosition
302 : {
303 : dlist_node node; /* link in the doubly linked list */
304 : XLogRecPtr local_end; /* WAL position on the local side */
305 : XLogRecPtr remote_end; /* WAL position on the remote side */
306 : } FlushPosition;
307 :
308 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
309 :
/*
 * Working state for applying one change to a target relation.
 *
 * A pointer to this struct is what apply_handle_insert_internal,
 * apply_handle_update_internal, apply_handle_delete_internal,
 * FindReplTupleInLocalRel and apply_handle_tuple_routing take as their
 * "edata" argument (see their prototypes in this file).
 */
310 : typedef struct ApplyExecutionData
311 : {
312 : EState *estate; /* executor state, used to track resources */
313 :
314 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
315 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
316 :
317 : /* These fields are used when the target relation is partitioned: */
318 : ModifyTableState *mtstate; /* dummy ModifyTable state */
319 : PartitionTupleRouting *proute; /* partition routing info */
320 : } ApplyExecutionData;
321 :
322 : /* Struct for saving and restoring apply errcontext information */
/*
 * The single static instance of this struct (apply_error_callback_arg) is
 * initialized with every field in its "nothing set" state; the per-field
 * comments below name those initial values.
 */
323 : typedef struct ApplyErrorCallbackArg
324 : {
325 : LogicalRepMsgType command; /* 0 if invalid */
326 : LogicalRepRelMapEntry *rel; /* NULL in the initial state */
327 :
328 : /* Remote node information */
329 : int remote_attnum; /* -1 if invalid */
330 : TransactionId remote_xid; /* InvalidTransactionId in the initial state */
331 : XLogRecPtr finish_lsn; /* InvalidXLogRecPtr in the initial state */
332 : char *origin_name; /* NULL in the initial state */
333 : } ApplyErrorCallbackArg;
334 :
335 : /*
336 : * The action to be taken for the changes in the transaction.
337 : *
338 : * TRANS_LEADER_APPLY:
339 : * This action means that we are in the leader apply worker or table sync
340 : * worker. The changes of the transaction are either directly applied or
341 : * are read from temporary files (for streaming transactions) and then
342 : * applied by the worker.
343 : *
344 : * TRANS_LEADER_SERIALIZE:
345 : * This action means that we are in the leader apply worker or table sync
346 : * worker. Changes are written to temporary files and then applied when the
347 : * final commit arrives.
348 : *
349 : * TRANS_LEADER_SEND_TO_PARALLEL:
350 : * This action means that we are in the leader apply worker and need to send
351 : * the changes to the parallel apply worker.
352 : *
353 : * TRANS_LEADER_PARTIAL_SERIALIZE:
354 : * This action means that we are in the leader apply worker and have sent some
355 : * changes directly to the parallel apply worker and the remaining changes are
356 : * serialized to a file, due to timeout while sending data. The parallel apply
357 : * worker will apply these serialized changes when the final commit arrives.
358 : *
359 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
360 : * serializing changes, the leader worker also needs to serialize the
361 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
362 : * finish the transaction when processing the transaction finish command. So
363 : * this new action was introduced to keep the code and logic clear.
364 : *
365 : * TRANS_PARALLEL_APPLY:
366 : * This action means that we are in the parallel apply worker and changes of
367 : * the transaction are applied directly by the worker.
368 : */
369 : typedef enum
370 : {
371 : /* The action for non-streaming transactions. */
372 : TRANS_LEADER_APPLY, /* apply directly, or replay from temporary files */
373 :
374 : /* Actions for streaming transactions. */
375 : TRANS_LEADER_SERIALIZE, /* write changes to temporary files */
376 : TRANS_LEADER_SEND_TO_PARALLEL, /* send changes to the parallel apply worker */
377 : TRANS_LEADER_PARTIAL_SERIALIZE, /* some changes sent, the rest serialized */
378 : TRANS_PARALLEL_APPLY, /* parallel apply worker applies directly */
379 : } TransApplyAction;
380 :
381 : /*
382 : * The phases involved in advancing the non-removable transaction ID.
383 : *
384 : * See comments atop worker.c for details of the transition between these
385 : * phases.
386 : */
387 : typedef enum
388 : {
389 : RDT_GET_CANDIDATE_XID, /* take oldestRunningXid as the candidate xid */
390 : RDT_REQUEST_PUBLISHER_STATUS, /* ask the walsender for the publisher status */
391 : RDT_WAIT_FOR_PUBLISHER_STATUS, /* wait for the status from the walsender */
392 : RDT_WAIT_FOR_LOCAL_FLUSH, /* wait for local flush to reach the last received WAL position */
393 : RDT_STOP_CONFLICT_INFO_RETENTION, /* only with max_retention_duration: set subretentionactive to false */
394 : RDT_RESUME_CONFLICT_INFO_RETENTION, /* only with max_retention_duration: set subretentionactive to true */
395 : } RetainDeadTuplesPhase;
396 :
397 : /*
398 : * Critical information for managing phase transitions within the
399 : * RetainDeadTuplesPhase.
400 : */
401 : typedef struct RetainDeadTuplesData
402 : {
403 : RetainDeadTuplesPhase phase; /* current phase */
404 : XLogRecPtr remote_lsn; /* WAL write position on the publisher */
405 :
406 : /*
407 : * Oldest transaction ID that was in the commit phase on the publisher.
408 : * Use FullTransactionId to prevent issues with transaction ID wraparound,
409 : * where a new remote_oldestxid could falsely appear to originate from the
410 : * past and block advancement.
411 : */
412 : FullTransactionId remote_oldestxid;
413 :
414 : /*
415 : * Next transaction ID to be assigned on the publisher. Use
416 : * FullTransactionId for consistency and to allow straightforward
417 : * comparisons with remote_oldestxid.
418 : */
419 : FullTransactionId remote_nextxid;
420 :
421 : TimestampTz reply_time; /* when the publisher responds with status */
422 :
423 : /*
424 : * Publisher transaction ID that must be awaited to complete before
425 : * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
426 : * FullTransactionId for the same reason as remote_nextxid.
427 : */
428 : FullTransactionId remote_wait_for;
429 :
430 : TransactionId candidate_xid; /* candidate for the non-removable
431 : * transaction ID */
432 : TimestampTz flushpos_update_time; /* when the remote flush position was
433 : * updated in final phase
434 : * (RDT_WAIT_FOR_LOCAL_FLUSH) */
435 :
/* NOTE(review): the unit of table_sync_wait_time is not stated here;
 * presumably milliseconds, like xid_advance_interval -- confirm. */
436 : long table_sync_wait_time; /* time spent waiting for table sync
437 : * to finish */
438 :
439 : /*
440 : * The following fields are used to determine the timing for the next
441 : * round of transaction ID advancement.
442 : */
443 : TimestampTz last_recv_time; /* when the last message was received */
444 : TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
/* NOTE(review): xid_advance_interval is presumably kept between
 * MIN_XID_ADVANCE_INTERVAL and MAX_XID_ADVANCE_INTERVAL (defined right
 * after this struct) -- confirm in adjust_xid_advance_interval. */
445 : int xid_advance_interval; /* how much time (ms) to wait before
446 : * attempting to advance the
447 : * non-removable transaction ID */
448 : } RetainDeadTuplesData;
449 :
450 : /*
451 : * The minimum (100ms) and maximum (3 minutes) intervals for advancing
452 : * non-removable transaction IDs. The maximum interval is a bit arbitrary but
453 : * is sufficient to not cause any undue network traffic.
454 : */
455 : #define MIN_XID_ADVANCE_INTERVAL 100
456 : #define MAX_XID_ADVANCE_INTERVAL 180000
457 :
458 : /* errcontext tracker */
/*
 * Starts out with every field in its "invalid / not set" state, matching
 * the sentinel values documented on ApplyErrorCallbackArg.
 */
459 : static ApplyErrorCallbackArg apply_error_callback_arg =
460 : {
461 : .command = 0, /* 0 means invalid */
462 : .rel = NULL,
463 : .remote_attnum = -1, /* -1 means invalid */
464 : .remote_xid = InvalidTransactionId,
465 : .finish_lsn = InvalidXLogRecPtr,
466 : .origin_name = NULL,
467 : };
468 :
/* NOTE(review): presumably the error-context stack that the apply error
 * callback is pushed onto -- confirm against its users. */
469 : ErrorContextCallback *apply_error_context_stack = NULL;
470 :
/* ApplyMessageContext is the context each replication step runs in (see
 * the header comment of begin_replication_step).
 * NOTE(review): ApplyContext is presumably the longer-lived context of the
 * worker -- confirm where both are created. */
471 : MemoryContext ApplyMessageContext = NULL;
472 : MemoryContext ApplyContext = NULL;
473 :
474 : /* per stream context for streaming transactions */
475 : static MemoryContext LogicalStreamingContext = NULL;
476 :
/* NOTE(review): presumably the walreceiver connection to the publisher used
 * by this worker -- confirm. */
477 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
478 :
/* The subscription this worker acts for; its name is used in the error
 * message raised by should_apply_changes_for_rel.
 * NOTE(review): MySubscriptionValid presumably tells whether that cached
 * copy is still up to date -- confirm. */
479 : Subscription *MySubscription = NULL;
480 : static bool MySubscriptionValid = false;
481 :
/* NOTE(review): judging by the name, subscription OIDs whose workers are to
 * be woken up at commit -- confirm. */
482 : static List *on_commit_wakeup_workers_subids = NIL;
483 :
/* remote_final_lsn is compared against a relation's statelsn in
 * should_apply_changes_for_rel; per that function's header comment it is
 * set in apply_handle_begin. */
484 : bool in_remote_transaction = false;
485 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
486 :
487 : /* fields valid only when processing streamed transaction */
488 : static bool in_streamed_transaction = false;
489 :
490 : static TransactionId stream_xid = InvalidTransactionId;
491 :
492 : /*
493 : * The number of changes applied by parallel apply worker during one streaming
494 : * block.
495 : */
496 : static uint32 parallel_stream_nchanges = 0;
497 :
498 : /* Are we initializing an apply worker? */
499 : bool InitializingApplyWorker = false;
500 :
501 : /*
502 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
503 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
504 : * Once we start skipping changes, we don't stop it until we skip all changes of
505 : * the transaction even if pg_subscription is updated and MySubscription->skiplsn
506 : * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
507 : * we don't skip receiving and spooling the changes since we decide whether or not
508 : * to skip applying the changes when starting to apply changes. The subskiplsn is
509 : * cleared after successfully skipping the transaction or applying non-empty
510 : * transaction. The latter prevents the mistakenly specified subskiplsn from
511 : * being left. Note that we cannot skip the streaming transactions when using
512 : * parallel apply workers because we cannot get the finish LSN before applying
513 : * the changes. So, we don't start parallel apply worker when finish LSN is set
514 : * by the user.
515 : */
516 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
517 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
518 :
519 : /* BufFile handle of the current streaming file */
520 : static BufFile *stream_fd = NULL;
521 :
522 : /*
523 : * The remote WAL position that has been applied and flushed locally. We record
524 : * and use this information both while sending feedback to the server and
525 : * advancing oldest_nonremovable_xid.
526 : */
527 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
528 :
/*
 * Location record for one subtransaction of a streamed transaction.
 *
 * As described under "STREAMED TRANSACTIONS" at the top of this file,
 * offsets are tracked per subtransaction so that the file with serialized
 * changes can be truncated when a subtransaction aborts. An array of these
 * records is kept in ApplySubXactData.subxacts.
 */
529 : typedef struct SubXactInfo
530 : {
531 : TransactionId xid; /* XID of the subxact */
532 : int fileno; /* file number in the buffile */
533 : off_t offset; /* offset in the file */
534 : } SubXactInfo;
535 :
536 : /* Sub-transaction data for the current streaming transaction */
537 : typedef struct ApplySubXactData
538 : {
539 : uint32 nsubxacts; /* number of sub-transactions */
540 : uint32 nsubxacts_max; /* current capacity of subxacts */
541 : TransactionId subxact_last; /* xid of the last sub-transaction */
542 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
543 : } ApplySubXactData;
544 :
/*
 * The one instance used by this file. The positional initializer follows the
 * field order above: no entries, no capacity, no last xid, no array yet.
 */
545 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
546 :
547 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
548 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
549 :
550 : /*
551 : * Information about subtransactions of a given toplevel transaction.
552 : */
553 : static void subxact_info_write(Oid subid, TransactionId xid);
554 : static void subxact_info_read(Oid subid, TransactionId xid);
555 : static void subxact_info_add(TransactionId xid);
556 : static inline void cleanup_subxact_info(void);
557 :
558 : /*
559 : * Serialize and deserialize changes for a toplevel transaction.
560 : */
561 : static void stream_open_file(Oid subid, TransactionId xid,
562 : bool first_segment);
563 : static void stream_write_change(char action, StringInfo s);
564 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
565 : static void stream_close_file(void);
566 :
567 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
568 :
569 : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
570 : bool status_received);
571 : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
572 : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
573 : bool status_received);
574 : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
575 : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
576 : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
577 : bool status_received);
578 : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
579 : static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
580 : static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
581 : static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
582 : static bool update_retention_status(bool active);
583 : static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
584 : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
585 : bool new_xid_found);
586 :
587 : static void apply_worker_exit(void);
588 :
589 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
590 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
591 : ResultRelInfo *relinfo,
592 : TupleTableSlot *remoteslot);
593 : static void apply_handle_update_internal(ApplyExecutionData *edata,
594 : ResultRelInfo *relinfo,
595 : TupleTableSlot *remoteslot,
596 : LogicalRepTupleData *newtup,
597 : Oid localindexoid);
598 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
599 : ResultRelInfo *relinfo,
600 : TupleTableSlot *remoteslot,
601 : Oid localindexoid);
602 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
603 : LogicalRepRelation *remoterel,
604 : Oid localidxoid,
605 : TupleTableSlot *remoteslot,
606 : TupleTableSlot **localslot);
607 : static bool FindDeletedTupleInLocalRel(Relation localrel,
608 : Oid localidxoid,
609 : TupleTableSlot *remoteslot,
610 : TransactionId *delete_xid,
611 : RepOriginId *delete_origin,
612 : TimestampTz *delete_time);
613 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
614 : TupleTableSlot *remoteslot,
615 : LogicalRepTupleData *newtup,
616 : CmdType operation);
617 :
618 : /* Functions for skipping changes */
619 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
620 : static void stop_skipping_changes(void);
621 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
622 :
623 : /* Functions for apply error callback */
624 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
625 : static inline void reset_apply_error_context_info(void);
626 :
627 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
628 : ParallelApplyWorkerInfo **winfo);
629 :
630 : static void replorigin_reset(int code, Datum arg);
631 :
632 : /*
633 : * Form the origin name for the subscription.
634 : *
635 : * This is a common function for tablesync and other workers. Tablesync workers
636 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
637 : *
638 : * Return the name in the supplied buffer.
639 : */
640 : void
641 2600 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
642 : char *originname, Size szoriginname)
643 : {
644 2600 : if (OidIsValid(relid))
645 : {
646 : /* Replication origin name for tablesync workers. */
647 1492 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
648 : }
649 : else
650 : {
651 : /* Replication origin name for non-tablesync workers. */
652 1108 : snprintf(originname, szoriginname, "pg_%u", suboid);
653 : }
654 2600 : }
655 :
656 : /*
657 : * Should this worker apply changes for given relation.
658 : *
659 : * This is mainly needed for initial relation data sync as that runs in
660 : * separate worker process running in parallel and we need some way to skip
661 : * changes coming to the leader apply worker during the sync of a table.
662 : *
663 : * Note we need to do smaller or equals comparison for SYNCDONE state because
664 : * it might hold position of end of initial slot consistent point WAL
665 : * record + 1 (ie start of next record) and next record can be COMMIT of
666 : * transaction we are now processing (which is what we set remote_final_lsn
667 : * to in apply_handle_begin).
668 : *
669 : * Note that for streaming transactions that are being applied in the parallel
670 : * apply worker, we disallow applying changes if the target table in the
671 : * subscription is not in the READY state, because we cannot decide whether to
672 : * apply the change as we won't know remote_final_lsn by that time.
673 : *
674 : * We already checked this in pa_can_start() before assigning the
675 : * streaming transaction to the parallel worker, but it also needs to be
676 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
677 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
678 : * while applying this transaction.
679 : */
680 : static bool
681 296264 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
682 : {
683 296264 : switch (MyLogicalRepWorker->type)
684 : {
685 20 : case WORKERTYPE_TABLESYNC:
686 20 : return MyLogicalRepWorker->relid == rel->localreloid;
687 :
688 136730 : case WORKERTYPE_PARALLEL_APPLY:
689 : /* We don't synchronize rel's that are in unknown state. */
690 136730 : if (rel->state != SUBREL_STATE_READY &&
691 0 : rel->state != SUBREL_STATE_UNKNOWN)
692 0 : ereport(ERROR,
693 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
694 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
695 : MySubscription->name),
696 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
697 :
698 136730 : return rel->state == SUBREL_STATE_READY;
699 :
700 159514 : case WORKERTYPE_APPLY:
701 159630 : return (rel->state == SUBREL_STATE_READY ||
702 116 : (rel->state == SUBREL_STATE_SYNCDONE &&
703 16 : rel->statelsn <= remote_final_lsn));
704 :
705 0 : case WORKERTYPE_UNKNOWN:
706 : /* Should never happen. */
707 0 : elog(ERROR, "Unknown worker type");
708 : }
709 :
710 0 : return false; /* dummy for compiler */
711 : }
712 :
713 : /*
714 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
715 : *
716 : * Start a transaction, if this is the first step (else we keep using the
717 : * existing transaction).
718 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
719 : */
720 : static void
721 297176 : begin_replication_step(void)
722 : {
723 297176 : SetCurrentStatementStartTimestamp();
724 :
725 297176 : if (!IsTransactionState())
726 : {
727 1910 : StartTransactionCommand();
728 1910 : maybe_reread_subscription();
729 : }
730 :
731 297170 : PushActiveSnapshot(GetTransactionSnapshot());
732 :
733 297170 : MemoryContextSwitchTo(ApplyMessageContext);
734 297170 : }
735 :
/*
 * Complete one step of a replication transaction.
 *
 * Every call to begin_replication_step() must be paired with a call to this
 * function.
 *
 * The transaction itself stays open; only the snapshot pushed at the start
 * of the step is released and the command counter is advanced so that the
 * effects of this step become visible.
 */
static void
end_replication_step(void)
{
	/* Release the snapshot pushed by begin_replication_step(). */
	PopActiveSnapshot();

	/* Make this step's effects visible. */
	CommandCounterIncrement();
}
750 :
751 : /*
752 : * Handle streamed transactions for both the leader apply worker and the
753 : * parallel apply workers.
754 : *
755 : * In the streaming case (receiving a block of the streamed transaction), for
756 : * serialize mode, simply redirect it to a file for the proper toplevel
757 : * transaction, and for parallel mode, the leader apply worker will send the
758 : * changes to parallel apply workers and the parallel apply worker will define
759 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
760 : * messages will be applied by both leader apply worker and parallel apply
761 : * workers).
762 : *
763 : * Returns true for streamed transactions (when the change is either serialized
764 : * to file or sent to parallel apply worker), false otherwise (regular mode or
765 : * needs to be processed by parallel apply worker).
766 : *
767 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
768 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
769 : * to a parallel apply worker.
770 : */
771 : static bool
772 648914 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
773 : {
774 : TransactionId current_xid;
775 : ParallelApplyWorkerInfo *winfo;
776 : TransApplyAction apply_action;
777 : StringInfoData original_msg;
778 :
779 648914 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
780 :
781 : /* not in streaming mode */
782 648914 : if (apply_action == TRANS_LEADER_APPLY)
783 160316 : return false;
784 :
785 : Assert(TransactionIdIsValid(stream_xid));
786 :
787 : /*
788 : * The parallel apply worker needs the xid in this message to decide
789 : * whether to define a savepoint, so save the original message that has
790 : * not moved the cursor after the xid. We will serialize this message to a
791 : * file in PARTIAL_SERIALIZE mode.
792 : */
793 488598 : original_msg = *s;
794 :
795 : /*
796 : * We should have received XID of the subxact as the first part of the
797 : * message, so extract it.
798 : */
799 488598 : current_xid = pq_getmsgint(s, 4);
800 :
801 488598 : if (!TransactionIdIsValid(current_xid))
802 0 : ereport(ERROR,
803 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
804 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
805 :
806 488598 : switch (apply_action)
807 : {
808 205028 : case TRANS_LEADER_SERIALIZE:
809 : Assert(stream_fd);
810 :
811 : /* Add the new subxact to the array (unless already there). */
812 205028 : subxact_info_add(current_xid);
813 :
814 : /* Write the change to the current file */
815 205028 : stream_write_change(action, s);
816 205028 : return true;
817 :
818 136772 : case TRANS_LEADER_SEND_TO_PARALLEL:
819 : Assert(winfo);
820 :
821 : /*
822 : * XXX The publisher side doesn't always send relation/type update
823 : * messages after the streaming transaction, so also update the
824 : * relation/type in leader apply worker. See function
825 : * cleanup_rel_sync_cache.
826 : */
827 136772 : if (pa_send_data(winfo, s->len, s->data))
828 136772 : return (action != LOGICAL_REP_MSG_RELATION &&
829 : action != LOGICAL_REP_MSG_TYPE);
830 :
831 : /*
832 : * Switch to serialize mode when we are not able to send the
833 : * change to parallel apply worker.
834 : */
835 0 : pa_switch_to_partial_serialize(winfo, false);
836 :
837 : /* fall through */
838 10012 : case TRANS_LEADER_PARTIAL_SERIALIZE:
839 10012 : stream_write_change(action, &original_msg);
840 :
841 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
842 10012 : return (action != LOGICAL_REP_MSG_RELATION &&
843 : action != LOGICAL_REP_MSG_TYPE);
844 :
845 136786 : case TRANS_PARALLEL_APPLY:
846 136786 : parallel_stream_nchanges += 1;
847 :
848 : /* Define a savepoint for a subxact if needed. */
849 136786 : pa_start_subtrans(current_xid, stream_xid);
850 136786 : return false;
851 :
852 0 : default:
853 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
854 : return false; /* silence compiler warning */
855 : }
856 : }
857 :
858 : /*
859 : * Executor state preparation for evaluation of constraint expressions,
860 : * indexes and triggers for the specified relation.
861 : *
862 : * Note that the caller must open and close any indexes to be updated.
863 : */
864 : static ApplyExecutionData *
865 296104 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
866 : {
867 : ApplyExecutionData *edata;
868 : EState *estate;
869 : RangeTblEntry *rte;
870 296104 : List *perminfos = NIL;
871 : ResultRelInfo *resultRelInfo;
872 :
873 296104 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
874 296104 : edata->targetRel = rel;
875 :
876 296104 : edata->estate = estate = CreateExecutorState();
877 :
878 296104 : rte = makeNode(RangeTblEntry);
879 296104 : rte->rtekind = RTE_RELATION;
880 296104 : rte->relid = RelationGetRelid(rel->localrel);
881 296104 : rte->relkind = rel->localrel->rd_rel->relkind;
882 296104 : rte->rellockmode = AccessShareLock;
883 :
884 296104 : addRTEPermissionInfo(&perminfos, rte);
885 :
886 296104 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
887 : bms_make_singleton(1));
888 :
889 296104 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
890 :
891 : /*
892 : * Use Relation opened by logicalrep_rel_open() instead of opening it
893 : * again.
894 : */
895 296104 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
896 :
897 : /*
898 : * We put the ResultRelInfo in the es_opened_result_relations list, even
899 : * though we don't populate the es_result_relations array. That's a bit
900 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
901 : *
902 : * ExecOpenIndices() is not called here either, each execution path doing
903 : * an apply operation being responsible for that.
904 : */
905 296104 : estate->es_opened_result_relations =
906 296104 : lappend(estate->es_opened_result_relations, resultRelInfo);
907 :
908 296104 : estate->es_output_cid = GetCurrentCommandId(true);
909 :
910 : /* Prepare to catch AFTER triggers. */
911 296104 : AfterTriggerBeginQuery();
912 :
913 : /* other fields of edata remain NULL for now */
914 :
915 296104 : return edata;
916 : }
917 :
918 : /*
919 : * Finish any operations related to the executor state created by
920 : * create_edata_for_relation().
921 : */
922 : static void
923 296032 : finish_edata(ApplyExecutionData *edata)
924 : {
925 296032 : EState *estate = edata->estate;
926 :
927 : /* Handle any queued AFTER triggers. */
928 296032 : AfterTriggerEndQuery(estate);
929 :
930 : /* Shut down tuple routing, if any was done. */
931 296032 : if (edata->proute)
932 148 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
933 :
934 : /*
935 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
936 : * here, but we intentionally don't. It would close the rel we added to
937 : * es_opened_result_relations above, which is wrong because we took no
938 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
939 : * any other relations opened during execution.
940 : */
941 296032 : ExecResetTupleTable(estate->es_tupleTable, false);
942 296032 : FreeExecutorState(estate);
943 296032 : pfree(edata);
944 296032 : }
945 :
946 : /*
947 : * Executes default values for columns for which we can't map to remote
948 : * relation columns.
949 : *
950 : * This allows us to support tables which have more columns on the downstream
951 : * than on the upstream.
952 : */
953 : static void
954 151588 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
955 : TupleTableSlot *slot)
956 : {
957 151588 : TupleDesc desc = RelationGetDescr(rel->localrel);
958 151588 : int num_phys_attrs = desc->natts;
959 : int i;
960 : int attnum,
961 151588 : num_defaults = 0;
962 : int *defmap;
963 : ExprState **defexprs;
964 : ExprContext *econtext;
965 :
966 151588 : econtext = GetPerTupleExprContext(estate);
967 :
968 : /* We got all the data via replication, no need to evaluate anything. */
969 151588 : if (num_phys_attrs == rel->remoterel.natts)
970 71298 : return;
971 :
972 80290 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
973 80290 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
974 :
975 : Assert(rel->attrmap->maplen == num_phys_attrs);
976 421326 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
977 : {
978 : Expr *defexpr;
979 :
980 341036 : if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
981 18 : continue;
982 :
983 341018 : if (rel->attrmap->attnums[attnum] >= 0)
984 184536 : continue;
985 :
986 156482 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
987 :
988 156482 : if (defexpr != NULL)
989 : {
990 : /* Run the expression through planner */
991 140262 : defexpr = expression_planner(defexpr);
992 :
993 : /* Initialize executable expression in copycontext */
994 140262 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
995 140262 : defmap[num_defaults] = attnum;
996 140262 : num_defaults++;
997 : }
998 : }
999 :
1000 220552 : for (i = 0; i < num_defaults; i++)
1001 140262 : slot->tts_values[defmap[i]] =
1002 140262 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1003 : }
1004 :
1005 : /*
1006 : * Store tuple data into slot.
1007 : *
1008 : * Incoming data can be either text or binary format.
1009 : */
1010 : static void
1011 296128 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
1012 : LogicalRepTupleData *tupleData)
1013 : {
1014 296128 : int natts = slot->tts_tupleDescriptor->natts;
1015 : int i;
1016 :
1017 296128 : ExecClearTuple(slot);
1018 :
1019 : /* Call the "in" function for each non-dropped, non-null attribute */
1020 : Assert(natts == rel->attrmap->maplen);
1021 1315276 : for (i = 0; i < natts; i++)
1022 : {
1023 1019148 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1024 1019148 : int remoteattnum = rel->attrmap->attnums[i];
1025 :
1026 1019148 : if (!att->attisdropped && remoteattnum >= 0)
1027 605314 : {
1028 605314 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1029 :
1030 : Assert(remoteattnum < tupleData->ncols);
1031 :
1032 : /* Set attnum for error callback */
1033 605314 : apply_error_callback_arg.remote_attnum = remoteattnum;
1034 :
1035 605314 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1036 : {
1037 : Oid typinput;
1038 : Oid typioparam;
1039 :
1040 284698 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1041 569396 : slot->tts_values[i] =
1042 284698 : OidInputFunctionCall(typinput, colvalue->data,
1043 : typioparam, att->atttypmod);
1044 284698 : slot->tts_isnull[i] = false;
1045 : }
1046 320616 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1047 : {
1048 : Oid typreceive;
1049 : Oid typioparam;
1050 :
1051 : /*
1052 : * In some code paths we may be asked to re-parse the same
1053 : * tuple data. Reset the StringInfo's cursor so that works.
1054 : */
1055 219960 : colvalue->cursor = 0;
1056 :
1057 219960 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1058 439920 : slot->tts_values[i] =
1059 219960 : OidReceiveFunctionCall(typreceive, colvalue,
1060 : typioparam, att->atttypmod);
1061 :
1062 : /* Trouble if it didn't eat the whole buffer */
1063 219960 : if (colvalue->cursor != colvalue->len)
1064 0 : ereport(ERROR,
1065 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1066 : errmsg("incorrect binary data format in logical replication column %d",
1067 : remoteattnum + 1)));
1068 219960 : slot->tts_isnull[i] = false;
1069 : }
1070 : else
1071 : {
1072 : /*
1073 : * NULL value from remote. (We don't expect to see
1074 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1075 : * NULL.)
1076 : */
1077 100656 : slot->tts_values[i] = (Datum) 0;
1078 100656 : slot->tts_isnull[i] = true;
1079 : }
1080 :
1081 : /* Reset attnum for error callback */
1082 605314 : apply_error_callback_arg.remote_attnum = -1;
1083 : }
1084 : else
1085 : {
1086 : /*
1087 : * We assign NULL to dropped attributes and missing values
1088 : * (missing values should be later filled using
1089 : * slot_fill_defaults).
1090 : */
1091 413834 : slot->tts_values[i] = (Datum) 0;
1092 413834 : slot->tts_isnull[i] = true;
1093 : }
1094 : }
1095 :
1096 296128 : ExecStoreVirtualTuple(slot);
1097 296128 : }
1098 :
1099 : /*
1100 : * Replace updated columns with data from the LogicalRepTupleData struct.
1101 : * This is somewhat similar to heap_modify_tuple but also calls the type
1102 : * input functions on the user data.
1103 : *
1104 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
1105 : * columns provided in "tupleData" and leaving others as-is.
1106 : *
1107 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1108 : * storage for "srcslot". This is OK for current usage, but someday we may
1109 : * need to materialize "slot" at the end to make it independent of "srcslot".
1110 : */
1111 : static void
1112 63848 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
1113 : LogicalRepRelMapEntry *rel,
1114 : LogicalRepTupleData *tupleData)
1115 : {
1116 63848 : int natts = slot->tts_tupleDescriptor->natts;
1117 : int i;
1118 :
1119 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1120 63848 : ExecClearTuple(slot);
1121 :
1122 : /*
1123 : * Copy all the column data from srcslot, so that we'll have valid values
1124 : * for unreplaced columns.
1125 : */
1126 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
1127 63848 : slot_getallattrs(srcslot);
1128 63848 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1129 63848 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1130 :
1131 : /* Call the "in" function for each replaced attribute */
1132 : Assert(natts == rel->attrmap->maplen);
1133 318560 : for (i = 0; i < natts; i++)
1134 : {
1135 254712 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1136 254712 : int remoteattnum = rel->attrmap->attnums[i];
1137 :
1138 254712 : if (remoteattnum < 0)
1139 117038 : continue;
1140 :
1141 : Assert(remoteattnum < tupleData->ncols);
1142 :
1143 137674 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1144 : {
1145 137668 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1146 :
1147 : /* Set attnum for error callback */
1148 137668 : apply_error_callback_arg.remote_attnum = remoteattnum;
1149 :
1150 137668 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1151 : {
1152 : Oid typinput;
1153 : Oid typioparam;
1154 :
1155 50860 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1156 101720 : slot->tts_values[i] =
1157 50860 : OidInputFunctionCall(typinput, colvalue->data,
1158 : typioparam, att->atttypmod);
1159 50860 : slot->tts_isnull[i] = false;
1160 : }
1161 86808 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1162 : {
1163 : Oid typreceive;
1164 : Oid typioparam;
1165 :
1166 : /*
1167 : * In some code paths we may be asked to re-parse the same
1168 : * tuple data. Reset the StringInfo's cursor so that works.
1169 : */
1170 86712 : colvalue->cursor = 0;
1171 :
1172 86712 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1173 173424 : slot->tts_values[i] =
1174 86712 : OidReceiveFunctionCall(typreceive, colvalue,
1175 : typioparam, att->atttypmod);
1176 :
1177 : /* Trouble if it didn't eat the whole buffer */
1178 86712 : if (colvalue->cursor != colvalue->len)
1179 0 : ereport(ERROR,
1180 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1181 : errmsg("incorrect binary data format in logical replication column %d",
1182 : remoteattnum + 1)));
1183 86712 : slot->tts_isnull[i] = false;
1184 : }
1185 : else
1186 : {
1187 : /* must be LOGICALREP_COLUMN_NULL */
1188 96 : slot->tts_values[i] = (Datum) 0;
1189 96 : slot->tts_isnull[i] = true;
1190 : }
1191 :
1192 : /* Reset attnum for error callback */
1193 137668 : apply_error_callback_arg.remote_attnum = -1;
1194 : }
1195 : }
1196 :
1197 : /* And finally, declare that "slot" contains a valid virtual tuple */
1198 63848 : ExecStoreVirtualTuple(slot);
1199 63848 : }
1200 :
1201 : /*
1202 : * Handle BEGIN message.
1203 : */
1204 : static void
1205 962 : apply_handle_begin(StringInfo s)
1206 : {
1207 : LogicalRepBeginData begin_data;
1208 :
1209 : /* There must not be an active streaming transaction. */
1210 : Assert(!TransactionIdIsValid(stream_xid));
1211 :
1212 962 : logicalrep_read_begin(s, &begin_data);
1213 962 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1214 :
1215 962 : remote_final_lsn = begin_data.final_lsn;
1216 :
1217 962 : maybe_start_skipping_changes(begin_data.final_lsn);
1218 :
1219 962 : in_remote_transaction = true;
1220 :
1221 962 : pgstat_report_activity(STATE_RUNNING, NULL);
1222 962 : }
1223 :
1224 : /*
1225 : * Handle COMMIT message.
1226 : *
1227 : * TODO, support tracking of multiple origins
1228 : */
1229 : static void
1230 872 : apply_handle_commit(StringInfo s)
1231 : {
1232 : LogicalRepCommitData commit_data;
1233 :
1234 872 : logicalrep_read_commit(s, &commit_data);
1235 :
1236 872 : if (commit_data.commit_lsn != remote_final_lsn)
1237 0 : ereport(ERROR,
1238 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1239 : errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1240 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1241 : LSN_FORMAT_ARGS(remote_final_lsn))));
1242 :
1243 872 : apply_handle_commit_internal(&commit_data);
1244 :
1245 : /* Process any tables that are being synchronized in parallel. */
1246 872 : process_syncing_tables(commit_data.end_lsn);
1247 :
1248 870 : pgstat_report_activity(STATE_IDLE, NULL);
1249 870 : reset_apply_error_context_info();
1250 870 : }
1251 :
1252 : /*
1253 : * Handle BEGIN PREPARE message.
1254 : */
1255 : static void
1256 32 : apply_handle_begin_prepare(StringInfo s)
1257 : {
1258 : LogicalRepPreparedTxnData begin_data;
1259 :
1260 : /* Tablesync should never receive prepare. */
1261 32 : if (am_tablesync_worker())
1262 0 : ereport(ERROR,
1263 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1264 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1265 :
1266 : /* There must not be an active streaming transaction. */
1267 : Assert(!TransactionIdIsValid(stream_xid));
1268 :
1269 32 : logicalrep_read_begin_prepare(s, &begin_data);
1270 32 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1271 :
1272 32 : remote_final_lsn = begin_data.prepare_lsn;
1273 :
1274 32 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1275 :
1276 32 : in_remote_transaction = true;
1277 :
1278 32 : pgstat_report_activity(STATE_RUNNING, NULL);
1279 32 : }
1280 :
1281 : /*
1282 : * Common function to prepare the GID.
1283 : */
1284 : static void
1285 46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1286 : {
1287 : char gid[GIDSIZE];
1288 :
1289 : /*
1290 : * Compute unique GID for two_phase transactions. We don't use GID of
1291 : * prepared transaction sent by server as that can lead to deadlock when
1292 : * we have multiple subscriptions from same node point to publications on
1293 : * the same node. See comments atop worker.c
1294 : */
1295 46 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1296 : gid, sizeof(gid));
1297 :
1298 : /*
1299 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1300 : * called within the PrepareTransactionBlock below.
1301 : */
1302 46 : if (!IsTransactionBlock())
1303 : {
1304 46 : BeginTransactionBlock();
1305 46 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1306 : }
1307 :
1308 : /*
1309 : * Update origin state so we can restart streaming from correct position
1310 : * in case of crash.
1311 : */
1312 46 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1313 46 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1314 :
1315 46 : PrepareTransactionBlock(gid);
1316 46 : }
1317 :
1318 : /*
1319 : * Handle PREPARE message.
1320 : */
1321 : static void
1322 30 : apply_handle_prepare(StringInfo s)
1323 : {
1324 : LogicalRepPreparedTxnData prepare_data;
1325 :
1326 30 : logicalrep_read_prepare(s, &prepare_data);
1327 :
1328 30 : if (prepare_data.prepare_lsn != remote_final_lsn)
1329 0 : ereport(ERROR,
1330 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1331 : errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1332 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1333 : LSN_FORMAT_ARGS(remote_final_lsn))));
1334 :
1335 : /*
1336 : * Unlike commit, here, we always prepare the transaction even though no
1337 : * change has happened in this transaction or all changes are skipped. It
1338 : * is done this way because at commit prepared time, we won't know whether
1339 : * we have skipped preparing a transaction because of those reasons.
1340 : *
1341 : * XXX, We can optimize such that at commit prepared time, we first check
1342 : * whether we have prepared the transaction or not but that doesn't seem
1343 : * worthwhile because such cases shouldn't be common.
1344 : */
1345 30 : begin_replication_step();
1346 :
1347 30 : apply_handle_prepare_internal(&prepare_data);
1348 :
1349 30 : end_replication_step();
1350 30 : CommitTransactionCommand();
1351 28 : pgstat_report_stat(false);
1352 :
1353 : /*
1354 : * It is okay not to set the local_end LSN for the prepare because we
1355 : * always flush the prepare record. So, we can send the acknowledgment of
1356 : * the remote_end LSN as soon as prepare is finished.
1357 : *
1358 : * XXX For the sake of consistency with commit, we could have set it with
1359 : * the LSN of prepare but as of now we don't track that value similar to
1360 : * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
1361 : * it.
1362 : */
1363 28 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1364 :
1365 28 : in_remote_transaction = false;
1366 :
1367 : /* Process any tables that are being synchronized in parallel. */
1368 28 : process_syncing_tables(prepare_data.end_lsn);
1369 :
1370 : /*
1371 : * Since we have already prepared the transaction, in a case where the
1372 : * server crashes before clearing the subskiplsn, it will be left but the
1373 : * transaction won't be resent. But that's okay because it's a rare case
1374 : * and the subskiplsn will be cleared when finishing the next transaction.
1375 : */
1376 28 : stop_skipping_changes();
1377 28 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1378 :
1379 28 : pgstat_report_activity(STATE_IDLE, NULL);
1380 28 : reset_apply_error_context_info();
1381 28 : }
1382 :
1383 : /*
1384 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1385 : *
1386 : * Note that we don't need to wait here if the transaction was prepared in a
1387 : * parallel apply worker. In that case, we have already waited for the prepare
1388 : * to finish in apply_handle_stream_prepare() which will ensure all the
1389 : * operations in that transaction have happened in the subscriber, so no
1390 : * concurrent transaction can cause deadlock or transaction dependency issues.
1391 : */
1392 : static void
1393 40 : apply_handle_commit_prepared(StringInfo s)
1394 : {
1395 : LogicalRepCommitPreparedTxnData prepare_data;
1396 : char gid[GIDSIZE];
1397 :
1398 40 : logicalrep_read_commit_prepared(s, &prepare_data);
1399 40 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1400 :
1401 : /* Compute GID for two_phase transactions. */
1402 40 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1403 : gid, sizeof(gid));
1404 :
1405 : /* There is no transaction when COMMIT PREPARED is called */
1406 40 : begin_replication_step();
1407 :
1408 : /*
1409 : * Update origin state so we can restart streaming from correct position
1410 : * in case of crash.
1411 : */
1412 40 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1413 40 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1414 :
1415 40 : FinishPreparedTransaction(gid, true);
1416 40 : end_replication_step();
1417 40 : CommitTransactionCommand();
1418 40 : pgstat_report_stat(false);
1419 :
1420 40 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1421 40 : in_remote_transaction = false;
1422 :
1423 : /* Process any tables that are being synchronized in parallel. */
1424 40 : process_syncing_tables(prepare_data.end_lsn);
1425 :
1426 40 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1427 :
1428 40 : pgstat_report_activity(STATE_IDLE, NULL);
1429 40 : reset_apply_error_context_info();
1430 40 : }
1431 :
1432 : /*
1433 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1434 : *
1435 : * Note that we don't need to wait here if the transaction was prepared in a
1436 : * parallel apply worker. In that case, we have already waited for the prepare
1437 : * to finish in apply_handle_stream_prepare() which will ensure all the
1438 : * operations in that transaction have happened in the subscriber, so no
1439 : * concurrent transaction can cause deadlock or transaction dependency issues.
1440 : */
1441 : static void
1442 10 : apply_handle_rollback_prepared(StringInfo s)
1443 : {
1444 : LogicalRepRollbackPreparedTxnData rollback_data;
1445 : char gid[GIDSIZE];
1446 :
1447 10 : logicalrep_read_rollback_prepared(s, &rollback_data);
1448 10 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1449 :
1450 : /* Compute GID for two_phase transactions. */
1451 10 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1452 : gid, sizeof(gid));
1453 :
1454 : /*
1455 : * It is possible that we haven't received prepare because it occurred
1456 : * before walsender reached a consistent point or the two_phase was still
1457 : * not enabled by that time, so in such cases, we need to skip rollback
1458 : * prepared.
1459 : */
1460 10 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1461 : rollback_data.prepare_time))
1462 : {
1463 : /*
1464 : * Update origin state so we can restart streaming from correct
1465 : * position in case of crash.
1466 : */
1467 10 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1468 10 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
1469 :
1470 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1471 10 : begin_replication_step();
1472 10 : FinishPreparedTransaction(gid, false);
1473 10 : end_replication_step();
1474 10 : CommitTransactionCommand();
1475 :
1476 10 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1477 : }
1478 :
1479 10 : pgstat_report_stat(false);
1480 :
1481 : /*
1482 : * It is okay not to set the local_end LSN for the rollback of prepared
1483 : * transaction because we always flush the WAL record for it. See
1484 : * apply_handle_prepare.
1485 : */
1486 10 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1487 10 : in_remote_transaction = false;
1488 :
1489 : /* Process any tables that are being synchronized in parallel. */
1490 10 : process_syncing_tables(rollback_data.rollback_end_lsn);
1491 :
1492 10 : pgstat_report_activity(STATE_IDLE, NULL);
1493 10 : reset_apply_error_context_info();
1494 10 : }
1495 :
1496 : /*
1497 : * Handle STREAM PREPARE.
1498 : */
1499 : static void
1500 22 : apply_handle_stream_prepare(StringInfo s)
1501 : {
1502 : LogicalRepPreparedTxnData prepare_data;
1503 : ParallelApplyWorkerInfo *winfo;
1504 : TransApplyAction apply_action;
1505 :
1506 : /* Save the message before it is consumed. */
1507 22 : StringInfoData original_msg = *s;
1508 :
1509 22 : if (in_streamed_transaction)
1510 0 : ereport(ERROR,
1511 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1512 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1513 :
1514 : /* Tablesync should never receive prepare. */
1515 22 : if (am_tablesync_worker())
1516 0 : ereport(ERROR,
1517 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1518 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1519 :
1520 22 : logicalrep_read_stream_prepare(s, &prepare_data);
1521 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1522 :
1523 22 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1524 :
1525 22 : switch (apply_action)
1526 : {
1527 10 : case TRANS_LEADER_APPLY:
1528 :
1529 : /*
1530 : * The transaction has been serialized to file, so replay all the
1531 : * spooled operations.
1532 : */
1533 10 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1534 : prepare_data.xid, prepare_data.prepare_lsn);
1535 :
1536 : /* Mark the transaction as prepared. */
1537 10 : apply_handle_prepare_internal(&prepare_data);
1538 :
1539 10 : CommitTransactionCommand();
1540 :
1541 : /*
1542 : * It is okay not to set the local_end LSN for the prepare because
1543 : * we always flush the prepare record. See apply_handle_prepare.
1544 : */
1545 10 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1546 :
1547 10 : in_remote_transaction = false;
1548 :
1549 : /* Unlink the files with serialized changes and subxact info. */
1550 10 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1551 :
1552 10 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1553 10 : break;
1554 :
1555 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1556 : Assert(winfo);
1557 :
1558 4 : if (pa_send_data(winfo, s->len, s->data))
1559 : {
1560 : /* Finish processing the streaming transaction. */
1561 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1562 4 : break;
1563 : }
1564 :
1565 : /*
1566 : * Switch to serialize mode when we are not able to send the
1567 : * change to parallel apply worker.
1568 : */
1569 0 : pa_switch_to_partial_serialize(winfo, true);
1570 :
1571 : /* fall through */
1572 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1573 : Assert(winfo);
1574 :
1575 2 : stream_open_and_write_change(prepare_data.xid,
1576 : LOGICAL_REP_MSG_STREAM_PREPARE,
1577 : &original_msg);
1578 :
1579 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1580 :
1581 : /* Finish processing the streaming transaction. */
1582 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1583 2 : break;
1584 :
1585 6 : case TRANS_PARALLEL_APPLY:
1586 :
1587 : /*
1588 : * If the parallel apply worker is applying spooled messages then
1589 : * close the file before preparing.
1590 : */
1591 6 : if (stream_fd)
1592 2 : stream_close_file();
1593 :
1594 6 : begin_replication_step();
1595 :
1596 : /* Mark the transaction as prepared. */
1597 6 : apply_handle_prepare_internal(&prepare_data);
1598 :
1599 6 : end_replication_step();
1600 :
1601 6 : CommitTransactionCommand();
1602 :
1603 : /*
1604 : * It is okay not to set the local_end LSN for the prepare because
1605 : * we always flush the prepare record. See apply_handle_prepare.
1606 : */
1607 6 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1608 :
1609 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1610 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1611 :
1612 6 : pa_reset_subtrans();
1613 :
1614 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1615 6 : break;
1616 :
1617 0 : default:
1618 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1619 : break;
1620 : }
1621 :
1622 22 : pgstat_report_stat(false);
1623 :
1624 : /* Process any tables that are being synchronized in parallel. */
1625 22 : process_syncing_tables(prepare_data.end_lsn);
1626 :
1627 : /*
1628 : * Similar to prepare case, the subskiplsn could be left in a case of
1629 : * server crash but it's okay. See the comments in apply_handle_prepare().
1630 : */
1631 22 : stop_skipping_changes();
1632 22 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1633 :
1634 22 : pgstat_report_activity(STATE_IDLE, NULL);
1635 :
1636 22 : reset_apply_error_context_info();
1637 22 : }
1638 :
1639 : /*
1640 : * Handle ORIGIN message.
1641 : *
1642 : * TODO, support tracking of multiple origins
1643 : */
1644 : static void
1645 14 : apply_handle_origin(StringInfo s)
1646 : {
1647 : /*
1648 : * ORIGIN message can only come inside streaming transaction or inside
1649 : * remote transaction and before any actual writes.
1650 : */
1651 14 : if (!in_streamed_transaction &&
1652 20 : (!in_remote_transaction ||
1653 10 : (IsTransactionState() && !am_tablesync_worker())))
1654 0 : ereport(ERROR,
1655 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1656 : errmsg_internal("ORIGIN message sent out of order")));
1657 14 : }
1658 :
1659 : /*
1660 : * Initialize fileset (if not already done).
1661 : *
1662 : * Create a new file when first_segment is true, otherwise open the existing
1663 : * file.
1664 : */
1665 : void
1666 726 : stream_start_internal(TransactionId xid, bool first_segment)
1667 : {
1668 726 : begin_replication_step();
1669 :
1670 : /*
1671 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1672 : * used for the entire duration of the worker so create it in a permanent
1673 : * context. We create this on the very first streaming message from any
1674 : * transaction and then use it for this and other streaming transactions.
1675 : * Now, we could create a fileset at the start of the worker as well but
1676 : * then we won't be sure that it will ever be used.
1677 : */
1678 726 : if (!MyLogicalRepWorker->stream_fileset)
1679 : {
1680 : MemoryContext oldctx;
1681 :
1682 28 : oldctx = MemoryContextSwitchTo(ApplyContext);
1683 :
1684 28 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1685 28 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1686 :
1687 28 : MemoryContextSwitchTo(oldctx);
1688 : }
1689 :
1690 : /* Open the spool file for this transaction. */
1691 726 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1692 :
1693 : /* If this is not the first segment, open existing subxact file. */
1694 726 : if (!first_segment)
1695 662 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1696 :
1697 726 : end_replication_step();
1698 726 : }
1699 :
1700 : /*
1701 : * Handle STREAM START message.
1702 : */
1703 : static void
1704 1714 : apply_handle_stream_start(StringInfo s)
1705 : {
1706 : bool first_segment;
1707 : ParallelApplyWorkerInfo *winfo;
1708 : TransApplyAction apply_action;
1709 :
1710 : /* Save the message before it is consumed. */
1711 1714 : StringInfoData original_msg = *s;
1712 :
1713 1714 : if (in_streamed_transaction)
1714 0 : ereport(ERROR,
1715 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1716 : errmsg_internal("duplicate STREAM START message")));
1717 :
1718 : /* There must not be an active streaming transaction. */
1719 : Assert(!TransactionIdIsValid(stream_xid));
1720 :
1721 : /* notify handle methods we're processing a remote transaction */
1722 1714 : in_streamed_transaction = true;
1723 :
1724 : /* extract XID of the top-level transaction */
1725 1714 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1726 :
1727 1714 : if (!TransactionIdIsValid(stream_xid))
1728 0 : ereport(ERROR,
1729 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1730 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1731 :
1732 1714 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1733 :
1734 : /* Try to allocate a worker for the streaming transaction. */
1735 1714 : if (first_segment)
1736 164 : pa_allocate_worker(stream_xid);
1737 :
1738 1714 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1739 :
1740 1714 : switch (apply_action)
1741 : {
1742 686 : case TRANS_LEADER_SERIALIZE:
1743 :
1744 : /*
1745 : * Function stream_start_internal starts a transaction. This
1746 : * transaction will be committed on the stream stop unless it is a
1747 : * tablesync worker in which case it will be committed after
1748 : * processing all the messages. We need this transaction for
1749 : * handling the BufFile, used for serializing the streaming data
1750 : * and subxact info.
1751 : */
1752 686 : stream_start_internal(stream_xid, first_segment);
1753 686 : break;
1754 :
1755 502 : case TRANS_LEADER_SEND_TO_PARALLEL:
1756 : Assert(winfo);
1757 :
1758 : /*
1759 : * Once we start serializing the changes, the parallel apply
1760 : * worker will wait for the leader to release the stream lock
1761 : * until the end of the transaction. So, we don't need to release
1762 : * the lock or increment the stream count in that case.
1763 : */
1764 502 : if (pa_send_data(winfo, s->len, s->data))
1765 : {
1766 : /*
1767 : * Unlock the shared object lock so that the parallel apply
1768 : * worker can continue to receive changes.
1769 : */
1770 494 : if (!first_segment)
1771 448 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1772 :
1773 : /*
1774 : * Increment the number of streaming blocks waiting to be
1775 : * processed by parallel apply worker.
1776 : */
1777 494 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1778 :
1779 : /* Cache the parallel apply worker for this transaction. */
1780 494 : pa_set_stream_apply_worker(winfo);
1781 494 : break;
1782 : }
1783 :
1784 : /*
1785 : * Switch to serialize mode when we are not able to send the
1786 : * change to parallel apply worker.
1787 : */
1788 8 : pa_switch_to_partial_serialize(winfo, !first_segment);
1789 :
1790 : /* fall through */
1791 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1792 : Assert(winfo);
1793 :
1794 : /*
1795 : * Open the spool file unless it was already opened when switching
1796 : * to serialize mode. The transaction started in
1797 : * stream_start_internal will be committed on the stream stop.
1798 : */
1799 30 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1800 22 : stream_start_internal(stream_xid, first_segment);
1801 :
1802 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1803 :
1804 : /* Cache the parallel apply worker for this transaction. */
1805 30 : pa_set_stream_apply_worker(winfo);
1806 30 : break;
1807 :
1808 504 : case TRANS_PARALLEL_APPLY:
1809 504 : if (first_segment)
1810 : {
1811 : /* Hold the lock until the end of the transaction. */
1812 54 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1813 54 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1814 :
1815 : /*
1816 : * Signal the leader apply worker, as it may be waiting for
1817 : * us.
1818 : */
1819 54 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
1820 : }
1821 :
1822 504 : parallel_stream_nchanges = 0;
1823 504 : break;
1824 :
1825 0 : default:
1826 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1827 : break;
1828 : }
1829 :
1830 1714 : pgstat_report_activity(STATE_RUNNING, NULL);
1831 1714 : }
1832 :
1833 : /*
1834 : * Update the information about subxacts and close the file.
1835 : *
1836 : * This function should be called when the stream_start_internal function has
1837 : * been called.
1838 : */
1839 : void
1840 726 : stream_stop_internal(TransactionId xid)
1841 : {
1842 : /*
1843 : * Serialize information about subxacts for the toplevel transaction, then
1844 : * close the stream messages spool file.
1845 : */
1846 726 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1847 726 : stream_close_file();
1848 :
1849 : /* We must be in a valid transaction state */
1850 : Assert(IsTransactionState());
1851 :
1852 : /* Commit the per-stream transaction */
1853 726 : CommitTransactionCommand();
1854 :
1855 : /* Reset per-stream context */
1856 726 : MemoryContextReset(LogicalStreamingContext);
1857 726 : }
1858 :
1859 : /*
1860 : * Handle STREAM STOP message.
1861 : */
1862 : static void
1863 1712 : apply_handle_stream_stop(StringInfo s)
1864 : {
1865 : ParallelApplyWorkerInfo *winfo;
1866 : TransApplyAction apply_action;
1867 :
1868 1712 : if (!in_streamed_transaction)
1869 0 : ereport(ERROR,
1870 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1871 : errmsg_internal("STREAM STOP message without STREAM START")));
1872 :
1873 1712 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1874 :
1875 1712 : switch (apply_action)
1876 : {
1877 686 : case TRANS_LEADER_SERIALIZE:
1878 686 : stream_stop_internal(stream_xid);
1879 686 : break;
1880 :
1881 494 : case TRANS_LEADER_SEND_TO_PARALLEL:
1882 : Assert(winfo);
1883 :
1884 : /*
1885 : * Lock before sending the STREAM_STOP message so that the leader
1886 : * can hold the lock first and the parallel apply worker will wait
1887 : * for leader to release the lock. See Locking Considerations atop
1888 : * applyparallelworker.c.
1889 : */
1890 494 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1891 :
1892 494 : if (pa_send_data(winfo, s->len, s->data))
1893 : {
1894 494 : pa_set_stream_apply_worker(NULL);
1895 494 : break;
1896 : }
1897 :
1898 : /*
1899 : * Switch to serialize mode when we are not able to send the
1900 : * change to parallel apply worker.
1901 : */
1902 0 : pa_switch_to_partial_serialize(winfo, true);
1903 :
1904 : /* fall through */
1905 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1906 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1907 30 : stream_stop_internal(stream_xid);
1908 30 : pa_set_stream_apply_worker(NULL);
1909 30 : break;
1910 :
1911 502 : case TRANS_PARALLEL_APPLY:
1912 502 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1913 : parallel_stream_nchanges);
1914 :
1915 : /*
1916 : * By the time parallel apply worker is processing the changes in
1917 : * the current streaming block, the leader apply worker may have
1918 : * sent multiple streaming blocks. This can lead to parallel apply
1919 : * worker start waiting even when there are more chunk of streams
1920 : * in the queue. So, try to lock only if there is no message left
1921 : * in the queue. See Locking Considerations atop
1922 : * applyparallelworker.c.
1923 : *
1924 : * Note that here we have a race condition where we can start
1925 : * waiting even when there are pending streaming chunks. This can
1926 : * happen if the leader sends another streaming block and acquires
1927 : * the stream lock again after the parallel apply worker checks
1928 : * that there is no pending streaming block and before it actually
1929 : * starts waiting on a lock. We can handle this case by not
1930 : * allowing the leader to increment the stream block count during
1931 : * the time parallel apply worker acquires the lock but it is not
1932 : * clear whether that is worth the complexity.
1933 : *
1934 : * Now, if this missed chunk contains rollback to savepoint, then
1935 : * there is a risk of deadlock which probably shouldn't happen
1936 : * after restart.
1937 : */
1938 502 : pa_decr_and_wait_stream_block();
1939 498 : break;
1940 :
1941 0 : default:
1942 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1943 : break;
1944 : }
1945 :
1946 1708 : in_streamed_transaction = false;
1947 1708 : stream_xid = InvalidTransactionId;
1948 :
1949 : /*
1950 : * The parallel apply worker could be in a transaction in which case we
1951 : * need to report the state as STATE_IDLEINTRANSACTION.
1952 : */
1953 1708 : if (IsTransactionOrTransactionBlock())
1954 498 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1955 : else
1956 1210 : pgstat_report_activity(STATE_IDLE, NULL);
1957 :
1958 1708 : reset_apply_error_context_info();
1959 1708 : }
1960 :
1961 : /*
1962 : * Helper function to handle STREAM ABORT message when the transaction was
1963 : * serialized to file.
1964 : */
1965 : static void
1966 28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1967 : {
1968 : /*
1969 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1970 : * just delete the files with serialized info.
1971 : */
1972 28 : if (xid == subxid)
1973 2 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1974 : else
1975 : {
1976 : /*
1977 : * OK, so it's a subxact. We need to read the subxact file for the
1978 : * toplevel transaction, determine the offset tracked for the subxact,
1979 : * and truncate the file with changes. We also remove the subxacts
1980 : * with higher offsets (or rather higher XIDs).
1981 : *
1982 : * We intentionally scan the array from the tail, because we're likely
1983 : * aborting a change for the most recent subtransactions.
1984 : *
1985 : * We can't use the binary search here as subxact XIDs won't
1986 : * necessarily arrive in sorted order, consider the case where we have
1987 : * released the savepoint for multiple subtransactions and then
1988 : * performed rollback to savepoint for one of the earlier
1989 : * sub-transaction.
1990 : */
1991 : int64 i;
1992 : int64 subidx;
1993 : BufFile *fd;
1994 26 : bool found = false;
1995 : char path[MAXPGPATH];
1996 :
1997 26 : subidx = -1;
1998 26 : begin_replication_step();
1999 26 : subxact_info_read(MyLogicalRepWorker->subid, xid);
2000 :
2001 30 : for (i = subxact_data.nsubxacts; i > 0; i--)
2002 : {
2003 22 : if (subxact_data.subxacts[i - 1].xid == subxid)
2004 : {
2005 18 : subidx = (i - 1);
2006 18 : found = true;
2007 18 : break;
2008 : }
2009 : }
2010 :
2011 : /*
2012 : * If it's an empty sub-transaction then we will not find the subxid
2013 : * here so just cleanup the subxact info and return.
2014 : */
2015 26 : if (!found)
2016 : {
2017 : /* Cleanup the subxact info */
2018 8 : cleanup_subxact_info();
2019 8 : end_replication_step();
2020 8 : CommitTransactionCommand();
2021 8 : return;
2022 : }
2023 :
2024 : /* open the changes file */
2025 18 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2026 18 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
2027 : O_RDWR, false);
2028 :
2029 : /* OK, truncate the file at the right offset */
2030 18 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
2031 18 : subxact_data.subxacts[subidx].offset);
2032 18 : BufFileClose(fd);
2033 :
2034 : /* discard the subxacts added later */
2035 18 : subxact_data.nsubxacts = subidx;
2036 :
2037 : /* write the updated subxact list */
2038 18 : subxact_info_write(MyLogicalRepWorker->subid, xid);
2039 :
2040 18 : end_replication_step();
2041 18 : CommitTransactionCommand();
2042 : }
2043 : }
2044 :
2045 : /*
2046 : * Handle STREAM ABORT message.
2047 : */
2048 : static void
2049 76 : apply_handle_stream_abort(StringInfo s)
2050 : {
2051 : TransactionId xid;
2052 : TransactionId subxid;
2053 : LogicalRepStreamAbortData abort_data;
2054 : ParallelApplyWorkerInfo *winfo;
2055 : TransApplyAction apply_action;
2056 :
2057 : /* Save the message before it is consumed. */
2058 76 : StringInfoData original_msg = *s;
2059 : bool toplevel_xact;
2060 :
2061 76 : if (in_streamed_transaction)
2062 0 : ereport(ERROR,
2063 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2064 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
2065 :
2066 : /* We receive abort information only when we can apply in parallel. */
2067 76 : logicalrep_read_stream_abort(s, &abort_data,
2068 76 : MyLogicalRepWorker->parallel_apply);
2069 :
2070 76 : xid = abort_data.xid;
2071 76 : subxid = abort_data.subxid;
2072 76 : toplevel_xact = (xid == subxid);
2073 :
2074 76 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2075 :
2076 76 : apply_action = get_transaction_apply_action(xid, &winfo);
2077 :
2078 76 : switch (apply_action)
2079 : {
2080 28 : case TRANS_LEADER_APPLY:
2081 :
2082 : /*
2083 : * We are in the leader apply worker and the transaction has been
2084 : * serialized to file.
2085 : */
2086 28 : stream_abort_internal(xid, subxid);
2087 :
2088 28 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2089 28 : break;
2090 :
2091 20 : case TRANS_LEADER_SEND_TO_PARALLEL:
2092 : Assert(winfo);
2093 :
2094 : /*
2095 : * For the case of aborting the subtransaction, we increment the
2096 : * number of streaming blocks and take the lock again before
2097 : * sending the STREAM_ABORT to ensure that the parallel apply
2098 : * worker will wait on the lock for the next set of changes after
2099 : * processing the STREAM_ABORT message if it is not already
2100 : * waiting for STREAM_STOP message.
2101 : *
2102 : * It is important to perform this locking before sending the
2103 : * STREAM_ABORT message so that the leader can hold the lock first
2104 : * and the parallel apply worker will wait for the leader to
2105 : * release the lock. This is the same as what we do in
2106 : * apply_handle_stream_stop. See Locking Considerations atop
2107 : * applyparallelworker.c.
2108 : */
2109 20 : if (!toplevel_xact)
2110 : {
2111 18 : pa_unlock_stream(xid, AccessExclusiveLock);
2112 18 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
2113 18 : pa_lock_stream(xid, AccessExclusiveLock);
2114 : }
2115 :
2116 20 : if (pa_send_data(winfo, s->len, s->data))
2117 : {
2118 : /*
2119 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2120 : * wait here for the parallel apply worker to finish as that
2121 : * is not required to maintain the commit order and won't have
2122 : * the risk of failures due to transaction dependencies and
2123 : * deadlocks. However, it is possible that before the parallel
2124 : * worker finishes and we clear the worker info, the xid
2125 : * wraparound happens on the upstream and a new transaction
2126 : * with the same xid can appear and that can lead to duplicate
2127 : * entries in ParallelApplyTxnHash. Yet another problem could
2128 : * be that we may have serialized the changes in partial
2129 : * serialize mode and the file containing xact changes may
2130 : * already exist, and after xid wraparound trying to create
2131 : * the file for the same xid can lead to an error. To avoid
2132 : * these problems, we decide to wait for the aborts to finish.
2133 : *
2134 : * Note, it is okay to not update the flush location position
2135 : * for aborts as in worst case that means such a transaction
2136 : * won't be sent again after restart.
2137 : */
2138 20 : if (toplevel_xact)
2139 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2140 :
2141 20 : break;
2142 : }
2143 :
2144 : /*
2145 : * Switch to serialize mode when we are not able to send the
2146 : * change to parallel apply worker.
2147 : */
2148 0 : pa_switch_to_partial_serialize(winfo, true);
2149 :
2150 : /* fall through */
2151 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2152 : Assert(winfo);
2153 :
2154 : /*
2155 : * Parallel apply worker might have applied some changes, so write
2156 : * the STREAM_ABORT message so that it can rollback the
2157 : * subtransaction if needed.
2158 : */
2159 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
2160 : &original_msg);
2161 :
2162 4 : if (toplevel_xact)
2163 : {
2164 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2165 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2166 : }
2167 4 : break;
2168 :
2169 24 : case TRANS_PARALLEL_APPLY:
2170 :
2171 : /*
2172 : * If the parallel apply worker is applying spooled messages then
2173 : * close the file before aborting.
2174 : */
2175 24 : if (toplevel_xact && stream_fd)
2176 2 : stream_close_file();
2177 :
2178 24 : pa_stream_abort(&abort_data);
2179 :
2180 : /*
2181 : * We need to wait after processing rollback to savepoint for the
2182 : * next set of changes.
2183 : *
2184 : * We have a race condition here due to which we can start waiting
2185 : * here when there are more chunk of streams in the queue. See
2186 : * apply_handle_stream_stop.
2187 : */
2188 24 : if (!toplevel_xact)
2189 20 : pa_decr_and_wait_stream_block();
2190 :
2191 24 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2192 24 : break;
2193 :
2194 0 : default:
2195 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2196 : break;
2197 : }
2198 :
2199 76 : reset_apply_error_context_info();
2200 76 : }
2201 :
2202 : /*
2203 : * Ensure that the passed location is fileset's end.
2204 : */
2205 : static void
2206 8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2207 : off_t offset)
2208 : {
2209 : char path[MAXPGPATH];
2210 : BufFile *fd;
2211 : int last_fileno;
2212 : off_t last_offset;
2213 :
2214 : Assert(!IsTransactionState());
2215 :
2216 8 : begin_replication_step();
2217 :
2218 8 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2219 :
2220 8 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2221 :
2222 8 : BufFileSeek(fd, 0, 0, SEEK_END);
2223 8 : BufFileTell(fd, &last_fileno, &last_offset);
2224 :
2225 8 : BufFileClose(fd);
2226 :
2227 8 : end_replication_step();
2228 :
2229 8 : if (last_fileno != fileno || last_offset != offset)
2230 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2231 : path);
2232 8 : }
2233 :
2234 : /*
2235 : * Common spoolfile processing.
2236 : */
2237 : void
2238 62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2239 : XLogRecPtr lsn)
2240 : {
2241 : int nchanges;
2242 : char path[MAXPGPATH];
2243 62 : char *buffer = NULL;
2244 : MemoryContext oldcxt;
2245 : ResourceOwner oldowner;
2246 : int fileno;
2247 : off_t offset;
2248 :
2249 62 : if (!am_parallel_apply_worker())
2250 54 : maybe_start_skipping_changes(lsn);
2251 :
2252 : /* Make sure we have an open transaction */
2253 62 : begin_replication_step();
2254 :
2255 : /*
2256 : * Allocate file handle and memory required to process all the messages in
2257 : * TopTransactionContext to avoid them getting reset after each message is
2258 : * processed.
2259 : */
2260 62 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2261 :
2262 : /* Open the spool file for the committed/prepared transaction */
2263 62 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2264 62 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2265 :
2266 : /*
2267 : * Make sure the file is owned by the toplevel transaction so that the
2268 : * file will not be accidentally closed when aborting a subtransaction.
2269 : */
2270 62 : oldowner = CurrentResourceOwner;
2271 62 : CurrentResourceOwner = TopTransactionResourceOwner;
2272 :
2273 62 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2274 :
2275 62 : CurrentResourceOwner = oldowner;
2276 :
2277 62 : buffer = palloc(BLCKSZ);
2278 :
2279 62 : MemoryContextSwitchTo(oldcxt);
2280 :
2281 62 : remote_final_lsn = lsn;
2282 :
2283 : /*
2284 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2285 : * transaction.
2286 : */
2287 62 : in_remote_transaction = true;
2288 62 : pgstat_report_activity(STATE_RUNNING, NULL);
2289 :
2290 62 : end_replication_step();
2291 :
2292 : /*
2293 : * Read the entries one by one and pass them through the same logic as in
2294 : * apply_dispatch.
2295 : */
2296 62 : nchanges = 0;
2297 : while (true)
2298 176942 : {
2299 : StringInfoData s2;
2300 : size_t nbytes;
2301 : int len;
2302 :
2303 177004 : CHECK_FOR_INTERRUPTS();
2304 :
2305 : /* read length of the on-disk record */
2306 177004 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2307 :
2308 : /* have we reached end of the file? */
2309 177004 : if (nbytes == 0)
2310 52 : break;
2311 :
2312 : /* do we have a correct length? */
2313 176952 : if (len <= 0)
2314 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2315 : len, path);
2316 :
2317 : /* make sure we have sufficiently large buffer */
2318 176952 : buffer = repalloc(buffer, len);
2319 :
2320 : /* and finally read the data into the buffer */
2321 176952 : BufFileReadExact(stream_fd, buffer, len);
2322 :
2323 176952 : BufFileTell(stream_fd, &fileno, &offset);
2324 :
2325 : /* init a stringinfo using the buffer and call apply_dispatch */
2326 176952 : initReadOnlyStringInfo(&s2, buffer, len);
2327 :
2328 : /* Ensure we are reading the data into our memory context. */
2329 176952 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2330 :
2331 176952 : apply_dispatch(&s2);
2332 :
2333 176950 : MemoryContextReset(ApplyMessageContext);
2334 :
2335 176950 : MemoryContextSwitchTo(oldcxt);
2336 :
2337 176950 : nchanges++;
2338 :
2339 : /*
2340 : * It is possible the file has been closed because we have processed
2341 : * the transaction end message like stream_commit in which case that
2342 : * must be the last message.
2343 : */
2344 176950 : if (!stream_fd)
2345 : {
2346 8 : ensure_last_message(stream_fileset, xid, fileno, offset);
2347 8 : break;
2348 : }
2349 :
2350 176942 : if (nchanges % 1000 == 0)
2351 168 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2352 : nchanges, path);
2353 : }
2354 :
2355 60 : if (stream_fd)
2356 52 : stream_close_file();
2357 :
2358 60 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2359 : nchanges, path);
2360 :
2361 60 : return;
2362 : }
2363 :
2364 : /*
2365 : * Handle STREAM COMMIT message.
2366 : */
2367 : static void
2368 122 : apply_handle_stream_commit(StringInfo s)
2369 : {
2370 : TransactionId xid;
2371 : LogicalRepCommitData commit_data;
2372 : ParallelApplyWorkerInfo *winfo;
2373 : TransApplyAction apply_action;
2374 :
2375 : /* Save the message before it is consumed. */
2376 122 : StringInfoData original_msg = *s;
2377 :
2378 122 : if (in_streamed_transaction)
2379 0 : ereport(ERROR,
2380 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2381 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2382 :
2383 122 : xid = logicalrep_read_stream_commit(s, &commit_data);
2384 122 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2385 :
2386 122 : apply_action = get_transaction_apply_action(xid, &winfo);
2387 :
2388 122 : switch (apply_action)
2389 : {
2390 44 : case TRANS_LEADER_APPLY:
2391 :
2392 : /*
2393 : * The transaction has been serialized to file, so replay all the
2394 : * spooled operations.
2395 : */
2396 44 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2397 : commit_data.commit_lsn);
2398 :
2399 42 : apply_handle_commit_internal(&commit_data);
2400 :
2401 : /* Unlink the files with serialized changes and subxact info. */
2402 42 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2403 :
2404 42 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2405 42 : break;
2406 :
2407 36 : case TRANS_LEADER_SEND_TO_PARALLEL:
2408 : Assert(winfo);
2409 :
2410 36 : if (pa_send_data(winfo, s->len, s->data))
2411 : {
2412 : /* Finish processing the streaming transaction. */
2413 36 : pa_xact_finish(winfo, commit_data.end_lsn);
2414 34 : break;
2415 : }
2416 :
2417 : /*
2418 : * Switch to serialize mode when we are not able to send the
2419 : * change to parallel apply worker.
2420 : */
2421 0 : pa_switch_to_partial_serialize(winfo, true);
2422 :
2423 : /* fall through */
2424 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2425 : Assert(winfo);
2426 :
2427 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2428 : &original_msg);
2429 :
2430 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2431 :
2432 : /* Finish processing the streaming transaction. */
2433 4 : pa_xact_finish(winfo, commit_data.end_lsn);
2434 4 : break;
2435 :
2436 38 : case TRANS_PARALLEL_APPLY:
2437 :
2438 : /*
2439 : * If the parallel apply worker is applying spooled messages then
2440 : * close the file before committing.
2441 : */
2442 38 : if (stream_fd)
2443 4 : stream_close_file();
2444 :
2445 38 : apply_handle_commit_internal(&commit_data);
2446 :
2447 38 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2448 :
2449 : /*
2450 : * It is important to set the transaction state as finished before
2451 : * releasing the lock. See pa_wait_for_xact_finish.
2452 : */
2453 38 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2454 38 : pa_unlock_transaction(xid, AccessExclusiveLock);
2455 :
2456 38 : pa_reset_subtrans();
2457 :
2458 38 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2459 38 : break;
2460 :
2461 0 : default:
2462 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2463 : break;
2464 : }
2465 :
2466 : /* Process any tables that are being synchronized in parallel. */
2467 118 : process_syncing_tables(commit_data.end_lsn);
2468 :
2469 118 : pgstat_report_activity(STATE_IDLE, NULL);
2470 :
2471 118 : reset_apply_error_context_info();
2472 118 : }
2473 :
2474 : /*
2475 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2476 : */
2477 : static void
2478 952 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2479 : {
2480 952 : if (is_skipping_changes())
2481 : {
2482 4 : stop_skipping_changes();
2483 :
2484 : /*
2485 : * Start a new transaction to clear the subskiplsn, if not started
2486 : * yet.
2487 : */
2488 4 : if (!IsTransactionState())
2489 2 : StartTransactionCommand();
2490 : }
2491 :
2492 952 : if (IsTransactionState())
2493 : {
2494 : /*
2495 : * The transaction is either non-empty or skipped, so we clear the
2496 : * subskiplsn.
2497 : */
2498 952 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2499 :
2500 : /*
2501 : * Update origin state so we can restart streaming from correct
2502 : * position in case of crash.
2503 : */
2504 952 : replorigin_session_origin_lsn = commit_data->end_lsn;
2505 952 : replorigin_session_origin_timestamp = commit_data->committime;
2506 :
2507 952 : CommitTransactionCommand();
2508 :
2509 952 : if (IsTransactionBlock())
2510 : {
2511 8 : EndTransactionBlock(false);
2512 8 : CommitTransactionCommand();
2513 : }
2514 :
2515 952 : pgstat_report_stat(false);
2516 :
2517 952 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2518 : }
2519 : else
2520 : {
2521 : /* Process any invalidation messages that might have accumulated. */
2522 0 : AcceptInvalidationMessages();
2523 0 : maybe_reread_subscription();
2524 : }
2525 :
2526 952 : in_remote_transaction = false;
2527 952 : }
2528 :
2529 : /*
2530 : * Handle RELATION message.
2531 : *
2532 : * Note we don't do validation against local schema here. The validation
2533 : * against local schema is postponed until first change for given relation
2534 : * comes as we only care about it when applying changes for it anyway and we
2535 : * do less locking this way.
2536 : */
2537 : static void
2538 928 : apply_handle_relation(StringInfo s)
2539 : {
2540 : LogicalRepRelation *rel;
2541 :
2542 928 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2543 74 : return;
2544 :
2545 854 : rel = logicalrep_read_rel(s);
2546 854 : logicalrep_relmap_update(rel);
2547 :
2548 : /* Also reset all entries in the partition map that refer to remoterel. */
2549 854 : logicalrep_partmap_reset_relmap(rel);
2550 : }
2551 :
2552 : /*
2553 : * Handle TYPE message.
2554 : *
2555 : * This implementation pays no attention to TYPE messages; we expect the user
2556 : * to have set things up so that the incoming data is acceptable to the input
2557 : * functions for the locally subscribed tables. Hence, we just read and
2558 : * discard the message.
2559 : */
2560 : static void
2561 36 : apply_handle_type(StringInfo s)
2562 : {
2563 : LogicalRepTyp typ;
2564 :
2565 36 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2566 0 : return;
2567 :
2568 36 : logicalrep_read_typ(s, &typ);
2569 : }
2570 :
2571 : /*
2572 : * Check that we (the subscription owner) have sufficient privileges on the
2573 : * target relation to perform the given operation.
2574 : */
2575 : static void
2576 440650 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2577 : {
2578 : Oid relid;
2579 : AclResult aclresult;
2580 :
2581 440650 : relid = RelationGetRelid(rel);
2582 440650 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2583 440650 : if (aclresult != ACLCHECK_OK)
2584 14 : aclcheck_error(aclresult,
2585 14 : get_relkind_objtype(rel->rd_rel->relkind),
2586 14 : get_rel_name(relid));
2587 :
2588 : /*
2589 : * We lack the infrastructure to honor RLS policies. It might be possible
2590 : * to add such infrastructure here, but tablesync workers lack it, too, so
2591 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2592 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2593 : * replicate subsequent INSERTs, so we forbid all commands the same.
2594 : */
2595 440636 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2596 6 : ereport(ERROR,
2597 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2598 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2599 : GetUserNameFromId(GetUserId(), true),
2600 : RelationGetRelationName(rel))));
2601 440630 : }
2602 :
2603 : /*
2604 : * Handle INSERT message.
2605 : */
2606 :
2607 : static void
2608 371722 : apply_handle_insert(StringInfo s)
2609 : {
2610 : LogicalRepRelMapEntry *rel;
2611 : LogicalRepTupleData newtup;
2612 : LogicalRepRelId relid;
2613 : UserContext ucxt;
2614 : ApplyExecutionData *edata;
2615 : EState *estate;
2616 : TupleTableSlot *remoteslot;
2617 : MemoryContext oldctx;
2618 : bool run_as_owner;
2619 :
2620 : /*
2621 : * Quick return if we are skipping data modification changes or handling
2622 : * streamed transactions.
2623 : */
2624 723442 : if (is_skipping_changes() ||
2625 351720 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2626 220114 : return;
2627 :
2628 151708 : begin_replication_step();
2629 :
2630 151704 : relid = logicalrep_read_insert(s, &newtup);
2631 151704 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2632 151688 : if (!should_apply_changes_for_rel(rel))
2633 : {
2634 : /*
2635 : * The relation can't become interesting in the middle of the
2636 : * transaction so it's safe to unlock it.
2637 : */
2638 100 : logicalrep_rel_close(rel, RowExclusiveLock);
2639 100 : end_replication_step();
2640 100 : return;
2641 : }
2642 :
2643 : /*
2644 : * Make sure that any user-supplied code runs as the table owner, unless
2645 : * the user has opted out of that behavior.
2646 : */
2647 151588 : run_as_owner = MySubscription->runasowner;
2648 151588 : if (!run_as_owner)
2649 151572 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2650 :
2651 : /* Set relation for error callback */
2652 151588 : apply_error_callback_arg.rel = rel;
2653 :
2654 : /* Initialize the executor state. */
2655 151588 : edata = create_edata_for_relation(rel);
2656 151588 : estate = edata->estate;
2657 151588 : remoteslot = ExecInitExtraTupleSlot(estate,
2658 151588 : RelationGetDescr(rel->localrel),
2659 : &TTSOpsVirtual);
2660 :
2661 : /* Process and store remote tuple in the slot */
2662 151588 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2663 151588 : slot_store_data(remoteslot, rel, &newtup);
2664 151588 : slot_fill_defaults(rel, estate, remoteslot);
2665 151588 : MemoryContextSwitchTo(oldctx);
2666 :
2667 : /* For a partitioned table, insert the tuple into a partition. */
2668 151588 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2669 118 : apply_handle_tuple_routing(edata,
2670 : remoteslot, NULL, CMD_INSERT);
2671 : else
2672 : {
2673 151470 : ResultRelInfo *relinfo = edata->targetRelInfo;
2674 :
2675 151470 : ExecOpenIndices(relinfo, false);
2676 151470 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2677 151440 : ExecCloseIndices(relinfo);
2678 : }
2679 :
2680 151528 : finish_edata(edata);
2681 :
2682 : /* Reset relation for error callback */
2683 151528 : apply_error_callback_arg.rel = NULL;
2684 :
2685 151528 : if (!run_as_owner)
2686 151518 : RestoreUserContext(&ucxt);
2687 :
2688 151528 : logicalrep_rel_close(rel, NoLock);
2689 :
2690 151528 : end_replication_step();
2691 : }
2692 :
2693 : /*
2694 : * Workhorse for apply_handle_insert()
2695 : * relinfo is for the relation we're actually inserting into
2696 : * (could be a child partition of edata->targetRelInfo)
2697 : */
2698 : static void
2699 151590 : apply_handle_insert_internal(ApplyExecutionData *edata,
2700 : ResultRelInfo *relinfo,
2701 : TupleTableSlot *remoteslot)
2702 : {
2703 151590 : EState *estate = edata->estate;
2704 :
2705 : /* Caller should have opened indexes already. */
2706 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2707 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2708 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2709 :
2710 : /* Caller will not have done this bit. */
2711 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2712 151590 : InitConflictIndexes(relinfo);
2713 :
2714 : /* Do the insert. */
2715 151590 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2716 151578 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2717 151530 : }
2718 :
2719 : /*
2720 : * Check if the logical replication relation is updatable and throw
2721 : * appropriate error if it isn't.
2722 : */
2723 : static void
2724 144578 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2725 : {
2726 : /*
2727 : * For partitioned tables, we only need to care if the target partition is
2728 : * updatable (aka has PK or RI defined for it).
2729 : */
2730 144578 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2731 60 : return;
2732 :
2733 : /* Updatable, no error. */
2734 144518 : if (rel->updatable)
2735 144518 : return;
2736 :
2737 : /*
2738 : * We are in error mode so it's fine this is somewhat slow. It's better to
2739 : * give user correct error.
2740 : */
2741 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2742 : {
2743 0 : ereport(ERROR,
2744 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2745 : errmsg("publisher did not send replica identity column "
2746 : "expected by the logical replication target relation \"%s.%s\"",
2747 : rel->remoterel.nspname, rel->remoterel.relname)));
2748 : }
2749 :
2750 0 : ereport(ERROR,
2751 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2752 : errmsg("logical replication target relation \"%s.%s\" has "
2753 : "neither REPLICA IDENTITY index nor PRIMARY "
2754 : "KEY and published relation does not have "
2755 : "REPLICA IDENTITY FULL",
2756 : rel->remoterel.nspname, rel->remoterel.relname)));
2757 : }
2758 :
2759 : /*
2760 : * Handle UPDATE message.
2761 : *
2762 : * TODO: FDW support
2763 : */
2764 : static void
2765 132324 : apply_handle_update(StringInfo s)
2766 : {
2767 : LogicalRepRelMapEntry *rel;
2768 : LogicalRepRelId relid;
2769 : UserContext ucxt;
2770 : ApplyExecutionData *edata;
2771 : EState *estate;
2772 : LogicalRepTupleData oldtup;
2773 : LogicalRepTupleData newtup;
2774 : bool has_oldtup;
2775 : TupleTableSlot *remoteslot;
2776 : RTEPermissionInfo *target_perminfo;
2777 : MemoryContext oldctx;
2778 : bool run_as_owner;
2779 :
2780 : /*
2781 : * Quick return if we are skipping data modification changes or handling
2782 : * streamed transactions.
2783 : */
2784 264642 : if (is_skipping_changes() ||
2785 132318 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2786 68446 : return;
2787 :
2788 63878 : begin_replication_step();
2789 :
2790 63876 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2791 : &newtup);
2792 63876 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2793 63876 : if (!should_apply_changes_for_rel(rel))
2794 : {
2795 : /*
2796 : * The relation can't become interesting in the middle of the
2797 : * transaction so it's safe to unlock it.
2798 : */
2799 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2800 0 : end_replication_step();
2801 0 : return;
2802 : }
2803 :
2804 : /* Set relation for error callback */
2805 63876 : apply_error_callback_arg.rel = rel;
2806 :
2807 : /* Check if we can do the update. */
2808 63876 : check_relation_updatable(rel);
2809 :
2810 : /*
2811 : * Make sure that any user-supplied code runs as the table owner, unless
2812 : * the user has opted out of that behavior.
2813 : */
2814 63876 : run_as_owner = MySubscription->runasowner;
2815 63876 : if (!run_as_owner)
2816 63870 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2817 :
2818 : /* Initialize the executor state. */
2819 63874 : edata = create_edata_for_relation(rel);
2820 63874 : estate = edata->estate;
2821 63874 : remoteslot = ExecInitExtraTupleSlot(estate,
2822 63874 : RelationGetDescr(rel->localrel),
2823 : &TTSOpsVirtual);
2824 :
2825 : /*
2826 : * Populate updatedCols so that per-column triggers can fire, and so
2827 : * executor can correctly pass down indexUnchanged hint. This could
2828 : * include more columns than were actually changed on the publisher
2829 : * because the logical replication protocol doesn't contain that
2830 : * information. But it would for example exclude columns that only exist
2831 : * on the subscriber, since we are not touching those.
2832 : */
2833 63874 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2834 318626 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2835 : {
2836 254752 : Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
2837 254752 : int remoteattnum = rel->attrmap->attnums[i];
2838 :
2839 254752 : if (!att->attisdropped && remoteattnum >= 0)
2840 : {
2841 : Assert(remoteattnum < newtup.ncols);
2842 137718 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2843 137712 : target_perminfo->updatedCols =
2844 137712 : bms_add_member(target_perminfo->updatedCols,
2845 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2846 : }
2847 : }
2848 :
2849 : /* Build the search tuple. */
2850 63874 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2851 63874 : slot_store_data(remoteslot, rel,
2852 63874 : has_oldtup ? &oldtup : &newtup);
2853 63874 : MemoryContextSwitchTo(oldctx);
2854 :
2855 : /* For a partitioned table, apply update to correct partition. */
2856 63874 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2857 26 : apply_handle_tuple_routing(edata,
2858 : remoteslot, &newtup, CMD_UPDATE);
2859 : else
2860 63848 : apply_handle_update_internal(edata, edata->targetRelInfo,
2861 : remoteslot, &newtup, rel->localindexoid);
2862 :
2863 63862 : finish_edata(edata);
2864 :
2865 : /* Reset relation for error callback */
2866 63862 : apply_error_callback_arg.rel = NULL;
2867 :
2868 63862 : if (!run_as_owner)
2869 63858 : RestoreUserContext(&ucxt);
2870 :
2871 63862 : logicalrep_rel_close(rel, NoLock);
2872 :
2873 63862 : end_replication_step();
2874 : }
2875 :
2876 : /*
2877 : * Workhorse for apply_handle_update()
2878 : * relinfo is for the relation we're actually updating in
2879 : * (could be a child partition of edata->targetRelInfo)
2880 : */
2881 : static void
2882 63848 : apply_handle_update_internal(ApplyExecutionData *edata,
2883 : ResultRelInfo *relinfo,
2884 : TupleTableSlot *remoteslot,
2885 : LogicalRepTupleData *newtup,
2886 : Oid localindexoid)
2887 : {
2888 63848 : EState *estate = edata->estate;
2889 63848 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2890 63848 : Relation localrel = relinfo->ri_RelationDesc;
2891 : EPQState epqstate;
2892 63848 : TupleTableSlot *localslot = NULL;
2893 63848 : ConflictTupleInfo conflicttuple = {0};
2894 : bool found;
2895 : MemoryContext oldctx;
2896 :
2897 63848 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2898 63848 : ExecOpenIndices(relinfo, false);
2899 :
2900 63848 : found = FindReplTupleInLocalRel(edata, localrel,
2901 : &relmapentry->remoterel,
2902 : localindexoid,
2903 : remoteslot, &localslot);
2904 :
2905 : /*
2906 : * Tuple found.
2907 : *
2908 : * Note this will fail if there are other conflicting unique indexes.
2909 : */
2910 63840 : if (found)
2911 : {
2912 : /*
2913 : * Report the conflict if the tuple was modified by a different
2914 : * origin.
2915 : */
2916 63826 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2917 4 : &conflicttuple.origin, &conflicttuple.ts) &&
2918 4 : conflicttuple.origin != replorigin_session_origin)
2919 : {
2920 : TupleTableSlot *newslot;
2921 :
2922 : /* Store the new tuple for conflict reporting */
2923 4 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2924 4 : slot_store_data(newslot, relmapentry, newtup);
2925 :
2926 4 : conflicttuple.slot = localslot;
2927 :
2928 4 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2929 : remoteslot, newslot,
2930 4 : list_make1(&conflicttuple));
2931 : }
2932 :
2933 : /* Process and store remote tuple in the slot */
2934 63826 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2935 63826 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2936 63826 : MemoryContextSwitchTo(oldctx);
2937 :
2938 63826 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2939 :
2940 63826 : InitConflictIndexes(relinfo);
2941 :
2942 : /* Do the actual update. */
2943 63826 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2944 63826 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2945 : remoteslot);
2946 : }
2947 : else
2948 : {
2949 : ConflictType type;
2950 14 : TupleTableSlot *newslot = localslot;
2951 :
2952 : /*
2953 : * Detecting whether the tuple was recently deleted or never existed
2954 : * is crucial to avoid misleading the user during conflict handling.
2955 : */
2956 14 : if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2957 : &conflicttuple.xmin,
2958 : &conflicttuple.origin,
2959 6 : &conflicttuple.ts) &&
2960 6 : conflicttuple.origin != replorigin_session_origin)
2961 6 : type = CT_UPDATE_DELETED;
2962 : else
2963 8 : type = CT_UPDATE_MISSING;
2964 :
2965 : /* Store the new tuple for conflict reporting */
2966 14 : slot_store_data(newslot, relmapentry, newtup);
2967 :
2968 : /*
2969 : * The tuple to be updated could not be found or was deleted. Do
2970 : * nothing except for emitting a log message.
2971 : */
2972 14 : ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
2973 14 : list_make1(&conflicttuple));
2974 : }
2975 :
2976 : /* Cleanup. */
2977 63836 : ExecCloseIndices(relinfo);
2978 63836 : EvalPlanQualEnd(&epqstate);
2979 63836 : }
2980 :
2981 : /*
2982 : * Handle DELETE message.
2983 : *
2984 : * TODO: FDW support
2985 : */
2986 : static void
2987 163872 : apply_handle_delete(StringInfo s)
2988 : {
2989 : LogicalRepRelMapEntry *rel;
2990 : LogicalRepTupleData oldtup;
2991 : LogicalRepRelId relid;
2992 : UserContext ucxt;
2993 : ApplyExecutionData *edata;
2994 : EState *estate;
2995 : TupleTableSlot *remoteslot;
2996 : MemoryContext oldctx;
2997 : bool run_as_owner;
2998 :
2999 : /*
3000 : * Quick return if we are skipping data modification changes or handling
3001 : * streamed transactions.
3002 : */
3003 327744 : if (is_skipping_changes() ||
3004 163872 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
3005 83230 : return;
3006 :
3007 80642 : begin_replication_step();
3008 :
3009 80642 : relid = logicalrep_read_delete(s, &oldtup);
3010 80642 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
3011 80642 : if (!should_apply_changes_for_rel(rel))
3012 : {
3013 : /*
3014 : * The relation can't become interesting in the middle of the
3015 : * transaction so it's safe to unlock it.
3016 : */
3017 0 : logicalrep_rel_close(rel, RowExclusiveLock);
3018 0 : end_replication_step();
3019 0 : return;
3020 : }
3021 :
3022 : /* Set relation for error callback */
3023 80642 : apply_error_callback_arg.rel = rel;
3024 :
3025 : /* Check if we can do the delete. */
3026 80642 : check_relation_updatable(rel);
3027 :
3028 : /*
3029 : * Make sure that any user-supplied code runs as the table owner, unless
3030 : * the user has opted out of that behavior.
3031 : */
3032 80642 : run_as_owner = MySubscription->runasowner;
3033 80642 : if (!run_as_owner)
3034 80638 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3035 :
3036 : /* Initialize the executor state. */
3037 80642 : edata = create_edata_for_relation(rel);
3038 80642 : estate = edata->estate;
3039 80642 : remoteslot = ExecInitExtraTupleSlot(estate,
3040 80642 : RelationGetDescr(rel->localrel),
3041 : &TTSOpsVirtual);
3042 :
3043 : /* Build the search tuple. */
3044 80642 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3045 80642 : slot_store_data(remoteslot, rel, &oldtup);
3046 80642 : MemoryContextSwitchTo(oldctx);
3047 :
3048 : /* For a partitioned table, apply delete to correct partition. */
3049 80642 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3050 34 : apply_handle_tuple_routing(edata,
3051 : remoteslot, NULL, CMD_DELETE);
3052 : else
3053 : {
3054 80608 : ResultRelInfo *relinfo = edata->targetRelInfo;
3055 :
3056 80608 : ExecOpenIndices(relinfo, false);
3057 80608 : apply_handle_delete_internal(edata, relinfo,
3058 : remoteslot, rel->localindexoid);
3059 80608 : ExecCloseIndices(relinfo);
3060 : }
3061 :
3062 80642 : finish_edata(edata);
3063 :
3064 : /* Reset relation for error callback */
3065 80642 : apply_error_callback_arg.rel = NULL;
3066 :
3067 80642 : if (!run_as_owner)
3068 80638 : RestoreUserContext(&ucxt);
3069 :
3070 80642 : logicalrep_rel_close(rel, NoLock);
3071 :
3072 80642 : end_replication_step();
3073 : }
3074 :
3075 : /*
3076 : * Workhorse for apply_handle_delete()
3077 : * relinfo is for the relation we're actually deleting from
3078 : * (could be a child partition of edata->targetRelInfo)
3079 : */
3080 : static void
3081 80642 : apply_handle_delete_internal(ApplyExecutionData *edata,
3082 : ResultRelInfo *relinfo,
3083 : TupleTableSlot *remoteslot,
3084 : Oid localindexoid)
3085 : {
3086 80642 : EState *estate = edata->estate;
3087 80642 : Relation localrel = relinfo->ri_RelationDesc;
3088 80642 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3089 : EPQState epqstate;
3090 : TupleTableSlot *localslot;
3091 80642 : ConflictTupleInfo conflicttuple = {0};
3092 : bool found;
3093 :
3094 80642 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3095 :
3096 : /* Caller should have opened indexes already. */
3097 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
3098 : !localrel->rd_rel->relhasindex ||
3099 : RelationGetIndexList(localrel) == NIL);
3100 :
3101 80642 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3102 : remoteslot, &localslot);
3103 :
3104 : /* If found delete it. */
3105 80642 : if (found)
3106 : {
3107 : /*
3108 : * Report the conflict if the tuple was modified by a different
3109 : * origin.
3110 : */
3111 80624 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3112 12 : &conflicttuple.origin, &conflicttuple.ts) &&
3113 12 : conflicttuple.origin != replorigin_session_origin)
3114 : {
3115 10 : conflicttuple.slot = localslot;
3116 10 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
3117 : remoteslot, NULL,
3118 10 : list_make1(&conflicttuple));
3119 : }
3120 :
3121 80624 : EvalPlanQualSetSlot(&epqstate, localslot);
3122 :
3123 : /* Do the actual delete. */
3124 80624 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3125 80624 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3126 : }
3127 : else
3128 : {
3129 : /*
3130 : * The tuple to be deleted could not be found. Do nothing except for
3131 : * emitting a log message.
3132 : */
3133 18 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
3134 18 : remoteslot, NULL, list_make1(&conflicttuple));
3135 : }
3136 :
3137 : /* Cleanup. */
3138 80642 : EvalPlanQualEnd(&epqstate);
3139 80642 : }
3140 :
3141 : /*
3142 : * Try to find a tuple received from the publication side (in 'remoteslot') in
3143 : * the corresponding local relation using either replica identity index,
3144 : * primary key, index or if needed, sequential scan.
3145 : *
3146 : * Local tuple, if found, is returned in '*localslot'.
3147 : */
3148 : static bool
3149 144516 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
3150 : LogicalRepRelation *remoterel,
3151 : Oid localidxoid,
3152 : TupleTableSlot *remoteslot,
3153 : TupleTableSlot **localslot)
3154 : {
3155 144516 : EState *estate = edata->estate;
3156 : bool found;
3157 :
3158 : /*
3159 : * Regardless of the top-level operation, we're performing a read here, so
3160 : * check for SELECT privileges.
3161 : */
3162 144516 : TargetPrivilegesCheck(localrel, ACL_SELECT);
3163 :
3164 144508 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3165 :
3166 : Assert(OidIsValid(localidxoid) ||
3167 : (remoterel->replident == REPLICA_IDENTITY_FULL));
3168 :
3169 144508 : if (OidIsValid(localidxoid))
3170 : {
3171 : #ifdef USE_ASSERT_CHECKING
3172 : Relation idxrel = index_open(localidxoid, AccessShareLock);
3173 :
3174 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3175 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3176 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
3177 : IsIndexUsableForReplicaIdentityFull(idxrel,
3178 : edata->targetRel->attrmap)));
3179 : index_close(idxrel, AccessShareLock);
3180 : #endif
3181 :
3182 144206 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
3183 : LockTupleExclusive,
3184 : remoteslot, *localslot);
3185 : }
3186 : else
3187 302 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
3188 : remoteslot, *localslot);
3189 :
3190 144508 : return found;
3191 : }
3192 :
3193 : /*
3194 : * Determine whether the index can reliably locate the deleted tuple in the
3195 : * local relation.
3196 : *
3197 : * An index may exclude deleted tuples if it was re-indexed or re-created during
3198 : * change application. Therefore, an index is considered usable only if the
3199 : * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3200 : * index tuple's xmin. This ensures that any tuples deleted prior to the index
3201 : * creation or re-indexing are not relevant for conflict detection in the
3202 : * current apply worker.
3203 : *
3204 : * Note that indexes may also be excluded if they were modified by other DDL
3205 : * operations, such as ALTER INDEX. However, this is acceptable, as the
3206 : * likelihood of such DDL changes coinciding with the need to scan dead
3207 : * tuples for the update_deleted is low.
3208 : */
3209 : static bool
3210 2 : IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
3211 : TransactionId conflict_detection_xmin)
3212 : {
3213 : HeapTuple index_tuple;
3214 : TransactionId index_xmin;
3215 :
3216 2 : index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3217 :
3218 2 : if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3219 0 : elog(ERROR, "cache lookup failed for index %u", localindexoid);
3220 :
3221 : /*
3222 : * No need to check for a frozen transaction ID, as
3223 : * TransactionIdPrecedes() manages it internally, treating it as falling
3224 : * behind the conflict_detection_xmin.
3225 : */
3226 2 : index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3227 :
3228 2 : ReleaseSysCache(index_tuple);
3229 :
3230 2 : return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3231 : }
3232 :
3233 : /*
3234 : * Attempts to locate a deleted tuple in the local relation that matches the
3235 : * values of the tuple received from the publication side (in 'remoteslot').
3236 : * The search is performed using either the replica identity index, primary
3237 : * key, other available index, or a sequential scan if necessary.
3238 : *
3239 : * Returns true if the deleted tuple is found. If found, the transaction ID,
3240 : * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3241 : * '*delete_origin', and '*delete_time' respectively.
3242 : */
3243 : static bool
3244 18 : FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
3245 : TupleTableSlot *remoteslot,
3246 : TransactionId *delete_xid, RepOriginId *delete_origin,
3247 : TimestampTz *delete_time)
3248 : {
3249 : TransactionId oldestxmin;
3250 :
3251 : /*
3252 : * Return false if either dead tuples are not retained or commit timestamp
3253 : * data is not available.
3254 : */
3255 18 : if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
3256 12 : return false;
3257 :
3258 : /*
3259 : * For conflict detection, we use the leader worker's
3260 : * oldest_nonremovable_xid value instead of invoking
3261 : * GetOldestNonRemovableTransactionId() or using the conflict detection
3262 : * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3263 : * identify tuples that were recently deleted. These deleted tuples are no
3264 : * longer visible to concurrent transactions. However, if a remote update
3265 : * matches such a tuple, we log an update_deleted conflict.
3266 : *
3267 : * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3268 : * transaction IDs older than oldest_nonremovable_xid, for our current
3269 : * purpose, it is acceptable to treat tuples deleted by transactions prior
3270 : * to oldest_nonremovable_xid as update_missing conflicts.
3271 : */
3272 6 : if (am_leader_apply_worker())
3273 : {
3274 6 : oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid;
3275 : }
3276 : else
3277 : {
3278 : LogicalRepWorker *leader;
3279 :
3280 : /*
3281 : * Obtain the information from the leader apply worker as only the
3282 : * leader manages oldest_nonremovable_xid (see
3283 : * maybe_advance_nonremovable_xid() for details).
3284 : */
3285 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
3286 0 : leader = logicalrep_worker_find(MyLogicalRepWorker->subid,
3287 : InvalidOid, false);
3288 0 : if (!leader)
3289 : {
3290 0 : ereport(ERROR,
3291 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3292 : errmsg("could not detect conflict as the leader apply worker has exited")));
3293 : }
3294 :
3295 0 : SpinLockAcquire(&leader->relmutex);
3296 0 : oldestxmin = leader->oldest_nonremovable_xid;
3297 0 : SpinLockRelease(&leader->relmutex);
3298 0 : LWLockRelease(LogicalRepWorkerLock);
3299 : }
3300 :
3301 : /*
3302 : * Return false if the leader apply worker has stopped retaining
3303 : * information for detecting conflicts. This implies that update_deleted
3304 : * can no longer be reliably detected.
3305 : */
3306 6 : if (!TransactionIdIsValid(oldestxmin))
3307 0 : return false;
3308 :
3309 8 : if (OidIsValid(localidxoid) &&
3310 2 : IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
3311 2 : return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
3312 : remoteslot, oldestxmin,
3313 : delete_xid, delete_origin,
3314 : delete_time);
3315 : else
3316 4 : return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
3317 : oldestxmin, delete_xid,
3318 : delete_origin, delete_time);
3319 : }
3320 :
3321 : /*
3322 : * This handles insert, update, delete on a partitioned table.
3323 : */
3324 : static void
3325 178 : apply_handle_tuple_routing(ApplyExecutionData *edata,
3326 : TupleTableSlot *remoteslot,
3327 : LogicalRepTupleData *newtup,
3328 : CmdType operation)
3329 : {
3330 178 : EState *estate = edata->estate;
3331 178 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3332 178 : ResultRelInfo *relinfo = edata->targetRelInfo;
3333 178 : Relation parentrel = relinfo->ri_RelationDesc;
3334 : ModifyTableState *mtstate;
3335 : PartitionTupleRouting *proute;
3336 : ResultRelInfo *partrelinfo;
3337 : Relation partrel;
3338 : TupleTableSlot *remoteslot_part;
3339 : TupleConversionMap *map;
3340 : MemoryContext oldctx;
3341 178 : LogicalRepRelMapEntry *part_entry = NULL;
3342 178 : AttrMap *attrmap = NULL;
3343 :
3344 : /* ModifyTableState is needed for ExecFindPartition(). */
3345 178 : edata->mtstate = mtstate = makeNode(ModifyTableState);
3346 178 : mtstate->ps.plan = NULL;
3347 178 : mtstate->ps.state = estate;
3348 178 : mtstate->operation = operation;
3349 178 : mtstate->resultRelInfo = relinfo;
3350 :
3351 : /* ... as is PartitionTupleRouting. */
3352 178 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3353 :
3354 : /*
3355 : * Find the partition to which the "search tuple" belongs.
3356 : */
3357 : Assert(remoteslot != NULL);
3358 178 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3359 178 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3360 : remoteslot, estate);
3361 : Assert(partrelinfo != NULL);
3362 178 : partrel = partrelinfo->ri_RelationDesc;
3363 :
3364 : /*
3365 : * Check for supported relkind. We need this since partitions might be of
3366 : * unsupported relkinds; and the set of partitions can change, so checking
3367 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3368 : */
3369 178 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3370 178 : get_namespace_name(RelationGetNamespace(partrel)),
3371 178 : RelationGetRelationName(partrel));
3372 :
3373 : /*
3374 : * To perform any of the operations below, the tuple must match the
3375 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3376 : * slot to store the tuple in any case.
3377 : */
3378 178 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3379 178 : if (remoteslot_part == NULL)
3380 112 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3381 178 : map = ExecGetRootToChildMap(partrelinfo, estate);
3382 178 : if (map != NULL)
3383 : {
3384 66 : attrmap = map->attrMap;
3385 66 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3386 : remoteslot_part);
3387 : }
3388 : else
3389 : {
3390 112 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3391 112 : slot_getallattrs(remoteslot_part);
3392 : }
3393 178 : MemoryContextSwitchTo(oldctx);
3394 :
3395 : /* Check if we can do the update or delete on the leaf partition. */
3396 178 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3397 : {
3398 60 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3399 : attrmap);
3400 60 : check_relation_updatable(part_entry);
3401 : }
3402 :
3403 178 : switch (operation)
3404 : {
3405 118 : case CMD_INSERT:
3406 118 : apply_handle_insert_internal(edata, partrelinfo,
3407 : remoteslot_part);
3408 88 : break;
3409 :
3410 34 : case CMD_DELETE:
3411 34 : apply_handle_delete_internal(edata, partrelinfo,
3412 : remoteslot_part,
3413 : part_entry->localindexoid);
3414 34 : break;
3415 :
3416 26 : case CMD_UPDATE:
3417 :
3418 : /*
3419 : * For UPDATE, depending on whether or not the updated tuple
3420 : * satisfies the partition's constraint, perform a simple UPDATE
3421 : * of the partition or move the updated tuple into a different
3422 : * suitable partition.
3423 : */
3424 : {
3425 : TupleTableSlot *localslot;
3426 : ResultRelInfo *partrelinfo_new;
3427 : Relation partrel_new;
3428 : bool found;
3429 : EPQState epqstate;
3430 26 : ConflictTupleInfo conflicttuple = {0};
3431 :
3432 : /* Get the matching local tuple from the partition. */
3433 26 : found = FindReplTupleInLocalRel(edata, partrel,
3434 : &part_entry->remoterel,
3435 : part_entry->localindexoid,
3436 : remoteslot_part, &localslot);
3437 26 : if (!found)
3438 : {
3439 : ConflictType type;
3440 4 : TupleTableSlot *newslot = localslot;
3441 :
3442 : /*
3443 : * Detecting whether the tuple was recently deleted or
3444 : * never existed is crucial to avoid misleading the user
3445 : * during conflict handling.
3446 : */
3447 4 : if (FindDeletedTupleInLocalRel(partrel,
3448 : part_entry->localindexoid,
3449 : remoteslot_part,
3450 : &conflicttuple.xmin,
3451 : &conflicttuple.origin,
3452 0 : &conflicttuple.ts) &&
3453 0 : conflicttuple.origin != replorigin_session_origin)
3454 0 : type = CT_UPDATE_DELETED;
3455 : else
3456 4 : type = CT_UPDATE_MISSING;
3457 :
3458 : /* Store the new tuple for conflict reporting */
3459 4 : slot_store_data(newslot, part_entry, newtup);
3460 :
3461 : /*
3462 : * The tuple to be updated could not be found or was
3463 : * deleted. Do nothing except for emitting a log message.
3464 : */
3465 4 : ReportApplyConflict(estate, partrelinfo, LOG,
3466 : type, remoteslot_part, newslot,
3467 4 : list_make1(&conflicttuple));
3468 :
3469 4 : return;
3470 : }
3471 :
3472 : /*
3473 : * Report the conflict if the tuple was modified by a
3474 : * different origin.
3475 : */
3476 22 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3477 : &conflicttuple.origin,
3478 2 : &conflicttuple.ts) &&
3479 2 : conflicttuple.origin != replorigin_session_origin)
3480 : {
3481 : TupleTableSlot *newslot;
3482 :
3483 : /* Store the new tuple for conflict reporting */
3484 2 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3485 2 : slot_store_data(newslot, part_entry, newtup);
3486 :
3487 2 : conflicttuple.slot = localslot;
3488 :
3489 2 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3490 : remoteslot_part, newslot,
3491 2 : list_make1(&conflicttuple));
3492 : }
3493 :
3494 : /*
3495 : * Apply the update to the local tuple, putting the result in
3496 : * remoteslot_part.
3497 : */
3498 22 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3499 22 : slot_modify_data(remoteslot_part, localslot, part_entry,
3500 : newtup);
3501 22 : MemoryContextSwitchTo(oldctx);
3502 :
3503 22 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3504 :
3505 : /*
3506 : * Does the updated tuple still satisfy the current
3507 : * partition's constraint?
3508 : */
3509 44 : if (!partrel->rd_rel->relispartition ||
3510 22 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3511 : false))
3512 : {
3513 : /*
3514 : * Yes, so simply UPDATE the partition. We don't call
3515 : * apply_handle_update_internal() here, which would
3516 : * normally do the following work, to avoid repeating some
3517 : * work already done above to find the local tuple in the
3518 : * partition.
3519 : */
3520 20 : InitConflictIndexes(partrelinfo);
3521 :
3522 20 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3523 20 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3524 : ACL_UPDATE);
3525 20 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3526 : localslot, remoteslot_part);
3527 : }
3528 : else
3529 : {
3530 : /* Move the tuple into the new partition. */
3531 :
3532 : /*
3533 : * New partition will be found using tuple routing, which
3534 : * can only occur via the parent table. We might need to
3535 : * convert the tuple to the parent's rowtype. Note that
3536 : * this is the tuple found in the partition, not the
3537 : * original search tuple received by this function.
3538 : */
3539 2 : if (map)
3540 : {
3541 : TupleConversionMap *PartitionToRootMap =
3542 2 : convert_tuples_by_name(RelationGetDescr(partrel),
3543 : RelationGetDescr(parentrel));
3544 :
3545 : remoteslot =
3546 2 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3547 : remoteslot_part, remoteslot);
3548 : }
3549 : else
3550 : {
3551 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3552 0 : slot_getallattrs(remoteslot);
3553 : }
3554 :
3555 : /* Find the new partition. */
3556 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3557 2 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3558 : proute, remoteslot,
3559 : estate);
3560 2 : MemoryContextSwitchTo(oldctx);
3561 : Assert(partrelinfo_new != partrelinfo);
3562 2 : partrel_new = partrelinfo_new->ri_RelationDesc;
3563 :
3564 : /* Check that new partition also has supported relkind. */
3565 2 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3566 2 : get_namespace_name(RelationGetNamespace(partrel_new)),
3567 2 : RelationGetRelationName(partrel_new));
3568 :
3569 : /* DELETE old tuple found in the old partition. */
3570 2 : EvalPlanQualSetSlot(&epqstate, localslot);
3571 2 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3572 2 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3573 :
3574 : /* INSERT new tuple into the new partition. */
3575 :
3576 : /*
3577 : * Convert the replacement tuple to match the destination
3578 : * partition rowtype.
3579 : */
3580 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3581 2 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3582 2 : if (remoteslot_part == NULL)
3583 2 : remoteslot_part = table_slot_create(partrel_new,
3584 : &estate->es_tupleTable);
3585 2 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3586 2 : if (map != NULL)
3587 : {
3588 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3589 : remoteslot,
3590 : remoteslot_part);
3591 : }
3592 : else
3593 : {
3594 2 : remoteslot_part = ExecCopySlot(remoteslot_part,
3595 : remoteslot);
3596 2 : slot_getallattrs(remoteslot);
3597 : }
3598 2 : MemoryContextSwitchTo(oldctx);
3599 2 : apply_handle_insert_internal(edata, partrelinfo_new,
3600 : remoteslot_part);
3601 : }
3602 :
3603 22 : EvalPlanQualEnd(&epqstate);
3604 : }
3605 22 : break;
3606 :
3607 0 : default:
3608 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3609 : break;
3610 : }
3611 : }
3612 :
3613 : /*
3614 : * Handle TRUNCATE message.
3615 : *
3616 : * TODO: FDW support
3617 : */
3618 : static void
3619 40 : apply_handle_truncate(StringInfo s)
3620 : {
3621 40 : bool cascade = false;
3622 40 : bool restart_seqs = false;
3623 40 : List *remote_relids = NIL;
3624 40 : List *remote_rels = NIL;
3625 40 : List *rels = NIL;
3626 40 : List *part_rels = NIL;
3627 40 : List *relids = NIL;
3628 40 : List *relids_logged = NIL;
3629 : ListCell *lc;
3630 40 : LOCKMODE lockmode = AccessExclusiveLock;
3631 :
3632 : /*
3633 : * Quick return if we are skipping data modification changes or handling
3634 : * streamed transactions.
3635 : */
3636 80 : if (is_skipping_changes() ||
3637 40 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
3638 0 : return;
3639 :
3640 40 : begin_replication_step();
3641 :
3642 40 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3643 :
3644 98 : foreach(lc, remote_relids)
3645 : {
3646 58 : LogicalRepRelId relid = lfirst_oid(lc);
3647 : LogicalRepRelMapEntry *rel;
3648 :
3649 58 : rel = logicalrep_rel_open(relid, lockmode);
3650 58 : if (!should_apply_changes_for_rel(rel))
3651 : {
3652 : /*
3653 : * The relation can't become interesting in the middle of the
3654 : * transaction so it's safe to unlock it.
3655 : */
3656 0 : logicalrep_rel_close(rel, lockmode);
3657 0 : continue;
3658 : }
3659 :
3660 58 : remote_rels = lappend(remote_rels, rel);
3661 58 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
3662 58 : rels = lappend(rels, rel->localrel);
3663 58 : relids = lappend_oid(relids, rel->localreloid);
3664 58 : if (RelationIsLogicallyLogged(rel->localrel))
3665 2 : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3666 :
3667 : /*
3668 : * Truncate partitions if we got a message to truncate a partitioned
3669 : * table.
3670 : */
3671 58 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3672 : {
3673 : ListCell *child;
3674 8 : List *children = find_all_inheritors(rel->localreloid,
3675 : lockmode,
3676 : NULL);
3677 :
3678 30 : foreach(child, children)
3679 : {
3680 22 : Oid childrelid = lfirst_oid(child);
3681 : Relation childrel;
3682 :
3683 22 : if (list_member_oid(relids, childrelid))
3684 8 : continue;
3685 :
3686 : /* find_all_inheritors already got lock */
3687 14 : childrel = table_open(childrelid, NoLock);
3688 :
3689 : /*
3690 : * Ignore temp tables of other backends. See similar code in
3691 : * ExecuteTruncate().
3692 : */
3693 14 : if (RELATION_IS_OTHER_TEMP(childrel))
3694 : {
3695 0 : table_close(childrel, lockmode);
3696 0 : continue;
3697 : }
3698 :
3699 14 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
3700 14 : rels = lappend(rels, childrel);
3701 14 : part_rels = lappend(part_rels, childrel);
3702 14 : relids = lappend_oid(relids, childrelid);
3703 : /* Log this relation only if needed for logical decoding */
3704 14 : if (RelationIsLogicallyLogged(childrel))
3705 0 : relids_logged = lappend_oid(relids_logged, childrelid);
3706 : }
3707 : }
3708 : }
3709 :
3710 : /*
3711 : * Even if we used CASCADE on the upstream primary we explicitly default
3712 : * to replaying changes without further cascading. This might be later
3713 : * changeable with a user specified option.
3714 : *
3715 : * MySubscription->runasowner tells us whether we want to execute
3716 : * replication actions as the subscription owner; the last argument to
3717 : * TruncateGuts tells it whether we want to switch to the table owner.
3718 : * Those are exactly opposite conditions.
3719 : */
3720 40 : ExecuteTruncateGuts(rels,
3721 : relids,
3722 : relids_logged,
3723 : DROP_RESTRICT,
3724 : restart_seqs,
3725 40 : !MySubscription->runasowner);
3726 98 : foreach(lc, remote_rels)
3727 : {
3728 58 : LogicalRepRelMapEntry *rel = lfirst(lc);
3729 :
3730 58 : logicalrep_rel_close(rel, NoLock);
3731 : }
3732 54 : foreach(lc, part_rels)
3733 : {
3734 14 : Relation rel = lfirst(lc);
3735 :
3736 14 : table_close(rel, NoLock);
3737 : }
3738 :
3739 40 : end_replication_step();
3740 : }
3741 :
3742 :
3743 : /*
3744 : * Logical replication protocol message dispatcher.
3745 : */
3746 : void
3747 674528 : apply_dispatch(StringInfo s)
3748 : {
3749 674528 : LogicalRepMsgType action = pq_getmsgbyte(s);
3750 : LogicalRepMsgType saved_command;
3751 :
3752 : /*
3753 : * Set the current command being applied. Since this function can be
3754 : * called recursively when applying spooled changes, save the current
3755 : * command.
3756 : */
3757 674528 : saved_command = apply_error_callback_arg.command;
3758 674528 : apply_error_callback_arg.command = action;
3759 :
3760 674528 : switch (action)
3761 : {
3762 962 : case LOGICAL_REP_MSG_BEGIN:
3763 962 : apply_handle_begin(s);
3764 962 : break;
3765 :
3766 872 : case LOGICAL_REP_MSG_COMMIT:
3767 872 : apply_handle_commit(s);
3768 870 : break;
3769 :
3770 371722 : case LOGICAL_REP_MSG_INSERT:
3771 371722 : apply_handle_insert(s);
3772 371642 : break;
3773 :
3774 132324 : case LOGICAL_REP_MSG_UPDATE:
3775 132324 : apply_handle_update(s);
3776 132308 : break;
3777 :
3778 163872 : case LOGICAL_REP_MSG_DELETE:
3779 163872 : apply_handle_delete(s);
3780 163872 : break;
3781 :
3782 40 : case LOGICAL_REP_MSG_TRUNCATE:
3783 40 : apply_handle_truncate(s);
3784 40 : break;
3785 :
3786 928 : case LOGICAL_REP_MSG_RELATION:
3787 928 : apply_handle_relation(s);
3788 928 : break;
3789 :
3790 36 : case LOGICAL_REP_MSG_TYPE:
3791 36 : apply_handle_type(s);
3792 36 : break;
3793 :
3794 14 : case LOGICAL_REP_MSG_ORIGIN:
3795 14 : apply_handle_origin(s);
3796 14 : break;
3797 :
3798 0 : case LOGICAL_REP_MSG_MESSAGE:
3799 :
3800 : /*
3801 : * Logical replication does not use generic logical messages yet.
3802 : * Although, it could be used by other applications that use this
3803 : * output plugin.
3804 : */
3805 0 : break;
3806 :
3807 1714 : case LOGICAL_REP_MSG_STREAM_START:
3808 1714 : apply_handle_stream_start(s);
3809 1714 : break;
3810 :
3811 1712 : case LOGICAL_REP_MSG_STREAM_STOP:
3812 1712 : apply_handle_stream_stop(s);
3813 1708 : break;
3814 :
3815 76 : case LOGICAL_REP_MSG_STREAM_ABORT:
3816 76 : apply_handle_stream_abort(s);
3817 76 : break;
3818 :
3819 122 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3820 122 : apply_handle_stream_commit(s);
3821 118 : break;
3822 :
3823 32 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3824 32 : apply_handle_begin_prepare(s);
3825 32 : break;
3826 :
3827 30 : case LOGICAL_REP_MSG_PREPARE:
3828 30 : apply_handle_prepare(s);
3829 28 : break;
3830 :
3831 40 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3832 40 : apply_handle_commit_prepared(s);
3833 40 : break;
3834 :
3835 10 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3836 10 : apply_handle_rollback_prepared(s);
3837 10 : break;
3838 :
3839 22 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3840 22 : apply_handle_stream_prepare(s);
3841 22 : break;
3842 :
3843 0 : default:
3844 0 : ereport(ERROR,
3845 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3846 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3847 : }
3848 :
3849 : /* Reset the current command */
3850 674420 : apply_error_callback_arg.command = saved_command;
3851 674420 : }
3852 :
3853 : /*
3854 : * Figure out which write/flush positions to report to the walsender process.
3855 : *
3856 : * We can't simply report back the last LSN the walsender sent us because the
3857 : * local transaction might not yet be flushed to disk locally. Instead we
3858 : * build a list that associates local with remote LSNs for every commit. When
3859 : * reporting back the flush position to the sender we iterate that list and
3860 : * check which entries on it are already locally flushed. Those we can report
3861 : * as having been flushed.
3862 : *
3863 : * The have_pending_txes is true if there are outstanding transactions that
3864 : * need to be flushed.
3865 : */
3866 : static void
3867 140392 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3868 : bool *have_pending_txes)
3869 : {
3870 : dlist_mutable_iter iter;
3871 140392 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3872 :
3873 140392 : *write = InvalidXLogRecPtr;
3874 140392 : *flush = InvalidXLogRecPtr;
3875 :
3876 141360 : dlist_foreach_modify(iter, &lsn_mapping)
3877 : {
3878 26572 : FlushPosition *pos =
3879 26572 : dlist_container(FlushPosition, node, iter.cur);
3880 :
3881 26572 : *write = pos->remote_end;
3882 :
3883 26572 : if (pos->local_end <= local_flush)
3884 : {
3885 968 : *flush = pos->remote_end;
3886 968 : dlist_delete(iter.cur);
3887 968 : pfree(pos);
3888 : }
3889 : else
3890 : {
3891 : /*
3892 : * Don't want to uselessly iterate over the rest of the list which
3893 : * could potentially be long. Instead get the last element and
3894 : * grab the write position from there.
3895 : */
3896 25604 : pos = dlist_tail_element(FlushPosition, node,
3897 : &lsn_mapping);
3898 25604 : *write = pos->remote_end;
3899 25604 : *have_pending_txes = true;
3900 25604 : return;
3901 : }
3902 : }
3903 :
3904 114788 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3905 : }
3906 :
3907 : /*
3908 : * Store current remote/local lsn pair in the tracking list.
3909 : */
3910 : void
3911 1084 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3912 : {
3913 : FlushPosition *flushpos;
3914 :
3915 : /*
3916 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3917 : * by the leader apply worker.
3918 : */
3919 1084 : if (am_parallel_apply_worker())
3920 38 : return;
3921 :
3922 : /* Need to do this in permanent context */
3923 1046 : MemoryContextSwitchTo(ApplyContext);
3924 :
3925 : /* Track commit lsn */
3926 1046 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3927 1046 : flushpos->local_end = local_lsn;
3928 1046 : flushpos->remote_end = remote_lsn;
3929 :
3930 1046 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3931 1046 : MemoryContextSwitchTo(ApplyMessageContext);
3932 : }
3933 :
3934 :
3935 : /* Update statistics of the worker. */
3936 : static void
3937 387442 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3938 : {
3939 387442 : MyLogicalRepWorker->last_lsn = last_lsn;
3940 387442 : MyLogicalRepWorker->last_send_time = send_time;
3941 387442 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3942 387442 : if (reply)
3943 : {
3944 3976 : MyLogicalRepWorker->reply_lsn = last_lsn;
3945 3976 : MyLogicalRepWorker->reply_time = send_time;
3946 : }
3947 387442 : }
3948 :
3949 : /*
3950 : * Apply main loop.
3951 : */
3952 : static void
3953 804 : LogicalRepApplyLoop(XLogRecPtr last_received)
3954 : {
3955 804 : TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3956 804 : bool ping_sent = false;
3957 : TimeLineID tli;
3958 : ErrorContextCallback errcallback;
3959 804 : RetainDeadTuplesData rdt_data = {0};
3960 :
3961 : /*
3962 : * Init the ApplyMessageContext which we clean up after each replication
3963 : * protocol message.
3964 : */
3965 804 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
3966 : "ApplyMessageContext",
3967 : ALLOCSET_DEFAULT_SIZES);
3968 :
3969 : /*
3970 : * This memory context is used for per-stream data when the streaming mode
3971 : * is enabled. This context is reset on each stream stop.
3972 : */
3973 804 : LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
3974 : "LogicalStreamingContext",
3975 : ALLOCSET_DEFAULT_SIZES);
3976 :
3977 : /* mark as idle, before starting to loop */
3978 804 : pgstat_report_activity(STATE_IDLE, NULL);
3979 :
3980 : /*
3981 : * Push apply error context callback. Fields will be filled while applying
3982 : * a change.
3983 : */
3984 804 : errcallback.callback = apply_error_callback;
3985 804 : errcallback.previous = error_context_stack;
3986 804 : error_context_stack = &errcallback;
3987 804 : apply_error_context_stack = error_context_stack;
3988 :
3989 : /* This outer loop iterates once per wait. */
3990 : for (;;)
3991 135406 : {
3992 136210 : pgsocket fd = PGINVALID_SOCKET;
3993 : int rc;
3994 : int len;
3995 136210 : char *buf = NULL;
3996 136210 : bool endofstream = false;
3997 : long wait_time;
3998 :
3999 136210 : CHECK_FOR_INTERRUPTS();
4000 :
4001 136210 : MemoryContextSwitchTo(ApplyMessageContext);
4002 :
4003 136210 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
4004 :
4005 136176 : if (len != 0)
4006 : {
4007 : /* Loop to process all available data (without blocking). */
4008 : for (;;)
4009 : {
4010 521522 : CHECK_FOR_INTERRUPTS();
4011 :
4012 521522 : if (len == 0)
4013 : {
4014 134060 : break;
4015 : }
4016 387462 : else if (len < 0)
4017 : {
4018 20 : ereport(LOG,
4019 : (errmsg("data stream from publisher has ended")));
4020 20 : endofstream = true;
4021 20 : break;
4022 : }
4023 : else
4024 : {
4025 : int c;
4026 : StringInfoData s;
4027 :
4028 387442 : if (ConfigReloadPending)
4029 : {
4030 0 : ConfigReloadPending = false;
4031 0 : ProcessConfigFile(PGC_SIGHUP);
4032 : }
4033 :
4034 : /* Reset timeout. */
4035 387442 : last_recv_timestamp = GetCurrentTimestamp();
4036 387442 : ping_sent = false;
4037 :
4038 387442 : rdt_data.last_recv_time = last_recv_timestamp;
4039 :
4040 : /* Ensure we are reading the data into our memory context. */
4041 387442 : MemoryContextSwitchTo(ApplyMessageContext);
4042 :
4043 387442 : initReadOnlyStringInfo(&s, buf, len);
4044 :
4045 387442 : c = pq_getmsgbyte(&s);
4046 :
4047 387442 : if (c == PqReplMsg_WALData)
4048 : {
4049 : XLogRecPtr start_lsn;
4050 : XLogRecPtr end_lsn;
4051 : TimestampTz send_time;
4052 :
4053 369798 : start_lsn = pq_getmsgint64(&s);
4054 369798 : end_lsn = pq_getmsgint64(&s);
4055 369798 : send_time = pq_getmsgint64(&s);
4056 :
4057 369798 : if (last_received < start_lsn)
4058 304940 : last_received = start_lsn;
4059 :
4060 369798 : if (last_received < end_lsn)
4061 0 : last_received = end_lsn;
4062 :
4063 369798 : UpdateWorkerStats(last_received, send_time, false);
4064 :
4065 369798 : apply_dispatch(&s);
4066 :
4067 369698 : maybe_advance_nonremovable_xid(&rdt_data, false);
4068 : }
4069 17644 : else if (c == PqReplMsg_Keepalive)
4070 : {
4071 : XLogRecPtr end_lsn;
4072 : TimestampTz timestamp;
4073 : bool reply_requested;
4074 :
4075 3976 : end_lsn = pq_getmsgint64(&s);
4076 3976 : timestamp = pq_getmsgint64(&s);
4077 3976 : reply_requested = pq_getmsgbyte(&s);
4078 :
4079 3976 : if (last_received < end_lsn)
4080 1926 : last_received = end_lsn;
4081 :
4082 3976 : send_feedback(last_received, reply_requested, false);
4083 :
4084 3976 : maybe_advance_nonremovable_xid(&rdt_data, false);
4085 :
4086 3976 : UpdateWorkerStats(last_received, timestamp, true);
4087 : }
4088 13668 : else if (c == PqReplMsg_PrimaryStatusUpdate)
4089 : {
4090 13668 : rdt_data.remote_lsn = pq_getmsgint64(&s);
4091 13668 : rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4092 13668 : rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4093 13668 : rdt_data.reply_time = pq_getmsgint64(&s);
4094 :
4095 : /*
4096 : * This should never happen, see
4097 : * ProcessStandbyPSRequestMessage. But if it happens
4098 : * due to a bug, we don't want to proceed as it can
4099 : * incorrectly advance oldest_nonremovable_xid.
4100 : */
4101 13668 : if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
4102 0 : elog(ERROR, "cannot get the latest WAL position from the publisher");
4103 :
4104 13668 : maybe_advance_nonremovable_xid(&rdt_data, true);
4105 :
4106 13668 : UpdateWorkerStats(last_received, rdt_data.reply_time, false);
4107 : }
4108 : /* other message types are purposefully ignored */
4109 :
4110 387342 : MemoryContextReset(ApplyMessageContext);
4111 : }
4112 :
4113 387342 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
4114 : }
4115 : }
4116 :
4117 : /* confirm all writes so far */
4118 136076 : send_feedback(last_received, false, false);
4119 :
4120 : /* Reset the timestamp if no message was received */
4121 136076 : rdt_data.last_recv_time = 0;
4122 :
4123 136076 : maybe_advance_nonremovable_xid(&rdt_data, false);
4124 :
4125 136074 : if (!in_remote_transaction && !in_streamed_transaction)
4126 : {
4127 : /*
4128 : * If we didn't get any transactions for a while there might be
4129 : * unconsumed invalidation messages in the queue, consume them
4130 : * now.
4131 : */
4132 10618 : AcceptInvalidationMessages();
4133 10618 : maybe_reread_subscription();
4134 :
4135 : /* Process any table synchronization changes. */
4136 10536 : process_syncing_tables(last_received);
4137 : }
4138 :
4139 : /* Cleanup the memory. */
4140 135616 : MemoryContextReset(ApplyMessageContext);
4141 135616 : MemoryContextSwitchTo(TopMemoryContext);
4142 :
4143 : /* Check if we need to exit the streaming loop. */
4144 135616 : if (endofstream)
4145 20 : break;
4146 :
4147 : /*
4148 : * Wait for more data or latch. If we have unflushed transactions,
4149 : * wake up after WalWriterDelay to see if they've been flushed yet (in
4150 : * which case we should send a feedback message). Otherwise, there's
4151 : * no particular urgency about waking up unless we get data or a
4152 : * signal.
4153 : */
4154 135596 : if (!dlist_is_empty(&lsn_mapping))
4155 23932 : wait_time = WalWriterDelay;
4156 : else
4157 111664 : wait_time = NAPTIME_PER_CYCLE;
4158 :
4159 : /*
4160 : * Ensure to wake up when it's possible to advance the non-removable
4161 : * transaction ID, or when the retention duration may have exceeded
4162 : * max_retention_duration.
4163 : */
4164 135596 : if (MySubscription->retentionactive)
4165 : {
4166 4954 : if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
4167 124 : rdt_data.xid_advance_interval)
4168 0 : wait_time = Min(wait_time, rdt_data.xid_advance_interval);
4169 4954 : else if (MySubscription->maxretention > 0)
4170 2 : wait_time = Min(wait_time, MySubscription->maxretention);
4171 : }
4172 :
4173 135596 : rc = WaitLatchOrSocket(MyLatch,
4174 : WL_SOCKET_READABLE | WL_LATCH_SET |
4175 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
4176 : fd, wait_time,
4177 : WAIT_EVENT_LOGICAL_APPLY_MAIN);
4178 :
4179 135596 : if (rc & WL_LATCH_SET)
4180 : {
4181 1350 : ResetLatch(MyLatch);
4182 1350 : CHECK_FOR_INTERRUPTS();
4183 : }
4184 :
4185 135406 : if (ConfigReloadPending)
4186 : {
4187 20 : ConfigReloadPending = false;
4188 20 : ProcessConfigFile(PGC_SIGHUP);
4189 : }
4190 :
4191 135406 : if (rc & WL_TIMEOUT)
4192 : {
4193 : /*
4194 : * We didn't receive anything new. If we haven't heard anything
4195 : * from the server for more than wal_receiver_timeout / 2, ping
4196 : * the server. Also, if it's been longer than
4197 : * wal_receiver_status_interval since the last update we sent,
4198 : * send a status update to the primary anyway, to report any
4199 : * progress in applying WAL.
4200 : */
4201 314 : bool requestReply = false;
4202 :
4203 : /*
4204 : * Check if time since last receive from primary has reached the
4205 : * configured limit.
4206 : */
4207 314 : if (wal_receiver_timeout > 0)
4208 : {
4209 314 : TimestampTz now = GetCurrentTimestamp();
4210 : TimestampTz timeout;
4211 :
4212 314 : timeout =
4213 314 : TimestampTzPlusMilliseconds(last_recv_timestamp,
4214 : wal_receiver_timeout);
4215 :
4216 314 : if (now >= timeout)
4217 0 : ereport(ERROR,
4218 : (errcode(ERRCODE_CONNECTION_FAILURE),
4219 : errmsg("terminating logical replication worker due to timeout")));
4220 :
4221 : /* Check to see if it's time for a ping. */
4222 314 : if (!ping_sent)
4223 : {
4224 314 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
4225 : (wal_receiver_timeout / 2));
4226 314 : if (now >= timeout)
4227 : {
4228 0 : requestReply = true;
4229 0 : ping_sent = true;
4230 : }
4231 : }
4232 : }
4233 :
4234 314 : send_feedback(last_received, requestReply, requestReply);
4235 :
4236 314 : maybe_advance_nonremovable_xid(&rdt_data, false);
4237 :
4238 : /*
4239 : * Force reporting to ensure long idle periods don't lead to
4240 : * arbitrarily delayed stats. Stats can only be reported outside
4241 : * of (implicit or explicit) transactions. That shouldn't lead to
4242 : * stats being delayed for long, because transactions are either
4243 : * sent as a whole on commit or streamed. Streamed transactions
4244 : * are spilled to disk and applied on commit.
4245 : */
4246 314 : if (!IsTransactionState())
4247 314 : pgstat_report_stat(true);
4248 : }
4249 : }
4250 :
4251 : /* Pop the error context stack */
4252 20 : error_context_stack = errcallback.previous;
4253 20 : apply_error_context_stack = error_context_stack;
4254 :
4255 : /* All done */
4256 20 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
4257 0 : }
4258 :
4259 : /*
4260 : * Send a Standby Status Update message to server.
4261 : *
4262 : * 'recvpos' is the latest LSN we've received data to, force is set if we need
4263 : * to send a response to avoid timeouts.
4264 : */
4265 : static void
4266 140366 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
4267 : {
4268 : static StringInfo reply_message = NULL;
4269 : static TimestampTz send_time = 0;
4270 :
4271 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
4272 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
4273 :
4274 : XLogRecPtr writepos;
4275 : XLogRecPtr flushpos;
4276 : TimestampTz now;
4277 : bool have_pending_txes;
4278 :
4279 : /*
4280 : * If the user doesn't want status to be reported to the publisher, be
4281 : * sure to exit before doing anything at all.
4282 : */
4283 140366 : if (!force && wal_receiver_status_interval <= 0)
4284 42774 : return;
4285 :
4286 : /* It's legal to not pass a recvpos */
4287 140366 : if (recvpos < last_recvpos)
4288 0 : recvpos = last_recvpos;
4289 :
4290 140366 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4291 :
4292 : /*
4293 : * No outstanding transactions to flush, we can report the latest received
4294 : * position. This is important for synchronous replication.
4295 : */
4296 140366 : if (!have_pending_txes)
4297 114774 : flushpos = writepos = recvpos;
4298 :
4299 140366 : if (writepos < last_writepos)
4300 0 : writepos = last_writepos;
4301 :
4302 140366 : if (flushpos < last_flushpos)
4303 25484 : flushpos = last_flushpos;
4304 :
4305 140366 : now = GetCurrentTimestamp();
4306 :
4307 : /* if we've already reported everything we're good */
4308 140366 : if (!force &&
4309 139672 : writepos == last_writepos &&
4310 43302 : flushpos == last_flushpos &&
4311 43032 : !TimestampDifferenceExceeds(send_time, now,
4312 : wal_receiver_status_interval * 1000))
4313 42774 : return;
4314 97592 : send_time = now;
4315 :
4316 97592 : if (!reply_message)
4317 : {
4318 804 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4319 :
4320 804 : reply_message = makeStringInfo();
4321 804 : MemoryContextSwitchTo(oldctx);
4322 : }
4323 : else
4324 96788 : resetStringInfo(reply_message);
4325 :
4326 97592 : pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
4327 97592 : pq_sendint64(reply_message, recvpos); /* write */
4328 97592 : pq_sendint64(reply_message, flushpos); /* flush */
4329 97592 : pq_sendint64(reply_message, writepos); /* apply */
4330 97592 : pq_sendint64(reply_message, now); /* sendTime */
4331 97592 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
4332 :
4333 97592 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4334 : force,
4335 : LSN_FORMAT_ARGS(recvpos),
4336 : LSN_FORMAT_ARGS(writepos),
4337 : LSN_FORMAT_ARGS(flushpos));
4338 :
4339 97592 : walrcv_send(LogRepWorkerWalRcvConn,
4340 : reply_message->data, reply_message->len);
4341 :
4342 97592 : if (recvpos > last_recvpos)
4343 96376 : last_recvpos = recvpos;
4344 97592 : if (writepos > last_writepos)
4345 96374 : last_writepos = writepos;
4346 97592 : if (flushpos > last_flushpos)
4347 95898 : last_flushpos = flushpos;
4348 : }
4349 :
4350 : /*
4351 : * Attempt to advance the non-removable transaction ID.
4352 : *
4353 : * See comments atop worker.c for details.
4354 : */
4355 : static void
4356 523732 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
4357 : bool status_received)
4358 : {
4359 523732 : if (!can_advance_nonremovable_xid(rdt_data))
4360 504728 : return;
4361 :
4362 19004 : process_rdt_phase_transition(rdt_data, status_received);
4363 : }
4364 :
4365 : /*
4366 : * Preliminary check to determine if advancing the non-removable transaction ID
4367 : * is allowed.
4368 : */
4369 : static bool
4370 523732 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
4371 : {
4372 : /*
4373 : * It is sufficient to manage non-removable transaction ID for a
4374 : * subscription by the main apply worker to detect update_deleted reliably
4375 : * even for table sync or parallel apply workers.
4376 : */
4377 523732 : if (!am_leader_apply_worker())
4378 600 : return false;
4379 :
4380 : /* No need to advance if retaining dead tuples is not required */
4381 523132 : if (!MySubscription->retaindeadtuples)
4382 504128 : return false;
4383 :
4384 19004 : return true;
4385 : }
4386 :
4387 : /*
4388 : * Process phase transitions during the non-removable transaction ID
4389 : * advancement. See comments atop worker.c for details of the transition.
4390 : */
4391 : static void
4392 32842 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
4393 : bool status_received)
4394 : {
4395 32842 : switch (rdt_data->phase)
4396 : {
4397 344 : case RDT_GET_CANDIDATE_XID:
4398 344 : get_candidate_xid(rdt_data);
4399 344 : break;
4400 13680 : case RDT_REQUEST_PUBLISHER_STATUS:
4401 13680 : request_publisher_status(rdt_data);
4402 13680 : break;
4403 18540 : case RDT_WAIT_FOR_PUBLISHER_STATUS:
4404 18540 : wait_for_publisher_status(rdt_data, status_received);
4405 18540 : break;
4406 272 : case RDT_WAIT_FOR_LOCAL_FLUSH:
4407 272 : wait_for_local_flush(rdt_data);
4408 272 : break;
4409 4 : case RDT_STOP_CONFLICT_INFO_RETENTION:
4410 4 : stop_conflict_info_retention(rdt_data);
4411 4 : break;
4412 2 : case RDT_RESUME_CONFLICT_INFO_RETENTION:
4413 2 : resume_conflict_info_retention(rdt_data);
4414 0 : break;
4415 : }
4416 32840 : }
4417 :
4418 : /*
4419 : * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4420 : */
4421 : static void
4422 344 : get_candidate_xid(RetainDeadTuplesData *rdt_data)
4423 : {
4424 : TransactionId oldest_running_xid;
4425 : TimestampTz now;
4426 :
4427 : /*
4428 : * Use last_recv_time when applying changes in the loop to avoid
4429 : * unnecessary system time retrieval. If last_recv_time is not available,
4430 : * obtain the current timestamp.
4431 : */
4432 344 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4433 :
4434 : /*
4435 : * Compute the candidate_xid and request the publisher status at most once
4436 : * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4437 : * details on how this value is dynamically adjusted. This is to avoid
4438 : * using CPU and network resources without making much progress.
4439 : */
4440 344 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4441 : rdt_data->xid_advance_interval))
4442 0 : return;
4443 :
4444 : /*
4445 : * Immediately update the timer, even if the function returns later
4446 : * without setting candidate_xid due to inactivity on the subscriber. This
4447 : * avoids frequent calls to GetOldestActiveTransactionId.
4448 : */
4449 344 : rdt_data->candidate_xid_time = now;
4450 :
4451 : /*
4452 : * Consider transactions in the current database, as only dead tuples from
4453 : * this database are required for conflict detection.
4454 : */
4455 344 : oldest_running_xid = GetOldestActiveTransactionId(false, false);
4456 :
4457 : /*
4458 : * Oldest active transaction ID (oldest_running_xid) can't be behind any
4459 : * of its previously computed value.
4460 : */
4461 : Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4462 : oldest_running_xid));
4463 :
4464 : /* Return if the oldest_nonremovable_xid cannot be advanced */
4465 344 : if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4466 : oldest_running_xid))
4467 : {
4468 250 : adjust_xid_advance_interval(rdt_data, false);
4469 250 : return;
4470 : }
4471 :
4472 94 : adjust_xid_advance_interval(rdt_data, true);
4473 :
4474 94 : rdt_data->candidate_xid = oldest_running_xid;
4475 94 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4476 :
4477 : /* process the next phase */
4478 94 : process_rdt_phase_transition(rdt_data, false);
4479 : }
4480 :
4481 : /*
4482 : * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4483 : */
4484 : static void
4485 13680 : request_publisher_status(RetainDeadTuplesData *rdt_data)
4486 : {
4487 : static StringInfo request_message = NULL;
4488 :
4489 13680 : if (!request_message)
4490 : {
4491 24 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4492 :
4493 24 : request_message = makeStringInfo();
4494 24 : MemoryContextSwitchTo(oldctx);
4495 : }
4496 : else
4497 13656 : resetStringInfo(request_message);
4498 :
4499 : /*
4500 : * Send the current time to update the remote walsender's latest reply
4501 : * message received time.
4502 : */
4503 13680 : pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
4504 13680 : pq_sendint64(request_message, GetCurrentTimestamp());
4505 :
4506 13680 : elog(DEBUG2, "sending publisher status request message");
4507 :
4508 : /* Send a request for the publisher status */
4509 13680 : walrcv_send(LogRepWorkerWalRcvConn,
4510 : request_message->data, request_message->len);
4511 :
4512 13680 : rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
4513 :
4514 : /*
4515 : * Skip calling maybe_advance_nonremovable_xid() since further transition
4516 : * is possible only once we receive the publisher status message.
4517 : */
4518 13680 : }
4519 :
4520 : /*
4521 : * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4522 : */
4523 : static void
4524 18540 : wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
4525 : bool status_received)
4526 : {
4527 : /*
4528 : * Return if we have requested but not yet received the publisher status.
4529 : */
4530 18540 : if (!status_received)
4531 4872 : return;
4532 :
4533 : /*
4534 : * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4535 : * retaining conflict information for this worker.
4536 : */
4537 13668 : if (should_stop_conflict_info_retention(rdt_data))
4538 : {
4539 0 : rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
4540 0 : return;
4541 : }
4542 :
4543 13668 : if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4544 82 : rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4545 :
4546 : /*
4547 : * Check if all remote concurrent transactions that were active at the
4548 : * first status request have now completed. If completed, proceed to the
4549 : * next phase; otherwise, continue checking the publisher status until
4550 : * these transactions finish.
4551 : *
4552 : * It's possible that transactions in the commit phase during the last
4553 : * cycle have now finished committing, but remote_oldestxid remains older
4554 : * than remote_wait_for. This can happen if some old transaction came in
4555 : * the commit phase when we requested status in this cycle. We do not
4556 : * handle this case explicitly as it's rare and the benefit doesn't
4557 : * justify the required complexity. Tracking would require either caching
4558 : * all xids at the publisher or sending them to subscribers. The condition
4559 : * will resolve naturally once the remaining transactions are finished.
4560 : *
4561 : * Directly advancing the non-removable transaction ID is possible if
4562 : * there are no activities on the publisher since the last advancement
4563 : * cycle. However, it requires maintaining two fields, last_remote_nextxid
4564 : * and last_remote_lsn, within the structure for comparison with the
4565 : * current cycle's values. Considering the minimal cost of continuing in
4566 : * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4567 : * advance the transaction ID here.
4568 : */
4569 13668 : if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4570 : rdt_data->remote_oldestxid))
4571 82 : rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
4572 : else
4573 13586 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4574 :
4575 : /* process the next phase */
4576 13668 : process_rdt_phase_transition(rdt_data, false);
4577 : }
4578 :
4579 : /*
4580 : * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4581 : */
4582 : static void
4583 272 : wait_for_local_flush(RetainDeadTuplesData *rdt_data)
4584 : {
4585 : Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
4586 : TransactionIdIsValid(rdt_data->candidate_xid));
4587 :
4588 : /*
4589 : * We expect the publisher and subscriber clocks to be in sync using time
4590 : * sync service like NTP. Otherwise, we will advance this worker's
4591 : * oldest_nonremovable_xid prematurely, leading to the removal of rows
4592 : * required to detect update_deleted reliably. This check primarily
4593 : * addresses scenarios where the publisher's clock falls behind; if the
4594 : * publisher's clock is ahead, subsequent transactions will naturally bear
4595 : * later commit timestamps, conforming to the design outlined atop
4596 : * worker.c.
4597 : *
4598 : * XXX Consider waiting for the publisher's clock to catch up with the
4599 : * subscriber's before proceeding to the next phase.
4600 : */
4601 272 : if (TimestampDifferenceExceeds(rdt_data->reply_time,
4602 : rdt_data->candidate_xid_time, 0))
4603 0 : ereport(ERROR,
4604 : errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4605 : errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4606 :
4607 : /*
4608 : * Do not attempt to advance the non-removable transaction ID when table
4609 : * sync is in progress. During this time, changes from a single
4610 : * transaction may be applied by multiple table sync workers corresponding
4611 : * to the target tables. So, it's necessary for all table sync workers to
4612 : * apply and flush the corresponding changes before advancing the
4613 : * transaction ID, otherwise, dead tuples that are still needed for
4614 : * conflict detection in table sync workers could be removed prematurely.
4615 : * However, confirming the apply and flush progress across all table sync
4616 : * workers is complex and not worth the effort, so we simply return if not
4617 : * all tables are in the READY state.
4618 : *
4619 : * Advancing the transaction ID is necessary even when no tables are
4620 : * currently subscribed, to avoid retaining dead tuples unnecessarily.
4621 : * While it might seem safe to skip all phases and directly assign
4622 : * candidate_xid to oldest_nonremovable_xid during the
4623 : * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4624 : * concurrently add tables to the subscription, the apply worker may not
4625 : * process invalidations in time. Consequently,
4626 : * HasSubscriptionRelationsCached() might miss the new tables, leading to
4627 : * premature advancement of oldest_nonremovable_xid.
4628 : *
4629 : * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4630 : * invalidations are guaranteed to be processed before applying changes
4631 : * from newly added tables while waiting for the local flush to reach
4632 : * remote_lsn.
4633 : *
4634 : * Additionally, even if we check for subscription tables during
4635 : * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4636 : * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4637 : * subscription tables at this stage to prevent unnecessary tuple
4638 : * retention.
4639 : */
4640 272 : if (HasSubscriptionRelationsCached() && !AllTablesyncsReady())
4641 : {
4642 : TimestampTz now;
4643 :
4644 76 : now = rdt_data->last_recv_time
4645 38 : ? rdt_data->last_recv_time : GetCurrentTimestamp();
4646 :
4647 : /*
4648 : * Record the time spent waiting for table sync, it is needed for the
4649 : * timeout check in should_stop_conflict_info_retention().
4650 : */
4651 38 : rdt_data->table_sync_wait_time =
4652 38 : TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);
4653 :
4654 38 : return;
4655 : }
4656 :
4657 : /*
4658 : * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4659 : * retaining conflict information for this worker.
4660 : */
4661 234 : if (should_stop_conflict_info_retention(rdt_data))
4662 : {
4663 4 : rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
4664 4 : return;
4665 : }
4666 :
4667 : /*
4668 : * Update and check the remote flush position if we are applying changes
4669 : * in a loop. This is done at most once per WalWriterDelay to avoid
4670 : * performing costly operations in get_flush_position() too frequently
4671 : * during change application.
4672 : */
4673 308 : if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4674 78 : TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
4675 : rdt_data->last_recv_time, WalWriterDelay))
4676 : {
4677 : XLogRecPtr writepos;
4678 : XLogRecPtr flushpos;
4679 : bool have_pending_txes;
4680 :
4681 : /* Fetch the latest remote flush position */
4682 26 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4683 :
4684 26 : if (flushpos > last_flushpos)
4685 0 : last_flushpos = flushpos;
4686 :
4687 26 : rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4688 : }
4689 :
4690 : /* Return to wait for the changes to be applied */
4691 230 : if (last_flushpos < rdt_data->remote_lsn)
4692 152 : return;
4693 :
4694 : /*
4695 : * Reaching this point implies should_stop_conflict_info_retention()
4696 : * returned false earlier, meaning that the most recent duration for
4697 : * advancing the non-removable transaction ID is within the
4698 : * max_retention_duration or max_retention_duration is set to 0.
4699 : *
4700 : * Therefore, if conflict info retention was previously stopped due to a
4701 : * timeout, it is now safe to resume retention.
4702 : */
4703 78 : if (!MySubscription->retentionactive)
4704 : {
4705 2 : rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
4706 2 : return;
4707 : }
4708 :
4709 : /*
4710 : * Reaching here means the remote WAL position has been received, and all
4711 : * transactions up to that position on the publisher have been applied and
4712 : * flushed locally. So, we can advance the non-removable transaction ID.
4713 : */
4714 76 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4715 76 : MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
4716 76 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
4717 :
4718 76 : elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4719 : LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4720 : rdt_data->candidate_xid);
4721 :
4722 : /* Notify launcher to update the xmin of the conflict slot */
4723 76 : ApplyLauncherWakeup();
4724 :
4725 76 : reset_retention_data_fields(rdt_data);
4726 :
4727 : /* process the next phase */
4728 76 : process_rdt_phase_transition(rdt_data, false);
4729 : }
4730 :
4731 : /*
4732 : * Check whether conflict information retention should be stopped due to
4733 : * exceeding the maximum wait time (max_retention_duration).
4734 : *
4735 : * If retention should be stopped, return true. Otherwise, return false.
4736 : */
4737 : static bool
4738 13902 : should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4739 : {
4740 : TimestampTz now;
4741 :
4742 : Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4743 : Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
4744 : rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
4745 :
4746 13902 : if (!MySubscription->maxretention)
4747 13892 : return false;
4748 :
4749 : /*
4750 : * Use last_recv_time when applying changes in the loop to avoid
4751 : * unnecessary system time retrieval. If last_recv_time is not available,
4752 : * obtain the current timestamp.
4753 : */
4754 10 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4755 :
4756 : /*
4757 : * Return early if the wait time has not exceeded the configured maximum
4758 : * (max_retention_duration). Time spent waiting for table synchronization
4759 : * is excluded from this calculation, as it occurs infrequently.
4760 : */
4761 10 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4762 10 : MySubscription->maxretention +
4763 10 : rdt_data->table_sync_wait_time))
4764 6 : return false;
4765 :
4766 4 : return true;
4767 : }
4768 :
4769 : /*
4770 : * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
4771 : */
4772 : static void
4773 4 : stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4774 : {
4775 : /* Stop retention if not yet */
4776 4 : if (MySubscription->retentionactive)
4777 : {
4778 : /*
4779 : * If the retention status cannot be updated (e.g., due to active
4780 : * transaction), skip further processing to avoid inconsistent
4781 : * retention behavior.
4782 : */
4783 2 : if (!update_retention_status(false))
4784 0 : return;
4785 :
4786 2 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4787 2 : MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
4788 2 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
4789 :
4790 2 : ereport(LOG,
4791 : errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4792 : MySubscription->name),
4793 : errdetail("Retention is stopped as the apply process is not advancing its xmin within the configured max_retention_duration of %u ms.",
4794 : MySubscription->maxretention));
4795 : }
4796 :
4797 : Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));
4798 :
4799 : /*
4800 : * If retention has been stopped, reset to the initial phase to retry
4801 : * resuming retention. This reset is required to recalculate the current
4802 : * wait time and resume retention if the time falls within
4803 : * max_retention_duration.
4804 : */
4805 4 : reset_retention_data_fields(rdt_data);
4806 : }
4807 :
4808 : /*
4809 : * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
4810 : */
4811 : static void
4812 2 : resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4813 : {
4814 : /* We can't resume retention without updating retention status. */
4815 2 : if (!update_retention_status(true))
4816 0 : return;
4817 :
4818 2 : ereport(LOG,
4819 : errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4820 : MySubscription->name),
4821 : MySubscription->maxretention
4822 : ? errdetail("Retention is re-enabled as the apply process is advancing its xmin within the configured max_retention_duration of %u ms.",
4823 : MySubscription->maxretention)
4824 : : errdetail("Retention is re-enabled as max_retention_duration is set to unlimited."));
4825 :
4826 : /*
4827 : * Restart the worker to let the launcher initialize
4828 : * oldest_nonremovable_xid at startup.
4829 : *
4830 : * While it's technically possible to derive this value on-the-fly using
4831 : * the conflict detection slot's xmin, doing so risks a race condition:
4832 : * the launcher might clean slot.xmin just after retention resumes. This
4833 : * would make oldest_nonremovable_xid unreliable, especially during xid
4834 : * wraparound.
4835 : *
4836 : * Although this can be prevented by introducing heavy weight locking, the
4837 : * complexity it will bring doesn't seem worthwhile given how rarely
4838 : * retention is resumed.
4839 : */
4840 2 : apply_worker_exit();
4841 : }
4842 :
4843 : /*
4844 : * Updates pg_subscription.subretentionactive to the given value within a
4845 : * new transaction.
4846 : *
4847 : * If already inside an active transaction, skips the update and returns
4848 : * false.
4849 : *
4850 : * Returns true if the update is successfully performed.
4851 : */
4852 : static bool
4853 4 : update_retention_status(bool active)
4854 : {
4855 : /*
4856 : * Do not update the catalog during an active transaction. The transaction
4857 : * may be started during change application, leading to a possible
4858 : * rollback of catalog updates if the application fails subsequently.
4859 : */
4860 4 : if (IsTransactionState())
4861 0 : return false;
4862 :
4863 4 : StartTransactionCommand();
4864 :
4865 : /*
4866 : * Updating pg_subscription might involve TOAST table access, so ensure we
4867 : * have a valid snapshot.
4868 : */
4869 4 : PushActiveSnapshot(GetTransactionSnapshot());
4870 :
4871 : /* Update pg_subscription.subretentionactive */
4872 4 : UpdateDeadTupleRetentionStatus(MySubscription->oid, active);
4873 :
4874 4 : PopActiveSnapshot();
4875 4 : CommitTransactionCommand();
4876 :
4877 : /* Notify launcher to update the conflict slot */
4878 4 : ApplyLauncherWakeup();
4879 :
4880 4 : MySubscription->retentionactive = active;
4881 :
4882 4 : return true;
4883 : }
4884 :
4885 : /*
4886 : * Reset all data fields of RetainDeadTuplesData except those used to
4887 : * determine the timing for the next round of transaction ID advancement. We
4888 : * can even use flushpos_update_time in the next round to decide whether to get
4889 : * the latest flush position.
4890 : */
4891 : static void
4892 80 : reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
4893 : {
4894 80 : rdt_data->phase = RDT_GET_CANDIDATE_XID;
4895 80 : rdt_data->remote_lsn = InvalidXLogRecPtr;
4896 80 : rdt_data->remote_oldestxid = InvalidFullTransactionId;
4897 80 : rdt_data->remote_nextxid = InvalidFullTransactionId;
4898 80 : rdt_data->reply_time = 0;
4899 80 : rdt_data->remote_wait_for = InvalidFullTransactionId;
4900 80 : rdt_data->candidate_xid = InvalidTransactionId;
4901 80 : rdt_data->table_sync_wait_time = 0;
4902 80 : }
4903 :
4904 : /*
4905 : * Adjust the interval for advancing non-removable transaction IDs.
4906 : *
4907 : * If there is no activity on the node or retention has been stopped, we
4908 : * progressively double the interval used to advance non-removable transaction
4909 : * ID. This helps conserve CPU and network resources when there's little benefit
4910 : * to frequent updates.
4911 : *
4912 : * The interval is capped by the lowest of the following:
4913 : * - wal_receiver_status_interval (if set and retention is active),
4914 : * - a default maximum of 3 minutes,
4915 : * - max_retention_duration (if retention is active).
4916 : *
4917 : * This ensures the interval never exceeds the retention boundary, even if other
4918 : * limits are higher. Once activity resumes on the node and the retention is
4919 : * active, the interval is reset to lesser of 100ms and max_retention_duration,
4920 : * allowing timely advancement of non-removable transaction ID.
4921 : *
4922 : * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4923 : * consider the other interval or a separate GUC if the need arises.
4924 : */
4925 : static void
4926 344 : adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
4927 : {
4928 344 : if (rdt_data->xid_advance_interval && !new_xid_found)
4929 0 : {
4930 0 : int max_interval = wal_receiver_status_interval
4931 0 : ? wal_receiver_status_interval * 1000
4932 0 : : MAX_XID_ADVANCE_INTERVAL;
4933 :
4934 : /*
4935 : * No new transaction ID has been assigned since the last check, so
4936 : * double the interval, but not beyond the maximum allowable value.
4937 : */
4938 0 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4939 : max_interval);
4940 : }
4941 344 : else if (rdt_data->xid_advance_interval &&
4942 2 : !MySubscription->retentionactive)
4943 : {
4944 : /*
4945 : * Retention has been stopped, so double the interval-capped at a
4946 : * maximum of 3 minutes. The wal_receiver_status_interval is
4947 : * intentionally not used as a upper bound, since the likelihood of
4948 : * retention resuming is lower than that of general activity resuming.
4949 : */
4950 2 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4951 : MAX_XID_ADVANCE_INTERVAL);
4952 : }
4953 : else
4954 : {
4955 : /*
4956 : * A new transaction ID was found or the interval is not yet
4957 : * initialized, so set the interval to the minimum value.
4958 : */
4959 342 : rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4960 : }
4961 :
4962 : /*
4963 : * Ensure the wait time remains within the maximum retention time limit
4964 : * when retention is active.
4965 : */
4966 344 : if (MySubscription->retentionactive)
4967 340 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
4968 : MySubscription->maxretention);
4969 344 : }
4970 :
4971 : /*
4972 : * Exit routine for apply workers due to subscription parameter changes.
4973 : */
4974 : static void
4975 90 : apply_worker_exit(void)
4976 : {
4977 90 : if (am_parallel_apply_worker())
4978 : {
4979 : /*
4980 : * Don't stop the parallel apply worker as the leader will detect the
4981 : * subscription parameter change and restart logical replication later
4982 : * anyway. This also prevents the leader from reporting errors when
4983 : * trying to communicate with a stopped parallel apply worker, which
4984 : * would accidentally disable subscriptions if disable_on_error was
4985 : * set.
4986 : */
4987 0 : return;
4988 : }
4989 :
4990 : /*
4991 : * Reset the last-start time for this apply worker so that the launcher
4992 : * will restart it without waiting for wal_retrieve_retry_interval if the
4993 : * subscription is still active, and so that we won't leak that hash table
4994 : * entry if it isn't.
4995 : */
4996 90 : if (am_leader_apply_worker())
4997 90 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4998 :
4999 90 : proc_exit(0);
5000 : }
5001 :
5002 : /*
5003 : * Reread subscription info if needed.
5004 : *
5005 : * For significant changes, we react by exiting the current process; a new
5006 : * one will be launched afterwards if needed.
5007 : */
5008 : void
5009 12638 : maybe_reread_subscription(void)
5010 : {
5011 : MemoryContext oldctx;
5012 : Subscription *newsub;
5013 12638 : bool started_tx = false;
5014 :
5015 : /* When cache state is valid there is nothing to do here. */
5016 12638 : if (MySubscriptionValid)
5017 12466 : return;
5018 :
5019 : /* This function might be called inside or outside of transaction. */
5020 172 : if (!IsTransactionState())
5021 : {
5022 164 : StartTransactionCommand();
5023 164 : started_tx = true;
5024 : }
5025 :
5026 : /* Ensure allocations in permanent context. */
5027 172 : oldctx = MemoryContextSwitchTo(ApplyContext);
5028 :
5029 172 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
5030 :
5031 : /*
5032 : * Exit if the subscription was removed. This normally should not happen
5033 : * as the worker gets killed during DROP SUBSCRIPTION.
5034 : */
5035 172 : if (!newsub)
5036 : {
5037 0 : ereport(LOG,
5038 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5039 : MySubscription->name)));
5040 :
5041 : /* Ensure we remove no-longer-useful entry for worker's start time */
5042 0 : if (am_leader_apply_worker())
5043 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5044 :
5045 0 : proc_exit(0);
5046 : }
5047 :
5048 : /* Exit if the subscription was disabled. */
5049 172 : if (!newsub->enabled)
5050 : {
5051 28 : ereport(LOG,
5052 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5053 : MySubscription->name)));
5054 :
5055 28 : apply_worker_exit();
5056 : }
5057 :
5058 : /* !slotname should never happen when enabled is true. */
5059 : Assert(newsub->slotname);
5060 :
5061 : /* two-phase cannot be altered while the worker is running */
5062 : Assert(newsub->twophasestate == MySubscription->twophasestate);
5063 :
5064 : /*
5065 : * Exit if any parameter that affects the remote connection was changed.
5066 : * The launcher will start a new worker but note that the parallel apply
5067 : * worker won't restart if the streaming option's value is changed from
5068 : * 'parallel' to any other value or the server decides not to stream the
5069 : * in-progress transaction.
5070 : */
5071 144 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5072 140 : strcmp(newsub->name, MySubscription->name) != 0 ||
5073 138 : strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5074 138 : newsub->binary != MySubscription->binary ||
5075 126 : newsub->stream != MySubscription->stream ||
5076 116 : newsub->passwordrequired != MySubscription->passwordrequired ||
5077 116 : strcmp(newsub->origin, MySubscription->origin) != 0 ||
5078 112 : newsub->owner != MySubscription->owner ||
5079 110 : !equal(newsub->publications, MySubscription->publications))
5080 : {
5081 52 : if (am_parallel_apply_worker())
5082 0 : ereport(LOG,
5083 : (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5084 : MySubscription->name)));
5085 : else
5086 52 : ereport(LOG,
5087 : (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5088 : MySubscription->name)));
5089 :
5090 52 : apply_worker_exit();
5091 : }
5092 :
5093 : /*
5094 : * Exit if the subscription owner's superuser privileges have been
5095 : * revoked.
5096 : */
5097 92 : if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5098 : {
5099 8 : if (am_parallel_apply_worker())
5100 0 : ereport(LOG,
5101 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5102 : MySubscription->name));
5103 : else
5104 8 : ereport(LOG,
5105 : errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5106 : MySubscription->name));
5107 :
5108 8 : apply_worker_exit();
5109 : }
5110 :
5111 : /* Check for other changes that should never happen too. */
5112 84 : if (newsub->dbid != MySubscription->dbid)
5113 : {
5114 0 : elog(ERROR, "subscription %u changed unexpectedly",
5115 : MyLogicalRepWorker->subid);
5116 : }
5117 :
5118 : /* Clean old subscription info and switch to new one. */
5119 84 : FreeSubscription(MySubscription);
5120 84 : MySubscription = newsub;
5121 :
5122 84 : MemoryContextSwitchTo(oldctx);
5123 :
5124 : /* Change synchronous commit according to the user's wishes */
5125 84 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
5126 : PGC_BACKEND, PGC_S_OVERRIDE);
5127 :
5128 84 : if (started_tx)
5129 82 : CommitTransactionCommand();
5130 :
5131 84 : MySubscriptionValid = true;
5132 : }
5133 :
5134 : /*
5135 : * Callback from subscription syscache invalidation.
5136 : */
5137 : static void
5138 184 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
5139 : {
5140 184 : MySubscriptionValid = false;
5141 184 : }
5142 :
5143 : /*
5144 : * subxact_info_write
5145 : * Store information about subxacts for a toplevel transaction.
5146 : *
5147 : * For each subxact we store offset of its first change in the main file.
5148 : * The file is always over-written as a whole.
5149 : *
5150 : * XXX We should only store subxacts that were not aborted yet.
5151 : */
5152 : static void
5153 744 : subxact_info_write(Oid subid, TransactionId xid)
5154 : {
5155 : char path[MAXPGPATH];
5156 : Size len;
5157 : BufFile *fd;
5158 :
5159 : Assert(TransactionIdIsValid(xid));
5160 :
5161 : /* construct the subxact filename */
5162 744 : subxact_filename(path, subid, xid);
5163 :
5164 : /* Delete the subxacts file, if exists. */
5165 744 : if (subxact_data.nsubxacts == 0)
5166 : {
5167 580 : cleanup_subxact_info();
5168 580 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5169 :
5170 580 : return;
5171 : }
5172 :
5173 : /*
5174 : * Create the subxact file if it not already created, otherwise open the
5175 : * existing file.
5176 : */
5177 164 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
5178 : true);
5179 164 : if (fd == NULL)
5180 16 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
5181 :
5182 164 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5183 :
5184 : /* Write the subxact count and subxact info */
5185 164 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5186 164 : BufFileWrite(fd, subxact_data.subxacts, len);
5187 :
5188 164 : BufFileClose(fd);
5189 :
5190 : /* free the memory allocated for subxact info */
5191 164 : cleanup_subxact_info();
5192 : }
5193 :
5194 : /*
5195 : * subxact_info_read
5196 : * Restore information about subxacts of a streamed transaction.
5197 : *
5198 : * Read information about subxacts into the structure subxact_data that can be
5199 : * used later.
5200 : */
5201 : static void
5202 688 : subxact_info_read(Oid subid, TransactionId xid)
5203 : {
5204 : char path[MAXPGPATH];
5205 : Size len;
5206 : BufFile *fd;
5207 : MemoryContext oldctx;
5208 :
5209 : Assert(!subxact_data.subxacts);
5210 : Assert(subxact_data.nsubxacts == 0);
5211 : Assert(subxact_data.nsubxacts_max == 0);
5212 :
5213 : /*
5214 : * If the subxact file doesn't exist that means we don't have any subxact
5215 : * info.
5216 : */
5217 688 : subxact_filename(path, subid, xid);
5218 688 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
5219 : true);
5220 688 : if (fd == NULL)
5221 530 : return;
5222 :
5223 : /* read number of subxact items */
5224 158 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5225 :
5226 158 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5227 :
5228 : /* we keep the maximum as a power of 2 */
5229 158 : subxact_data.nsubxacts_max = 1 << pg_ceil_log2_32(subxact_data.nsubxacts);
5230 :
5231 : /*
5232 : * Allocate subxact information in the logical streaming context. We need
5233 : * this information during the complete stream so that we can add the sub
5234 : * transaction info to this. On stream stop we will flush this information
5235 : * to the subxact file and reset the logical streaming context.
5236 : */
5237 158 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5238 158 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
5239 : sizeof(SubXactInfo));
5240 158 : MemoryContextSwitchTo(oldctx);
5241 :
5242 158 : if (len > 0)
5243 158 : BufFileReadExact(fd, subxact_data.subxacts, len);
5244 :
5245 158 : BufFileClose(fd);
5246 : }
5247 :
5248 : /*
5249 : * subxact_info_add
5250 : * Add information about a subxact (offset in the main file).
5251 : */
5252 : static void
5253 205028 : subxact_info_add(TransactionId xid)
5254 : {
5255 205028 : SubXactInfo *subxacts = subxact_data.subxacts;
5256 : int64 i;
5257 :
5258 : /* We must have a valid top level stream xid and a stream fd. */
5259 : Assert(TransactionIdIsValid(stream_xid));
5260 : Assert(stream_fd != NULL);
5261 :
5262 : /*
5263 : * If the XID matches the toplevel transaction, we don't want to add it.
5264 : */
5265 205028 : if (stream_xid == xid)
5266 184780 : return;
5267 :
5268 : /*
5269 : * In most cases we're checking the same subxact as we've already seen in
5270 : * the last call, so make sure to ignore it (this change comes later).
5271 : */
5272 20248 : if (subxact_data.subxact_last == xid)
5273 20096 : return;
5274 :
5275 : /* OK, remember we're processing this XID. */
5276 152 : subxact_data.subxact_last = xid;
5277 :
5278 : /*
5279 : * Check if the transaction is already present in the array of subxact. We
5280 : * intentionally scan the array from the tail, because we're likely adding
5281 : * a change for the most recent subtransactions.
5282 : *
5283 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5284 : * would allow us to use binary search here.
5285 : */
5286 190 : for (i = subxact_data.nsubxacts; i > 0; i--)
5287 : {
5288 : /* found, so we're done */
5289 152 : if (subxacts[i - 1].xid == xid)
5290 114 : return;
5291 : }
5292 :
5293 : /* This is a new subxact, so we need to add it to the array. */
5294 38 : if (subxact_data.nsubxacts == 0)
5295 : {
5296 : MemoryContext oldctx;
5297 :
5298 16 : subxact_data.nsubxacts_max = 128;
5299 :
5300 : /*
5301 : * Allocate this memory for subxacts in per-stream context, see
5302 : * subxact_info_read.
5303 : */
5304 16 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5305 16 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5306 16 : MemoryContextSwitchTo(oldctx);
5307 : }
5308 22 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
5309 : {
5310 20 : subxact_data.nsubxacts_max *= 2;
5311 20 : subxacts = repalloc(subxacts,
5312 20 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5313 : }
5314 :
5315 38 : subxacts[subxact_data.nsubxacts].xid = xid;
5316 :
5317 : /*
5318 : * Get the current offset of the stream file and store it as offset of
5319 : * this subxact.
5320 : */
5321 38 : BufFileTell(stream_fd,
5322 38 : &subxacts[subxact_data.nsubxacts].fileno,
5323 38 : &subxacts[subxact_data.nsubxacts].offset);
5324 :
5325 38 : subxact_data.nsubxacts++;
5326 38 : subxact_data.subxacts = subxacts;
5327 : }
5328 :
5329 : /* format filename for file containing the info about subxacts */
5330 : static inline void
5331 1494 : subxact_filename(char *path, Oid subid, TransactionId xid)
5332 : {
5333 1494 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5334 1494 : }
5335 :
5336 : /* format filename for file containing serialized changes */
5337 : static inline void
5338 876 : changes_filename(char *path, Oid subid, TransactionId xid)
5339 : {
5340 876 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5341 876 : }
5342 :
5343 : /*
5344 : * stream_cleanup_files
5345 : * Cleanup files for a subscription / toplevel transaction.
5346 : *
5347 : * Remove files with serialized changes and subxact info for a particular
5348 : * toplevel transaction. Each subscription has a separate set of files
5349 : * for any toplevel transaction.
5350 : */
5351 : void
5352 62 : stream_cleanup_files(Oid subid, TransactionId xid)
5353 : {
5354 : char path[MAXPGPATH];
5355 :
5356 : /* Delete the changes file. */
5357 62 : changes_filename(path, subid, xid);
5358 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
5359 :
5360 : /* Delete the subxact file, if it exists. */
5361 62 : subxact_filename(path, subid, xid);
5362 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5363 62 : }
5364 :
5365 : /*
5366 : * stream_open_file
5367 : * Open a file that we'll use to serialize changes for a toplevel
5368 : * transaction.
5369 : *
5370 : * Open a file for streamed changes from a toplevel transaction identified
5371 : * by stream_xid (global variable). If it's the first chunk of streamed
5372 : * changes for this transaction, create the buffile, otherwise open the
5373 : * previously created file.
5374 : */
5375 : static void
5376 726 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
5377 : {
5378 : char path[MAXPGPATH];
5379 : MemoryContext oldcxt;
5380 :
5381 : Assert(OidIsValid(subid));
5382 : Assert(TransactionIdIsValid(xid));
5383 : Assert(stream_fd == NULL);
5384 :
5385 :
5386 726 : changes_filename(path, subid, xid);
5387 726 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5388 :
5389 : /*
5390 : * Create/open the buffiles under the logical streaming context so that we
5391 : * have those files until stream stop.
5392 : */
5393 726 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
5394 :
5395 : /*
5396 : * If this is the first streamed segment, create the changes file.
5397 : * Otherwise, just open the file for writing, in append mode.
5398 : */
5399 726 : if (first_segment)
5400 64 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
5401 : path);
5402 : else
5403 : {
5404 : /*
5405 : * Open the file and seek to the end of the file because we always
5406 : * append the changes file.
5407 : */
5408 662 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
5409 : path, O_RDWR, false);
5410 662 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
5411 : }
5412 :
5413 726 : MemoryContextSwitchTo(oldcxt);
5414 726 : }
5415 :
5416 : /*
5417 : * stream_close_file
5418 : * Close the currently open file with streamed changes.
5419 : */
5420 : static void
5421 786 : stream_close_file(void)
5422 : {
5423 : Assert(stream_fd != NULL);
5424 :
5425 786 : BufFileClose(stream_fd);
5426 :
5427 786 : stream_fd = NULL;
5428 786 : }
5429 :
5430 : /*
5431 : * stream_write_change
5432 : * Serialize a change to a file for the current toplevel transaction.
5433 : *
5434 : * The change is serialized in a simple format, with length (not including
5435 : * the length), action code (identifying the message type) and message
5436 : * contents (without the subxact TransactionId value).
5437 : */
5438 : static void
5439 215110 : stream_write_change(char action, StringInfo s)
5440 : {
5441 : int len;
5442 :
5443 : Assert(stream_fd != NULL);
5444 :
5445 : /* total on-disk size, including the action type character */
5446 215110 : len = (s->len - s->cursor) + sizeof(char);
5447 :
5448 : /* first write the size */
5449 215110 : BufFileWrite(stream_fd, &len, sizeof(len));
5450 :
5451 : /* then the action */
5452 215110 : BufFileWrite(stream_fd, &action, sizeof(action));
5453 :
5454 : /* and finally the remaining part of the buffer (after the XID) */
5455 215110 : len = (s->len - s->cursor);
5456 :
5457 215110 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
5458 215110 : }
5459 :
5460 : /*
5461 : * stream_open_and_write_change
5462 : * Serialize a message to a file for the given transaction.
5463 : *
5464 : * This function is similar to stream_write_change except that it will open the
5465 : * target file if not already before writing the message and close the file at
5466 : * the end.
5467 : */
5468 : static void
5469 10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5470 : {
5471 : Assert(!in_streamed_transaction);
5472 :
5473 10 : if (!stream_fd)
5474 10 : stream_start_internal(xid, false);
5475 :
5476 10 : stream_write_change(action, s);
5477 10 : stream_stop_internal(xid);
5478 10 : }
5479 :
5480 : /*
5481 : * Sets streaming options including replication slot name and origin start
5482 : * position. Workers need these options for logical replication.
5483 : */
5484 : void
5485 804 : set_stream_options(WalRcvStreamOptions *options,
5486 : char *slotname,
5487 : XLogRecPtr *origin_startpos)
5488 : {
5489 : int server_version;
5490 :
5491 804 : options->logical = true;
5492 804 : options->startpoint = *origin_startpos;
5493 804 : options->slotname = slotname;
5494 :
5495 804 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
5496 804 : options->proto.logical.proto_version =
5497 804 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
5498 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
5499 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
5500 : LOGICALREP_PROTO_VERSION_NUM;
5501 :
5502 804 : options->proto.logical.publication_names = MySubscription->publications;
5503 804 : options->proto.logical.binary = MySubscription->binary;
5504 :
5505 : /*
5506 : * Assign the appropriate option value for streaming option according to
5507 : * the 'streaming' mode and the publisher's ability to support that mode.
5508 : */
5509 804 : if (server_version >= 160000 &&
5510 804 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5511 : {
5512 736 : options->proto.logical.streaming_str = "parallel";
5513 736 : MyLogicalRepWorker->parallel_apply = true;
5514 : }
5515 68 : else if (server_version >= 140000 &&
5516 68 : MySubscription->stream != LOGICALREP_STREAM_OFF)
5517 : {
5518 52 : options->proto.logical.streaming_str = "on";
5519 52 : MyLogicalRepWorker->parallel_apply = false;
5520 : }
5521 : else
5522 : {
5523 16 : options->proto.logical.streaming_str = NULL;
5524 16 : MyLogicalRepWorker->parallel_apply = false;
5525 : }
5526 :
5527 804 : options->proto.logical.twophase = false;
5528 804 : options->proto.logical.origin = pstrdup(MySubscription->origin);
5529 804 : }
5530 :
5531 : /*
5532 : * Cleanup the memory for subxacts and reset the related variables.
5533 : */
5534 : static inline void
5535 752 : cleanup_subxact_info()
5536 : {
5537 752 : if (subxact_data.subxacts)
5538 174 : pfree(subxact_data.subxacts);
5539 :
5540 752 : subxact_data.subxacts = NULL;
5541 752 : subxact_data.subxact_last = InvalidTransactionId;
5542 752 : subxact_data.nsubxacts = 0;
5543 752 : subxact_data.nsubxacts_max = 0;
5544 752 : }
5545 :
5546 : /*
5547 : * Common function to run the apply loop with error handling. Disable the
5548 : * subscription, if necessary.
5549 : *
5550 : * Note that we don't handle FATAL errors which are probably because
5551 : * of system resource error and are not repeatable.
5552 : */
5553 : void
5554 804 : start_apply(XLogRecPtr origin_startpos)
5555 : {
5556 804 : PG_TRY();
5557 : {
5558 804 : LogicalRepApplyLoop(origin_startpos);
5559 : }
5560 146 : PG_CATCH();
5561 : {
5562 : /*
5563 : * Reset the origin state to prevent the advancement of origin
5564 : * progress if we fail to apply. Otherwise, this will result in
5565 : * transaction loss as that transaction won't be sent again by the
5566 : * server.
5567 : */
5568 146 : replorigin_reset(0, (Datum) 0);
5569 :
5570 146 : if (MySubscription->disableonerr)
5571 6 : DisableSubscriptionAndExit();
5572 : else
5573 : {
5574 : /*
5575 : * Report the worker failed while applying changes. Abort the
5576 : * current transaction so that the stats message is sent in an
5577 : * idle state.
5578 : */
5579 140 : AbortOutOfAnyTransaction();
5580 140 : pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
5581 :
5582 140 : PG_RE_THROW();
5583 : }
5584 : }
5585 0 : PG_END_TRY();
5586 0 : }
5587 :
5588 : /*
5589 : * Runs the leader apply worker.
5590 : *
5591 : * It sets up replication origin, streaming options and then starts streaming.
5592 : */
5593 : static void
5594 508 : run_apply_worker()
5595 : {
5596 : char originname[NAMEDATALEN];
5597 508 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
5598 508 : char *slotname = NULL;
5599 : WalRcvStreamOptions options;
5600 : RepOriginId originid;
5601 : TimeLineID startpointTLI;
5602 : char *err;
5603 : bool must_use_password;
5604 :
5605 508 : slotname = MySubscription->slotname;
5606 :
5607 : /*
5608 : * This shouldn't happen if the subscription is enabled, but guard against
5609 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5610 : * slot is NULL.)
5611 : */
5612 508 : if (!slotname)
5613 0 : ereport(ERROR,
5614 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5615 : errmsg("subscription has no replication slot set")));
5616 :
5617 : /* Setup replication origin tracking. */
5618 508 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
5619 : originname, sizeof(originname));
5620 508 : StartTransactionCommand();
5621 508 : originid = replorigin_by_name(originname, true);
5622 508 : if (!OidIsValid(originid))
5623 0 : originid = replorigin_create(originname);
5624 508 : replorigin_session_setup(originid, 0);
5625 508 : replorigin_session_origin = originid;
5626 508 : origin_startpos = replorigin_session_get_progress(false);
5627 508 : CommitTransactionCommand();
5628 :
5629 : /* Is the use of a password mandatory? */
5630 980 : must_use_password = MySubscription->passwordrequired &&
5631 472 : !MySubscription->ownersuperuser;
5632 :
5633 508 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
5634 : true, must_use_password,
5635 : MySubscription->name, &err);
5636 :
5637 490 : if (LogRepWorkerWalRcvConn == NULL)
5638 52 : ereport(ERROR,
5639 : (errcode(ERRCODE_CONNECTION_FAILURE),
5640 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5641 : MySubscription->name, err)));
5642 :
5643 : /*
5644 : * We don't really use the output identify_system for anything but it does
5645 : * some initializations on the upstream so let's still call it.
5646 : */
5647 438 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
5648 :
5649 438 : set_apply_error_context_origin(originname);
5650 :
5651 438 : set_stream_options(&options, slotname, &origin_startpos);
5652 :
5653 : /*
5654 : * Even when the two_phase mode is requested by the user, it remains as
5655 : * the tri-state PENDING until all tablesyncs have reached READY state.
5656 : * Only then, can it become ENABLED.
5657 : *
5658 : * Note: If the subscription has no tables then leave the state as
5659 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5660 : * work.
5661 : */
5662 470 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
5663 32 : AllTablesyncsReady())
5664 : {
5665 : /* Start streaming with two_phase enabled */
5666 18 : options.proto.logical.twophase = true;
5667 18 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5668 :
5669 18 : StartTransactionCommand();
5670 :
5671 : /*
5672 : * Updating pg_subscription might involve TOAST table access, so
5673 : * ensure we have a valid snapshot.
5674 : */
5675 18 : PushActiveSnapshot(GetTransactionSnapshot());
5676 :
5677 18 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
5678 18 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
5679 18 : PopActiveSnapshot();
5680 18 : CommitTransactionCommand();
5681 : }
5682 : else
5683 : {
5684 420 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5685 : }
5686 :
5687 438 : ereport(DEBUG1,
5688 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5689 : MySubscription->name,
5690 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
5691 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5692 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5693 : "?")));
5694 :
5695 : /* Run the main loop. */
5696 438 : start_apply(origin_startpos);
5697 0 : }
5698 :
/*
 * InitializeLogRepWorker
 *		Common initialization for leader apply worker, parallel apply worker
 *		and tablesync worker.
 *
 * Initialize the database connection, in-memory subscription and necessary
 * config options.  In order, this function:
 *	- forces session_replication_role to "replica";
 *	- connects to the worker's database as the worker's user;
 *	- forces an empty search_path;
 *	- creates ApplyContext and loads the subscription into it, holding a
 *	  shared lock on the subscription object;
 *	- exits the process if the subscription has vanished, is disabled, or
 *	  (leader apply worker only) needs a restart for retain_dead_tuples;
 *	- applies the subscription's synchronous_commit setting;
 *	- registers syscache callbacks for subscription and role changes;
 *	- logs that the worker has started.
 *
 * The whole catalog access happens inside a single transaction, which is
 * committed at the end.
 */
void
InitializeLogRepWorker(void)
{
	MemoryContext oldctx;

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/*
	 * Load the subscription into persistent memory context.  ApplyContext is
	 * a child of TopMemoryContext; we switch to it so that GetSubscription's
	 * result is allocated there.
	 */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/*
	 * Lock the subscription to prevent it from being concurrently dropped,
	 * then re-verify its existence. After the initialization, the worker will
	 * be terminated gracefully if the subscription is dropped.
	 */
	LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
					 AccessShareLock);
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		/* Only the OID can be reported; there is no subscription to name. */
		ereport(LOG,
				(errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	/* The cached subscription is fresh; callbacks below may invalidate it. */
	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
						MySubscription->name)));

		apply_worker_exit();
	}

	/*
	 * Restart the worker if retain_dead_tuples was enabled during startup.
	 *
	 * At this point, the replication slot used for conflict detection might
	 * not exist yet, or could be dropped soon if the launcher perceives
	 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
	 * oldest_nonremovable_xid when the slot is absent or at risk of being
	 * dropped, a restart is initiated.
	 *
	 * The oldest_nonremovable_xid should be initialized only when the
	 * subscription's retention is active before launching the worker. See
	 * logicalrep_worker_launch.
	 */
	if (am_leader_apply_worker() &&
		MySubscription->retaindeadtuples &&
		MySubscription->retentionactive &&
		!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
	{
		ereport(LOG,
				errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
					   MySubscription->name, "retain_dead_tuples"));

		apply_worker_exit();
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/*
	 * Keep us informed about subscription or role changes. Note that the
	 * role's superuser privilege can be revoked.  Both caches share the same
	 * callback, which just marks the cached subscription as invalid.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	CacheRegisterSyscacheCallback(AUTHOID,
								  subscription_change_cb,
								  (Datum) 0);

	/* Announce the start, naming the table for tablesync workers. */
	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name,
						get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();
}
5819 :
5820 : /*
5821 : * Reset the origin state.
5822 : */
5823 : static void
5824 1046 : replorigin_reset(int code, Datum arg)
5825 : {
5826 1046 : replorigin_session_origin = InvalidRepOriginId;
5827 1046 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
5828 1046 : replorigin_session_origin_timestamp = 0;
5829 1046 : }
5830 :
5831 : /* Common function to setup the leader apply or tablesync worker. */
5832 : void
5833 1010 : SetupApplyOrSyncWorker(int worker_slot)
5834 : {
5835 : /* Attach to slot */
5836 1010 : logicalrep_worker_attach(worker_slot);
5837 :
5838 : Assert(am_tablesync_worker() || am_leader_apply_worker());
5839 :
5840 : /* Setup signal handling */
5841 1010 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
5842 1010 : pqsignal(SIGTERM, die);
5843 1010 : BackgroundWorkerUnblockSignals();
5844 :
5845 : /*
5846 : * We don't currently need any ResourceOwner in a walreceiver process, but
5847 : * if we did, we could call CreateAuxProcessResourceOwner here.
5848 : */
5849 :
5850 : /* Initialise stats to a sanish value */
5851 1010 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
5852 1010 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
5853 :
5854 : /* Load the libpq-specific functions */
5855 1010 : load_file("libpqwalreceiver", false);
5856 :
5857 1010 : InitializeLogRepWorker();
5858 :
5859 : /*
5860 : * Register a callback to reset the origin state before aborting any
5861 : * pending transaction during shutdown (see ShutdownPostgres()). This will
5862 : * avoid origin advancement for an in-complete transaction which could
5863 : * otherwise lead to its loss as such a transaction won't be sent by the
5864 : * server again.
5865 : *
5866 : * Note that even a LOG or DEBUG statement placed after setting the origin
5867 : * state may process a shutdown signal before committing the current apply
5868 : * operation. So, it is important to register such a callback here.
5869 : */
5870 900 : before_shmem_exit(replorigin_reset, (Datum) 0);
5871 :
5872 : /* Connect to the origin and start the replication. */
5873 900 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5874 : MySubscription->conninfo);
5875 :
5876 : /*
5877 : * Setup callback for syscache so that we know when something changes in
5878 : * the subscription relation state.
5879 : */
5880 900 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5881 : invalidate_syncing_table_states,
5882 : (Datum) 0);
5883 900 : }
5884 :
5885 : /* Logical Replication Apply worker entry point */
5886 : void
5887 618 : ApplyWorkerMain(Datum main_arg)
5888 : {
5889 618 : int worker_slot = DatumGetInt32(main_arg);
5890 :
5891 618 : InitializingApplyWorker = true;
5892 :
5893 618 : SetupApplyOrSyncWorker(worker_slot);
5894 :
5895 508 : InitializingApplyWorker = false;
5896 :
5897 508 : run_apply_worker();
5898 :
5899 0 : proc_exit(0);
5900 : }
5901 :
/*
 * DisableSubscriptionAndExit
 *		After error recovery, disable the subscription in a new transaction
 *		and exit cleanly.
 *
 * Intended to be called while an error is being handled (start_apply calls
 * it from its PG_CATCH block).  The pending error is emitted and cleared,
 * the failure is reported to the statistics system, the subscription is
 * disabled in the catalog, and the process exits with status 0.  This
 * function does not return.
 */
void
DisableSubscriptionAndExit(void)
{
	/*
	 * Emit the error message, and recover from the error state to an idle
	 * state
	 */
	HOLD_INTERRUPTS();

	EmitErrorReport();
	AbortOutOfAnyTransaction();
	FlushErrorState();

	RESUME_INTERRUPTS();

	/* Report the worker failed during either table synchronization or apply */
	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
									 !am_tablesync_worker());

	/* Disable the subscription */
	StartTransactionCommand();

	/*
	 * Updating pg_subscription might involve TOAST table access, so ensure we
	 * have a valid snapshot.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());

	DisableSubscription(MySubscription->oid);
	PopActiveSnapshot();
	CommitTransactionCommand();

	/* Ensure we remove no-longer-useful entry for worker's start time */
	if (am_leader_apply_worker())
		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

	/* Notify the subscription has been disabled and exit */
	ereport(LOG,
			errmsg("subscription \"%s\" has been disabled because of an error",
				   MySubscription->name));

	/*
	 * Skip the track_commit_timestamp check when disabling the worker due to
	 * an error, as verifying commit timestamps is unnecessary in this
	 * context.  Any problem found is reported at WARNING level only.
	 */
	CheckSubDeadTupleRetention(false, true, WARNING,
							   MySubscription->retaindeadtuples,
							   MySubscription->retentionactive, false);

	proc_exit(0);
}
5958 :
5959 : /*
5960 : * Is current process a logical replication worker?
5961 : */
5962 : bool
5963 4036 : IsLogicalWorker(void)
5964 : {
5965 4036 : return MyLogicalRepWorker != NULL;
5966 : }
5967 :
/*
 * IsLogicalParallelApplyWorker
 *		Report whether this process is a logical replication parallel apply
 *		worker.
 *
 * The worker type is only examined once we know that this is a logical
 * replication worker at all.
 */
bool
IsLogicalParallelApplyWorker(void)
{
	if (!IsLogicalWorker())
		return false;

	return am_parallel_apply_worker();
}
5976 :
5977 : /*
5978 : * Start skipping changes of the transaction if the given LSN matches the
5979 : * LSN specified by subscription's skiplsn.
5980 : */
5981 : static void
5982 1048 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
5983 : {
5984 : Assert(!is_skipping_changes());
5985 : Assert(!in_remote_transaction);
5986 : Assert(!in_streamed_transaction);
5987 :
5988 : /*
5989 : * Quick return if it's not requested to skip this transaction. This
5990 : * function is called for every remote transaction and we assume that
5991 : * skipping the transaction is not used often.
5992 : */
5993 1048 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
5994 : MySubscription->skiplsn != finish_lsn))
5995 1042 : return;
5996 :
5997 : /* Start skipping all changes of this transaction */
5998 6 : skip_xact_finish_lsn = finish_lsn;
5999 :
6000 6 : ereport(LOG,
6001 : errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6002 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6003 : }
6004 :
6005 : /*
6006 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6007 : */
6008 : static void
6009 54 : stop_skipping_changes(void)
6010 : {
6011 54 : if (!is_skipping_changes())
6012 48 : return;
6013 :
6014 6 : ereport(LOG,
6015 : errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6016 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6017 :
6018 : /* Stop skipping changes */
6019 6 : skip_xact_finish_lsn = InvalidXLogRecPtr;
6020 : }
6021 :
/*
 * clear_subscription_skip_lsn
 *		Clear subskiplsn of pg_subscription catalog.
 *
 * finish_lsn is the transaction's finish LSN that is used to check if the
 * subskiplsn matches it. If not matched, we raise a warning when clearing the
 * subskiplsn in order to inform users for cases e.g., where the user mistakenly
 * specified the wrong subskiplsn.
 *
 * Nothing is done when the cached subscription has no skiplsn set, or when
 * called in a parallel apply worker.  If no transaction is in progress, one
 * is started here and committed before returning.
 */
static void
clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
{
	Relation	rel;
	Form_pg_subscription subform;
	HeapTuple	tup;
	XLogRecPtr	myskiplsn = MySubscription->skiplsn;
	bool		started_tx = false;

	/* Common case: no skip LSN to clear (or not our job as a parallel worker) */
	if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
		return;

	/* Catalog access needs a transaction; remember whether we opened it. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/*
	 * Updating pg_subscription might involve TOAST table access, so ensure we
	 * have a valid snapshot.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());

	/*
	 * Protect subskiplsn of pg_subscription from being concurrently updated
	 * while clearing it.
	 */
	LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
					 AccessShareLock);

	rel = table_open(SubscriptionRelationId, RowExclusiveLock);

	/* Fetch the existing tuple. */
	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
							  ObjectIdGetDatum(MySubscription->oid));

	if (!HeapTupleIsValid(tup))
		elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);

	subform = (Form_pg_subscription) GETSTRUCT(tup);

	/*
	 * Clear the subskiplsn. If the user has already changed subskiplsn before
	 * clearing it we don't update the catalog and the replication origin
	 * state won't get advanced. So in the worst case, if the server crashes
	 * before sending an acknowledgment of the flush position the transaction
	 * will be sent again and the user needs to set subskiplsn again. We can
	 * reduce the possibility by logging a replication origin WAL record to
	 * advance the origin LSN instead but there is no way to advance the
	 * origin timestamp and it doesn't seem to be worth doing anything about
	 * it since it's a very rare case.
	 */
	if (subform->subskiplsn == myskiplsn)
	{
		bool		nulls[Natts_pg_subscription];
		bool		replaces[Natts_pg_subscription];
		Datum		values[Natts_pg_subscription];

		/* Start from "replace nothing"; only subskiplsn is marked below. */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		memset(replaces, false, sizeof(replaces));

		/* reset subskiplsn */
		values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
		replaces[Anum_pg_subscription_subskiplsn - 1] = true;

		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
								replaces);
		CatalogTupleUpdate(rel, &tup->t_self, tup);

		/*
		 * The skip LSN was cleared although it did not designate this
		 * transaction; let the user know about the mismatch.
		 */
		if (myskiplsn != finish_lsn)
			ereport(WARNING,
					errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
					errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
							  LSN_FORMAT_ARGS(finish_lsn),
							  LSN_FORMAT_ARGS(myskiplsn)));
	}

	heap_freetuple(tup);
	/* Close the relation without releasing its lock here (NoLock). */
	table_close(rel, NoLock);

	PopActiveSnapshot();

	/* Only commit if the transaction was started by this function. */
	if (started_tx)
		CommitTransactionCommand();
}
6117 :
6118 : /* Error callback to give more context info about the change being applied */
6119 : void
6120 13104 : apply_error_callback(void *arg)
6121 : {
6122 13104 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
6123 :
6124 13104 : if (apply_error_callback_arg.command == 0)
6125 10556 : return;
6126 :
6127 : Assert(errarg->origin_name);
6128 :
6129 2548 : if (errarg->rel == NULL)
6130 : {
6131 668 : if (!TransactionIdIsValid(errarg->remote_xid))
6132 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6133 : errarg->origin_name,
6134 : logicalrep_message_type(errarg->command));
6135 668 : else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6136 538 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6137 : errarg->origin_name,
6138 : logicalrep_message_type(errarg->command),
6139 : errarg->remote_xid);
6140 : else
6141 260 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6142 : errarg->origin_name,
6143 : logicalrep_message_type(errarg->command),
6144 : errarg->remote_xid,
6145 130 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6146 : }
6147 : else
6148 : {
6149 1880 : if (errarg->remote_attnum < 0)
6150 : {
6151 1880 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6152 3504 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6153 : errarg->origin_name,
6154 : logicalrep_message_type(errarg->command),
6155 1752 : errarg->rel->remoterel.nspname,
6156 1752 : errarg->rel->remoterel.relname,
6157 : errarg->remote_xid);
6158 : else
6159 256 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6160 : errarg->origin_name,
6161 : logicalrep_message_type(errarg->command),
6162 128 : errarg->rel->remoterel.nspname,
6163 128 : errarg->rel->remoterel.relname,
6164 : errarg->remote_xid,
6165 128 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6166 : }
6167 : else
6168 : {
6169 0 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6170 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6171 : errarg->origin_name,
6172 : logicalrep_message_type(errarg->command),
6173 0 : errarg->rel->remoterel.nspname,
6174 0 : errarg->rel->remoterel.relname,
6175 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6176 : errarg->remote_xid);
6177 : else
6178 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6179 : errarg->origin_name,
6180 : logicalrep_message_type(errarg->command),
6181 0 : errarg->rel->remoterel.nspname,
6182 0 : errarg->rel->remoterel.relname,
6183 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6184 : errarg->remote_xid,
6185 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6186 : }
6187 : }
6188 : }
6189 :
6190 : /* Set transaction information of apply error callback */
6191 : static inline void
6192 5850 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
6193 : {
6194 5850 : apply_error_callback_arg.remote_xid = xid;
6195 5850 : apply_error_callback_arg.finish_lsn = lsn;
6196 5850 : }
6197 :
6198 : /* Reset all information of apply error callback */
6199 : static inline void
6200 2872 : reset_apply_error_context_info(void)
6201 : {
6202 2872 : apply_error_callback_arg.command = 0;
6203 2872 : apply_error_callback_arg.rel = NULL;
6204 2872 : apply_error_callback_arg.remote_attnum = -1;
6205 2872 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
6206 2872 : }
6207 :
6208 : /*
6209 : * Request wakeup of the workers for the given subscription OID
6210 : * at commit of the current transaction.
6211 : *
6212 : * This is used to ensure that the workers process assorted changes
6213 : * as soon as possible.
6214 : */
6215 : void
6216 440 : LogicalRepWorkersWakeupAtCommit(Oid subid)
6217 : {
6218 : MemoryContext oldcxt;
6219 :
6220 440 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
6221 440 : on_commit_wakeup_workers_subids =
6222 440 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
6223 440 : MemoryContextSwitchTo(oldcxt);
6224 440 : }
6225 :
6226 : /*
6227 : * Wake up the workers of any subscriptions that were changed in this xact.
6228 : */
6229 : void
6230 1034344 : AtEOXact_LogicalRepWorkers(bool isCommit)
6231 : {
6232 1034344 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
6233 : {
6234 : ListCell *lc;
6235 :
6236 430 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6237 860 : foreach(lc, on_commit_wakeup_workers_subids)
6238 : {
6239 430 : Oid subid = lfirst_oid(lc);
6240 : List *workers;
6241 : ListCell *lc2;
6242 :
6243 430 : workers = logicalrep_workers_find(subid, true, false);
6244 562 : foreach(lc2, workers)
6245 : {
6246 132 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6247 :
6248 132 : logicalrep_worker_wakeup_ptr(worker);
6249 : }
6250 : }
6251 430 : LWLockRelease(LogicalRepWorkerLock);
6252 : }
6253 :
6254 : /* The List storage will be reclaimed automatically in xact cleanup. */
6255 1034344 : on_commit_wakeup_workers_subids = NIL;
6256 1034344 : }
6257 :
6258 : /*
6259 : * Allocate the origin name in long-lived context for error context message.
6260 : */
6261 : void
6262 824 : set_apply_error_context_origin(char *originname)
6263 : {
6264 824 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
6265 : originname);
6266 824 : }
6267 :
6268 : /*
6269 : * Return the action to be taken for the given transaction. See
6270 : * TransApplyAction for information on each of the actions.
6271 : *
6272 : * *winfo is assigned to the destination parallel worker info when the leader
6273 : * apply worker has to pass all the transaction's changes to the parallel
6274 : * apply worker.
6275 : */
6276 : static TransApplyAction
6277 652560 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
6278 : {
6279 652560 : *winfo = NULL;
6280 :
6281 652560 : if (am_parallel_apply_worker())
6282 : {
6283 137860 : return TRANS_PARALLEL_APPLY;
6284 : }
6285 :
6286 : /*
6287 : * If we are processing this transaction using a parallel apply worker
6288 : * then either we send the changes to the parallel worker or if the worker
6289 : * is busy then serialize the changes to the file which will later be
6290 : * processed by the parallel worker.
6291 : */
6292 514700 : *winfo = pa_find_worker(xid);
6293 :
6294 514700 : if (*winfo && (*winfo)->serialize_changes)
6295 : {
6296 10074 : return TRANS_LEADER_PARTIAL_SERIALIZE;
6297 : }
6298 504626 : else if (*winfo)
6299 : {
6300 137828 : return TRANS_LEADER_SEND_TO_PARALLEL;
6301 : }
6302 :
6303 : /*
6304 : * If there is no parallel worker involved to process this transaction
6305 : * then we either directly apply the change or serialize it to a file
6306 : * which will later be applied when the transaction finish message is
6307 : * processed.
6308 : */
6309 366798 : else if (in_streamed_transaction)
6310 : {
6311 206400 : return TRANS_LEADER_SERIALIZE;
6312 : }
6313 : else
6314 : {
6315 160398 : return TRANS_LEADER_APPLY;
6316 : }
6317 : }
|